Commit e445a82e authored by chiaming2000's avatar chiaming2000
Browse files

Java API:

Throws BatchAbortedException when a batch operation was aborted. All
operations within the batch were not committed to the persistent store.  
parent bb879f2b
Loading
Loading
Loading
Loading
+46 −5
Original line number Diff line number Diff line
@@ -27,6 +27,7 @@ import kinetic.client.ClientConfiguration;
import kinetic.client.Entry;
import kinetic.client.EntryMetadata;
import kinetic.client.EntryNotFoundException;
import kinetic.client.BatchAbortedException;
import kinetic.client.KineticException;
import kinetic.client.advanced.AdvancedKineticClient;
import kinetic.client.advanced.PersistOption;
@@ -1137,14 +1138,54 @@ public class DefaultKineticClient implements AdvancedKineticClient {
        KineticMessage request = null;
        KineticMessage response = null;

        // create get request message
        // create request message
        request = MessageFactory.createEndBatchRequestMessage(batchId, count);

        try {
            // send request
            response = this.client.request(request);

            // check response
            MessageFactory.checkReply(request, response);
        } catch (KineticException ke) {
            this.handleBatchException(ke);
        }
    }

    private void handleBatchException(KineticException ke)
            throws KineticException {

        if (ke.getResponseMessage() != null) {

            BatchAbortedException bae = null;

            String msg = ke.getResponseMessage().getCommand().getStatus()
                    .getStatusMessage();

            bae = new BatchAbortedException(msg);

            List<Long> slist = ke.getResponseMessage().getCommand().getBody()
                    .getBatch().getSequenceList();

            long fs = ke.getResponseMessage().getCommand().getBody().getBatch()
                    .getFailedSequence();
            int index = -1;
            for (int i = 0; i < slist.size(); i++) {
                if (slist.get(i) == fs) {
                    index = i;
                    break;
                }
            }

            bae.setFailedOperationIndex(index);

            bae.setRequestMessage(ke.getRequestMessage());
            bae.setResponseMessage(ke.getResponseMessage());

            // set index
            throw bae;

        } else {
            throw ke;
        }
    }

    void abortBatchOperation(int batchId) throws KineticException {
+84 −0
Original line number Diff line number Diff line
/**
 * Copyright (C) 2014 Seagate Technology.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 */
package kinetic.client;

/**
 * This exception indicates a batch commit (END_BATCH) was unsuccessful. All
 * commands performed within the batch were not committed to the persistent
 * store.
 * 
 * @see BatchOperation
 * @author chiaming
 *
 */
public class BatchAbortedException extends KineticException {

    private static final long serialVersionUID = 8738331797271144047L;

    private int index = -1;

    /**
     * {@inheritDoc}
     */
    public BatchAbortedException() {
        ;
    }
    
    /**
     * {@inheritDoc}
     */
    public BatchAbortedException(String message) {
        super(message);
    }
    
    /**
     * {@inheritDoc}
     */
    public BatchAbortedException(Throwable cause) {
        super(cause);
    }

    /**
     * {@inheritDoc}
     */
    public BatchAbortedException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * Get the failed command index number starting with 0 for the first
     * command. For example, if the second command failed, it returns 1;
     * 
     * @return the failed command index number starting with 0 for the first
     *         command
     */
    public int getFiledOperationIndex() {
        return index;
    }

    /**
     * Set failed operation command index within the batch.
     * 
     * @param index
     *            failed operation command index within the batch
     */
    public void setFailedOperationIndex(int index) {
        this.index = index;
    }

}
+7 −2
Original line number Diff line number Diff line
@@ -90,9 +90,14 @@ public interface BatchOperation {
     * <p>
     * When this call returned successfully, all the commands performed in the
     * current batch are executed and committed to store successfully.
     * Otherwise, no commands in this batch were committed to the persistent
     * store.
     * 
     * @throws KineticException
     *             if any internal error occurred. The batch may or may not be
     *             committed. If committed, all commands are committed.
     *             Otherwise, no messages are committed.
     * @throws BatchAbortedException
     *             the commit failed. No messages within the batch were
     *             committed to the store.
     */
    public void commit() throws KineticException;

+15 −6
Original line number Diff line number Diff line
@@ -24,10 +24,13 @@ import java.util.logging.Logger;
import kinetic.client.BatchOperation;
import kinetic.client.ClientConfiguration;
import kinetic.client.Entry;
import kinetic.client.BatchAbortedException;
import kinetic.client.KineticClient;
import kinetic.client.KineticClientFactory;
import kinetic.client.KineticException;

import com.seagate.kinetic.proto.Kinetic.Command.Status;

/**
 * Kinetic client batch operation usage example.
 * 
@@ -73,10 +76,6 @@ public class BatchOperationFailedExample {
            // start batch a new batch operation
            BatchOperation batch = client.createBatchOperation();

            // put bar with wrong version, will fail
            bar.getEntryMetadata().setVersion("12341234".getBytes("UTF8"));
            batch.put(bar, "".getBytes());

            // put foo
            Entry foo = new Entry();
            foo.setKey("foo".getBytes("UTF8"));
@@ -85,11 +84,21 @@ public class BatchOperationFailedExample {

            batch.putForced(foo);

            // put bar with wrong version, will fail
            bar.getEntryMetadata().setVersion("12341234".getBytes("UTF8"));
            batch.put(bar, "".getBytes());

            // end/commit batch operation
            try {
                batch.commit();
            } catch (Exception e) {
                logger.info("received expected exception: " + e.getMessage());
            } catch (BatchAbortedException e) {
                // get status
                Status status = e.getResponseMessage().getCommand().getStatus();

                int index = e.getFiledOperationIndex();

                logger.info("received expected exception: " + status.getCode()
                        + ":" + status.getStatusMessage() + ", index=" + index);
            }

            Entry foo1 = client.get("foo".getBytes("UTF8"));