Commit 8eda1c96 authored by Zhu Yong's avatar Zhu Yong
Browse files

Merge branch 'develop'

parents e4daccb5 058dca53
Loading
Loading
Loading
Loading
+58 −2
Original line number Diff line number Diff line
@@ -154,7 +154,7 @@ func (conn *BlockConnection) Put(entry *Record) (Status, error) {
}

// P2Push
func (conn *BlockConnection) P2PPush(request *P2PPushRequest) ([]Status, Status, error) {
func (conn *BlockConnection) P2PPush(request *P2PPushRequest) (*P2PPushStatus, Status, error) {
	callback := &P2PPushCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.P2PPush(request, h)
@@ -164,7 +164,63 @@ func (conn *BlockConnection) P2PPush(request *P2PPushRequest) ([]Status, Status,

	err = conn.nbc.Listen(h)

	return callback.Statuses, callback.Status(), err
	return &callback.P2PStatus, callback.Status(), err
}

// BatchStart starts new batch operation, all following batch PUT / DELETE share same batch ID until
// BatchEnd or BatchAbort is called.
func (conn *BlockConnection) BatchStart() (Status, error) {
	callback := &GenericCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.BatchStart(h)
	if err != nil {
		return callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return callback.Status(), err
}

// BatchPut puts objects to kinetic drive, as a batch job. Batch PUT / DELETE won't expect acknowledgement
// from kinetic device. Status for batch PUT / DELETE will only availabe in response message for BatchEnd.
func (conn *BlockConnection) BatchPut(entry *Record) error {
	return conn.nbc.BatchPut(entry)
}

// BatchDelete delete object from kinetic drive, as a batch job. Batch PUT / DELETE won't expect acknowledgement
// from kinetic device. Status for batch PUT / DELETE will only availabe in response message for BatchEnd.
func (conn *BlockConnection) BatchDelete(entry *Record) error {
	return conn.nbc.BatchDelete(entry)
}

// BatchEnd commits all batch jobs. Response from kinetic device will indicate succeeded jobs sequence number, or
// the first failed job sequence number if there is a failure.
func (conn *BlockConnection) BatchEnd() (*BatchStatus, Status, error) {
	callback := &BatchEndCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.BatchEnd(h)
	if err != nil {
		return nil, callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return &callback.BatchStatus, callback.Status(), err
}

// BatchAbort aborts jobs in current batch operation.
func (conn *BlockConnection) BatchAbort() (Status, error) {
	callback := &GenericCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.BatchAbort(h)
	if err != nil {
		return callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return callback.Status(), err
}

// GetLog gets kinetic device Log information. Can request single LogType or multiple LogType.
+24 −6
Original line number Diff line number Diff line
@@ -11,7 +11,7 @@ import (
// Status return the MessateType operation status.
type Callback interface {
	Success(resp *kproto.Command, value []byte)
	Failure(status Status)
	Failure(resp *kproto.Command, status Status)
	Status() Status
}

@@ -27,7 +27,7 @@ func (c *GenericCallback) Success(resp *kproto.Command, value []byte) {
}

// Failure is called ResponseHandler when response message received from kinetic device with status code other than OK.
func (c *GenericCallback) Failure(status Status) {
func (c *GenericCallback) Failure(resp *kproto.Command, status Status) {
	c.status = status
}

@@ -81,16 +81,17 @@ func (c *GetVersionCallback) Success(resp *kproto.Command, value []byte) {
// P2PPushCallback is the Callback for Command_PEER2PEERPUSH
type P2PPushCallback struct {
	GenericCallback
	Statuses []Status
	P2PStatus P2PPushStatus
}

// Success extracts P2Push operation status from response message.
func (c *P2PPushCallback) Success(resp *kproto.Command, value []byte) {
	c.GenericCallback.Success(resp, value)
	c.Statuses = make([]Status, len(resp.GetBody().GetP2POperation().GetOperation()))
	c.P2PStatus.AllOperationsSucceeded = resp.GetBody().GetP2POperation().GetAllChildOperationsSucceeded()
	c.P2PStatus.PushStatus = make([]Status, len(resp.GetBody().GetP2POperation().GetOperation()))
	for k, op := range resp.GetBody().GetP2POperation().GetOperation() {
		c.Statuses[k].Code = convertStatusCodeFromProto(op.GetStatus().GetCode())
		c.Statuses[k].ErrorMsg = op.GetStatus().GetStatusMessage()
		c.P2PStatus.PushStatus[k].Code = convertStatusCodeFromProto(op.GetStatus().GetCode())
		c.P2PStatus.PushStatus[k].ErrorMsg = op.GetStatus().GetStatusMessage()
	}
}

@@ -105,3 +106,20 @@ func (c *GetLogCallback) Success(resp *kproto.Command, value []byte) {
	c.GenericCallback.Success(resp, value)
	c.Logs = getLogFromProto(resp)
}

type BatchEndCallback struct {
	GenericCallback
	BatchStatus BatchStatus
}

func (c *BatchEndCallback) Success(resp *kproto.Command, value []byte) {
	c.GenericCallback.Success(resp, value)
	c.BatchStatus.DoneSequence = resp.GetBody().GetBatch().GetSequence()
	c.BatchStatus.FailedSequence = resp.GetBody().GetBatch().GetFailedSequence()
}

func (c *BatchEndCallback) Failure(resp *kproto.Command, status Status) {
	c.GenericCallback.Failure(resp, status)
	c.BatchStatus.DoneSequence = resp.GetBody().GetBatch().GetSequence()
	c.BatchStatus.FailedSequence = resp.GetBody().GetBatch().GetFailedSequence()
}
+2 −2
Original line number Diff line number Diff line
@@ -20,7 +20,7 @@ func (h *ResponseHandler) handle(cmd *kproto.Command, value []byte) error {
			if cmd.GetStatus().GetCode() == kproto.Command_Status_SUCCESS {
				h.callback.Success(cmd, value)
			} else {
				h.callback.Failure(getStatusFromProto(cmd))
				h.callback.Failure(cmd, getStatusFromProto(cmd))
			}
		} else {
			klog.Warn("Other status received")
@@ -37,7 +37,7 @@ func (h *ResponseHandler) handle(cmd *kproto.Command, value []byte) error {

func (h *ResponseHandler) fail(s Status) {
	if h.callback != nil {
		h.callback.Failure(s)
		h.callback.Failure(nil, s)
	}
	h.cond.L.Lock()
	h.done = true
+23 −6
Original line number Diff line number Diff line
@@ -649,17 +649,34 @@ type ACL struct {
	MaxPriority Priority
}

// P2PPushOperation
type P2PPushOperation struct {
	Key     []byte
	Key     []byte // Key for the object to push to peer kinetic device
	Version []byte
	NewKey  []byte
	NewKey  []byte // NewKey to be used for the object on peer kinetic device, if not specify, will be same as Key
	Force   bool
	Request *P2PPushRequest
	Request *P2PPushRequest // Chain P2PPushRequest, which will perform on peer kinetic device
}

// P2PPushRequest
type P2PPushRequest struct {
	HostName   string
	Port       int32
	HostName   string // Peer kinetic device IP / hostname
	Port       int32  // Peer kinetic drvice port
	Tls        bool
	Operations []P2PPushOperation
	Operations []P2PPushOperation // List of operations to perform on peer kinetic device
}

// P2PPushStatus holds the status for P2PPushOperations.
// AllOperationsSucceeded indicates whether all operations have Status SUCCESS
// When false, clients should traverse operation status codes to discover error cases.
// When true, no further error checking should be required.
type P2PPushStatus struct {
	AllOperationsSucceeded bool     // Overall status for all child operations
	PushStatus             []Status // individual operation status
}

// BatchStatus indicates status of all operations in a batch commit.
type BatchStatus struct {
	DoneSequence   []int64 // All sequence Ids of those commands (PUT/DELETE) performed successfully in the batch
	FailedSequence int64   // Non 0 value means the first failed operation sequence in the batch, 0 means no failure
}
+89 −6
Original line number Diff line number Diff line
@@ -2,6 +2,7 @@ package kinetic

import (
	"bytes"
	"sync"

	kproto "github.com/yongzhy/kinetic-go/proto"
)
@@ -10,6 +11,9 @@ import (
// response message from device.
type NonBlockConnection struct {
	service    *networkService
	batchID    uint32 // Current batch Operation ID
	batchCount int32  // Current batch operation count
	batchMu    sync.Mutex
}

// Helper function to establish non-block connection to device.
@@ -23,7 +27,7 @@ func NewNonBlockConnection(op ClientOptions) (*NonBlockConnection, error) {
		return nil, err
	}

	return &NonBlockConnection{service}, nil
	return &NonBlockConnection{service: service, batchID: 0, batchCount: 0}, nil
}

// NoOp does nothing but wait for drive to return response.
@@ -105,11 +109,15 @@ func (conn *NonBlockConnection) Flush(h *ResponseHandler) error {
	return conn.service.submit(msg, cmd, nil, h)
}

// Delete deletes object from kinetic device.
func (conn *NonBlockConnection) Delete(entry *Record, h *ResponseHandler) error {
func (conn *NonBlockConnection) delete(entry *Record, batch bool, h *ResponseHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)
	cmd := newCommand(kproto.Command_DELETE)

	// Bathc operation, batchID needed
	if batch {
		cmd.Header.BatchID = &conn.batchID
	}

	sync := convertSyncToProto(entry.Sync)
	//algo := convertAlgoToProto(entry.Algo)
	cmd.Body = &kproto.Command_Body{
@@ -124,11 +132,21 @@ func (conn *NonBlockConnection) Delete(entry *Record, h *ResponseHandler) error
	return conn.service.submit(msg, cmd, nil, h)
}

// Put store object to kinetic device.
func (conn *NonBlockConnection) Put(entry *Record, h *ResponseHandler) error {
// Delete deletes object from kinetic device.
func (conn *NonBlockConnection) Delete(entry *Record, h *ResponseHandler) error {
	// Normal DELETE operation, not batch operation.
	return conn.delete(entry, false, h)
}

func (conn *NonBlockConnection) put(entry *Record, batch bool, h *ResponseHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)
	cmd := newCommand(kproto.Command_PUT)

	// Bathc operation, batchID needed
	if batch {
		cmd.Header.BatchID = &conn.batchID
	}

	sync := convertSyncToProto(entry.Sync)
	algo := convertAlgoToProto(entry.Algo)
	cmd.Body = &kproto.Command_Body{
@@ -144,6 +162,12 @@ func (conn *NonBlockConnection) Put(entry *Record, h *ResponseHandler) error {
	return conn.service.submit(msg, cmd, entry.Value, h)
}

// Put store object to kinetic device.
func (conn *NonBlockConnection) Put(entry *Record, h *ResponseHandler) error {
	// Normal PUT operation, not batch operation
	return conn.put(entry, false, h)
}

func (conn *NonBlockConnection) buildP2PMessage(request *P2PPushRequest) *kproto.Command_P2POperation {
	var p2pop *kproto.Command_P2POperation
	if request != nil {
@@ -183,6 +207,65 @@ func (conn *NonBlockConnection) P2PPush(request *P2PPushRequest, h *ResponseHand
	return conn.service.submit(msg, cmd, nil, h)
}

// BatchStart starts new batch operation, all following batch PUT / DELETE share same batch ID until
// BatchEnd or BatchAbort is called.
func (conn *NonBlockConnection) BatchStart(h *ResponseHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)
	cmd := newCommand(kproto.Command_START_BATCH)

	// TODO: Need to confirm can start new batch if current one not end / abort yet???
	conn.batchMu.Lock()
	conn.batchID++
	conn.batchCount = 0 // Reset
	conn.batchMu.Unlock()
	cmd.Header.BatchID = &conn.batchID
	return conn.service.submit(msg, cmd, nil, h)
}

// BatchPut puts objects to kinetic drive, as a batch job. Batch PUT / DELETE won't expect acknowledgement
// from kinetic device. Status for batch PUT / DELETE will only availabe in response message for BatchEnd.
func (conn *NonBlockConnection) BatchPut(entry *Record) error {
	// Batch operation PUT
	conn.batchMu.Lock()
	conn.batchCount++
	conn.batchMu.Unlock()
	return conn.put(entry, true, nil)
}

// BatchDelete delete object from kinetic drive, as a batch job. Batch PUT / DELETE won't expect acknowledgement
// from kinetic device. Status for batch PUT / DELETE will only availabe in response message for BatchEnd.
func (conn *NonBlockConnection) BatchDelete(entry *Record) error {
	// Batch operation DELETE
	conn.batchMu.Lock()
	conn.batchCount++
	conn.batchMu.Unlock()
	return conn.delete(entry, true, nil)
}

// BatchEnd commits all batch jobs. Response from kinetic device will indicate succeeded jobs sequence number, or
// the first failed job sequence number if there is a failure.
func (conn *NonBlockConnection) BatchEnd(h *ResponseHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)
	cmd := newCommand(kproto.Command_END_BATCH)

	cmd.Header.BatchID = &conn.batchID
	cmd.Body = &kproto.Command_Body{
		Batch: &kproto.Command_Batch{
			Count: &conn.batchCount,
		},
	}
	return conn.service.submit(msg, cmd, nil, h)
}

// BatchAbort aborts jobs in current batch operation.
func (conn *NonBlockConnection) BatchAbort(h *ResponseHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)
	cmd := newCommand(kproto.Command_ABORT_BATCH)

	cmd.Header.BatchID = &conn.batchID
	return conn.service.submit(msg, cmd, nil, h)
}

// GetLog gets kinetic device Log information. Can request single LogType or multiple LogType.
func (conn *NonBlockConnection) GetLog(logs []LogType, h *ResponseHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)
Loading