Commit f5bca374 authored by Zhu Yong's avatar Zhu Yong
Browse files

Status error message, and UNSOLICITEDSTATUS handling.

parent d0140610
Loading
Loading
Loading
Loading
+32 −30
Original line number Diff line number Diff line
@@ -18,10 +18,6 @@ var (
	networkTimeout = 20 * time.Second
)

//const (
//	seq_UNSOLICITEDSTATUS = -1
//)

func newMessage(t kproto.Message_AuthType) *kproto.Message {
	msg := &kproto.Message{
		AuthType: t.Enum(),
@@ -52,8 +48,8 @@ type networkService struct {
	option         ClientOptions              // current connection operation
	hmap           map[int64]*ResponseHandler // Message handler map
	fatal          bool                       // Network has fatal failure
	fatalError     error                      // Network fatal error detauls
	device         Log                        // Store device inforamtion from handshake package
	fatalError     error                      // Network fatal error details
	device         Log                        // Store device information from handshake package
}

func newNetworkService(op ClientOptions) (*networkService, error) {
@@ -136,30 +132,36 @@ func (ns *networkService) listen() error {
	ns.mapMu.Unlock()

	ns.rxMu.Lock()
	_, cmd, value, err := ns.receive()
	msg, cmd, value, err := ns.receive()
	ns.rxMu.Unlock()
	if err != nil {
		klog.Error("Network Service listen error")
		return err
	}

	if cmd.GetHeader() != nil {
		klog.Debug("Kinetic response received ", cmd.GetHeader().GetMessageType().String(),
			", AckSeq = ", cmd.GetHeader().GetAckSequence(),
			", Code = ", cmd.GetStatus().GetCode())
	} else if msg.GetAuthType() == kproto.Message_UNSOLICITEDSTATUS {
		klog.Debug("Kinetic UNSOLICITEDSTATUS : ",
			"Code = ", cmd.GetStatus().GetCode(),
			", StatusMessage = ", cmd.GetStatus().GetStatusMessage())
	}

	// TODO: Need to review this code block, is it necessary to set the AckSeq for UNSOLICITEDSTATUS
	//if msg.GetAuthType() == kproto.Message_UNSOLICITEDSTATUS {
	//	if cmd.GetHeader() != nil {
	//		*(cmd.GetHeader().AckSequence) = seq_UNSOLICITEDSTATUS
	//	}
	//}
	// For UNSOLICITEDSTATUS, command may not have Header or AckSequence, set the ack to -1 so
	// no ResponseHandler will be found from hmap table.
	var ack int64 = -1
	if cmd.Header != nil && cmd.Header.AckSequence != nil {
		ack = cmd.GetHeader().GetAckSequence()
	}

	ack := cmd.GetHeader().GetAckSequence()
	ns.mapMu.Lock()
	h, ok := ns.hmap[ack]
	ns.mapMu.Unlock()
	if ok == false {
		klog.Error("Couldn't find a handler for acksequence ", ack)
		// It's high chance this is an UNSOLICITEDSTATUS message, display the Status.
		klog.Errorf("Couldn't find a handler for acksequence %d, status=%s", ack, getStatusFromProto(cmd).String())
		// This is an unexpected packet. Each listen() call expect remove one ResponseHandler from hmap.
		// So need to fire another listen() to make sure ResponseHandler in hmap got chance to exit.
		// Either by receive correct packet, or network read failure.
@@ -247,8 +249,8 @@ func (ns *networkService) send(msg *kproto.Message, value []byte) error {

	_, err = ns.conn.Write(packet)
	if err != nil {
		klog.Error("Network I/O write error")
		s := Status{Code: CLIENT_IO_ERROR, ErrorMsg: "Network I/O write error"}
		klog.Error("Network I/O write error, " + err.Error())
		s := Status{Code: CLIENT_IO_ERROR, ErrorMsg: "Network I/O write error, " + err.Error()}
		ns.clientError(s, nil)
		ns.fatal = true
		ns.fatalError = err
@@ -266,8 +268,8 @@ func (ns *networkService) receive() (*kproto.Message, *kproto.Command, []byte, e

	_, err := io.ReadFull(ns.conn, header[0:])
	if err != nil {
		klog.Error("Network I/O read error")
		s := Status{Code: CLIENT_IO_ERROR, ErrorMsg: "Network I/O read error"}
		klog.Error("Network I/O read error, " + err.Error())
		s := Status{Code: CLIENT_IO_ERROR, ErrorMsg: "Network I/O read error, " + err.Error()}
		ns.clientError(s, nil)
		ns.fatal = true
		ns.fatalError = err
@@ -290,8 +292,8 @@ func (ns *networkService) receive() (*kproto.Message, *kproto.Command, []byte, e
	protoBuf := make([]byte, protoLen)
	_, err = io.ReadFull(ns.conn, protoBuf)
	if err != nil {
		klog.Error("Network I/O read error receive Kinetic Header")
		s := Status{Code: CLIENT_IO_ERROR, ErrorMsg: "Network I/O read error receive Kinetic Header"}
		klog.Error("Network I/O read error receive Kinetic Header, " + err.Error())
		s := Status{Code: CLIENT_IO_ERROR, ErrorMsg: "Network I/O read error receive Kinetic Header, " + err.Error()}
		ns.clientError(s, nil)
		ns.fatal = true
		ns.fatalError = err
@@ -301,8 +303,8 @@ func (ns *networkService) receive() (*kproto.Message, *kproto.Command, []byte, e
	msg := &kproto.Message{}
	err = proto.Unmarshal(protoBuf, msg)
	if err != nil {
		klog.Error("Network I/O read error receive Kinetic Header")
		s := Status{Code: CLIENT_IO_ERROR, ErrorMsg: "Network I/O read error reaceive Kinetic Message"}
		klog.Error("Network I/O read error receive Kinetic Header, " + err.Error())
		s := Status{Code: CLIENT_IO_ERROR, ErrorMsg: "Network I/O read error reaceive Kinetic Message, " + err.Error()}
		ns.clientError(s, nil)
		ns.fatal = true
		ns.fatalError = err
@@ -319,8 +321,8 @@ func (ns *networkService) receive() (*kproto.Message, *kproto.Command, []byte, e
	cmd := &kproto.Command{}
	err = proto.Unmarshal(msg.CommandBytes, cmd)
	if err != nil {
		klog.Error("Network I/O read error parsing Kinetic Command")
		s := Status{Code: CLIENT_IO_ERROR, ErrorMsg: "Network I/O read error parsing Kinetic Command"}
		klog.Error("Network I/O read error parsing Kinetic Command, " + err.Error())
		s := Status{Code: CLIENT_IO_ERROR, ErrorMsg: "Network I/O read error parsing Kinetic Command, " + err.Error()}
		ns.clientError(s, nil)
		ns.fatal = true
		ns.fatalError = err
@@ -344,8 +346,8 @@ func (ns *networkService) receive() (*kproto.Message, *kproto.Command, []byte, e
		valueBuf := make([]byte, valueLen)
		_, err = io.ReadFull(ns.conn, valueBuf)
		if err != nil {
			klog.Error("Network I/O read error parsing Kinetic Value")
			s := Status{Code: CLIENT_IO_ERROR, ErrorMsg: "Network I/O read error parsing Kinetic Value"}
			klog.Error("Network I/O read error parsing Kinetic Value, " + err.Error())
			s := Status{Code: CLIENT_IO_ERROR, ErrorMsg: "Network I/O read error parsing Kinetic Value, " + err.Error()}
			ns.clientError(s, nil)
			ns.fatal = true
			ns.fatalError = err