Loading kinetic-client/src/main/java/com/seagate/kinetic/client/io/MessageHandler.java +1 −1 Original line number Diff line number Diff line Loading @@ -419,7 +419,7 @@ public class MessageHandler implements ClientMessageService, Runnable { MessageFactory.checkDeleteReply(context.getRequestMessage(), context.getResponseMessage()); } catch (KineticException e) { lce = new AsyncKineticException(lce); lce = new AsyncKineticException(e); lce.setRequestMessage(context.getRequestMessage()); lce.setResponseMessage(context.getResponseMessage()); Loading kinetic-simulator/src/main/java/com/seagate/kinetic/simulator/internal/BatchOperationHandler.java +74 −21 Original line number Diff line number Diff line Loading @@ -30,6 +30,7 @@ import com.seagate.kinetic.proto.Kinetic.Command.KeyValue; import com.seagate.kinetic.proto.Kinetic.Command.MessageType; import com.seagate.kinetic.simulator.persist.BatchOperation; import com.seagate.kinetic.simulator.persist.KVValue; import com.seagate.kinetic.simulator.persist.Store; /** * Batch operation handler. Loading @@ -45,6 +46,9 @@ public class BatchOperationHandler { @SuppressWarnings("unused") private SimulatorEngine engine = null; @SuppressWarnings("rawtypes") private Store store = null; private long cid = -1; boolean isEndBatch = false; Loading @@ -67,6 +71,10 @@ public class BatchOperationHandler { // simulator engine this.engine = engine; // store this.store = engine.getStore(); } catch (Exception e) { throw new InvalidBatchException(e); } Loading Loading @@ -103,9 +111,15 @@ public class BatchOperationHandler { + mtype); } } catch (NotAttemptedException nae) { logger.log(Level.WARNING, nae.getMessage(), nae); close(); throw nae; } catch (KVStoreException kvse) { logger.log(Level.WARNING, kvse.getMessage(), kvse); close(); throw new NotAttemptedException(kvse); } catch (Exception e) { logger.log(Level.WARNING, e.getMessage(), e); close(); throw new InvalidBatchException(e); } Loading @@ -113,27 +127,24 @@ public class BatchOperationHandler { return isEndBatch; } private void batchDelete(KineticMessage km) throws NotAttemptedException { private void batchDelete(KineticMessage km) throws KVStoreException { ByteString key = km.getCommand().getBody().getKeyValue().getKey(); if (km.getCommand().getBody().getKeyValue().getForce() == false) { boolean isVersionMatched = true; // proto request KV KeyValue requestKeyValue = km.getCommand().getBody().getKeyValue(); // XXX: check version // ByteString dbv = // km.getCommand().getBody().getKeyValue().getDbVersion(); if (isVersionMatched == false) { throw new NotAttemptedException("version mismatch"); } ByteString key = requestKeyValue.getKey(); // check version if required if (requestKeyValue.getForce() == false) { checkVersion(requestKeyValue); } batch.delete(key); } private void batchPut(KineticMessage km) throws NotAttemptedException { private void batchPut(KineticMessage km) throws KVStoreException { ByteString key = km.getCommand().getBody().getKeyValue().getKey(); Loading @@ -146,9 +157,15 @@ public class BatchOperationHandler { valueByteString = ByteString.EMPTY; } // KV in; // proto request KV KeyValue requestKeyValue = km.getCommand().getBody().getKeyValue(); // check version if required if (requestKeyValue.getForce() == false) { checkVersion(requestKeyValue); } // construct store KV Algorithm al = null; if (requestKeyValue.hasAlgorithm()) { al = requestKeyValue.getAlgorithm(); Loading @@ -158,15 +175,7 @@ public class BatchOperationHandler { requestKeyValue.getNewVersion(), requestKeyValue.getTag(), al, valueByteString); // XXX: check version // ByteString dbv = // km.getCommand().getBody().getKeyValue().getDbVersion(); boolean isVersionMatched = true; if (isVersionMatched == false) { throw new NotAttemptedException("version mismatch"); } // XXX: check version // batch put batch.put(key, data); logger.info("*** batch op put entry., key = " + key); Loading Loading @@ -203,6 +212,50 @@ public class BatchOperationHandler { } } private static void compareVersion(ByteString storeDbVersion, ByteString requestDbVersion) throws KVStoreVersionMismatch { if (mySize(storeDbVersion) != mySize(requestDbVersion)) { throw new KVStoreVersionMismatch("Length mismatch"); } if (mySize(storeDbVersion) == 0) { return; } if (!storeDbVersion.equals(requestDbVersion)) { throw new KVStoreVersionMismatch("Compare mismatch"); } } private static int mySize(ByteString s) { if (s == null) return 0; return s.size(); } private void checkVersion(KeyValue requestKeyValue) throws KVStoreException { ByteString requestDbVersion = requestKeyValue.getDbVersion(); ByteString storeDbVersion = null; ByteString key = requestKeyValue.getKey(); KVValue storeKv = (KVValue) store.get(key); if (storeKv != null) { storeDbVersion = storeKv.getVersion(); } logger.info("*********comparing version., storeV=" + storeDbVersion + "requestV=" + requestDbVersion); compareVersion(storeDbVersion, requestDbVersion); logger.info("*********batch op version checked and passed ..."); } public synchronized boolean isClosed() { return this.isEndBatch; } Loading kinetic-test/src/test/java/com/seagate/kinetic/example/batchop/BatchOperationExample.java +7 −6 Original line number Diff line number Diff line Loading @@ -61,24 +61,25 @@ public class BatchOperationExample implements CallbackHandler<Entry> { Entry bar = new Entry(); bar.setKey("bar".getBytes("UTF8")); bar.setValue("bar".getBytes("UTF8")); bar.getEntryMetadata().setVersion("1234".getBytes("UTF8")); client.putForced(bar); logger.info("*** starting batch op ..."); logger.info("*** starting batch operation ..."); // start batch a new batch operation BatchOperation batch = client.createBatchOperation(); // put entry 1 // put foo Entry foo = new Entry(); foo.setKey("foo".getBytes("UTF8")); foo.setValue("foo".getBytes("UTF8")); foo.getEntryMetadata().setVersion("5678".getBytes("UTF8")); // client.putAsync(entry, null, this); batch.putForcedAsync(foo, this); // delet entry 1 // delete bar DeleteCbHandler dhandler = new DeleteCbHandler(); // client.deleteAsync(entry, dhandler); batch.deleteAsync(bar, dhandler); // end/commit batch operation Loading Loading @@ -108,7 +109,7 @@ public class BatchOperationExample implements CallbackHandler<Entry> { // get entry, expect to be not found Entry bar1 = client.get(bar.getKey()); if (bar1 != null) { throw new RuntimeException("error: found deleted entry ..."); throw new RuntimeException("error: found deleted entry."); } logger.info("Expect entry bar to be null, entry=" + bar1); Loading Loading
kinetic-client/src/main/java/com/seagate/kinetic/client/io/MessageHandler.java +1 −1 Original line number Diff line number Diff line Loading @@ -419,7 +419,7 @@ public class MessageHandler implements ClientMessageService, Runnable { MessageFactory.checkDeleteReply(context.getRequestMessage(), context.getResponseMessage()); } catch (KineticException e) { lce = new AsyncKineticException(lce); lce = new AsyncKineticException(e); lce.setRequestMessage(context.getRequestMessage()); lce.setResponseMessage(context.getResponseMessage()); Loading
kinetic-simulator/src/main/java/com/seagate/kinetic/simulator/internal/BatchOperationHandler.java +74 −21 Original line number Diff line number Diff line Loading @@ -30,6 +30,7 @@ import com.seagate.kinetic.proto.Kinetic.Command.KeyValue; import com.seagate.kinetic.proto.Kinetic.Command.MessageType; import com.seagate.kinetic.simulator.persist.BatchOperation; import com.seagate.kinetic.simulator.persist.KVValue; import com.seagate.kinetic.simulator.persist.Store; /** * Batch operation handler. Loading @@ -45,6 +46,9 @@ public class BatchOperationHandler { @SuppressWarnings("unused") private SimulatorEngine engine = null; @SuppressWarnings("rawtypes") private Store store = null; private long cid = -1; boolean isEndBatch = false; Loading @@ -67,6 +71,10 @@ public class BatchOperationHandler { // simulator engine this.engine = engine; // store this.store = engine.getStore(); } catch (Exception e) { throw new InvalidBatchException(e); } Loading Loading @@ -103,9 +111,15 @@ public class BatchOperationHandler { + mtype); } } catch (NotAttemptedException nae) { logger.log(Level.WARNING, nae.getMessage(), nae); close(); throw nae; } catch (KVStoreException kvse) { logger.log(Level.WARNING, kvse.getMessage(), kvse); close(); throw new NotAttemptedException(kvse); } catch (Exception e) { logger.log(Level.WARNING, e.getMessage(), e); close(); throw new InvalidBatchException(e); } Loading @@ -113,27 +127,24 @@ public class BatchOperationHandler { return isEndBatch; } private void batchDelete(KineticMessage km) throws NotAttemptedException { private void batchDelete(KineticMessage km) throws KVStoreException { ByteString key = km.getCommand().getBody().getKeyValue().getKey(); if (km.getCommand().getBody().getKeyValue().getForce() == false) { boolean isVersionMatched = true; // proto request KV KeyValue requestKeyValue = km.getCommand().getBody().getKeyValue(); // XXX: check version // ByteString dbv = // km.getCommand().getBody().getKeyValue().getDbVersion(); if (isVersionMatched == false) { throw new NotAttemptedException("version mismatch"); } ByteString key = requestKeyValue.getKey(); // check version if required if (requestKeyValue.getForce() == false) { checkVersion(requestKeyValue); } batch.delete(key); } private void batchPut(KineticMessage km) throws NotAttemptedException { private void batchPut(KineticMessage km) throws KVStoreException { ByteString key = km.getCommand().getBody().getKeyValue().getKey(); Loading @@ -146,9 +157,15 @@ public class BatchOperationHandler { valueByteString = ByteString.EMPTY; } // KV in; // proto request KV KeyValue requestKeyValue = km.getCommand().getBody().getKeyValue(); // check version if required if (requestKeyValue.getForce() == false) { checkVersion(requestKeyValue); } // construct store KV Algorithm al = null; if (requestKeyValue.hasAlgorithm()) { al = requestKeyValue.getAlgorithm(); Loading @@ -158,15 +175,7 @@ public class BatchOperationHandler { requestKeyValue.getNewVersion(), requestKeyValue.getTag(), al, valueByteString); // XXX: check version // ByteString dbv = // km.getCommand().getBody().getKeyValue().getDbVersion(); boolean isVersionMatched = true; if (isVersionMatched == false) { throw new NotAttemptedException("version mismatch"); } // XXX: check version // batch put batch.put(key, data); logger.info("*** batch op put entry., key = " + key); Loading Loading @@ -203,6 +212,50 @@ public class BatchOperationHandler { } } private static void compareVersion(ByteString storeDbVersion, ByteString requestDbVersion) throws KVStoreVersionMismatch { if (mySize(storeDbVersion) != mySize(requestDbVersion)) { throw new KVStoreVersionMismatch("Length mismatch"); } if (mySize(storeDbVersion) == 0) { return; } if (!storeDbVersion.equals(requestDbVersion)) { throw new KVStoreVersionMismatch("Compare mismatch"); } } private static int mySize(ByteString s) { if (s == null) return 0; return s.size(); } private void checkVersion(KeyValue requestKeyValue) throws KVStoreException { ByteString requestDbVersion = requestKeyValue.getDbVersion(); ByteString storeDbVersion = null; ByteString key = requestKeyValue.getKey(); KVValue storeKv = (KVValue) store.get(key); if (storeKv != null) { storeDbVersion = storeKv.getVersion(); } logger.info("*********comparing version., storeV=" + storeDbVersion + "requestV=" + requestDbVersion); compareVersion(storeDbVersion, requestDbVersion); logger.info("*********batch op version checked and passed ..."); } public synchronized boolean isClosed() { return this.isEndBatch; } Loading
kinetic-test/src/test/java/com/seagate/kinetic/example/batchop/BatchOperationExample.java +7 −6 Original line number Diff line number Diff line Loading @@ -61,24 +61,25 @@ public class BatchOperationExample implements CallbackHandler<Entry> { Entry bar = new Entry(); bar.setKey("bar".getBytes("UTF8")); bar.setValue("bar".getBytes("UTF8")); bar.getEntryMetadata().setVersion("1234".getBytes("UTF8")); client.putForced(bar); logger.info("*** starting batch op ..."); logger.info("*** starting batch operation ..."); // start batch a new batch operation BatchOperation batch = client.createBatchOperation(); // put entry 1 // put foo Entry foo = new Entry(); foo.setKey("foo".getBytes("UTF8")); foo.setValue("foo".getBytes("UTF8")); foo.getEntryMetadata().setVersion("5678".getBytes("UTF8")); // client.putAsync(entry, null, this); batch.putForcedAsync(foo, this); // delet entry 1 // delete bar DeleteCbHandler dhandler = new DeleteCbHandler(); // client.deleteAsync(entry, dhandler); batch.deleteAsync(bar, dhandler); // end/commit batch operation Loading Loading @@ -108,7 +109,7 @@ public class BatchOperationExample implements CallbackHandler<Entry> { // get entry, expect to be not found Entry bar1 = client.get(bar.getKey()); if (bar1 != null) { throw new RuntimeException("error: found deleted entry ..."); throw new RuntimeException("error: found deleted entry."); } logger.info("Expect entry bar to be null, entry=" + bar1); Loading