Commit 3d4e18b6 authored by Zhu Yong's avatar Zhu Yong
Browse files

add mutex for multithread

parent 431395aa
Loading
Loading
Loading
Loading
+0 −2
Original line number Diff line number Diff line
@@ -26,13 +26,11 @@ type GenericCallback struct {
func (c *GenericCallback) Success(resp *kproto.Command, value []byte) {
	c.done = true
	c.status = Status{Code: OK}
	klog.Info("Callback Success")
}

func (c *GenericCallback) Failure(status Status) {
	c.done = true
	c.status = status
	klog.Info("Callback Failure")
}

func (c *GenericCallback) Done() bool {
+32 −31
Original line number Diff line number Diff line
@@ -7,6 +7,7 @@ import (

var (
	blockConn    *BlockConnection    = nil
	nonblockConn *NonBlockConnection = nil
)

var option = ClientOptions{
@@ -26,50 +27,50 @@ func TestMain(m *testing.M) {
	}
}

func TestNonBlockNoOp(t *testing.T) {
func TestBlockNoOp(t *testing.T) {
	status, err := blockConn.NoOp()
	if err != nil || status.Code != OK {
		t.Fatal("Nonblocking NoOp Failure")
		t.Fatal("Blocking NoOp Failure")
	}
}

func TestNonBlockGet(t *testing.T) {
func TestBlockGet(t *testing.T) {
	_, status, err := blockConn.Get([]byte("object000"))
	if err != nil || status.Code != OK {
		t.Fatal("Nonblocking Get Failure")
		t.Fatal("Blocking Get Failure")
	}
}

func TestNonBlockGetNext(t *testing.T) {
func TestBlockGetNext(t *testing.T) {
	_, status, err := blockConn.GetNext([]byte("object000"))
	if err != nil || status.Code != OK {
		t.Fatal("Nonblocking GetNext Failure")
		t.Fatal("Blocking GetNext Failure")
	}
}

func TestNonBlockGetPrevious(t *testing.T) {
func TestBlockGetPrevious(t *testing.T) {
	_, status, err := blockConn.GetPrevious([]byte("object000"))
	if err != nil || status.Code != OK {
		t.Fatal("Nonblocking GetPrevious Failure")
		t.Fatal("Blocking GetPrevious Failure")
	}
}

func TestNonBlockGetVersion(t *testing.T) {
func TestBlockGetVersion(t *testing.T) {
	version, status, err := blockConn.GetVersion([]byte("object000"))
	if err != nil || status.Code != OK {
		t.Fatal("Nonblocking GetVersion Failure")
		t.Fatal("Blocking GetVersion Failure")
	}
	t.Logf("Object version = %x", version)
}

func TestNonBlockFlush(t *testing.T) {
func TestBlockFlush(t *testing.T) {
	status, err := blockConn.Flush()
	if err != nil || status.Code != OK {
		t.Fatal("Nonblocking Flush Failure")
		t.Fatal("Blocking Flush Failure")
	}
}

func TestNonBlockPut(t *testing.T) {
func TestBlockPut(t *testing.T) {
	entry := Record{
		Key:   []byte("object001"),
		Value: []byte("ABCDEFG"),
@@ -80,11 +81,11 @@ func TestNonBlockPut(t *testing.T) {
	}
	status, err := blockConn.Put(&entry)
	if err != nil || status.Code != OK {
		t.Fatal("Nonblocking Put Failure")
		t.Fatal("Blocking Put Failure")
	}
}

func TestNonBlockDelete(t *testing.T) {
func TestBlockDelete(t *testing.T) {
	entry := Record{
		Key:   []byte("object001"),
		Sync:  SYNC_WRITETHROUGH,
@@ -93,11 +94,11 @@ func TestNonBlockDelete(t *testing.T) {
	}
	status, err := blockConn.Delete(&entry)
	if err != nil || status.Code != OK {
		t.Fatal("Nonblocking Delete Failure")
		t.Fatal("Blocking Delete Failure")
	}
}

func TestNonBlockGetKeyRange(t *testing.T) {
func TestBlockGetKeyRange(t *testing.T) {
	r := KeyRange{
		StartKey:          []byte("object000"),
		EndKey:            []byte("object999"),
@@ -107,44 +108,44 @@ func TestNonBlockGetKeyRange(t *testing.T) {
	}
	keys, status, err := blockConn.GetKeyRange(&r)
	if err != nil || status.Code != OK {
		t.Fatal("Nonblocking GetKeyRange Failure: ", status.Error())
		t.Fatal("Blocking GetKeyRange Failure: ", status.Error())
	}
	for k, key := range keys {
		t.Logf("key[%d] = %s", k, string(key))
	}
}

func TestNonBlockGetLogCapacity(t *testing.T) {
func TestBlockGetLogCapacity(t *testing.T) {
	logs := []LogType{
		LOG_CAPACITIES,
	}
	klogs, status, err := blockConn.GetLog(logs)
	if err != nil || status.Code != OK {
		t.Fatal("Nonblocking GetLog Failure")
		t.Fatal("Blocking GetLog Failure")
	}
	if !(klogs.Capacity.CapacityInBytes > 0 &&
		klogs.Capacity.PortionFull > 0) {
		t.Logf("%#v", klogs.Capacity)
		t.Fatal("Nonblocking GetLog for Capacity Failure")
		t.Fatal("Blocking GetLog for Capacity Failure")
	}
}

func TestNonBlockGetLogLimit(t *testing.T) {
func TestBlockGetLogLimit(t *testing.T) {
	logs := []LogType{
		LOG_LIMITS,
	}
	klogs, status, err := blockConn.GetLog(logs)
	if err != nil || status.Code != OK {
		t.Fatal("Nonblocking GetLog Failure")
		t.Fatal("Blocking GetLog Failure")
	}
	if klogs.Limits.MaxKeySize != 4096 ||
		klogs.Limits.MaxValueSize != 1024*1024 {
		t.Logf("%#v", klogs.Limits)
		t.Fatal("Nonblocking GetLog for Limits Failure")
		t.Fatal("Blocking GetLog for Limits Failure")
	}
}

func TestNonBlockGetLogAll(t *testing.T) {
func TestBlockGetLogAll(t *testing.T) {
	logs := []LogType{
		LOG_UTILIZATIONS,
		LOG_TEMPERATURES,
@@ -156,12 +157,12 @@ func TestNonBlockGetLogAll(t *testing.T) {
	}
	klogs, status, err := blockConn.GetLog(logs)
	if err != nil || status.Code != OK {
		t.Fatal("Nonblocking GetLog Failure")
		t.Fatal("Blocking GetLog Failure")
	}
	t.Logf("GetLog %+v", klogs)
}

func TestNonBlockMediaScan(t *testing.T) {
func TestBlockMediaScan(t *testing.T) {
	op := MediaOperation{
		StartKey:          []byte("object000"),
		EndKey:            []byte("object999"),
@@ -170,11 +171,11 @@ func TestNonBlockMediaScan(t *testing.T) {
	}
	status, err := blockConn.MediaScan(&op, PRIORITY_NORMAL)
	if err != nil || status.Code != OK {
		t.Fatal("Nonblocking MediaScan Failure: ", status.Error())
		t.Fatal("Blocking MediaScan Failure: ", status.Error())
	}
}

func TestNonBlockMediaOptimize(t *testing.T) {
func TestBlockMediaOptimize(t *testing.T) {
	op := MediaOperation{
		StartKey:          []byte("object000"),
		EndKey:            []byte("object999"),
@@ -183,6 +184,6 @@ func TestNonBlockMediaOptimize(t *testing.T) {
	}
	status, err := blockConn.MediaOptimize(&op, PRIORITY_NORMAL)
	if err != nil || status.Code != OK {
		t.Fatal("Nonblocking MediaOptimize Failure: ", status.Error())
		t.Fatal("Blocking MediaOptimize Failure: ", status.Error())
	}
}
+0 −1
Original line number Diff line number Diff line
@@ -10,7 +10,6 @@ type ResponseHandler struct {
}

func (h *ResponseHandler) Handle(cmd *kproto.Command, value []byte) error {
	klog.Info("Message handler called")
	if h.callback != nil {
		if cmd.Status != nil && cmd.Status.Code != nil {
			if cmd.GetStatus().GetCode() == kproto.Command_Status_SUCCESS {
+27 −12
Original line number Diff line number Diff line
@@ -6,6 +6,7 @@ import (
	"fmt"
	"io"
	"net"
	"sync"
	"time"

	proto "github.com/golang/protobuf/proto"
@@ -36,6 +37,8 @@ func newCommand(t kproto.Command_MessageType) *kproto.Command {
}

type networkService struct {
	rmutex sync.Mutex
	wmutex sync.Mutex
	conn   net.Conn
	seq    int64                      // Operation sequence ID
	connId int64                      // current conection ID
@@ -61,7 +64,10 @@ func newNetworkService(op ClientOptions) (*networkService, error) {
		fatal:  false,
	}

	ns.rmutex.Lock()
	_, _, _, err = ns.receive()
	ns.rmutex.Unlock()

	if err != nil {
		klog.Error("Can't establish connection to %s", op.Host)
		return nil, err
@@ -79,6 +85,7 @@ func (ns *networkService) clientError(s Status, mh *ResponseHandler) {
		}
		delete(ns.hmap, ack)
	}

	if mh != nil && mh.callback != nil {
		mh.callback.Failure(s)
	}
@@ -86,20 +93,24 @@ func (ns *networkService) clientError(s Status, mh *ResponseHandler) {

func (ns *networkService) listen() error {
	if ns.fatal {
		return errors.New("Network service has fatal error")
		return errors.New("Can't listen, network service has fatal error")
	}

	if len(ns.hmap) == 0 {
		return nil
	}

	ns.rmutex.Lock()
	msg, cmd, value, err := ns.receive()
	ns.rmutex.Unlock()
	if err != nil {
		klog.Error("Network Service listen error")
		return err
	}

	klog.Info("Kinetic response received ", cmd.GetHeader().GetMessageType().String())
	klog.Info("Kinetic response received ", cmd.GetHeader().GetMessageType().String(),
		", AckSeq = ", cmd.GetHeader().GetAckSequence(),
		", Code = ", cmd.GetStatus().GetCode())

	if msg.GetAuthType() == kproto.Message_UNSOLICITEDSTATUS {
		if cmd.GetHeader() != nil {
@@ -116,15 +127,23 @@ func (ns *networkService) listen() error {

	(*h).Handle(cmd, value)

	ns.wmutex.Lock()
	delete(ns.hmap, ack)
	ns.wmutex.Unlock()

	return nil
}

func (ns *networkService) submit(msg *kproto.Message, cmd *kproto.Command, value []byte, h *ResponseHandler) error {
	if ns.fatal {
		return errors.New("Network service has fatal error")
		return errors.New("Can't submit, network service has fatal error")
	}
	if h == nil {
		return errors.New("Valid ResponseHandler is required")
	}

	ns.wmutex.Lock()

	cmd.GetHeader().ConnectionID = &ns.connId
	cmd.GetHeader().Sequence = &ns.seq
	cmdBytes, err := proto.Marshal(cmd)
@@ -141,22 +160,18 @@ func (ns *networkService) submit(msg *kproto.Message, cmd *kproto.Command, value
		msg.GetHmacAuth().Hmac = compute_hmac(msg.CommandBytes, ns.option.Hmac)
	}

	klog.Info("Kinetic message send ", cmd.GetHeader().GetMessageType().String(), " Seq = ", ns.seq)

	err = ns.send(msg, value)
	if err != nil {
		return err
	}

	klog.Info("Kinetic message send ", cmd.GetHeader().GetMessageType().String())

	if h != nil {
	ns.hmap[ns.seq] = h
		klog.Info("Insert handler for ACK ", ns.seq)
	}

	// update sequence number
	// TODO: Need mutex protection here
	ns.seq++

	ns.wmutex.Unlock()

	return nil
}