Commit f62feb30 authored by chiaming2000's avatar chiaming2000
Browse files

Batch operation prototype initial commit.

Please note: this is a prototype implementation and still under active development. When it is reviewed, improved, and tested, this branch will be merged into master.
parent 48b4ff0e
Loading
Loading
Loading
Loading
+77 −0
Original line number Diff line number Diff line
/**
 * Copyright (C) 2014 Seagate Technology.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 */
package com.seagate.kinetic.client.internal;

import kinetic.client.BatchOperation;
import kinetic.client.CallbackHandler;
import kinetic.client.Entry;
import kinetic.client.KineticException;

/**
 * Kinetic client batch operation implementation.
 * 
 * 
 * @author chiaming
 *
 */
public class DefaultBatchOperation implements BatchOperation {

    private DefaultKineticClient client = null;

    public DefaultBatchOperation(DefaultKineticClient client)
            throws KineticException {

        this.client = client;

        this.client.startBatchOperation();
    }

    @Override
    public void putAsync(Entry entry, byte[] newVersion,
            CallbackHandler<Entry> handler) throws KineticException {
        this.client.putAsync(entry, newVersion, handler);
    }

    @Override
    public void putForcedAsync(Entry entry, CallbackHandler<Entry> handler)
            throws KineticException {

        this.client.putForcedAsync(entry, handler);
    }

    @Override
    public void deleteAsync(Entry entry, CallbackHandler<Boolean> handler)
            throws KineticException {

        this.client.deleteAsync(entry, handler);
    }

    @Override
    public void deleteForcedAsync(byte[] key, CallbackHandler<Boolean> handler)
            throws KineticException {

        this.client.deleteForcedAsync(key, handler);
    }

    @Override
    public void commit() throws KineticException {

        this.client.endBatchOperation();
    }

}
+116 −0
Original line number Diff line number Diff line
package kinetic.client;

/**
 * 
 * Kinetic client batch operation interface.
 * <p>
 * 
 * @auther James Hughes
 * @author chiaming
 *
 */
public interface BatchOperation {

    /**
     * Put the versioned <code>Entry</code> asynchronously within the batch
     * operation. The command is not committed until the {@link #commit()}
     * method is invoked and returned successfully.
     * <p>
     * If the version in the specified entry does not match the version stored
     * in the persistent store, a <code>VersionMismatchException</code> is
     * delivered to the callback instance.
     * 
     * @param entry
     *            the <code>Entry</code> to be put to the persistent store.
     * 
     * @param newVersion
     *            new version for the entry.
     * 
     * @param handler
     *            callback handler for this operation.
     * @throws KineticException
     *             if any internal errors occurred.
     * 
     * @see #putForcedAsync(Entry, CallbackHandler)
     */
    public void putAsync(Entry entry, byte[] newVersion,
            CallbackHandler<Entry> handler) throws KineticException;

    /**
     * Force to put the specified <code>Entry</code> asynchronously within the
     * batch operation. Overwrite the entry in the store if existed. The command
     * is not committed until the {@link #commit()} method is invoked and
     * returned successfully.
     * 
     * @param entry
     *            the <code>Entry</code> to be put to the persistent store.
     * 
     * @param handler
     *            callback handler for this operation.
     * @throws KineticException
     *             if any internal errors occurred.
     * 
     */
    public void putForcedAsync(Entry entry, CallbackHandler<Entry> handler)
            throws KineticException;

    /**
     * Delete the entry that is associated with the key specified in the
     * <code>entry</code> asynchronously within the batch operation. The command
     * is not committed until the {@link #commit()} method is invoked and
     * returned successfully.
     * <p>
     * The version specified in the entry metadata must match the one stored in
     * the persistent storage. Otherwise, a KineticException is raised.
     * 
     * @param entry
     *            the key in the entry is used to find the associated entry.
     * 
     * @param handler
     *            the callback handler for the asynchronous delete operation.
     * 
     * @throws KineticException
     *             if any internal error occurred.
     * 
     */
    public void deleteAsync(Entry entry, CallbackHandler<Boolean> handler)
            throws KineticException;

    /**
     * Force delete the entry that is associated with the key specified in the
     * parameter asynchronously within the batch operation. The command is not
     * committed until the {@link #commit()} method is invoked and returned
     * successfully.
     * <p>
     * The entry version stored in the persistent store is ignored.
     * 
     * @param key
     *            the key in the entry is used to find the associated entry.
     * 
     * @param handler
     *            the callback handler for the asynchronous delete operation.
     * 
     * @throws KineticException
     *             if any internal error occurred.
     * 
     */
    public void deleteForcedAsync(byte[] key, CallbackHandler<Boolean> handler)
            throws KineticException;

    /**
     * Commit the current batch operation.
     * <p>
     * When this call returned successfully, all the commands performed in the
     * current batch are executed and committed to store successfully.
     * Otherwise, no commands in this batch were committed to the persistent
     * store.
     * 
     */
    public void commit() throws KineticException;

    /**
     * Abort the current batch operation.
     * 
     */
    // public void abort();
}
+721 −0

File added.

Preview size limit exceeded, changes collapsed.

+215 −0
Original line number Diff line number Diff line
/**
 * 
 * Copyright (C) 2014 Seagate Technology.
 * 
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 *
 */
package com.seagate.kinetic.simulator.internal;

import java.io.IOException;
import java.util.ArrayList;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.google.protobuf.ByteString;
import com.seagate.kinetic.common.lib.KineticMessage;
import com.seagate.kinetic.proto.Kinetic.Command.Algorithm;
import com.seagate.kinetic.proto.Kinetic.Command.KeyValue;
import com.seagate.kinetic.proto.Kinetic.Command.MessageType;
import com.seagate.kinetic.simulator.persist.BatchOperation;
import com.seagate.kinetic.simulator.persist.KVValue;

/**
 * Batch operation handler.
 * 
 * @author chiaming
 *
 */
public class BatchOpHandler {

    private final static Logger logger = Logger.getLogger(BatchOpHandler.class
            .getName());

    private SimulatorEngine engine = null;
    private long cid = -1;

    boolean isEndBatch = false;

    private int queueDepth = 100;

    private BatchOperation<ByteString, KVValue> batch = null;

    private ArrayList<KineticMessage> list = new ArrayList<KineticMessage>();

    public BatchOpHandler(KineticMessage request, KineticMessage respond,
            SimulatorEngine engine) {

        // this batch op handler belongs to this connection
        this.cid = request.getCommand().getHeader().getConnectionID();
        // simulator engine
        this.engine = engine;
    }

    public synchronized boolean handleRequest(KineticMessage request,
            KineticMessage response)
            throws InvalidBatchException, NotAttemptedException {

        if (this.isEndBatch) {
            throw new InvalidBatchException("batch is not started");
        }

        try {

            // check if request is from the same client/connection
            if (request.getCommand().getHeader().getConnectionID() != this.cid) {
                throw new RuntimeException("DB is locked by: " + cid
                        + ", request cid: "
                        + request.getCommand().getHeader().getConnectionID());
            }

            if (request.getCommand().getHeader().getMessageType() == MessageType.END_BATCH) {
                // commit the batch
                isEndBatch = true;
                // do batch op.
                this.commitBatch();
            } else {
                // check if reached limit
                if (list.size() <= this.queueDepth) {
                    list.add(request);
                    logger.info("*** added message to batch queue ...");
                } else {

                    logger.warning("exceed max queue depth: " + this.queueDepth);

                    throw new RuntimeException("exceed max queue depth: "
                            + this.queueDepth);
                }
            }
        } catch (NotAttemptedException nae) {
            isEndBatch = true;
            throw nae;
        } catch (Exception e) {
            isEndBatch = true;
            throw new InvalidBatchException(e);
        } finally {

            if (isEndBatch) {
                // do lean up
                this.close();
            }
        }

        return isEndBatch;
    }

    @SuppressWarnings("unchecked")
    private synchronized void commitBatch() throws KVStoreException,
            IOException,
            NotAttemptedException {

        batch = engine.getStore().createBatchOperation();

        for (KineticMessage km : list) {

            ByteString key = km.getCommand().getBody().getKeyValue().getKey();

            logger.info("*** batch op entry., key = " + key);

            if (km.getCommand().getHeader().getMessageType() == MessageType.DELETE) {

                if (km.getCommand().getBody().getKeyValue().getForce() == false) {

                    boolean isVersionMatched = true;

                    // XXX: check version
                    // ByteString dbv =
                    // km.getCommand().getBody().getKeyValue().getDbVersion();
                    if (isVersionMatched == false) {
                        throw new NotAttemptedException("version mismatch");
                    }

                }

                batch.delete(key);

            } else {
                ByteString valueByteString = null;

                if (km.getValue() != null) {
                    valueByteString = ByteString.copyFrom(km.getValue());
                } else {
                    // set value to empty if null
                    valueByteString = ByteString.EMPTY;
                }

                // KV in;
                KeyValue requestKeyValue = km.getCommand().getBody()
                        .getKeyValue();

                Algorithm al = null;
                if (requestKeyValue.hasAlgorithm()) {
                    al = requestKeyValue.getAlgorithm();
                }

                KVValue data = new KVValue(requestKeyValue.getKey(),
                        requestKeyValue.getNewVersion(),
                        requestKeyValue.getTag(), al, valueByteString);

                // XXX: check version
                // ByteString dbv =
                // km.getCommand().getBody().getKeyValue().getDbVersion();
                boolean isVersionMatched = true;
                if (isVersionMatched == false) {
                    throw new NotAttemptedException("version mismatch");
                }

                // XXX: check version
                batch.put(key, data);

                logger.info("*** batch op put entry., key = " + key);
            }
        }

        // commit db batch
        batch.commit();
    }

    /**
     * close the current batch operation.
     */
    public void close() {

        try {

            if (batch != null) {
                batch.close();
            }

            // clear list
            list.clear();

            logger.info("*** batch op closed ");
        } catch (Exception e) {
            logger.log(Level.WARNING, e.getMessage(), e);
        } finally {
            this.isEndBatch = true;
        }
    }

    public synchronized boolean isClosed() {
        return this.isEndBatch;
    }
}
+51 −0
Original line number Diff line number Diff line
/**
 * 
 * Copyright (C) 2014 Seagate Technology.
 * 
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 *
 */
package com.seagate.kinetic.simulator.internal;

public class InvalidBatchException extends Exception {

    private static final long serialVersionUID = 1559241680833563288L;

    public InvalidBatchException() {
        // TODO Auto-generated constructor stub
    }

    public InvalidBatchException(String message) {
        super(message);
        // TODO Auto-generated constructor stub
    }

    public InvalidBatchException(Throwable cause) {
        super(cause);
        // TODO Auto-generated constructor stub
    }

    public InvalidBatchException(String message, Throwable cause) {
        super(message, cause);
        // TODO Auto-generated constructor stub
    }

    public InvalidBatchException(String message, Throwable cause,
            boolean enableSuppression, boolean writableStackTrace) {
        super(message, cause, enableSuppression, writableStackTrace);
        // TODO Auto-generated constructor stub
    }

}
Loading