Commit cc387818 authored by Zhu Yong's avatar Zhu Yong
Browse files

Start new listen if an unexpected packet received. To make sure all...

Start new listen if an unexpected packet received. To make sure all ResponseHandler get chance to exit, either receive correct packet, or network read error
parent 7e552151
Loading
Loading
Loading
Loading
+15 −2
Original line number Diff line number Diff line
@@ -52,6 +52,7 @@ type networkService struct {
	option         ClientOptions              // current connection operation
	hmap           map[int64]*ResponseHandler // Message handler map
	fatal          bool                       // Network has fatal failure
	fatalError     error                      // Network fatal error detauls
	device         Log                        // Store device inforamtion from handshake package
}

@@ -80,6 +81,7 @@ func newNetworkService(op ClientOptions) (*networkService, error) {
		option:         op,
		hmap:           make(map[int64]*ResponseHandler),
		fatal:          false,
		fatalError:     nil,
	}

	ns.rxMu.Lock()
@@ -123,7 +125,7 @@ func (ns *networkService) clientError(s Status, mh *ResponseHandler) {

func (ns *networkService) listen() error {
	if ns.fatal {
		return errors.New("Can't listen, network service has fatal error")
		return errors.New("Can't listen, network service has fatal error: " + ns.fatalError.Error())
	}

	ns.mapMu.Lock()
@@ -158,6 +160,10 @@ func (ns *networkService) listen() error {
	ns.mapMu.Unlock()
	if ok == false {
		klog.Error("Couldn't find a handler for acksequence ", ack)
		// This is an unexpected packet. Each listen() call expect remove one ResponseHandler from hmap.
		// So need to fire another listen() to make sure ResponseHandler in hmap got chance to exit.
		// Either by receive correct packet, or network read failure.
		go ns.listen()
		return nil
	}

@@ -172,7 +178,7 @@ func (ns *networkService) listen() error {

func (ns *networkService) submit(msg *kproto.Message, cmd *kproto.Command, value []byte, h *ResponseHandler) error {
	if ns.fatal {
		return errors.New("Can't submit, network service has fatal error")
		return errors.New("Can't submit, network service has fatal error: " + ns.fatalError.Error())
	}
	if h == nil {
		return errors.New("Valid ResponseHandler is required")
@@ -244,6 +250,7 @@ func (ns *networkService) send(msg *kproto.Message, value []byte) error {
		s := Status{Code: CLIENT_IO_ERROR, ErrorMsg: "Network I/O write error"}
		ns.clientError(s, nil)
		ns.fatal = true
		ns.fatalError = err
		return err
	}

@@ -262,6 +269,7 @@ func (ns *networkService) receive() (*kproto.Message, *kproto.Command, []byte, e
		s := Status{Code: CLIENT_IO_ERROR, ErrorMsg: "Network I/O read error"}
		ns.clientError(s, nil)
		ns.fatal = true
		ns.fatalError = err
		return nil, nil, nil, err
	}

@@ -271,6 +279,7 @@ func (ns *networkService) receive() (*kproto.Message, *kproto.Command, []byte, e
		s := Status{Code: CLIENT_IO_ERROR, ErrorMsg: "Network I/O read error Header wrong magic"}
		ns.clientError(s, nil)
		ns.fatal = true
		ns.fatalError = errors.New("Wrong magic number")
		return nil, nil, nil, errors.New("Network I/O read error Header wrong magic")
	}

@@ -284,6 +293,7 @@ func (ns *networkService) receive() (*kproto.Message, *kproto.Command, []byte, e
		s := Status{Code: CLIENT_IO_ERROR, ErrorMsg: "Network I/O read error receive Kinetic Header"}
		ns.clientError(s, nil)
		ns.fatal = true
		ns.fatalError = err
		return nil, nil, nil, err
	}

@@ -294,6 +304,7 @@ func (ns *networkService) receive() (*kproto.Message, *kproto.Command, []byte, e
		s := Status{Code: CLIENT_IO_ERROR, ErrorMsg: "Network I/O read error reaceive Kinetic Message"}
		ns.clientError(s, nil)
		ns.fatal = true
		ns.fatalError = err
		return nil, nil, nil, err
	}

@@ -311,6 +322,7 @@ func (ns *networkService) receive() (*kproto.Message, *kproto.Command, []byte, e
		s := Status{Code: CLIENT_IO_ERROR, ErrorMsg: "Network I/O read error parsing Kinetic Command"}
		ns.clientError(s, nil)
		ns.fatal = true
		ns.fatalError = err
		return nil, nil, nil, err
	}

@@ -335,6 +347,7 @@ func (ns *networkService) receive() (*kproto.Message, *kproto.Command, []byte, e
			s := Status{Code: CLIENT_IO_ERROR, ErrorMsg: "Network I/O read error parsing Kinetic Value"}
			ns.clientError(s, nil)
			ns.fatal = true
			ns.fatalError = err
			return nil, nil, nil, err
		}