Loading blockconnection.go +1 −1 Original line number Diff line number Diff line Loading @@ -36,7 +36,7 @@ func (conn *BlockConnection) Get(key []byte) (Record, error) { return Record{}, err } for i := 0; i < 1000; i++ { for i := 0; i < 10; i++ { if callback.Done() == false { conn.nbc.Run() } Loading connection_test.go +6 −6 Original line number Diff line number Diff line Loading @@ -11,6 +11,12 @@ var ( const testDevice string = "10.29.24.55" var option = ClientOptions{ Host: testDevice, Port: 8123, User: 1, Hmac: []byte("asdfasdf")} func TestMain(m *testing.M) { testConn = nil code := m.Run() Loading @@ -22,9 +28,6 @@ func TestHandshake(t *testing.T) { if testConn == nil { t.Skip("No Connection, skip this test") } var option = ClientOptions{ Host: testDevice, Port: 8123, User: 1, Hmac: []byte("asfdasfd")} conn, err := NewNonBlockConnection(option) if err != nil { Loading @@ -35,9 +38,6 @@ func TestHandshake(t *testing.T) { } func TestNonBlockGet(t *testing.T) { var option = ClientOptions{ Host: testDevice, Port: 8123, User: 1, Hmac: []byte("asfdasfd")} conn, err := NewBlockConnection(option) if err != nil { Loading handler.go +4 −0 Original line number Diff line number Diff line Loading @@ -9,6 +9,7 @@ type MessageHandler struct { } func (h *MessageHandler) Handle(cmd *kproto.Command, value []byte) error { klog.Info("Message handler called") if h.callback != nil { if cmd.Status != nil && cmd.Status.Code != nil { if cmd.GetStatus().GetCode() == kproto.Command_Status_SUCCESS { Loading @@ -17,6 +18,9 @@ func (h *MessageHandler) Handle(cmd *kproto.Command, value []byte) error { var status = Status{} h.callback.Failure(&status) } } else { klog.Info("Other status received") klog.Info("%v", cmd) } } Loading service.go +13 −25 Original line number Diff line number Diff line Loading @@ -45,7 +45,7 @@ func newNetworkService(op ClientOptions) (*networkService, error) { } s := &networkService{conn: conn, seq: 1, seq: 0, connId: 0, option: op, hmap: make(map[int64]*MessageHandler)} Loading @@ -66,6 +66,7 @@ func (s *networkService) listen() error { msg, cmd, value, err := s.receive() if err != nil { klog.Error("Network Service listen error") return err } Loading Loading @@ -103,7 +104,7 @@ func (s *networkService) submit(msg *kproto.Message, cmd *kproto.Command, value if msg.GetAuthType() == kproto.Message_HMACAUTH { msg.GetHmacAuth().Identity = &s.option.User msg.GetHmacAuth().Hmac = s.option.Hmac[:] msg.GetHmacAuth().Hmac = compute_hmac(msg.CommandBytes, s.option.Hmac) } err = s.send(msg, value) Loading @@ -115,6 +116,7 @@ func (s *networkService) submit(msg *kproto.Message, cmd *kproto.Command, value if h != nil { s.hmap[s.seq] = h klog.Info("Insert handler for ACK ", s.seq) } // update sequence number Loading @@ -134,32 +136,18 @@ func (s *networkService) send(msg *kproto.Message, value []byte) error { binary.BigEndian.PutUint32(header[1:5], uint32(len(msgBytes))) binary.BigEndian.PutUint32(header[5:9], uint32(len(value))) var cnt int cnt, err = s.conn.Write(header) if err != nil { klog.Error("Write header fail") return err packet := append(header, msgBytes...) if value != nil && len(value) > 0 { packet = append(packet, value...) } if cnt != len(header) { klog.Fatal("Write header fail") } cnt, err = s.conn.Write(msgBytes) if err != nil { klog.Error("Write message fail") return err } if cnt != len(msgBytes) { klog.Fatal("Write message fail") } cnt, err = s.conn.Write(value) var cnt int cnt, err = s.conn.Write(packet) if err != nil { klog.Error("Write message fail") klog.Error("Send packet fail") return err } if cnt != len(value) { klog.Fatal("Write value fail") if cnt != len(packet) { klog.Fatal("Send packet length not match") } return nil Loading Loading @@ -228,5 +216,5 @@ func (s *networkService) receive() (*kproto.Message, *kproto.Command, []byte, er func (s *networkService) close() { s.conn.Close() klog.Info("Connection to %s closed", s.option.Host) klog.Infof("Connection to %s closed", s.option.Host) } Loading
blockconnection.go +1 −1 Original line number Diff line number Diff line Loading @@ -36,7 +36,7 @@ func (conn *BlockConnection) Get(key []byte) (Record, error) { return Record{}, err } for i := 0; i < 1000; i++ { for i := 0; i < 10; i++ { if callback.Done() == false { conn.nbc.Run() } Loading
connection_test.go +6 −6 Original line number Diff line number Diff line Loading @@ -11,6 +11,12 @@ var ( const testDevice string = "10.29.24.55" var option = ClientOptions{ Host: testDevice, Port: 8123, User: 1, Hmac: []byte("asdfasdf")} func TestMain(m *testing.M) { testConn = nil code := m.Run() Loading @@ -22,9 +28,6 @@ func TestHandshake(t *testing.T) { if testConn == nil { t.Skip("No Connection, skip this test") } var option = ClientOptions{ Host: testDevice, Port: 8123, User: 1, Hmac: []byte("asfdasfd")} conn, err := NewNonBlockConnection(option) if err != nil { Loading @@ -35,9 +38,6 @@ func TestHandshake(t *testing.T) { } func TestNonBlockGet(t *testing.T) { var option = ClientOptions{ Host: testDevice, Port: 8123, User: 1, Hmac: []byte("asfdasfd")} conn, err := NewBlockConnection(option) if err != nil { Loading
handler.go +4 −0 Original line number Diff line number Diff line Loading @@ -9,6 +9,7 @@ type MessageHandler struct { } func (h *MessageHandler) Handle(cmd *kproto.Command, value []byte) error { klog.Info("Message handler called") if h.callback != nil { if cmd.Status != nil && cmd.Status.Code != nil { if cmd.GetStatus().GetCode() == kproto.Command_Status_SUCCESS { Loading @@ -17,6 +18,9 @@ func (h *MessageHandler) Handle(cmd *kproto.Command, value []byte) error { var status = Status{} h.callback.Failure(&status) } } else { klog.Info("Other status received") klog.Info("%v", cmd) } } Loading
service.go +13 −25 Original line number Diff line number Diff line Loading @@ -45,7 +45,7 @@ func newNetworkService(op ClientOptions) (*networkService, error) { } s := &networkService{conn: conn, seq: 1, seq: 0, connId: 0, option: op, hmap: make(map[int64]*MessageHandler)} Loading @@ -66,6 +66,7 @@ func (s *networkService) listen() error { msg, cmd, value, err := s.receive() if err != nil { klog.Error("Network Service listen error") return err } Loading Loading @@ -103,7 +104,7 @@ func (s *networkService) submit(msg *kproto.Message, cmd *kproto.Command, value if msg.GetAuthType() == kproto.Message_HMACAUTH { msg.GetHmacAuth().Identity = &s.option.User msg.GetHmacAuth().Hmac = s.option.Hmac[:] msg.GetHmacAuth().Hmac = compute_hmac(msg.CommandBytes, s.option.Hmac) } err = s.send(msg, value) Loading @@ -115,6 +116,7 @@ func (s *networkService) submit(msg *kproto.Message, cmd *kproto.Command, value if h != nil { s.hmap[s.seq] = h klog.Info("Insert handler for ACK ", s.seq) } // update sequence number Loading @@ -134,32 +136,18 @@ func (s *networkService) send(msg *kproto.Message, value []byte) error { binary.BigEndian.PutUint32(header[1:5], uint32(len(msgBytes))) binary.BigEndian.PutUint32(header[5:9], uint32(len(value))) var cnt int cnt, err = s.conn.Write(header) if err != nil { klog.Error("Write header fail") return err packet := append(header, msgBytes...) if value != nil && len(value) > 0 { packet = append(packet, value...) } if cnt != len(header) { klog.Fatal("Write header fail") } cnt, err = s.conn.Write(msgBytes) if err != nil { klog.Error("Write message fail") return err } if cnt != len(msgBytes) { klog.Fatal("Write message fail") } cnt, err = s.conn.Write(value) var cnt int cnt, err = s.conn.Write(packet) if err != nil { klog.Error("Write message fail") klog.Error("Send packet fail") return err } if cnt != len(value) { klog.Fatal("Write value fail") if cnt != len(packet) { klog.Fatal("Send packet length not match") } return nil Loading Loading @@ -228,5 +216,5 @@ func (s *networkService) receive() (*kproto.Message, *kproto.Command, []byte, er func (s *networkService) close() { s.conn.Close() klog.Info("Connection to %s closed", s.option.Host) klog.Infof("Connection to %s closed", s.option.Host) }