Commit db0b1b02 authored by lichenchong's avatar lichenchong
Browse files

Kinetic-test:

Add more batch op test cases.
parent 2b1a6a85
Loading
Loading
Loading
Loading
+347 −9
Original line number Diff line number Diff line
@@ -29,9 +29,11 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kinetic.client.BatchAbortedException;
import kinetic.client.BatchOperation;
import kinetic.client.ClientConfiguration;
import kinetic.client.Entry;
import kinetic.client.EntryMetadata;
import kinetic.client.KineticClient;
import kinetic.client.KineticClientFactory;
import kinetic.client.KineticException;
@@ -66,7 +68,11 @@ import com.seagate.kinetic.proto.Kinetic.Command.Status.StatusCode;
@Test(groups = { "simulator" })
public class BatchOpAPITest extends IntegrationTestCase {

    private final int valueSize = 1024 * 1024;
    private final int MAX_VALUE_SIZE = 1024 * 1024;
    private final int MAX_KEY_SIZE = 4096;
    private final int MAX_VERSION_SIZE = 2048;
    private final int MAX_BATCH_OP_NUM = 15;
    private final int MAX_BATCH_PER_CONNECTION_NUM = 5;

    @Test(dataProvider = "transportProtocolOptions")
    public void testBatchOperation_PutsForcedAsyncSucceeds(String clientName) {
@@ -823,13 +829,13 @@ public class BatchOpAPITest extends IntegrationTestCase {
    public void testBatchOperation_PutAndDelete_WithBigValue_Succeeds(
            String clientName) {
        Entry bar = getBarEntry();
        byte[] barBigValue = ByteBuffer.allocate(valueSize).array();
        byte[] barBigValue = ByteBuffer.allocate(MAX_VALUE_SIZE).array();
        bar.setValue(barBigValue);

        Entry foo = getFooEntry();
        foo.getEntryMetadata().setVersion(null);
        byte[] newVersion = toByteArray("5678");
        byte[] fooBigValue = ByteBuffer.allocate(valueSize).array();
        byte[] fooBigValue = ByteBuffer.allocate(MAX_VALUE_SIZE).array();
        foo.setValue(fooBigValue);

        try {
@@ -2200,7 +2206,7 @@ public class BatchOpAPITest extends IntegrationTestCase {

        for (int i = 0; i < loopCount; i++) {
            byte[] fooKey = toByteArray("foo" + i);
            byte[] fooValue = ByteBuffer.allocate(valueSize).array();
            byte[] fooValue = ByteBuffer.allocate(MAX_VALUE_SIZE).array();
            byte[] fooVersion = toByteArray("v" + i);
            foo.setKey(fooKey);
            foo.setValue(fooValue);
@@ -2294,7 +2300,7 @@ public class BatchOpAPITest extends IntegrationTestCase {
        }

        for (int i = 0; i < loopCount; i++) {
            byte[] fooValue = ByteBuffer.allocate(valueSize).array();
            byte[] fooValue = ByteBuffer.allocate(MAX_VALUE_SIZE).array();
            byte[] fooVersion = toByteArray("v" + i);
            foo.setValue(fooValue);
            foo.getEntryMetadata().setVersion(fooVersion);
@@ -2358,7 +2364,7 @@ public class BatchOpAPITest extends IntegrationTestCase {
        }
    }

    @Test(dataProvider = "transportProtocolOptions", enabled = false)
    @Test(dataProvider = "transportProtocolOptions")
    public void testBatchOperationInLoop_OneClient_AbortBatch_DifferentKey(
            String clientName) {
        Entry foo = getFooEntry();
@@ -2385,7 +2391,7 @@ public class BatchOpAPITest extends IntegrationTestCase {

        for (int i = 0; i < loopCount; i++) {
            byte[] fooKey = toByteArray("foo" + i);
            byte[] fooValue = ByteBuffer.allocate(valueSize).array();
            byte[] fooValue = ByteBuffer.allocate(MAX_VALUE_SIZE).array();
            byte[] fooVersion = toByteArray("v" + i);
            foo.setKey(fooKey);
            foo.setValue(fooValue);
@@ -2463,7 +2469,7 @@ public class BatchOpAPITest extends IntegrationTestCase {
        }
    }

    @Test(dataProvider = "transportProtocolOptions", enabled = false)
    @Test(dataProvider = "transportProtocolOptions")
    public void testBatchOperationInLoop_OneClient_AbortBatch_SameKey(
            String clientName) {
        Entry foo = getFooEntry();
@@ -2479,7 +2485,7 @@ public class BatchOpAPITest extends IntegrationTestCase {
        int loopCount = 4;
        KineticClient client = getClient(clientName);
        for (int i = 0; i < loopCount; i++) {
            byte[] fooValue = ByteBuffer.allocate(valueSize).array();
            byte[] fooValue = ByteBuffer.allocate(MAX_VALUE_SIZE).array();
            byte[] fooVersion = toByteArray("v" + i);
            foo.setValue(fooValue);
            foo.getEntryMetadata().setVersion(fooVersion);
@@ -2545,6 +2551,338 @@ public class BatchOpAPITest extends IntegrationTestCase {
        }
    }

    @Test(dataProvider = "transportProtocolOptions")
    public void testBatchOperation_PutAndDeleteForced_PutFailed(
            String clientName) {
        Entry bar = getBarEntry();
        Entry foo = getFooEntry();
        byte[] newVersion = toByteArray("5678");

        try {
            cleanEntry(bar, getClient(clientName));
            cleanEntry(foo, getClient(clientName));
        } catch (KineticException e) {
            Assert.fail("Clean entry failed. " + e.getMessage());
        }

        try {
            getClient(clientName).putForced(bar);
            getClient(clientName).putForced(foo);
        } catch (KineticException e) {
            Assert.fail("Put entry failed. " + e.getMessage());
        }

        BatchOperation batch = null;
        try {
            batch = getClient(clientName).createBatchOperation();
        } catch (KineticException e) {
            Assert.fail("Create batch operation throw exception. "
                    + e.getMessage());
        }

        try {
            batch.put(new Entry(toByteArray("foo"), toByteArray("values")),
                    newVersion);
        } catch (KineticException e) {
            Assert.fail("Put entry throw exception. " + e.getMessage());
        }

        try {
            batch.delete(bar);
        } catch (KineticException e) {
            Assert.fail("Delete async throw exception. " + e.getMessage());
        }

        try {
            batch.commit();
        } catch (BatchAbortedException e) {
            assertTrue(e.getResponseMessage().getCommand().getStatus()
                    .getCode().equals(StatusCode.INVALID_BATCH));
            System.out.println(e.getFiledOperationIndex());
            assertTrue(e.getFiledOperationIndex() == 0);

        } catch (KineticException e) {
            Assert.fail("received unexpected exception: " + e.getMessage());
        }
        // get foo, expect to find it
        Entry fooGet;
        try {
            fooGet = getClient(clientName).get(foo.getKey());
            assertTrue(Arrays.equals(fooGet.getEntryMetadata().getVersion(),
                    foo.getEntryMetadata().getVersion()));
        } catch (KineticException e) {
            Assert.fail("Get foo throw exception. " + e.getMessage());
        }

        // get bar, expect to null
        Entry barGet;
        try {
            barGet = getClient(clientName).get(bar.getKey());
            assertTrue(Arrays.equals(barGet.getEntryMetadata().getVersion(),
                    bar.getEntryMetadata().getVersion()));
            assertTrue(Arrays.equals(barGet.getValue(), bar.getValue()));
        } catch (KineticException e) {
            Assert.fail("Get bar throw exception. " + e.getMessage());
        }

        try {
            cleanEntry(bar, getClient(clientName));
            cleanEntry(foo, getClient(clientName));
        } catch (KineticException e) {
            Assert.fail("Clean entry failed. " + e.getMessage());
        }
    }

    @Test(dataProvider = "transportProtocolOptions")
    public void testBatchOperation_PutAndDeleteForced_PutExceedMaximumKeySizeFailed(
            String clientName) {
        byte[] key = new byte[MAX_KEY_SIZE + 1];
        byte[] value = toByteArray("value");

        Entry entry = new Entry(key, value);

        BatchOperation batch = null;
        try {
            batch = getClient(clientName).createBatchOperation();
        } catch (KineticException e) {
            Assert.fail("Create batch operation throw exception. "
                    + e.getMessage());
        }

        try {
            batch.put(entry, toByteArray("5678"));
        } catch (KineticException e) {
            Assert.fail("Put entry throw exception. " + e.getMessage());
        }

        try {
            batch.commit();
        } catch (BatchAbortedException e) {
            assertTrue(e.getResponseMessage().getCommand().getStatus()
                    .getCode().equals(StatusCode.INVALID_BATCH));
            assertTrue(e.getFiledOperationIndex() == 0);

        } catch (KineticException e) {
            Assert.fail("received unexpected exception: " + e.getMessage());
        }
    }

    @Test(dataProvider = "transportProtocolOptions")
    public void testBatchOperation_PutAndDeleteForced_PutExceedMaximumValueSizeFailed(
            String clientName) {
        byte[] key = toByteArray("key");
        byte[] value = new byte[MAX_VALUE_SIZE + 1];

        Entry entry = new Entry(key, value);

        BatchOperation batch = null;
        try {
            batch = getClient(clientName).createBatchOperation();
        } catch (KineticException e) {
            Assert.fail("Create batch operation throw exception. "
                    + e.getMessage());
        }

        try {
            batch.put(entry, toByteArray("5678"));
        } catch (KineticException e) {
            Assert.fail("Put entry throw exception. " + e.getMessage());
        }

        try {
            batch.commit();
        } catch (BatchAbortedException e) {
            assertTrue(e.getResponseMessage().getCommand().getStatus()
                    .getCode().equals(StatusCode.INVALID_BATCH));
            assertTrue(e.getFiledOperationIndex() == 0);

        } catch (KineticException e) {
            Assert.fail("received unexpected exception: " + e.getMessage());
        }
    }

    @Test(dataProvider = "transportProtocolOptions")
    public void testBatchOperation_PutAndDeleteForced_PutExceedMaximumNewVersionSizeFailed(
            String clientName) {
        byte[] key = toByteArray("key");
        byte[] value = toByteArray("value");

        Entry entry = new Entry(key, value);
        try {
            getClient(clientName).deleteForced(key);
        } catch (KineticException e1) {
            Assert.fail("delete entry failed. " + e1.getMessage());
        }

        BatchOperation batch = null;
        try {
            batch = getClient(clientName).createBatchOperation();
        } catch (KineticException e) {
            Assert.fail("Create batch operation throw exception. "
                    + e.getMessage());
        }

        try {
            batch.put(entry, new byte[MAX_VERSION_SIZE + 1]);
        } catch (KineticException e) {
            Assert.fail("Put entry throw exception. " + e.getMessage());
        }

        try {
            batch.commit();
        } catch (BatchAbortedException e) {
            assertTrue(e.getResponseMessage().getCommand().getStatus()
                    .getCode().equals(StatusCode.INVALID_BATCH));
            assertTrue(e.getFiledOperationIndex() == 0);

        } catch (KineticException e) {
            Assert.fail("received unexpected exception: " + e.getMessage());
        }

        try {
            getClient(clientName).deleteForced(key);
        } catch (KineticException e1) {
            Assert.fail("delete entry failed. " + e1.getMessage());
        }
    }

    @Test(dataProvider = "transportProtocolOptions")
    public void testBatchOperation_PutAndDeleteForced_PutExceedMaximumDbVersionSizeFailed(
            String clientName) {
        byte[] key = toByteArray("key");
        byte[] value = toByteArray("value");
        byte[] dbVersion = new byte[MAX_VERSION_SIZE + 1];

        EntryMetadata emd = new EntryMetadata();
        emd.setVersion(dbVersion);

        Entry entry = new Entry(key, value, emd);

        try {
            getClient(clientName).deleteForced(key);
        } catch (KineticException e1) {
            Assert.fail("delete entry failed. " + e1.getMessage());
        }

        BatchOperation batch = null;
        try {
            batch = getClient(clientName).createBatchOperation();
        } catch (KineticException e) {
            Assert.fail("Create batch operation throw exception. "
                    + e.getMessage());
        }

        try {
            batch.putForced(entry);
        } catch (KineticException e) {
            Assert.fail("Put entry throw exception. " + e.getMessage());
        }

        try {
            batch.commit();
        } catch (BatchAbortedException e) {
            assertTrue(e.getResponseMessage().getCommand().getStatus()
                    .getCode().equals(StatusCode.INVALID_BATCH));
            assertTrue(e.getFiledOperationIndex() == 0);

        } catch (KineticException e) {
            Assert.fail("received unexpected exception: " + e.getMessage());
        }

        try {
            getClient(clientName).deleteForced(key);
        } catch (KineticException e1) {
            Assert.fail("delete entry failed. " + e1.getMessage());
        }
    }

    // TODO: need to trace
    @Test(dataProvider = "transportProtocolOptions", enabled = false)
    public void testBatchOperation_OperationExceedTheMaxinumNum_ThrowException(
            String clientName) {
        Entry bar = getBarEntry();
        Entry foo = getFooEntry();

        try {
            getClient(clientName).deleteForced(foo.getKey());
        } catch (KineticException e1) {
            Assert.fail("delete entry failed. " + e1.getMessage());
        }

        BatchOperation batch = null;
        try {
            for (int i = 0; i < MAX_BATCH_PER_CONNECTION_NUM + 1; i++) {
                batch = getClient(clientName).createBatchOperation();
            }
        } catch (KineticException e) {
            assertTrue(e.getMessage() != null);
        }

        try {
            batch.putForced(foo);
            batch.deleteForced(bar.getKey());
        } catch (KineticException e1) {
            assertTrue(e1.getMessage() != null);
        }

        try {
            batch.commit();
        } catch (KineticException e) {
            assertTrue(e.getMessage() != null);
            System.out.println("ke: " + e.getMessage());
        } catch (Exception e) {
            Assert.fail("Catch unexpected exception: " + e.getMessage());
        }
    }

    // TODO: need to trace
    @Test(dataProvider = "transportProtocolOptions", enabled = false)
    public void testBatchOperation_BatchNumExceedTheMaxinumNum_ThrowException(
            String clientName) {
        Entry bar = getBarEntry();
        byte[] newVersion = toByteArray("5678");

        try {
            for (int i = 0; i < MAX_BATCH_OP_NUM + 1; i++) {
                getClient(clientName).deleteForced(toByteArray("foo" + i));
            }
        } catch (KineticException e) {
            Assert.fail("Clean entry failed. " + e.getMessage());
        }

        BatchOperation batch = null;
        try {
            batch = getClient(clientName).createBatchOperation();
        } catch (KineticException e) {
            Assert.fail("Create batch operation throw exception. "
                    + e.getMessage());
        }

        try {
            for (int i = 0; i < MAX_BATCH_OP_NUM + 1; i++) {
                batch.put(new Entry(toByteArray("foo" + i),
                        toByteArray("values" + i)), newVersion);
            }
        } catch (KineticException e) {
            Assert.fail("Put entry throw exception. " + e.getMessage());
        }

        try {
            batch.delete(bar);
        } catch (KineticException e) {
            Assert.fail("Delete async throw exception. " + e.getMessage());
        }

        try {
            batch.commit();
        } catch (KineticException e) {
            assertTrue(e.getMessage() != null);
            System.out.println("ke: " + e.getMessage());
        } catch (Exception e) {
            Assert.fail("Catch unexpected exception: " + e.getMessage());
        }
    }

    @Test(dataProvider = "transportProtocolOptions")
    public void testBatchOperation_Concurrent_MultiClients_SameKey_AllSuccess(
            String clientName) {