Commit cb828254 authored by lichenchong's avatar lichenchong
Browse files

Add performance stress concurrent batch operation test cases.

parent 7c6a6c6b
Loading
Loading
Loading
Loading
+2655 −0

File added.

Preview size limit exceeded, changes collapsed.

+272 −0
Original line number Diff line number Diff line
/**
 * 
 * Copyright (C) 2014 Seagate Technology.
 * 
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 *
 */
package com.seagate.kinetic.batchOp;

import static com.seagate.kinetic.KineticTestHelpers.toByteArray;
import static org.testng.AssertJUnit.assertTrue;
import kinetic.client.BatchAbortedException;
import kinetic.client.BatchOperation;
import kinetic.client.Entry;
import kinetic.client.EntryMetadata;
import kinetic.client.KineticClient;
import kinetic.client.KineticClientFactory;
import kinetic.client.KineticException;

import org.testng.Assert;
import org.testng.annotations.Test;

import com.seagate.kinetic.IntegrationTestCase;
import com.seagate.kinetic.proto.Kinetic.Command.Status.StatusCode;

@Test(groups = { "simulator" })
public class BatchBoundaryTest extends IntegrationTestCase {
	private final int MAX_VALUE_SIZE = 1024 * 1024;
	private final int MAX_KEY_SIZE = 4096;
	private final int MAX_VERSION_SIZE = 2048;
	private final int MAX_BATCH_OP_NUM = 15;
	private final int MAX_BATCH_PER_CONNECTION_NUM = 5;

	@Test(dataProvider = "transportProtocolOptions")
	public void testBatchOperation_PutAndDeleteForced_PutExceedMaximumKeySizeFailed(
			String clientName) {
		byte[] key = new byte[MAX_KEY_SIZE + 1];
		byte[] value = toByteArray("value");

		Entry entry = new Entry(key, value);

		BatchOperation batch = null;
		try {
			batch = getClient(clientName).createBatchOperation();
		} catch (KineticException e) {
			Assert.fail("Create batch operation throw exception. "
					+ e.getMessage());
		}

		try {
			batch.put(entry, toByteArray("5678"));
		} catch (KineticException e) {
			Assert.fail("Put entry throw exception. " + e.getMessage());
		}

		try {
			batch.commit();
		} catch (BatchAbortedException e) {
			assertTrue(e.getResponseMessage().getCommand().getStatus()
					.getCode().equals(StatusCode.INVALID_BATCH));
			assertTrue(e.getFailedOperationIndex() == 0);

		} catch (KineticException e) {
			Assert.fail("received unexpected exception: " + e.getMessage());
		}
	}

	@Test(dataProvider = "transportProtocolOptions")
	public void testBatchOperation_PutAndDeleteForced_PutExceedMaximumValueSizeFailed(
			String clientName) {
		byte[] key = toByteArray("key");
		byte[] value = new byte[MAX_VALUE_SIZE + 1];

		Entry entry = new Entry(key, value);

		BatchOperation batch = null;
		try {
			batch = getClient(clientName).createBatchOperation();
		} catch (KineticException e) {
			Assert.fail("Create batch operation throw exception. "
					+ e.getMessage());
		}

		try {
			batch.put(entry, toByteArray("5678"));
		} catch (KineticException e) {
			Assert.fail("Put entry throw exception. " + e.getMessage());
		}

		try {
			batch.commit();
		} catch (BatchAbortedException e) {
			assertTrue(e.getResponseMessage().getCommand().getStatus()
					.getCode().equals(StatusCode.INVALID_BATCH));
			assertTrue(e.getFailedOperationIndex() == 0);

		} catch (KineticException e) {
			Assert.fail("received unexpected exception: " + e.getMessage());
		}
	}

	@Test(dataProvider = "transportProtocolOptions")
	public void testBatchOperation_PutAndDeleteForced_PutExceedMaximumDbVersionSizeFailed(
			String clientName) {
		byte[] key = toByteArray("key");
		byte[] value = toByteArray("value");
		byte[] dbVersion = new byte[MAX_VERSION_SIZE + 1];

		EntryMetadata emd = new EntryMetadata();
		emd.setVersion(dbVersion);

		Entry entry = new Entry(key, value, emd);

		try {
			getClient(clientName).deleteForced(key);
		} catch (KineticException e1) {
			Assert.fail("delete entry failed. " + e1.getMessage());
		}

		BatchOperation batch = null;
		try {
			batch = getClient(clientName).createBatchOperation();
		} catch (KineticException e) {
			Assert.fail("Create batch operation throw exception. "
					+ e.getMessage());
		}

		try {
			batch.putForced(entry);
		} catch (KineticException e) {
			Assert.fail("Put entry throw exception. " + e.getMessage());
		}

		try {
			batch.commit();
		} catch (BatchAbortedException e) {
			e.printStackTrace();
			System.out.println(e.getFailedOperationIndex());
			assertTrue(e.getResponseMessage().getCommand().getStatus()
					.getCode().equals(StatusCode.INVALID_BATCH));
			assertTrue(e.getFailedOperationIndex() == 0);

		} catch (KineticException e) {
			Assert.fail("received unexpected exception: " + e.getMessage());
		}

		try {
			getClient(clientName).deleteForced(key);
		} catch (KineticException e1) {
			Assert.fail("delete entry failed. " + e1.getMessage());
		}
	}

	@Test(dataProvider = "transportProtocolOptions")
	public void testBatchOperation_PutAndDeleteForced_PutExceedMaximumNewVersionSizeFailed(
			String clientName) {
		byte[] key = toByteArray("key");
		byte[] value = toByteArray("value");

		Entry entry = new Entry(key, value);
		try {
			getClient(clientName).deleteForced(key);
		} catch (KineticException e1) {
			Assert.fail("delete entry failed. " + e1.getMessage());
		}

		BatchOperation batch = null;
		try {
			batch = getClient(clientName).createBatchOperation();
		} catch (KineticException e) {
			Assert.fail("Create batch operation throw exception. "
					+ e.getMessage());
		}

		try {
			batch.put(entry, new byte[MAX_VERSION_SIZE + 1]);
		} catch (KineticException e) {
			Assert.fail("Put entry throw exception. " + e.getMessage());
		}

		try {
			batch.commit();
		} catch (BatchAbortedException e) {
			assertTrue(e.getResponseMessage().getCommand().getStatus()
					.getCode().equals(StatusCode.INVALID_BATCH));
			assertTrue(e.getFailedOperationIndex() == 0);

		} catch (KineticException e) {
			Assert.fail("received unexpected exception: " + e.getMessage());
		}

		try {
			getClient(clientName).deleteForced(key);
		} catch (KineticException e1) {
			Assert.fail("delete entry failed. " + e1.getMessage());
		}
	}

	@Test(dataProvider = "transportProtocolOptions", enabled = true, priority = 1)
	public void testBatchOperation_BatchCountExceedTheMaxinumNum_ThrowException(
			String clientName) {
		KineticClient kineticClient = creatClient(clientName);
		assertTrue(kineticClient != null);

		Entry foo = getFooEntry();

		try {
			kineticClient.deleteForced(foo.getKey());
		} catch (KineticException e1) {
			Assert.fail("delete entry failed. " + e1.getMessage());
		}

		BatchOperation batch[] = new BatchOperation[MAX_BATCH_PER_CONNECTION_NUM + 1];
		try {
			for (int i = 0; i < MAX_BATCH_PER_CONNECTION_NUM + 1; i++) {
				batch[i] = kineticClient.createBatchOperation();
			}
		} catch (KineticException e) {
			assertTrue(e.getMessage() != null);
		}
	}

	private KineticClient creatClient(String clientName) {
		KineticClient kineticClient = null;
		try {
			kineticClient = KineticClientFactory
					.createInstance(kineticClientConfigutations.get(clientName));
		} catch (KineticException e) {
			Assert.fail("Create client throw exception. " + e.getMessage());
		}
		return kineticClient;
	}

	@Test(dataProvider = "transportProtocolOptions", enabled = true, priority = 2)
	public void testBatchOperation_OperationExceedTheMaxinumNumPerBatch_ThrowException(
			String clientName) {
		KineticClient kineticClient = creatClient(clientName);
		assertTrue(kineticClient != null);

		try {
			for (int i = 0; i < MAX_BATCH_OP_NUM + 1; i++) {
				kineticClient.deleteForced(toByteArray("foo" + i));
			}
		} catch (KineticException e) {
			Assert.fail("Clean entry failed. " + e.getMessage());
		}
	}

	private Entry getFooEntry() {
		Entry foo = new Entry();
		byte[] fooKey = toByteArray("foo");
		foo.setKey(fooKey);
		byte[] fooValue = toByteArray("foovalue");
		foo.setValue(fooValue);
		byte[] fooVersion = toByteArray("1234");
		foo.getEntryMetadata().setVersion(fooVersion);

		return foo;
	}
}
+154 −0
Original line number Diff line number Diff line
/**
 * 
 * Copyright (C) 2014 Seagate Technology.
 * 
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 *
 */
package com.seagate.kinetic.batchOp;

import static com.seagate.kinetic.KineticTestHelpers.toByteArray;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kinetic.client.BatchOperation;
import kinetic.client.Entry;
import kinetic.client.KineticClient;
import kinetic.client.KineticClientFactory;
import kinetic.client.KineticException;

import org.testng.Assert;
import org.testng.annotations.Test;

import com.seagate.kinetic.IntegrationTestCase;

@Test(groups = { "simulator" })
public class BatchConcurrentTest extends IntegrationTestCase {
	@Test(dataProvider = "transportProtocolOptions")
	public void testBatchOperation_Concurrent_MultiClients_SameKey_AllSuccess(
			String clientName) {
		int writeThreads = 5;
		CountDownLatch latch = new CountDownLatch(writeThreads);
		ExecutorService pool = Executors.newCachedThreadPool();

		KineticClient kineticClient = null;
		for (int i = 0; i < writeThreads; i++) {
			try {
				kineticClient = KineticClientFactory
						.createInstance(kineticClientConfigutations
								.get(clientName));
			} catch (KineticException e) {
				Assert.fail("Create client throw exception. " + e.getMessage());
			}
			pool.execute(new BatchThread(kineticClient, latch));
		}

		// wait all threads finish
		try {
			latch.await();
		} catch (InterruptedException e) {
			Assert.fail("latch await throw exception. " + e.getMessage());
		}
		pool.shutdown();

	}
}

class BatchThread implements Runnable {
	private final CountDownLatch latch;
	private final KineticClient kineticClient;

	public BatchThread(KineticClient kineticClient, CountDownLatch latch) {
		this.kineticClient = kineticClient;
		this.latch = latch;
	}

	@Override
	public void run() {
		Entry bar = new Entry();
		byte[] barKey = toByteArray("bar");
		bar.setKey(barKey);
		byte[] barValue = toByteArray("barvalue");
		bar.setValue(barValue);
		byte[] barVersion = toByteArray("1234");
		bar.getEntryMetadata().setVersion(barVersion);

		Entry foo = new Entry();
		byte[] fooKey = toByteArray("foo");
		foo.setKey(fooKey);
		byte[] fooValue = toByteArray("foovalue");
		foo.setValue(fooValue);
		byte[] fooVersion = toByteArray("1234");
		foo.getEntryMetadata().setVersion(fooVersion);

		try {
			kineticClient.deleteForced(fooKey);
			kineticClient.deleteForced(barKey);
		} catch (KineticException e) {
			Assert.fail("Clean entry failed. " + e.getMessage());
		}

		try {
			kineticClient.putForced(bar);
		} catch (KineticException e1) {
			Assert.fail("Put entry failed. " + e1.getMessage());
		}

		BatchOperation batch = null;
		try {
			batch = kineticClient.createBatchOperation();
		} catch (KineticException e1) {
			Assert.fail("Create batch throw exception. " + e1.getMessage());
		}

		try {
			batch.putForced(foo);
		} catch (KineticException e1) {
			Assert.fail("Put entry failed. " + e1.getMessage());
		}

		try {
			batch.deleteForced(bar.getKey());
		} catch (KineticException e1) {
			Assert.fail("Delete entry failed. " + e1.getMessage());
		}

		try {
			batch.commit();
		} catch (KineticException e1) {
			Assert.fail("Batch commit throw exception. " + e1.getMessage());
		}

		try {
			kineticClient.deleteForced(fooKey);
			kineticClient.deleteForced(barKey);
		} catch (KineticException e) {
			Assert.fail("Clean entry failed. " + e.getMessage());
		}

		try {
			kineticClient.close();
		} catch (KineticException e) {
			Assert.fail("close kineticClient failed, " + e.getMessage());
		} catch (Exception e) {
			Assert.fail("close kineticClient failed, " + e.getMessage());
		}

		// latch count down
		latch.countDown();
	}
}
+0 −3028

File deleted.

Preview size limit exceeded, changes collapsed.

+140 −0
Original line number Diff line number Diff line
/**
 * 
 * Copyright (C) 2014 Seagate Technology.
 * 
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 *
 */
package com.seagate.kinetic.batchOp;

import java.util.concurrent.TimeUnit;

import kinetic.client.KineticClientFactory;
import kinetic.client.KineticException;
import kinetic.client.advanced.AdvancedKineticClient;

import org.testng.annotations.Test;

import com.seagate.kinetic.IntegrationTestCase;

public class BatchPerformanceTest extends IntegrationTestCase {
	private static final int TEST_RUN_TIME_IN_MINUTE = 1;
	private static final int MAX_BATCH_COUNT = 5;
	private static final int REPORT_PERIOD_IN_SECONDS = 10;

	@Test(dataProvider = "transportProtocolOptions")
	public void testBatchPerformance_WithPureBatchPutUsingMultipleThreads(
			String clientName) {
		long start, end;
		double mbps = 0;
		System.out.println("Start batch performance test...");
		try {
			AdvancedKineticClient kineticClient = null;
			PureBatchPutThread threads[] = new PureBatchPutThread[MAX_BATCH_COUNT];
			for (int i = 0; i < MAX_BATCH_COUNT; i++) {
				kineticClient = (AdvancedKineticClient) KineticClientFactory
						.createInstance(kineticClientConfigutations
								.get(clientName));
				threads[i] = new PureBatchPutThread("MT_PBP" + i + "_",
						kineticClient, true);
				threads[i].start();
			}
			start = System.currentTimeMillis();

			try {
				int report_rounds = (60 * TEST_RUN_TIME_IN_MINUTE)
						/ REPORT_PERIOD_IN_SECONDS;
				long time_spent_in_seconds = 0;
				for (int i = 0; i < report_rounds; i++) {
					TimeUnit.SECONDS.sleep(REPORT_PERIOD_IN_SECONDS);
					end = System.currentTimeMillis();
					time_spent_in_seconds = (end - start) / 1000;
					mbps = ((double) (PureBatchPutThread.totalPutBytes()) / time_spent_in_seconds) / 1024 / 1024;
					System.out.println("Runned " + time_spent_in_seconds
							+ " seconds, MBPS: " + mbps);
				}
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} finally {
				for (int i = 0; i < MAX_BATCH_COUNT; i++) {
					threads[i].shutdown();
					try {
						threads[i].join();
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			}

		} catch (KineticException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	@Test(dataProvider = "transportProtocolOptions")
	public void testUuidFileStorePerformance_WithPurePutUsingMultipleThreads(
			String clientName) {
		long start, end;
		double mbps = 0;
		System.out.println("Start uuid mode put performance test...");
		try {
			AdvancedKineticClient kineticClient = null;
			PurePutThread threads[] = new PurePutThread[MAX_BATCH_COUNT];
			for (int i = 0; i < MAX_BATCH_COUNT; i++) {
				kineticClient = (AdvancedKineticClient) KineticClientFactory
						.createInstance(kineticClientConfigutations
								.get(clientName));
				threads[i] = new PurePutThread("MT_PP" + i + "_",
						kineticClient, true);
				threads[i].start();
			}
			start = System.currentTimeMillis();

			try {
				int report_rounds = (60 * TEST_RUN_TIME_IN_MINUTE)
						/ REPORT_PERIOD_IN_SECONDS;
				long time_spent_in_seconds = 0;
				for (int i = 0; i < report_rounds; i++) {
					TimeUnit.SECONDS.sleep(REPORT_PERIOD_IN_SECONDS);
					end = System.currentTimeMillis();
					time_spent_in_seconds = (end - start) / 1000;
					mbps = ((double) (PurePutThread.totalPutBytes()) / time_spent_in_seconds) / 1024 / 1024;
					System.out.println("Runned " + time_spent_in_seconds
							+ " seconds, MBPS: " + mbps);
				}
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} finally {
				for (int i = 0; i < MAX_BATCH_COUNT; i++) {
					threads[i].shutdown();
					try {
						threads[i].join();
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			}

		} catch (KineticException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}
Loading