Commit 0c95ab63 authored by Greg Williams's avatar Greg Williams
Browse files

Updated PUT tests and examples to make sure last PUT is a FLUSH so that all...

Updated PUT tests and examples to make sure last PUT is a FLUSH so that all data gets written to disk
parent 4302caa3
Loading
Loading
Loading
Loading
+6 −3
Original line number Diff line number Diff line
@@ -59,11 +59,14 @@ void store_data(write_args* args)
        ByteBuffer_Reset(&entry->value);
        ByteBuffer_AppendArray(
            &entry->value,
            ByteBuffer_Consume(
                &args->data,
                MIN(ByteBuffer_BytesRemaining(args->data), KINETIC_OBJ_SIZE))
            ByteBuffer_Consume(&args->data, KINETIC_OBJ_SIZE)
        );

        // Ensure last PUT triggers flush to disk for completion
        if (ByteBuffer_BytesRemaining(args->data) == 0) {
            entry->synchronization = KINETIC_SYNCHRONIZATION_FLUSH;
        }

        // Store the object
        KineticStatus status = KineticClient_Put(args->session, entry, NULL);
        if (status != KINETIC_STATUS_SUCCESS) {
+5 −0
Original line number Diff line number Diff line
@@ -65,6 +65,11 @@ void* store_data(void* args)
        // Prepare the next chunk of data to store
        ByteBuffer_AppendArray(&entry->value, ByteBuffer_Consume(&thread_args->data, KINETIC_OBJ_SIZE));
        
        // Ensure last PUT triggers flush to disk for completion
        if (ByteBuffer_BytesRemaining(thread_args->data) == 0) {
            entry->synchronization = KINETIC_SYNCHRONIZATION_FLUSH;
        }

        // Store the data slice
        KineticStatus status = KineticClient_Put(&thread_args->session, entry, NULL);
        if (status != KINETIC_STATUS_SUCCESS) {
+10 −1
Original line number Diff line number Diff line
@@ -103,6 +103,8 @@ static int put_chunk_of_file(FileTransferProgress* transfer)
    int bytesRead = read(transfer->fd, closureData->value, sizeof(closureData->value));
    if (bytesRead > 0) {
        transfer->currentChunk++;

        // Configure the entry to store
        closureData->entry = (KineticEntry){
            .key = ByteBuffer_CreateAndAppend(closureData->key, sizeof(closureData->key),
                &transfer->keyPrefix, sizeof(transfer->keyPrefix)),
@@ -110,8 +112,15 @@ static int put_chunk_of_file(FileTransferProgress* transfer)
                "some_value_tag..._%04d", transfer->currentChunk),
            .algorithm = KINETIC_ALGORITHM_SHA1,
            .value = ByteBuffer_Create(closureData->value, sizeof(closureData->value), (size_t)bytesRead),
            .synchronization = KINETIC_SYNCHRONIZATION_WRITETHROUGH,
            .synchronization = KINETIC_SYNCHRONIZATION_WRITEBACK,
        };

        // Ensure last PUT triggers flush to disk for completion
        if ((size_t)bytesRead < sizeof(closureData->value)) {
            closureData->entry.synchronization = KINETIC_SYNCHRONIZATION_FLUSH;
        }

        // Store the current entry
        KineticStatus status = KineticClient_Put(transfer->session,
            &closureData->entry,
            &(KineticCompletionClosure) {
+6 −0
Original line number Diff line number Diff line
@@ -235,6 +235,12 @@ int put_chunk_of_file(FileTransferProgress* transfer)
            .value = ByteBuffer_Create(closureData->value, sizeof(closureData->value), (size_t)bytesRead),
            .synchronization = KINETIC_SYNCHRONIZATION_WRITETHROUGH,
        };

        // Ensure last PUT triggers flush to disk for completion
        if ((size_t)bytesRead < sizeof(closureData->value)) {
            closureData->entry.synchronization = KINETIC_SYNCHRONIZATION_FLUSH;
        }

        KineticStatus status = KineticClient_Put(transfer->session, &closureData->entry,
            &(KineticCompletionClosure) {
                .callback = put_chunk_of_file_finished,
+51 −26
Original line number Diff line number Diff line
@@ -26,6 +26,7 @@
#include <string.h>
#include <stdlib.h>
#include <sys/file.h>
#include <sys/time.h>
#include <errno.h>

#include "kinetic_client.h"
@@ -45,25 +46,23 @@
#include "kinetic_socket.h"
#include "kinetic_nbo.h"

#define REPORT_ERRNO(en, msg) if(en != 0){errno = en; perror(msg);}
STATIC const int SourceDataSize = 50 * KINETIC_OBJ_SIZE;

struct kinetic_thread_arg {
    char ip[16];
    struct kinetic_put_arg* opArgs;
    int opCount;
};
#define REPORT_ERRNO(en, msg) if(en != 0){errno = en; perror(msg);}

typedef struct {
    size_t opsInProgress;
    size_t currentChunk;
    size_t maxOverlappedChunks;
    int fd;
    ByteBuffer buffer;
    uint64_t keyPrefix;
    pthread_mutex_t transferMutex;
    pthread_mutex_t completeMutex;
    pthread_cond_t completeCond;
    KineticStatus status;
    KineticSession* session;
    size_t bytesWritten;
    struct timeval startTime;
} FileTransferProgress;

typedef struct {
@@ -75,7 +74,7 @@ typedef struct {
} AsyncWriteClosureData;

FileTransferProgress * start_file_transfer(KineticSession * const session,
    char const * const filename, uint64_t keyPrefix, uint32_t maxOverlappedChunks);
    uint64_t keyPrefix, uint32_t maxOverlappedChunks);
KineticStatus wait_for_put_finish(FileTransferProgress* const transfer);

static int put_chunk_of_file(FileTransferProgress* transfer);
@@ -88,7 +87,7 @@ void test_kinetic_client_should_store_a_binary_object_split_across_entries_via_o
    const char HmacKeyString[] = "asdfasdf";
    KineticSession session = {
        .config = (KineticSessionConfig) {
            .host = "localhost",
            .host = SYSTEM_TEST_HOST,
            .port = KINETIC_PORT,
            .clusterVersion = 0,
            .identity = 1,
@@ -105,14 +104,15 @@ void test_kinetic_client_should_store_a_binary_object_split_across_entries_via_o
        TEST_FAIL();
    }

    sleep(1); // Sleep to allow unsolicited status to be reported in order to get connection ID for requests

    // Create a unique/common key prefix
    struct timeval now;
    gettimeofday(&now, NULL);
    uint64_t prefix = (uint64_t)now.tv_sec << sizeof(8);

    // Kick off the chained write/PUT operations and wait for completion
    const char* dataFile = "test/support/data/test.data";
    FileTransferProgress* transfer = start_file_transfer(&session, dataFile, prefix, 4);
    FileTransferProgress* transfer = start_file_transfer(&session, prefix, 4);
    printf("Waiting for transfer to complete...\n");
    status = wait_for_put_finish(transfer);
    if (status != KINETIC_STATUS_SUCCESS) {
@@ -133,9 +133,9 @@ static int put_chunk_of_file(FileTransferProgress* transfer)
    AsyncWriteClosureData* closureData = calloc(1, sizeof(AsyncWriteClosureData));
    transfer->opsInProgress++;
    closureData->currentTransfer = transfer;
    size_t valueLen = 0;

    int bytesRead = read(transfer->fd, closureData->value, sizeof(closureData->value));
    if (bytesRead > 0) {
    if (ByteBuffer_BytesRemaining(transfer->buffer) > 0) {
        transfer->currentChunk++;
        closureData->entry = (KineticEntry){
            .key = ByteBuffer_CreateAndAppend(closureData->key, sizeof(closureData->key),
@@ -143,9 +143,19 @@ static int put_chunk_of_file(FileTransferProgress* transfer)
            .tag = ByteBuffer_CreateAndAppendFormattedCString(closureData->tag, sizeof(closureData->tag),
                "some_value_tag..._%04d", transfer->currentChunk),
            .algorithm = KINETIC_ALGORITHM_SHA1,
            .value = ByteBuffer_Create(closureData->value, sizeof(closureData->value), (size_t)bytesRead),
            .synchronization = KINETIC_SYNCHRONIZATION_WRITETHROUGH,
            .value = ByteBuffer_CreateAndAppendArray(closureData->value, sizeof(closureData->value),
                ByteBuffer_Consume(&transfer->buffer, KINETIC_OBJ_SIZE)),
            .synchronization = KINETIC_SYNCHRONIZATION_WRITEBACK,
        };

        size_t valueLen = closureData->entry.value.bytesUsed;
        transfer->bytesWritten += valueLen;

        // Make sure last write has FLUSH enabled to ensure all data is written
        if (valueLen < KINETIC_OBJ_SIZE) {
            closureData->entry.synchronization = KINETIC_SYNCHRONIZATION_FLUSH;
        }

        KineticStatus status = KineticClient_Put(transfer->session,
            &closureData->entry,
            &(KineticCompletionClosure) {
@@ -159,18 +169,12 @@ static int put_chunk_of_file(FileTransferProgress* transfer)
                Kinetic_GetStatusDescription(status));
        }
    }
    else if (bytesRead == 0) { // EOF reached
        transfer->opsInProgress--;
        free(closureData);
    }
    else {
    else { // EOF reached
        transfer->opsInProgress--;
        free(closureData);
        fprintf(stderr, "Failed reading data from file!\n");
        REPORT_ERRNO(bytesRead, "read");
    }
    
    return bytesRead;
    return valueLen;
}

static void put_chunk_of_file_finished(KineticCompletionData* kinetic_data, void* clientData)
@@ -202,18 +206,23 @@ static void put_chunk_of_file_finished(KineticCompletionData* kinetic_data, void
}

FileTransferProgress * start_file_transfer(KineticSession * const session,
    char const * const filename, uint64_t keyPrefix, uint32_t maxOverlappedChunks)
    uint64_t keyPrefix, uint32_t maxOverlappedChunks)
{
    FileTransferProgress * transferState = malloc(sizeof(FileTransferProgress));
    uint8_t* testData = malloc(SourceDataSize);
    *transferState = (FileTransferProgress) {
        .session = session,
        .maxOverlappedChunks = maxOverlappedChunks,
        .keyPrefix = keyPrefix,
        .fd = open(filename, O_RDONLY),
        .bytesWritten = 0,
        .buffer = ByteBuffer_Create(testData, SourceDataSize, 0),
    };
    ByteBuffer_AppendDummyData(&transferState->buffer, SourceDataSize);
    ByteBuffer_Reset(&transferState->buffer);
    pthread_mutex_init(&transferState->transferMutex, NULL);
    pthread_mutex_init(&transferState->completeMutex, NULL);
    pthread_cond_init(&transferState->completeCond, NULL);
    gettimeofday(&transferState->startTime, NULL);
        
    // Start max overlapped PUT operations
    for (size_t i = 0; i < transferState->maxOverlappedChunks; i++) {
@@ -228,12 +237,28 @@ KineticStatus wait_for_put_finish(FileTransferProgress* const transfer)
    pthread_cond_wait(&transfer->completeCond, &transfer->completeMutex);
    pthread_mutex_unlock(&transfer->completeMutex);

    struct timeval stopTime;
    gettimeofday(&stopTime, NULL);

    KineticStatus status = transfer->status;

    pthread_mutex_destroy(&transfer->completeMutex);
    pthread_cond_destroy(&transfer->completeCond);

    close(transfer->fd);
    int64_t elapsed_us = ((stopTime.tv_sec - transfer->startTime.tv_sec) * 1000000)
        + (stopTime.tv_usec - transfer->startTime.tv_usec);
    float elapsed_ms = elapsed_us / 1000.0f;
    float bandwidth = (transfer->bytesWritten * 1000.0f) / (elapsed_ms * 1024 * 1024);
    fflush(stdout);
    printf("\n"
        "Write/Put Performance:\n"
        "----------------------------------------\n"
        "wrote:      %.1f kB\n"
        "duration:   %.3f seconds\n"
        "throughput: %.2f MB/sec\n\n",
        transfer->bytesWritten / 1024.0f,
        elapsed_ms / 1000.0f,
        bandwidth);

    free(transfer);

Loading