Commit 44b93dc2 authored by Job Vranish's avatar Job Vranish
Browse files

cleaned up async io test

parent cad0f03c
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -23,7 +23,7 @@ int main(int argc, char** argv)
    if (client == NULL) { return 1; }
    const char HmacKeyString[] = "asdfasdf";
    KineticSession session = {.config = {
        .host = "localhost",
        .host = SYSTEM_TEST_HOST,
        .port = KINETIC_PORT,
        .clusterVersion = 0,
        .identity = 1,
+1 −1
Original line number Diff line number Diff line
@@ -22,7 +22,7 @@ int main(int argc, char** argv)
    if (client == NULL) { return 1; }
    const char HmacKeyString[] = "asdfasdf";
    KineticSession session = {.config = {
        .host = "localhost",
        .host = SYSTEM_TEST_HOST,
        .port = KINETIC_PORT,
        .clusterVersion = 0,
        .identity = 1,
+7 −1
Original line number Diff line number Diff line
#include "kinetic_semaphore.h"
#include <pthread.h>
#include <stdlib.h>
#include <stdbool.h>

struct _KineticSemaphore
{
    pthread_mutex_t mutex;
    pthread_cond_t complete;
    bool signaled;
};

KineticSemaphore * KineticSemaphore_Create(void)
@@ -15,6 +17,7 @@ KineticSemaphore * KineticSemaphore_Create(void)
    {
        pthread_mutex_init(&sem->mutex, NULL);
        pthread_cond_init(&sem->complete, NULL);
        sem->signaled = false;
    }
    return sem;
}
@@ -32,12 +35,15 @@ void KineticSemaphore_Unlock(KineticSemaphore * sem)
void KineticSemaphore_Signal(KineticSemaphore * sem)
{
    pthread_cond_signal(&sem->complete);
    sem->signaled = true;
}

void KineticSemaphore_WaitForSignalAndDestroy(KineticSemaphore * sem)
{
    pthread_mutex_lock(&sem->mutex);
    if (!sem->signaled) {
        pthread_cond_wait(&sem->complete, &sem->mutex);
    }
    pthread_mutex_unlock(&sem->mutex); 

    pthread_mutex_destroy(&sem->mutex);
+77 −177
Original line number Diff line number Diff line
@@ -19,56 +19,31 @@
*/
#include "system_test_fixture.h"
#include "kinetic_client.h"
#include "kinetic_semaphore.h"
#include <stdlib.h>
#include <sys/file.h>
#include <sys/time.h>
#include <errno.h>

STATIC const int TestDataSize = 100 * (1024*1024);

struct kinetic_thread_arg {
    char ip[16];
    struct kinetic_put_arg* opArgs;
    int opCount;
};
#define NUM_PUTS (500)

typedef struct {
    size_t opsInProgress;
    size_t currentChunk;
    size_t maxOverlappedChunks;
    FILE* fp;
    uint64_t keyPrefix;
    pthread_mutex_t transferMutex;
    pthread_mutex_t completeMutex;
    pthread_cond_t completeCond;
    KineticSemaphore * sem;
    KineticStatus status;
    KineticSession* session;
    size_t bytesWritten;
    struct timeval startTime;
} FileTransferProgress;

typedef struct {
    KineticEntry entry;
    uint8_t key[KINETIC_DEFAULT_KEY_LEN];
    uint8_t value[KINETIC_OBJ_SIZE];
    uint8_t tag[KINETIC_DEFAULT_KEY_LEN];
    FileTransferProgress* currentTransfer;
} AsyncWriteClosureData;
} PutStatus;

FileTransferProgress * start_file_transfer(KineticSession * const session,
    char const * const filename, uint64_t keyPrefix, uint32_t maxOverlappedChunks);
KineticStatus wait_for_put_finish(FileTransferProgress* const transfer);

static int put_chunk_of_file(FileTransferProgress* transfer);
static void put_chunk_of_file_finished(KineticCompletionData* kinetic_data, void* client_data);
static void put_finished(KineticCompletionData* kinetic_data, void* clientData);

void test_kinetic_client_should_store_a_binary_object_split_across_entries_via_ovelapped_asynchronous_IO_operations(void)
{
    ByteBuffer test_data = ByteBuffer_Malloc(KINETIC_OBJ_SIZE);
    ByteBuffer_AppendDummyData(&test_data, test_data.array.len);

    // Initialize kinetic-c and configure sessions
    const char HmacKeyString[] = "asdfasdf";
    KineticSession session = {
        .config = (KineticSessionConfig) {
            .host = "localhost",
            .host = SYSTEM_TEST_HOST,
            .port = KINETIC_PORT,
            .clusterVersion = 0,
            .identity = 1,
@@ -85,164 +60,74 @@ void test_kinetic_client_should_store_a_binary_object_split_across_entries_via_o
        TEST_FAIL();
    }

    // Create a unique/common key prefix
    struct timeval now;
    gettimeofday(&now, NULL);
    uint64_t prefix = (uint64_t)now.tv_sec << sizeof(8);
    uint8_t tag_data[] = {0x00, 0x01, 0x02, 0x03};
    ByteBuffer tag = ByteBuffer_Create(tag_data, sizeof(tag_data), sizeof(tag_data));

    // Kick off the chained write/PUT operations and wait for completion
    const char* dataFile = "test/support/data/test.data";
    FileTransferProgress* transfer = start_file_transfer(&session, dataFile, prefix, 4);
    if (transfer != NULL) {
        printf("Waiting for transfer to complete...\n");
        status = wait_for_put_finish(transfer);
        if (status != KINETIC_STATUS_SUCCESS) {
            fprintf(stderr, "Transfer failed w/status: %s\n",
                Kinetic_GetStatusDescription(status));
            TEST_FAIL();
        }
        printf("Transfer completed successfully!\n");
    }
    else {
        printf("Failed starting file transfer!\n");
    }
    PutStatus put_statuses[NUM_PUTS];
    for (size_t i = 0; i < NUM_PUTS; i++) {
        put_statuses[i] = (PutStatus){
            .sem = KineticSemaphore_Create(),
            .status = KINETIC_STATUS_INVALID,
        };
    };

    // Shutdown client connection and cleanup
    KineticClient_DestroyConnection(&session);
    KineticClient_Shutdown(client);
}
    struct timeval start_time;
    gettimeofday(&start_time, NULL);

    uint32_t keys[NUM_PUTS];
    KineticEntry entries[NUM_PUTS];

static int put_chunk_of_file(FileTransferProgress* transfer)
{
    AsyncWriteClosureData* closureData = calloc(1, sizeof(AsyncWriteClosureData));
    transfer->opsInProgress++;
    closureData->currentTransfer = transfer;
    for (uint32_t put = 0; put < NUM_PUTS; put++) {
        keys[put] = put;
        ByteBuffer key = ByteBuffer_Create(&keys[put], sizeof(keys[put]), sizeof(keys[put]));

        KineticSynchronization sync = (put == NUM_PUTS - 1)
            ? KINETIC_SYNCHRONIZATION_FLUSH
            : KINETIC_SYNCHRONIZATION_WRITEBACK;

    size_t bytesRead = fread(closureData->value, 1, sizeof(closureData->value), transfer->fp);
    bool eofReached = feof(transfer->fp) != 0;
    if (bytesRead > 0) {
        transfer->currentChunk++;
        closureData->entry = (KineticEntry){
            .key = ByteBuffer_CreateAndAppend(closureData->key, sizeof(closureData->key),
                &transfer->keyPrefix, sizeof(transfer->keyPrefix)),
            .tag = ByteBuffer_CreateAndAppendFormattedCString(closureData->tag, sizeof(closureData->tag),
                "some_value_tag..._%04d", transfer->currentChunk),
        entries[put] = (KineticEntry) {
            .key = key,
            .tag = tag,
            .algorithm = KINETIC_ALGORITHM_SHA1,
            .value = ByteBuffer_Create(closureData->value, sizeof(closureData->value), (size_t)bytesRead),
            .synchronization = KINETIC_SYNCHRONIZATION_WRITEBACK,
            .value = test_data,
            .synchronization = sync,
        };

        // Make sure last write has FLUSH enabled to ensure all data is written
        if (eofReached) {
            closureData->entry.synchronization = KINETIC_SYNCHRONIZATION_FLUSH;
        KineticStatus status = KineticClient_Put(
            &session,
            &entries[put],
            &(KineticCompletionClosure) {
                .callback = put_finished,
                .clientData = &put_statuses[put],
            }
        );

        KineticStatus status = KineticClient_Put(transfer->session,
            &closureData->entry,
            &(KineticCompletionClosure) {
                .callback = put_chunk_of_file_finished,
                .clientData = closureData,
            });
        if (status != KINETIC_STATUS_SUCCESS) {
            transfer->opsInProgress--;
            free(closureData);
            fprintf(stderr, "Failed writing chunk! PUT request reported status: %s\n",
                Kinetic_GetStatusDescription(status));
        }
        else {
            transfer->bytesWritten += bytesRead;
        }
    }
    else if (bytesRead == 0) {
        if (eofReached) {
            transfer->opsInProgress--;
        }
        else {
            transfer->opsInProgress--;
            fprintf(stderr, "Failed reading data from file! error: %s\n", strerror(errno));
        }
        free(closureData);
            fprintf(stderr, "PUT failed w/status: %s\n", Kinetic_GetStatusDescription(status));
            TEST_FAIL();
        }
    
    return bytesRead;
    }

static void put_chunk_of_file_finished(KineticCompletionData* kinetic_data, void* clientData)
{
    AsyncWriteClosureData* closureData = clientData;
    FileTransferProgress* currentTransfer = closureData->currentTransfer;
    free(closureData);
    currentTransfer->opsInProgress--;
    printf("Waiting for put finish\n");

    if (kinetic_data->status == KINETIC_STATUS_SUCCESS) {
        int bytesPut = put_chunk_of_file(currentTransfer);
        if (bytesPut <= 0 && currentTransfer->opsInProgress == 0) {
            if (currentTransfer->status == KINETIC_STATUS_NOT_ATTEMPTED) {
                currentTransfer->status = KINETIC_STATUS_SUCCESS;
            }
            pthread_cond_signal(&currentTransfer->completeCond);
        }
    }
    else {
        currentTransfer->status = kinetic_data->status;
        // only signal when finished
        // keep track of outstanding operations
        // if there is no more data to read (or error), and no outstanding operations,
        // then signal
        pthread_cond_signal(&currentTransfer->completeCond);
        fprintf(stderr, "Failed writing chunk! PUT response reported status: %s\n",
            Kinetic_GetStatusDescription(kinetic_data->status));
    }
}

FileTransferProgress * start_file_transfer(KineticSession * const session,
    char const * const filename, uint64_t keyPrefix, uint32_t maxOverlappedChunks)
    for (size_t i = 0; i < NUM_PUTS; i++)
    {
    FILE* fp = fopen(filename, "r");
    if (fp == NULL) {
        fprintf(stderr, "Failed opening data file '%s'! error: %s\n", filename, strerror(errno));
        return NULL;
    }

    FileTransferProgress * transferState = malloc(sizeof(FileTransferProgress));
    *transferState = (FileTransferProgress) {
        .session = session,
        .maxOverlappedChunks = maxOverlappedChunks,
        .keyPrefix = keyPrefix,
        .fp = fp,
    };

    pthread_mutex_init(&transferState->transferMutex, NULL);
    pthread_mutex_init(&transferState->completeMutex, NULL);
    pthread_cond_init(&transferState->completeCond, NULL);
    gettimeofday(&transferState->startTime, NULL);
        
    // Start max overlapped PUT operations
    for (size_t i = 0; i < transferState->maxOverlappedChunks; i++) {
        put_chunk_of_file(transferState);
        KineticSemaphore_WaitForSignalAndDestroy(put_statuses[i].sem);
        if (put_statuses[i].status != KINETIC_STATUS_SUCCESS) {
            fprintf(stderr, "PUT failed w/status: %s\n", Kinetic_GetStatusDescription(put_statuses[i].status));
            TEST_FAIL();
        }
    return transferState;
    }

KineticStatus wait_for_put_finish(FileTransferProgress* const transfer)
{
    pthread_mutex_lock(&transfer->completeMutex);
    pthread_cond_wait(&transfer->completeCond, &transfer->completeMutex);
    pthread_mutex_unlock(&transfer->completeMutex);
    
    fclose(transfer->fp);
    transfer->fp = NULL;
    struct timeval stopTime;
    gettimeofday(&stopTime, NULL);

    pthread_mutex_destroy(&transfer->completeMutex);
    pthread_cond_destroy(&transfer->completeCond);
    struct timeval stop_time;
    gettimeofday(&stop_time, NULL);

    int64_t elapsed_us = ((stopTime.tv_sec - transfer->startTime.tv_sec) * 1000000)
        + (stopTime.tv_usec - transfer->startTime.tv_usec);
    size_t bytes_written = NUM_PUTS * test_data.array.len;
    int64_t elapsed_us = ((stop_time.tv_sec - start_time.tv_sec) * 1000000)
        + (stop_time.tv_usec - start_time.tv_usec);
    float elapsed_ms = elapsed_us / 1000.0f;
    float bandwidth = (transfer->bytesWritten * 1000.0f) / (elapsed_ms * 1024 * 1024);
    float bandwidth = (bytes_written * 1000.0f) / (elapsed_ms * 1024 * 1024);
    fflush(stdout);
    printf("\n"
        "Write/Put Performance:\n"
@@ -250,12 +135,27 @@ KineticStatus wait_for_put_finish(FileTransferProgress* const transfer)
        "wrote:      %.1f kB\n"
        "duration:   %.3f seconds\n"
        "throughput: %.2f MB/sec\n\n",
        transfer->bytesWritten / 1024.0f,
        bytes_written / 1024.0f,
        elapsed_ms / 1000.0f,
        bandwidth);

    KineticStatus status = transfer->status;
    free(transfer);

    return status;
    printf("Transfer completed successfully!\n");

    ByteBuffer_Free(test_data);

    // Shutdown client connection and cleanup
    KineticClient_DestroyConnection(&session);
    KineticClient_Shutdown(client);
}

static void put_finished(KineticCompletionData* kinetic_data, void* clientData)
{
    PutStatus * put_status = clientData;
    KineticSemaphore_Lock(put_status->sem);
    // Save PUT result status
    put_status->status = kinetic_data->status;
    // Signal that we're done
    KineticSemaphore_Signal(put_status->sem);
    KineticSemaphore_Unlock(put_status->sem);
}