Loading kinetic-client/src/main/java/com/seagate/kinetic/client/internal/ClientProxy.java +15 −4 Original line number Diff line number Diff line Loading @@ -91,6 +91,8 @@ public class ClientProxy { private CountDownLatch cidLatch = new CountDownLatch (1); private boolean isClosed = false; /** * Construct a new instance of client proxy * Loading @@ -99,7 +101,8 @@ public class ClientProxy { * @throws KineticException * if any internal error occurred. */ public ClientProxy(ClientConfiguration config) throws KineticException { public ClientProxy(ClientConfiguration config) throws KineticException { // client config this.config = config; Loading Loading @@ -664,11 +667,19 @@ public class ClientProxy { /** * close io handler and release associated resources. */ void close() { public synchronized void close() { if (this.isClosed) { return; } try { if (this.iohandler != null) { iohandler.close(); } } finally { this.isClosed = true; } } Loading kinetic-client/src/main/java/com/seagate/kinetic/client/io/MessageHandler.java +39 −1 Original line number Diff line number Diff line Loading @@ -132,8 +132,13 @@ public class MessageHandler implements ClientMessageService, Runnable { if (this.isStatusMessageReceived == false) { if (message.getMessage().getAuthType() == AuthType.UNSOLICITEDSTATUS) { // set cid this.client.setConnectionId(message); this.isStatusMessageReceived = true; // notify listener this.notifyListener(message); return; } else { if (this.iohandler.shouldWaitForStatusMessage()) { Loading Loading @@ -227,9 +232,42 @@ public class MessageHandler implements ClientMessageService, Runnable { this.asyncDelivered(seq); } } else { if (message.getMessage().getAuthType() == AuthType.UNSOLICITEDSTATUS) { logger.warning("received unsolicited message: " + message); /** * XXX chiaming 01/28/2015: The only possible behavior from the * drive is to close the current connection. So the API needs to * handle this accordingly. */ this.client.close(); /** * Call listener if one is set. */ this.notifyListener(message); } else { logger.warning("received unknown message: " + message); } } } private void notifyListener(KineticMessage message) { try { if (this.client.getConfiguration().getConnectionListener() != null) { // call listener this.client.getConfiguration().getConnectionListener() .onMessage(message); } } catch (Throwable t) { logger.log(Level.WARNING, t.getMessage(), t); } } public KineticMessage write(KineticMessage message) throws IOException, InterruptedException { Loading kinetic-client/src/main/java/kinetic/client/ClientConfiguration.java +32 −0 Original line number Diff line number Diff line Loading @@ -123,6 +123,9 @@ public class ClientConfiguration extends Properties { // expected wwn to connect to. private String expectedWwn = null; // connection listener private ConnectionListener listener = null; /** * Client configuration constructor. * Loading Loading @@ -543,4 +546,33 @@ public class ClientConfiguration extends Properties { return this.expectedWwn; } /** * Set connection listener to the connected Kinetic drive or simulator. * <p> * Upon received an unsolicitated status message, the Java runtime library * calls the listener if registered. * <p> * The thread execution from the Java client runtime library is serialized * such that each sub-sequential messages are delivered only after the * previous onMessage call returned. * * @param listener * connection listener to the connected service. * * @see ConnectionListener */ public void setConnectionListener(ConnectionListener listener) { this.listener = listener; } /** * Get the current registered connection listener. * * @return the current registered connection listener. Return null if not * registered. */ public ConnectionListener getConnectionListener() { return this.listener; } } kinetic-simulator/src/main/java/com/seagate/kinetic/simulator/io/provider/nio/NioMessageServiceHandler.java +50 −0 Original line number Diff line number Diff line Loading @@ -25,8 +25,13 @@ import io.netty.channel.SimpleChannelInboundHandler; import java.util.logging.Level; import java.util.logging.Logger; import com.google.protobuf.ByteString; import com.seagate.kinetic.common.lib.KineticMessage; import com.seagate.kinetic.proto.Kinetic.Command; import com.seagate.kinetic.proto.Kinetic.Command.MessageType; import com.seagate.kinetic.proto.Kinetic.Command.Status.StatusCode; import com.seagate.kinetic.proto.Kinetic.Message; import com.seagate.kinetic.proto.Kinetic.Message.AuthType; import com.seagate.kinetic.simulator.internal.ConnectionInfo; import com.seagate.kinetic.simulator.internal.FaultInjectedCloseConnectionException; import com.seagate.kinetic.simulator.internal.SimulatorEngine; Loading Loading @@ -85,6 +90,12 @@ public class NioMessageServiceHandler extends throws Exception { if (faultInjectCloseConnection) { KineticMessage response = this .createUnsolicitedStatusMessageWithBuilder(); ctx.writeAndFlush(response); throw new FaultInjectedCloseConnectionException( "Fault injected for the simulator"); } Loading Loading @@ -187,4 +198,43 @@ public class NioMessageServiceHandler extends return this.lcservice; } /** * Create an internal message with empty builder message. * * @return an internal message with empty builder message */ public static KineticMessage createUnsolicitedStatusMessageWithBuilder() { // new instance of internal message KineticMessage kineticMessage = new KineticMessage(); // new builder message Message.Builder message = Message.newBuilder(); // set to im kineticMessage.setMessage(message); // set hmac auth type message.setAuthType(AuthType.UNSOLICITEDSTATUS); // create command builder Command.Builder commandBuilder = Command.newBuilder(); commandBuilder.getStatusBuilder().setCode( StatusCode.CONNECTION_TERMINATED); commandBuilder.getStatusBuilder().setDetailedMessage( ByteString.copyFromUtf8("Connection closed")); // get command byte stirng ByteString commandByteString = commandBuilder.build().toByteString(); message.setCommandBytes(commandByteString); // set command kineticMessage.setCommand(commandBuilder); return kineticMessage; } } Loading
kinetic-client/src/main/java/com/seagate/kinetic/client/internal/ClientProxy.java +15 −4 Original line number Diff line number Diff line Loading @@ -91,6 +91,8 @@ public class ClientProxy { private CountDownLatch cidLatch = new CountDownLatch (1); private boolean isClosed = false; /** * Construct a new instance of client proxy * Loading @@ -99,7 +101,8 @@ public class ClientProxy { * @throws KineticException * if any internal error occurred. */ public ClientProxy(ClientConfiguration config) throws KineticException { public ClientProxy(ClientConfiguration config) throws KineticException { // client config this.config = config; Loading Loading @@ -664,11 +667,19 @@ public class ClientProxy { /** * close io handler and release associated resources. */ void close() { public synchronized void close() { if (this.isClosed) { return; } try { if (this.iohandler != null) { iohandler.close(); } } finally { this.isClosed = true; } } Loading
kinetic-client/src/main/java/com/seagate/kinetic/client/io/MessageHandler.java +39 −1 Original line number Diff line number Diff line Loading @@ -132,8 +132,13 @@ public class MessageHandler implements ClientMessageService, Runnable { if (this.isStatusMessageReceived == false) { if (message.getMessage().getAuthType() == AuthType.UNSOLICITEDSTATUS) { // set cid this.client.setConnectionId(message); this.isStatusMessageReceived = true; // notify listener this.notifyListener(message); return; } else { if (this.iohandler.shouldWaitForStatusMessage()) { Loading Loading @@ -227,9 +232,42 @@ public class MessageHandler implements ClientMessageService, Runnable { this.asyncDelivered(seq); } } else { if (message.getMessage().getAuthType() == AuthType.UNSOLICITEDSTATUS) { logger.warning("received unsolicited message: " + message); /** * XXX chiaming 01/28/2015: The only possible behavior from the * drive is to close the current connection. So the API needs to * handle this accordingly. */ this.client.close(); /** * Call listener if one is set. */ this.notifyListener(message); } else { logger.warning("received unknown message: " + message); } } } private void notifyListener(KineticMessage message) { try { if (this.client.getConfiguration().getConnectionListener() != null) { // call listener this.client.getConfiguration().getConnectionListener() .onMessage(message); } } catch (Throwable t) { logger.log(Level.WARNING, t.getMessage(), t); } } public KineticMessage write(KineticMessage message) throws IOException, InterruptedException { Loading
kinetic-client/src/main/java/kinetic/client/ClientConfiguration.java +32 −0 Original line number Diff line number Diff line Loading @@ -123,6 +123,9 @@ public class ClientConfiguration extends Properties { // expected wwn to connect to. private String expectedWwn = null; // connection listener private ConnectionListener listener = null; /** * Client configuration constructor. * Loading Loading @@ -543,4 +546,33 @@ public class ClientConfiguration extends Properties { return this.expectedWwn; } /** * Set connection listener to the connected Kinetic drive or simulator. * <p> * Upon received an unsolicitated status message, the Java runtime library * calls the listener if registered. * <p> * The thread execution from the Java client runtime library is serialized * such that each sub-sequential messages are delivered only after the * previous onMessage call returned. * * @param listener * connection listener to the connected service. * * @see ConnectionListener */ public void setConnectionListener(ConnectionListener listener) { this.listener = listener; } /** * Get the current registered connection listener. * * @return the current registered connection listener. Return null if not * registered. */ public ConnectionListener getConnectionListener() { return this.listener; } }
kinetic-simulator/src/main/java/com/seagate/kinetic/simulator/io/provider/nio/NioMessageServiceHandler.java +50 −0 Original line number Diff line number Diff line Loading @@ -25,8 +25,13 @@ import io.netty.channel.SimpleChannelInboundHandler; import java.util.logging.Level; import java.util.logging.Logger; import com.google.protobuf.ByteString; import com.seagate.kinetic.common.lib.KineticMessage; import com.seagate.kinetic.proto.Kinetic.Command; import com.seagate.kinetic.proto.Kinetic.Command.MessageType; import com.seagate.kinetic.proto.Kinetic.Command.Status.StatusCode; import com.seagate.kinetic.proto.Kinetic.Message; import com.seagate.kinetic.proto.Kinetic.Message.AuthType; import com.seagate.kinetic.simulator.internal.ConnectionInfo; import com.seagate.kinetic.simulator.internal.FaultInjectedCloseConnectionException; import com.seagate.kinetic.simulator.internal.SimulatorEngine; Loading Loading @@ -85,6 +90,12 @@ public class NioMessageServiceHandler extends throws Exception { if (faultInjectCloseConnection) { KineticMessage response = this .createUnsolicitedStatusMessageWithBuilder(); ctx.writeAndFlush(response); throw new FaultInjectedCloseConnectionException( "Fault injected for the simulator"); } Loading Loading @@ -187,4 +198,43 @@ public class NioMessageServiceHandler extends return this.lcservice; } /** * Create an internal message with empty builder message. * * @return an internal message with empty builder message */ public static KineticMessage createUnsolicitedStatusMessageWithBuilder() { // new instance of internal message KineticMessage kineticMessage = new KineticMessage(); // new builder message Message.Builder message = Message.newBuilder(); // set to im kineticMessage.setMessage(message); // set hmac auth type message.setAuthType(AuthType.UNSOLICITEDSTATUS); // create command builder Command.Builder commandBuilder = Command.newBuilder(); commandBuilder.getStatusBuilder().setCode( StatusCode.CONNECTION_TERMINATED); commandBuilder.getStatusBuilder().setDetailedMessage( ByteString.copyFromUtf8("Connection closed")); // get command byte stirng ByteString commandByteString = commandBuilder.build().toByteString(); message.setCommandBytes(commandByteString); // set command kineticMessage.setCommand(commandBuilder); return kineticMessage; } }