Commit fb6642d5 authored by Greg Williams's avatar Greg Williams
Browse files

Fixed concurrency issue in examples and system tests relating to overlapped/async PUT operations

parent 3f047e52
Loading
Loading
Loading
Loading
+193 −2
Original line number Diff line number Diff line
@@ -17,7 +17,6 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
*/

#include "kinetic_client.h"
#include "kinetic_types.h"
#include "byte_array.h"
@@ -27,11 +26,203 @@
#include <sys/param.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <pthread.h>
#include <errno.h>

#define REPORT_ERRNO(en, msg) if(en != 0){errno = en; perror(msg);}

struct kinetic_thread_arg {
    char ip[16];
    struct kinetic_put_arg* opArgs;
    int opCount;
};

typedef struct {
    size_t opsInProgress;
    size_t currentChunk;
    size_t maxOverlappedChunks;
    int fd;
    uint64_t keyPrefix;
    pthread_mutex_t transferMutex;
    pthread_mutex_t completeMutex;
    pthread_cond_t completeCond;
    KineticStatus status;
    KineticSessionHandle sessionHandle;
} FileTransferProgress;

typedef struct {
    KineticEntry entry;
    uint8_t key[KINETIC_DEFAULT_KEY_LEN];
    uint8_t value[KINETIC_OBJ_SIZE];
    uint8_t tag[KINETIC_DEFAULT_KEY_LEN];
    FileTransferProgress* currentTransfer;
} AsyncWriteClosureData;

FileTransferProgress * start_file_transfer(KineticSessionHandle handle,
    char const * const filename, uint64_t keyPrefix, uint32_t maxOverlappedChunks);
KineticStatus wait_for_put_finish(FileTransferProgress* const transfer);

static int put_chunk_of_file(FileTransferProgress* transfer);
static void put_chunk_of_file_finished(KineticCompletionData* kinetic_data, void* client_data);


int main(int argc, char** argv)
{
    (void)argc;
    (void)argv;
    fprintf(stderr, "\nEXAMPLE NOT YET IMPLEMENTED!\n");

    // Initialize kinetic-c and configure sessions
    const char HmacKeyString[] = "asdfasdf";
    const KineticSession sessionConfig = {
        .host = "localhost",
        .port = KINETIC_PORT,
        .clusterVersion = 0,
        .identity = 1,
        .hmacKey = ByteArray_CreateWithCString(HmacKeyString),
    };
    KineticClient_Init("stdout", 0);

    // Establish connection
    KineticSessionHandle sessionHandle;
    KineticStatus status = KineticClient_Connect(&sessionConfig, &sessionHandle);
    if (status != KINETIC_STATUS_SUCCESS) {
        fprintf(stderr, "Failed connecting to the Kinetic device w/status: %s\n",
            Kinetic_GetStatusDescription(status));
        return -1;
    }

    // Create a unique/common key prefix
    struct timeval now;
    gettimeofday(&now, NULL);
    uint64_t prefix = (uint64_t)now.tv_sec << sizeof(8);

    // Kick off the chained write/PUT operations and wait for completion
    const uint32_t maxOverlappedChunks = 4;
    const char* dataFile = "test/support/data/test.data";
    FileTransferProgress* transfer = start_file_transfer(sessionHandle, dataFile, prefix, maxOverlappedChunks);
    printf("Waiting for transfer to complete...\n");
    status = wait_for_put_finish(transfer);
    if (status != KINETIC_STATUS_SUCCESS) {
        fprintf(stderr, "Transfer failed w/status: %s\n", Kinetic_GetStatusDescription(status));
        return -2;
    }
    printf("Transfer completed successfully!\n");

    // Shutdown client connection and cleanup
    KineticClient_Disconnect(&sessionHandle);
    KineticClient_Shutdown();

    return 0;
}

static int put_chunk_of_file(FileTransferProgress* transfer)
{
    AsyncWriteClosureData* closureData = calloc(1, sizeof(AsyncWriteClosureData));
    transfer->opsInProgress++;
    closureData->currentTransfer = transfer;

    int bytesRead = read(transfer->fd, closureData->value, sizeof(closureData->value));
    if (bytesRead > 0) {
        transfer->currentChunk++;
        closureData->entry = (KineticEntry){
            .key = ByteBuffer_CreateAndAppend(closureData->key, sizeof(closureData->key),
                &transfer->keyPrefix, sizeof(transfer->keyPrefix)),
            .tag = ByteBuffer_CreateAndAppendFormattedCString(closureData->tag, sizeof(closureData->tag),
                "some_value_tag..._%04d", transfer->currentChunk),
            .algorithm = KINETIC_ALGORITHM_SHA1,
            .value = ByteBuffer_Create(closureData->value, sizeof(closureData->value), (size_t)bytesRead),
            .synchronization = KINETIC_SYNCHRONIZATION_WRITETHROUGH,
        };
        KineticStatus status = KineticClient_Put(transfer->sessionHandle,
            &closureData->entry,
            &(KineticCompletionClosure) {
                .callback = put_chunk_of_file_finished,
                .clientData = closureData,
            });
        if (status != KINETIC_STATUS_SUCCESS) {
            transfer->opsInProgress--;
            free(closureData);
            fprintf(stderr, "Failed writing chunk! PUT request reported status: %s\n",
                Kinetic_GetStatusDescription(status));
        }
    }
    else if (bytesRead == 0) { // EOF reached
        transfer->opsInProgress--;
        free(closureData);
    }
    else {
        transfer->opsInProgress--;
        free(closureData);
        fprintf(stderr, "Failed reading data from file!\n");
        REPORT_ERRNO(bytesRead, "read");
    }
    
    return bytesRead;
}

static void put_chunk_of_file_finished(KineticCompletionData* kinetic_data, void* clientData)
{
    AsyncWriteClosureData* closureData = clientData;
    FileTransferProgress* currentTransfer = closureData->currentTransfer;
    free(closureData);
    currentTransfer->opsInProgress--;

    if (kinetic_data->status == KINETIC_STATUS_SUCCESS) {
        int bytesPut = put_chunk_of_file(currentTransfer);
        if (bytesPut <= 0 && currentTransfer->opsInProgress == 0) {
            if (currentTransfer->status == KINETIC_STATUS_NOT_ATTEMPTED) {
                currentTransfer->status = KINETIC_STATUS_SUCCESS;
            }
            pthread_cond_signal(&currentTransfer->completeCond);
        }
    }
    else {
        currentTransfer->status = kinetic_data->status;
        // only signal when finished
        // keep track of outstanding operations
        // if there is no more data to read (or error), and no outstanding operations,
        // then signal
        pthread_cond_signal(&currentTransfer->completeCond);
        fprintf(stderr, "Failed writing chunk! PUT response reported status: %s\n",
            Kinetic_GetStatusDescription(kinetic_data->status));
    }
}

FileTransferProgress * start_file_transfer(KineticSessionHandle handle,
    char const * const filename, uint64_t keyPrefix, uint32_t maxOverlappedChunks)
{
    FileTransferProgress * transferState = malloc(sizeof(FileTransferProgress));
    *transferState = (FileTransferProgress) {
        .sessionHandle = handle,
        .maxOverlappedChunks = maxOverlappedChunks,
        .keyPrefix = keyPrefix,
        .fd = open(filename, O_RDONLY),
    };
    pthread_mutex_init(&transferState->transferMutex, NULL);
    pthread_mutex_init(&transferState->completeMutex, NULL);
    pthread_cond_init(&transferState->completeCond, NULL);
        
    // Start max overlapped PUT operations
    for (size_t i = 0; i < transferState->maxOverlappedChunks; i++) {
        put_chunk_of_file(transferState);
    }
    return transferState;
}

KineticStatus wait_for_put_finish(FileTransferProgress* const transfer)
{
    pthread_mutex_lock(&transfer->completeMutex);
    pthread_cond_wait(&transfer->completeCond, &transfer->completeMutex);
    pthread_mutex_unlock(&transfer->completeMutex);

    KineticStatus status = transfer->status;

    pthread_mutex_destroy(&transfer->completeMutex);
    pthread_cond_destroy(&transfer->completeCond);

    close(transfer->fd);

    free(transfer);

    return status;
}
+263 −3
Original line number Diff line number Diff line
@@ -17,7 +17,6 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
*/

#include "kinetic_client.h"
#include "kinetic_types.h"
#include "byte_array.h"
@@ -27,11 +26,272 @@
#include <sys/param.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <pthread.h>
#include <errno.h>

#define REPORT_ERRNO(en, msg) if(en != 0){errno = en; perror(msg);}

struct kinetic_thread_arg {
    char ip[16];
    struct kinetic_put_arg* opArgs;
    int opCount;
};

typedef struct {
    size_t opsInProgress;
    size_t currentChunk;
    size_t maxOverlappedChunks;
    int fd;
    ByteBuffer keyPrefix;
    uint8_t keyPrefixBuffer[KINETIC_DEFAULT_KEY_LEN];
    pthread_mutex_t transferMutex;
    pthread_mutex_t completeMutex;
    pthread_cond_t completeCond;
    KineticStatus status;
    KineticSessionHandle sessionHandle;
} FileTransferProgress;

typedef struct {
    KineticEntry entry;
    uint8_t key[KINETIC_DEFAULT_KEY_LEN];
    uint8_t value[KINETIC_OBJ_SIZE];
    uint8_t tag[KINETIC_DEFAULT_KEY_LEN];
    FileTransferProgress* currentTransfer;
} AsyncWriteClosureData;

typedef struct {
    KineticSessionHandle handle;
    const char* filename;
    const size_t maxOverlappedChunks;
    uint64_t keyPrefix;
    pthread_t thread;
    KineticStatus status;
} StoreFileOperation;

void* store_file_thread(void* storeArgs);
FileTransferProgress* start_file_transfer(KineticSessionHandle handle,
    char const * const filename, uint64_t prefix, uint32_t maxOverlappedChunks);
KineticStatus wait_for_transfer_complete(FileTransferProgress* const transfer);

static int put_chunk_of_file(FileTransferProgress* transfer);
static void put_chunk_of_file_finished(KineticCompletionData* kinetic_data, void* client_data);

int main(int argc, char** argv)
{
    (void)argc;
    (void)argv;
    fprintf(stderr, "\nEXAMPLE NOT YET IMPLEMENTED!\n");
    return 0;

    // Initialize kinetic-c and configure session
    const char HmacKeyString[] = "asdfasdf";
    const KineticSession sessionConfig = {
        .host = "localhost",
        .port = KINETIC_PORT,
        .clusterVersion = 0,
        .identity = 1,
        .hmacKey = ByteArray_CreateWithCString(HmacKeyString),
    };
    KineticClient_Init("stdout", 0);

    // Establish connection
    KineticSessionHandle handle;
    KineticStatus status = KineticClient_Connect(&sessionConfig, &handle);
    if (status != KINETIC_STATUS_SUCCESS) {
        fprintf(stderr, "Failed connecting to the Kinetic device w/status: %s\n",
            Kinetic_GetStatusDescription(status));
        return -1;
    }

    // Create a unique/common key prefix
    struct timeval now;
    gettimeofday(&now, NULL);
    uint64_t prefix = (uint64_t)now.tv_sec << sizeof(8);

    // Store the file(s) and wait for completion
    bool success = true;
    StoreFileOperation ops[] = {
        {
            .handle = handle,
            .filename = "./test/support/data/file_a.png",
            .keyPrefix = prefix,
            .maxOverlappedChunks = 4,
        },
        {
            .handle = handle,
            .filename = "./test/support/data/file_b.png",
            .keyPrefix = prefix,
            .maxOverlappedChunks = 4,
        },
        {
            .handle = handle,
            .filename = "./test/support/data/file_c.png",
            .keyPrefix = prefix,
            .maxOverlappedChunks = 4,
        },
    };
    const int numFiles = sizeof(ops) / sizeof(StoreFileOperation);
    for (int i = 0; i < numFiles; i++) {
        printf("Storing '%s' to disk...\n", ops[i].filename);
        int pthreadStatus = pthread_create(&ops[i].thread, NULL, store_file_thread, &ops[i]);
        if (pthreadStatus != 0) {
            REPORT_ERRNO(pthreadStatus, "pthread_create");
            fprintf(stderr, "Failed creating store thread for '%s'!\n", ops[i].filename);
            success = false;
        }
    }
    for (int i = 0; i < numFiles; i++) {
        int pthreadStatus = pthread_join(ops[i].thread, NULL);
        if (pthreadStatus == 0) {
            printf("File '%s' stored successfully!\n", ops[i].filename);
        }
        else {
            REPORT_ERRNO(pthreadStatus, "pthread_join");
            fprintf(stderr, "Failed storing '%s' to disk! status: %s\n",
                ops[i].filename, Kinetic_GetStatusDescription(ops[i].status));
            success = false;
        }
    }
    printf("Complete!\n");
    
    // Shutdown client connection and cleanup
    KineticClient_Disconnect(&handle);
    KineticClient_Shutdown();

    return success ? 0 : -1;
}

void* store_file_thread(void* storeArgs)
{
    // Kick off the chained write/PUT operations and wait for completion
    StoreFileOperation* op = storeArgs;
    FileTransferProgress* transfer =
        start_file_transfer(op->handle, op->filename, op->keyPrefix, op->maxOverlappedChunks);
    op->status = wait_for_transfer_complete(transfer);
    if (op->status != KINETIC_STATUS_SUCCESS) {
        fprintf(stderr, "Transfer failed w/status: %s\n", Kinetic_GetStatusDescription(op->status));
    }
    return (void*)storeArgs;
}

FileTransferProgress * start_file_transfer(KineticSessionHandle handle,
    char const * const filename, uint64_t prefix, uint32_t maxOverlappedChunks)
{
    FileTransferProgress * transferState = malloc(sizeof(FileTransferProgress));
    *transferState = (FileTransferProgress) {
        .sessionHandle = handle,
        .maxOverlappedChunks = maxOverlappedChunks,
        .keyPrefix = ByteBuffer_CreateAndAppend(transferState->keyPrefixBuffer,
            sizeof(transferState->keyPrefixBuffer), &prefix, sizeof(prefix)),
        .fd = open(filename, O_RDONLY),
    };
    pthread_mutex_init(&transferState->transferMutex, NULL); 
    pthread_mutex_init(&transferState->completeMutex, NULL); 
    pthread_cond_init(&transferState->completeCond, NULL);
        
    // Start max overlapped PUT operations
    for (size_t i = 0; i < transferState->maxOverlappedChunks; i++) {
        put_chunk_of_file(transferState);
    }
    return transferState;
}

KineticStatus wait_for_transfer_complete(FileTransferProgress* const transfer)
{
    pthread_mutex_lock(&transfer->completeMutex);
    pthread_cond_wait(&transfer->completeCond, &transfer->completeMutex);
    pthread_mutex_unlock(&transfer->completeMutex);

    KineticStatus status = transfer->status;

    pthread_mutex_destroy(&transfer->completeMutex);
    pthread_cond_destroy(&transfer->completeCond);

    close(transfer->fd);
    
    pthread_mutex_destroy(&transfer->transferMutex);
    free(transfer);

    return status;
}

int put_chunk_of_file(FileTransferProgress* transfer)
{
    AsyncWriteClosureData* closureData = calloc(1, sizeof(AsyncWriteClosureData));

    pthread_mutex_lock(&transfer->transferMutex);

    transfer->opsInProgress++;
    closureData->currentTransfer = transfer;

    int bytesRead = read(transfer->fd, closureData->value, sizeof(closureData->value));
    if (bytesRead > 0) {
        transfer->currentChunk++;
        closureData->entry = (KineticEntry){
            .key = ByteBuffer_CreateAndAppend(closureData->key, sizeof(closureData->key),
                transfer->keyPrefix.array.data, transfer->keyPrefix.bytesUsed),
            .tag = ByteBuffer_CreateAndAppendFormattedCString(closureData->tag, sizeof(closureData->tag),
                "some_value_tag..._%04d", transfer->currentChunk),
            .algorithm = KINETIC_ALGORITHM_SHA1,
            .value = ByteBuffer_Create(closureData->value, sizeof(closureData->value), (size_t)bytesRead),
            .synchronization = KINETIC_SYNCHRONIZATION_WRITETHROUGH,
        };
        KineticStatus status = KineticClient_Put(transfer->sessionHandle,
            &closureData->entry,
            &(KineticCompletionClosure) {
                .callback = put_chunk_of_file_finished,
                .clientData = closureData,
            });
        if (status != KINETIC_STATUS_SUCCESS) {
            transfer->opsInProgress--;
            free(closureData);
            fprintf(stderr, "Failed writing chunk! PUT request reported status: %s\n",
                Kinetic_GetStatusDescription(status));
        }
    }
    else if (bytesRead == 0) { // EOF reached
        transfer->opsInProgress--;
        free(closureData);
    }
    else {
        transfer->opsInProgress--;
        free(closureData);
        fprintf(stderr, "Failed reading data from file!\n");
        REPORT_ERRNO(bytesRead, "read");
    }

    pthread_mutex_unlock(&transfer->transferMutex);
    
    return bytesRead;
}

void put_chunk_of_file_finished(KineticCompletionData* kinetic_data, void* clientData)
{
    AsyncWriteClosureData* closureData = clientData;
    FileTransferProgress* transfer = closureData->currentTransfer;
    free(closureData);

    pthread_mutex_lock(&transfer->transferMutex);
    transfer->opsInProgress--;
    pthread_mutex_unlock(&transfer->transferMutex);

    if (kinetic_data->status == KINETIC_STATUS_SUCCESS) {
        int bytesPut = put_chunk_of_file(transfer);
        if (bytesPut <= 0 && transfer->opsInProgress == 0) {
            if (transfer->status == KINETIC_STATUS_NOT_ATTEMPTED) {
                transfer->status = KINETIC_STATUS_SUCCESS;
            }
            pthread_cond_signal(&transfer->completeCond);
        }
    }
    else {
        transfer->status = kinetic_data->status;
        // only signal when finished
        // keep track of outstanding operations
        // if there is no more data to read (or error), and no outstanding operations,
        // then signal
        pthread_cond_signal(&transfer->completeCond);
        pthread_mutex_destroy(&transfer->transferMutex);
        pthread_mutex_destroy(&transfer->completeMutex);
        fprintf(stderr, "Failed writing chunk! PUT response reported status: %s\n",
            Kinetic_GetStatusDescription(kinetic_data->status));
    }
}
+530409 −0

File added.

Preview size limit exceeded, changes collapsed.

+530409 −0

File added.

Preview size limit exceeded, changes collapsed.

+530409 −0

File added.

Preview size limit exceeded, changes collapsed.

Loading