Commit 56c97764 authored by Zhu Yong's avatar Zhu Yong
Browse files

Add P2PPush operation

parent 6baab02a
Loading
Loading
Loading
Loading
+15 −0
Original line number Diff line number Diff line
@@ -144,6 +144,21 @@ func (conn *BlockConnection) Put(entry *Record) (Status, error) {
	return callback.Status(), nil
}

func (conn *BlockConnection) P2PPush(request *P2PPushRequest) ([]Status, Status, error) {
	callback := &P2PPushCallback{}
	h := NewResponseHandler(callback)
	err := conn.nbc.P2PPush(request, h)
	if err != nil {
		return nil, callback.Status(), err
	}

	for callback.Done() == false {
		conn.nbc.Run()
	}

	return callback.Statuses, callback.Status(), nil
}

func (conn *BlockConnection) GetLog(logs []LogType) (Log, Status, error) {
	callback := &GetLogCallback{}
	h := NewResponseHandler(callback)
+15 −0
Original line number Diff line number Diff line
@@ -75,6 +75,21 @@ func (c *GetVersionCallback) Success(resp *kproto.Command, value []byte) {
	c.Version = resp.GetBody().GetKeyValue().GetDbVersion()
}

// Callback for Command_PEER2PEERPUSH
type P2PPushCallback struct {
	GenericCallback
	Statuses []Status
}

func (c *P2PPushCallback) Success(resp *kproto.Command, value []byte) {
	c.GenericCallback.Success(resp, value)
	c.Statuses = make([]Status, len(resp.GetBody().GetP2POperation().GetOperation()))
	for k, op := range resp.GetBody().GetP2POperation().GetOperation() {
		c.Statuses[k].Code = convertStatusCodeFromProto(op.GetStatus().GetCode())
		c.Statuses[k].ErrorMsg = op.GetStatus().GetStatusMessage()
	}
}

// Callback for Command_GETLOG Message
type GetLogCallback struct {
	GenericCallback
+15 −0
Original line number Diff line number Diff line
@@ -601,3 +601,18 @@ type SecurityACL struct {
	Scope       []SecurityACLScope
	MaxPriority Priority
}

type P2PPushOperation struct {
	Key     []byte
	Version []byte
	NewKey  []byte
	Force   bool
	Request *P2PPushRequest
}

type P2PPushRequest struct {
	HostName   string
	Port       int32
	Tls        bool
	Operations []P2PPushOperation
}
+40 −0
Original line number Diff line number Diff line
package kinetic

import (
	"bytes"

	kproto "github.com/yongzhy/kinetic-go/proto"
)

@@ -130,6 +132,44 @@ func (conn *NonBlockConnection) Put(entry *Record, h *ResponseHandler) error {
	return conn.service.submit(msg, cmd, entry.Value, h)
}

func (conn *NonBlockConnection) buildP2PMessage(request *P2PPushRequest) *kproto.Command_P2POperation {
	var p2pop *kproto.Command_P2POperation = nil
	if request != nil {
		p2pop = &kproto.Command_P2POperation{
			Peer: &kproto.Command_P2POperation_Peer{
				Hostname: &request.HostName,
				Port:     &request.Port,
				Tls:      &request.Tls,
			},
			Operation: make([]*kproto.Command_P2POperation_Operation, len(request.Operations)),
		}
		for k, op := range request.Operations {
			p2pop.Operation[k] = &kproto.Command_P2POperation_Operation{
				Key:     op.Key,
				Version: op.Version,
				NewKey:  nil,
				Force:   &op.Force,
				P2Pop:   conn.buildP2PMessage(op.Request),
			}
			if op.NewKey != nil && !bytes.Equal(op.NewKey, op.Key) {
				p2pop.Operation[k].NewKey = op.NewKey
			}
		}
	}
	return p2pop
}

func (conn *NonBlockConnection) P2PPush(request *P2PPushRequest, h *ResponseHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)
	cmd := newCommand(kproto.Command_PEER2PEERPUSH)

	cmd.Body = &kproto.Command_Body{
		P2POperation: conn.buildP2PMessage(request),
	}

	return conn.service.submit(msg, cmd, nil, h)
}

func (conn *NonBlockConnection) GetLog(logs []LogType, h *ResponseHandler) error {
	msg := newMessage(kproto.Message_HMACAUTH)

+0 −58
Original line number Diff line number Diff line
@@ -196,62 +196,4 @@ func getStatusFromProto(cmd *kproto.Command) Status {
	msg := cmd.GetStatus().GetStatusMessage()

	return Status{code, msg}

	/*
		switch code {
		case CLIENT_IO_ERROR:
			return Status{code, "IO error"}
		case CLIENT_SHUTDOWN:
			return Status{code, "Client shutdown"}
		case PROTOCOL_ERROR_RESPONSE_NO_ACKSEQUENCE:
			return Status{code, "Response did not contain ack sequence"}
		case CLIENT_RESPONSE_HMAC_VERIFICATION_ERROR:
			return Status{code, "Response HMAC verification failed"}
		case REMOTE_HMAC_ERROR:
			return Status{code, "Remote HMAC verification failed"}
		case REMOTE_NOT_AUTHORIZED:
			return Status{code, "Not authorized"}
		case REMOTE_CLUSTER_VERSION_MISMATCH:
			expected_cluster_version := cmd.GetHeader().GetClusterVersion()
			return Status{code, "Cluster version mismatch " + string(expected_cluster_version)}
		case REMOTE_INTERNAL_ERROR:
			return Status{code, "Remote internal error"}
		case REMOTE_HEADER_REQUIRED:
			return Status{code, "Request requires a header to be set"}
		case REMOTE_NOT_FOUND:
			return Status{code, "Key not found"}
		case REMOTE_VERSION_MISMATCH:
			return Status{code, "Version mismatch"}
		case REMOTE_SERVICE_BUSY:
			return Status{code, "Remote service is busy"}
		case REMOTE_EXPIRED:
			return Status{code, "Remote timeout"}
		case REMOTE_DATA_ERROR:
			return Status{code, "Remote transient data error"}
		case REMOTE_PERM_DATA_ERROR:
			return Status{code, "Remote permanent data error"}
		case REMOTE_CONNECTION_ERROR:
			return Status{code, "Remote connection to peer failed"}
		case REMOTE_NO_SPACE:
			return Status{code, "No space left"}
		case REMOTE_NO_SUCH_HMAC_ALGORITHM:
			return Status{code, "Unknown HMAC algorithm"}
		case REMOTE_NESTED_OPERATION_ERRORS:
			return Status{code, "Operation completed but has nested errors"}
		case REMOTE_DEVICE_LOCKED:
			return Status{code, "Remote device is locked"}
		case REMOTE_DEVICE_ALREADY_UNLOCKED:
			return Status{code, "Remote device is already unlocked"}
		case REMOTE_CONNECTION_TERMINATED:
			return Status{code, "Remote connection is terminated"}
		case REMOTE_INVALID_BATCH:
			return Status{code, "Invalid batch"}
		case REMOTE_INVALID_EXECUTE:
			return Status{code, "Invalid execute of applet"}
		case REMOTE_EXECUTE_COMPLETE:
			return Status{code, "Applet execute complete"}
		default:
			return Status{code, "Internal Error"}
		}
	*/
}