Commit b329be24 authored by Zhu Yong's avatar Zhu Yong
Browse files

Merge branch 'develop'

Conflicts:
	utility.go
	utility_test.go
parents df347eba 5f9d900c
Loading
Loading
Loading
Loading

blockconnection.go

0 → 100644
+294 −0
Original line number Diff line number Diff line
package kinetic

import (
	kproto "github.com/yongzhy/kinetic-go/proto"
)

type BlockConnection struct {
	nbc *NonBlockConnection
}

// Helper function to establish block connection to device.
func NewBlockConnection(op ClientOptions) (*BlockConnection, error) {
	nbc, err := NewNonBlockConnection(op)
	if err != nil {
		klog.Error("Can't establish nonblocking connection")
		return nil, err
	}

	return &BlockConnection{nbc: nbc}, err
}

func (conn *BlockConnection) NoOp() (Status, error) {
	callback := &GenericCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.NoOp(h)
	if err != nil {
		return callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return callback.Status(), err
}

func (conn *BlockConnection) get(key []byte, getCmd kproto.Command_MessageType) (*Record, Status, error) {
	callback := &GetCallback{}
	h := NewResponseHandler(callback)

	var err error = nil
	switch getCmd {
	case kproto.Command_GET:
		err = conn.nbc.Get(key, h)
	case kproto.Command_GETPREVIOUS:
		err = conn.nbc.GetPrevious(key, h)
	case kproto.Command_GETNEXT:
		err = conn.nbc.GetNext(key, h)
	}
	if err != nil {
		return nil, callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return &callback.Entry, callback.Status(), err
}

func (conn *BlockConnection) Get(key []byte) (*Record, Status, error) {
	return conn.get(key, kproto.Command_GET)
}

func (conn *BlockConnection) GetNext(key []byte) (*Record, Status, error) {
	return conn.get(key, kproto.Command_GETNEXT)
}

func (conn *BlockConnection) GetPrevious(key []byte) (*Record, Status, error) {
	return conn.get(key, kproto.Command_GETPREVIOUS)
}

func (conn *BlockConnection) GetKeyRange(r *KeyRange) ([][]byte, Status, error) {
	callback := &GetKeyRangeCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.GetKeyRange(r, h)
	if err != nil {
		return nil, callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return callback.Keys, callback.Status(), err
}

func (conn *BlockConnection) GetVersion(key []byte) ([]byte, Status, error) {
	callback := &GetVersionCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.GetVersion(key, h)
	if err != nil {
		return nil, callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return callback.Version, callback.Status(), err
}

func (conn *BlockConnection) Flush() (Status, error) {
	callback := &GenericCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.Flush(h)
	if err != nil {
		return callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return callback.Status(), err
}

func (conn *BlockConnection) Delete(entry *Record) (Status, error) {
	callback := &GenericCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.Delete(entry, h)
	if err != nil {
		return callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return callback.Status(), err
}

func (conn *BlockConnection) Put(entry *Record) (Status, error) {
	callback := &GenericCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.Put(entry, h)
	if err != nil {
		return callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return callback.Status(), err
}

func (conn *BlockConnection) P2PPush(request *P2PPushRequest) ([]Status, Status, error) {
	callback := &P2PPushCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.P2PPush(request, h)
	if err != nil {
		return nil, callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return callback.Statuses, callback.Status(), err
}

func (conn *BlockConnection) GetLog(logs []LogType) (*Log, Status, error) {
	callback := &GetLogCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.GetLog(logs, h)
	if err != nil {
		return nil, callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return &callback.Logs, callback.Status(), err
}

func (conn *BlockConnection) pinop(pin []byte, op kproto.Command_PinOperation_PinOpType) (Status, error) {
	callback := &GenericCallback{}
	h := NewResponseHandler(callback)

	var err error = nil
	switch op {
	case kproto.Command_PinOperation_SECURE_ERASE_PINOP:
		err = conn.nbc.SecureErase(pin, h)
	case kproto.Command_PinOperation_ERASE_PINOP:
		err = conn.nbc.InstantErase(pin, h)
	case kproto.Command_PinOperation_LOCK_PINOP:
		err = conn.nbc.LockDevice(pin, h)
	case kproto.Command_PinOperation_UNLOCK_PINOP:
		err = conn.nbc.UnlockDevice(pin, h)
	}
	if err != nil {
		return callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return callback.Status(), err
}

func (conn *BlockConnection) SecureErase(pin []byte) (Status, error) {
	return conn.pinop(pin, kproto.Command_PinOperation_SECURE_ERASE_PINOP)
}

func (conn *BlockConnection) InstantErase(pin []byte) (Status, error) {
	return conn.pinop(pin, kproto.Command_PinOperation_ERASE_PINOP)

}

func (conn *BlockConnection) LockDevice(pin []byte) (Status, error) {
	return conn.pinop(pin, kproto.Command_PinOperation_LOCK_PINOP)
}

func (conn *BlockConnection) UnlockDevice(pin []byte) (Status, error) {
	return conn.pinop(pin, kproto.Command_PinOperation_UNLOCK_PINOP)
}

func (conn *BlockConnection) UpdateFirmware(code []byte) (Status, error) {
	callback := &GenericCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.UpdateFirmware(code, h)
	if err != nil {
		return callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return callback.Status(), err
}

func (conn *BlockConnection) SetClusterVersion(version int64) (Status, error) {
	callback := &GenericCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.SetClusterVersion(version, h)
	if err != nil {
		return callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return callback.Status(), err
}

func (conn *BlockConnection) SetLockPin(currentPin []byte, newPin []byte) (Status, error) {
	callback := &GenericCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.SetLockPin(currentPin, newPin, h)
	if err != nil {
		return callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return callback.Status(), err
}

func (conn *BlockConnection) SetErasePin(currentPin []byte, newPin []byte) (Status, error) {
	callback := &GenericCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.SetErasePin(currentPin, newPin, h)
	if err != nil {
		return callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return callback.Status(), err
}

func (conn *BlockConnection) SetACL(acls []SecurityACL) (Status, error) {
	callback := &GenericCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.SetACL(acls, h)
	if err != nil {
		return callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return callback.Status(), err
}

func (conn *BlockConnection) MediaScan(op *MediaOperation, pri Priority) (Status, error) {
	callback := &GenericCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.MediaScan(op, pri, h)
	if err != nil {
		return callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return callback.Status(), err
}

func (conn *BlockConnection) MediaOptimize(op *MediaOperation, pri Priority) (Status, error) {
	callback := &GenericCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.MediaOptimize(op, pri, h)
	if err != nil {
		return callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return callback.Status(), err
}

func (conn *BlockConnection) Close() {
	conn.nbc.Close()
}

callback.go

0 → 100644
+100 −0
Original line number Diff line number Diff line
package kinetic

import (
	kproto "github.com/yongzhy/kinetic-go/proto"
)

// Callback is the interface define actions for MessageType.
// Success is called when XXXXX_RESPONSE message recieved from drive without problem.
// Failure is called when XXXXX_RESPONSE message status code is not OK, or any other kind of failure.
// Done return true if either Success or Failure is called to indicate XXXXX_RESPONSE received and processed.
// Status return the MessateType operation status.
type Callback interface {
	Success(resp *kproto.Command, value []byte)
	Failure(status Status)
	Status() Status
}

// Generic Callback, can be used for all MessageType which doesn't require data from Kinetic drive.
// And for MessageType that require data from drive, a new struct need to be defined GenericCallback
type GenericCallback struct {
	status Status
}

func (c *GenericCallback) Success(resp *kproto.Command, value []byte) {
	c.status = Status{Code: OK}
}

func (c *GenericCallback) Failure(status Status) {
	c.status = status
}

func (c *GenericCallback) Status() Status {
	return c.status
}

// Callback for Command_GET Message
type GetCallback struct {
	GenericCallback
	Entry Record // Entity information
}

// Success function extracts object information from kinetic message protocol and
// store into GetCallback.Entry.
func (c *GetCallback) Success(resp *kproto.Command, value []byte) {
	c.GenericCallback.Success(resp, value)
	c.Entry.Key = resp.GetBody().GetKeyValue().GetKey()
	c.Entry.Tag = resp.GetBody().GetKeyValue().GetTag()
	c.Entry.Version = resp.GetBody().GetKeyValue().GetDbVersion()
	c.Entry.Algo = convertAlgoFromProto(resp.GetBody().GetKeyValue().GetAlgorithm())

	c.Entry.Value = value
}

// Callback for Command_GETKEYRANGE Message
type GetKeyRangeCallback struct {
	GenericCallback
	Keys [][]byte // List of objects' keys within range, get from device
}

func (c *GetKeyRangeCallback) Success(resp *kproto.Command, value []byte) {
	c.GenericCallback.Success(resp, value)
	c.Keys = resp.GetBody().GetRange().GetKeys()
}

// Callback for Command_GETVERSION Message
type GetVersionCallback struct {
	GenericCallback
	Version []byte // Version of the object on device
}

func (c *GetVersionCallback) Success(resp *kproto.Command, value []byte) {
	c.GenericCallback.Success(resp, value)
	c.Version = resp.GetBody().GetKeyValue().GetDbVersion()
}

// Callback for Command_PEER2PEERPUSH
type P2PPushCallback struct {
	GenericCallback
	Statuses []Status
}

func (c *P2PPushCallback) Success(resp *kproto.Command, value []byte) {
	c.GenericCallback.Success(resp, value)
	c.Statuses = make([]Status, len(resp.GetBody().GetP2POperation().GetOperation()))
	for k, op := range resp.GetBody().GetP2POperation().GetOperation() {
		c.Statuses[k].Code = convertStatusCodeFromProto(op.GetStatus().GetCode())
		c.Statuses[k].ErrorMsg = op.GetStatus().GetStatusMessage()
	}
}

// Callback for Command_GETLOG Message
type GetLogCallback struct {
	GenericCallback
	Logs Log // Device log information
}

func (c *GetLogCallback) Success(resp *kproto.Command, value []byte) {
	c.GenericCallback.Success(resp, value)
	c.Logs = getLogFromProto(resp)
}

connection_test.go

0 → 100644
+189 −0
Original line number Diff line number Diff line
package kinetic

import (
	"os"
	"testing"
)

var (
	blockConn    *BlockConnection    = nil
	nonblockConn *NonBlockConnection = nil
)

var option = ClientOptions{
	Host: "127.0.0.1",
	Port: 8123,
	User: 1,
	Hmac: []byte("asdfasdf")}

func TestMain(m *testing.M) {
	blockConn, _ = NewBlockConnection(option)
	if blockConn != nil {
		code := m.Run()
		blockConn.Close()
		os.Exit(code)
	} else {
		os.Exit(-1)
	}
}

func TestBlockNoOp(t *testing.T) {
	status, err := blockConn.NoOp()
	if err != nil || status.Code != OK {
		t.Fatal("Blocking NoOp Failure")
	}
}

func TestBlockGet(t *testing.T) {
	_, status, err := blockConn.Get([]byte("object000"))
	if err != nil || status.Code != OK {
		t.Fatal("Blocking Get Failure")
	}
}

func TestBlockGetNext(t *testing.T) {
	_, status, err := blockConn.GetNext([]byte("object000"))
	if err != nil || status.Code != OK {
		t.Fatal("Blocking GetNext Failure")
	}
}

func TestBlockGetPrevious(t *testing.T) {
	_, status, err := blockConn.GetPrevious([]byte("object000"))
	if err != nil || status.Code != OK {
		t.Fatal("Blocking GetPrevious Failure")
	}
}

func TestBlockGetVersion(t *testing.T) {
	version, status, err := blockConn.GetVersion([]byte("object000"))
	if err != nil || status.Code != OK {
		t.Fatal("Blocking GetVersion Failure")
	}
	t.Logf("Object version = %x", version)
}

func TestBlockFlush(t *testing.T) {
	status, err := blockConn.Flush()
	if err != nil || status.Code != OK {
		t.Fatal("Blocking Flush Failure")
	}
}

func TestBlockPut(t *testing.T) {
	entry := Record{
		Key:   []byte("object001"),
		Value: []byte("ABCDEFG"),
		Sync:  SYNC_WRITETHROUGH,
		Algo:  ALGO_SHA1,
		Tag:   []byte(""),
		Force: true,
	}
	status, err := blockConn.Put(&entry)
	if err != nil || status.Code != OK {
		t.Fatal("Blocking Put Failure")
	}
}

func TestBlockDelete(t *testing.T) {
	entry := Record{
		Key:   []byte("object001"),
		Sync:  SYNC_WRITETHROUGH,
		Algo:  ALGO_SHA1,
		Force: true,
	}
	status, err := blockConn.Delete(&entry)
	if err != nil || status.Code != OK {
		t.Fatal("Blocking Delete Failure")
	}
}

func TestBlockGetKeyRange(t *testing.T) {
	r := KeyRange{
		StartKey:          []byte("object000"),
		EndKey:            []byte("object999"),
		StartKeyInclusive: true,
		EndKeyInclusive:   true,
		Max:               5,
	}
	keys, status, err := blockConn.GetKeyRange(&r)
	if err != nil || status.Code != OK {
		t.Fatal("Blocking GetKeyRange Failure: ", status.Error())
	}
	for k, key := range keys {
		t.Logf("key[%d] = %s", k, string(key))
	}
}

func TestBlockGetLogCapacity(t *testing.T) {
	logs := []LogType{
		LOG_CAPACITIES,
	}
	klogs, status, err := blockConn.GetLog(logs)
	if err != nil || status.Code != OK {
		t.Fatal("Blocking GetLog Failure")
	}
	if !(klogs.Capacity.CapacityInBytes > 0 &&
		klogs.Capacity.PortionFull > 0) {
		t.Logf("%#v", klogs.Capacity)
		t.Fatal("Blocking GetLog for Capacity Failure")
	}
}

func TestBlockGetLogLimit(t *testing.T) {
	logs := []LogType{
		LOG_LIMITS,
	}
	klogs, status, err := blockConn.GetLog(logs)
	if err != nil || status.Code != OK {
		t.Fatal("Blocking GetLog Failure")
	}
	if klogs.Limits.MaxKeySize != 4096 ||
		klogs.Limits.MaxValueSize != 1024*1024 {
		t.Logf("%#v", klogs.Limits)
		t.Fatal("Blocking GetLog for Limits Failure")
	}
}

func TestBlockGetLogAll(t *testing.T) {
	logs := []LogType{
		LOG_UTILIZATIONS,
		LOG_TEMPERATURES,
		LOG_CAPACITIES,
		LOG_CONFIGURATION,
		LOG_STATISTICS,
		LOG_MESSAGES,
		LOG_LIMITS,
	}
	klogs, status, err := blockConn.GetLog(logs)
	if err != nil || status.Code != OK {
		t.Fatal("Blocking GetLog Failure")
	}
	t.Logf("GetLog %+v", klogs)
}

func TestBlockMediaScan(t *testing.T) {
	op := MediaOperation{
		StartKey:          []byte("object000"),
		EndKey:            []byte("object999"),
		StartKeyInclusive: true,
		EndKeyInclusive:   true,
	}
	status, err := blockConn.MediaScan(&op, PRIORITY_NORMAL)
	if err != nil || status.Code != OK {
		t.Fatal("Blocking MediaScan Failure: ", status.Error())
	}
}

func TestBlockMediaOptimize(t *testing.T) {
	op := MediaOperation{
		StartKey:          []byte("object000"),
		EndKey:            []byte("object999"),
		StartKeyInclusive: true,
		EndKeyInclusive:   true,
	}
	status, err := blockConn.MediaOptimize(&op, PRIORITY_NORMAL)
	if err != nil || status.Code != OK {
		t.Fatal("Blocking MediaOptimize Failure: ", status.Error())
	}
}

getlog.go

0 → 100644
+328 −0

File added.

Preview size limit exceeded, changes collapsed.

handler.go

0 → 100644
+60 −0
Original line number Diff line number Diff line
package kinetic

import (
	kproto "github.com/yongzhy/kinetic-go/proto"
	"sync"
)

// ResponseHandler is the handler for XXXXX_RESPONSE message from drive.
// For each operation, a unique ResponseHandler is requried
type ResponseHandler struct {
	callback Callback
	done     bool
	cond     *sync.Cond
}

func (h *ResponseHandler) handle(cmd *kproto.Command, value []byte) error {
	if h.callback != nil {
		if cmd.Status != nil && cmd.Status.Code != nil {
			if cmd.GetStatus().GetCode() == kproto.Command_Status_SUCCESS {
				h.callback.Success(cmd, value)
			} else {
				h.callback.Failure(getStatusFromProto(cmd))
			}
		} else {
			klog.Warn("Other status received")
			klog.Info("%v", cmd)
		}

	}
	h.cond.L.Lock()
	h.done = true
	h.cond.Signal()
	h.cond.L.Unlock()
	return nil
}

func (h *ResponseHandler) fail(s Status) {
	if h.callback != nil {
		h.callback.Failure(s)
	}
	h.cond.L.Lock()
	h.done = true
	h.cond.Signal()
	h.cond.L.Unlock()
}

func (h *ResponseHandler) wait() {
	h.cond.L.Lock()
	if h.done == false {
		h.cond.Wait()
	}
	h.cond.L.Unlock()
}

// Helper function to build a ResponseHandler with call as the Callback.
// For each operation, a unique ResponseHandler is requried
func NewResponseHandler(call Callback) *ResponseHandler {
	h := &ResponseHandler{callback: call, done: false, cond: sync.NewCond(&sync.Mutex{})}
	return h
}
Loading