Commit 6cc6a0f2 authored by Zhu Yong's avatar Zhu Yong
Browse files

Cluster version

parent 363a6f5b
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -224,6 +224,10 @@ func (conn *BlockConnection) SetClusterVersion(version int64) (Status, error) {
	return callback.Status(), err
}

func (conn *BlockConnection) SetClientClusterVersion(version int64) {
	conn.nbc.SetClientClusterVersion(version)
}

func (conn *BlockConnection) SetLockPin(currentPin []byte, newPin []byte) (Status, error) {
	callback := &GenericCallback{}
	h := NewResponseHandler(callback)
+16 −2
Original line number Diff line number Diff line
@@ -72,7 +72,7 @@ func TestBlockFlush(t *testing.T) {

func TestBlockPut(t *testing.T) {
	entry := Record{
		Key:   []byte("object001"),
		Key:   []byte("object000"),
		Value: []byte("ABCDEFG"),
		Sync:  SYNC_WRITETHROUGH,
		Algo:  ALGO_SHA1,
@@ -87,7 +87,7 @@ func TestBlockPut(t *testing.T) {

func TestBlockDelete(t *testing.T) {
	entry := Record{
		Key:   []byte("object001"),
		Key:   []byte("object000"),
		Sync:  SYNC_WRITETHROUGH,
		Algo:  ALGO_SHA1,
		Force: true,
@@ -196,3 +196,17 @@ func TestBlockMediaOptimize(t *testing.T) {
		t.Fatal("Blocking MediaOptimize Failure: ", err, status.String())
	}
}

func TestBlockSetClusterVersion(t *testing.T) {
	status, err := blockConn.SetClusterVersion(1)
	if err != nil || status.Code != OK {
		t.Fatal("Blocking SetClusterVersion Failure: ", err, status.String())
	}

	blockConn.SetClientClusterVersion(2)
	_, status, err = blockConn.Get([]byte("object000"))
	if err != nil || status.Code != REMOTE_CLUSTER_VERSION_MISMATCH {
		t.Fatal("Blocking Get expected REMOTE_CLUSTER_VERSION_MISMATCH. ", err, status.String())
	}
	t.Log(status.String())
}
+4 −0
Original line number Diff line number Diff line
@@ -249,6 +249,10 @@ func (conn *NonBlockConnection) SetClusterVersion(version int64, h *ResponseHand
	return conn.service.submit(msg, cmd, nil, h)
}

func (conn *NonBlockConnection) SetClientClusterVersion(version int64) {
	conn.service.clusterVersion = version
}

func (conn *NonBlockConnection) SetLockPin(currentPin []byte, newPin []byte, h *ResponseHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)
	cmd := newCommand(kproto.Command_SECURITY)
+60 −33
Original line number Diff line number Diff line
@@ -17,6 +17,10 @@ var (
	networkTimeout = 20 * time.Second
)

//const (
//	seq_UNSOLICITEDSTATUS = -1
//)

func newMessage(t kproto.Message_AuthType) *kproto.Message {
	msg := &kproto.Message{
		AuthType: t.Enum(),
@@ -41,11 +45,13 @@ type networkService struct {
	txMu           sync.Mutex
	mapMu          sync.Mutex
	conn           net.Conn
	clusterVersion int64                      // Cluster version
	seq            int64                      // Operation sequence ID
	connId         int64                      // current conection ID
	option         ClientOptions              // current connection operation
	hmap           map[int64]*ResponseHandler // Message handler map
	fatal          bool                       // Network has fatal failure
	device         Log                        // Store device inforamtion from handshake package
}

func newNetworkService(op ClientOptions) (*networkService, error) {
@@ -58,8 +64,9 @@ func newNetworkService(op ClientOptions) (*networkService, error) {

	ns := &networkService{
		conn:           conn,
		clusterVersion: 0,
		seq:            0,
		connId: 0,
		connId:         -1,
		option:         op,
		hmap:           make(map[int64]*ResponseHandler),
		fatal:          false,
@@ -67,7 +74,7 @@ func newNetworkService(op ClientOptions) (*networkService, error) {

	ns.rxMu.Lock()
	// Do the handshake.
	// TODO: we can store the Device Configuration and Limits from handshake
	// Device Configuration and Limits from handshake will be stored in networkService.device
	_, _, _, err = ns.receive()
	ns.rxMu.Unlock()

@@ -76,6 +83,14 @@ func newNetworkService(op ClientOptions) (*networkService, error) {
		return nil, err
	}

	klog.Debugf("Connected to %s", op.Host)
	klog.Debugf("\tVendor: %s", ns.device.Configuration.Vendor)
	klog.Debugf("\tModel: %s", ns.device.Configuration.Model)
	klog.Debugf("\tWorldWideName: %s", ns.device.Configuration.WorldWideName)
	klog.Debugf("\tSerial Number: %s", ns.device.Configuration.SerialNumber)
	klog.Debugf("\tFirmware Version: %s", ns.device.Configuration.Version)
	klog.Debugf("\tKinetic Protocol Version: %s", ns.device.Configuration.ProtocolVersion)

	return ns, nil
}

@@ -107,7 +122,7 @@ func (ns *networkService) listen() error {
	ns.mapMu.Unlock()

	ns.rxMu.Lock()
	msg, cmd, value, err := ns.receive()
	_, cmd, value, err := ns.receive()
	ns.rxMu.Unlock()
	if err != nil {
		klog.Error("Network Service listen error")
@@ -118,11 +133,12 @@ func (ns *networkService) listen() error {
		", AckSeq = ", cmd.GetHeader().GetAckSequence(),
		", Code = ", cmd.GetStatus().GetCode())

	if msg.GetAuthType() == kproto.Message_UNSOLICITEDSTATUS {
		if cmd.GetHeader() != nil {
			*(cmd.GetHeader().AckSequence) = -1
		}
	}
	// TODO: Need to review this code block, is it necessary to set the AckSeq for UNSOLICITEDSTATUS
	//if msg.GetAuthType() == kproto.Message_UNSOLICITEDSTATUS {
	//	if cmd.GetHeader() != nil {
	//		*(cmd.GetHeader().AckSequence) = seq_UNSOLICITEDSTATUS
	//	}
	//}

	ack := cmd.GetHeader().GetAckSequence()
	ns.mapMu.Lock()
@@ -154,10 +170,12 @@ func (ns *networkService) submit(msg *kproto.Message, cmd *kproto.Command, value

	cmd.GetHeader().ConnectionID = &ns.connId
	cmd.GetHeader().Sequence = &ns.seq
	cmd.GetHeader().ClusterVersion = &ns.clusterVersion

	cmdBytes, err := proto.Marshal(cmd)
	if err != nil {
		klog.Error("Error marshl Kinetic Command")
		s := Status{CLIENT_INTERNAL_ERROR, "Error marshl Kinetic Command"}
		s := Status{Code: CLIENT_INTERNAL_ERROR, ErrorMsg: "Error marshl Kinetic Command"}
		ns.clientError(s, h)
		return err
	}
@@ -189,7 +207,7 @@ func (ns *networkService) submit(msg *kproto.Message, cmd *kproto.Command, value
func (ns *networkService) send(msg *kproto.Message, value []byte) error {
	msgBytes, err := proto.Marshal(msg)
	if err != nil {
		s := Status{CLIENT_INTERNAL_ERROR, "Error marshl Kinetic Message"}
		s := Status{Code: CLIENT_INTERNAL_ERROR, ErrorMsg: "Error marshl Kinetic Message"}
		ns.clientError(s, nil)
		return err
	}
@@ -211,7 +229,7 @@ func (ns *networkService) send(msg *kproto.Message, value []byte) error {
	_, err = ns.conn.Write(packet)
	if err != nil {
		klog.Error("Network I/O write error")
		s := Status{CLIENT_IO_ERROR, "Network I/O write error"}
		s := Status{Code: CLIENT_IO_ERROR, ErrorMsg: "Network I/O write error"}
		ns.clientError(s, nil)
		ns.fatal = true
		return err
@@ -229,7 +247,7 @@ func (ns *networkService) receive() (*kproto.Message, *kproto.Command, []byte, e
	_, err := io.ReadFull(ns.conn, header[0:])
	if err != nil {
		klog.Error("Network I/O read error")
		s := Status{CLIENT_IO_ERROR, "Network I/O read error"}
		s := Status{Code: CLIENT_IO_ERROR, ErrorMsg: "Network I/O read error"}
		ns.clientError(s, nil)
		ns.fatal = true
		return nil, nil, nil, err
@@ -238,7 +256,7 @@ func (ns *networkService) receive() (*kproto.Message, *kproto.Command, []byte, e
	magic := header[0]
	if magic != 'F' {
		klog.Error("Network I/O read error Header wrong magic")
		s := Status{CLIENT_IO_ERROR, "Network I/O read error Header wrong magic"}
		s := Status{Code: CLIENT_IO_ERROR, ErrorMsg: "Network I/O read error Header wrong magic"}
		ns.clientError(s, nil)
		ns.fatal = true
		return nil, nil, nil, errors.New("Network I/O read error Header wrong magic")
@@ -251,7 +269,7 @@ func (ns *networkService) receive() (*kproto.Message, *kproto.Command, []byte, e
	_, err = io.ReadFull(ns.conn, protoBuf)
	if err != nil {
		klog.Error("Network I/O read error receive Kinetic Header")
		s := Status{CLIENT_IO_ERROR, "Network I/O read error receive Kinetic Header"}
		s := Status{Code: CLIENT_IO_ERROR, ErrorMsg: "Network I/O read error receive Kinetic Header"}
		ns.clientError(s, nil)
		ns.fatal = true
		return nil, nil, nil, err
@@ -261,7 +279,7 @@ func (ns *networkService) receive() (*kproto.Message, *kproto.Command, []byte, e
	err = proto.Unmarshal(protoBuf, msg)
	if err != nil {
		klog.Error("Network I/O read error receive Kinetic Header")
		s := Status{CLIENT_IO_ERROR, "Network I/O read error reaceive Kinetic Message"}
		s := Status{Code: CLIENT_IO_ERROR, ErrorMsg: "Network I/O read error reaceive Kinetic Message"}
		ns.clientError(s, nil)
		ns.fatal = true
		return nil, nil, nil, err
@@ -269,7 +287,7 @@ func (ns *networkService) receive() (*kproto.Message, *kproto.Command, []byte, e

	if msg.GetAuthType() == kproto.Message_HMACAUTH && validate_hmac(msg, ns.option.Hmac) == false {
		klog.Error("Response HMAC mismatch")
		s := Status{CLIENT_RESPONSE_HMAC_VERIFICATION_ERROR, "Response HMAC mismatch"}
		s := Status{Code: CLIENT_RESPONSE_HMAC_VERIFICATION_ERROR, ErrorMsg: "Response HMAC mismatch"}
		ns.clientError(s, nil)
		return nil, nil, nil, err
	}
@@ -278,13 +296,22 @@ func (ns *networkService) receive() (*kproto.Message, *kproto.Command, []byte, e
	err = proto.Unmarshal(msg.CommandBytes, cmd)
	if err != nil {
		klog.Error("Network I/O read error parsing Kinetic Command")
		s := Status{CLIENT_IO_ERROR, "Network I/O read error parsing Kinetic Command"}
		s := Status{Code: CLIENT_IO_ERROR, ErrorMsg: "Network I/O read error parsing Kinetic Command"}
		ns.clientError(s, nil)
		ns.fatal = true
		return nil, nil, nil, err
	}

	if cmd.Header != nil && cmd.Header.ConnectionID != nil {
		if ns.connId < 0 {
			// This is handshake packet
			ns.device = getLogFromProto(cmd)

			// Only update client cluster version during Handshake
			if cmd.Header.ClusterVersion != nil {
				ns.clusterVersion = cmd.GetHeader().GetClusterVersion()
			}
		}
		ns.connId = cmd.GetHeader().GetConnectionID()
	}

@@ -293,7 +320,7 @@ func (ns *networkService) receive() (*kproto.Message, *kproto.Command, []byte, e
		_, err = io.ReadFull(ns.conn, valueBuf)
		if err != nil {
			klog.Error("Network I/O read error parsing Kinetic Value")
			s := Status{CLIENT_IO_ERROR, "Network I/O read error parsing Kinetic Value"}
			s := Status{Code: CLIENT_IO_ERROR, ErrorMsg: "Network I/O read error parsing Kinetic Value"}
			ns.clientError(s, nil)
			ns.fatal = true
			return nil, nil, nil, err
@@ -307,5 +334,5 @@ func (ns *networkService) receive() (*kproto.Message, *kproto.Command, []byte, e

func (ns *networkService) close() {
	ns.conn.Close()
	klog.Debug("Connection to %s closed", ns.option.Host)
	klog.Debugf("Connection to %s closed", ns.option.Host)
}
+12 −5
Original line number Diff line number Diff line
@@ -2,6 +2,7 @@ package kinetic

import (
	kproto "github.com/yongzhy/kinetic-go/proto"
	"strconv"
)

// Status code for kinetic message.
@@ -79,6 +80,7 @@ var statusName = map[StatusCode]string{
type Status struct {
	Code                   StatusCode
	ErrorMsg               string
	ExpectedClusterVersion int64
}

func (s Status) Error() string {
@@ -86,11 +88,15 @@ func (s Status) Error() string {
}

func (s Status) String() string {
	ret := "Unknown Status"
	str, ok := statusName[s.Code]
	if ok {
		return str + " : " + s.ErrorMsg
		ret = str + " : " + s.ErrorMsg
		if s.Code == REMOTE_CLUSTER_VERSION_MISMATCH {
			ret = ret + ", Expected cluster version =" + strconv.Itoa(int(s.ExpectedClusterVersion))
		}
	return "Unknown Status"
	}
	return ret
}

func convertStatusCodeToProto(s StatusCode) kproto.Command_Status_StatusCode {
@@ -198,6 +204,7 @@ func convertStatusCodeFromProto(s kproto.Command_Status_StatusCode) StatusCode {
func getStatusFromProto(cmd *kproto.Command) Status {
	code := convertStatusCodeFromProto(cmd.GetStatus().GetCode())
	msg := cmd.GetStatus().GetStatusMessage()
	version := cmd.GetHeader().GetClusterVersion()

	return Status{code, msg}
	return Status{code, msg, version}
}