Commit 7e739eaf authored by chiaming2000's avatar chiaming2000
Browse files

Java simulator:

Enforce request message "sequence" field as specified in kinetic.proto.
Request message with sequence number not conformed is rejected with
INVALID_REQUEST response message.
parent bb3f5e13
Loading
Loading
Loading
Loading
+44 −0
Original line number Diff line number Diff line
@@ -55,6 +55,12 @@ public class KineticMessage {
    // set to true if this message is marked as invalid batch
    private volatile boolean isInvalidBatchMessage = false;

    // set to true if request is not valid
    private volatile boolean isInValidRequest = false;

    // error message
    private String errorMsg = null;

	/**
	 * Set protocol buffer message.
	 *
@@ -184,4 +190,42 @@ public class KineticMessage {
        this.isInvalidBatchMessage = flag;
    }

    /**
     * Set if this message is an invalid kinetic message.
     * 
     * @param flag
     *            true if this is an invalid kinetic message.
     */
    public void setIsInvalidRequest(boolean flag) {
        this.isInValidRequest = flag;
    }

    /**
     * get if this message is an invalid kinetic message.
     * 
     * @return true if invalid message.
     */
    public boolean getIsInvalidRequest() {
        return this.isInValidRequest;
    }

    /**
     * set error message for this message.
     * 
     * @param msg
     *            the error message to be set.
     */
    public void setErrorMessage(String msg) {
        this.errorMsg = msg;
    }

    /**
     * Get error message for this message.
     * 
     * @return error message for this message.
     */
    public String getErrorMessage() {
        return this.errorMsg;
    }

}
+48 −0
Original line number Diff line number Diff line
@@ -19,6 +19,10 @@
 */
package com.seagate.kinetic.simulator.internal;

import java.util.logging.Logger;

import com.seagate.kinetic.common.lib.KineticMessage;

/**
 * Container to hold connection id and its status of being set to the client.
 * 
@@ -26,10 +30,16 @@ package com.seagate.kinetic.simulator.internal;
 */
public class ConnectionInfo {

    private final static Logger logger = Logger.getLogger(SimulatorEngine.class
            .getName());

    private long connectionId = -1;
    
    private boolean isConnectionIdSetToClient = false;
    
    // last received seq#
    private long lastSequenceReceived = Long.MIN_VALUE;

    /**
     * default constructor.
     */
@@ -71,4 +81,42 @@ public class ConnectionInfo {
    public synchronized boolean getIsConnectionIdSetToClient() {
        return this.isConnectionIdSetToClient;
    }

    /**
     * check and set received sequence number. The sequence number received must
     * be greater than previous in a connection.
     * <p>
     * The internal sequence# is set to the sequence received if the comparison
     * is true.
     * 
     * @param sequence
     *            the last sequence received
     * @return true if last sequence received is greater than previous.
     */
    public synchronized boolean checkAndSetLastReceivedSequence(
            KineticMessage request) {

        boolean flag = false;

        // request sequence
        long sequence = request.getCommand().getHeader().getSequence();

        /**
         * update last received if there is a new one and set return flag to
         * true.
         */
        if (sequence > lastSequenceReceived) {
            this.lastSequenceReceived = sequence;
            flag = true;
        } else {
            // mark this message as invalid
            request.setIsInvalidRequest(true);
            request.setErrorMessage("Invalid Sequence Id: " + sequence);

            logger.warning("invalid sequence Id: " + sequence
                    + ", lastSequenceReceived: " + lastSequenceReceived);
        }

        return flag;
    }
}
+1 −1
Original line number Diff line number Diff line
@@ -211,7 +211,7 @@ public class RequestContext {
    public void preProcessRequest() throws Exception {

        HeaderOp.checkHeader(this.request, this.response, key,
                this.engine.getClusterVersion());
 this.engine);

        checkDeviceLocked();
    }
+5 −2
Original line number Diff line number Diff line
@@ -19,6 +19,8 @@
 */
package com.seagate.kinetic.simulator.io.provider.nio;

import io.netty.channel.ChannelHandlerContext;

import java.util.logging.Logger;

import kinetic.simulator.SimulatorConfiguration;
@@ -27,8 +29,6 @@ import com.seagate.kinetic.common.lib.KineticMessage;
import com.seagate.kinetic.simulator.internal.ConnectionInfo;
import com.seagate.kinetic.simulator.internal.SimulatorEngine;

import io.netty.channel.ChannelHandlerContext;

/**
 * Nio connection manager utility.
 * 
@@ -58,6 +58,9 @@ public class NioConnectionStateManager {
        // get connection info for this channel
        ConnectionInfo cinfo = SimulatorEngine.getConnectionInfo(ctx);
        
        // check sequence
        cinfo.checkAndSetLastReceivedSequence(request);

        if (cinfo.getConnectionId() != request.getCommand().getHeader().getConnectionID()) {
            
            logger.warning ("expect connection Id="
+10 −4
Original line number Diff line number Diff line
@@ -31,6 +31,7 @@ import com.seagate.kinetic.proto.Kinetic.Command.MessageType;
import com.seagate.kinetic.proto.Kinetic.Command.Status;
import com.seagate.kinetic.proto.Kinetic.Command.Status.StatusCode;
import com.seagate.kinetic.proto.Kinetic.Message.AuthType;
import com.seagate.kinetic.simulator.internal.SimulatorEngine;

class HeaderException extends Exception {
	private static final long serialVersionUID = 5201751340412081922L;
@@ -80,7 +81,7 @@ public class HeaderOp {
	}

	public static void checkHeader(KineticMessage km, KineticMessage kmresp,
			Key key, long clusterVersion) throws HeaderException {
            Key key, SimulatorEngine engine) throws HeaderException {

		LOG.fine("Header processing");
		
@@ -93,6 +94,11 @@ public class HeaderOp {
						"no header");
			}

            if (km.getIsInvalidRequest()) {
                throw new HeaderException(Status.StatusCode.INVALID_REQUEST,
                        km.getErrorMessage());
            }

			Command.Header in = km.getCommand().getHeader();

			// set ack sequence
@@ -110,16 +116,16 @@ public class HeaderOp {
                // check hmac
                checkHmac(km, key);

                if (in.getClusterVersion() != clusterVersion) {
                if (in.getClusterVersion() != engine.getClusterVersion()) {

                    // set cluster version in response message
                    respCommandBuilder.getHeaderBuilder().setClusterVersion(
                            clusterVersion);
                            engine.getClusterVersion());

                    throw new HeaderException(
                            Status.StatusCode.VERSION_FAILURE,
                            "CLUSTER_VERSION_FAILURE: Simulator cluster version is "
                                    + clusterVersion
                                    + engine.getClusterVersion()
                                    + "; Received request cluster version is "
                                    + in.getClusterVersion());
                }