Commit 918d060b authored by Job Vranish's avatar Job Vranish
Browse files

implemented async timeouts

parent c165a020
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -148,6 +148,7 @@ typedef enum {
    KINETIC_STATUS_INVALID_REQUEST,     // Something about the request is invalid
    KINETIC_STATUS_OPERATION_INVALID,   // Operation was invalid
    KINETIC_STATUS_OPERATION_FAILED,    // Device reported an operation error
    KINETIC_STATUS_OPERATION_TIMEDOUT,  // Device did not respond to the operation in time
    KINETIC_STATUS_CLUSTER_MISMATCH,    // Specified cluster version does not match device
    KINETIC_STATUS_VERSION_MISMATCH,    // The specified object version info for a PUT/GET do not match stored object
    KINETIC_STATUS_DATA_ERROR,          // Device reported data error, no space or HMAC failure
+1 −0
Original line number Diff line number Diff line
@@ -319,6 +319,7 @@ void KineticAllocator_FreeOperation(KineticConnection* const connection, Kinetic
            operation->response, operation, connection);
        KineticAllocator_FreePDU(connection, operation->response);
    }
    pthread_mutex_destroy(&operation->timeoutTimeMutex);
    KineticAllocator_FreeItem(&connection->operations, (void*)operation);
    LOGF3("Freed operation (0x%0llX) on connection (0x%0llX)", operation, connection);
}
+45 −15
Original line number Diff line number Diff line
@@ -60,7 +60,28 @@ static KineticStatus KineticClient_CreateOperation(
    return KINETIC_STATUS_SUCCESS;
}

static KineticStatus KineticClient_ExecuteOperation(KineticOperation* operation, KineticCompletionClosure* closure)
typedef struct {
    pthread_mutex_t receiveCompleteMutex;
    pthread_cond_t receiveComplete;
    KineticStatus status;
} DefaultCallbackData;

static void DefaultCallback(KineticCompletionData* kinetic_data, void* client_data)
{
    DefaultCallbackData * data = client_data;
    data->status = kinetic_data->status;
    pthread_cond_signal(&data->receiveComplete);
}

static KineticCompletionClosure DefaultClosure(DefaultCallbackData * const data)
{
    return (KineticCompletionClosure) {
        .callback = DefaultCallback,
        .clientData = data,
    };
}

static KineticStatus KineticClient_ExecuteOperation(KineticOperation* operation, KineticCompletionClosure* const closure)
{
    assert(operation != NULL);
    KineticStatus status = KINETIC_STATUS_INVALID;
@@ -77,23 +98,37 @@ static KineticStatus KineticClient_ExecuteOperation(KineticOperation* operation,
        LOGF1("  Sending PDU (0x%0llX) w/o value", operation->request);
    }

    KineticOperation_SetTimeoutTime(operation, KINETIC_OPERATION_TIMEOUT_SECS);

    if (closure != NULL)
    {
        operation->closure = *closure;
        return KineticOperation_SendRequest(operation);
    }
    else
    {
        pthread_mutex_init(&operation->receiveCompleteMutex, NULL);
        pthread_cond_init(&operation->receiveComplete, NULL);
    }
        DefaultCallbackData data;
        pthread_mutex_init(&data.receiveCompleteMutex, NULL);
        pthread_cond_init(&data.receiveComplete, NULL);
        data.status = KINETIC_STATUS_SUCCESS;

        operation->closure = DefaultClosure(&data);

        // Send the request
        status = KineticOperation_SendRequest(operation);
    if (status != KINETIC_STATUS_SUCCESS) {
        return status;

        if (status == KINETIC_STATUS_SUCCESS) {
            pthread_mutex_lock(&data.receiveCompleteMutex);
            pthread_cond_wait(&data.receiveComplete, &data.receiveCompleteMutex);
            pthread_mutex_unlock(&data.receiveCompleteMutex);
            status = data.status;
        }

    return KineticOperation_ReceiveAsync(operation);
        pthread_cond_destroy(&data.receiveComplete);
        pthread_mutex_destroy(&data.receiveCompleteMutex);

        return status;
    }
}

void KineticClient_Init(const char* log_file, int log_level)
@@ -189,9 +224,7 @@ KineticStatus KineticClient_NoOp(KineticSessionHandle handle)
    KineticOperation_BuildNoop(operation);

    // Execute the operation
    status = KineticClient_ExecuteOperation(operation, NULL);

    return status;
    return  KineticClient_ExecuteOperation(operation, NULL);
}

KineticStatus KineticClient_Put(KineticSessionHandle handle,
@@ -225,7 +258,6 @@ KineticStatus KineticClient_Get(KineticSessionHandle handle,

    // Initialize request
    KineticOperation_BuildGet(operation, entry);
    if (closure != NULL) {operation->closure = *closure;}

    // Execute the operation
    return KineticClient_ExecuteOperation(operation, closure);
@@ -243,7 +275,6 @@ KineticStatus KineticClient_Delete(KineticSessionHandle handle,

    // Initialize request
    KineticOperation_BuildDelete(operation, entry);
    if (closure != NULL) {operation->closure = *closure;}

    // Execute the operation
    return KineticClient_ExecuteOperation(operation, closure);
@@ -270,7 +301,6 @@ KineticStatus KineticClient_GetKeyRange(KineticSessionHandle handle,

    // Initialize request
    KineticOperation_BuildGetKeyRange(operation, range, keys);
    if (closure != NULL) {operation->closure = *closure;}

    // Execute the operation
    return KineticClient_ExecuteOperation(operation, closure);
+36 −9
Original line number Diff line number Diff line
@@ -41,6 +41,37 @@ void KineticConnection_Pause(KineticConnection* const connection, bool pause)
    connection->thread.paused = pause;
}

static void CompleteOperation(KineticOperation* operation, KineticStatus status)
{
    assert(operation != NULL);
    // ExecuteOperation should ensure a callback exists (either a user supplied one, or the a default)
    assert(operation->closure.callback != NULL);
    KineticCompletionData completionData = {.status = status};
    operation->closure.callback(&completionData, operation->closure.clientData);
    KineticAllocator_FreeOperation(operation->connection, operation);
}

static void TimeoutOperations(KineticConnection* const connection)
{
    struct timeval currentTime;
    gettimeofday(&currentTime, NULL);

    for (KineticOperation* operation = KineticAllocator_GetFirstOperation(connection);
         operation != NULL;
         operation = KineticAllocator_GetNextOperation(connection, operation)) {

        struct timeval timeoutTime = KineticOperation_GetTimeoutTime(operation);

        // if this operation has a nonzero timeout
        //   and it's timed out
        if (!Kinetic_TimevalIsZero(timeoutTime) &&
            Kinetic_TimevalCmp(currentTime, operation->timeoutTime) >= 0)
        {
            CompleteOperation(operation, KINETIC_STATUS_OPERATION_TIMEDOUT);
        }
    }
}

static void* KineticConnection_Worker(void* thread_arg)
{
    KineticStatus status;
@@ -110,17 +141,11 @@ static void* KineticConnection_Worker(void* thread_arg)
                                status = op->callback(op);
                            }

                            // Call client-supplied closure callback, if supplied
                            if (op->closure.callback != NULL) {
                                KineticCompletionData completionData = {.status = status};
                                op->closure.callback(&completionData, op->closure.clientData);
                                KineticAllocator_FreeOperation(thread->connection, op);
                            if (status == KINETIC_STATUS_SUCCESS) {
                                status = KineticPDU_GetStatus(op->response);
                            }

                            // Otherwise, is a synchronous opearation, so just set a flag
                            else {
                                pthread_cond_signal(&op->receiveComplete);
                            }
                            CompleteOperation(op, status);
                        }
                    }
                }
@@ -141,6 +166,8 @@ static void* KineticConnection_Worker(void* thread_arg)
                thread->fatalError = true;
            } break;
        }

        TimeoutOperations(thread->connection);
    }

    LOG1("Worker thread terminated!");
+24 −57
Original line number Diff line number Diff line
@@ -129,6 +129,30 @@ KineticStatus KineticOperation_GetStatus(const KineticOperation* const operation
    return status;
}

struct timeval KineticOperation_GetTimeoutTime(KineticOperation* const operation)
{
    pthread_mutex_lock(&operation->timeoutTimeMutex);
    struct timeval timeoutTime = operation->timeoutTime;
    pthread_mutex_unlock(&operation->timeoutTimeMutex);
    return timeoutTime;
}

void KineticOperation_SetTimeoutTime(KineticOperation* const operation, uint32_t const timeout_in_sec)
{
    pthread_mutex_lock(&operation->timeoutTimeMutex);

    // set timeout time
    struct timeval currentTime;
    gettimeofday(&currentTime, NULL);
    struct timeval timeoutIn = {
        .tv_sec = timeout_in_sec,
        .tv_usec = 0,
    };
    operation->timeoutTime = Kinetic_TimevalAdd(currentTime, timeoutIn);

    pthread_mutex_unlock(&operation->timeoutTimeMutex);
}

KineticOperation* KineticOperation_AssociateResponseWithOperation(KineticPDU* response)
{
    if (response == NULL ||
@@ -167,63 +191,6 @@ KineticOperation* KineticOperation_AssociateResponseWithOperation(KineticPDU* re
    return NULL;
}

KineticStatus KineticOperation_ReceiveAsync(KineticOperation* const operation)
{
    assert(operation != NULL);
    assert(operation->request != NULL);
    assert(operation->request->connection != NULL);
    assert(operation->request->proto != NULL);
    assert(operation->request->command != NULL);
    assert(operation->request->command->header != NULL);
    assert(operation->request->command->header->has_sequence);

    const int fd = operation->request->connection->socket;
    assert(fd >= 0);
    LOGF1("\nReceiving PDU via fd=%d", fd);

    KineticStatus status = KINETIC_STATUS_SUCCESS;

    // Wait for response if no callback supplied (synchronous)
    if (operation->closure.callback == NULL) { 
        struct timeval tv;

        status = KINETIC_STATUS_SOCKET_TIMEOUT;

        // Wait for matching response to arrive
        gettimeofday(&tv, NULL);
        struct timespec timeoutTime = {
            .tv_sec = tv.tv_sec + KINETIC_PDU_RECEIVE_TIMEOUT_SECS,
            .tv_nsec = 0,
        };

        pthread_mutex_lock(&operation->receiveCompleteMutex);
        int res = pthread_cond_timedwait(&operation->receiveComplete, &operation->receiveCompleteMutex, &timeoutTime);
        pthread_mutex_unlock(&operation->receiveCompleteMutex);

        if (res == ETIMEDOUT) {
            LOG0("Timed out waiting to received response PDU!");
            status = KINETIC_STATUS_SOCKET_TIMEOUT;
        }
        else if (res == 0 && operation->response != NULL) {
            status = KineticPDU_GetStatus(operation->response);
            LOGF2("Response PDU received w/status %s", Kinetic_GetStatusDescription(status));
        }
        else {
            LOG0("Unknown error occurred waiting for response PDU to arrive!");
            status = KINETIC_STATUS_CONNECTION_ERROR;
        }

        KineticAllocator_FreeOperation(operation->connection, operation);

        pthread_cond_destroy(&operation->receiveComplete);
        pthread_mutex_destroy(&operation->receiveCompleteMutex);
    }

    return status;
}



KineticStatus KineticOperation_NoopCallback(KineticOperation* operation)
{
    assert(operation != NULL);
Loading