Commit 93f8a790 authored by Zhu Yong's avatar Zhu Yong
Browse files

code for Batch operation, not tested, drive not implemented yet

parent 1f9346e0
Loading
Loading
Loading
Loading
+67 −0
Original line number Diff line number Diff line
@@ -167,6 +167,73 @@ func (conn *BlockConnection) P2PPush(request *P2PPushRequest) ([]Status, Status,
	return callback.Statuses, callback.Status(), err
}

func (conn *BlockConnection) BatchStart() (Status, error) {
	callback := &GenericCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.BatchStart(h)
	if err != nil {
		return callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return callback.Status(), err
}

func (conn *BlockConnection) BatchPut(entry *Record) (Status, error) {
	// TODO: combine normal Put and BatchPut
	callback := &GenericCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.BatchPut(entry, h)
	if err != nil {
		return callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return callback.Status(), err
}

func (conn *BlockConnection) BatchDelete(entry *Record) (Status, error) {
	// TODO: combine normal Delete and BatchDelete
	callback := &GenericCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.BatchDelete(entry, h)
	if err != nil {
		return callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return callback.Status(), err
}

func (conn *BlockConnection) BatchEnd() (Status, error) {
	callback := &GenericCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.BatchEnd(h)
	if err != nil {
		return callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return callback.Status(), err
}

func (conn *BlockConnection) BatchAbort() (Status, error) {
	callback := &GenericCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.BatchAbort(h)
	if err != nil {
		return callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return callback.Status(), err
}

// GetLog gets kinetic device Log information. Can request single LogType or multiple LogType.
// On success, device Log information will return, and Status.Code = OK
func (conn *BlockConnection) GetLog(logs []LogType) (*Log, Status, error) {
+73 −6
Original line number Diff line number Diff line
@@ -2,6 +2,7 @@ package kinetic

import (
	"bytes"
	"sync"

	kproto "github.com/yongzhy/kinetic-go/proto"
)
@@ -10,6 +11,9 @@ import (
// response message from device.
type NonBlockConnection struct {
	service    *networkService
	batchID    uint32 // Current batch Operation ID
	batchCount int32  // Current batch operation count
	batchMu    sync.Mutex
}

// Helper function to establish non-block connection to device.
@@ -23,7 +27,7 @@ func NewNonBlockConnection(op ClientOptions) (*NonBlockConnection, error) {
		return nil, err
	}

	return &NonBlockConnection{service}, nil
	return &NonBlockConnection{service: service, batchID: 0, batchCount: 0}, nil
}

// NoOp does nothing but wait for drive to return response.
@@ -105,11 +109,15 @@ func (conn *NonBlockConnection) Flush(h *ResponseHandler) error {
	return conn.service.submit(msg, cmd, nil, h)
}

// Delete deletes object from kinetic device.
func (conn *NonBlockConnection) Delete(entry *Record, h *ResponseHandler) error {
func (conn *NonBlockConnection) delete(entry *Record, batch bool, h *ResponseHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)
	cmd := newCommand(kproto.Command_DELETE)

	// Bathc operation, batchID needed
	if batch {
		cmd.Header.BatchID = &conn.batchID
	}

	sync := convertSyncToProto(entry.Sync)
	//algo := convertAlgoToProto(entry.Algo)
	cmd.Body = &kproto.Command_Body{
@@ -124,11 +132,21 @@ func (conn *NonBlockConnection) Delete(entry *Record, h *ResponseHandler) error
	return conn.service.submit(msg, cmd, nil, h)
}

// Put store object to kinetic device.
func (conn *NonBlockConnection) Put(entry *Record, h *ResponseHandler) error {
// Delete deletes object from kinetic device.
func (conn *NonBlockConnection) Delete(entry *Record, h *ResponseHandler) error {
	// Normal DELETE operation, not batch operation.
	return conn.delete(entry, false, h)
}

func (conn *NonBlockConnection) put(entry *Record, batch bool, h *ResponseHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)
	cmd := newCommand(kproto.Command_PUT)

	// Bathc operation, batchID needed
	if batch {
		cmd.Header.BatchID = &conn.batchID
	}

	sync := convertSyncToProto(entry.Sync)
	algo := convertAlgoToProto(entry.Algo)
	cmd.Body = &kproto.Command_Body{
@@ -144,6 +162,12 @@ func (conn *NonBlockConnection) Put(entry *Record, h *ResponseHandler) error {
	return conn.service.submit(msg, cmd, entry.Value, h)
}

// Put store object to kinetic device.
func (conn *NonBlockConnection) Put(entry *Record, h *ResponseHandler) error {
	// Normal PUT operation, not batch operation
	return conn.put(entry, false, h)
}

func (conn *NonBlockConnection) buildP2PMessage(request *P2PPushRequest) *kproto.Command_P2POperation {
	var p2pop *kproto.Command_P2POperation
	if request != nil {
@@ -183,6 +207,49 @@ func (conn *NonBlockConnection) P2PPush(request *P2PPushRequest, h *ResponseHand
	return conn.service.submit(msg, cmd, nil, h)
}

// BatchStart starts batch operations
func (conn *NonBlockConnection) BatchStart(h *ResponseHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)
	cmd := newCommand(kproto.Command_START_BATCH)

	cmd.Header.BatchID = &conn.batchID
	return conn.service.submit(msg, cmd, nil, h)
}

func (conn *NonBlockConnection) BatchPut(entry *Record, h *ResponseHandler) error {
	// Batch operation PUT
	return conn.put(entry, true, h)
}

// BatchDelete deletes object from kinetic device.
func (conn *NonBlockConnection) BatchDelete(entry *Record, h *ResponseHandler) error {
	// Batch operation DELETE
	return conn.delete(entry, true, h)
}

// BatchEnd commits all patch operations.
func (conn *NonBlockConnection) BatchEnd(h *ResponseHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)
	cmd := newCommand(kproto.Command_END_BATCH)

	cmd.Header.BatchID = &conn.batchID
	cmd.Body = &kproto.Command_Body{
		Batch: &kproto.Command_Batch{
			Count: &conn.batchCount,
		},
	}
	return conn.service.submit(msg, cmd, nil, h)
}

// BatchAbort abort current batch operations.
func (conn *NonBlockConnection) BatchAbort(h *ResponseHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)
	cmd := newCommand(kproto.Command_ABORT_BATCH)

	cmd.Header.BatchID = &conn.batchID
	return conn.service.submit(msg, cmd, nil, h)
}

// GetLog gets kinetic device Log information. Can request single LogType or multiple LogType.
func (conn *NonBlockConnection) GetLog(logs []LogType, h *ResponseHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)