Commit ec1c58e7 authored by Zhu Yong's avatar Zhu Yong
Browse files

Added Status, network error handling, network timeout

parent c803d9d1
Loading
Loading
Loading
Loading
+12 −8
Original line number Diff line number Diff line
@@ -9,6 +9,7 @@ type BlockConnection struct {
func NewBlockConnection(op ClientOptions) (*BlockConnection, error) {
	nbc, err := NewNonBlockConnection(op)
	if err != nil {
		klog.Error("Can't establish nonblocking connection")
		return nil, err
	}

@@ -18,7 +19,15 @@ func NewBlockConnection(op ClientOptions) (*BlockConnection, error) {
func (conn *BlockConnection) NoOp() error {
	callback := &GenericCallback{}
	h := NewMessageHandler(callback)
	conn.nbc.Nop(h)
	if h == nil {
		klog.Error("Message Handler for NoOp Failure")
	}
	if conn == nil {
		klog.Error("Connection nil")
	} else if conn.nbc == nil {
		klog.Error("Nonblock Connection nil")
	}
	conn.nbc.NoOp(h)

	for callback.Done() == false {
		conn.nbc.Run()
@@ -36,14 +45,9 @@ func (conn *BlockConnection) Get(key []byte) (Record, error) {
		return Record{}, err
	}

	for i := 0; i < 10; i++ {
		if callback.Done() == false {
	for callback.Done() == false {
		conn.nbc.Run()
	}
	}
	//for callback.Done() == false {
	//		conn.nbc.Run()
	//}

	return callback.Record(), nil
}
+7 −5
Original line number Diff line number Diff line
@@ -6,7 +6,7 @@ import (

type Callback interface {
	Success(resp *kproto.Command, value []byte)
	Failure(status *Status)
	Failure(status Status)
	Done() bool
	Status() Status
}
@@ -22,9 +22,9 @@ func (c *GenericCallback) Success(resp *kproto.Command, value []byte) {
	klog.Info("Callback Success")
}

func (c *GenericCallback) Failure(status *Status) {
func (c *GenericCallback) Failure(status Status) {
	c.done = true
	c.status = *status
	c.status = status
	klog.Info("Callback Failure")
}

@@ -44,9 +44,11 @@ type GetCallback struct {
func (c *GetCallback) Success(resp *kproto.Command, value []byte) {
	c.GenericCallback.Success(resp, value)
	c.record.Key = resp.GetBody().GetKeyValue().GetKey()
	c.record.Tag = resp.GetBody().GetKeyValue().GetTag()
	c.record.Version = resp.GetBody().GetKeyValue().GetDbVersion()
	c.record.Algo = convertAlgoFromProto(resp.GetBody().GetKeyValue().GetAlgorithm())

	c.record.Value = value
	klog.Info("Get Operation Success")
	klog.Info("%v", c.record)
}

func (c *GetCallback) Record() Record {
+13 −27
Original line number Diff line number Diff line
@@ -6,44 +6,30 @@ import (
)

var (
	testConn *BlockConnection
	blockConn *BlockConnection = nil
)

const testDevice string = "10.29.24.55"

var option = ClientOptions{
	Host: testDevice,
	Host: "10.29.24.55",
	Port: 8123,
	User: 1,
	Hmac: []byte("asdfasdf")}

func TestMain(m *testing.M) {
	testConn = nil
	blockConn, _ = NewBlockConnection(option)
	if blockConn != nil {
		code := m.Run()
		blockConn.Close()
		os.Exit(code)
	} else {
		os.Exit(-1)
	}

func TestHandshake(t *testing.T) {

	if testConn == nil {
		t.Skip("No Connection, skip this test")
	}

	conn, err := NewNonBlockConnection(option)
	if err != nil {
		t.Fatal("Handshake fail")
}

	conn.Close()
func TestNonBlockNoOp(t *testing.T) {
	blockConn.NoOp()
}

func TestNonBlockGet(t *testing.T) {

	conn, err := NewBlockConnection(option)
	if err != nil {
		t.Fatal("Handshake fail")
	}

	conn.Get([]byte("object000"))
	conn.Close()
	blockConn.Get([]byte("object000"))
}
+7 −2
Original line number Diff line number Diff line
@@ -15,8 +15,7 @@ func (h *MessageHandler) Handle(cmd *kproto.Command, value []byte) error {
			if cmd.GetStatus().GetCode() == kproto.Command_Status_SUCCESS {
				h.callback.Success(cmd, value)
			} else {
				var status = Status{}
				h.callback.Failure(&status)
				h.callback.Failure(getStatusFromProto(cmd))
			}
		} else {
			klog.Info("Other status received")
@@ -27,6 +26,12 @@ func (h *MessageHandler) Handle(cmd *kproto.Command, value []byte) error {
	return nil
}

func (h *MessageHandler) Error(s Status) {
	if h.callback != nil {
		h.callback.Failure(s)
	}
}

func (h *MessageHandler) SetCallback(call Callback) {
	h.callback = call
}
+200 −0
Original line number Diff line number Diff line
@@ -4,6 +4,7 @@ import (
	"os"

	"github.com/Sirupsen/logrus"
	kproto "github.com/yongzhy/kinetic-go/proto"
)

// Create logger for Kinetic package
@@ -68,3 +69,202 @@ type Client interface {
	UnLock(pin []byte) error
	GetLog() error
}

func convertAlgoToProto(a Algorithm) kproto.Command_Algorithm {
	ret := kproto.Command_INVALID_ALGORITHM
	switch a {
	case ALGO_INVALID_ALGORITHM:
		ret = kproto.Command_INVALID_ALGORITHM
	case ALGO_SHA1:
		ret = kproto.Command_SHA1
	case ALGO_SHA2:
		ret = kproto.Command_SHA2
	case ALGO_SHA3:
		ret = kproto.Command_SHA3
	case ALGO_CRC32:
		ret = kproto.Command_CRC32
	case ALGO_CRC64:
		ret = kproto.Command_CRC64
	}
	return ret
}

func convertAlgoFromProto(a kproto.Command_Algorithm) Algorithm {
	ret := ALGO_INVALID_ALGORITHM
	switch a {
	case kproto.Command_INVALID_ALGORITHM:
		ret = ALGO_INVALID_ALGORITHM
	case kproto.Command_SHA1:
		ret = ALGO_SHA1
	case kproto.Command_SHA2:
		ret = ALGO_SHA2
	case kproto.Command_SHA3:
		ret = ALGO_SHA3
	case kproto.Command_CRC32:
		ret = ALGO_CRC32
	case kproto.Command_CRC64:
		ret = ALGO_CRC64
	}
	return ret
}

func convertStatusCodeToProto(s StatusCode) kproto.Command_Status_StatusCode {
	ret := kproto.Command_Status_INVALID_STATUS_CODE
	switch s {
	case REMOTE_NOT_ATTEMPTED:
		ret = kproto.Command_Status_NOT_ATTEMPTED
	case OK:
		ret = kproto.Command_Status_SUCCESS
	case REMOTE_HMAC_ERROR:
		ret = kproto.Command_Status_HMAC_FAILURE
	case REMOTE_NOT_AUTHORIZED:
		ret = kproto.Command_Status_NOT_AUTHORIZED
	case REMOTE_CLUSTER_VERSION_MISMATCH:
		ret = kproto.Command_Status_VERSION_FAILURE
	case REMOTE_INTERNAL_ERROR:
		ret = kproto.Command_Status_INTERNAL_ERROR
	case REMOTE_HEADER_REQUIRED:
		ret = kproto.Command_Status_HEADER_REQUIRED
	case REMOTE_NOT_FOUND:
		ret = kproto.Command_Status_NOT_FOUND
	case REMOTE_VERSION_MISMATCH:
		ret = kproto.Command_Status_VERSION_MISMATCH
	case REMOTE_SERVICE_BUSY:
		ret = kproto.Command_Status_SERVICE_BUSY
	case REMOTE_EXPIRED:
		ret = kproto.Command_Status_EXPIRED
	case REMOTE_DATA_ERROR:
		ret = kproto.Command_Status_DATA_ERROR
	case REMOTE_PERM_DATA_ERROR:
		ret = kproto.Command_Status_PERM_DATA_ERROR
	case REMOTE_CONNECTION_ERROR:
		ret = kproto.Command_Status_REMOTE_CONNECTION_ERROR
	case REMOTE_NO_SPACE:
		ret = kproto.Command_Status_NO_SPACE
	case REMOTE_NO_SUCH_HMAC_ALGORITHM:
		ret = kproto.Command_Status_NO_SUCH_HMAC_ALGORITHM
	case REMOTE_INVALID_REQUEST:
		ret = kproto.Command_Status_INVALID_REQUEST
	case REMOTE_NESTED_OPERATION_ERRORS:
		ret = kproto.Command_Status_NESTED_OPERATION_ERRORS
	case REMOTE_DEVICE_LOCKED:
		ret = kproto.Command_Status_DEVICE_LOCKED
	case REMOTE_DEVICE_ALREADY_UNLOCKED:
		ret = kproto.Command_Status_DEVICE_ALREADY_UNLOCKED
	case REMOTE_CONNECTION_TERMINATED:
		ret = kproto.Command_Status_CONNECTION_TERMINATED
	case REMOTE_INVALID_BATCH:
		ret = kproto.Command_Status_INVALID_BATCH
	}
	return ret
}

func convertStatusCodeFromProto(s kproto.Command_Status_StatusCode) StatusCode {
	ret := REMOTE_OTHER_ERROR
	switch s {
	case kproto.Command_Status_NOT_ATTEMPTED:
		ret = REMOTE_NOT_ATTEMPTED
	case kproto.Command_Status_SUCCESS:
		ret = OK
	case kproto.Command_Status_HMAC_FAILURE:
		ret = REMOTE_HMAC_ERROR
	case kproto.Command_Status_NOT_AUTHORIZED:
		ret = REMOTE_NOT_AUTHORIZED
	case kproto.Command_Status_VERSION_FAILURE:
		ret = REMOTE_CLUSTER_VERSION_MISMATCH
	case kproto.Command_Status_INTERNAL_ERROR:
		ret = REMOTE_INTERNAL_ERROR
	case kproto.Command_Status_HEADER_REQUIRED:
		ret = REMOTE_HEADER_REQUIRED
	case kproto.Command_Status_NOT_FOUND:
		ret = REMOTE_NOT_FOUND
	case kproto.Command_Status_VERSION_MISMATCH:
		ret = REMOTE_VERSION_MISMATCH
	case kproto.Command_Status_SERVICE_BUSY:
		ret = REMOTE_SERVICE_BUSY
	case kproto.Command_Status_EXPIRED:
		ret = REMOTE_EXPIRED
	case kproto.Command_Status_DATA_ERROR:
		ret = REMOTE_DATA_ERROR
	case kproto.Command_Status_PERM_DATA_ERROR:
		ret = REMOTE_PERM_DATA_ERROR
	case kproto.Command_Status_REMOTE_CONNECTION_ERROR:
		ret = REMOTE_CONNECTION_ERROR
	case kproto.Command_Status_NO_SPACE:
		ret = REMOTE_NO_SPACE
	case kproto.Command_Status_NO_SUCH_HMAC_ALGORITHM:
		ret = REMOTE_NO_SUCH_HMAC_ALGORITHM
	case kproto.Command_Status_INVALID_REQUEST:
		ret = REMOTE_INVALID_REQUEST
	case kproto.Command_Status_NESTED_OPERATION_ERRORS:
		ret = REMOTE_NESTED_OPERATION_ERRORS
	case kproto.Command_Status_DEVICE_LOCKED:
		ret = REMOTE_DEVICE_LOCKED
	case kproto.Command_Status_DEVICE_ALREADY_UNLOCKED:
		ret = REMOTE_DEVICE_ALREADY_UNLOCKED
	case kproto.Command_Status_CONNECTION_TERMINATED:
		ret = REMOTE_CONNECTION_TERMINATED
	case kproto.Command_Status_INVALID_BATCH:
		ret = REMOTE_INVALID_BATCH
	}
	return ret
}

func getStatusFromProto(cmd *kproto.Command) Status {
	code := convertStatusCodeFromProto(cmd.GetStatus().GetCode())
	switch code {
	case CLIENT_IO_ERROR:
		return Status{code, "IO error"}
	case CLIENT_SHUTDOWN:
		return Status{code, "Client shutdown"}
	case PROTOCOL_ERROR_RESPONSE_NO_ACKSEQUENCE:
		return Status{code, "Response did not contain ack sequence"}
	case CLIENT_RESPONSE_HMAC_VERIFICATION_ERROR:
		return Status{code, "Response HMAC verification failed"}
	case REMOTE_HMAC_ERROR:
		return Status{code, "Remote HMAC verification failed"}
	case REMOTE_NOT_AUTHORIZED:
		return Status{code, "Not authorized"}
	case REMOTE_CLUSTER_VERSION_MISMATCH:
		expected_cluster_version := cmd.GetHeader().GetClusterVersion()
		return Status{code, "Cluster version mismatch " + string(expected_cluster_version)}
	case REMOTE_INTERNAL_ERROR:
		return Status{code, "Remote internal error"}
	case REMOTE_HEADER_REQUIRED:
		return Status{code, "Request requires a header to be set"}
	case REMOTE_NOT_FOUND:
		return Status{code, "Key not found"}
	case REMOTE_VERSION_MISMATCH:
		return Status{code, "Version mismatch"}
	case REMOTE_SERVICE_BUSY:
		return Status{code, "Remote service is busy"}
	case REMOTE_EXPIRED:
		return Status{code, "Remote timeout"}
	case REMOTE_DATA_ERROR:
		return Status{code, "Remote transient data error"}
	case REMOTE_PERM_DATA_ERROR:
		return Status{code, "Remote permanent data error"}
	case REMOTE_CONNECTION_ERROR:
		return Status{code, "Remote connection to peer failed"}
	case REMOTE_NO_SPACE:
		return Status{code, "No space left"}
	case REMOTE_NO_SUCH_HMAC_ALGORITHM:
		return Status{code, "Unknown HMAC algorithm"}
	case REMOTE_NESTED_OPERATION_ERRORS:
		return Status{code, "Operation completed but has nested errors"}
	case REMOTE_DEVICE_LOCKED:
		return Status{code, "Remote device is locked"}
	case REMOTE_DEVICE_ALREADY_UNLOCKED:
		return Status{code, "Remote device is already unlocked"}
	case REMOTE_CONNECTION_TERMINATED:
		return Status{code, "Remote connection is terminated"}
	case REMOTE_INVALID_BATCH:
		return Status{code, "Invalid batch"}
	case REMOTE_INVALID_EXECUTE:
		return Status{code, "Invalid execute of applet"}
	case REMOTE_EXECUTE_COMPLETE:
		return Status{code, "Applet execute complete"}
	default:
		return Status{code, "Internal Error"}
	}
}
Loading