Commit 072547d9 authored by chiaming2000's avatar chiaming2000
Browse files

1. The client library will now use the connection Id set the in first

response message by the drive/simulator.

2. The simulator will now set connection Id in the first response
message to the client after conenction is created.

(The simulator does not raise an exception if it detects the client does
not use the associated set Id.  It only logs a warning message.  This
behavior may chnage if the drive started to enforce this feature.)
parent 84ba33f3
Loading
Loading
Loading
Loading
+42 −1
Original line number Diff line number Diff line
@@ -86,6 +86,8 @@ public class ClientProxy {
    // use protocol v2
    private boolean useV2Protocol = true;
    
    private volatile boolean isConnectionIdSetByServer = false;

    /**
     * Construct a new instance of client proxy
     *
@@ -118,6 +120,46 @@ public class ClientProxy {
        this.iohandler = new IoHandler(this);
    }
    
    /**
     * Set client connection ID by server. Usually this is set by first OP response from the server. The 
     * client library uses this connectionID after this is set by the server.
     * 
     * @param cid the connection id to be set for this connection.
     */
    public void setConnectionId(KineticMessage kresponse) {
        
        
        
        if (this.isConnectionIdSetByServer) {
            /**
             * if already set by server, simply return.
             */
            return;
        }
        
        /**
         * check if set connection ID is needed.
         */
        synchronized (this) {
            
            if (isConnectionIdSetByServer) {
                return;
            }
            
            if (kresponse.getMessage().getCommand().getHeader()
                    .hasConnectionID()) {
                
                this.connectionID = kresponse.getMessage().getCommand()
                        .getHeader().getConnectionID();
                
                //set flag to true
                this.isConnectionIdSetByServer = true;
                
                logger.info("set connection Id: " + this.connectionID);
            }
        }
    }

    /**
     * Get client configuration instance for this client instance.
     *
@@ -356,7 +398,6 @@ public class ClientProxy {
            
            //check status code
            MessageFactory.checkReply(krequest, kresponse);
            
        } catch (KineticException ke) {
            ke.setRequestMessage(krequest);
            ke.setResponseMessage(kresponse);
+33 −0
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@ package com.seagate.kinetic.client.internal;

import java.util.ArrayList;
import java.util.List;

import java.util.logging.Logger;

import kinetic.client.CallbackHandler;
@@ -86,6 +87,9 @@ public class DefaultKineticClient implements AdvancedKineticClient {
        // create client proxy to talk to the drive
        client = new ClientProxy(config);
        
        //send a no-op and set connection ID.
        this.connectionSetUp();

        LOG.fine("kinetic client initialized, server=" + config.getHost()
                + ", port=" + config.getPort());
    }
@@ -934,4 +938,33 @@ public class DefaultKineticClient implements AdvancedKineticClient {
        this.client.requestAsync(message, handler);
    }
    
    /**
     * Set up kinetic connection.
     * <p>
     * The purpose is to get a connection ID from server. After this is returned, the connection is ready 
     * for concurrent and asynchronous operations. 
     * 
     * This may be extended to set up other connection attributes in the future.
     * @throws KineticException 
     */
    private void connectionSetUp ()  {
        
        KineticMessage response = null;
        
        try {
            // create get request message
            KineticMessage request = MessageFactory.createNoOpRequestMessage();

            // send request
            response = this.client.request(request); 
        } catch (KineticException e) {
            LOG.warning(e.getMessage());
        } finally {
            if (response != null) {
                // set connectionId
                this.client.setConnectionId(response);
            }
        }
    }

}
+74 −0
Original line number Diff line number Diff line
/**
 * 
 * Copyright (C) 2014 Seagate Technology.
 * 
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 *
 */
package com.seagate.kinetic.simulator.internal;

/**
 * Container to hold connection id and its status of being set to the client.
 * 
 * @author chiaming
 */
public class ConnectionInfo {

    private long connectionId = -1;
    
    private boolean isConnectionIdSetToClient = false;
    
    /**
     * default constructor.
     */
    public ConnectionInfo() {
        // TODO Auto-generated constructor stub
    }
    
    /**
     * set connection Id.
     * 
     * @param cid the connection id association to its connection.
     */
    public synchronized void setConnectionId (long cid) {
        this.connectionId = cid;
    }
    
    /**
     * Get the connection Id.
     * 
     * @return connection Id.
     */
    public synchronized long getConnectionId() {
        return this.connectionId;
    }
    
    /**
     * Set if the connection Id has been set to the client.
     * 
     * @param flag true if the Id is set to the client.
     */
    public synchronized void setIsConnectionIdSetToClient (boolean flag) {
        this.isConnectionIdSetToClient = flag;
    }
    
    /**
     * Get if the connection Id is set to the client flag.
     * @return true if set to the client.  Otherwise, returns false.
     */
    public synchronized boolean getIsConnectionIdSetToClient() {
        return this.isConnectionIdSetToClient;
    }
}
+76 −1
Original line number Diff line number Diff line
@@ -23,6 +23,7 @@ import java.io.File;
import java.security.Key;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

@@ -35,7 +36,6 @@ import com.seagate.kinetic.heartbeat.message.OperationCounter;
import com.seagate.kinetic.proto.Kinetic.Message;
import com.seagate.kinetic.proto.Kinetic.Message.MessageType;
import com.seagate.kinetic.proto.Kinetic.Message.Security.ACL;
import com.seagate.kinetic.proto.Kinetic.MessageOrBuilder;
import com.seagate.kinetic.simulator.heartbeat.Heartbeat;
import com.seagate.kinetic.simulator.internal.p2p.P2POperationHandler;
import com.seagate.kinetic.simulator.io.provider.nio.NioEventLoopGroupManager;
@@ -144,6 +144,12 @@ public class SimulatorEngine implements MessageService {
    private static SimulatorShutdownHook shutdownHook = new SimulatorShutdownHook(
            tpService);
    
    //connection map
    private static ConcurrentHashMap<Object, ConnectionInfo> connectionMap = new ConcurrentHashMap<Object, ConnectionInfo>();

    // next connection id
    private static long nextConnectionId = 0;
    
    static {
        // add shutdown hook to clean up resources
        Runtime.getRuntime().addShutdownHook(shutdownHook);
@@ -701,4 +707,73 @@ public class SimulatorEngine implements MessageService {
        }

    }
    
    /**
     * put connection/connection info into the connection map.
     *  
     * @param connection the key for the entry
     * @param cinfo value of the entry
     * @return the previous value associated with key, or null if there was no mapping for key
     */
    public static ConnectionInfo putConnectionInfo (Object connection, ConnectionInfo cinfo) {
        return connectionMap.put(connection, cinfo);
    }
    
    /**
     * Get connection info based on the specified key.
     * 
     * @param connection key to get the connection info.
     * 
     * @return the value to which the specified key is mapped, or null if this map contains no mapping for the key
     */
    public static ConnectionInfo getConnectionInfo (Object connection) {
        return connectionMap.get(connection);
    }
    
    /**
     * remove the value of the specified key.
     * @param connection the key od the entry that needs to be removed
     * @return the previous value associated with key, or null if there was no mapping for key
     */
    public static ConnectionInfo removeConnectionInfo (Object connection) {
        return connectionMap.remove (connection);
    }
    
    /**
     * register a new connection. A new connection info instance is created and associated with the connection.
     * 
     * @param connection the new connection to be added to the connection map.
     * 
     * @return the connection info instance associated with the connection.
     */
    public static ConnectionInfo registerNewConnection (Object connection) {
        ConnectionInfo info = newConnectionInfo();
        putConnectionInfo (connection, info);
        
        return info;
    }
    
    /**
     * instantiate a new connection info object with connection id set.
     * 
     * @return a new connection info object with connection id set
     */
    public static ConnectionInfo newConnectionInfo() {
        
        ConnectionInfo info = new ConnectionInfo();
        
        info.setConnectionId(getNextConnectionId());
        
        return info;
    }
    
    /**
     * Get next available connection id.
     * 
     * @return next available connection id in sequence.
     */
    private static synchronized long getNextConnectionId() {
        return nextConnectionId ++;
    }
    
}
+70 −0
Original line number Diff line number Diff line
/**
 * 
 * Copyright (C) 2014 Seagate Technology.
 * 
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 *
 */
package com.seagate.kinetic.simulator.internal;

import com.seagate.kinetic.common.lib.KineticMessage;

/**
 * A stateful message is a Kinetic Message that contains a connection Id.
 * 
 * @author chiaming
 *
 */
public class StatefulMessage extends KineticMessage {
    
    // connection Id
    private long connectionId = -1;
    
    public StatefulMessage() {
        ;
    }
    
    /**
     * Construct a stateful message with the specified message.
     * 
     * @param km Kinetic Message based on which the stateful message is constructed.
     * 
     */
    public StatefulMessage(KineticMessage km) {
        //set message
        super.setMessage(km.getMessage());
        
        //set value
        super.setValue(km.getValue());
    }
    
    /**
     * Set connection Id.
     * 
     * @param cid the connection Id to be set to this message.
     */
    public void setConnectionId (long cid) {
        this.connectionId = cid;
    }
    
    /**
     * Get connection Id of this message.
     * 
     * @return connection Id
     */
    public long getConnectionId() {
        return this.connectionId;
    }
}
Loading