Commit a2b71c59 authored by Job Vranish's avatar Job Vranish
Browse files

fixed potential race condition on blocking operations

parent caa5cd26
Loading
Loading
Loading
Loading
+6 −2
Original line number Diff line number Diff line
@@ -54,6 +54,7 @@ KineticOperation* KineticController_CreateOperation(KineticSession const * const
typedef struct {
    pthread_mutex_t receiveCompleteMutex;
    pthread_cond_t receiveComplete;
    bool completed;
    KineticStatus status;
} DefaultCallbackData;

@@ -62,6 +63,7 @@ static void DefaultCallback(KineticCompletionData* kinetic_data, void* client_da
    DefaultCallbackData * data = client_data;
    pthread_mutex_lock(&data->receiveCompleteMutex);
    data->status = kinetic_data->status;
    data->completed = true;
    pthread_cond_signal(&data->receiveComplete);
    pthread_mutex_unlock(&data->receiveCompleteMutex);
}
@@ -92,6 +94,7 @@ KineticStatus KineticController_ExecuteOperation(KineticOperation* operation, Ki
        pthread_mutex_init(&data.receiveCompleteMutex, NULL);
        pthread_cond_init(&data.receiveComplete, NULL);
        data.status = KINETIC_STATUS_INVALID;
        data.completed = false;

        operation->closure = DefaultClosure(&data);

@@ -100,9 +103,10 @@ KineticStatus KineticController_ExecuteOperation(KineticOperation* operation, Ki

        if (status == KINETIC_STATUS_SUCCESS) {
            pthread_mutex_lock(&data.receiveCompleteMutex);
            pthread_cond_wait(&data.receiveComplete, &data.receiveCompleteMutex);
            pthread_mutex_unlock(&data.receiveCompleteMutex);
            while(data.completed == false)
            { pthread_cond_wait(&data.receiveComplete, &data.receiveCompleteMutex); }
            status = data.status;
            pthread_mutex_unlock(&data.receiveCompleteMutex);
        }

        pthread_cond_destroy(&data.receiveComplete);
+1 −1
Original line number Diff line number Diff line
@@ -750,7 +750,7 @@ void KineticOperation_Complete(KineticOperation* operation, KineticStatus status
{
    assert(operation != NULL);
    // ExecuteOperation should ensure a callback exists (either a user supplied one, or the a default)
    if (operation->closure.callback == NULL) { return; }
    assert(operation->closure.callback != NULL) ;
    KineticCompletionData completionData = {.status = status};

    KineticCountingSemaphore_Give(operation->connection->outstandingOperations);