Commit e87de74b authored by chiaming2000's avatar chiaming2000
Browse files

Batch operation code clean up.

parent a8a868e1
Loading
Loading
Loading
Loading
+26 −52
Original line number Diff line number Diff line
/**
 * 
 * Copyright (C) 2014 Seagate Technology.
 * 
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 *
 */
package com.seagate.kinetic.simulator.io.provider.nio;

import java.util.ArrayList;
@@ -5,7 +24,6 @@ import java.util.List;
import java.util.logging.Logger;

import com.seagate.kinetic.common.lib.KineticMessage;
import com.seagate.kinetic.proto.Kinetic.Command.MessageType;

public class BatchQueue {

@@ -15,17 +33,8 @@ public class BatchQueue {

    private ArrayList<KineticMessage> mlist = new ArrayList<KineticMessage>();

    private long cid = -1;

    private int queueDepth = 100;

    public static boolean isStartBatchMessage(KineticMessage request) {
        return (request.getCommand().getHeader().getMessageType() == MessageType.START_BATCH);
    }

    public BatchQueue(KineticMessage request) {
        // this batch op handler belongs to this connection
        this.cid = request.getCommand().getHeader().getConnectionID();
        ;
    }

    public void add(KineticMessage request) {
@@ -33,51 +42,16 @@ public class BatchQueue {
        this.mlist.add(request);
    }

    /**
     * only one batch at a time is supported.
     * 
     * @param request
     */
    @SuppressWarnings("unused")
    private void checkPermission(KineticMessage request) {

        // check if request is from the same client/connection
        if (request.getCommand().getHeader().getConnectionID() != this.cid) {
            throw new RuntimeException("DB is locked by: " + cid
                    + ", request cid: "
                    + request.getCommand().getHeader().getConnectionID());
        }

        // check message type, only supports put/delete
        this.checkMessageType(request);

        // check if reached limit
        if (mlist.size() > this.queueDepth) {
            throw new RuntimeException("exceed max queue depth: "
                    + this.queueDepth);
        }
    }

    private void checkMessageType(KineticMessage request) {

        MessageType mtype = request.getCommand().getHeader().getMessageType();

        switch (mtype) {
        case PUT:
        case DELETE:
            return;
        default:
            throw new RuntimeException("invalid message type: " + mtype);
        }

    }

    public List<KineticMessage> getMessageList() {
        return this.mlist;
    }

    public boolean isSameClient(KineticMessage request) {
        return (cid == request.getCommand().getHeader().getConnectionID());
    public int size() {
        return this.mlist.size();
    }

    public void clear() {
        this.mlist.clear();
    }

}
+221 −0
Original line number Diff line number Diff line
/**
 * 
 * Copyright (C) 2014 Seagate Technology.
 * 
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 *
 */
package com.seagate.kinetic.simulator.io.provider.nio;

import io.netty.channel.ChannelHandlerContext;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;

import com.seagate.kinetic.common.lib.KineticMessage;
import com.seagate.kinetic.proto.Kinetic.Command.MessageType;
import com.seagate.kinetic.simulator.internal.InvalidBatchException;
import com.seagate.kinetic.simulator.internal.KVStoreException;

/**
 * Batch op Pre processor.
 * <p>
 * 
 * @author chiaming
 *
 */
public class NioBatchOpPreProcessor {

    private static final Logger logger = Logger
            .getLogger(NioBatchOpPreProcessor.class.getName());

    private static final String SEP = ".";

    // key = connId + "-" + batchId
    private static Map<String, BatchQueue> batchMap = new ConcurrentHashMap<String, BatchQueue>();

    public static boolean processMessage(NioMessageServiceHandler nioHandler,
            ChannelHandlerContext ctx, KineticMessage request)
            throws InterruptedException, InvalidBatchException,
            KVStoreException {

        MessageType mtype = getMessageType(request);

        // add to queue if batchQueue has started
        if (shouldAddToBatch(ctx, request, mtype)) {
            // the command was queued until END_BATCH is received
            return false;
        }

        switch (mtype) {
        case START_BATCH:
            request.setIsBatchMessage(true);
            createBatchQueue(ctx, request);
            break;
        case END_BATCH:
            request.setIsBatchMessage(true);
            processBatchQueue(nioHandler, ctx, request);
            break;
        case ABORT_BATCH:
            request.setIsBatchMessage(true);
            processBatchAbort(request);
            break;
        default:
            break;
        }

        return true;
    }

    private static MessageType getMessageType(KineticMessage request) {
        return request.getCommand().getHeader().getMessageType();
    }

    private static void createBatchQueue(
            ChannelHandlerContext ctx,
            KineticMessage request) {

        String key = request.getCommand().getHeader().getConnectionID() + SEP
                + request.getCommand().getHeader().getBatchID();

        BatchQueue batchQueue = batchMap.get(key);

        if (batchQueue == null) {
            batchQueue = new BatchQueue(request);
            batchMap.put(key, batchQueue);
        } else {
            // batch already started
            throw new RuntimeException("batch already started");
        }

        logger.info("batch queue created for key: " + key);
    }


    private static boolean shouldAddToBatch(ChannelHandlerContext ctx,
 KineticMessage request, MessageType mtype)
            throws KVStoreException {

        boolean flag = false;

        boolean hasBatchId = request.getCommand().getHeader().hasBatchID();

        /**
         * not in a batch
         */
        if (hasBatchId == false) {
            return false;
        }

        String key = request.getCommand().getHeader().getConnectionID() + SEP
                + request.getCommand().getHeader().getBatchID();

        BatchQueue batchQueue = batchMap.get(key);

        if (batchQueue != null) {

            if (mtype == MessageType.PUT || mtype == MessageType.DELETE) {

                // is added to batch queue
                flag = true;

                // is a batch message
                request.setIsBatchMessage(true);

                // add to batch queue
                batchQueue.add(request);
            }
        } else {
            // there is a batch ID not known at this point
            // the only allowed message type is start message.
            if (mtype != MessageType.START_BATCH) {
                request.setIsInvalidBatchMessage(true);
            }
        }

        return flag;
    }

    private static synchronized void processBatchQueue(
            NioMessageServiceHandler ioHandler,
            ChannelHandlerContext ctx,
            KineticMessage km) throws InterruptedException,
            InvalidBatchException {

        String key = km.getCommand().getHeader().getConnectionID() + SEP
                + km.getCommand().getHeader().getBatchID();

        BatchQueue batchQueue = batchMap.get(key);

        if (batchQueue == null) {
            throw new RuntimeException("No batch Id found for key: " + key);
        }

        try {

            List<KineticMessage> mlist = batchQueue.getMessageList();

            int msize = mlist.size();

            if (msize > 0) {
                mlist.get(0).setIsFirstBatchMessage(true);
            }


            for (int index = 0; index < msize; index++) {
                // get request message
                KineticMessage request = mlist.get(index);

                // process message
                ioHandler.processRequest(ctx, request);
            }

        } finally {
            cleanup(key);

            /**
             * end batch is called in the end of message processing
             * (simulatorEngine).
             */
        }
    }

    private static void processBatchAbort(KineticMessage km) {

        String key = km.getCommand().getHeader().getConnectionID() + SEP
                + km.getCommand().getHeader().getBatchID();

        boolean isFound = batchMap.containsKey(key);

        if (isFound == false) {
            throw new RuntimeException("No batch Id found for key: " + key);
        }

        cleanup(key);

        logger.info("batch aborted ... key=" + key);
    }

    private static void cleanup(String key) {

        BatchQueue batchQueue = batchMap.remove(key);

        if (batchQueue == null) {
            logger.warning("No batch Id found, key=" + key);
        }
    }
}
+8 −169
Original line number Diff line number Diff line
@@ -22,17 +22,12 @@ package com.seagate.kinetic.simulator.io.provider.nio;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.seagate.kinetic.common.lib.KineticMessage;
import com.seagate.kinetic.proto.Kinetic.Command.MessageType;
import com.seagate.kinetic.simulator.internal.ConnectionInfo;
import com.seagate.kinetic.simulator.internal.FaultInjectedCloseConnectionException;
import com.seagate.kinetic.simulator.internal.InvalidBatchException;
import com.seagate.kinetic.simulator.internal.SimulatorEngine;
import com.seagate.kinetic.simulator.io.provider.spi.MessageService;

@@ -46,17 +41,12 @@ public class NioMessageServiceHandler extends
	private static final Logger logger = Logger
			.getLogger(NioMessageServiceHandler.class.getName());

    private static final String SEP = "-";

	private MessageService lcservice = null;

	private boolean enforceOrdering = false;

	private NioQueuedRequestProcessRunner queuedRequestProcessRunner = null;

    // key = connId + "-" + batchId
    private Map<String, BatchQueue> batchMap = new ConcurrentHashMap<String, BatchQueue>();

	private static boolean faultInjectCloseConnection = Boolean
			.getBoolean(FaultInjectedCloseConnectionException.FAULT_INJECT_CLOSE_CONNECTION);

@@ -104,26 +94,16 @@ public class NioMessageServiceHandler extends
		// check if conn id is set
		NioConnectionStateManager.checkIfConnectionIdSet(ctx, request);

        // add to queue if batchQueue has started
        if (this.shouldAddToBatch(ctx, request)) {
            // the command was queued until END_BATCH is received
            return;
        }

        // check if this is a start batchQueue message
        if (this.isStartBatch(ctx, request)) {
            this.createBatchQueue(ctx, request);
        } else if (this.isEndBatch(ctx, request)) {
            this.processBatchQueue(ctx, request);
        } else if (this.isAbortBatch(ctx, request)) {
            this.processBatchAbort(ctx, request);
        }
        boolean shouldContinue = NioBatchOpPreProcessor.processMessage(this,
                ctx, request);

        if (shouldContinue) {
            // process regular request
            processRequest(ctx, request);
        }
    }

    private void processRequest(ChannelHandlerContext ctx,
    public void processRequest(ChannelHandlerContext ctx,
            KineticMessage request) throws InterruptedException {

        if (enforceOrdering) {
@@ -164,153 +144,12 @@ public class NioMessageServiceHandler extends
        // map
        ConnectionInfo info = SimulatorEngine.removeConnectionInfo(ctx);

        this.batchMap = null;

        logger.info("connection info is removed, id=" + info.getConnectionId()
                + ", is secure channel=" + this.isSecureChannel);
	}

    private boolean isStartBatch(ChannelHandlerContext ctx,
            KineticMessage request) {

        if (request.getCommand().getHeader().getMessageType() == MessageType.START_BATCH) {
            request.setIsBatchMessage(true);
            return true;
        }

        return false;
    }

    private boolean isEndBatch(ChannelHandlerContext ctx, KineticMessage request) {

        if (request.getCommand().getHeader().getMessageType() == MessageType.END_BATCH) {

            request.setIsBatchMessage(true);

            return true;
        }

        return false;
    }

    private boolean isAbortBatch(ChannelHandlerContext ctx,
            KineticMessage request) {

        if (request.getCommand().getHeader().getMessageType() == MessageType.ABORT_BATCH) {

            request.setIsBatchMessage(true);

            return true;
        }

        return false;
    }

    private synchronized void createBatchQueue(ChannelHandlerContext ctx,
            KineticMessage request) {

        String key = request.getCommand().getHeader().getConnectionID() + SEP
                + request.getCommand().getHeader().getBatchID();

        BatchQueue batchQueue = this.batchMap.get(key);

        if (batchQueue == null) {
            batchQueue = new BatchQueue(request);
            this.batchMap.put(key, batchQueue);
        } else {
            // concurrent start batch is not allowed
            throw new RuntimeException("batch already started");
        }

        logger.info("batch queue created for key: " + key);
    }

    private boolean shouldAddToBatch(ChannelHandlerContext ctx,
            KineticMessage request) {

        boolean flag = false;

        boolean hasBatchId = request.getCommand().getHeader().hasBatchID();
        if (hasBatchId == false) {
            return false;
        }

        String key = request.getCommand().getHeader().getConnectionID() + SEP
                + request.getCommand().getHeader().getBatchID();

        BatchQueue batchQueue = this.batchMap.get(key);

        MessageType mtype = request.getCommand().getHeader().getMessageType();

        if (batchQueue != null) {

            if (mtype == MessageType.PUT || mtype == MessageType.DELETE) {

                // is added to batch queue
                flag = true;

                // is a batch message
                request.setIsBatchMessage(true);

                // add to batch queue
                batchQueue.add(request);
            }
        } else {
            // there is a batch ID not known at this point
            // the only allowed message type is start message.
            if (mtype != MessageType.START_BATCH) {
                request.setIsInvalidBatchMessage(true);
            }
        }

        return flag;
    }

    private synchronized void processBatchQueue(ChannelHandlerContext ctx,
            KineticMessage km) throws InterruptedException,
            InvalidBatchException {
        
        String key = km.getCommand().getHeader().getConnectionID() + SEP
                + km.getCommand().getHeader().getBatchID();

        BatchQueue batchQueue = this.batchMap.get(key);

        if (batchQueue == null) {
            throw new RuntimeException("No batch Id found for key: " + key);
        }

        try {

            List<KineticMessage> mlist = batchQueue.getMessageList();
            if (mlist.size() > 0) {
                mlist.get(0).setIsFirstBatchMessage(true);
            }

            for (KineticMessage request : batchQueue.getMessageList()) {
                this.processRequest(ctx, request);
            }

        } finally {

            this.batchMap.remove(key);

            /**
             * end batch is called in the end of message processing
             * (simulatorEngine).
             */
        }
    }

    private void processBatchAbort(ChannelHandlerContext ctx, KineticMessage km) {
        String key = km.getCommand().getHeader().getConnectionID() + SEP
                + km.getCommand().getHeader().getBatchID();

        BatchQueue batchQueue = this.batchMap.remove(key);

        if (batchQueue == null) {
            throw new RuntimeException("No batch Id found for key: " + key);
    public MessageService getMessageService() {
        return this.lcservice;
    }

        logger.info("batch aborted ... key=" + key);
    }
}