Commit 25db6a25 authored by Zhu Yong's avatar Zhu Yong
Browse files

P2P Push operation, tested

parent 1f9346e0
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -154,7 +154,7 @@ func (conn *BlockConnection) Put(entry *Record) (Status, error) {
}

// P2Push
func (conn *BlockConnection) P2PPush(request *P2PPushRequest) ([]Status, Status, error) {
func (conn *BlockConnection) P2PPush(request *P2PPushRequest) (*P2PPushStatus, Status, error) {
	callback := &P2PPushCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.P2PPush(request, h)
@@ -164,7 +164,7 @@ func (conn *BlockConnection) P2PPush(request *P2PPushRequest) ([]Status, Status,

	err = conn.nbc.Listen(h)

	return callback.Statuses, callback.Status(), err
	return &callback.P2PStatus, callback.Status(), err
}

// GetLog gets kinetic device Log information. Can request single LogType or multiple LogType.
+5 −4
Original line number Diff line number Diff line
@@ -81,16 +81,17 @@ func (c *GetVersionCallback) Success(resp *kproto.Command, value []byte) {
// P2PPushCallback is the Callback for Command_PEER2PEERPUSH
type P2PPushCallback struct {
	GenericCallback
	Statuses []Status
	P2PStatus P2PPushStatus
}

// Success extracts P2Push operation status from response message.
func (c *P2PPushCallback) Success(resp *kproto.Command, value []byte) {
	c.GenericCallback.Success(resp, value)
	c.Statuses = make([]Status, len(resp.GetBody().GetP2POperation().GetOperation()))
	c.P2PStatus.AllOperationsSucceeded = resp.GetBody().GetP2POperation().GetAllChildOperationsSucceeded()
	c.P2PStatus.PushStatus = make([]Status, len(resp.GetBody().GetP2POperation().GetOperation()))
	for k, op := range resp.GetBody().GetP2POperation().GetOperation() {
		c.Statuses[k].Code = convertStatusCodeFromProto(op.GetStatus().GetCode())
		c.Statuses[k].ErrorMsg = op.GetStatus().GetStatusMessage()
		c.P2PStatus.PushStatus[k].Code = convertStatusCodeFromProto(op.GetStatus().GetCode())
		c.P2PStatus.PushStatus[k].ErrorMsg = op.GetStatus().GetStatusMessage()
	}
}

+17 −6
Original line number Diff line number Diff line
@@ -649,17 +649,28 @@ type ACL struct {
	MaxPriority Priority
}

// P2PPushOperation
type P2PPushOperation struct {
	Key     []byte
	Key     []byte // Key for the object to push to peer kinetic device
	Version []byte
	NewKey  []byte
	NewKey  []byte // NewKey to be used for the object on peer kinetic device, if not specify, will be same as Key
	Force   bool
	Request *P2PPushRequest
	Request *P2PPushRequest // Chain P2PPushRequest, which will perform on peer kinetic device
}

// P2PPushRequest
type P2PPushRequest struct {
	HostName   string
	Port       int32
	HostName   string // Peer kinetic device IP / hostname
	Port       int32  // Peer kinetic drvice port
	Tls        bool
	Operations []P2PPushOperation
	Operations []P2PPushOperation // List of operations to perform on peer kinetic device
}

// P2PPushStatus holds the status for P2PPushOperations.
// AllOperationsSucceeded indicates whether all operations have Status SUCCESS
// When false, clients should traverse operation status codes to discover error cases.
// When true, no further error checking should be required.
type P2PPushStatus struct {
	AllOperationsSucceeded bool     // Overall status for all child operations
	PushStatus             []Status // individual operation status
}