Commit a4ee590f authored by Zhu Yong's avatar Zhu Yong
Browse files

* Add operation GetNext, GetPrevious, GetKeyRange, Delete, Put.

* Updated Status to include status message from kinetic drive.
* Network service add fatal status, can't do any operation if in fatal
parent ec1c58e7
Loading
Loading
Loading
Loading
+65 −15
Original line number Diff line number Diff line
package kinetic

//kproto "github.com/yongzhy/kinetic-go/proto"
import (
	kproto "github.com/yongzhy/kinetic-go/proto"
)

type BlockConnection struct {
	nbc *NonBlockConnection
@@ -16,40 +18,88 @@ func NewBlockConnection(op ClientOptions) (*BlockConnection, error) {
	return &BlockConnection{nbc: nbc}, err
}

func (conn *BlockConnection) NoOp() error {
func (conn *BlockConnection) NoOp() (Status, error) {
	callback := &GenericCallback{}
	h := NewMessageHandler(callback)
	if h == nil {
		klog.Error("Message Handler for NoOp Failure")
	}
	if conn == nil {
		klog.Error("Connection nil")
	} else if conn.nbc == nil {
		klog.Error("Nonblock Connection nil")
	}
	conn.nbc.NoOp(h)

	for callback.Done() == false {
		conn.nbc.Run()
	}

	return nil
	return callback.Status(), nil
}

func (conn *BlockConnection) Get(key []byte) (Record, error) {
func (conn *BlockConnection) get(key []byte, getCmd kproto.Command_MessageType) (Record, Status, error) {
	callback := &GetCallback{}
	h := NewMessageHandler(callback)

	err := conn.nbc.Get(key, h)
	var err error = nil
	switch getCmd {
	case kproto.Command_GET:
		err = conn.nbc.Get(key, h)
	case kproto.Command_GETPREVIOUS:
		err = conn.nbc.GetPrevious(key, h)
	case kproto.Command_GETNEXT:
		err = conn.nbc.GetNext(key, h)
	}
	if err != nil {
		return Record{}, err
		return Record{}, callback.Status(), err
	}

	for callback.Done() == false {
		conn.nbc.Run()
	}

	return callback.Entry, callback.Status(), nil
}

func (conn *BlockConnection) Get(key []byte) (Record, Status, error) {
	return conn.get(key, kproto.Command_GET)
}

func (conn *BlockConnection) GetNext(key []byte) (Record, Status, error) {
	return conn.get(key, kproto.Command_GETNEXT)
}

func (conn *BlockConnection) GetPrevious(key []byte) (Record, Status, error) {
	return conn.get(key, kproto.Command_GETPREVIOUS)
}

func (conn *BlockConnection) GetKeyRange(r *KeyRange) ([][]byte, Status, error) {
	callback := &GetKeyRangeCallback{}
	h := NewMessageHandler(callback)
	conn.nbc.GetKeyRange(r, h)

	for callback.Done() == false {
		conn.nbc.Run()
	}

	return callback.Keys, callback.Status(), nil
}

func (conn *BlockConnection) Delete(entry *Record) (Status, error) {
	callback := &GenericCallback{}
	h := NewMessageHandler(callback)
	conn.nbc.Delete(entry, h)

	for callback.Done() == false {
		conn.nbc.Run()
	}

	return callback.Status(), nil
}

func (conn *BlockConnection) Put(entry *Record) (Status, error) {
	callback := &GenericCallback{}
	h := NewMessageHandler(callback)
	conn.nbc.Put(entry, h)

	for callback.Done() == false {
		conn.nbc.Run()
	}

	return callback.Record(), nil
	return callback.Status(), nil
}

func (conn *BlockConnection) Close() {
+20 −8
Original line number Diff line number Diff line
@@ -11,6 +11,7 @@ type Callback interface {
	Status() Status
}

// Generic Callback, for Message which doesn't require data from Kinetic drive.
type GenericCallback struct {
	done   bool
	status Status
@@ -36,21 +37,32 @@ func (c *GenericCallback) Status() Status {
	return c.status
}

// Callback for Command_GET Message
type GetCallback struct {
	GenericCallback
	record Record
	Entry Record
}

func (c *GetCallback) Success(resp *kproto.Command, value []byte) {
	c.GenericCallback.Success(resp, value)
	c.record.Key = resp.GetBody().GetKeyValue().GetKey()
	c.record.Tag = resp.GetBody().GetKeyValue().GetTag()
	c.record.Version = resp.GetBody().GetKeyValue().GetDbVersion()
	c.record.Algo = convertAlgoFromProto(resp.GetBody().GetKeyValue().GetAlgorithm())
	c.Entry.Key = resp.GetBody().GetKeyValue().GetKey()
	c.Entry.Tag = resp.GetBody().GetKeyValue().GetTag()
	c.Entry.Version = resp.GetBody().GetKeyValue().GetDbVersion()
	c.Entry.Algo = convertAlgoFromProto(resp.GetBody().GetKeyValue().GetAlgorithm())

	c.record.Value = value
	c.Entry.Value = value
}

func (c *GetCallback) Record() Record {
	return c.record
// Callback for Command_GETKEYRANGE Message
type GetKeyRangeCallback struct {
	GenericCallback
	Keys [][]byte
}

func (c *GetKeyRangeCallback) Success(resp *kproto.Command, value []byte) {
	c.GenericCallback.Success(resp, value)
	c.Keys = resp.GetBody().GetRange().GetKeys()
}

// Callback for Command_GETVERSION Message
// Callback for Command_GETLOG Message
+67 −2
Original line number Diff line number Diff line
@@ -27,9 +27,74 @@ func TestMain(m *testing.M) {
}

func TestNonBlockNoOp(t *testing.T) {
	blockConn.NoOp()
	status, err := blockConn.NoOp()
	if err != nil || status.Code != OK {
		t.Fatal("Nonblocking NoOp Failure")
	}
}

func TestNonBlockGet(t *testing.T) {
	blockConn.Get([]byte("object000"))
	_, status, err := blockConn.Get([]byte("object000"))
	if err != nil || status.Code != OK {
		t.Fatal("Nonblocking Get Failure")
	}
}

func TestNonBlockGetNext(t *testing.T) {
	_, status, err := blockConn.GetNext([]byte("object000"))
	if err != nil || status.Code != OK {
		t.Fatal("Nonblocking GetNext Failure")
	}
}

func TestNonBlockGetPrevious(t *testing.T) {
	_, status, err := blockConn.GetPrevious([]byte("object000"))
	if err != nil || status.Code != OK {
		t.Fatal("Nonblocking GetPrevious Failure")
	}
}

func TestNonBlockPut(t *testing.T) {
	entry := Record{
		Key:   []byte("object001"),
		Value: []byte("ABCDEFG"),
		Sync:  SYNC_WRITETHROUGH,
		Algo:  ALGO_SHA1,
		Tag:   []byte(""),
		Force: true,
	}
	status, err := blockConn.Put(&entry)
	if err != nil || status.Code != OK {
		t.Fatal("Nonblocking Put Failure")
	}
}

func TestNonBlockDelete(t *testing.T) {
	entry := Record{
		Key:   []byte("object001"),
		Sync:  SYNC_WRITETHROUGH,
		Algo:  ALGO_SHA1,
		Force: true,
	}
	status, err := blockConn.Delete(&entry)
	if err != nil || status.Code != OK {
		t.Fatal("Nonblocking Delete Failure")
	}
}

func TestNonBlockGetKeyRange(t *testing.T) {
	r := KeyRange{
		StartKey:          []byte("object000"),
		EndKey:            []byte("object999"),
		StartKeyInclusive: true,
		EndKeyInclusive:   true,
		Max:               5,
	}
	keys, status, err := blockConn.GetKeyRange(&r)
	if err != nil || status.Code != OK {
		t.Fatal("Nonblocking GetKeyRange Failure: ", status.Error())
	}
	for k, key := range keys {
		t.Logf("key[%d] = %s", k, string(key))
	}
}
+109 −81
Original line number Diff line number Diff line
@@ -33,12 +33,24 @@ const (
	ALGO_CRC64             Algorithm = 5
)

type Synchronization int32

const (
	SYNC_INVALID_SYNCHRONIZATION Synchronization = -1
	SYNC_WRITETHROUGH            Synchronization = 1
	SYNC_WRITEBACK               Synchronization = 2
	SYNC_FLUSH                   Synchronization = 3
)

type Record struct {
	Key      []byte
	Value    []byte
	Version  []byte
	Tag      []byte
	Algo     Algorithm
	Sync     Synchronization
	Force    bool
	MetaOnly bool
}

type KeyRange struct {
@@ -47,27 +59,7 @@ type KeyRange struct {
	StartKeyInclusive bool
	EndKeyInclusive   bool
	Reverse           bool
	Max               uint
}

type Client interface {
	Nop() error
	Version() error
	Put(key, value []byte, h *MessageHandler) error
	Get(key []byte, h *MessageHandler) ([]byte, error)
	GetNext() error
	GetPrevious() error
	Flush(h *MessageHandler) error
	Delete(key []byte, h *MessageHandler) error
	GetRange(r *KeyRange, h *MessageHandler) ([][]byte, error)

	SetErasePin(old, new []byte, h *MessageHandler) error
	SecureErase(pin []byte) error
	InstantErase(pin []byte) error
	SetLockPin(old, new []byte) error
	Lock(pin []byte) error
	UnLock(pin []byte) error
	GetLog() error
	Max               int32
}

func convertAlgoToProto(a Algorithm) kproto.Command_Algorithm {
@@ -108,6 +100,36 @@ func convertAlgoFromProto(a kproto.Command_Algorithm) Algorithm {
	return ret
}

func convertSyncToProto(sync Synchronization) kproto.Command_Synchronization {
	ret := kproto.Command_INVALID_SYNCHRONIZATION
	switch sync {
	case SYNC_INVALID_SYNCHRONIZATION:
		ret = kproto.Command_INVALID_SYNCHRONIZATION
	case SYNC_WRITETHROUGH:
		ret = kproto.Command_WRITETHROUGH
	case SYNC_WRITEBACK:
		ret = kproto.Command_WRITEBACK
	case SYNC_FLUSH:
		ret = kproto.Command_FLUSH
	}
	return ret
}

func convertSyncFromProto(sync kproto.Command_Synchronization) Synchronization {
	ret := SYNC_INVALID_SYNCHRONIZATION
	switch sync {
	case kproto.Command_INVALID_SYNCHRONIZATION:
		ret = SYNC_INVALID_SYNCHRONIZATION
	case kproto.Command_WRITETHROUGH:
		ret = SYNC_WRITETHROUGH
	case kproto.Command_WRITEBACK:
		ret = SYNC_WRITEBACK
	case kproto.Command_FLUSH:
		ret = SYNC_FLUSH
	}
	return ret
}

func convertStatusCodeToProto(s StatusCode) kproto.Command_Status_StatusCode {
	ret := kproto.Command_Status_INVALID_STATUS_CODE
	switch s {
@@ -212,6 +234,11 @@ func convertStatusCodeFromProto(s kproto.Command_Status_StatusCode) StatusCode {

func getStatusFromProto(cmd *kproto.Command) Status {
	code := convertStatusCodeFromProto(cmd.GetStatus().GetCode())
	msg := cmd.GetStatus().GetStatusMessage()

	return Status{code, msg}

	/*
		switch code {
		case CLIENT_IO_ERROR:
			return Status{code, "IO error"}
@@ -267,4 +294,5 @@ func getStatusFromProto(cmd *kproto.Command) Status {
		default:
			return Status{code, "Internal Error"}
		}
	*/
}
+72 −2
Original line number Diff line number Diff line
@@ -30,10 +30,10 @@ func (conn *NonBlockConnection) NoOp(h *MessageHandler) error {
	return err
}

func (conn *NonBlockConnection) Get(key []byte, h *MessageHandler) error {
func (conn *NonBlockConnection) get(key []byte, getType kproto.Command_MessageType, h *MessageHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)

	cmd := newCommand(kproto.Command_GET)
	cmd := newCommand(getType)
	cmd.Body = &kproto.Command_Body{
		KeyValue: &kproto.Command_KeyValue{
			Key: key,
@@ -44,6 +44,76 @@ func (conn *NonBlockConnection) Get(key []byte, h *MessageHandler) error {
	return err
}

func (conn *NonBlockConnection) Get(key []byte, h *MessageHandler) error {
	return conn.get(key, kproto.Command_GET, h)
}

func (conn *NonBlockConnection) GetNext(key []byte, h *MessageHandler) error {
	return conn.get(key, kproto.Command_GETNEXT, h)
}

func (conn *NonBlockConnection) GetPrevious(key []byte, h *MessageHandler) error {
	return conn.get(key, kproto.Command_GETPREVIOUS, h)
}

func (conn *NonBlockConnection) GetKeyRange(r *KeyRange, h *MessageHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)

	cmd := newCommand(kproto.Command_GETKEYRANGE)
	cmd.Body = &kproto.Command_Body{
		Range: &kproto.Command_Range{
			StartKey:          r.StartKey,
			EndKey:            r.EndKey,
			StartKeyInclusive: &r.StartKeyInclusive,
			EndKeyInclusive:   &r.EndKeyInclusive,
			MaxReturned:       &r.Max,
			Reverse:           &r.Reverse,
		},
	}

	err := conn.service.submit(msg, cmd, nil, h)
	return err
}

func (conn *NonBlockConnection) Delete(entry *Record, h *MessageHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)
	cmd := newCommand(kproto.Command_DELETE)

	sync := convertSyncToProto(entry.Sync)
	//algo := convertAlgoToProto(entry.Algo)
	cmd.Body = &kproto.Command_Body{
		KeyValue: &kproto.Command_KeyValue{
			Key:             entry.Key,
			Force:           &entry.Force,
			Synchronization: &sync,
			//Algorithm:       &algo,
		},
	}

	err := conn.service.submit(msg, cmd, nil, h)
	return err
}

func (conn *NonBlockConnection) Put(entry *Record, h *MessageHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)
	cmd := newCommand(kproto.Command_PUT)

	sync := convertSyncToProto(entry.Sync)
	algo := convertAlgoToProto(entry.Algo)
	cmd.Body = &kproto.Command_Body{
		KeyValue: &kproto.Command_KeyValue{
			Key:             entry.Key,
			Force:           &entry.Force,
			Synchronization: &sync,
			Algorithm:       &algo,
			Tag:             entry.Tag,
		},
	}

	err := conn.service.submit(msg, cmd, entry.Value, h)
	return err
}

func (conn *NonBlockConnection) Run() error {
	return conn.service.listen()
}
Loading