Commit 0b7be784 authored by chiaming2000's avatar chiaming2000
Browse files

Initial implementation for batch operation changes based on protocol

update on version 3.0.6. Exception handling improvements will follow
after this commit.

1. Introduced no-acknowledge pattern for PUT/DELETE operations within a
batch.
2. Added Batch Message construct for END_BATCH and END_BATCH_RESPONSE
messages.

https://github.com/Seagate/kinetic-protocol/blob/features/batch-operation/kinetic.proto
parent 66a1f67b
Loading
Loading
Loading
Loading
+14 −0
Original line number Diff line number Diff line
@@ -559,6 +559,20 @@ public class ClientProxy {
        }
    }

    void requestNoAck(KineticMessage kmreq) throws KineticException {

        try {
            finalizeHeader(kmreq);
            this.iohandler.getMessageHandler().writeNoAck(kmreq);
        } catch (Exception e) {

            KineticException ke = new KineticException(e.getMessage());
            ke.setRequestMessage(kmreq);

            throw ke;
        }
    }

    /**
     * Check hmac based on the specified message.
     *
+45 −1
Original line number Diff line number Diff line
@@ -35,6 +35,9 @@ public class DefaultBatchOperation implements BatchOperation {

    private int batchId = -1;

    // operation count
    private int count = 0;

    private DefaultKineticClient client = null;

    public DefaultBatchOperation(DefaultKineticClient client)
@@ -52,6 +55,7 @@ public class DefaultBatchOperation implements BatchOperation {
            CallbackHandler<Entry> handler) throws KineticException {

        this.client.batchPutAsync(entry, newVersion, handler, batchId);
        this.count++;
    }

    @Override
@@ -59,6 +63,7 @@ public class DefaultBatchOperation implements BatchOperation {
            throws KineticException {

        this.client.batchPutForcedAsync(entry, handler, batchId);
        this.count++;
    }

    @Override
@@ -66,6 +71,7 @@ public class DefaultBatchOperation implements BatchOperation {
            throws KineticException {

        this.client.batchDeleteAsync(entry, handler, batchId);
        this.count++;
    }

    @Override
@@ -73,12 +79,13 @@ public class DefaultBatchOperation implements BatchOperation {
            throws KineticException {

        this.client.batchDeleteForcedAsync(key, handler, batchId);
        this.count++;
    }

    @Override
    public void commit() throws KineticException {

        this.client.endBatchOperation(batchId);
        this.client.endBatchOperation(batchId, count);
    }

    private synchronized static int nextBatchId() {
@@ -90,4 +97,41 @@ public class DefaultBatchOperation implements BatchOperation {
        this.client.abortBatchOperation(batchId);
    }

    @Override
    public void put(Entry entry, byte[] newVersion) throws KineticException {
        // batch forced put
        this.client.batchPut(entry, newVersion, batchId);

        // increase count
        this.count++;
    }

    @Override
    public void putForced(Entry entry) throws KineticException {
        // batch forced put no ack
        this.client.batchPutForced(entry, batchId);

        // increase count
        this.count++;
    }

    @Override
    public void delete(Entry entry) throws KineticException {
        // batch delete no ack
        this.client.batchDelete(entry, batchId);

        // increase count
        this.count++;
    }

    @Override
    public void deleteForced(byte[] key) throws KineticException {

        // batch forced delete no ack
        this.client.batchDeleteForced(key, batchId);

        // increase count
        this.count++;
    }

}
+69 −2
Original line number Diff line number Diff line
@@ -576,6 +576,46 @@ public class DefaultKineticClient implements AdvancedKineticClient {
        this.client.requestAsync(km, handler);
    }

    public void batchPut(Entry entry, byte[] newVersion, int batchId)
            throws KineticException {

        // construct put request message
        KineticMessage km = MessageFactory.createPutRequestMessage(entry,
                newVersion);

        // proto builder
        Command.Builder command = (Command.Builder) km.getCommand();

        // set batch id
        command.getHeaderBuilder().setBatchID(batchId);

        // send request to the drive
        this.client.requestNoAck(km);
    }

    public void batchPutForced(Entry entry, int batchId)
            throws KineticException {
        byte[] newVersion = null;

        if (entry.getEntryMetadata() != null) {
            newVersion = entry.getEntryMetadata().getVersion();
        }

        // construct put request message
        KineticMessage km = MessageFactory.createPutRequestMessage(entry,
                newVersion);

        Command.Builder commandBuilder = (Command.Builder) km.getCommand();

        // set batchId
        commandBuilder.getHeaderBuilder().setBatchID(batchId);

        // set force bit
        commandBuilder.getBodyBuilder().getKeyValueBuilder().setForce(true);

        this.client.requestNoAck(km);
    }

    /**
     * {@inheritDoc}
     */
@@ -632,6 +672,19 @@ public class DefaultKineticClient implements AdvancedKineticClient {
        this.client.requestAsync(km, handler);
    }

    public void batchDelete(Entry entry, int batchId) throws KineticException {

        KineticMessage km = MessageFactory.createDeleteRequestMessage(entry);

        // proto builder
        Command.Builder message = (Command.Builder) km.getCommand();

        // set batch id
        message.getHeaderBuilder().setBatchID(batchId);

        this.client.requestNoAck(km);
    }

    /**
     * {@inheritDoc}
     */
@@ -919,6 +972,20 @@ public class DefaultKineticClient implements AdvancedKineticClient {
        this.client.requestAsync(km, handler);
    }

    public void batchDeleteForced(byte[] key, int batchId)
            throws KineticException {

        // create force delete request message
        KineticMessage km = MessageFactory.createForceDeleteRequestMessage(key);

        Command.Builder request = (Command.Builder) km.getCommand();

        request.getHeaderBuilder().setBatchID(batchId);

        // do async delete
        this.client.requestNoAck(km);
    }

    /**
     * {@inheritDoc}
     */
@@ -1065,13 +1132,13 @@ public class DefaultKineticClient implements AdvancedKineticClient {
     * @throws KineticException
     *             if any error occurred.
     */
    void endBatchOperation(int batchId) throws KineticException {
    void endBatchOperation(int batchId, int count) throws KineticException {

        KineticMessage request = null;
        KineticMessage response = null;

        // create get request message
        request = MessageFactory.createEndBatchRequestMessage(batchId);
        request = MessageFactory.createEndBatchRequestMessage(batchId, count);

        // send request
        response = this.client.request(request);
+4 −1
Original line number Diff line number Diff line
@@ -447,7 +447,8 @@ public class MessageFactory {
        return kineticMessage;
    }

    public static KineticMessage createEndBatchRequestMessage(int batchId)
    public static KineticMessage createEndBatchRequestMessage(int batchId,
            int count)
            throws KineticException {

        KineticMessage kineticMessage = createKineticMessageWithBuilder();
@@ -458,6 +459,8 @@ public class MessageFactory {

        request.getHeaderBuilder().setBatchID(batchId);

        request.getBodyBuilder().getBatchBuilder().setCount(count);

        return kineticMessage;
    }

+4 −0
Original line number Diff line number Diff line
@@ -321,6 +321,10 @@ public class MessageHandler implements ClientMessageService, Runnable {
		this.doWrite(message);
	}

    public void writeNoAck(KineticMessage message) throws IOException {
        this.doWrite(message);
    }

	@SuppressWarnings("rawtypes")
	private void invokeCallbackHandler(Object cbContext, KineticMessage response) {

Loading