Commit 75db8324 authored by Zhu Yong's avatar Zhu Yong
Browse files

Add execute and listen function for service.

parent 8931e63c
Loading
Loading
Loading
Loading
+9 −1
Original line number Diff line number Diff line
package kinetic

import (
	kproto "github.com/yongzhy/kinetic-go/proto"
)

type Connection struct {
	service *networkService
}
@@ -18,7 +22,11 @@ func NewConnection(op ClientOptions) (*Connection, error) {
}

func (conn *Connection) Nop() error {
	return nil
	msg := newMessage(kproto.Message_HMACAUTH)
	cmd := newCommand(kproto.Command_NOOP)

	err := conn.service.execute(msg, cmd, nil, nil)
	return err
}

func (conn *Connection) Close() {
+18 −1
Original line number Diff line number Diff line
package kinetic

import (
	"os"
	"testing"
)

var (
	testConn *Connection
)

const testDevice string = "10.29.24.55"

func TestMain(m *testing.M) {
	testConn = nil
	code := m.Run()
	os.Exit(code)
}

func TestHandshake(t *testing.T) {

	if testConn == nil {
		t.Skip("No Connection, skip this test")
	}
	var option = ClientOptions{
		Host: "10.29.24.55", Port: 8123,
		Host: testDevice, Port: 8123,
		User: 1, Hmac: []byte("asfdasfd")}

	conn, err := NewConnection(option)

handler.go

0 → 100644
+28 −0
Original line number Diff line number Diff line
package kinetic

import (
	kproto "github.com/yongzhy/kinetic-go/proto"
)

type Callback interface {
	Failure()
	Done() bool
}

type MessageHandler interface {
	Handle(cmd *kproto.Command, value []byte) error
	Error()
	SetCallback(callback *Callback)
}

type SimpleCallback struct {
	done bool
}

func (c *SimpleCallback) Failure() {
	c.done = true
}

func (c *SimpleCallback) Done() bool {
	return c.done
}
+42 −12
Original line number Diff line number Diff line
@@ -4,7 +4,6 @@ import (
	"os"

	"github.com/Sirupsen/logrus"
	kproto "github.com/yongzhy/kinetic-go/proto"
)

// Create logger for Kinetic package
@@ -17,24 +16,55 @@ func init() {
type ClientOptions struct {
	Host string
	Port int
	User int
	User int64
	Hmac []byte
}

type Callback interface {
	Success()
	Failure()
// algorithm
type Algorithm int32

const (
	ALGO_INVALID_ALGORITHM Algorithm = -1
	ALGO_SHA1              Algorithm = 1
	ALGO_SHA2              Algorithm = 2
	ALGO_SHA3              Algorithm = 3
	ALGO_CRC32             Algorithm = 4
	ALGO_CRC64             Algorithm = 5
)

type Record struct {
	Key     []byte
	Value   []byte
	Version []byte
	Tag     []byte
	Algo    Algorithm
}

type MessageHandler interface {
	Handle(cmd *kproto.Command, value []byte) error
	Error()
type KeyRange struct {
	StartKey          []byte
	EndKey            []byte
	StartKeyInclusive bool
	EndKeyInclusive   bool
	Reverse           bool
	Max               uint
}

type Client interface {
	Nop() error
	Put(key, value []byte) error
	Get(key []byte) ([]byte, error)
	Delete(key []byte) error
	GetRange(startKey []byte, startKeyInclusive bool, endKey []byte, endKeyInclusive bool, reverse bool, max int32) ([][]byte, error)
	Version() error
	Put(key, value []byte, h *MessageHandler) error
	Get(key []byte, h *MessageHandler) ([]byte, error)
	GetNext() error
	GetPrevious() error
	Flush(h *MessageHandler) error
	Delete(key []byte, h *MessageHandler) error
	GetRange(r *KeyRange, h *MessageHandler) ([][]byte, error)

	SetErasePin(old, new []byte, h *MessageHandler) error
	SecureErase(pin []byte) error
	InstantErase(pin []byte) error
	SetLockPin(old, new []byte) error
	Lock(pin []byte) error
	UnLock(pin []byte) error
	GetLog() error
}
+83 −7
Original line number Diff line number Diff line
@@ -10,11 +10,31 @@ import (
	kproto "github.com/yongzhy/kinetic-go/proto"
)

func newMessage(t kproto.Message_AuthType) *kproto.Message {
	msg := &kproto.Message{
		AuthType: t.Enum(),
	}
	if msg.GetAuthType() == kproto.Message_HMACAUTH {
		msg.HmacAuth = &kproto.Message_HMACauth{}
	}

	return msg
}

func newCommand(t kproto.Command_MessageType) *kproto.Command {
	return &kproto.Command{
		Header: &kproto.Command_Header{
			MessageType: t.Enum(),
		},
	}
}

type networkService struct {
	conn   net.Conn
	seq    int64
	connId int64
	option ClientOptions
	seq    int64                     // Operation sequence ID
	connId int64                     // current conection ID
	option ClientOptions             // current connection operation
	hmap   map[int64]*MessageHandler // Message handler map
}

func newNetworkService(op ClientOptions) (*networkService, error) {
@@ -32,13 +52,69 @@ func newNetworkService(op ClientOptions) (*networkService, error) {
		return nil, err
	}

	go s.run()

	return s, nil
}

func (s *networkService) run() {
func (s *networkService) listen() error {
	if len(s.hmap) == 0 {
		return nil
	}

	msg, cmd, value, err := s.receive()
	if err != nil {
		return err
	}

	klog.Info("Kinetic response received", cmd.GetHeader().GetMessageType().String())

	if msg.GetAuthType() == kproto.Message_UNSOLICITEDSTATUS {
		*cmd.GetHeader().AckSequence = -1
	}

	ack := cmd.GetHeader().GetAckSequence()
	h, ok := s.hmap[ack]
	if ok == false {
		klog.Error("Couldn't find a handler for acksequence ", ack)
		return nil
	}

	(*h).Handle(cmd, value)

	delete(s.hmap, ack)

	return nil
}

func (s *networkService) execute(msg *kproto.Message, cmd *kproto.Command, value []byte, h *MessageHandler) error {
	cmd.GetHeader().ConnectionID = &s.connId
	cmd.GetHeader().Sequence = &s.seq
	cmdBytes, err := proto.Marshal(cmd)
	if err != nil {
		klog.Error("Can't marshl Kinetic Command", err)
		return err
	}
	msg.CommandBytes = cmdBytes[:]

	if msg.GetAuthType() == kproto.Message_HMACAUTH {
		msg.GetHmacAuth().Identity = &s.option.User
		msg.GetHmacAuth().Hmac = s.option.Hmac[:]
	}

	err = s.send(msg, value)
	if err != nil {
		return err
	}

	klog.Info("Kinetic message send", cmd.GetHeader().GetMessageType().String())

	if h != nil {
		s.hmap[s.seq] = h
	}

	// update sequence number
	s.seq++

	return err
}

func (s *networkService) send(msg *kproto.Message, value []byte) error {
@@ -114,7 +190,7 @@ func (s *networkService) receive() (*kproto.Message, *kproto.Command, []byte, er
		return nil, nil, nil, err
	}

	if validate_hmac(msg, s.option.Hmac) == false {
	if msg.GetAuthType() == kproto.Message_HMACAUTH && validate_hmac(msg, s.option.Hmac) == false {
		klog.Error("Received packet has invalid HMAC")
		return nil, nil, nil, err
	}