Commit 02f7c84b authored by lichenchong's avatar lichenchong
Browse files

Merge branch 'batch-operation' of https://github.com/Seagate/kinetic-java.git into batch-operation

parents bc0325e0 37187a0b
Loading
Loading
Loading
Loading
+41 −65
Original line number Diff line number Diff line
@@ -48,15 +48,11 @@ import com.seagate.kinetic.proto.Kinetic.Local;
import com.seagate.kinetic.proto.Kinetic.Message;
import com.seagate.kinetic.proto.Kinetic.Message.AuthType;
import com.seagate.kinetic.simulator.heartbeat.Heartbeat;
import com.seagate.kinetic.simulator.internal.p2p.P2POperationHandler;
import com.seagate.kinetic.simulator.internal.handler.CommandManager;
import com.seagate.kinetic.simulator.io.provider.nio.NioEventLoopGroupManager;
import com.seagate.kinetic.simulator.io.provider.spi.MessageService;
import com.seagate.kinetic.simulator.io.provider.spi.TransportProvider;
import com.seagate.kinetic.simulator.lib.HeaderOp;
import com.seagate.kinetic.simulator.lib.HmacStore;
import com.seagate.kinetic.simulator.lib.SetupInfo;
import com.seagate.kinetic.simulator.persist.KVOp;
import com.seagate.kinetic.simulator.persist.RangeOp;
import com.seagate.kinetic.simulator.persist.Store;
import com.seagate.kinetic.simulator.persist.StoreFactory;
import com.seagate.kinetic.simulator.utility.ConfigurationUtil;
@@ -139,7 +135,7 @@ public class SimulatorEngine implements MessageService {

    private static final String SSL_NIO_TRANSPORT = "com.seagate.kinetic.simulator.io.provider.nio.ssl.SslNioTransportProvider";

    private P2POperationHandler p2pHandler = null;
    // private P2POperationHandler p2pHandler = null;

    // batch op handler
    private BatchOperationHandler batchOp = null;
@@ -182,6 +178,8 @@ public class SimulatorEngine implements MessageService {
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    private CommandManager manager = null;

    /**
     * Simulator constructor.
     *
@@ -203,7 +201,7 @@ public class SimulatorEngine implements MessageService {
        tpService.register(this);

        // p2p op handler.
        p2pHandler = new P2POperationHandler();
        // p2pHandler = new P2POperationHandler();

        try {

@@ -222,6 +220,9 @@ public class SimulatorEngine implements MessageService {
            // init network io service
            this.initIoService();

            // init op handlers
            this.initHandlers();

            logger.info("simulator protocol version = "
                    + SimulatorConfiguration.getProtocolVersion());

@@ -309,6 +310,32 @@ public class SimulatorEngine implements MessageService {
        }
    }

    private void initHandlers() {

        this.manager = new CommandManager(this);

        // this.pinOpHandler = new PinOpHandler();
        // this.pinOpHandler.init(this);
        //
        // this.flushHandler = new FlushOpHandler();
        // this.flushHandler.init(this);
        //
        // this.noOpHandler = new NoOpHandler();
        // this.noOpHandler.init(this);
        //
        // this.keyValueOpHandler = new KeyValueOpHandler();
        // this.keyValueOpHandler.init(this);
        //
        // this.rangeOpHandler = new RangeOpHandler();
        // this.rangeOpHandler.init(this);
        //
        // this.securityOpHandler = new SecurityOpHandler();
        // this.securityOpHandler.init(this);
        //
        // this.setupOpHandler = new SetupOpHandler();
        // this.setupOpHandler.init(this);
    }

    public boolean useNio() {
        return this.config.getUseNio();
    }
@@ -377,11 +404,12 @@ public class SimulatorEngine implements MessageService {

        tpService.deregister(this);

        // close p2p handler
        if (this.p2pHandler != null) {
            this.p2pHandler.close();
        // close handlers
        if (this.manager != null) {
            this.manager.close();
        }

        // close io resources
        if (this.nioManager != null) {
            this.nioManager.close();
        }
@@ -436,7 +464,6 @@ public class SimulatorEngine implements MessageService {
    }

    @Override
    @SuppressWarnings("unchecked")
    public KineticMessage processRequest(KineticMessage kmreq) {

        // create response message
@@ -470,61 +497,10 @@ public class SimulatorEngine implements MessageService {

            if (kmreq.getIsBatchMessage()) {
                this.processBatchOpMessage(kmreq, kmresp);
            } else if (kmreq.getMessage().getAuthType() == AuthType.PINAUTH) {
                // perform pin op
                PinOperationHandler.handleOperation(kmreq, kmresp, this);
            } else if (mtype == MessageType.FLUSHALLDATA) {
                commandBuilder.getHeaderBuilder().setMessageType(
                        MessageType.FLUSHALLDATA_RESPONSE);
                logger.warning("received flush data command, this is a no op on simulator at this time ...");
            } else if (mtype == MessageType.NOOP) {
                commandBuilder.getHeaderBuilder().setMessageType(
                        MessageType.NOOP_RESPONSE);
            } else if (kmreq.getCommand().getBody().hasKeyValue()) {
                KVOp.Op(aclmap, store, kmreq, kmresp);
            } else if (mtype == MessageType.GETKEYRANGE) {
                RangeOp.operation(store, kmreq, kmresp, aclmap);
            } else if (kmreq.getCommand().getBody().hasSecurity()) {
                boolean hasPermission = SecurityHandler.checkPermission(kmreq,
                        kmresp, aclmap);
                if (hasPermission) {
                    synchronized (this.hmacKeyMap) {
                        SecurityHandler.handleSecurity(kmreq, kmresp, this);
                        this.hmacKeyMap = HmacStore.getHmacKeyMap(aclmap);
                    }
                }

            } else if (kmreq.getCommand().getBody().hasSetup()) {
                boolean hasPermission = SetupHandler.checkPermission(kmreq,
                        kmresp, aclmap);
                if (hasPermission) {
                    SetupInfo setupInfo = SetupHandler.handleSetup(kmreq,
                            kmresp, store, kineticHome);
                    if (setupInfo != null) {
                        this.clusterVersion = setupInfo.getClusterVersion();
                        // this.pin = setupInfo.getPin();
                    }
                }
            } else if (kmreq.getCommand().getBody().hasGetLog()) {
                boolean hasPermission = GetLogHandler.checkPermission(kmreq,
                        kmresp, aclmap);
                if (hasPermission) {
                    GetLogHandler.handleGetLog(this, kmreq, kmresp);
                }
            } else if (kmreq.getCommand().getBody().hasP2POperation()) {

                // check permission
                boolean hasPermission = P2POperationHandler.checkPermission(
                        kmreq, kmresp, aclmap);

                if (hasPermission) {
                    this.p2pHandler.push(aclmap, store, kmreq, kmresp);
                }
            } else if (mtype == MessageType.MEDIASCAN) {
                BackGroundOpHandler.mediaScan(kmreq, kmresp, this);
            } else if (mtype == MessageType.MEDIAOPTIMIZE) {
                BackGroundOpHandler.mediaOptimize(kmreq, kmresp, this);
            } else {
                this.manager.getHandler(mtype).processRequest(kmreq, kmresp);
            }

        } catch (DeviceLockedException ire) {

            int number = kmreq.getCommand().getHeader().getMessageType()
+73 −0
Original line number Diff line number Diff line
/**
 * 
 * Copyright (C) 2014 Seagate Technology.
 * 
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 *
 */
package com.seagate.kinetic.simulator.internal.handler;

import com.seagate.kinetic.common.lib.KineticMessage;
import com.seagate.kinetic.simulator.internal.SimulatorEngine;

/**
 * 
 * Command handler life-cycle and service interface.
 * <p>
 * A operation command handler must provide a no-arg constructor and implements
 * this interface.
 * <p>
 * After a handler is instantiated, the {@link #init(SimulatorEngine)} method is
 * called with the current instance of <code>SiumlatorEngine</code> engine.
 * <p>
 * The handler's {@link #close()} method is called when the simulator is
 * shutdown.
 * 
 * @author chiaming
 *
 */
public interface CommandHandler {

    /**
     * 
     * Initialize this command handler. This is called immediately after a
     * command handler is instantiated.
     * 
     * @param engine
     *            the associated simulator engine.
     */
    public void init(SimulatorEngine engine);

    /**
     * Provide the service for the operation command.
     * 
     * @param request
     *            the request message.
     * @param response
     *            the response message.
     * @throws ServiceException
     *             if any internal error occurred.
     */
    public void processRequest(KineticMessage request, KineticMessage response)
            throws ServiceException;

    /**
     * Close the command handler. All resources associated with this handler are
     * released.
     * <p>
     * This is invoked when the simulator is shutdown.
     */
    public void close();
}
+45 −0
Original line number Diff line number Diff line
/**
 * 
 * Copyright (C) 2014 Seagate Technology.
 * 
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 *
 */
package com.seagate.kinetic.simulator.internal.handler;

import com.seagate.kinetic.common.lib.KineticMessage;
import com.seagate.kinetic.simulator.internal.SimulatorEngine;

public class CommandHandlerBase implements CommandHandler {

    protected SimulatorEngine engine = null;

    @Override
    public void init(SimulatorEngine engine) {
        this.engine = engine;
    }

    @Override
    public void processRequest(KineticMessage request, KineticMessage response)
            throws ServiceException {
        ;
    }

    @Override
    public void close() {
        ;
    }

}
+128 −0
Original line number Diff line number Diff line
/**
 * 
 * Copyright (C) 2014 Seagate Technology.
 * 
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 *
 */
package com.seagate.kinetic.simulator.internal.handler;

import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

import com.seagate.kinetic.proto.Kinetic.Command.MessageType;
import com.seagate.kinetic.simulator.internal.SimulatorEngine;

/**
 * The command manager serves as a dispatcher to route each request to its
 * associated handler to process the request command.
 * 
 * @see CommandHandler
 * 
 * @author chiaming
 *
 */
public class CommandManager {

    private final static Logger logger = Logger.getLogger(CommandManager.class
            .getName());

    // simulator engine.
    private SimulatorEngine engine = null;

    // handler map.
    Map<MessageType, CommandHandler> handlerMap = new HashMap<MessageType, CommandHandler>();

    /**
     * Constructor to create this instance.
     * 
     * @param engine
     *            the current simulator engine.
     */
    public CommandManager(SimulatorEngine engine) {
        this.engine = engine;

        // initialize the handlers
        this.init();
    }

    /**
     * Initialize the service handlers.
     */
    private void init() {

        handlerMap.put(MessageType.NOOP, new NoOpHandler());
        handlerMap.put(MessageType.FLUSHALLDATA, new FlushOpHandler());
        handlerMap.put(MessageType.PINOP, new PinOpHandler());

        CommandHandler kvHandler = new KeyValueOpHandler();
        handlerMap.put(MessageType.PUT, kvHandler);
        handlerMap.put(MessageType.GET, kvHandler);
        handlerMap.put(MessageType.DELETE, kvHandler);
        handlerMap.put(MessageType.GETNEXT, kvHandler);
        handlerMap.put(MessageType.GETPREVIOUS, kvHandler);

        handlerMap.put(MessageType.GETKEYRANGE, new RangeOpHandler());

        this.handlerMap.put(MessageType.SECURITY, new SecurityOpHandler());

        this.handlerMap.put(MessageType.SETUP, new SetupOpHandler());

        this.handlerMap.put(MessageType.GETLOG, new GetLogOpHandler());

        this.handlerMap.put(MessageType.PEER2PEERPUSH, new P2POpHandler());

        this.handlerMap.put(MessageType.MEDIASCAN, new MediaScanOpHandler());

        this.handlerMap.put(MessageType.MEDIAOPTIMIZE,
                new MediaOptimizeOpHandler());

        for (CommandHandler handler : handlerMap.values()) {
            handler.init(engine);
        }
    }

    /**
     * Get the associated command handler based the specified message type.
     * 
     * @param mtype
     *            the request message type.
     * @return the command handler for the specified message type.
     */
    public CommandHandler getHandler(MessageType mtype) {

        // get handler instance from map.
        CommandHandler handler = this.handlerMap.get(mtype);

        // use a no-op handler if no handler is associated with the request.
        if (handler == null) {
            handler = this.handlerMap.get(MessageType.NOOP);
            logger.warning("No handler found for the requested message type: "
                    + mtype + ", no op is performed.");
        }

        return handler;
    }

    /**
     * Close all handlers associated with this simulator.
     */
    public void close() {
        for (CommandHandler handler : handlerMap.values()) {
            handler.close();
        }
    }
}
+33 −0
Original line number Diff line number Diff line
/**
 * 
 * Copyright (C) 2014 Seagate Technology.
 * 
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 *
 */
package com.seagate.kinetic.simulator.internal.handler;

import com.seagate.kinetic.common.lib.KineticMessage;

public class FlushOpHandler extends CommandHandlerBase implements
        CommandHandler {

    @Override
    public void processRequest(KineticMessage request, KineticMessage response)
            throws ServiceException {
        ;
    }

}
Loading