Commit f2f16109 authored by Job Vranish's avatar Job Vranish
Browse files

removed spinlock_when_not_using_async_callback

parent 6de5c002
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -33,7 +33,7 @@ VERSION = ${shell head -n1 $(VERSION_FILE)}

KINETIC_LIB_NAME = $(PROJECT).$(VERSION)
KINETIC_LIB = $(BIN_DIR)/lib$(KINETIC_LIB_NAME).a
LIB_INCS = -I$(LIB_DIR) -I$(PUB_INC) -I$(PROTOBUFC) -I$(VENDOR)
LIB_INCS = -I$(LIB_DIR) -I$(PUB_INC) -I$(PROTOBUFC) -I$(SOCKET99) -I$(VENDOR)
LIB_DEPS = \
	$(PROTOBUFC)/protobuf-c/protobuf-c.h \
	$(SOCKET99)/socket99.h \
+1 −1
Original line number Diff line number Diff line
@@ -29,7 +29,7 @@
  :include:
    - src/**
    - include/**
    - vendor/
    - vendor/socket99/**
    - vendor/protobuf-c/**

:defines:
+16 −7
Original line number Diff line number Diff line
@@ -60,7 +60,7 @@ static KineticStatus KineticClient_CreateOperation(
    return KINETIC_STATUS_SUCCESS;
}

static KineticStatus KineticClient_ExecuteOperation(KineticOperation* operation)
static KineticStatus KineticClient_ExecuteOperation(KineticOperation* operation, KineticCompletionClosure* closure)
{
    assert(operation != NULL);
    KineticStatus status = KINETIC_STATUS_INVALID;
@@ -77,6 +77,16 @@ static KineticStatus KineticClient_ExecuteOperation(KineticOperation* operation)
        LOGF1("  Sending PDU (0x%0llX) w/o value", operation->request);
    }

    if (closure != NULL)
    {
        operation->closure = *closure;
    }
    else
    {
        pthread_mutex_init(&operation->receiveCompleteMutex, NULL);
        pthread_cond_init(&operation->receiveComplete, NULL);
    }

    // Send the request
    status = KineticOperation_SendRequest(operation);
    if (status != KINETIC_STATUS_SUCCESS) {
@@ -179,7 +189,7 @@ KineticStatus KineticClient_NoOp(KineticSessionHandle handle)
    KineticOperation_BuildNoop(operation);

    // Execute the operation
    status = KineticClient_ExecuteOperation(operation);
    status = KineticClient_ExecuteOperation(operation, NULL);

    return status;
}
@@ -197,10 +207,9 @@ KineticStatus KineticClient_Put(KineticSessionHandle handle,

    // Initialize request
    KineticOperation_BuildPut(operation, entry);
    if (closure != NULL) {operation->closure = *closure;}

    // Execute the operation
    return KineticClient_ExecuteOperation(operation);
    return KineticClient_ExecuteOperation(operation, closure);
}

KineticStatus KineticClient_Get(KineticSessionHandle handle,
@@ -219,7 +228,7 @@ KineticStatus KineticClient_Get(KineticSessionHandle handle,
    if (closure != NULL) {operation->closure = *closure;}

    // Execute the operation
    return KineticClient_ExecuteOperation(operation);
    return KineticClient_ExecuteOperation(operation, closure);
}

KineticStatus KineticClient_Delete(KineticSessionHandle handle,
@@ -237,7 +246,7 @@ KineticStatus KineticClient_Delete(KineticSessionHandle handle,
    if (closure != NULL) {operation->closure = *closure;}

    // Execute the operation
    return KineticClient_ExecuteOperation(operation);
    return KineticClient_ExecuteOperation(operation, closure);
}

KineticStatus KineticClient_GetKeyRange(KineticSessionHandle handle,
@@ -264,5 +273,5 @@ KineticStatus KineticClient_GetKeyRange(KineticSessionHandle handle,
    if (closure != NULL) {operation->closure = *closure;}

    // Execute the operation
    return KineticClient_ExecuteOperation(operation);
    return KineticClient_ExecuteOperation(operation, closure);
}
+1 −1
Original line number Diff line number Diff line
@@ -119,7 +119,7 @@ static void* KineticConnection_Worker(void* thread_arg)

                            // Otherwise, is a synchronous opearation, so just set a flag
                            else {
                                op->receiveComplete = true;
                                pthread_cond_signal(&op->receiveComplete);
                            }
                        }
                    }
+15 −16
Original line number Diff line number Diff line
@@ -27,6 +27,7 @@
#include "kinetic_allocator.h"
#include "kinetic_logger.h"
#include <stdlib.h>
#include <errno.h>
#include <sys/time.h>

static void KineticOperation_ValidateOperation(KineticOperation* operation);
@@ -149,7 +150,6 @@ KineticOperation* KineticOperation_AssociateResponseWithOperation(KineticPDU* re
            operation->request->command->header->has_sequence && 
            operation->request->command->header->sequence == targetSequence)
        {
            operation->receiveComplete = false;
            operation->response = response;
            return operation;
        }
@@ -177,30 +177,26 @@ KineticStatus KineticOperation_ReceiveAsync(KineticOperation* const operation)

    // Wait for response if no callback supplied (synchronous)
    if (operation->closure.callback == NULL) { 
        bool timeout = false;
        struct timeval tv;
        time_t startTime, currentTime;

        status = KINETIC_STATUS_SOCKET_TIMEOUT;

        // Wait for matching response to arrive
        gettimeofday(&tv, NULL);
        startTime = tv.tv_sec;
        while(!operation->receiveComplete && !timeout) {
            gettimeofday(&tv, NULL);
            currentTime = tv.tv_sec;
            if ((currentTime - startTime) >= KINETIC_PDU_RECEIVE_TIMEOUT_SECS) {
                timeout = true;
            }
            else {
                sleep(0);
            }
        }
        struct timespec timeoutTime = {
            .tv_sec = tv.tv_sec + KINETIC_PDU_RECEIVE_TIMEOUT_SECS,
            .tv_nsec = 0,
        };

        if (timeout) {
        pthread_mutex_lock(&operation->receiveCompleteMutex);
        int res = pthread_cond_timedwait(&operation->receiveComplete, &operation->receiveCompleteMutex, &timeoutTime);
        pthread_mutex_unlock(&operation->receiveCompleteMutex);

        if (res == ETIMEDOUT) {
            LOG0("Timed out waiting to received response PDU!");
            status = KINETIC_STATUS_SOCKET_TIMEOUT;
        }
        else if (operation->response != NULL) {
        else if (res == 0 && operation->response != NULL) {
            status = KineticPDU_GetStatus(operation->response);
            LOGF2("Response PDU received w/status %s", Kinetic_GetStatusDescription(status));
        }
@@ -210,6 +206,9 @@ KineticStatus KineticOperation_ReceiveAsync(KineticOperation* const operation)
        }

        KineticAllocator_FreeOperation(operation->connection, operation);

        pthread_cond_destroy(&operation->receiveComplete);
        pthread_mutex_destroy(&operation->receiveCompleteMutex);
    }

    return status;
Loading