Commit 35712cc5 authored by chiaming2000's avatar chiaming2000
Browse files

Media optimization (compaction) preliminary implementation with leveldb

store.
parent a6cbe589
Loading
Loading
Loading
Loading
+13 −2
Original line number Diff line number Diff line
@@ -23,11 +23,10 @@ import java.util.logging.Logger;

import kinetic.client.KineticException;

import com.google.protobuf.ByteString;
import com.seagate.kinetic.common.lib.KineticMessage;
import com.seagate.kinetic.proto.Kinetic.Command;

import com.seagate.kinetic.proto.Kinetic.Command.MessageType;

import com.seagate.kinetic.proto.Kinetic.Command.Security.ACL.Permission;
import com.seagate.kinetic.proto.Kinetic.Command.Status.StatusCode;

@@ -97,6 +96,7 @@ public abstract class BackGroundOpHandler {
        }
    }
    
    @SuppressWarnings("unchecked")
    public static void mediaOptimize(KineticMessage request,
            KineticMessage respond, SimulatorEngine engine)
            throws KVStoreException, KineticException {
@@ -127,6 +127,17 @@ public abstract class BackGroundOpHandler {
             *  The following statements are for testing purpose only
             */
            
            // get start key
            ByteString startKey = request.getCommand().getBody().getRange()
                    .getStartKey();

            // get end key
            ByteString endKey = request.getCommand().getBody().getRange()
                    .getEndKey();

            // ask store to do media compaction
            engine.getStore().compactRange(startKey, endKey);

            // set endkey in response
            commandBuilder
                    .getBodyBuilder()
+41 −6
Original line number Diff line number Diff line
@@ -26,6 +26,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;

import com.seagate.kinetic.common.lib.KineticMessage;
import com.seagate.kinetic.proto.Kinetic.Command.MessageType;
import com.seagate.kinetic.simulator.internal.ConnectionInfo;
import com.seagate.kinetic.simulator.internal.FaultInjectedCloseConnectionException;
import com.seagate.kinetic.simulator.internal.SimulatorEngine;
@@ -107,14 +108,48 @@ public class NioMessageServiceHandler extends
            KineticMessage request) throws InterruptedException {

        if (enforceOrdering) {
            if (this.shouldProcessRequestAsync(request)) {
                // process request async
                this.processRequestAsync(ctx, request);
            } else {
                // process request sequentially
                queuedRequestProcessRunner.processRequest(ctx, request);
            }
        } else {
            this.processRequestAsync(ctx, request);
        }
	}

    /**
     * Process request asynchronously. The calling thread does not wait for the
     * request to be processed and returns immediately.
     * 
     * @param ctx
     * @param request
     * @throws InterruptedException
     */
    private void processRequestAsync(ChannelHandlerContext ctx,
            KineticMessage request) throws InterruptedException {

        // each request is independently processed
        RequestProcessRunner rpr = null;
        rpr = new RequestProcessRunner(lcservice, ctx, request);
        this.lcservice.execute(rpr);

        logger.info("***** request processed asynchronously ....");
    }

    private boolean shouldProcessRequestAsync(KineticMessage request) {
        boolean flag = false;

        MessageType mtype = request.getCommand().getHeader().getMessageType();

        if (mtype == MessageType.MEDIAOPTIMIZE
                || mtype == MessageType.MEDIASCAN) {
            flag = true;
        }

        return flag;
    }

	@Override
+11 −1
Original line number Diff line number Diff line
@@ -247,4 +247,14 @@ public interface Store<K, O, V> {
     *             if any internal error occurred.
     */
    public void flush() throws KVStoreException;

    /**
     * Force a compaction of the specified key range.
     * 
     * @param startKey
     *            if null then compaction start from the first key
     * @param endKey
     *            if null then compaction ends at the last key
     */
    public void compactRange(K startKey, K endKey) throws KVStoreException;
}
+7 −0
Original line number Diff line number Diff line
@@ -156,4 +156,11 @@ public class BdbStore implements Store<ByteString, ByteString, KVValue> {
        logger.warning("flush is not implemented for bdb");
    }

    @Override
    public void compactRange(ByteString startKey, ByteString endKey)
            throws KVStoreException {
        // TODO Auto-generated method stub
        logger.warning("method is not implemented for bdb");
    }

}
+6 −0
Original line number Diff line number Diff line
@@ -490,4 +490,10 @@ public class KyotoCabinet implements Store<ByteString, ByteString, KVValue> {
        logger.warning("Flush is not yet implemented.");
    }

    @Override
    public void compactRange(ByteString startKey, ByteString endKey)
            throws KVStoreException {
        logger.warning("method is not yet implemented.");
    }

}
Loading