Loading kinetic-test/src/test/java/com/seagate/kinetic/asyncAPI/BatchOpAPITest.java +380 −26 Original line number Diff line number Diff line Loading @@ -24,7 +24,6 @@ import static com.seagate.kinetic.KineticTestHelpers.toByteArray; import static org.testng.AssertJUnit.assertNull; import static org.testng.AssertJUnit.assertTrue; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.concurrent.CountDownLatch; Loading Loading @@ -72,7 +71,7 @@ import com.seagate.kinetic.proto.Kinetic.Command.Status.StatusCode; @Test(groups = { "simulator", "drive" }) public class BatchOpAPITest extends IntegrationTestCase { private final int valueSize = 1024; private final int valueSize = 1024 * 1024; @Test(dataProvider = "transportProtocolOptions") public void testBatchOperation_PutsForcedAsyncSucceeds(String clientName) { Loading Loading @@ -2246,23 +2245,388 @@ public class BatchOpAPITest extends IntegrationTestCase { } } @Test(dataProvider = "transportProtocolOptions") public void testBatchOperationInLoop_OneClient_CommitBatch_DifferentKey( String clientName) { Entry foo = getFooEntry(); Entry bar = getBarEntry(); int loopCount = 4; KineticClient client = getClient(clientName); for (int i = 0; i < loopCount; i++) { byte[] fooKey = toByteArray("foo" + i); byte[] fooValue = ByteBuffer.allocate(valueSize).array(); byte[] fooVersion = toByteArray("v" + i); foo.setKey(fooKey); foo.setValue(fooValue); foo.getEntryMetadata().setVersion(fooVersion); byte[] barKey = toByteArray("bar" + i); bar.setKey(barKey); try { client.putForced(bar); } catch (KineticException e2) { Assert.fail("Put bar throw exception: " + e2.getMessage()); } BatchOperation batch = null; try { batch = client.createBatchOperation(); } catch (KineticException e) { Assert.fail("Create batch operation failed. " + e.getMessage()); } CallbackHandler<Entry> handler = buildAsyncCallbackHandler(new KineticTestHelpers.AsyncHandler<Entry>() { @Override public void onSuccess(CallbackResult<Entry> result) { } @Override public void onError(AsyncKineticException e) { } }); try { batch.putForcedAsync(foo, handler); } catch (KineticException e1) { Assert.fail("Put async batch op throw exception. " + e1.getMessage()); } CallbackHandler<Boolean> dhandler = buildAsyncCallbackHandler(new KineticTestHelpers.AsyncHandler<Boolean>() { @Override public void onSuccess(CallbackResult<Boolean> result) { } @Override public void onError(AsyncKineticException e) { } }); try { batch.deleteAsync(bar, dhandler); } catch (KineticException e1) { Assert.fail("Delete async batch op throw exception. " + e1.getMessage()); } try { batch.commit(); } catch (KineticException e1) { Assert.fail("Batch commit throw exception. " + e1.getMessage()); } try { Entry fooGet = getClient(clientName).get(fooKey); assertTrue(Arrays.equals(fooGet.getKey(), fooKey)); assertTrue(Arrays.equals(fooGet.getValue(), fooValue)); assertTrue(Arrays.equals( fooGet.getEntryMetadata().getVersion(), fooVersion)); } catch (KineticException e1) { Assert.fail("Get foo throw exception. " + e1.getMessage()); } try { Entry barGet = getClient(clientName).get(barKey); assertNull(barGet); } catch (KineticException e1) { Assert.fail("Get bar throw exception. " + e1.getMessage()); } } } @Test(dataProvider = "transportProtocolOptions") public void testBatchOperationInLoop_OneClient_CommitBatch_SameKey( String clientName) { Entry foo = getFooEntry(); Entry bar = getBarEntry(); int loopCount = 4; KineticClient client = getClient(clientName); for (int i = 0; i < loopCount; i++) { byte[] fooValue = ByteBuffer.allocate(valueSize).array(); byte[] fooVersion = toByteArray("v" + i); foo.setValue(fooValue); foo.getEntryMetadata().setVersion(fooVersion); try { client.putForced(bar); } catch (KineticException e2) { Assert.fail("Put bar throw exception: " + e2.getMessage()); } BatchOperation batch = null; try { batch = client.createBatchOperation(); } catch (KineticException e) { Assert.fail("Create batch operation failed. " + e.getMessage()); } CallbackHandler<Entry> handler = buildAsyncCallbackHandler(new KineticTestHelpers.AsyncHandler<Entry>() { @Override public void onSuccess(CallbackResult<Entry> result) { } @Override public void onError(AsyncKineticException e) { } }); try { batch.putForcedAsync(foo, handler); } catch (KineticException e1) { Assert.fail("Put async batch op throw exception. " + e1.getMessage()); } CallbackHandler<Boolean> dhandler = buildAsyncCallbackHandler(new KineticTestHelpers.AsyncHandler<Boolean>() { @Override public void onSuccess(CallbackResult<Boolean> result) { } @Override public void onError(AsyncKineticException e) { } }); try { batch.deleteAsync(bar, dhandler); } catch (KineticException e1) { Assert.fail("Delete async batch op throw exception. " + e1.getMessage()); } try { batch.commit(); } catch (KineticException e1) { Assert.fail("Batch commit throw exception. " + e1.getMessage()); } try { Entry fooGet = getClient(clientName).get(foo.getKey()); assertTrue(Arrays.equals(fooGet.getKey(), foo.getKey())); assertTrue(Arrays.equals(fooGet.getValue(), fooValue)); assertTrue(Arrays.equals( fooGet.getEntryMetadata().getVersion(), fooVersion)); } catch (KineticException e1) { Assert.fail("Get foo throw exception. " + e1.getMessage()); } try { Entry barGet = getClient(clientName).get(bar.getKey()); assertNull(barGet); } catch (KineticException e1) { Assert.fail("Get bar throw exception. " + e1.getMessage()); } } } @Test(dataProvider = "transportProtocolOptions", enabled = false) public void testBatchOperationInLoop_OneClient_AbortBatch_DifferentKey( String clientName) { Entry foo = getFooEntry(); Entry bar = getBarEntry(); int loopCount = 4; KineticClient client = getClient(clientName); for (int i = 0; i < loopCount; i++) { byte[] fooKey = toByteArray("foo" + i); byte[] fooValue = ByteBuffer.allocate(valueSize).array(); byte[] fooVersion = toByteArray("v" + i); foo.setKey(fooKey); foo.setValue(fooValue); foo.getEntryMetadata().setVersion(fooVersion); byte[] barKey = toByteArray("bar" + i); bar.setKey(barKey); try { client.putForced(bar); } catch (KineticException e2) { Assert.fail("Put bar throw exception: " + e2.getMessage()); } BatchOperation batch = null; try { batch = client.createBatchOperation(); } catch (KineticException e) { Assert.fail("Create batch operation failed. " + e.getMessage()); } CallbackHandler<Entry> handler = buildAsyncCallbackHandler(new KineticTestHelpers.AsyncHandler<Entry>() { @Override public void onSuccess(CallbackResult<Entry> result) { } @Override public void onError(AsyncKineticException e) { } }); try { batch.putForcedAsync(foo, handler); } catch (KineticException e1) { Assert.fail("Put async batch op throw exception. " + e1.getMessage()); } CallbackHandler<Boolean> dhandler = buildAsyncCallbackHandler(new KineticTestHelpers.AsyncHandler<Boolean>() { @Override public void onSuccess(CallbackResult<Boolean> result) { } @Override public void onError(AsyncKineticException e) { } }); try { batch.deleteAsync(bar, dhandler); } catch (KineticException e1) { Assert.fail("Delete async batch op throw exception. " + e1.getMessage()); } try { batch.abort(); } catch (KineticException e1) { Assert.fail("Batch abort throw exception. " + e1.getMessage()); } try { Entry fooGet = getClient(clientName).get(fooKey); assertNull(fooGet); } catch (KineticException e1) { Assert.fail("Get foo throw exception. " + e1.getMessage()); } try { Entry barGet = getClient(clientName).get(barKey); assertTrue(Arrays.equals(barGet.getKey(), bar.getKey())); assertTrue(Arrays.equals(barGet.getValue(), bar.getValue())); assertTrue(Arrays.equals( barGet.getEntryMetadata().getVersion(), bar .getEntryMetadata().getVersion())); } catch (KineticException e1) { Assert.fail("Get bar throw exception. " + e1.getMessage()); } } } @Test(dataProvider = "transportProtocolOptions", enabled = false) public void testBatchOperation_multiClients_AllSuccess(String clientName) throws KineticException, UnsupportedEncodingException, InterruptedException { int writeThreads = 2; public void testBatchOperationInLoop_OneClient_AbortBatch_SameKey( String clientName) { Entry foo = getFooEntry(); Entry bar = getBarEntry(); int loopCount = 4; KineticClient client = getClient(clientName); for (int i = 0; i < loopCount; i++) { byte[] fooValue = ByteBuffer.allocate(valueSize).array(); byte[] fooVersion = toByteArray("v" + i); foo.setValue(fooValue); foo.getEntryMetadata().setVersion(fooVersion); try { client.putForced(bar); } catch (KineticException e2) { Assert.fail("Put bar throw exception: " + e2.getMessage()); } BatchOperation batch = null; try { batch = client.createBatchOperation(); } catch (KineticException e) { Assert.fail("Create batch operation failed. " + e.getMessage()); } CallbackHandler<Entry> handler = buildAsyncCallbackHandler(new KineticTestHelpers.AsyncHandler<Entry>() { @Override public void onSuccess(CallbackResult<Entry> result) { } @Override public void onError(AsyncKineticException e) { } }); try { batch.putForcedAsync(foo, handler); } catch (KineticException e1) { Assert.fail("Put async batch op throw exception. " + e1.getMessage()); } CallbackHandler<Boolean> dhandler = buildAsyncCallbackHandler(new KineticTestHelpers.AsyncHandler<Boolean>() { @Override public void onSuccess(CallbackResult<Boolean> result) { } @Override public void onError(AsyncKineticException e) { } }); try { batch.deleteAsync(bar, dhandler); } catch (KineticException e1) { Assert.fail("Delete async batch op throw exception. " + e1.getMessage()); } try { batch.abort(); } catch (KineticException e1) { Assert.fail("Batch abort throw exception. " + e1.getMessage()); } try { Entry fooGet = getClient(clientName).get(foo.getKey()); assertNull(fooGet); } catch (KineticException e1) { Assert.fail("Get foo throw exception. " + e1.getMessage()); } try { Entry barGet = getClient(clientName).get(bar.getKey()); assertTrue(Arrays.equals(barGet.getKey(), bar.getKey())); assertTrue(Arrays.equals(barGet.getValue(), bar.getValue())); assertTrue(Arrays.equals( barGet.getEntryMetadata().getVersion(), bar .getEntryMetadata().getVersion())); } catch (KineticException e1) { Assert.fail("Get bar throw exception. " + e1.getMessage()); } } } @Test(dataProvider = "transportProtocolOptions") public void testBatchOperation_Concurrent_MultiClients_SameKey_AllSuccess( String clientName) { int writeThreads = 5; CountDownLatch latch = new CountDownLatch(writeThreads); ExecutorService pool = Executors.newCachedThreadPool(); KineticClient kineticClient; KineticClient kineticClient = null; for (int i = 0; i < writeThreads; i++) { try { kineticClient = KineticClientFactory .createInstance(kineticClientConfigutations.get(clientName)); .createInstance(kineticClientConfigutations .get(clientName)); } catch (KineticException e) { Assert.fail("Create client throw exception. " + e.getMessage()); } pool.execute(new BatchThread(kineticClient, latch)); } // wait all threads finish try { latch.await(); } catch (InterruptedException e) { Assert.fail("latch await throw exception. " + e.getMessage()); } pool.shutdown(); } Loading Loading @@ -2314,16 +2678,14 @@ class BatchThread implements Runnable { try { kineticClient.putForced(bar); } catch (KineticException e1) { // TODO Auto-generated catch block e1.printStackTrace(); Assert.fail("Put entry failed. " + e1.getMessage()); } BatchOperation batch = null; try { batch = kineticClient.createBatchOperation(); } catch (KineticException e1) { // TODO Auto-generated catch block e1.printStackTrace(); Assert.fail("Create batch throw exception. " + e1.getMessage()); } Entry foo = new Entry(); Loading @@ -2341,21 +2703,15 @@ class BatchThread implements Runnable { @Override public void onError(AsyncKineticException e) { assertTrue(e.getResponseMessage().getCommand().getStatus() .getCode().equals(StatusCode.NOT_ATTEMPTED)); } }); try { batch.putForcedAsync(foo, handler); } catch (KineticException e1) { // TODO Auto-generated catch block e1.printStackTrace(); Assert.fail("Put entry failed. " + e1.getMessage()); } byte[] barBatchVersion = toByteArray("5678"); bar.getEntryMetadata().setVersion(barBatchVersion); CallbackHandler<Boolean> dhandler = buildAsyncCallbackHandler(new KineticTestHelpers.AsyncHandler<Boolean>() { @Override public void onSuccess(CallbackResult<Boolean> result) { Loading @@ -2367,17 +2723,15 @@ class BatchThread implements Runnable { }); try { batch.deleteAsync(bar, dhandler); batch.deleteForcedAsync(bar.getKey(), dhandler); } catch (KineticException e1) { // TODO Auto-generated catch block e1.printStackTrace(); Assert.fail("Delete entry failed. " + e1.getMessage()); } try { batch.commit(); } catch (KineticException e1) { // TODO Auto-generated catch block e1.printStackTrace(); Assert.fail("Batch commit throw exception. " + e1.getMessage()); } try { Loading Loading
kinetic-test/src/test/java/com/seagate/kinetic/asyncAPI/BatchOpAPITest.java +380 −26 Original line number Diff line number Diff line Loading @@ -24,7 +24,6 @@ import static com.seagate.kinetic.KineticTestHelpers.toByteArray; import static org.testng.AssertJUnit.assertNull; import static org.testng.AssertJUnit.assertTrue; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.concurrent.CountDownLatch; Loading Loading @@ -72,7 +71,7 @@ import com.seagate.kinetic.proto.Kinetic.Command.Status.StatusCode; @Test(groups = { "simulator", "drive" }) public class BatchOpAPITest extends IntegrationTestCase { private final int valueSize = 1024; private final int valueSize = 1024 * 1024; @Test(dataProvider = "transportProtocolOptions") public void testBatchOperation_PutsForcedAsyncSucceeds(String clientName) { Loading Loading @@ -2246,23 +2245,388 @@ public class BatchOpAPITest extends IntegrationTestCase { } } @Test(dataProvider = "transportProtocolOptions") public void testBatchOperationInLoop_OneClient_CommitBatch_DifferentKey( String clientName) { Entry foo = getFooEntry(); Entry bar = getBarEntry(); int loopCount = 4; KineticClient client = getClient(clientName); for (int i = 0; i < loopCount; i++) { byte[] fooKey = toByteArray("foo" + i); byte[] fooValue = ByteBuffer.allocate(valueSize).array(); byte[] fooVersion = toByteArray("v" + i); foo.setKey(fooKey); foo.setValue(fooValue); foo.getEntryMetadata().setVersion(fooVersion); byte[] barKey = toByteArray("bar" + i); bar.setKey(barKey); try { client.putForced(bar); } catch (KineticException e2) { Assert.fail("Put bar throw exception: " + e2.getMessage()); } BatchOperation batch = null; try { batch = client.createBatchOperation(); } catch (KineticException e) { Assert.fail("Create batch operation failed. " + e.getMessage()); } CallbackHandler<Entry> handler = buildAsyncCallbackHandler(new KineticTestHelpers.AsyncHandler<Entry>() { @Override public void onSuccess(CallbackResult<Entry> result) { } @Override public void onError(AsyncKineticException e) { } }); try { batch.putForcedAsync(foo, handler); } catch (KineticException e1) { Assert.fail("Put async batch op throw exception. " + e1.getMessage()); } CallbackHandler<Boolean> dhandler = buildAsyncCallbackHandler(new KineticTestHelpers.AsyncHandler<Boolean>() { @Override public void onSuccess(CallbackResult<Boolean> result) { } @Override public void onError(AsyncKineticException e) { } }); try { batch.deleteAsync(bar, dhandler); } catch (KineticException e1) { Assert.fail("Delete async batch op throw exception. " + e1.getMessage()); } try { batch.commit(); } catch (KineticException e1) { Assert.fail("Batch commit throw exception. " + e1.getMessage()); } try { Entry fooGet = getClient(clientName).get(fooKey); assertTrue(Arrays.equals(fooGet.getKey(), fooKey)); assertTrue(Arrays.equals(fooGet.getValue(), fooValue)); assertTrue(Arrays.equals( fooGet.getEntryMetadata().getVersion(), fooVersion)); } catch (KineticException e1) { Assert.fail("Get foo throw exception. " + e1.getMessage()); } try { Entry barGet = getClient(clientName).get(barKey); assertNull(barGet); } catch (KineticException e1) { Assert.fail("Get bar throw exception. " + e1.getMessage()); } } } @Test(dataProvider = "transportProtocolOptions") public void testBatchOperationInLoop_OneClient_CommitBatch_SameKey( String clientName) { Entry foo = getFooEntry(); Entry bar = getBarEntry(); int loopCount = 4; KineticClient client = getClient(clientName); for (int i = 0; i < loopCount; i++) { byte[] fooValue = ByteBuffer.allocate(valueSize).array(); byte[] fooVersion = toByteArray("v" + i); foo.setValue(fooValue); foo.getEntryMetadata().setVersion(fooVersion); try { client.putForced(bar); } catch (KineticException e2) { Assert.fail("Put bar throw exception: " + e2.getMessage()); } BatchOperation batch = null; try { batch = client.createBatchOperation(); } catch (KineticException e) { Assert.fail("Create batch operation failed. " + e.getMessage()); } CallbackHandler<Entry> handler = buildAsyncCallbackHandler(new KineticTestHelpers.AsyncHandler<Entry>() { @Override public void onSuccess(CallbackResult<Entry> result) { } @Override public void onError(AsyncKineticException e) { } }); try { batch.putForcedAsync(foo, handler); } catch (KineticException e1) { Assert.fail("Put async batch op throw exception. " + e1.getMessage()); } CallbackHandler<Boolean> dhandler = buildAsyncCallbackHandler(new KineticTestHelpers.AsyncHandler<Boolean>() { @Override public void onSuccess(CallbackResult<Boolean> result) { } @Override public void onError(AsyncKineticException e) { } }); try { batch.deleteAsync(bar, dhandler); } catch (KineticException e1) { Assert.fail("Delete async batch op throw exception. " + e1.getMessage()); } try { batch.commit(); } catch (KineticException e1) { Assert.fail("Batch commit throw exception. " + e1.getMessage()); } try { Entry fooGet = getClient(clientName).get(foo.getKey()); assertTrue(Arrays.equals(fooGet.getKey(), foo.getKey())); assertTrue(Arrays.equals(fooGet.getValue(), fooValue)); assertTrue(Arrays.equals( fooGet.getEntryMetadata().getVersion(), fooVersion)); } catch (KineticException e1) { Assert.fail("Get foo throw exception. " + e1.getMessage()); } try { Entry barGet = getClient(clientName).get(bar.getKey()); assertNull(barGet); } catch (KineticException e1) { Assert.fail("Get bar throw exception. " + e1.getMessage()); } } } @Test(dataProvider = "transportProtocolOptions", enabled = false) public void testBatchOperationInLoop_OneClient_AbortBatch_DifferentKey( String clientName) { Entry foo = getFooEntry(); Entry bar = getBarEntry(); int loopCount = 4; KineticClient client = getClient(clientName); for (int i = 0; i < loopCount; i++) { byte[] fooKey = toByteArray("foo" + i); byte[] fooValue = ByteBuffer.allocate(valueSize).array(); byte[] fooVersion = toByteArray("v" + i); foo.setKey(fooKey); foo.setValue(fooValue); foo.getEntryMetadata().setVersion(fooVersion); byte[] barKey = toByteArray("bar" + i); bar.setKey(barKey); try { client.putForced(bar); } catch (KineticException e2) { Assert.fail("Put bar throw exception: " + e2.getMessage()); } BatchOperation batch = null; try { batch = client.createBatchOperation(); } catch (KineticException e) { Assert.fail("Create batch operation failed. " + e.getMessage()); } CallbackHandler<Entry> handler = buildAsyncCallbackHandler(new KineticTestHelpers.AsyncHandler<Entry>() { @Override public void onSuccess(CallbackResult<Entry> result) { } @Override public void onError(AsyncKineticException e) { } }); try { batch.putForcedAsync(foo, handler); } catch (KineticException e1) { Assert.fail("Put async batch op throw exception. " + e1.getMessage()); } CallbackHandler<Boolean> dhandler = buildAsyncCallbackHandler(new KineticTestHelpers.AsyncHandler<Boolean>() { @Override public void onSuccess(CallbackResult<Boolean> result) { } @Override public void onError(AsyncKineticException e) { } }); try { batch.deleteAsync(bar, dhandler); } catch (KineticException e1) { Assert.fail("Delete async batch op throw exception. " + e1.getMessage()); } try { batch.abort(); } catch (KineticException e1) { Assert.fail("Batch abort throw exception. " + e1.getMessage()); } try { Entry fooGet = getClient(clientName).get(fooKey); assertNull(fooGet); } catch (KineticException e1) { Assert.fail("Get foo throw exception. " + e1.getMessage()); } try { Entry barGet = getClient(clientName).get(barKey); assertTrue(Arrays.equals(barGet.getKey(), bar.getKey())); assertTrue(Arrays.equals(barGet.getValue(), bar.getValue())); assertTrue(Arrays.equals( barGet.getEntryMetadata().getVersion(), bar .getEntryMetadata().getVersion())); } catch (KineticException e1) { Assert.fail("Get bar throw exception. " + e1.getMessage()); } } } @Test(dataProvider = "transportProtocolOptions", enabled = false) public void testBatchOperation_multiClients_AllSuccess(String clientName) throws KineticException, UnsupportedEncodingException, InterruptedException { int writeThreads = 2; public void testBatchOperationInLoop_OneClient_AbortBatch_SameKey( String clientName) { Entry foo = getFooEntry(); Entry bar = getBarEntry(); int loopCount = 4; KineticClient client = getClient(clientName); for (int i = 0; i < loopCount; i++) { byte[] fooValue = ByteBuffer.allocate(valueSize).array(); byte[] fooVersion = toByteArray("v" + i); foo.setValue(fooValue); foo.getEntryMetadata().setVersion(fooVersion); try { client.putForced(bar); } catch (KineticException e2) { Assert.fail("Put bar throw exception: " + e2.getMessage()); } BatchOperation batch = null; try { batch = client.createBatchOperation(); } catch (KineticException e) { Assert.fail("Create batch operation failed. " + e.getMessage()); } CallbackHandler<Entry> handler = buildAsyncCallbackHandler(new KineticTestHelpers.AsyncHandler<Entry>() { @Override public void onSuccess(CallbackResult<Entry> result) { } @Override public void onError(AsyncKineticException e) { } }); try { batch.putForcedAsync(foo, handler); } catch (KineticException e1) { Assert.fail("Put async batch op throw exception. " + e1.getMessage()); } CallbackHandler<Boolean> dhandler = buildAsyncCallbackHandler(new KineticTestHelpers.AsyncHandler<Boolean>() { @Override public void onSuccess(CallbackResult<Boolean> result) { } @Override public void onError(AsyncKineticException e) { } }); try { batch.deleteAsync(bar, dhandler); } catch (KineticException e1) { Assert.fail("Delete async batch op throw exception. " + e1.getMessage()); } try { batch.abort(); } catch (KineticException e1) { Assert.fail("Batch abort throw exception. " + e1.getMessage()); } try { Entry fooGet = getClient(clientName).get(foo.getKey()); assertNull(fooGet); } catch (KineticException e1) { Assert.fail("Get foo throw exception. " + e1.getMessage()); } try { Entry barGet = getClient(clientName).get(bar.getKey()); assertTrue(Arrays.equals(barGet.getKey(), bar.getKey())); assertTrue(Arrays.equals(barGet.getValue(), bar.getValue())); assertTrue(Arrays.equals( barGet.getEntryMetadata().getVersion(), bar .getEntryMetadata().getVersion())); } catch (KineticException e1) { Assert.fail("Get bar throw exception. " + e1.getMessage()); } } } @Test(dataProvider = "transportProtocolOptions") public void testBatchOperation_Concurrent_MultiClients_SameKey_AllSuccess( String clientName) { int writeThreads = 5; CountDownLatch latch = new CountDownLatch(writeThreads); ExecutorService pool = Executors.newCachedThreadPool(); KineticClient kineticClient; KineticClient kineticClient = null; for (int i = 0; i < writeThreads; i++) { try { kineticClient = KineticClientFactory .createInstance(kineticClientConfigutations.get(clientName)); .createInstance(kineticClientConfigutations .get(clientName)); } catch (KineticException e) { Assert.fail("Create client throw exception. " + e.getMessage()); } pool.execute(new BatchThread(kineticClient, latch)); } // wait all threads finish try { latch.await(); } catch (InterruptedException e) { Assert.fail("latch await throw exception. " + e.getMessage()); } pool.shutdown(); } Loading Loading @@ -2314,16 +2678,14 @@ class BatchThread implements Runnable { try { kineticClient.putForced(bar); } catch (KineticException e1) { // TODO Auto-generated catch block e1.printStackTrace(); Assert.fail("Put entry failed. " + e1.getMessage()); } BatchOperation batch = null; try { batch = kineticClient.createBatchOperation(); } catch (KineticException e1) { // TODO Auto-generated catch block e1.printStackTrace(); Assert.fail("Create batch throw exception. " + e1.getMessage()); } Entry foo = new Entry(); Loading @@ -2341,21 +2703,15 @@ class BatchThread implements Runnable { @Override public void onError(AsyncKineticException e) { assertTrue(e.getResponseMessage().getCommand().getStatus() .getCode().equals(StatusCode.NOT_ATTEMPTED)); } }); try { batch.putForcedAsync(foo, handler); } catch (KineticException e1) { // TODO Auto-generated catch block e1.printStackTrace(); Assert.fail("Put entry failed. " + e1.getMessage()); } byte[] barBatchVersion = toByteArray("5678"); bar.getEntryMetadata().setVersion(barBatchVersion); CallbackHandler<Boolean> dhandler = buildAsyncCallbackHandler(new KineticTestHelpers.AsyncHandler<Boolean>() { @Override public void onSuccess(CallbackResult<Boolean> result) { Loading @@ -2367,17 +2723,15 @@ class BatchThread implements Runnable { }); try { batch.deleteAsync(bar, dhandler); batch.deleteForcedAsync(bar.getKey(), dhandler); } catch (KineticException e1) { // TODO Auto-generated catch block e1.printStackTrace(); Assert.fail("Delete entry failed. " + e1.getMessage()); } try { batch.commit(); } catch (KineticException e1) { // TODO Auto-generated catch block e1.printStackTrace(); Assert.fail("Batch commit throw exception. " + e1.getMessage()); } try { Loading