Commit a44438c8 authored by chiaming2000's avatar chiaming2000
Browse files

Added concurrent batch operation support (per connection).

(feature is still work in progress) 

Applications can create multiple batch operation instances in one more
many connections. Each batch operation instance SHOULD BE operated in a
single threaded context.  Or otherwise operation ordering may not be in
the sequencial order.
parent 88a52932
Loading
Loading
Loading
Loading
+17 −6
Original line number Diff line number Diff line
@@ -31,47 +31,58 @@ import kinetic.client.KineticException;
 */
public class DefaultBatchOperation implements BatchOperation {

    private static int batchIdSequence = 1;

    private int batchId = -1;

    private DefaultKineticClient client = null;

    public DefaultBatchOperation(DefaultKineticClient client)
            throws KineticException {

        this.batchId = nextBatchId();

        this.client = client;

        this.client.startBatchOperation();
        this.client.startBatchOperation(batchId);
    }

    @Override
    public void putAsync(Entry entry, byte[] newVersion,
            CallbackHandler<Entry> handler) throws KineticException {
        this.client.putAsync(entry, newVersion, handler);

        this.client.batchPutAsync(entry, newVersion, handler, batchId);
    }

    @Override
    public void putForcedAsync(Entry entry, CallbackHandler<Entry> handler)
            throws KineticException {

        this.client.putForcedAsync(entry, handler);
        this.client.batchPutForcedAsync(entry, handler, batchId);
    }

    @Override
    public void deleteAsync(Entry entry, CallbackHandler<Boolean> handler)
            throws KineticException {

        this.client.deleteAsync(entry, handler);
        this.client.batchDeleteAsync(entry, handler, batchId);
    }

    @Override
    public void deleteForcedAsync(byte[] key, CallbackHandler<Boolean> handler)
            throws KineticException {

        this.client.deleteForcedAsync(key, handler);
        this.client.batchDeleteForcedAsync(key, handler, batchId);
    }

    @Override
    public void commit() throws KineticException {

        this.client.endBatchOperation();
        this.client.endBatchOperation(batchId);
    }

    private synchronized static int nextBatchId() {
        return batchIdSequence++;
    }

}
+76 −4
Original line number Diff line number Diff line
@@ -530,6 +530,24 @@ public class DefaultKineticClient implements AdvancedKineticClient {
        this.client.requestAsync(km, handler);
    }

    public void batchPutAsync(Entry entry, byte[] newVersion,
            CallbackHandler<Entry> handler, int batchId)
            throws KineticException {

        // construct put request message
        KineticMessage km = MessageFactory.createPutRequestMessage(entry,
                newVersion);

        // proto builder
        Command.Builder command = (Command.Builder) km.getCommand();

        // set batch id
        command.getHeaderBuilder().setBatchID(batchId);

        // send request to the drive
        this.client.requestAsync(km, handler);
    }

    /**
     * {@inheritDoc}
     */
@@ -572,6 +590,20 @@ public class DefaultKineticClient implements AdvancedKineticClient {
        this.client.requestAsync(km, handler);
    }

    public void batchDeleteAsync(Entry entry, CallbackHandler<Boolean> handler,
            int batchId) throws KineticException {

        KineticMessage km = MessageFactory.createDeleteRequestMessage(entry);

        // proto builder
        Command.Builder message = (Command.Builder) km.getCommand();

        // set batch id
        message.getHeaderBuilder().setBatchID(batchId);

        this.client.requestAsync(km, handler);
    }

    /**
     * {@inheritDoc}
     */
@@ -761,6 +793,30 @@ public class DefaultKineticClient implements AdvancedKineticClient {

    }

    public void batchPutForcedAsync(Entry entry,
            CallbackHandler<Entry> handler, int batchId)
            throws KineticException {
        byte[] newVersion = null;

        if (entry.getEntryMetadata() != null) {
            newVersion = entry.getEntryMetadata().getVersion();
        }

        // construct put request message
        KineticMessage km = MessageFactory.createPutRequestMessage(entry,
                newVersion);

        Command.Builder commandBuilder = (Command.Builder) km.getCommand();

        // set batchId
        commandBuilder.getHeaderBuilder().setBatchID(batchId);

        // set force bit
        commandBuilder.getBodyBuilder().getKeyValueBuilder().setForce(true);

        this.client.requestAsync(km, handler);
    }

    /**
     * {@inheritDoc}
     */
@@ -819,6 +875,22 @@ public class DefaultKineticClient implements AdvancedKineticClient {
        this.client.requestAsync(km, handler);
    }
    
    public void batchDeleteForcedAsync(byte[] key,
            CallbackHandler<Boolean> handler, int batchId)
            throws KineticException {
        
        // create force delete request message
        KineticMessage km = MessageFactory
                .createForceDeleteRequestMessage(key);

        Command.Builder request = (Command.Builder) km.getCommand();

        request.getHeaderBuilder().setBatchID(batchId);

        // do async delete
        this.client.requestAsync(km, handler);
    }

    /**
     * {@inheritDoc}
     */
@@ -944,13 +1016,13 @@ public class DefaultKineticClient implements AdvancedKineticClient {
     * @throws KineticException
     *             if any error occurred.
     */
    void startBatchOperation() throws KineticException {
    void startBatchOperation(int batchId) throws KineticException {

        KineticMessage request = null;
        KineticMessage response = null;

        // create get request message
        request = MessageFactory.createStartBatchRequestMessage();
        request = MessageFactory.createStartBatchRequestMessage(batchId);

        // send request
        response = this.client.request(request);
@@ -965,13 +1037,13 @@ public class DefaultKineticClient implements AdvancedKineticClient {
     * @throws KineticException
     *             if any error occurred.
     */
    void endBatchOperation() throws KineticException {
    void endBatchOperation(int batchId) throws KineticException {

        KineticMessage request = null;
        KineticMessage response = null;

        // create get request message
        request = MessageFactory.createEndBatchRequestMessage();
        request = MessageFactory.createEndBatchRequestMessage(batchId);

        // send request
        response = this.client.request(request);
+6 −2
Original line number Diff line number Diff line
@@ -418,7 +418,7 @@ public class MessageFactory {
        return kineticMessage;
    }
    
    public static KineticMessage createStartBatchRequestMessage()
    public static KineticMessage createStartBatchRequestMessage(int batchId)
            throws KineticException {

        KineticMessage kineticMessage = createKineticMessageWithBuilder();
@@ -427,10 +427,12 @@ public class MessageFactory {

        request.getHeaderBuilder().setMessageType(MessageType.START_BATCH);

        request.getHeaderBuilder().setBatchID(batchId);

        return kineticMessage;
    }

    public static KineticMessage createEndBatchRequestMessage()
    public static KineticMessage createEndBatchRequestMessage(int batchId)
            throws KineticException {

        KineticMessage kineticMessage = createKineticMessageWithBuilder();
@@ -439,6 +441,8 @@ public class MessageFactory {

        request.getHeaderBuilder().setMessageType(MessageType.END_BATCH);

        request.getHeaderBuilder().setBatchID(batchId);

        return kineticMessage;
    }

+22 −0
Original line number Diff line number Diff line
@@ -46,6 +46,9 @@ public class KineticMessage {
	// set to true if traveling through TLS/SSL
	private volatile boolean isSecuredChannel = false;  

    // set to true if this is a batch message
    private volatile boolean isBatchMessage = false;

	/**
	 * Set protocol buffer message.
	 *
@@ -118,4 +121,23 @@ public class KineticMessage {
	    return this.isSecuredChannel;
	}

    /**
     * Get if this message is a batch message.
     * 
     * @return true if this is a batch message
     */
    public boolean getIsBatchMessage() {
        return this.isBatchMessage;
    }

    /**
     * Set if this message is a batch message.
     * 
     * @param flag
     *            true if this is a batch message.
     */
    public void setIsBatchMessage(boolean flag) {
        this.isBatchMessage = flag;
    }

}
+481 −169

File changed.

Preview size limit exceeded, changes collapsed.

Loading