Commit 670fca91 authored by chiaming2000's avatar chiaming2000
Browse files

Batch operation exception handling improvement and code clean up.

When an exception occurred for a batch operation, the response status
code for the message is set to the proper status code defined in
kinetic.proto (such as VERSION_MISMATCH). 

The subsequential response messages' status code
(PUT_RESPONSE/DELETE_RESPONSE) in the failed batch are set to
NOT_ATTEMPTED, and the END_BATCH_RESPONSE status is set to
INVALID_BATCH.
parent aa7cc92f
Loading
Loading
Loading
Loading
+186 −44
Original line number Diff line number Diff line
@@ -28,6 +28,7 @@ import com.seagate.kinetic.common.lib.KineticMessage;
import com.seagate.kinetic.proto.Kinetic.Command.Algorithm;
import com.seagate.kinetic.proto.Kinetic.Command.KeyValue;
import com.seagate.kinetic.proto.Kinetic.Command.MessageType;
import com.seagate.kinetic.proto.Kinetic.Command.Status.StatusCode;
import com.seagate.kinetic.simulator.persist.BatchOperation;
import com.seagate.kinetic.simulator.persist.KVValue;
import com.seagate.kinetic.simulator.persist.Store;
@@ -43,100 +44,238 @@ public class BatchOperationHandler {
    private final static Logger logger = Logger.getLogger(BatchOperationHandler.class
            .getName());

    @SuppressWarnings("unused")
    private long MAX_TIME_OUT = 30000;

    private SimulatorEngine engine = null;

    @SuppressWarnings("rawtypes")
    private Store store = null;

    RequestContext context = null;

    private long cid = -1;

    private int batchId = -1;

    boolean isEndBatch = false;
    // boolean isClosed = false;

    private BatchOperation<ByteString, KVValue> batch = null;

    private ArrayList<KineticMessage> list = new ArrayList<KineticMessage>();

    @SuppressWarnings("unchecked")
    public BatchOperationHandler(KineticMessage request,
 SimulatorEngine engine)
            throws InvalidBatchException {

        try {
            // start batch
            batch = engine.getStore().createBatchOperation();

            // this batch op handler belongs to this connection
            this.cid = request.getCommand().getHeader().getConnectionID();

            this.batchId = request.getCommand().getHeader().getBatchID();
    public BatchOperationHandler(SimulatorEngine engine) {

            // simulator engine
            this.engine = engine;

            // store
            this.store = engine.getStore();
    }

        } catch (Exception e) {
    @SuppressWarnings("unchecked")
    public synchronized void init(RequestContext context)
            throws InvalidBatchException, InvalidRequestException {

        if (this.batch != null) {
            throw new InvalidRequestException("Alread in batch mode.");
        }

        // init with this context
        this.context = context;

        // this batch op handler belongs to this connection
        this.cid = context.getRequestMessage().getCommand().getHeader()
                .getConnectionID();

        // batch Id
        this.batchId = context.getRequestMessage().getCommand().getHeader()
                .getBatchID();

        // start batch
        try {
            // create new batch instance
            batch = engine.getStore().createBatchOperation();
        } catch (KVStoreException e) {
            throw new InvalidBatchException(e);
        }
    }

    public synchronized boolean handleRequest(KineticMessage request,
            KineticMessage response)
            throws InvalidBatchException, NotAttemptedException {
    public void checkBatchMode(KineticMessage kmreq)
            throws InvalidRequestException {

        if (this.isEndBatch) {
            throw new InvalidBatchException("batch is not started or has ended");
        if (kmreq.getIsInvalidBatchMessage()) {
            throw new InvalidRequestException(
                    "Invalid batch Id found in message: "
                            + kmreq.getCommand().getHeader().getBatchID());
        }

        if (this.batch == null) {
            return;
        }

        if (kmreq.getCommand().getHeader().getBatchID() == this.batchId
                && kmreq.getCommand().getHeader().getConnectionID() == this.cid) {

            return;
        }

        this.waitForBatchToFinish();
    }

    private synchronized void waitForBatchToFinish() {



        long timeout = 0;
        long period = 3000;

        while (batch != null) {

            try {

            // check if request is from the same batch
            if (request.getCommand().getHeader().getConnectionID() != this.cid
                    || request.getCommand().getHeader().getBatchID() != this.batchId) {
                this.wait(period);

                if (batch == null) {
                    return;
                }

                throw new RuntimeException("DB is locked by: " + cid + "-"
                        + batchId + ", request id: "
                        + request.getCommand().getHeader().getConnectionID()
                        + request.getCommand().getHeader().getBatchID());
                timeout += period;

                if (timeout >= MAX_TIME_OUT) {
                    throw new RuntimeException(
                            "Timeout waiting for batch mode to finish");
                } else {
                    logger.warning("waiting for batch mode to finish., total wait time ="
                            + timeout);
                }
            } catch (InterruptedException e) {
                logger.log(Level.WARNING, e.getMessage(), e);
            }
        }
    }

            MessageType mtype = request.getCommand().getHeader()
                    .getMessageType();
    public synchronized void handleRequest(RequestContext context)
            throws InvalidBatchException,
            NotAttemptedException, KVStoreException {

        /**
         * messages will be queued or dequeued. no process until end batch is
         * received.
         */
        if (context.getMessageType() == MessageType.START_BATCH
                || context.getMessageType() == MessageType.ABORT_BATCH) {
            return;
        }

        try {

            /**
             * start batch if message is tagged as first batch message
             */
            if (context.getRequestMessage().getIsFirstBatchMessage()) {
                this.init(context);
            }

            // check if this is a valid batch message
            checkBatch(context);

            MessageType mtype = context.getMessageType();

            if (mtype == MessageType.END_BATCH) {
                this.commitBatch();
            } else if (mtype == MessageType.PUT) {
                this.batchPut(request);
                this.batchPut(context.getRequestMessage());
            } else if (mtype == MessageType.DELETE) {
                this.batchDelete(request);
                this.batchDelete(context.getRequestMessage());
            } else {
                throw new NotAttemptedException("invalid message type: "
                        + mtype);
            }
        } catch (NotAttemptedException nae) {

            logger.log(Level.WARNING, nae.getMessage(), nae);

            // set status code and message
            context.getCommandBuilder().getStatusBuilder()
                    .setCode(StatusCode.NOT_ATTEMPTED);
            context.getCommandBuilder().getStatusBuilder()
                    .setStatusMessage(nae.getMessage());

            close();

            throw nae;
        } catch (KVStoreVersionMismatch vmismatch) {

            logger.log(Level.WARNING, vmismatch.getMessage(), vmismatch);

            // set status code and message
            context.getCommandBuilder().getStatusBuilder()
                    .setCode(StatusCode.VERSION_MISMATCH);
            context.getCommandBuilder().getStatusBuilder()
                    .setStatusMessage(vmismatch.getMessage());

            close();

            throw vmismatch;
        } catch (KVStoreException kvse) {

            logger.log(Level.WARNING, kvse.getMessage(), kvse);

            context.getCommandBuilder().getStatusBuilder()
                    .setCode(StatusCode.INTERNAL_ERROR);
            context.getCommandBuilder().getStatusBuilder()
                    .setStatusMessage(kvse.getMessage());

            close();
            throw new NotAttemptedException(kvse);

            throw kvse;
        } catch (Exception e) {

            logger.log(Level.WARNING, e.getMessage(), e);

            context.getCommandBuilder().getStatusBuilder()
                    .setCode(StatusCode.INVALID_BATCH);
            context.getCommandBuilder().getStatusBuilder()
                    .setStatusMessage(e.getMessage());

            close();
            throw new InvalidBatchException(e);
        }
    }

    private void checkBatch(RequestContext context)
            throws InvalidBatchException, NotAttemptedException {

        return isEndBatch;
        if (this.batch == null) {

            String msg = "batch is not started or has ended";

            if (context.getMessageType() == MessageType.END_BATCH) {
                throw new InvalidBatchException(msg);
            } else {
                throw new NotAttemptedException(msg);
            }
        }

    private void batchDelete(KineticMessage km) throws KVStoreException {
        // check if request is from the same batch
        if (context.getRequestMessage().getCommand().getHeader()
                .getConnectionID() != this.cid
                || context.getRequestMessage().getCommand().getHeader()
                        .getBatchID() != this.batchId) {

            throw new RuntimeException("DB is locked by: "
                    + cid
                    + "-"
                    + batchId
                    + ", request id: "
                    + context.getRequestMessage().getCommand().getHeader()
                            .getConnectionID()
                    + context.getRequestMessage().getCommand().getHeader()
                            .getBatchID());
        }

    }

    private void batchDelete(KineticMessage km) throws KVStoreException {

        // proto request KV
        KeyValue requestKeyValue = km.getCommand().getBody().getKeyValue();
@@ -200,13 +339,15 @@ public class BatchOperationHandler {
    /**
     * close the current batch operation.
     */
    public void close() {
    public synchronized void close() {

        try {
        if (this.batch == null) {
            return;
        }

            if (batch != null) {
        try {
            // close db batch
            batch.close();
            }

            // clear list
            list.clear();
@@ -215,7 +356,8 @@ public class BatchOperationHandler {
        } catch (Exception e) {
            logger.log(Level.WARNING, e.getMessage(), e);
        } finally {
            this.isEndBatch = true;
            batch = null;
            this.notifyAll();
        }
    }

@@ -274,7 +416,7 @@ public class BatchOperationHandler {
    }

    public synchronized boolean isClosed() {
        return this.isEndBatch;
        return (this.batch == null);
    }

    public long getConnectionId() {
+7 −129
Original line number Diff line number Diff line
@@ -166,9 +166,6 @@ public class SimulatorEngine implements MessageService {

    private volatile boolean deviceLocked = false;

    // db lock for batch operations
    private volatile boolean isInBatchMode = false;

    static {
        // add shutdown hook to clean up resources
        Runtime.getRuntime().addShutdownHook(shutdownHook);
@@ -308,6 +305,8 @@ public class SimulatorEngine implements MessageService {

    private void initHandlers() {
        this.manager = new CommandManager(this);
        
        this.batchOp = new BatchOperationHandler(this);
    }

    public CommandManager getCommandManager() {
@@ -456,27 +455,16 @@ public class SimulatorEngine implements MessageService {
            // prepare to process this request
            context.preProcessRequest();

            checkBatchMode(kmreq);
            // check if in batch mode
            this.batchOp.checkBatchMode(kmreq);

            if (kmreq.getIsBatchMessage()) {
                this.processBatchOpMessage(context);
                this.batchOp.handleRequest(context);
            } else {
                // process request
                context.processRequest();
            }

        } catch (NotAttemptedException nae) {
            // handle not attempted exception
            context.getCommandBuilder().getStatusBuilder()
                    .setCode(StatusCode.NOT_ATTEMPTED);
            context.getCommandBuilder().getStatusBuilder()
                    .setStatusMessage(nae.getMessage());
        } catch (InvalidBatchException boe) {
            // handle invalid batch exception
            context.getCommandBuilder().getStatusBuilder()
                    .setCode(StatusCode.INVALID_BATCH);
            context.getCommandBuilder().getStatusBuilder()
                    .setStatusMessage(boe.getMessage());
        } catch (Exception e) {

            logger.log(Level.WARNING, e.getMessage(), e);
@@ -502,8 +490,6 @@ public class SimulatorEngine implements MessageService {

            try {

                checkAndReleaseBatchOperation(kmreq);

                // post process message
                context.postProcessRequest();

@@ -517,12 +503,6 @@ public class SimulatorEngine implements MessageService {
        return context.getResponseMessage();
    }

    public void checkAndReleaseBatchOperation(KineticMessage request) {
        if (this.batchOp != null && batchOp.isClosed()) {
            this.endBatchOperation(request);
        }
    }

    private void addStatisticCounter(KineticMessage kmreq, KineticMessage kmresp) {

        try {
@@ -820,12 +800,6 @@ public class SimulatorEngine implements MessageService {
            logger.log(Level.WARNING, e.getMessage(), e);
        }

        // cb.getBodyBuilder().getGetLogBuilder().getConfigurationBuilder().setProtocolVersion(PROTOCOL_VERSION);
        // cb.getBodyBuilder().getGetLogBuilder().getConfigurationBuilder().setCompilationDate(ConfigurationUtil.COMPILATION_DATE);
        // cb.getBodyBuilder().getGetLogBuilder().getConfigurationBuilder().setModel(ConfigurationUtil.MODEL);
        // cb.getBodyBuilder().getGetLogBuilder().getConfigurationBuilder().setVersion(SimulatorConfiguration.getSimulatorVersion());
        // cb.getBodyBuilder().getGetLogBuilder().getConfigurationBuilder().setSerialNumber(ByteString.copyFrom(ConfigurationUtil.SERIAL_NUMBER));
        //
        // limits
        try {
            Limits limits = LimitsUtil.getLimits(this.config);
@@ -845,8 +819,8 @@ public class SimulatorEngine implements MessageService {

        ctx.writeAndFlush(km);

        logger.info("***** connection registered., sent UNSOLICITEDSTATUS with cid = "
                + info.getConnectionId());
        // logger.info("***** connection registered., sent UNSOLICITEDSTATUS with cid = "
        // + info.getConnectionId());

        return info;
    }
@@ -935,100 +909,4 @@ public class SimulatorEngine implements MessageService {
        return this.deviceLocked;
    }

    @Override
    public synchronized void startBatchOperation(KineticMessage kmreq)
            throws InvalidBatchException {

        if (this.batchOp != null) {
            throw new InvalidBatchException("batch op already started");
        }

        if (kmreq.getCommand().getHeader().getMessageType() == MessageType.END_BATCH) {
            throw new InvalidBatchException("batch op failed");
        }

        this.isInBatchMode = true;

        // start a new batch, db is locked by this user
        this.batchOp = new BatchOperationHandler(kmreq, this);

        logger.info("batch op started ...");
    }

    @Override
    public synchronized void endBatchOperation(KineticMessage kmreq) {

        if (this.batchOp != null && batchOp.isClosed()) {

            this.batchOp = null;

            this.isInBatchMode = false;

            this.notifyAll();

            logger.info("batch op ended/released ...");
        }
    }

    private synchronized void checkBatchMode(KineticMessage kmreq)
            throws InvalidRequestException {

        if (kmreq.getIsInvalidBatchMessage()) {
            throw new InvalidRequestException(
                    "Invalid batch Id found in message: "
                            + kmreq.getCommand().getHeader().getBatchID());
        }

        if (this.isInBatchMode == false) {
            return;
        }

        if (kmreq.getCommand().getHeader().getBatchID() == this.batchOp
                .getBatchId()
                && kmreq.getCommand().getHeader().getConnectionID() == this.batchOp
                        .getConnectionId()) {

            return;
        }

        while (this.isInBatchMode) {

            try {
                this.wait(3000);
                if (this.isInBatchMode) {
                    logger.warning("waiting for batch mode to complete...");
                }
            } catch (InterruptedException e) {
                logger.log(Level.WARNING, e.getMessage(), e);
            }
        }
    }

    public void processBatchOpMessage(RequestContext context)
            throws InvalidBatchException,
            NotAttemptedException {

        MessageType mtype = context.getMessageType();

        if (mtype == MessageType.START_BATCH
                || mtype == MessageType.ABORT_BATCH) {
            return;
        }

        if (context.getRequestMessage().getIsFirstBatchMessage()) {
            startBatchOperation(context.getRequestMessage());
        }

        // init batch operation
        if (this.batchOp == null) {

            throw new InvalidBatchException(
                    "batch operation is either not started or failed");
        }

        // process batch message
        this.batchOp.handleRequest(context.getRequestMessage(),
                context.getResponseMessage());
    }

}
+0 −23
Original line number Diff line number Diff line
@@ -24,7 +24,6 @@ import kinetic.simulator.SimulatorConfiguration;

import com.seagate.kinetic.common.lib.KineticMessage;
import com.seagate.kinetic.simulator.internal.ConnectionInfo;
import com.seagate.kinetic.simulator.internal.InvalidBatchException;
import com.seagate.kinetic.simulator.io.provider.nio.NioEventLoopGroupManager;

/**
@@ -86,26 +85,4 @@ public interface MessageService {
	 * @return
	 */
	public ConnectionInfo registerNewConnection (ChannelHandlerContext ctx);

    /**
     * Start a new batch operation.
     * 
     * @param req
     *            request message
     * @throws InvalidBatchException
     *             if any internal error occurred.
     */
    public void startBatchOperation(KineticMessage req)
            throws InvalidBatchException;

    /**
     * End the batch operation.
     * 
     * @param req
     *            request message
     * @throws InvalidBatchException
     *             if any internal error occurred.
     */
    public void endBatchOperation(KineticMessage req)
            throws InvalidBatchException;
}