Commit 31adc65c authored by Job Vranish's avatar Job Vranish
Browse files

Merge branch 'develop' of github.com:Seagate/kinetic-c into develop

parents 7f492ad2 303e2078
Loading
Loading
Loading
Loading
+65 −25
Original line number Diff line number Diff line
@@ -21,11 +21,9 @@
#include "kinetic_client.h"
#include <stdlib.h>
#include <sys/file.h>
#include <sys/time.h>
#include <errno.h>


#define REPORT_ERRNO(en, msg) if(en != 0){errno = en; perror(msg);}

struct kinetic_thread_arg {
    char ip[16];
    struct kinetic_put_arg* opArgs;
@@ -36,13 +34,15 @@ typedef struct {
    size_t opsInProgress;
    size_t currentChunk;
    size_t maxOverlappedChunks;
    int fd;
    FILE* fp;
    uint64_t keyPrefix;
    pthread_mutex_t transferMutex;
    pthread_mutex_t completeMutex;
    pthread_cond_t completeCond;
    KineticStatus status;
    KineticSession* session;
    size_t bytesWritten;
    struct timeval startTime;
} FileTransferProgress;

typedef struct {
@@ -91,6 +91,7 @@ void test_kinetic_client_should_store_a_binary_object_split_across_entries_via_o
    // Kick off the chained write/PUT operations and wait for completion
    const char* dataFile = "test/support/data/test.data";
    FileTransferProgress* transfer = start_file_transfer(&session, dataFile, prefix, 4);
    if (transfer != NULL) {
        printf("Waiting for transfer to complete...\n");
        status = wait_for_put_finish(transfer);
        if (status != KINETIC_STATUS_SUCCESS) {
@@ -99,6 +100,10 @@ void test_kinetic_client_should_store_a_binary_object_split_across_entries_via_o
            TEST_FAIL();
        }
        printf("Transfer completed successfully!\n");
    }
    else {
        printf("Failed starting file transfer!\n");
    }

    // Shutdown client connection and cleanup
    KineticClient_DestroyConnection(&session);
@@ -112,7 +117,8 @@ static int put_chunk_of_file(FileTransferProgress* transfer)
    transfer->opsInProgress++;
    closureData->currentTransfer = transfer;

    int bytesRead = read(transfer->fd, closureData->value, sizeof(closureData->value));
    size_t bytesRead = fread(closureData->value, 1, sizeof(closureData->value), transfer->fp);
    bool eofReached = feof(transfer->fp) != 0;
    if (bytesRead > 0) {
        transfer->currentChunk++;
        closureData->entry = (KineticEntry){
@@ -122,8 +128,14 @@ static int put_chunk_of_file(FileTransferProgress* transfer)
                "some_value_tag..._%04d", transfer->currentChunk),
            .algorithm = KINETIC_ALGORITHM_SHA1,
            .value = ByteBuffer_Create(closureData->value, sizeof(closureData->value), (size_t)bytesRead),
            .synchronization = KINETIC_SYNCHRONIZATION_WRITETHROUGH,
            .synchronization = KINETIC_SYNCHRONIZATION_WRITEBACK,
        };

        // Make sure last write has FLUSH enabled to ensure all data is written
        if (eofReached) {
            closureData->entry.synchronization = KINETIC_SYNCHRONIZATION_FLUSH;
        }

        KineticStatus status = KineticClient_Put(transfer->session,
            &closureData->entry,
            &(KineticCompletionClosure) {
@@ -136,16 +148,19 @@ static int put_chunk_of_file(FileTransferProgress* transfer)
            fprintf(stderr, "Failed writing chunk! PUT request reported status: %s\n",
                Kinetic_GetStatusDescription(status));
        }
        else {
            transfer->bytesWritten += bytesRead;
        }
    else if (bytesRead == 0) { // EOF reached
    }
    else if (bytesRead == 0) {
        if (eofReached) {
            transfer->opsInProgress--;
        free(closureData);
        }
        else {
            transfer->opsInProgress--;
            fprintf(stderr, "Failed reading data from file! error: %s\n", strerror(errno));
        }
        free(closureData);
        fprintf(stderr, "Failed reading data from file!\n");
        REPORT_ERRNO(bytesRead, "read");
    }
    
    return bytesRead;
@@ -182,16 +197,24 @@ static void put_chunk_of_file_finished(KineticCompletionData* kinetic_data, void
FileTransferProgress * start_file_transfer(KineticSession * const session,
    char const * const filename, uint64_t keyPrefix, uint32_t maxOverlappedChunks)
{
    FILE* fp = fopen(filename, "r");
    if (fp == NULL) {
        fprintf(stderr, "Failed opening data file '%s'! error: %s\n", filename, strerror(errno));
        return NULL;
    }

    FileTransferProgress * transferState = malloc(sizeof(FileTransferProgress));
    *transferState = (FileTransferProgress) {
        .session = session,
        .maxOverlappedChunks = maxOverlappedChunks,
        .keyPrefix = keyPrefix,
        .fd = open(filename, O_RDONLY),
        .fp = fp,
    };

    pthread_mutex_init(&transferState->transferMutex, NULL);
    pthread_mutex_init(&transferState->completeMutex, NULL);
    pthread_cond_init(&transferState->completeCond, NULL);
    gettimeofday(&transferState->startTime, NULL);
        
    // Start max overlapped PUT operations
    for (size_t i = 0; i < transferState->maxOverlappedChunks; i++) {
@@ -206,13 +229,30 @@ KineticStatus wait_for_put_finish(FileTransferProgress* const transfer)
    pthread_cond_wait(&transfer->completeCond, &transfer->completeMutex);
    pthread_mutex_unlock(&transfer->completeMutex);
    
    KineticStatus status = transfer->status;
    fclose(transfer->fp);
    transfer->fp = NULL;
    struct timeval stopTime;
    gettimeofday(&stopTime, NULL);

    pthread_mutex_destroy(&transfer->completeMutex);
    pthread_cond_destroy(&transfer->completeCond);

    close(transfer->fd);
    int64_t elapsed_us = ((stopTime.tv_sec - transfer->startTime.tv_sec) * 1000000)
        + (stopTime.tv_usec - transfer->startTime.tv_usec);
    float elapsed_ms = elapsed_us / 1000.0f;
    float bandwidth = (transfer->bytesWritten * 1000.0f) / (elapsed_ms * 1024 * 1024);
    fflush(stdout);
    printf("\n"
        "Write/Put Performance:\n"
        "----------------------------------------\n"
        "wrote:      %.1f kB\n"
        "duration:   %.3f seconds\n"
        "throughput: %.2f MB/sec\n\n",
        transfer->bytesWritten / 1024.0f,
        elapsed_ms / 1000.0f,
        bandwidth);

    KineticStatus status = transfer->status;
    free(transfer);

    return status;
+6 −1
Original line number Diff line number Diff line
@@ -209,7 +209,12 @@ static void* kinetic_put(void* kinetic_arg)
        );

        // Set operation-specific attributes
        if (ByteBuffer_BytesRemaining(arg->data) == 0) {
            entry->synchronization = KINETIC_SYNCHRONIZATION_FLUSH;
        }
        else {
            entry->synchronization = KINETIC_SYNCHRONIZATION_WRITEBACK;
        }

        // Store the data slice
        LOGF1("  *** Storing a data slice (%zu bytes)", entry->value.bytesUsed);