Commit f91a2b33 authored by chiaming2000's avatar chiaming2000
Browse files

Batch operation feature improvement (work in progress):

Batch PUT/DELETE response messages are processed/sent after END_BATCH
message is received. 
parent 57d9c3d3
Loading
Loading
Loading
Loading
+209 −0
Original line number Diff line number Diff line
@@ -19,7 +19,6 @@
 */
package com.seagate.kinetic.simulator.internal;

import java.io.IOException;
import java.util.ArrayList;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -38,29 +37,39 @@ import com.seagate.kinetic.simulator.persist.KVValue;
 * @author chiaming
 *
 */
public class BatchOpHandler {
public class BatchOperationHandler {

    private final static Logger logger = Logger.getLogger(BatchOpHandler.class
    private final static Logger logger = Logger.getLogger(BatchOperationHandler.class
            .getName());

    @SuppressWarnings("unused")
    private SimulatorEngine engine = null;

    private long cid = -1;

    boolean isEndBatch = false;

    private int queueDepth = 100;

    private BatchOperation<ByteString, KVValue> batch = null;

    private ArrayList<KineticMessage> list = new ArrayList<KineticMessage>();

    public BatchOpHandler(KineticMessage request, KineticMessage respond,
            SimulatorEngine engine) {
    @SuppressWarnings("unchecked")
    public BatchOperationHandler(KineticMessage request, KineticMessage respond,
 SimulatorEngine engine)
            throws InvalidBatchException {

        try {
            // start batch
            batch = engine.getStore().createBatchOperation();

            // this batch op handler belongs to this connection
            this.cid = request.getCommand().getHeader().getConnectionID();

            // simulator engine
            this.engine = engine;
        } catch (Exception e) {
            throw new InvalidBatchException(e);
        }
    }

    public synchronized boolean handleRequest(KineticMessage request,
@@ -80,56 +89,34 @@ public class BatchOpHandler {
                        + request.getCommand().getHeader().getConnectionID());
            }

            if (request.getCommand().getHeader().getMessageType() == MessageType.END_BATCH) {
                // commit the batch
                isEndBatch = true;
                // do batch op.
            MessageType mtype = request.getCommand().getHeader()
                    .getMessageType();

            if (mtype == MessageType.END_BATCH) {
                this.commitBatch();
            } else if (mtype == MessageType.PUT) {
                this.batchPut(request);
            } else if (mtype == MessageType.DELETE) {
                this.batchDelete(request);
            } else {
                // check if reached limit
                if (list.size() <= this.queueDepth) {
                    list.add(request);
                    logger.info("*** added message to batch queue ...");
                } else {

                    logger.warning("exceed max queue depth: " + this.queueDepth);

                    throw new RuntimeException("exceed max queue depth: "
                            + this.queueDepth);
                }
                throw new NotAttemptedException("invalid message type: "
                        + mtype);
            }
        } catch (NotAttemptedException nae) {
            isEndBatch = true;
            close();
            throw nae;
        } catch (Exception e) {
            isEndBatch = true;
            close();
            throw new InvalidBatchException(e);
        } finally {

            if (isEndBatch) {
                // do lean up
                this.close();
            }
        }

        return isEndBatch;
    }

    @SuppressWarnings("unchecked")
    private synchronized void commitBatch() throws KVStoreException,
            IOException,
            NotAttemptedException {

        batch = engine.getStore().createBatchOperation();

        for (KineticMessage km : list) {
    private void batchDelete(KineticMessage km) throws NotAttemptedException {

        ByteString key = km.getCommand().getBody().getKeyValue().getKey();

            logger.info("*** batch op entry., key = " + key);

            if (km.getCommand().getHeader().getMessageType() == MessageType.DELETE) {

        if (km.getCommand().getBody().getKeyValue().getForce() == false) {

            boolean isVersionMatched = true;
@@ -144,8 +131,12 @@ public class BatchOpHandler {
        }

        batch.delete(key);
    }

    private void batchPut(KineticMessage km) throws NotAttemptedException {

        ByteString key = km.getCommand().getBody().getKeyValue().getKey();

            } else {
        ByteString valueByteString = null;

        if (km.getValue() != null) {
@@ -156,8 +147,7 @@ public class BatchOpHandler {
        }

        // KV in;
                KeyValue requestKeyValue = km.getCommand().getBody()
                        .getKeyValue();
        KeyValue requestKeyValue = km.getCommand().getBody().getKeyValue();

        Algorithm al = null;
        if (requestKeyValue.hasAlgorithm()) {
@@ -165,8 +155,8 @@ public class BatchOpHandler {
        }

        KVValue data = new KVValue(requestKeyValue.getKey(),
                        requestKeyValue.getNewVersion(),
                        requestKeyValue.getTag(), al, valueByteString);
                requestKeyValue.getNewVersion(), requestKeyValue.getTag(), al,
                valueByteString);

        // XXX: check version
        // ByteString dbv =
@@ -181,10 +171,14 @@ public class BatchOpHandler {

        logger.info("*** batch op put entry., key = " + key);
    }
        }

    private synchronized void commitBatch() {
        try {
            // commit db batch
            batch.commit();
        } finally {
            this.close();
        }
    }

    /**
+2 −2
Original line number Diff line number Diff line
@@ -142,7 +142,7 @@ public class SimulatorEngine implements MessageService {
    private P2POperationHandler p2pHandler = null;

    // batch op handler
    private BatchOpHandler batchOp = null;
    private BatchOperationHandler batchOp = null;

    private NioEventLoopGroupManager nioManager = null;

@@ -1034,7 +1034,7 @@ public class SimulatorEngine implements MessageService {
        }

        // start a new batch, db is locked by this user
        this.batchOp = new BatchOpHandler(kmreq, kmresp, this);
        this.batchOp = new BatchOperationHandler(kmreq, kmresp, this);

        logger.info("batch op handler initialized ...");
    }
+83 −0
Original line number Diff line number Diff line
package com.seagate.kinetic.simulator.io.provider.nio;

import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

import com.seagate.kinetic.common.lib.KineticMessage;
import com.seagate.kinetic.proto.Kinetic.Command.MessageType;

public class BatchQueue {

    @SuppressWarnings("unused")
    private static final Logger logger = Logger.getLogger(BatchQueue.class
            .getName());

    private ArrayList<KineticMessage> mlist = new ArrayList<KineticMessage>();

    private long cid = -1;

    private int queueDepth = 100;

    public static boolean isStartBatchMessage(KineticMessage request) {
        return (request.getCommand().getHeader().getMessageType() == MessageType.START_BATCH);
    }

    public BatchQueue(KineticMessage request) {
        // this batch op handler belongs to this connection
        this.cid = request.getCommand().getHeader().getConnectionID();
    }

    public void add(KineticMessage request) {
        // this.checkPermission(request);
        this.mlist.add(request);
    }

    /**
     * only one batch at a time is supported.
     * 
     * @param request
     */
    @SuppressWarnings("unused")
    private void checkPermission(KineticMessage request) {

        // check if request is from the same client/connection
        if (request.getCommand().getHeader().getConnectionID() != this.cid) {
            throw new RuntimeException("DB is locked by: " + cid
                    + ", request cid: "
                    + request.getCommand().getHeader().getConnectionID());
        }

        // check message type, only supports put/delete
        this.checkMessageType(request);

        // check if reached limit
        if (mlist.size() > this.queueDepth) {
            throw new RuntimeException("exceed max queue depth: "
                    + this.queueDepth);
        }
    }

    private void checkMessageType(KineticMessage request) {

        MessageType mtype = request.getCommand().getHeader().getMessageType();

        switch (mtype) {
        case PUT:
        case DELETE:
            return;
        default:
            throw new RuntimeException("invalid message type: " + mtype);
        }

    }

    public List<KineticMessage> getMessageList() {
        return this.mlist;
    }

    public boolean isSameClient(KineticMessage request) {
        return (cid == request.getCommand().getHeader().getConnectionID());
    }

}
+118 −9
Original line number Diff line number Diff line
@@ -26,9 +26,11 @@ import java.util.logging.Level;
import java.util.logging.Logger;

import com.seagate.kinetic.common.lib.KineticMessage;
import com.seagate.kinetic.proto.Kinetic.Command.MessageType;
import com.seagate.kinetic.simulator.internal.ConnectionInfo;
import com.seagate.kinetic.simulator.internal.FaultInjectedCloseConnectionException;
import com.seagate.kinetic.simulator.internal.SimulatorEngine;
import com.seagate.kinetic.simulator.io.provider.nio.BatchQueue;
import com.seagate.kinetic.simulator.io.provider.nio.NioConnectionStateManager;
import com.seagate.kinetic.simulator.io.provider.nio.NioQueuedRequestProcessRunner;
import com.seagate.kinetic.simulator.io.provider.nio.RequestProcessRunner;
@@ -50,6 +52,8 @@ public class NioMessageServiceHandler extends

	private NioQueuedRequestProcessRunner queuedRequestProcessRunner = null;

    private BatchQueue batchQueue = null;

	private static boolean faultInjectCloseConnection = Boolean
			.getBoolean(FaultInjectedCloseConnectionException.FAULT_INJECT_CLOSE_CONNECTION);

@@ -92,6 +96,29 @@ public class NioMessageServiceHandler extends
		// check if conn id is set
		NioConnectionStateManager.checkIfConnectionIdSet(ctx, request);

        // add to queue if batchQueue has started
        if (this.shouldAddToBatch(request)) {

            this.addToBatchQueue(request);

            // the commands are queued until END_BATCH is received
            return;
        }

        // check if this is a start batchQueue message
        if (this.isStartBatch(request)) {
            this.createBatchQueue(request);
        } else if (this.isEndBatch(request)) {
            this.processBatchQueue(ctx);
        }

        // process regular request
        processRequest(ctx, request);
    }

    private void processRequest(ChannelHandlerContext ctx,
            KineticMessage request) throws InterruptedException {

        if (enforceOrdering) {
            // process request sequentially
            queuedRequestProcessRunner.processRequest(ctx, request);
@@ -138,4 +165,86 @@ public class NioMessageServiceHandler extends
			this.queuedRequestProcessRunner.close();
		}
	}

    @SuppressWarnings("unused")
    private boolean isBatchMessage(KineticMessage request) {

        MessageType mtype = request.getCommand().getHeader().getMessageType();

        switch (mtype) {
        case START_BATCH:
        case END_BATCH:
            return true;
        default:
            return false;
        }
    }

    private boolean isStartBatch(KineticMessage request) {

        if (request.getCommand().getHeader().getMessageType() == MessageType.START_BATCH) {
            return true;
        }

        return false;
    }

    private boolean isEndBatch(KineticMessage request) {

        if (request.getCommand().getHeader().getMessageType() == MessageType.END_BATCH) {
            return true;
        }
        return false;
    }

    /**
     * check if batch has started.
     * 
     * @return
     */
    @SuppressWarnings("unused")
    private boolean isBatchStarted() {
        return (batchQueue != null);
    }

    private synchronized void addToBatchQueue(KineticMessage request) {
        batchQueue.add(request);
    }

    private synchronized void createBatchQueue(KineticMessage request) {
        if (batchQueue == null) {
            this.batchQueue = new BatchQueue(request);
        }
    }

    private boolean shouldAddToBatch(KineticMessage request) {

        boolean flag = false;

        if (batchQueue != null && batchQueue.isSameClient(request)) {

            MessageType mtype = request.getCommand().getHeader()
                    .getMessageType();

            if (mtype == MessageType.PUT || mtype == MessageType.DELETE) {
                flag = true;
            }
        }

        return flag;
    }

    private synchronized void processBatchQueue(ChannelHandlerContext ctx)
            throws InterruptedException {
        
        try {

            for (KineticMessage request : batchQueue.getMessageList()) {
                this.processRequest(ctx, request);
            }

        } finally {
            this.batchQueue = null;
        }
    }
}
+24 −24
Original line number Diff line number Diff line
@@ -57,34 +57,29 @@ public class BatchOperationExample implements CallbackHandler<Entry> {
        // create client instance
        client = KineticClientFactory.createInstance(clientConfig);

        logger.info("*** starting batch op ...");
        // put entry bar
        Entry bar = new Entry();
        bar.setKey("bar".getBytes("UTF8"));
        bar.setValue("bar".getBytes("UTF8"));
        client.putForced(bar);

        byte[] key = "hello".getBytes("UTF8");
        logger.info("*** starting batch op ...");

        // start batch a new batch operation
        BatchOperation batch = client.createBatchOperation();

        // put entry 1
        Entry entry = new Entry();
        entry.setKey(key);
        entry.setValue("world".getBytes("UTF8"));
        Entry foo = new Entry();
        foo.setKey("foo".getBytes("UTF8"));
        foo.setValue("foo".getBytes("UTF8"));

        // client.putAsync(entry, null, this);
        batch.putForcedAsync(entry, this);

        // put entry 2
        Entry entry2 = new Entry();
        byte[] key2 = "hello2".getBytes("UTF8");
        entry2.setKey(key2);
        entry2.setValue("world2".getBytes("UTF8"));

        // client.putAsync(entry2, null, this);
        batch.putForcedAsync(entry2, this);
        batch.putForcedAsync(foo, this);

        // delet entry 1
        DeleteCbHandler dhandler = new DeleteCbHandler();
        // client.deleteAsync(entry, dhandler);
        batch.deleteAsync(entry, dhandler);
        batch.deleteAsync(bar, dhandler);

        // end/commit batch operation
        batch.commit();
@@ -93,25 +88,30 @@ public class BatchOperationExample implements CallbackHandler<Entry> {

        // start verifying result

        // get entry2, expect to find it
        Entry entry3 = client.get(key2);
        // get foo, expect to find it
        Entry foo1 = client.get(foo.getKey());

        // cannot be null
        if (foo1 == null) {
            throw new RuntimeException("Expect to find foo but not found");
        }

        byte[] key3 = entry3.getKey();
        byte[] key3 = foo1.getKey();
        String k = new String(key3, "UTF8");

        byte[] value3 = entry3.getValue();
        byte[] value3 = foo1.getValue();
        String v = new String(value3, "UTF8");

        logger.info("expect entry2 existed, key =" + k + ", value = "
        logger.info("expect foo existed, key =" + k + ", value = "
                + v);

        // get entry, expect to be not found
        Entry entry4 = client.get(key);
        if (entry4 != null) {
        Entry bar1 = client.get(bar.getKey());
        if (bar1 != null) {
            throw new RuntimeException("error: found deleted entry ...");
        }

        logger.info("Expect entry hello to be null, entry=" + entry4);
        logger.info("Expect entry bar to be null, entry=" + bar1);

        // close kinetic client
        client.close();
Loading