Commit 058dca53 authored by Zhu Yong's avatar Zhu Yong
Browse files

Merge branch 'feature/batch' into develop

parents 25db6a25 1234e742
Loading
Loading
Loading
Loading
+56 −0
Original line number Diff line number Diff line
@@ -167,6 +167,62 @@ func (conn *BlockConnection) P2PPush(request *P2PPushRequest) (*P2PPushStatus, S
	return &callback.P2PStatus, callback.Status(), err
}

// BatchStart starts new batch operation, all following batch PUT / DELETE share same batch ID until
// BatchEnd or BatchAbort is called.
func (conn *BlockConnection) BatchStart() (Status, error) {
	callback := &GenericCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.BatchStart(h)
	if err != nil {
		return callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return callback.Status(), err
}

// BatchPut puts objects to kinetic drive, as a batch job. Batch PUT / DELETE won't expect acknowledgement
// from kinetic device. Status for batch PUT / DELETE will only availabe in response message for BatchEnd.
func (conn *BlockConnection) BatchPut(entry *Record) error {
	return conn.nbc.BatchPut(entry)
}

// BatchDelete delete object from kinetic drive, as a batch job. Batch PUT / DELETE won't expect acknowledgement
// from kinetic device. Status for batch PUT / DELETE will only availabe in response message for BatchEnd.
func (conn *BlockConnection) BatchDelete(entry *Record) error {
	return conn.nbc.BatchDelete(entry)
}

// BatchEnd commits all batch jobs. Response from kinetic device will indicate succeeded jobs sequence number, or
// the first failed job sequence number if there is a failure.
func (conn *BlockConnection) BatchEnd() (*BatchStatus, Status, error) {
	callback := &BatchEndCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.BatchEnd(h)
	if err != nil {
		return nil, callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return &callback.BatchStatus, callback.Status(), err
}

// BatchAbort aborts jobs in current batch operation.
func (conn *BlockConnection) BatchAbort() (Status, error) {
	callback := &GenericCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.BatchAbort(h)
	if err != nil {
		return callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return callback.Status(), err
}

// GetLog gets kinetic device Log information. Can request single LogType or multiple LogType.
// On success, device Log information will return, and Status.Code = OK
func (conn *BlockConnection) GetLog(logs []LogType) (*Log, Status, error) {
+19 −2
Original line number Diff line number Diff line
@@ -11,7 +11,7 @@ import (
// Status return the MessateType operation status.
type Callback interface {
	Success(resp *kproto.Command, value []byte)
	Failure(status Status)
	Failure(resp *kproto.Command, status Status)
	Status() Status
}

@@ -27,7 +27,7 @@ func (c *GenericCallback) Success(resp *kproto.Command, value []byte) {
}

// Failure is called ResponseHandler when response message received from kinetic device with status code other than OK.
func (c *GenericCallback) Failure(status Status) {
func (c *GenericCallback) Failure(resp *kproto.Command, status Status) {
	c.status = status
}

@@ -106,3 +106,20 @@ func (c *GetLogCallback) Success(resp *kproto.Command, value []byte) {
	c.GenericCallback.Success(resp, value)
	c.Logs = getLogFromProto(resp)
}

type BatchEndCallback struct {
	GenericCallback
	BatchStatus BatchStatus
}

func (c *BatchEndCallback) Success(resp *kproto.Command, value []byte) {
	c.GenericCallback.Success(resp, value)
	c.BatchStatus.DoneSequence = resp.GetBody().GetBatch().GetSequence()
	c.BatchStatus.FailedSequence = resp.GetBody().GetBatch().GetFailedSequence()
}

func (c *BatchEndCallback) Failure(resp *kproto.Command, status Status) {
	c.GenericCallback.Failure(resp, status)
	c.BatchStatus.DoneSequence = resp.GetBody().GetBatch().GetSequence()
	c.BatchStatus.FailedSequence = resp.GetBody().GetBatch().GetFailedSequence()
}
+2 −2
Original line number Diff line number Diff line
@@ -20,7 +20,7 @@ func (h *ResponseHandler) handle(cmd *kproto.Command, value []byte) error {
			if cmd.GetStatus().GetCode() == kproto.Command_Status_SUCCESS {
				h.callback.Success(cmd, value)
			} else {
				h.callback.Failure(getStatusFromProto(cmd))
				h.callback.Failure(cmd, getStatusFromProto(cmd))
			}
		} else {
			klog.Warn("Other status received")
@@ -37,7 +37,7 @@ func (h *ResponseHandler) handle(cmd *kproto.Command, value []byte) error {

func (h *ResponseHandler) fail(s Status) {
	if h.callback != nil {
		h.callback.Failure(s)
		h.callback.Failure(nil, s)
	}
	h.cond.L.Lock()
	h.done = true
+6 −0
Original line number Diff line number Diff line
@@ -674,3 +674,9 @@ type P2PPushStatus struct {
	AllOperationsSucceeded bool     // Overall status for all child operations
	PushStatus             []Status // individual operation status
}

// BatchStatus indicates status of all operations in a batch commit.
type BatchStatus struct {
	DoneSequence   []int64 // All sequence Ids of those commands (PUT/DELETE) performed successfully in the batch
	FailedSequence int64   // Non 0 value means the first failed operation sequence in the batch, 0 means no failure
}
+89 −6
Original line number Diff line number Diff line
@@ -2,6 +2,7 @@ package kinetic

import (
	"bytes"
	"sync"

	kproto "github.com/yongzhy/kinetic-go/proto"
)
@@ -10,6 +11,9 @@ import (
// response message from device.
type NonBlockConnection struct {
	service    *networkService
	batchID    uint32 // Current batch Operation ID
	batchCount int32  // Current batch operation count
	batchMu    sync.Mutex
}

// Helper function to establish non-block connection to device.
@@ -23,7 +27,7 @@ func NewNonBlockConnection(op ClientOptions) (*NonBlockConnection, error) {
		return nil, err
	}

	return &NonBlockConnection{service}, nil
	return &NonBlockConnection{service: service, batchID: 0, batchCount: 0}, nil
}

// NoOp does nothing but wait for drive to return response.
@@ -105,11 +109,15 @@ func (conn *NonBlockConnection) Flush(h *ResponseHandler) error {
	return conn.service.submit(msg, cmd, nil, h)
}

// Delete deletes object from kinetic device.
func (conn *NonBlockConnection) Delete(entry *Record, h *ResponseHandler) error {
func (conn *NonBlockConnection) delete(entry *Record, batch bool, h *ResponseHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)
	cmd := newCommand(kproto.Command_DELETE)

	// Bathc operation, batchID needed
	if batch {
		cmd.Header.BatchID = &conn.batchID
	}

	sync := convertSyncToProto(entry.Sync)
	//algo := convertAlgoToProto(entry.Algo)
	cmd.Body = &kproto.Command_Body{
@@ -124,11 +132,21 @@ func (conn *NonBlockConnection) Delete(entry *Record, h *ResponseHandler) error
	return conn.service.submit(msg, cmd, nil, h)
}

// Put store object to kinetic device.
func (conn *NonBlockConnection) Put(entry *Record, h *ResponseHandler) error {
// Delete deletes object from kinetic device.
func (conn *NonBlockConnection) Delete(entry *Record, h *ResponseHandler) error {
	// Normal DELETE operation, not batch operation.
	return conn.delete(entry, false, h)
}

func (conn *NonBlockConnection) put(entry *Record, batch bool, h *ResponseHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)
	cmd := newCommand(kproto.Command_PUT)

	// Bathc operation, batchID needed
	if batch {
		cmd.Header.BatchID = &conn.batchID
	}

	sync := convertSyncToProto(entry.Sync)
	algo := convertAlgoToProto(entry.Algo)
	cmd.Body = &kproto.Command_Body{
@@ -144,6 +162,12 @@ func (conn *NonBlockConnection) Put(entry *Record, h *ResponseHandler) error {
	return conn.service.submit(msg, cmd, entry.Value, h)
}

// Put store object to kinetic device.
func (conn *NonBlockConnection) Put(entry *Record, h *ResponseHandler) error {
	// Normal PUT operation, not batch operation
	return conn.put(entry, false, h)
}

func (conn *NonBlockConnection) buildP2PMessage(request *P2PPushRequest) *kproto.Command_P2POperation {
	var p2pop *kproto.Command_P2POperation
	if request != nil {
@@ -183,6 +207,65 @@ func (conn *NonBlockConnection) P2PPush(request *P2PPushRequest, h *ResponseHand
	return conn.service.submit(msg, cmd, nil, h)
}

// BatchStart starts new batch operation, all following batch PUT / DELETE share same batch ID until
// BatchEnd or BatchAbort is called.
func (conn *NonBlockConnection) BatchStart(h *ResponseHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)
	cmd := newCommand(kproto.Command_START_BATCH)

	// TODO: Need to confirm can start new batch if current one not end / abort yet???
	conn.batchMu.Lock()
	conn.batchID++
	conn.batchCount = 0 // Reset
	conn.batchMu.Unlock()
	cmd.Header.BatchID = &conn.batchID
	return conn.service.submit(msg, cmd, nil, h)
}

// BatchPut puts objects to kinetic drive, as a batch job. Batch PUT / DELETE won't expect acknowledgement
// from kinetic device. Status for batch PUT / DELETE will only availabe in response message for BatchEnd.
func (conn *NonBlockConnection) BatchPut(entry *Record) error {
	// Batch operation PUT
	conn.batchMu.Lock()
	conn.batchCount++
	conn.batchMu.Unlock()
	return conn.put(entry, true, nil)
}

// BatchDelete delete object from kinetic drive, as a batch job. Batch PUT / DELETE won't expect acknowledgement
// from kinetic device. Status for batch PUT / DELETE will only availabe in response message for BatchEnd.
func (conn *NonBlockConnection) BatchDelete(entry *Record) error {
	// Batch operation DELETE
	conn.batchMu.Lock()
	conn.batchCount++
	conn.batchMu.Unlock()
	return conn.delete(entry, true, nil)
}

// BatchEnd commits all batch jobs. Response from kinetic device will indicate succeeded jobs sequence number, or
// the first failed job sequence number if there is a failure.
func (conn *NonBlockConnection) BatchEnd(h *ResponseHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)
	cmd := newCommand(kproto.Command_END_BATCH)

	cmd.Header.BatchID = &conn.batchID
	cmd.Body = &kproto.Command_Body{
		Batch: &kproto.Command_Batch{
			Count: &conn.batchCount,
		},
	}
	return conn.service.submit(msg, cmd, nil, h)
}

// BatchAbort aborts jobs in current batch operation.
func (conn *NonBlockConnection) BatchAbort(h *ResponseHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)
	cmd := newCommand(kproto.Command_ABORT_BATCH)

	cmd.Header.BatchID = &conn.batchID
	return conn.service.submit(msg, cmd, nil, h)
}

// GetLog gets kinetic device Log information. Can request single LogType or multiple LogType.
func (conn *NonBlockConnection) GetLog(logs []LogType, h *ResponseHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)
Loading