Commit 1234e742 authored by Zhu Yong's avatar Zhu Yong
Browse files

Batch operation, tested with simulator

parent 93f8a790
Loading
Loading
Loading
Loading
+17 −28
Original line number Diff line number Diff line
@@ -167,6 +167,8 @@ func (conn *BlockConnection) P2PPush(request *P2PPushRequest) ([]Status, Status,
	return callback.Statuses, callback.Status(), err
}

// BatchStart starts new batch operation, all following batch PUT / DELETE share same batch ID until
// BatchEnd or BatchAbort is called.
func (conn *BlockConnection) BatchStart() (Status, error) {
	callback := &GenericCallback{}
	h := NewResponseHandler(callback)
@@ -180,47 +182,34 @@ func (conn *BlockConnection) BatchStart() (Status, error) {
	return callback.Status(), err
}

func (conn *BlockConnection) BatchPut(entry *Record) (Status, error) {
	// TODO: combine normal Put and BatchPut
	callback := &GenericCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.BatchPut(entry, h)
	if err != nil {
		return callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return callback.Status(), err
}

func (conn *BlockConnection) BatchDelete(entry *Record) (Status, error) {
	// TODO: combine normal Delete and BatchDelete
	callback := &GenericCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.BatchDelete(entry, h)
	if err != nil {
		return callback.Status(), err
// BatchPut puts objects to kinetic drive, as a batch job. Batch PUT / DELETE won't expect acknowledgement
// from kinetic device. Status for batch PUT / DELETE will only availabe in response message for BatchEnd.
func (conn *BlockConnection) BatchPut(entry *Record) error {
	return conn.nbc.BatchPut(entry)
}

	err = conn.nbc.Listen(h)

	return callback.Status(), err
// BatchDelete delete object from kinetic drive, as a batch job. Batch PUT / DELETE won't expect acknowledgement
// from kinetic device. Status for batch PUT / DELETE will only availabe in response message for BatchEnd.
func (conn *BlockConnection) BatchDelete(entry *Record) error {
	return conn.nbc.BatchDelete(entry)
}

func (conn *BlockConnection) BatchEnd() (Status, error) {
	callback := &GenericCallback{}
// BatchEnd commits all batch jobs. Response from kinetic device will indicate succeeded jobs sequence number, or
// the first failed job sequence number if there is a failure.
func (conn *BlockConnection) BatchEnd() (*BatchStatus, Status, error) {
	callback := &BatchEndCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.BatchEnd(h)
	if err != nil {
		return callback.Status(), err
		return nil, callback.Status(), err
	}

	err = conn.nbc.Listen(h)

	return callback.Status(), err
	return &callback.BatchStatus, callback.Status(), err
}

// BatchAbort aborts jobs in current batch operation.
func (conn *BlockConnection) BatchAbort() (Status, error) {
	callback := &GenericCallback{}
	h := NewResponseHandler(callback)
+19 −2
Original line number Diff line number Diff line
@@ -11,7 +11,7 @@ import (
// Status return the MessateType operation status.
type Callback interface {
	Success(resp *kproto.Command, value []byte)
	Failure(status Status)
	Failure(resp *kproto.Command, status Status)
	Status() Status
}

@@ -27,7 +27,7 @@ func (c *GenericCallback) Success(resp *kproto.Command, value []byte) {
}

// Failure is called ResponseHandler when response message received from kinetic device with status code other than OK.
func (c *GenericCallback) Failure(status Status) {
func (c *GenericCallback) Failure(resp *kproto.Command, status Status) {
	c.status = status
}

@@ -105,3 +105,20 @@ func (c *GetLogCallback) Success(resp *kproto.Command, value []byte) {
	c.GenericCallback.Success(resp, value)
	c.Logs = getLogFromProto(resp)
}

type BatchEndCallback struct {
	GenericCallback
	BatchStatus BatchStatus
}

func (c *BatchEndCallback) Success(resp *kproto.Command, value []byte) {
	c.GenericCallback.Success(resp, value)
	c.BatchStatus.DoneSequence = resp.GetBody().GetBatch().GetSequence()
	c.BatchStatus.FailedSequence = resp.GetBody().GetBatch().GetFailedSequence()
}

func (c *BatchEndCallback) Failure(resp *kproto.Command, status Status) {
	c.GenericCallback.Failure(resp, status)
	c.BatchStatus.DoneSequence = resp.GetBody().GetBatch().GetSequence()
	c.BatchStatus.FailedSequence = resp.GetBody().GetBatch().GetFailedSequence()
}
+2 −2
Original line number Diff line number Diff line
@@ -20,7 +20,7 @@ func (h *ResponseHandler) handle(cmd *kproto.Command, value []byte) error {
			if cmd.GetStatus().GetCode() == kproto.Command_Status_SUCCESS {
				h.callback.Success(cmd, value)
			} else {
				h.callback.Failure(getStatusFromProto(cmd))
				h.callback.Failure(cmd, getStatusFromProto(cmd))
			}
		} else {
			klog.Warn("Other status received")
@@ -37,7 +37,7 @@ func (h *ResponseHandler) handle(cmd *kproto.Command, value []byte) error {

func (h *ResponseHandler) fail(s Status) {
	if h.callback != nil {
		h.callback.Failure(s)
		h.callback.Failure(nil, s)
	}
	h.cond.L.Lock()
	h.done = true
+6 −0
Original line number Diff line number Diff line
@@ -663,3 +663,9 @@ type P2PPushRequest struct {
	Tls        bool
	Operations []P2PPushOperation
}

// BatchStatus indicates status of all operations in a batch commit.
type BatchStatus struct {
	DoneSequence   []int64 // All sequence Ids of those commands (PUT/DELETE) performed successfully in the batch
	FailedSequence int64   // Non 0 value means the first failed operation sequence in the batch, 0 means no failure
}
+24 −8
Original line number Diff line number Diff line
@@ -207,27 +207,43 @@ func (conn *NonBlockConnection) P2PPush(request *P2PPushRequest, h *ResponseHand
	return conn.service.submit(msg, cmd, nil, h)
}

// BatchStart starts batch operations
// BatchStart starts new batch operation, all following batch PUT / DELETE share same batch ID until
// BatchEnd or BatchAbort is called.
func (conn *NonBlockConnection) BatchStart(h *ResponseHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)
	cmd := newCommand(kproto.Command_START_BATCH)

	// TODO: Need to confirm can start new batch if current one not end / abort yet???
	conn.batchMu.Lock()
	conn.batchID++
	conn.batchCount = 0 // Reset
	conn.batchMu.Unlock()
	cmd.Header.BatchID = &conn.batchID
	return conn.service.submit(msg, cmd, nil, h)
}

func (conn *NonBlockConnection) BatchPut(entry *Record, h *ResponseHandler) error {
// BatchPut puts objects to kinetic drive, as a batch job. Batch PUT / DELETE won't expect acknowledgement
// from kinetic device. Status for batch PUT / DELETE will only availabe in response message for BatchEnd.
func (conn *NonBlockConnection) BatchPut(entry *Record) error {
	// Batch operation PUT
	return conn.put(entry, true, h)
	conn.batchMu.Lock()
	conn.batchCount++
	conn.batchMu.Unlock()
	return conn.put(entry, true, nil)
}

// BatchDelete deletes object from kinetic device.
func (conn *NonBlockConnection) BatchDelete(entry *Record, h *ResponseHandler) error {
// BatchDelete delete object from kinetic drive, as a batch job. Batch PUT / DELETE won't expect acknowledgement
// from kinetic device. Status for batch PUT / DELETE will only availabe in response message for BatchEnd.
func (conn *NonBlockConnection) BatchDelete(entry *Record) error {
	// Batch operation DELETE
	return conn.delete(entry, true, h)
	conn.batchMu.Lock()
	conn.batchCount++
	conn.batchMu.Unlock()
	return conn.delete(entry, true, nil)
}

// BatchEnd commits all patch operations.
// BatchEnd commits all batch jobs. Response from kinetic device will indicate succeeded jobs sequence number, or
// the first failed job sequence number if there is a failure.
func (conn *NonBlockConnection) BatchEnd(h *ResponseHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)
	cmd := newCommand(kproto.Command_END_BATCH)
@@ -241,7 +257,7 @@ func (conn *NonBlockConnection) BatchEnd(h *ResponseHandler) error {
	return conn.service.submit(msg, cmd, nil, h)
}

// BatchAbort abort current batch operations.
// BatchAbort aborts jobs in current batch operation.
func (conn *NonBlockConnection) BatchAbort(h *ResponseHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)
	cmd := newCommand(kproto.Command_ABORT_BATCH)
Loading