Commit c75f408c authored by chiaming2000's avatar chiaming2000
Browse files

Java simulator:

1. Removed peer to peer copy connection pooling cache.  Make it
stateless such that it can handle failure (peer crashed) test
requirements from Kinetic systems (such as swift).

2. Fix for (NullPointerException is thrown when an exception contains no
exception message). The exception message is checked before it is set to
a Kinetic response message.
parent 1baca17f
Loading
Loading
Loading
Loading
+5 −7
Original line number Diff line number Diff line
@@ -476,14 +476,12 @@ public class SimulatorEngine implements MessageService {
                    StatusCode.INVALID_REQUEST);
            }

            if (context.getCommandBuilder().getStatusBuilder()
                    .hasStatusMessage() == false) {
                context.getCommandBuilder().getStatusBuilder()
            // set status message
            context.getCommandBuilder()
                    .getStatusBuilder()
                    .setStatusMessage(
                        e.getMessage());
            }
                            e.getClass().getName() + ":" + e.getMessage());

            logger.log(Level.WARNING, e.getMessage(), e);
        } finally {

            try {
+9 −42
Original line number Diff line number Diff line
@@ -19,8 +19,6 @@
 */
package com.seagate.kinetic.simulator.internal.p2p;

import java.util.HashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

import kinetic.client.ClientConfiguration;
@@ -29,7 +27,6 @@ import kinetic.client.KineticClientFactory;
import kinetic.client.KineticException;

import com.seagate.kinetic.common.lib.KineticMessage;

import com.seagate.kinetic.proto.Kinetic.Command.P2POperation.Peer;

/**
@@ -44,9 +41,6 @@ public class P2PConnectionPool {
    private final static Logger logger = Logger
            .getLogger(P2PConnectionPool.class.getName());

    // client map
    private final HashMap<String, KineticClient> clientMap = new HashMap<String, KineticClient>();

    public P2PConnectionPool() {
        ;
    }
@@ -76,32 +70,8 @@ public class P2PConnectionPool {
    private synchronized KineticClient getFromCacheOrCreate(KineticMessage request)
            throws KineticException {

        // peer info
        Peer peer = request.getCommand().getBody().getP2POperation().getPeer();

        // user id
        long uid = request.getMessage().getHmacAuth().getIdentity();

        // map key
        String key = uid + ":" + peer.getHostname() + ":" + peer.getPort()
                + ":" + peer.getTls();

        // get from pool
        KineticClient client = this.clientMap.get(key);

        if (client == null) {
            // create client instance
            client = this.createClient(request);

            // add to pool
            this.clientMap.put(key, client);

            logger.info("created and put client instance to pool, key=" + key);
        } else {
            logger.info("got client instance from pool, key=" + key);
        }

        return client;
        // create client
        return this.createClient(request);
    }

    /**
@@ -129,9 +99,15 @@ public class P2PConnectionPool {
        config.setPort(peer.getPort());
        config.setUseSsl(peer.getTls());

        logger.info("creating p2p client: " + peer.getHostname() + ":"
                + peer.getPort());

        // create a new instance
        KineticClient client = KineticClientFactory.createInstance(config);

        logger.info("created p2p client: " + peer.getHostname() + ":"
                + peer.getPort());

        return client;
    }

@@ -139,15 +115,6 @@ public class P2PConnectionPool {
     * close the pool.
     */
    public void close() {

        for (KineticClient client : clientMap.values()) {
            try {
                client.close();
            } catch (Exception e) {
                logger.log(Level.WARNING, e.getMessage(), e);
            }
        }

        this.clientMap.clear();
        ;
    }
}
+144 −128
Original line number Diff line number Diff line
@@ -94,16 +94,17 @@ public class P2POperationHandler {
            Store<ByteString, ByteString, KVValue> store,
            KineticMessage request, KineticMessage response) {

        Command.Builder commandBuilder = (Command.Builder) response
                .getCommand();

        // get client instance
        KineticClient client = this.getClient(request, response);

        try {
            Command.Builder commandBuilder = (Command.Builder) response
                    .getCommand();

            /**
             * perform p2p Ops if connected to peer. Otherwise,
         * REMOTE_CONNECTION_ERROR code was set in the status code and return
         * back to the client.
             * REMOTE_CONNECTION_ERROR code was set in the status code and
             * return back to the client.
             */
            if (client != null) {

@@ -167,8 +168,8 @@ public class P2POperationHandler {
                            // forced put ignore version
                            client.putForced(entry);
                        } else {
                        // if there is a version specified in op, use versioned
                        // put
                            // if there is a version specified in op, use
                            // versioned put
                            if (operation.hasVersion()) {

                                // set db version
@@ -186,8 +187,8 @@ public class P2POperationHandler {
                        }

                        // set success status
                    respOpBuilder.getStatusBuilder()
                            .setCode(StatusCode.SUCCESS);
                        respOpBuilder.getStatusBuilder().setCode(
                                StatusCode.SUCCESS);
                    } catch (KVStoreNotFound kvne) {

                        logger.warning("cannot find entry from the specified key in request message...");
@@ -197,16 +198,9 @@ public class P2POperationHandler {

                        /**
                         * The (command) response code is set to OK even if
                     * exception occurred. The application can examine each of
                     * the operation status in the p2p response.
                         * exception occurred. The application can examine each
                         * of the operation status in the p2p response.
                         */
                    // set overall status code
                    // commandBuilder.getStatusBuilder().setCode(
                    // StatusCode.NOT_FOUND);

                    // set overall status message
                    // commandBuilder.getStatusBuilder().setStatusMessage(
                    // "cannot find the specified key");

                        // set individual status code
                        respOpBuilder.getStatusBuilder().setCode(
@@ -229,19 +223,23 @@ public class P2POperationHandler {

                        /**
                         * The (command) response code is set to OK even if
                     * exception occurred. The application can examine each of
                     * the operation status in the p2p response.
                         * exception occurred. The application can examine each
                         * of the operation status in the p2p response.
                         */

                        // set individual status code
                        respOpBuilder.getStatusBuilder().setCode(
                            ke.getResponseMessage().getCommand().getStatus()
                                    .getCode());
                                ke.getResponseMessage().getCommand()
                                        .getStatus().getCode());

                        // set individual status message
                        String sm = ke.getResponseMessage().getCommand()
                                .getStatus().getStatusMessage();

                        if (sm != null) {
                            respOpBuilder.getStatusBuilder().setStatusMessage(
                            ke.getResponseMessage().getCommand().getStatus()
                                    .getStatusMessage());
                                    sm);
                        }

                    } catch (Exception e) {

@@ -259,14 +257,19 @@ public class P2POperationHandler {
                            respOpBuilder.getStatusBuilder().setStatusMessage(
                                    e.getMessage());
                        }
                }

                    } finally {
                        // add response operation
                        respP2POpBuilder.addOperation(respOpBuilder.build());
                    }
                }
            }

        } finally {
            this.close(client);
        }
    }


    /**
     * Get peer instance.
     *
@@ -306,6 +309,19 @@ public class P2POperationHandler {
        return client;
    }

    private void close(KineticClient client) {

        if (client == null) {
            return;
        }

        try {
            client.close();
        } catch (Exception e) {
            logger.warning(e.getMessage());
        }
    }

    /**
     * close connection pool.
     */
+14 −0
Original line number Diff line number Diff line
@@ -22,6 +22,9 @@ package com.seagate.kinetic.simulator.client.p2p;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertTrue;
import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;

import java.util.logging.Logger;

import kinetic.admin.KineticAdminClient;
import kinetic.admin.KineticAdminClientFactory;
import kinetic.client.Entry;
@@ -51,6 +54,9 @@ import com.seagate.kinetic.proto.Kinetic.Command.Status.StatusCode;
 */
@Test(groups = { "simulator" })
public class PeerToPeerOperationTest extends IntegrationTestCase {

    Logger logger = Logger.getLogger(PeerToPeerOperationTest.class.getName());

    private AbstractIntegrationTestTarget secondaryTestTarget;
    private KineticP2pClient secondaryClient;

@@ -96,6 +102,8 @@ public class PeerToPeerOperationTest extends IntegrationTestCase {
        Entry peerEntry = secondaryClient.get(key);

        assertArrayEquals(value, peerEntry.getValue());

        logger.info("validated peer to peer works over plain TCP ...");
    }

    @Test(dataProvider = "transportProtocolOptions")
@@ -121,6 +129,8 @@ public class PeerToPeerOperationTest extends IntegrationTestCase {
        Entry peerEntry = secondaryClient.get(key);

        assertArrayEquals(value, peerEntry.getValue());

        logger.info("validated peer to peer works over SSL ...");
    }

    @Test(dataProvider = "transportProtocolOptions")
@@ -151,6 +161,9 @@ public class PeerToPeerOperationTest extends IntegrationTestCase {
        assertTrue("expect version mis-match status code",
                StatusCode.VERSION_MISMATCH == p2pResp.getOperationList()
                        .get(0).getStatusCode());

        logger.info("received expect version mis-match status code: "
                + StatusCode.VERSION_MISMATCH);
    }

    @Test(dataProvider = "transportProtocolOptions")
@@ -171,6 +184,7 @@ public class PeerToPeerOperationTest extends IntegrationTestCase {
            AssertJUnit.fail("Should have thrown KineticException");
        } catch (KineticException e) {
            // expected exception should be caught here.
            logger.info("caught expected exception: " + e.getMessage());
            ;
        }
    }