Commit 3c86e6cd authored by chiaming2000's avatar chiaming2000
Browse files

Refactor NioMessageServiceHandler such that it is shared by TCP/SSL

transport.

Batch operations now also work with SSL transport.
parent 400b7f12
Loading
Loading
Loading
Loading
+10 −8
Original line number Diff line number Diff line
@@ -17,7 +17,7 @@
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 *
 */
package com.seagate.kinetic.simulator.io.provider.nio.tcp;
package com.seagate.kinetic.simulator.io.provider.nio;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
@@ -34,10 +34,6 @@ import com.seagate.kinetic.simulator.internal.ConnectionInfo;
import com.seagate.kinetic.simulator.internal.FaultInjectedCloseConnectionException;
import com.seagate.kinetic.simulator.internal.InvalidBatchException;
import com.seagate.kinetic.simulator.internal.SimulatorEngine;
import com.seagate.kinetic.simulator.io.provider.nio.BatchQueue;
import com.seagate.kinetic.simulator.io.provider.nio.NioConnectionStateManager;
import com.seagate.kinetic.simulator.io.provider.nio.NioQueuedRequestProcessRunner;
import com.seagate.kinetic.simulator.io.provider.nio.RequestProcessRunner;
import com.seagate.kinetic.simulator.io.provider.spi.MessageService;

/**
@@ -64,9 +60,14 @@ public class NioMessageServiceHandler extends
	private static boolean faultInjectCloseConnection = Boolean
			.getBoolean(FaultInjectedCloseConnectionException.FAULT_INJECT_CLOSE_CONNECTION);

	public NioMessageServiceHandler(MessageService lcservice2) {
    private boolean isSecureChannel = false;

    public NioMessageServiceHandler(MessageService lcservice2,
            boolean isSecureChannel) {
		this.lcservice = lcservice2;

        this.isSecureChannel = isSecureChannel;

		this.enforceOrdering = lcservice.getServiceConfiguration()
				.getMessageOrderingEnforced();

@@ -98,7 +99,7 @@ public class NioMessageServiceHandler extends
		}
		
		// set ssl channel flag to false
		request.setIsSecureChannel(false);
        request.setIsSecureChannel(isSecureChannel);
		
		// check if conn id is set
		NioConnectionStateManager.checkIfConnectionIdSet(ctx, request);
@@ -165,7 +166,8 @@ public class NioMessageServiceHandler extends

        this.batchMap = null;

        logger.info("connection info is removed, id=" + info.getConnectionId());
        logger.info("connection info is removed, id=" + info.getConnectionId()
                + ", is secure channel=" + this.isSecureChannel);
	}

    private boolean isStartBatch(ChannelHandlerContext ctx,
+5 −2
Original line number Diff line number Diff line
@@ -23,7 +23,6 @@ package com.seagate.kinetic.simulator.io.provider.nio.ssl;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

import io.netty.handler.ssl.SslHandler;

import java.util.logging.Logger;
@@ -33,6 +32,7 @@ import javax.net.ssl.SSLEngine;
import com.seagate.kinetic.common.lib.TlsUtil;
import com.seagate.kinetic.common.protocol.codec.KineticDecoder;
import com.seagate.kinetic.common.protocol.codec.KineticEncoder;
import com.seagate.kinetic.simulator.io.provider.nio.NioMessageServiceHandler;
//import com.seagate.kinetic.proto.Kinetic;
import com.seagate.kinetic.simulator.io.provider.spi.MessageService;

@@ -71,7 +71,10 @@ ChannelInitializer<SocketChannel> {
			pipeline.addLast("encoder", new KineticEncoder());
		

		pipeline.addLast("handler", new SslMessageServiceHandler(lcservice));
        // pipeline.addLast("handler", new SslMessageServiceHandler(lcservice));

        pipeline.addLast("handler", new NioMessageServiceHandler(lcservice,
                true));

		logger.info("ssl nio channel initialized ... ");
	}
+0 −133
Original line number Diff line number Diff line
/**
 * 
 * Copyright (C) 2014 Seagate Technology.
 * 
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 *
 */
package com.seagate.kinetic.simulator.io.provider.nio.ssl;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.logging.Level;
import java.util.logging.Logger;

import com.seagate.kinetic.common.lib.KineticMessage;
import com.seagate.kinetic.simulator.internal.ConnectionInfo;
import com.seagate.kinetic.simulator.internal.SimulatorEngine;

import com.seagate.kinetic.simulator.io.provider.nio.NioConnectionStateManager;
import com.seagate.kinetic.simulator.io.provider.nio.NioQueuedRequestProcessRunner;
import com.seagate.kinetic.simulator.io.provider.nio.RequestProcessRunner;
import com.seagate.kinetic.simulator.io.provider.spi.MessageService;

/**
 *
 * @author chiaming
 *
 */
public class SslMessageServiceHandler extends
		SimpleChannelInboundHandler<KineticMessage> {

	private static final Logger logger = Logger
			.getLogger(SslMessageServiceHandler.class.getName());

	private MessageService lcservice = null;

	private boolean enforceOrdering = false;

	private NioQueuedRequestProcessRunner queuedRequestProcessRunner = null;

	public SslMessageServiceHandler(MessageService lcservice2) {
		this.lcservice = lcservice2;

		this.enforceOrdering = lcservice.getServiceConfiguration()
				.getMessageOrderingEnforced();

		if (this.enforceOrdering) {
			this.queuedRequestProcessRunner = new NioQueuedRequestProcessRunner(
					lcservice);
		}
	}

	@Override
	public void channelActive(final ChannelHandlerContext ctx) throws Exception {
	    
	    // register connection info with the channel handler context
        @SuppressWarnings("unused")
        ConnectionInfo info = this.lcservice.registerNewConnection(ctx);
        
        //logger.info("TLS channel is active, connection registered., id = " + info.getConnectionId());
	}

	@Override
    protected void channelRead0(ChannelHandlerContext ctx,
            KineticMessage request) throws Exception {

	    // set secure channel flag
	    request.setIsSecureChannel(true);
	    
	    // check if client set conn id
        NioConnectionStateManager.checkIfConnectionIdSet(ctx, request);

        if (enforceOrdering) {
            // process request sequentially
            queuedRequestProcessRunner.processRequest(ctx, request);
        } else {
            // each request is independently processed
            RequestProcessRunner rpr = null;
            rpr = new RequestProcessRunner(lcservice, ctx, request);
            this.lcservice.execute(rpr);
        }
    }

	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		ctx.flush();
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {

		logger.log(Level.WARNING, "Unexpected exception from downstream.",
				cause);

		// close process runner
		if (this.queuedRequestProcessRunner != null) {
			this.queuedRequestProcessRunner.close();
		}

		// close context
		ctx.close();
	}

	@Override
	public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
	    
	    // remove connection info of the channel handler context from conn info map
        @SuppressWarnings("unused")
        ConnectionInfo info = SimulatorEngine.removeConnectionInfo(ctx);
       
        //logger.info("connection info is removed, id=" + info.getConnectionId() );

		if (this.queuedRequestProcessRunner != null) {
			//logger.info("removing/closing ssl nio queued request process runner ...");
			this.queuedRequestProcessRunner.close();
		}
	}

}
+3 −2
Original line number Diff line number Diff line
@@ -28,6 +28,7 @@ import java.util.logging.Logger;

import com.seagate.kinetic.common.protocol.codec.KineticDecoder;
import com.seagate.kinetic.common.protocol.codec.KineticEncoder;
import com.seagate.kinetic.simulator.io.provider.nio.NioMessageServiceHandler;
//import com.seagate.kinetic.proto.Kinetic;
import com.seagate.kinetic.simulator.io.provider.spi.MessageService;

@@ -55,8 +56,8 @@ ChannelInitializer<SocketChannel> {
			p.addLast("encoder", new KineticEncoder());
		

		p.addLast("handler", new NioMessageServiceHandler(lcservice));
        p.addLast("handler", new NioMessageServiceHandler(lcservice, false));

		logger.info("nio channel initialized ...");
        logger.info("nio channel initialized., is secure channel=false");
	}
}
+3 −1
Original line number Diff line number Diff line
@@ -52,7 +52,9 @@ public class BatchOperationAbortExample implements CallbackHandler<Entry> {
        ClientConfiguration clientConfig = new ClientConfiguration();

        clientConfig.setHost(host);
        clientConfig.setPort(port);
        clientConfig.setUseSsl(true);
        clientConfig.setPort(8443);
        // clientConfig.setPort(port);

        // create client instance
        client = KineticClientFactory.createInstance(clientConfig);