Commit 7e68f344 authored by chiaming2000's avatar chiaming2000
Browse files

Java API:

Enforce sequence number ordering per connection. Messages write to
socket per connection are guaranteed to have incremental sequense and
thread safe.
parent 0b7be784
Loading
Loading
Loading
Loading
+4 −4
Original line number Diff line number Diff line
@@ -481,7 +481,7 @@ public class ClientProxy {
        
        try {

            finalizeHeader(kmreq);
            // finalizeHeader(kmreq);

            kmresp = this.iohandler.getMessageHandler().write(kmreq);

@@ -538,7 +538,7 @@ public class ClientProxy {
            // Message.Builder message = (Builder) im.getMessage();

            // finalize and fill the required header fields for the message
            finalizeHeader(kineticMessage);
            // finalizeHeader(kineticMessage);

            // get request message to send
            // Message request = message.build();
@@ -562,7 +562,7 @@ public class ClientProxy {
    void requestNoAck(KineticMessage kmreq) throws KineticException {

        try {
            finalizeHeader(kmreq);
            // finalizeHeader(kmreq);
            this.iohandler.getMessageHandler().writeNoAck(kmreq);
        } catch (Exception e) {

@@ -604,7 +604,7 @@ public class ClientProxy {
     * @param message
     *            the request protocol buffer message.
     */
    private void finalizeHeader(KineticMessage kineticMessage) {
    public void finalizeHeader(KineticMessage kineticMessage) {

        Message.Builder messageBuilder = (Builder) kineticMessage.getMessage();
        
+26 −11
Original line number Diff line number Diff line
@@ -280,16 +280,25 @@ public class MessageHandler implements ClientMessageService, Runnable {
		LinkedBlockingQueue<KineticMessage> lbq = new LinkedBlockingQueue<KineticMessage>(
				1);

		Long seq = Long.valueOf(message.getCommand().getHeader()
		KineticMessage respond = null;

        Long seq = 0L;

		try {

            synchronized (this) {

                this.client.finalizeHeader(message);

                seq = Long.valueOf(message.getCommand().getHeader()
                        .getSequence());

                this.ackmap.put(seq, lbq);

		KineticMessage respond = null;

		try {
                // this.iohandler.write(message);
                this.doWrite(message);
            }

			respond = lbq.poll(this.requestTimeout, TimeUnit.MILLISECONDS);

			if (this.isClosed) {
@@ -302,17 +311,19 @@ public class MessageHandler implements ClientMessageService, Runnable {
		return respond;
	}

	public void writeAsync(KineticMessage message, Object context)
    public synchronized void writeAsync(KineticMessage message, Object context)
			throws IOException,
			InterruptedException {

        this.client.finalizeHeader(message);

		Long seq = Long.valueOf(message.getCommand().getHeader()
				.getSequence());

		synchronized (this) {
        // synchronized (this) {
			while (this.ackmap.size() >= this.asyncQueuedSize && this.isRunning) {
				this.wait();
			}
            // }
		}

		this.ackmap.put(seq, context);
@@ -321,7 +332,11 @@ public class MessageHandler implements ClientMessageService, Runnable {
		this.doWrite(message);
	}

    public void writeNoAck(KineticMessage message) throws IOException {
    public synchronized void writeNoAck(KineticMessage message)
            throws IOException {

        this.client.finalizeHeader(message);

        this.doWrite(message);
    }