Loading kinetic-client/src/main/java/com/seagate/kinetic/client/io/provider/tcp/TcpTransportProvider.java +113 −124 Original line number Diff line number Diff line Loading @@ -34,10 +34,12 @@ import java.util.logging.Logger; import kinetic.client.ClientConfiguration; import kinetic.client.KineticException; import com.google.protobuf.InvalidProtocolBufferException; import com.seagate.kinetic.client.io.provider.spi.ClientMessageService; import com.seagate.kinetic.client.io.provider.spi.ClientTransportProvider; import com.seagate.kinetic.common.lib.KineticMessage; import com.seagate.kinetic.proto.Kinetic; import com.seagate.kinetic.proto.Kinetic.Command; import com.seagate.kinetic.proto.Kinetic.Message; import com.seagate.kinetic.proto.Kinetic.Message.Builder; Loading @@ -58,8 +60,6 @@ public class TcpTransportProvider implements ClientTransportProvider, Runnable { private final Logger logger = Logger .getLogger(TcpTransportProvider.class.getName()); private boolean useV2Protocol = true; // input socket read thread private Thread myThread = null; Loading Loading @@ -93,8 +93,6 @@ public class TcpTransportProvider implements ClientTransportProvider, Runnable { ClientConfiguration config = mservice.getConfiguration(); this.useV2Protocol = true; this.mservice = mservice; /** Loading @@ -114,9 +112,7 @@ public class TcpTransportProvider implements ClientTransportProvider, Runnable { is = socket.getInputStream(); if (this.useV2Protocol) { dis = new DataInputStream(is); } os = socket.getOutputStream(); Loading @@ -127,8 +123,7 @@ public class TcpTransportProvider implements ClientTransportProvider, Runnable { this.myThread.start(); logger.info("tcp-non-nio is used, use v2 protocol = " + this.useV2Protocol); logger.info("tcp-non-nio transport initialized ..."); } catch (IOException e) { Loading @@ -145,20 +140,18 @@ public class TcpTransportProvider implements ClientTransportProvider, Runnable { * {@inheritDoc} */ @Override public synchronized void write(KineticMessage im) throws IOException { public synchronized void write(KineticMessage km) throws IOException { Message.Builder message = (Builder) im.getMessage(); Message.Builder message = (Builder) km.getMessage(); if (logger.isLoggable(Level.FINEST)) { logger.finest("writing message: " + message); } if (this.useV2Protocol) { try { // get value to write separately byte[] value = im.getValue(); byte[] value = km.getValue(); // write 9 byte header ByteArrayOutputStream baos = new ByteArrayOutputStream(9); Loading Loading @@ -212,9 +205,6 @@ public class TcpTransportProvider implements ClientTransportProvider, Runnable { throw e; } } else { message.build().writeDelimitedTo(os); } } /** Loading @@ -227,16 +217,11 @@ public class TcpTransportProvider implements ClientTransportProvider, Runnable { this.isRunning = false; this.mservice.close(); this.is.close(); if (this.useV2Protocol) { this.dis.close(); } this.os.close(); this.socket.close(); logger.info("tcp non-nio transport closed, use v2 protocol=" + this.useV2Protocol); logger.info("tcp non-nio transport closed ..."); } catch (Exception e) { logger.log(Level.WARNING, e.getMessage(), e); } Loading @@ -250,16 +235,15 @@ public class TcpTransportProvider implements ClientTransportProvider, Runnable { try { Message message = null; KineticMessage im = new KineticMessage(); if (this.useV2Protocol) { KineticMessage km = new KineticMessage(); // 1. Read magic number. int magicNumber = dis.readByte(); if (magicNumber != 'F') { throw new CorruptedFrameException( "Invalid magic number: " + magicNumber); throw new CorruptedFrameException("Invalid magic number: " + magicNumber); } // 2. protobuf message size Loading Loading @@ -291,24 +275,29 @@ public class TcpTransportProvider implements ClientTransportProvider, Runnable { dis.read(attachedValue); // set to message // builder.setValue(ByteString.copyFrom(attachedValue)); im.setValue(attachedValue); km.setValue(attachedValue); } // v2 message message = builder.build(); } else { // v1 message message = Kinetic.Message.parseDelimitedFrom(is); } // set message to kietic message im.setMessage(message); km.setMessage(message); // build command Command.Builder commandBuilder = Command.newBuilder(); try { commandBuilder.mergeFrom(message.getCommandBytes()); km.setCommand(commandBuilder.build()); } catch (InvalidProtocolBufferException e) { logger.log(Level.WARNING, e.getMessage(), e); } if (logger.isLoggable(Level.FINEST)) { logger.finest("read message: " + message); } this.mservice.routeMessage(im); this.mservice.routeMessage(km); } catch (Exception e) { this.isRunning = false; } Loading kinetic-client/src/main/java/kinetic/client/ClientConfiguration.java +5 −1 Original line number Diff line number Diff line Loading @@ -256,7 +256,11 @@ public class ClientConfiguration extends Properties { * */ public void setUseNio(boolean flag) { this.useNio = flag; // XXX chiaming 07/19/2014: This will be re-enabled when it is compatible with 3.0.0 logger.warning("Method is disabled., Only NIO is supported."); //this.useNio = flag; } /** Loading kinetic-simulator/src/main/java/com/seagate/kinetic/simulator/io/provider/tcp/IoHandler.java +12 −0 Original line number Diff line number Diff line Loading @@ -32,7 +32,9 @@ import java.net.Socket; import java.util.logging.Level; import java.util.logging.Logger; import com.google.protobuf.InvalidProtocolBufferException; import com.seagate.kinetic.common.lib.KineticMessage; import com.seagate.kinetic.proto.Kinetic.Command; import com.seagate.kinetic.proto.Kinetic.Message; import com.seagate.kinetic.proto.Kinetic.Message.Builder; import com.seagate.kinetic.simulator.internal.MessageHandler; Loading Loading @@ -252,6 +254,16 @@ public class IoHandler implements Runnable { break; } // build command Command.Builder commandBuilder = Command.newBuilder(); try { commandBuilder.mergeFrom(request.getCommandBytes()); km.setCommand(commandBuilder.build()); } catch (InvalidProtocolBufferException e) { logger.log(Level.WARNING, e.getMessage(), e); } if (logger.isLoggable(Level.FINEST)) { logger.finest("received request message: " + request); } Loading kinetic-simulator/src/main/java/kinetic/simulator/SimulatorConfiguration.java +10 −2 Original line number Diff line number Diff line Loading @@ -20,12 +20,13 @@ package kinetic.simulator; import java.util.Properties; import java.util.logging.Logger; import com.seagate.kinetic.proto.Kinetic; import com.seagate.kinetic.simulator.heartbeat.HeartbeatProvider; import com.seagate.kinetic.simulator.heartbeat.provider.MulticastHeartbeatProvider; /** * * Simulator configuration instance. Loading @@ -33,6 +34,9 @@ import com.seagate.kinetic.simulator.heartbeat.provider.MulticastHeartbeatProvid */ public class SimulatorConfiguration extends Properties { private final static Logger logger = Logger.getLogger(SimulatorConfiguration.class .getName()); private static final long serialVersionUID = 1132514490479251740L; /** Loading Loading @@ -350,7 +354,11 @@ public class SimulatorConfiguration extends Properties { * */ public void setUseNio(boolean flag) { this.useNio = flag; // XXX chiaming 07/19/2014: This will be re-enabled when it is compatible with 3.0.0 logger.warning("Method is disabled., Only NIO is supported."); //this.useNio = flag; } /** Loading Loading
kinetic-client/src/main/java/com/seagate/kinetic/client/io/provider/tcp/TcpTransportProvider.java +113 −124 Original line number Diff line number Diff line Loading @@ -34,10 +34,12 @@ import java.util.logging.Logger; import kinetic.client.ClientConfiguration; import kinetic.client.KineticException; import com.google.protobuf.InvalidProtocolBufferException; import com.seagate.kinetic.client.io.provider.spi.ClientMessageService; import com.seagate.kinetic.client.io.provider.spi.ClientTransportProvider; import com.seagate.kinetic.common.lib.KineticMessage; import com.seagate.kinetic.proto.Kinetic; import com.seagate.kinetic.proto.Kinetic.Command; import com.seagate.kinetic.proto.Kinetic.Message; import com.seagate.kinetic.proto.Kinetic.Message.Builder; Loading @@ -58,8 +60,6 @@ public class TcpTransportProvider implements ClientTransportProvider, Runnable { private final Logger logger = Logger .getLogger(TcpTransportProvider.class.getName()); private boolean useV2Protocol = true; // input socket read thread private Thread myThread = null; Loading Loading @@ -93,8 +93,6 @@ public class TcpTransportProvider implements ClientTransportProvider, Runnable { ClientConfiguration config = mservice.getConfiguration(); this.useV2Protocol = true; this.mservice = mservice; /** Loading @@ -114,9 +112,7 @@ public class TcpTransportProvider implements ClientTransportProvider, Runnable { is = socket.getInputStream(); if (this.useV2Protocol) { dis = new DataInputStream(is); } os = socket.getOutputStream(); Loading @@ -127,8 +123,7 @@ public class TcpTransportProvider implements ClientTransportProvider, Runnable { this.myThread.start(); logger.info("tcp-non-nio is used, use v2 protocol = " + this.useV2Protocol); logger.info("tcp-non-nio transport initialized ..."); } catch (IOException e) { Loading @@ -145,20 +140,18 @@ public class TcpTransportProvider implements ClientTransportProvider, Runnable { * {@inheritDoc} */ @Override public synchronized void write(KineticMessage im) throws IOException { public synchronized void write(KineticMessage km) throws IOException { Message.Builder message = (Builder) im.getMessage(); Message.Builder message = (Builder) km.getMessage(); if (logger.isLoggable(Level.FINEST)) { logger.finest("writing message: " + message); } if (this.useV2Protocol) { try { // get value to write separately byte[] value = im.getValue(); byte[] value = km.getValue(); // write 9 byte header ByteArrayOutputStream baos = new ByteArrayOutputStream(9); Loading Loading @@ -212,9 +205,6 @@ public class TcpTransportProvider implements ClientTransportProvider, Runnable { throw e; } } else { message.build().writeDelimitedTo(os); } } /** Loading @@ -227,16 +217,11 @@ public class TcpTransportProvider implements ClientTransportProvider, Runnable { this.isRunning = false; this.mservice.close(); this.is.close(); if (this.useV2Protocol) { this.dis.close(); } this.os.close(); this.socket.close(); logger.info("tcp non-nio transport closed, use v2 protocol=" + this.useV2Protocol); logger.info("tcp non-nio transport closed ..."); } catch (Exception e) { logger.log(Level.WARNING, e.getMessage(), e); } Loading @@ -250,16 +235,15 @@ public class TcpTransportProvider implements ClientTransportProvider, Runnable { try { Message message = null; KineticMessage im = new KineticMessage(); if (this.useV2Protocol) { KineticMessage km = new KineticMessage(); // 1. Read magic number. int magicNumber = dis.readByte(); if (magicNumber != 'F') { throw new CorruptedFrameException( "Invalid magic number: " + magicNumber); throw new CorruptedFrameException("Invalid magic number: " + magicNumber); } // 2. protobuf message size Loading Loading @@ -291,24 +275,29 @@ public class TcpTransportProvider implements ClientTransportProvider, Runnable { dis.read(attachedValue); // set to message // builder.setValue(ByteString.copyFrom(attachedValue)); im.setValue(attachedValue); km.setValue(attachedValue); } // v2 message message = builder.build(); } else { // v1 message message = Kinetic.Message.parseDelimitedFrom(is); } // set message to kietic message im.setMessage(message); km.setMessage(message); // build command Command.Builder commandBuilder = Command.newBuilder(); try { commandBuilder.mergeFrom(message.getCommandBytes()); km.setCommand(commandBuilder.build()); } catch (InvalidProtocolBufferException e) { logger.log(Level.WARNING, e.getMessage(), e); } if (logger.isLoggable(Level.FINEST)) { logger.finest("read message: " + message); } this.mservice.routeMessage(im); this.mservice.routeMessage(km); } catch (Exception e) { this.isRunning = false; } Loading
kinetic-client/src/main/java/kinetic/client/ClientConfiguration.java +5 −1 Original line number Diff line number Diff line Loading @@ -256,7 +256,11 @@ public class ClientConfiguration extends Properties { * */ public void setUseNio(boolean flag) { this.useNio = flag; // XXX chiaming 07/19/2014: This will be re-enabled when it is compatible with 3.0.0 logger.warning("Method is disabled., Only NIO is supported."); //this.useNio = flag; } /** Loading
kinetic-simulator/src/main/java/com/seagate/kinetic/simulator/io/provider/tcp/IoHandler.java +12 −0 Original line number Diff line number Diff line Loading @@ -32,7 +32,9 @@ import java.net.Socket; import java.util.logging.Level; import java.util.logging.Logger; import com.google.protobuf.InvalidProtocolBufferException; import com.seagate.kinetic.common.lib.KineticMessage; import com.seagate.kinetic.proto.Kinetic.Command; import com.seagate.kinetic.proto.Kinetic.Message; import com.seagate.kinetic.proto.Kinetic.Message.Builder; import com.seagate.kinetic.simulator.internal.MessageHandler; Loading Loading @@ -252,6 +254,16 @@ public class IoHandler implements Runnable { break; } // build command Command.Builder commandBuilder = Command.newBuilder(); try { commandBuilder.mergeFrom(request.getCommandBytes()); km.setCommand(commandBuilder.build()); } catch (InvalidProtocolBufferException e) { logger.log(Level.WARNING, e.getMessage(), e); } if (logger.isLoggable(Level.FINEST)) { logger.finest("received request message: " + request); } Loading
kinetic-simulator/src/main/java/kinetic/simulator/SimulatorConfiguration.java +10 −2 Original line number Diff line number Diff line Loading @@ -20,12 +20,13 @@ package kinetic.simulator; import java.util.Properties; import java.util.logging.Logger; import com.seagate.kinetic.proto.Kinetic; import com.seagate.kinetic.simulator.heartbeat.HeartbeatProvider; import com.seagate.kinetic.simulator.heartbeat.provider.MulticastHeartbeatProvider; /** * * Simulator configuration instance. Loading @@ -33,6 +34,9 @@ import com.seagate.kinetic.simulator.heartbeat.provider.MulticastHeartbeatProvid */ public class SimulatorConfiguration extends Properties { private final static Logger logger = Logger.getLogger(SimulatorConfiguration.class .getName()); private static final long serialVersionUID = 1132514490479251740L; /** Loading Loading @@ -350,7 +354,11 @@ public class SimulatorConfiguration extends Properties { * */ public void setUseNio(boolean flag) { this.useNio = flag; // XXX chiaming 07/19/2014: This will be re-enabled when it is compatible with 3.0.0 logger.warning("Method is disabled., Only NIO is supported."); //this.useNio = flag; } /** Loading