Commit 7183733f authored by chiaming2000's avatar chiaming2000
Browse files

Batch operation error handling improvement:

PUT/DELETE response message status is set to NOT_ATTEMPTED when error
occurred.

END_BATCH response message status is set to INVALID_BATCH when any of
the operation in the batch failed. 
parent 13ca73a3
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -89,7 +89,7 @@ public class BatchOperationHandler {
            throws InvalidBatchException, NotAttemptedException {

        if (this.isEndBatch) {
            throw new InvalidBatchException("batch is not started");
            throw new InvalidBatchException("batch is not started or has ended");
        }

        try {
+26 −13
Original line number Diff line number Diff line
@@ -468,19 +468,8 @@ public class SimulatorEngine implements MessageService {

            checkBatchMode(kmreq);

            if (mtype == MessageType.START_BATCH
                    || mtype == MessageType.ABORT_BATCH) {
                // do nothing, simply send OK response
                ;
            } else if (kmreq.getIsBatchMessage()) {

                // init batch operation
                if (this.batchOp == null) {
                    this.initBatchOperation(kmreq, kmresp);
                }

                // process batch message
                this.batchOp.handleRequest(kmreq, kmresp);
            if (kmreq.getIsBatchMessage()) {
                this.processBatchOpMessage(kmreq, kmresp);
            } else if (kmreq.getMessage().getAuthType() == AuthType.PINAUTH) {
                // perform pin op
                PinOperationHandler.handleOperation(kmreq, kmresp, this);
@@ -1045,6 +1034,10 @@ public class SimulatorEngine implements MessageService {
            throw new InvalidBatchException("batch op already started");
        }

        if (kmreq.getCommand().getHeader().getMessageType() == MessageType.END_BATCH) {
            throw new InvalidBatchException("batch op failed");
        }

        this.isInBatchMode = true;

        // start a new batch, db is locked by this user
@@ -1091,4 +1084,24 @@ public class SimulatorEngine implements MessageService {
        }
    }

    private void processBatchOpMessage(KineticMessage kmreq,
            KineticMessage kmresp) throws InvalidBatchException,
            NotAttemptedException {

        MessageType mtype = kmreq.getCommand().getHeader().getMessageType();

        if (mtype == MessageType.START_BATCH
                || mtype == MessageType.ABORT_BATCH) {
            return;
        }

        // init batch operation
        if (this.batchOp == null) {
            this.initBatchOperation(kmreq, kmresp);
        }

        // process batch message
        this.batchOp.handleRequest(kmreq, kmresp);
    }

}
+9 −1
Original line number Diff line number Diff line
@@ -170,6 +170,7 @@ public class NioMessageServiceHandler extends
            KineticMessage request) {

        if (request.getCommand().getHeader().getMessageType() == MessageType.START_BATCH) {
            request.setIsBatchMessage(true);
            return true;
        }

@@ -225,14 +226,21 @@ public class NioMessageServiceHandler extends

        boolean flag = false;

        boolean hasBatchId = request.getCommand().getHeader().hasBatchID();
        if (hasBatchId == false) {
            return false;
        }

        String key = request.getCommand().getHeader().getConnectionID() + SEP
                + request.getCommand().getHeader().getBatchID();

        BatchQueue batchQueue = this.batchMap.get(key);

        MessageType mtype = null;

        if (batchQueue != null) {

            MessageType mtype = request.getCommand().getHeader()
            mtype = request.getCommand().getHeader()
                    .getMessageType();

            if (mtype == MessageType.PUT || mtype == MessageType.DELETE) {
+3 −0
Original line number Diff line number Diff line
@@ -65,6 +65,9 @@ public class BatchOperationAbortExample implements CallbackHandler<Entry> {

        client.putForced(bar);

        // clean up before batch op
        client.deleteForced("foo".getBytes("UTF8"));

        logger.info("*** starting batch operation ...");

        // start batch a new batch operation
+116 −0
Original line number Diff line number Diff line
/**
 * Copyright (C) 2014 Seagate Technology.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 */
package com.seagate.kinetic.example.batchop;

import java.io.UnsupportedEncodingException;
import java.util.logging.Level;
import java.util.logging.Logger;

import kinetic.client.AsyncKineticException;
import kinetic.client.BatchOperation;
import kinetic.client.CallbackHandler;
import kinetic.client.CallbackResult;
import kinetic.client.ClientConfiguration;
import kinetic.client.Entry;
import kinetic.client.KineticClient;
import kinetic.client.KineticClientFactory;
import kinetic.client.KineticException;

/**
 * Kinetic client batch operation usage example.
 * 
 * @author chiaming
 *
 */
public class BatchOperationFailedExample implements CallbackHandler<Entry> {

    private final static java.util.logging.Logger logger = Logger
            .getLogger(BatchOperationFailedExample.class.getName());

    public void run(String host, int port) throws KineticException,
            UnsupportedEncodingException {

        // kinetic client
        KineticClient client = null;

        try {

            // Client configuration and initialization
            ClientConfiguration clientConfig = new ClientConfiguration();

            clientConfig.setHost(host);
            clientConfig.setPort(port);

            // create client instance
            client = KineticClientFactory.createInstance(clientConfig);

            // put entry bar
            Entry bar = new Entry();
            bar.setKey("bar".getBytes("UTF8"));
            bar.setValue("bar".getBytes("UTF8"));
            bar.getEntryMetadata().setVersion("1234".getBytes("UTF8"));

            client.putForced(bar);

            logger.info("*** starting batch operation ...");

            // start batch a new batch operation
            BatchOperation batch = client.createBatchOperation();

            // put foo
            Entry foo = new Entry();
            foo.setKey("foo".getBytes("UTF8"));
            foo.setValue("foo".getBytes("UTF8"));
            foo.getEntryMetadata().setVersion("5678".getBytes("UTF8"));

            batch.putForcedAsync(foo, this);

            // delete bar
            // DeleteCbHandler dhandler = new DeleteCbHandler();
            bar.getEntryMetadata().setVersion("12341234".getBytes("UTF8"));
            batch.putAsync(bar, "".getBytes(), this);

            // end/commit batch operation
            batch.commit();

        } catch (KineticException e) {
            logger.log(Level.WARNING, e.getMessage(), e);
        } finally {
            client.close();
        }
    }

    @Override
    public void onSuccess(CallbackResult<Entry> result) {
        logger.info("put callback result received ...");
    }

    @Override
    public void onError(AsyncKineticException exception) {
        logger.log(Level.WARNING, exception.getMessage(), exception);
    }

    public static void main(String[] args) throws KineticException,
            InterruptedException, UnsupportedEncodingException {

        BatchOperationFailedExample batch = new BatchOperationFailedExample();

        batch.run("localhost", 8123);
    }

}