Commit 400b7f12 authored by chiaming2000's avatar chiaming2000
Browse files

Minor error handling/validation improvement for batch operations.

parent 7183733f
Loading
Loading
Loading
Loading
+22 −0
Original line number Diff line number Diff line
@@ -49,6 +49,9 @@ public class KineticMessage {
    // set to true if this is a batch message
    private volatile boolean isBatchMessage = false;

    // set to true if this is the first message in the batch
    private volatile boolean isFirstBatchMessage = false;

	/**
	 * Set protocol buffer message.
	 *
@@ -140,4 +143,23 @@ public class KineticMessage {
        this.isBatchMessage = flag;
    }

    /**
     * Get if this message is the first batch message.
     * 
     * @return true if this is a batch message
     */
    public boolean getIsFirstBatchMessage() {
        return this.isFirstBatchMessage;
    }

    /**
     * Set if this message is the first batch message.
     * 
     * @param flag
     *            true if this is a batch message.
     */
    public void setIsFirstBatchMessage(boolean flag) {
        this.isFirstBatchMessage = flag;
    }

}
+10 −6
Original line number Diff line number Diff line
@@ -60,7 +60,7 @@ public class BatchOperationHandler {
    private ArrayList<KineticMessage> list = new ArrayList<KineticMessage>();

    @SuppressWarnings("unchecked")
    public BatchOperationHandler(KineticMessage request, KineticMessage respond,
    public BatchOperationHandler(KineticMessage request,
 SimulatorEngine engine)
            throws InvalidBatchException {

@@ -94,11 +94,14 @@ public class BatchOperationHandler {

        try {

            // check if request is from the same client/connection
            if (request.getCommand().getHeader().getConnectionID() != this.cid) {
                throw new RuntimeException("DB is locked by: " + cid
                        + ", request cid: "
                        + request.getCommand().getHeader().getConnectionID());
            // check if request is from the same batch
            if (request.getCommand().getHeader().getConnectionID() != this.cid
                    || request.getCommand().getHeader().getBatchID() != this.batchId) {

                throw new RuntimeException("DB is locked by: " + cid + "-"
                        + batchId + ", request id: "
                        + request.getCommand().getHeader().getConnectionID()
                        + request.getCommand().getHeader().getBatchID());
            }

            MessageType mtype = request.getCommand().getHeader()
@@ -246,6 +249,7 @@ public class BatchOperationHandler {

        ByteString key = requestKeyValue.getKey();

        @SuppressWarnings("unchecked")
        KVValue storeKv = (KVValue) store.get(key);

        if (storeKv != null) {
+15 −8
Original line number Diff line number Diff line
@@ -563,7 +563,7 @@ public class SimulatorEngine implements MessageService {
            try {
                
                if (this.batchOp != null && batchOp.isClosed()) {
                    this.releaseBatchOperation();
                    this.endBatchOperation(kmreq);
                }
                
                // get command byte stirng
@@ -1027,8 +1027,9 @@ public class SimulatorEngine implements MessageService {
        return this.deviceLocked;
    }

    private synchronized void initBatchOperation(KineticMessage kmreq,
            KineticMessage kmresp) throws InvalidBatchException {
    @Override
    public synchronized void startBatchOperation(KineticMessage kmreq)
            throws InvalidBatchException {

        if (this.batchOp != null) {
            throw new InvalidBatchException("batch op already started");
@@ -1041,12 +1042,13 @@ public class SimulatorEngine implements MessageService {
        this.isInBatchMode = true;

        // start a new batch, db is locked by this user
        this.batchOp = new BatchOperationHandler(kmreq, kmresp, this);
        this.batchOp = new BatchOperationHandler(kmreq, this);

        logger.info("batch op handler initialized ...");
        logger.info("batch op started ...");
    }

    private synchronized void releaseBatchOperation() {
    @Override
    public synchronized void endBatchOperation(KineticMessage kmreq) {

        this.batchOp = null;

@@ -1054,7 +1056,7 @@ public class SimulatorEngine implements MessageService {

        this.notifyAll();

        logger.info("batch op handler released ...");
        logger.info("batch op ended/released ...");
    }

    private synchronized void checkBatchMode(KineticMessage kmreq) {
@@ -1095,9 +1097,14 @@ public class SimulatorEngine implements MessageService {
            return;
        }

        if (kmreq.getIsFirstBatchMessage()) {
            startBatchOperation(kmreq);
        }

        // init batch operation
        if (this.batchOp == null) {
            this.initBatchOperation(kmreq, kmresp);
            throw new InvalidBatchException(
                    "batch operation is either not started or failed");
        }

        // process batch message
+15 −2
Original line number Diff line number Diff line
@@ -22,6 +22,7 @@ package com.seagate.kinetic.simulator.io.provider.nio.tcp;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
@@ -31,6 +32,7 @@ import com.seagate.kinetic.common.lib.KineticMessage;
import com.seagate.kinetic.proto.Kinetic.Command.MessageType;
import com.seagate.kinetic.simulator.internal.ConnectionInfo;
import com.seagate.kinetic.simulator.internal.FaultInjectedCloseConnectionException;
import com.seagate.kinetic.simulator.internal.InvalidBatchException;
import com.seagate.kinetic.simulator.internal.SimulatorEngine;
import com.seagate.kinetic.simulator.io.provider.nio.BatchQueue;
import com.seagate.kinetic.simulator.io.provider.nio.NioConnectionStateManager;
@@ -260,8 +262,8 @@ public class NioMessageServiceHandler extends
    }

    private synchronized void processBatchQueue(ChannelHandlerContext ctx,
            KineticMessage km)
            throws InterruptedException {
            KineticMessage km) throws InterruptedException,
            InvalidBatchException {
        
        String key = km.getCommand().getHeader().getConnectionID() + SEP
                + km.getCommand().getHeader().getBatchID();
@@ -274,12 +276,23 @@ public class NioMessageServiceHandler extends

        try {

            List<KineticMessage> mlist = batchQueue.getMessageList();
            if (mlist.size() > 0) {
                mlist.get(0).setIsFirstBatchMessage(true);
            }

            for (KineticMessage request : batchQueue.getMessageList()) {
                this.processRequest(ctx, request);
            }

        } finally {

            this.batchMap.remove(key);

            /**
             * end batch is called in the end of message processing
             * (simulatorEngine).
             */
        }
    }

+23 −0
Original line number Diff line number Diff line
@@ -24,6 +24,7 @@ import kinetic.simulator.SimulatorConfiguration;

import com.seagate.kinetic.common.lib.KineticMessage;
import com.seagate.kinetic.simulator.internal.ConnectionInfo;
import com.seagate.kinetic.simulator.internal.InvalidBatchException;
import com.seagate.kinetic.simulator.io.provider.nio.NioEventLoopGroupManager;

/**
@@ -85,4 +86,26 @@ public interface MessageService {
	 * @return
	 */
	public ConnectionInfo registerNewConnection (ChannelHandlerContext ctx);

    /**
     * Start a new batch operation.
     * 
     * @param req
     *            request message
     * @throws InvalidBatchException
     *             if any internal error occurred.
     */
    public void startBatchOperation(KineticMessage req)
            throws InvalidBatchException;

    /**
     * End the batch operation.
     * 
     * @param req
     *            request message
     * @throws InvalidBatchException
     *             if any internal error occurred.
     */
    public void endBatchOperation(KineticMessage req)
            throws InvalidBatchException;
}
Loading