Commit 4d716f3f authored by chiaming2000's avatar chiaming2000
Browse files

Simulator and Java API:

Implemented UNSOLICITEDSTATUS auth type handshake protocol semantic. The
simualtor sends an UNSOLICITEDSTATUS message to client library that
connects to it and sets "connectionID", "clusterVersion", etc in it.

Client library implementation should wait/block the connection creation
process until the UNSOLICITEDSTATUS message is received. 

Client library is required to use the connectionID in all the
subsequential request messages.  Any request message sent without the
connectionID will be rejected (not yet enfornced, but will do so when
drive has the behavior).

Simulator sets the connectionID in each of the response message.
parent 99944ea8
Loading
Loading
Loading
Loading
+40 −20
Original line number Diff line number Diff line
@@ -22,6 +22,8 @@ import java.security.Key;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

@@ -39,6 +41,7 @@ import com.seagate.kinetic.common.lib.Hmac.HmacException;
import com.seagate.kinetic.common.lib.KineticMessage;
import com.seagate.kinetic.proto.Kinetic.Command;
import com.seagate.kinetic.proto.Kinetic.Message;
import com.seagate.kinetic.proto.Kinetic.Message.AuthType;
import com.seagate.kinetic.proto.Kinetic.Message.Builder;
import com.seagate.kinetic.proto.Kinetic.Command.Header;
import com.seagate.kinetic.proto.Kinetic.Command.MessageType;
@@ -86,6 +89,8 @@ public class ClientProxy {
    
    private volatile boolean isConnectionIdSetByServer = false;
    
    private CountDownLatch cidLatch = new CountDownLatch (1);

    /**
     * Construct a new instance of client proxy
     *
@@ -113,18 +118,32 @@ public class ClientProxy {

        // io handler
        this.iohandler = new IoHandler(this);
        
        // wait for status message
        this.waitForStatusMessage();
    }
    
    private void waitForStatusMessage() throws KineticException {
        
        try {
            this.cidLatch.await(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            logger.log(Level.WARNING, e.getMessage(), e);
        }
        
        if (this.isConnectionIdSetByServer == false) {
            throw new KineticException ("Did not receive a Status message from service.");
        }
    }
    
    /**
     * Set client connection ID by server. Usually this is set by first OP response from the server. The 
     * Set client connection ID by server. his is set by  from the server. The 
     * client library uses this connectionID after this is set by the server.
     * 
     * @param cid the connection id to be set for this connection.
     */
    public void setConnectionId(KineticMessage kresponse) {

        
        
        if (this.isConnectionIdSetByServer) {
            /**
             * if already set by server, simply return.
@@ -132,27 +151,28 @@ public class ClientProxy {
            return;
        }
        
        if (kresponse.getMessage().getAuthType() != AuthType.UNSOLICITEDSTATUS) {
            return;
        }

        /**
         * check if set connection ID is needed.
         */
        synchronized (this) {
            
            if (isConnectionIdSetByServer) {
                return;
            }

            if (kresponse.getCommand().getHeader()
                    .hasConnectionID()) {
        if (kresponse.getCommand().getHeader().hasConnectionID()) {

                this.connectionID = kresponse.getCommand()
                        .getHeader().getConnectionID();
            this.connectionID = kresponse.getCommand().getHeader()
                    .getConnectionID();

            // set flag to true
            this.isConnectionIdSetByServer = true;
            
            // countdown
            this.cidLatch.countDown();

            logger.fine("set connection Id: " + this.connectionID);
        }
        }

    }

    /**
+1 −1
Original line number Diff line number Diff line
@@ -87,7 +87,7 @@ public class DefaultKineticClient implements AdvancedKineticClient {
        client = new ClientProxy(config);
        
        //send a no-op and set connection ID.
        this.connectionSetUp();
        //this.connectionSetUp();

        LOG.fine("kinetic client initialized, server=" + config.getHost()
                + ", port=" + config.getPort());
+17 −0
Original line number Diff line number Diff line
@@ -44,6 +44,7 @@ import com.seagate.kinetic.client.internal.async.PutAsyncCallbackHandler;
import com.seagate.kinetic.client.io.provider.spi.ClientMessageService;
import com.seagate.kinetic.common.lib.KineticMessage;
import com.seagate.kinetic.proto.Kinetic.Command.MessageType;
import com.seagate.kinetic.proto.Kinetic.Message.AuthType;

/**
 *
@@ -85,6 +86,8 @@ public class MessageHandler implements ClientMessageService, Runnable {

	private final Object syncObj = new Object();
	
	private boolean isStatusMessageReceived = false;

	/**
	 * Constructor.
	 *
@@ -130,6 +133,20 @@ public class MessageHandler implements ClientMessageService, Runnable {
			logger.info("read/routing message: " + message);
		}
		
		/**
		 * check status message has received.
		 */
		if (this.isStatusMessageReceived == false) {
		    
		    if (message.getMessage().getAuthType() == AuthType.UNSOLICITEDSTATUS) {
		        this.client.setConnectionId(message);
		        this.isStatusMessageReceived = true;
		        return;
		    } else {
		        logger.warning("received unexpected message ..." + message.getMessage() + ", command=" + message.getCommand());
		    }
		}

		Long seq = Long.valueOf(message.getCommand().getHeader()
				.getAckSequence());

+26 −2
Original line number Diff line number Diff line
@@ -19,6 +19,8 @@
 */
package com.seagate.kinetic.simulator.internal;

import io.netty.channel.ChannelHandlerContext;

import java.io.File;
import java.security.Key;
import java.util.ArrayList;
@@ -36,6 +38,7 @@ import com.seagate.kinetic.heartbeat.message.ByteCounter;
import com.seagate.kinetic.heartbeat.message.OperationCounter;
import com.seagate.kinetic.proto.Kinetic.Command;
import com.seagate.kinetic.proto.Kinetic.Command.Security;
import com.seagate.kinetic.proto.Kinetic.Command.Status.StatusCode;
import com.seagate.kinetic.proto.Kinetic.Message;
import com.seagate.kinetic.proto.Kinetic.Message.AuthType;
import com.seagate.kinetic.proto.Kinetic.Command.MessageType;
@@ -791,9 +794,30 @@ public class SimulatorEngine implements MessageService {
     * 
     * @return the connection info instance associated with the connection.
     */
    public static ConnectionInfo registerNewConnection (Object connection) {
    public ConnectionInfo registerNewConnection (ChannelHandlerContext ctx) {
        ConnectionInfo info = newConnectionInfo();
        putConnectionInfo (connection, info);
        putConnectionInfo (ctx, info);
        
        KineticMessage km = new KineticMessage();
        
        Message.Builder mb = Message.newBuilder();
        mb.setAuthType(AuthType.UNSOLICITEDSTATUS);
        
        Command.Builder cb = Command.newBuilder();
        cb.getHeaderBuilder().setConnectionID(info.getConnectionId());
        
        cb.getHeaderBuilder().setClusterVersion(this.clusterVersion);
        
        cb.getStatusBuilder().setCode(StatusCode.SUCCESS);
        
        mb.setCommandBytes(cb.build().toByteString());
        
        km.setMessage(mb);
        km.setCommand(cb);
        
        ctx.writeAndFlush(km);
        
        logger.info("***** connection registered., sent UNSOLICITEDSTATUS with cid = " + info.getConnectionId());
        
        return info;
    }
+16 −38
Original line number Diff line number Diff line
@@ -53,30 +53,12 @@ public class NioConnectionStateManager {
     * @throws RuntimeException if connection has already set by the simulator but received a connection Id 
     *         does not match. 
     */
    public static StatefulMessage checkAndGetStatefulMessage(ChannelHandlerContext ctx,
    public static void checkIfConnectionIdSet(ChannelHandlerContext ctx,
            KineticMessage request) {

        // get connection info for this channel
        ConnectionInfo cinfo = SimulatorEngine.getConnectionInfo(ctx);
        
        // stateful message
        StatefulMessage sm = null;

        // this should not be null
        if (cinfo != null) {
            // check if this is the first message of this connection
            if (cinfo.getIsConnectionIdSetToClient() == false) {
                // new instance of stateful message
                sm = new StatefulMessage(request);
                // first connection, set connection id to it
                sm.setConnectionId(cinfo.getConnectionId());

                // only set once
                cinfo.setIsConnectionIdSetToClient(true);

                // update connection info map
                SimulatorEngine.putConnectionInfo(ctx, cinfo);
            } else {
        if (cinfo.getConnectionId() != request.getCommand().getHeader().getConnectionID()) {
            
            logger.warning ("expect connection Id="
@@ -94,9 +76,5 @@ public class NioConnectionStateManager {
            }
        }
    }
        }

        return sm;
    }

}
Loading