Commit f0a8392f authored by Emma Li's avatar Emma Li
Browse files

Merge pull request #31 from Kinetic/feature/batch_queue

Feature/batch queue
parents a02ec6d8 b0801c31
Loading
Loading
Loading
Loading
+5 −11
Original line number Diff line number Diff line
@@ -480,9 +480,11 @@ public class ClientProxy {
        
        try {

            // finalizeHeader(kmreq);

            // require to obtain lock to prevent possible dead-lock
        	// such as if connection close is triggered from remote.
        	synchronized (this) {
        		kmresp = this.iohandler.getMessageHandler().write(kmreq);
        	}

            // check if we do received a response
            if (kmresp == null) {
@@ -534,14 +536,6 @@ public class ClientProxy {

        try {

            // Message.Builder message = (Builder) im.getMessage();

            // finalize and fill the required header fields for the message
            // finalizeHeader(kineticMessage);

            // get request message to send
            // Message request = message.build();

            // create context message for the async operation
            CallbackContext<T> context = new CallbackContext<T>(handler);

+5 −5
Original line number Diff line number Diff line
@@ -103,8 +103,7 @@ public class MessageHandler implements ClientMessageService, Runnable {
		this.asyncQueuedSize = this.client.getConfiguration()
				.getAsyncQueueSize();

		this.requestTimeout = this.client.getConfiguration()
				.getRequestTimeoutMillis();
		this.requestTimeout = this.client.getConfiguration().getRequestTimeoutMillis();
	}
	

@@ -300,11 +299,12 @@ public class MessageHandler implements ClientMessageService, Runnable {
                this.doWrite(message);
            }

			respond = lbq.poll(this.requestTimeout, TimeUnit.MILLISECONDS);

            if (this.isClosed) {
				throw new IOException("Connection is closed.");
			} else {
				respond = lbq.poll(this.requestTimeout, TimeUnit.MILLISECONDS);
			}
			
		} finally {
			this.ackmap.remove(seq);
		}
+21 −0
Original line number Diff line number Diff line
@@ -20,6 +20,7 @@ import io.netty.channel.ChannelHandlerContext;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;

@@ -328,4 +329,24 @@ public class NioBatchOpPreProcessor {

        return kineticMessage;
    }
    
    /**
     * remove batches from this connection.
     * 
     * @param km kinetic message.
     */
    public static void cleanUpConnection (long cid) {
        
        // connection id
        String keyPrefix = String.valueOf(cid);
        // batch keys
        Set <String> keys = batchMap.keySet();
        
        // remove matched connection ids
        for (String key: keys) {    
            if (key.startsWith(keyPrefix)) {
                batchMap.remove(key);
            }
        }
    }
}
+3 −0
Original line number Diff line number Diff line
@@ -187,6 +187,9 @@ public class NioMessageServiceHandler extends
        // map
        ConnectionInfo info = SimulatorEngine.removeConnectionInfo(ctx);
        
        // remove batches in batch queue
        NioBatchOpPreProcessor.cleanUpConnection(info.getConnectionId());

        logger.info("connection info is removed, id=" + info.getConnectionId()
                + ", is secure channel=" + this.isSecureChannel);
	}
+101 −7
Original line number Diff line number Diff line
@@ -37,8 +37,8 @@ public class BatchBoundaryTest extends IntegrationTestCase {
	private final int MAX_VALUE_SIZE = 1024 * 1024;
	private final int MAX_KEY_SIZE = 4096;
	private final int MAX_VERSION_SIZE = 2048;
	private final int MAX_BATCH_OP_NUM = 15;
	private final int MAX_BATCH_PER_CONNECTION_NUM = 5;
	private final int MAX_BATCH_OP_NUM = 5;
	private final int MAX_BATCH_PER_CONNECTION_NUM = 15;

	@Test(dataProvider = "transportProtocolOptions")
	public void testBatchOperation_PutAndDeleteForced_PutExceedMaximumKeySizeFailed(
@@ -206,7 +206,7 @@ public class BatchBoundaryTest extends IntegrationTestCase {
	}

	@Test(dataProvider = "transportProtocolOptions", enabled = true, priority = 1)
	public void testBatchOperation_BatchCountExceedTheMaxinumNum_ThrowException(
	public void testBatchOperation_BatchCountExceedTheMaximumNum_ThrowException(
			String clientName) {
		KineticClient kineticClient = creatClient(clientName);
		assertTrue(kineticClient != null);
@@ -219,13 +219,19 @@ public class BatchBoundaryTest extends IntegrationTestCase {
			Assert.fail("delete entry failed. " + e1.getMessage());
		}

		BatchOperation batch[] = new BatchOperation[MAX_BATCH_PER_CONNECTION_NUM + 1];
		BatchOperation batch[] = new BatchOperation[MAX_BATCH_OP_NUM + 1];
		try {
			for (int i = 0; i < MAX_BATCH_PER_CONNECTION_NUM + 1; i++) {
			for (int i = 0; i < MAX_BATCH_OP_NUM + 1; i++) {
				batch[i] = kineticClient.createBatchOperation();
			}
		} catch (KineticException e) {
			assertTrue(e.getMessage() != null);
		} finally {
		    try {
                kineticClient.close();
            } catch (KineticException e) {
                ;
            }
		}
	}

@@ -243,15 +249,103 @@ public class BatchBoundaryTest extends IntegrationTestCase {
	@Test(dataProvider = "transportProtocolOptions", enabled = true, priority = 2)
	public void testBatchOperation_OperationExceedTheMaxinumNumPerBatch_ThrowException(
			String clientName) {
		//KineticClient kineticClient = creatClient(clientName);
		//assertTrue(kineticClient != null);

		BatchOperation batch = null;
		try {
			batch = getClient(clientName).createBatchOperation();
		} catch (KineticException e) {
			Assert.fail("Create batch operation throw exception. "
					+ e.getMessage());
		}

		try {
			for (int j = 0; j < MAX_BATCH_PER_CONNECTION_NUM + 1; j++) {
				batch.putForced(new Entry(toByteArray("foo" + j),
						toByteArray("value" + j)));
			}

		} catch (KineticException e) {
			Assert.fail("Put entry throw exception. " + e.getMessage());
		}

		try {
			batch.commit();
		} catch (KineticException e) {
			assertTrue(e.getMessage() != null);
		}
	}

	@Test(dataProvider = "transportProtocolOptions", enabled = true, priority = 2)
	public void testBatchOperation_BatchQueueCheck_Success(String clientName) {
		KineticClient kineticClient = creatClient(clientName);
		assertTrue(kineticClient != null);

		BatchOperation batch[] = new BatchOperation[MAX_BATCH_OP_NUM + 1];
		try {
			for (int i = 0; i < MAX_BATCH_OP_NUM + 1; i++) {
				kineticClient.deleteForced(toByteArray("foo" + i));
				batch[i] = kineticClient.createBatchOperation();
			}
		} catch (KineticException e) {
			Assert.fail("Clean entry failed. " + e.getMessage());
			assertTrue(e.getMessage() != null);
			try {
				kineticClient.close();
			} catch (KineticException e1) {
				Assert.fail("client close throw exception: " + e1.getMessage());
			}
		}

		BatchOperation batchSecond = null;
		KineticClient kineticClientSecond = creatClient(clientName);
		try {
			batchSecond = kineticClientSecond.createBatchOperation();
		} catch (KineticException e) {
			Assert.fail("Create batch operation throw exception. "
					+ e.getMessage());
		}

		try {
			for (int j = 0; j < MAX_BATCH_PER_CONNECTION_NUM; j++) {
				batchSecond.putForced(new Entry(toByteArray("foo" + j),
						toByteArray("value" + j)));
			}

		} catch (KineticException e) {
			Assert.fail("Put entry throw exception. " + e.getMessage());
		}

		try {
			batchSecond.commit();
		} catch (KineticException e) {
			Assert.fail("Second batch operation failed. " + e.getMessage());
		}
	}

	@Test(dataProvider = "transportProtocolOptions", enabled = true, priority = 2)
	public void testBatchOperation_BatchQueueCheck_ThrowException(
			String clientName) {
		KineticClient kineticClient = creatClient(clientName);
		assertTrue(kineticClient != null);

		BatchOperation batch[] = new BatchOperation[MAX_BATCH_OP_NUM + 1];
		try {
			for (int i = 0; i < MAX_BATCH_OP_NUM + 1; i++) {
				batch[i] = kineticClient.createBatchOperation();
			}
		} catch (KineticException e) {
			assertTrue(e.getMessage() != null);
			try {
				kineticClient.createBatchOperation();
			} catch (KineticException e1) {
				assertTrue(e1.getMessage() != null);
				try {
					kineticClient.close();
				} catch (KineticException e2) {
					Assert.fail("client close throw exception: "
							+ e2.getMessage());
				}
			}
		}
	}