Commit 296b9bc7 authored by Zhu Yong's avatar Zhu Yong
Browse files

add handler, callback and Get operation (not work yet)

parent a564a962
Loading
Loading
Loading
Loading

blockconnection.go

0 → 100644
+53 −0
Original line number Diff line number Diff line
package kinetic

//kproto "github.com/yongzhy/kinetic-go/proto"

type BlockConnection struct {
	nbc *NonBlockConnection
}

func NewBlockConnection(op ClientOptions) (*BlockConnection, error) {
	nbc, err := NewNonBlockConnection(op)
	if err != nil {
		return nil, err
	}

	return &BlockConnection{nbc: nbc}, err
}

func (conn *BlockConnection) NoOp() error {
	callback := &GenericCallback{}
	h := NewMessageHandler(callback)
	conn.nbc.Nop(h)

	for callback.Done() == false {
		conn.nbc.Run()
	}

	return nil
}

func (conn *BlockConnection) Get(key []byte) (Record, error) {
	callback := &GetCallback{}
	h := NewMessageHandler(callback)

	err := conn.nbc.Get(key, h)
	if err != nil {
		return Record{}, err
	}

	for i := 0; i < 1000; i++ {
		if callback.Done() == false {
			conn.nbc.Run()
		}
	}
	//for callback.Done() == false {
	//		conn.nbc.Run()
	//}

	return callback.Record(), nil
}

func (conn *BlockConnection) Close() {
	conn.nbc.Close()
}

callback.go

0 → 100644
+54 −0
Original line number Diff line number Diff line
package kinetic

import (
	kproto "github.com/yongzhy/kinetic-go/proto"
)

type Callback interface {
	Success(resp *kproto.Command, value []byte)
	Failure(status *Status)
	Done() bool
	Status() Status
}

type GenericCallback struct {
	done   bool
	status Status
}

func (c *GenericCallback) Success(resp *kproto.Command, value []byte) {
	c.done = true
	c.status = Status{Code: OK}
	klog.Info("Callback Success")
}

func (c *GenericCallback) Failure(status *Status) {
	c.done = true
	c.status = *status
	klog.Info("Callback Failure")
}

func (c *GenericCallback) Done() bool {
	return c.done
}

func (c *GenericCallback) Status() Status {
	return c.status
}

type GetCallback struct {
	GenericCallback
	record Record
}

func (c *GetCallback) Success(resp *kproto.Command, value []byte) {
	c.GenericCallback.Success(resp, value)
	c.record.Key = resp.GetBody().GetKeyValue().GetKey()
	c.record.Value = value
	klog.Info("Get Operation Success")
	klog.Info("%v", c.record)
}

func (c *GetCallback) Record() Record {
	return c.record
}
+16 −2
Original line number Diff line number Diff line
@@ -6,7 +6,7 @@ import (
)

var (
	testConn *Connection
	testConn *BlockConnection
)

const testDevice string = "10.29.24.55"
@@ -26,10 +26,24 @@ func TestHandshake(t *testing.T) {
		Host: testDevice, Port: 8123,
		User: 1, Hmac: []byte("asfdasfd")}

	conn, err := NewConnection(option)
	conn, err := NewNonBlockConnection(option)
	if err != nil {
		t.Fatal("Handshake fail")
	}

	conn.Close()
}

func TestNonBlockGet(t *testing.T) {
	var option = ClientOptions{
		Host: testDevice, Port: 8123,
		User: 1, Hmac: []byte("asfdasfd")}

	conn, err := NewBlockConnection(option)
	if err != nil {
		t.Fatal("Handshake fail")
	}

	conn.Get([]byte("object000"))
	conn.Close()
}
+11 −37
Original line number Diff line number Diff line
@@ -4,45 +4,18 @@ import (
	kproto "github.com/yongzhy/kinetic-go/proto"
)

type Callback interface {
	Success()
	Failure()
	Done() bool
type MessageHandler struct {
	callback Callback
}

type MessageHandler interface {
	Handle(cmd *kproto.Command, value []byte) error
	Error()
	SetCallback(callback *Callback)
}

type SimpleCallback struct {
	done bool
}

func (c *SimpleCallback) Success() {
	c.done = true
}

func (c *SimpleCallback) Failure() {
	c.done = true
}

func (c *SimpleCallback) Done() bool {
	return c.done
}

type SimpleHandler struct {
	callback *Callback
}

func (h *SimpleHandler) Handle(cmd *kproto.Command, value []byte) error {
func (h *MessageHandler) Handle(cmd *kproto.Command, value []byte) error {
	if h.callback != nil {
		if cmd.Status != nil && cmd.Status.Code != nil {
			if cmd.GetStatus().GetCode() == kproto.Command_Status_SUCCESS {
				(*h.callback).Success()
				h.callback.Success(cmd, value)
			} else {
				(*h.callback).Failure()
				var status = Status{}
				h.callback.Failure(&status)
			}
		}

@@ -50,10 +23,11 @@ func (h *SimpleHandler) Handle(cmd *kproto.Command, value []byte) error {
	return nil
}

func (h *SimpleHandler) Error() {

func (h *MessageHandler) SetCallback(call Callback) {
	h.callback = call
}

func (h *SimpleHandler) SetCallback(call *Callback) {
	h.callback = call
func NewMessageHandler(call Callback) *MessageHandler {
	h := &MessageHandler{callback: call}
	return h
}
+53 −0
Original line number Diff line number Diff line
@@ -4,11 +4,11 @@ import (
	kproto "github.com/yongzhy/kinetic-go/proto"
)

type Connection struct {
type NonBlockConnection struct {
	service *networkService
}

func NewConnection(op ClientOptions) (*Connection, error) {
func NewNonBlockConnection(op ClientOptions) (*NonBlockConnection, error) {
	if op.Hmac == nil {
		klog.Panic("HMAC is required for ClientOptions")
	}
@@ -18,17 +18,36 @@ func NewConnection(op ClientOptions) (*Connection, error) {
		return nil, err
	}

	return &Connection{service}, nil
	return &NonBlockConnection{service}, nil
}

func (conn *Connection) Nop() error {
func (conn *NonBlockConnection) Nop(h *MessageHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)

	cmd := newCommand(kproto.Command_NOOP)

	err := conn.service.execute(msg, cmd, nil, nil)
	err := conn.service.submit(msg, cmd, nil, h)
	return err
}

func (conn *NonBlockConnection) Get(key []byte, h *MessageHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)

	cmd := newCommand(kproto.Command_GET)
	cmd.Body = &kproto.Command_Body{
		KeyValue: &kproto.Command_KeyValue{
			Key: key,
		},
	}

	err := conn.service.submit(msg, cmd, nil, h)
	return err
}

func (conn *Connection) Close() {
func (conn *NonBlockConnection) Run() error {
	return conn.service.listen()
}

func (conn *NonBlockConnection) Close() {
	conn.service.close()
}
Loading