Commit 88a52932 authored by chiaming2000's avatar chiaming2000
Browse files

Batch operation (simulator):

added error handling to check if batch operation commands are sent from
same client.
parent 93071d8e
Loading
Loading
Loading
Loading
+37 −42
Original line number Diff line number Diff line
@@ -54,6 +54,8 @@ public class NioMessageServiceHandler extends

    private BatchQueue batchQueue = null;

    private ChannelHandlerContext batchCtx = null;

	private static boolean faultInjectCloseConnection = Boolean
			.getBoolean(FaultInjectedCloseConnectionException.FAULT_INJECT_CLOSE_CONNECTION);

@@ -97,7 +99,7 @@ public class NioMessageServiceHandler extends
		NioConnectionStateManager.checkIfConnectionIdSet(ctx, request);

        // add to queue if batchQueue has started
        if (this.shouldAddToBatch(request)) {
        if (this.shouldAddToBatch(ctx, request)) {

            this.addToBatchQueue(request);

@@ -106,9 +108,9 @@ public class NioMessageServiceHandler extends
        }

        // check if this is a start batchQueue message
        if (this.isStartBatch(request)) {
            this.createBatchQueue(request);
        } else if (this.isEndBatch(request)) {
        if (this.isStartBatch(ctx, request)) {
            this.createBatchQueue(ctx, request);
        } else if (this.isEndBatch(ctx, request)) {
            this.processBatchQueue(ctx);
        }

@@ -153,75 +155,67 @@ public class NioMessageServiceHandler extends
	@Override
	public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
	    
	 // remove connection info of the channel handler context from conn info map
        // remove connection info of the channel handler context from conn info
        // map
	    @SuppressWarnings("unused")
        ConnectionInfo info = SimulatorEngine.removeConnectionInfo(ctx);

	    //logger.info("connection info is removed, id=" + info.getConnectionId() );
	    
		// close process runner
		if (this.queuedRequestProcessRunner != null) {
			logger.fine("removing/closing nio queued request process runner ...");
			this.queuedRequestProcessRunner.close();
		}
        if (ctx == batchCtx) {
            this.batchCtx = null;
            this.batchQueue = null;
        }
	   
    @SuppressWarnings("unused")
    private boolean isBatchMessage(KineticMessage request) {

        MessageType mtype = request.getCommand().getHeader().getMessageType();

        switch (mtype) {
        case START_BATCH:
        case END_BATCH:
            return true;
        default:
            return false;
        }
	    //logger.info("connection info is removed, id=" + info.getConnectionId() );
	}

    private boolean isStartBatch(KineticMessage request) {
    private boolean isStartBatch(ChannelHandlerContext ctx,
            KineticMessage request) {

        if (request.getCommand().getHeader().getMessageType() == MessageType.START_BATCH) {
        if (batchQueue == null
                && (request.getCommand().getHeader().getMessageType() == MessageType.START_BATCH)) {
            return true;
        }

        return false;
    }

    private boolean isEndBatch(KineticMessage request) {
    private boolean isEndBatch(ChannelHandlerContext ctx, KineticMessage request) {

        if (request.getCommand().getHeader().getMessageType() == MessageType.END_BATCH) {

            if (ctx == this.batchCtx) {
                return true;
            } else {
                throw new RuntimeException(
                        "Received END_BATCH from wrong client");
            }
        return false;
        }

    /**
     * check if batch has started.
     * 
     * @return
     */
    @SuppressWarnings("unused")
    private boolean isBatchStarted() {
        return (batchQueue != null);
        return false;
    }

    private synchronized void addToBatchQueue(KineticMessage request) {
        batchQueue.add(request);
    }

    private synchronized void createBatchQueue(KineticMessage request) {
    private synchronized void createBatchQueue(ChannelHandlerContext ctx,
            KineticMessage request) {

        if (batchQueue == null) {
            this.batchCtx = ctx;
            this.batchQueue = new BatchQueue(request);
        } else {
            // concurrent start batch is not allowed
            throw new RuntimeException("batch already started");
        }
    }

    private boolean shouldAddToBatch(KineticMessage request) {
    private boolean shouldAddToBatch(ChannelHandlerContext ctx,
            KineticMessage request) {

        boolean flag = false;

        if (batchQueue != null && batchQueue.isSameClient(request)) {
        if (batchQueue != null && (ctx == batchCtx)) {

            MessageType mtype = request.getCommand().getHeader()
                    .getMessageType();
@@ -245,6 +239,7 @@ public class NioMessageServiceHandler extends

        } finally {
            this.batchQueue = null;
            this.batchCtx = null;
        }
    }
}