Commit 2015a639 authored by chiaming2000's avatar chiaming2000
Browse files

Peer to peer operation behavior enhancement.

1. Unless the p2p operation cannot be completed for all the operations,
the method does not throw an KineticException. Instead, the overall
status is set and can be obtained with the returned object {@link
PeerToPeerOperation#getStatus()} API.

2. Applications check the overall status and if it is set to false, the
individual status can be obtained from {@link
PeerToPeerOperation#getOperationList()}.

3. Please see kinetic.client.p2p.KineticP2pClient JavaDoc for API
specification and behavior.
parent 591588da
Loading
Loading
Loading
Loading
+161 −145
Original line number Diff line number Diff line
@@ -32,7 +32,6 @@ import com.seagate.kinetic.client.internal.MessageFactory;
import com.seagate.kinetic.client.lib.ClientLogger;
import com.seagate.kinetic.common.lib.KineticMessage;
import com.seagate.kinetic.proto.Kinetic.Command;

import com.seagate.kinetic.proto.Kinetic.Command.MessageType;
import com.seagate.kinetic.proto.Kinetic.Command.P2POperation;
import com.seagate.kinetic.proto.Kinetic.Command.Status.StatusCode;
@@ -49,7 +48,8 @@ KineticP2pClient {

    private final static Logger LOG = ClientLogger.get();

	public DefaultKineticP2pClient(ClientConfiguration config) throws KineticException {
    public DefaultKineticP2pClient(ClientConfiguration config)
            throws KineticException {
        super(config);
    }

@@ -94,11 +94,12 @@ KineticP2pClient {
        Command.Builder commandBuilder = (Command.Builder) km.getCommand();

        // set request type
		commandBuilder.getHeaderBuilder()
		.setMessageType(MessageType.PEER2PEERPUSH);
        commandBuilder.getHeaderBuilder().setMessageType(
                MessageType.PEER2PEERPUSH);

        // p2p builder
		P2POperation.Builder p2pBuilder = commandBuilder.getBodyBuilder().getP2POperationBuilder();
        P2POperation.Builder p2pBuilder = commandBuilder.getBodyBuilder()
                .getP2POperationBuilder();

        // set peer hots/port/tls
        p2pBuilder.getPeerBuilder().setHostname(
@@ -112,7 +113,8 @@ KineticP2pClient {
        for (Operation op : operationList) {

            // operation builder
			Command.P2POperation.Operation.Builder operationBuilder = Command.P2POperation.Operation.newBuilder();
            Command.P2POperation.Operation.Builder operationBuilder = Command.P2POperation.Operation
                    .newBuilder();

            // set force flag
            operationBuilder.setForce(op.getForced());
@@ -142,8 +144,7 @@ KineticP2pClient {
        P2POperation peerToPeerOperationResponse = response.getCommand()
                .getBody().getP2POperation();

		StatusCode respScode = response.getCommand().getStatus()
				.getCode();
        StatusCode respScode = response.getCommand().getStatus().getCode();

        /**
         * throws KineticException if overall status failed.
@@ -151,8 +152,7 @@ KineticP2pClient {
        if (respScode != StatusCode.SUCCESS) {

            // get status message
			String msg = response.getCommand().getStatus()
					.getStatusMessage();
            String msg = response.getCommand().getStatus().getStatusMessage();

            // get exception message from response
            String emsg = (msg == null) ? "Internal error for P2P ops" : msg;
@@ -160,6 +160,12 @@ KineticP2pClient {
            // construct and throw exception
            KineticException ke = new KineticException(emsg);

            // set request message
            ke.setRequestMessage(km);

            // set response message
            ke.setResponseMessage(response);

            // log warning message
            LOG.warning("p2p op failed, status code: " + respScode + ", msg: "
                    + emsg);
@@ -168,21 +174,31 @@ KineticP2pClient {
        }

        // set overall status and message
		p2pOperation.setStatus(response.getCommand().getStatus()
				.getCode() == StatusCode.SUCCESS);
		p2pOperation.setErrorMessage(response.getCommand()
				.getStatus()
        // p2pOperation
        // .setStatus(response.getCommand().getStatus().getCode() ==
        // StatusCode.SUCCESS);

        // set overall status
        p2pOperation.setStatus(peerToPeerOperationResponse
                .getAllChildOperationsSucceeded());

        p2pOperation.setErrorMessage(response.getCommand().getStatus()
                .getStatusMessage());

        // set individual operation status and message
        for (int i = 0; i < operationList.size(); i++) {

            // status code
            StatusCode sc = peerToPeerOperationResponse.getOperation(i)
                    .getStatus().getCode();

            // set status
			operationList.get(i).setStatus(
					peerToPeerOperationResponse.getOperation(i).getStatus()
					.getCode() == StatusCode.SUCCESS);
            operationList.get(i).setStatus(sc == StatusCode.SUCCESS);

            // set status code
            operationList.get(i).setStatusCode(sc);

			// set message
            // set status message
            operationList.get(i).setErrorMessage(
                    peerToPeerOperationResponse.getOperation(i).getStatus()
                            .getStatusMessage());
+24 −15
Original line number Diff line number Diff line
@@ -32,6 +32,13 @@ public interface KineticP2pClient extends AdvancedKineticClient {
    /**
     * Perform Peer to Peer push operation.
     * <p>
     * Unless the p2p operation cannot be completed for all the operations, the
     * method does not throw an KineticException. Instead, the overall status is
     * set and can be obtained with {@link PeerToPeerOperation#getStatus()} API.
     * <p>
     * Applications check the overall status and if it is set to false, the
     * individual status can be obtained from
     * {@link PeerToPeerOperation#getOperationList()}.
     * 
     * @param p2pOperation
     *            specification to perform peer to peer operation.
@@ -41,6 +48,8 @@ public interface KineticP2pClient extends AdvancedKineticClient {
     * 
     * @throws KineticException
     *             if any internal error occurred.
     * 
     * @see PeerToPeerOperation
     */
    public PeerToPeerOperation PeerToPeerPush(PeerToPeerOperation p2pOperation)
            throws KineticException;
+157 −133
Original line number Diff line number Diff line
@@ -17,6 +17,8 @@
 */
package kinetic.client.p2p;

import com.seagate.kinetic.proto.Kinetic.Command.Status.StatusCode;

/**
 * 
 * Kinetic peer to peer push operation specification.
@@ -44,6 +46,8 @@ public class Operation {
    // operation status
    private boolean opStatus = false;

    private StatusCode statusCode = StatusCode.SUCCESS;

    // error message
    private String errorMessage = null;

@@ -162,4 +166,24 @@ public class Operation {
    public void setErrorMessage(String errorMessage) {
        this.errorMessage = errorMessage;
    }

    /**
     * Get status code for this operation.
     * 
     * @return status code for this operation
     */
    public StatusCode getStatusCode() {
        return this.statusCode;
    }

    /**
     * Set status code for this operation.
     * 
     * @param statusCode
     *            status code for this operation
     */
    public void setStatusCode(StatusCode statusCode) {
        this.statusCode = statusCode;
    }

}
+99 −37
Original line number Diff line number Diff line
@@ -26,11 +26,11 @@ import java.util.logging.Logger;

import kinetic.client.Entry;
import kinetic.client.KineticClient;
import kinetic.client.KineticException;

import com.google.protobuf.ByteString;
import com.seagate.kinetic.common.lib.KineticMessage;
import com.seagate.kinetic.proto.Kinetic.Command;

import com.seagate.kinetic.proto.Kinetic.Command.MessageType;
import com.seagate.kinetic.proto.Kinetic.Command.P2POperation;
import com.seagate.kinetic.proto.Kinetic.Command.P2POperation.Operation;
@@ -62,12 +62,12 @@ public class P2POperationHandler {
        Command.Builder commandBuilder = (Command.Builder) respond.getCommand();

        // set reply type
        commandBuilder.getHeaderBuilder()
        .setMessageType(MessageType.PEER2PEERPUSH_RESPONSE);
        commandBuilder.getHeaderBuilder().setMessageType(
                MessageType.PEER2PEERPUSH_RESPONSE);

        // set ack sequence
        commandBuilder.getHeaderBuilder()
        .setAckSequence(request.getCommand().getHeader().getSequence());
        commandBuilder.getHeaderBuilder().setAckSequence(
                request.getCommand().getHeader().getSequence());

        // check if has permission to set security
        if (currentMap == null) {
@@ -75,14 +75,15 @@ public class P2POperationHandler {
        } else {
            try {
                // check if client has permission
                Authorizer.checkPermission(currentMap, request.getMessage().getHmacAuth().getIdentity(), Permission.P2POP);
                Authorizer.checkPermission(currentMap, request.getMessage()
                        .getHmacAuth().getIdentity(), Permission.P2POP);

                hasPermission = true;
            } catch (KVSecurityException e) {
                commandBuilder.getStatusBuilder()
                .setCode(StatusCode.NOT_AUTHORIZED);
                commandBuilder.getStatusBuilder()
                .setStatusMessage(e.getMessage());
                commandBuilder.getStatusBuilder().setCode(
                        StatusCode.NOT_AUTHORIZED);
                commandBuilder.getStatusBuilder().setStatusMessage(
                        e.getMessage());
            }
        }

@@ -90,10 +91,11 @@ public class P2POperationHandler {
    }

    public void push(Map<Long, ACL> aclmap,
            Store<ByteString, ByteString, KVValue> store, KineticMessage request,
            KineticMessage response) {
            Store<ByteString, ByteString, KVValue> store,
            KineticMessage request, KineticMessage response) {

        Command.Builder commandBuilder = (Command.Builder) response.getCommand();
        Command.Builder commandBuilder = (Command.Builder) response
                .getCommand();

        // get client instance
        KineticClient client = this.getClient(request, response);
@@ -106,19 +108,24 @@ public class P2POperationHandler {
        if (client != null) {

            // get p2p operation list
            P2POperation p2pOp = request.getCommand().getBody().getP2POperation();
            P2POperation p2pOp = request.getCommand().getBody()
                    .getP2POperation();
            List<Operation> opList = p2pOp.getOperationList();

            // response operation builder
            P2POperation.Builder respP2POpBuilder = commandBuilder
                    .getBodyBuilder()
                    .getP2POperationBuilder();
                    .getBodyBuilder().getP2POperationBuilder();

            // set default value to true.
            // this will set to false when exception occurred
            respP2POpBuilder.setAllChildOperationsSucceeded(true);

            // loop through the list
            for (Operation operation : opList) {

                // response op builder
                Operation.Builder respOpBuilder = Operation.newBuilder(operation);
                Operation.Builder respOpBuilder = Operation
                        .newBuilder(operation);

                try {

@@ -160,7 +167,8 @@ public class P2POperationHandler {
                        // forced put ignore version
                        client.putForced(entry);
                    } else {
                        // if there is a version specified in op, use versioned put
                        // if there is a version specified in op, use versioned
                        // put
                        if (operation.hasVersion()) {

                            // set db version
@@ -169,7 +177,8 @@ public class P2POperationHandler {

                            // use store version as new version
                            // do versioned put
                            client.put(entry, kvvalue.getVersion().toByteArray());
                            client.put(entry, kvvalue.getVersion()
                                    .toByteArray());
                        } else {
                            // do forced put
                            client.putForced(entry);
@@ -177,27 +186,78 @@ public class P2POperationHandler {
                    }

                    // set success status
                    respOpBuilder.getStatusBuilder().setCode(StatusCode.SUCCESS);
                    respOpBuilder.getStatusBuilder()
                            .setCode(StatusCode.SUCCESS);
                } catch (KVStoreNotFound kvne) {

                    logger.warning("cannot find entry from the specified key in request message...");

                    respOpBuilder.getStatusBuilder().setCode(StatusCode.NOT_FOUND);
                    // set overall status
                    respP2POpBuilder.setAllChildOperationsSucceeded(false);

                    /**
                     * The (command) response code is set to OK even if
                     * exception occurred. The application can examine each of
                     * the operation status in the p2p response.
                     */
                    // set overall status code
                    // commandBuilder.getStatusBuilder().setCode(
                    // StatusCode.NOT_FOUND);

                    // set overall status message
                    // commandBuilder.getStatusBuilder().setStatusMessage(
                    // "cannot find the specified key");

                    // set individual status code
                    respOpBuilder.getStatusBuilder().setCode(
                            StatusCode.NOT_FOUND);

                    // set individual status message
                    respOpBuilder.getStatusBuilder().setStatusMessage(
                            "cannot find the specified key");

                    respOpBuilder.getStatusBuilder().setDetailedMessage(
                            ByteString
                            .copyFromUtf8("cannot find the specified key"));
                } catch (KineticException ke) {

                    /**
                     * errors occurred from remote peer
                     */

                    logger.warning(ke.getLocalizedMessage());

                    // set overall status
                    respP2POpBuilder.setAllChildOperationsSucceeded(false);

                    /**
                     * The (command) response code is set to OK even if
                     * exception occurred. The application can examine each of
                     * the operation status in the p2p response.
                     */

                    // set individual status code
                    respOpBuilder.getStatusBuilder().setCode(
                            ke.getResponseMessage().getCommand().getStatus()
                                    .getCode());

                    // set individual status message
                    respOpBuilder.getStatusBuilder().setStatusMessage(
                            ke.getResponseMessage().getCommand().getStatus()
                                    .getStatusMessage());

                } catch (Exception e) {

                    logger.log(Level.WARNING, e.getMessage(), e);

                    // set p2p overall status
                    respP2POpBuilder.setAllChildOperationsSucceeded(false);

                    // set individual status code
                    respOpBuilder.getStatusBuilder().setCode(
                            StatusCode.INTERNAL_ERROR);

                    // set individual status message
                    if (e.getMessage() != null) {
                        respOpBuilder.getStatusBuilder().setDetailedMessage(
                                ByteString.copyFromUtf8(e.getMessage()));
                        respOpBuilder.getStatusBuilder().setStatusMessage(
                                e.getMessage());
                    }
                }

@@ -217,11 +277,13 @@ public class P2POperationHandler {
     * @return client instance if created and cached. Return null if any error
     *         occurred.
     */
    private KineticClient getClient(KineticMessage request, KineticMessage response) {
    private KineticClient getClient(KineticMessage request,
            KineticMessage response) {

        KineticClient client = null;

        Command.Builder commandBuilder = (Command.Builder) response.getCommand();
        Command.Builder commandBuilder = (Command.Builder) response
                .getCommand();

        try {
            client = this.pool.getKineticClient(request);
@@ -231,13 +293,13 @@ public class P2POperationHandler {
            logger.log(Level.WARNING, e.getMessage(), e);

            // set status
            commandBuilder.getStatusBuilder()
            .setCode(StatusCode.REMOTE_CONNECTION_ERROR);
            commandBuilder.getStatusBuilder().setCode(
                    StatusCode.REMOTE_CONNECTION_ERROR);

            // set status message
            if (e.getMessage() != null) {
                commandBuilder.getStatusBuilder()
                .setStatusMessage(e.getMessage());
                commandBuilder.getStatusBuilder().setStatusMessage(
                        e.getMessage());
            }
        }

+21 −8
Original line number Diff line number Diff line
@@ -31,7 +31,7 @@ import kinetic.client.p2p.Operation;
import kinetic.client.p2p.Peer;
import kinetic.client.p2p.PeerToPeerOperation;

import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -40,6 +40,7 @@ import com.google.common.base.Charsets;
import com.seagate.kinetic.AbstractIntegrationTestTarget;
import com.seagate.kinetic.IntegrationTestCase;
import com.seagate.kinetic.IntegrationTestTargetFactory;
import com.seagate.kinetic.proto.Kinetic.Command.Status.StatusCode;

/**
 * P2P operation test.
@@ -73,7 +74,8 @@ public class PeerToPeerOperationTest extends IntegrationTestCase {
    }

    @Test(dataProvider = "transportProtocolOptions")
    public void testP2PPut_WorksOverPlainConnection(String clientName) throws Exception {
    public void testP2PPut_WorksOverPlainConnection(String clientName)
            throws Exception {
        byte[] key = "an awesome key!".getBytes(Charsets.UTF_8);
        byte[] value = "an awesome value!".getBytes(Charsets.UTF_8);

@@ -97,7 +99,8 @@ public class PeerToPeerOperationTest extends IntegrationTestCase {
    }

    @Test(dataProvider = "transportProtocolOptions")
    public void testP2PPut_WorksOverTlsConnection(String clientName) throws Exception {
    public void testP2PPut_WorksOverTlsConnection(String clientName)
            throws Exception {
        byte[] key = "an awesome key!".getBytes(Charsets.UTF_8);
        byte[] value = "an awesome value!".getBytes(Charsets.UTF_8);

@@ -121,7 +124,8 @@ public class PeerToPeerOperationTest extends IntegrationTestCase {
    }

    @Test(dataProvider = "transportProtocolOptions")
    public void testP2PPut_Fails_ForVersionMismatch(String clientName) throws Exception {
    public void testP2PPut_Fails_ForVersionMismatch(String clientName)
            throws Exception {
        byte[] key = "an awesome key!".getBytes(Charsets.UTF_8);
        byte[] value = "an awesome value!".getBytes(Charsets.UTF_8);

@@ -137,12 +141,21 @@ public class PeerToPeerOperationTest extends IntegrationTestCase {

        PeerToPeerOperation p2pResp = getClient(clientName).PeerToPeerPush(p2p);

        // expect overall status to be false due to version mismatch
        assertFalse(p2pResp.getStatus());

        // expect response op status to set to false
        assertFalse(p2pResp.getOperationList().get(0).getStatus());
        assertTrue(p2pResp.getStatus());

        // expect version mis-match due to the version specified does not exist
        assertTrue("expect version mis-match status code",
                StatusCode.VERSION_MISMATCH == p2pResp.getOperationList()
                        .get(0).getStatusCode());
    }

    @Test(dataProvider = "transportProtocolOptions")
    public void testP2PPut_Fails_ForInvalidPeer(String clientName) throws Exception {
    public void testP2PPut_Fails_ForInvalidPeer(String clientName)
            throws Exception {
        PeerToPeerOperation p2pop = new PeerToPeerOperation();
        Peer peer = new Peer();
        peer.setHost("localhost");
@@ -155,7 +168,7 @@ public class PeerToPeerOperationTest extends IntegrationTestCase {
        // the client should surface connection errors in a machine-readable way
        try {
            getClient(clientName).PeerToPeerPush(p2pop);
            Assert.fail("Should have thrown KineticException");
            AssertJUnit.fail("Should have thrown KineticException");
        } catch (KineticException e) {
            // expected exception should be caught here.
            ;