Commit 6b6515e5 authored by Greg Williams's avatar Greg Williams
Browse files

Cleaned up examples and async system tests. Added create and append api...

Cleaned up examples and async system tests. Added create and append api methods to byte_array module and updated examples to use them for conciseness
parent ec2659d5
Loading
Loading
Loading
Loading
+5 −1
Original line number Diff line number Diff line
@@ -46,9 +46,13 @@ typedef struct {

ByteBuffer ByteBuffer_Create(void* data, size_t max_len, size_t used);
ByteBuffer ByteBuffer_CreateWithArray(ByteArray array);
ByteBuffer ByteBuffer_CreateAndAppend(void* data, size_t max_len, const void* value, size_t value_len);
ByteBuffer ByteBuffer_CreateAndAppendArray(void* data, size_t max_len, const ByteArray value);
ByteBuffer ByteBuffer_CreateAndAppendCString(void* data, size_t max_len, const char* value);
ByteBuffer ByteBuffer_CreateAndAppendFormattedCString(void* data, size_t max_len, const char * format, ...);
void ByteBuffer_Reset(ByteBuffer* buffer);
long ByteBuffer_BytesRemaining(const ByteBuffer buffer);
ByteArray ByteBuffer_Consume(ByteBuffer* buffer, size_t len);
ByteArray ByteBuffer_Consume(ByteBuffer* buffer, size_t max_len);
ByteBuffer* ByteBuffer_Append(ByteBuffer* buffer, const void* data, size_t len);
ByteBuffer* ByteBuffer_AppendArray(ByteBuffer* buffer, const ByteArray array);
ByteBuffer* ByteBuffer_AppendBuffer(ByteBuffer* buffer, const ByteBuffer bufferToAppend);
+18 −41
Original line number Diff line number Diff line
@@ -30,11 +30,7 @@
#include <pthread.h>
#include <errno.h>

#define MAX_ITERATIONS (2)
#define NUM_COPIES (3)
#define BUFSIZE  (128 * KINETIC_OBJ_SIZE)
#define KINETIC_MAX_THREADS (10)
#define MAX_OBJ_SIZE (KINETIC_OBJ_SIZE)
#define NUM_FILES (3)

#define REPORT_ERRNO(en, msg) if(en != 0){errno = en; perror(msg);}

@@ -56,25 +52,18 @@ void* store_data(void* args)
{
    write_args* thread_args = (write_args*)args;
    KineticEntry* entry = &(thread_args->entry);
    int32_t objIndex = 0;

    while (ByteBuffer_BytesRemaining(thread_args->data) > 0) {
    int32_t objIndex;
    for (objIndex = 0; ByteBuffer_BytesRemaining(thread_args->data) > 0; objIndex++) {

        // Configure meta-data
        char keySuffix[8];
        snprintf(keySuffix, sizeof(keySuffix), "%02d", objIndex);
        entry->key.bytesUsed = strlen(thread_args->keyPrefix);
        ByteBuffer_AppendCString(&entry->key, keySuffix);
        entry->synchronization = KINETIC_SYNCHRONIZATION_WRITEBACK;

        // Prepare the next chunk of data to store
        ByteBuffer_Reset(&entry->value);
        ByteBuffer_AppendArray(
            &entry->value,
            ByteBuffer_Consume(&thread_args->data, MIN(ByteBuffer_BytesRemaining(thread_args->data), MAX_OBJ_SIZE))
        );

        // Set operation-specific attributes
        entry->synchronization = KINETIC_SYNCHRONIZATION_WRITEBACK;
        ByteBuffer_AppendArray(&entry->value, ByteBuffer_Consume(&thread_args->data, KINETIC_OBJ_SIZE));

        // Store the data slice
        KineticStatus status = KineticClient_Put(thread_args->sessionHandle, entry, NULL);
@@ -83,12 +72,8 @@ void* store_data(void* args)
                objIndex+1, Kinetic_GetStatusDescription(status));
            return (void*)NULL;
        }

        objIndex++;
    }

    printf("File stored to successfully to Kinetic Device across %d entries!\n", objIndex);

    return (void*)NULL;
}

@@ -112,7 +97,7 @@ int main(int argc, char** argv)
    }

    // Allocate session/thread data
    write_args* writeArgs = calloc(NUM_COPIES, sizeof(write_args));
    write_args* writeArgs = calloc(NUM_FILES, sizeof(write_args));
    if (writeArgs == NULL) {
        fprintf(stderr, "Failed allocating overlapped thread arguments!\n");
    }
@@ -129,8 +114,8 @@ int main(int argc, char** argv)
    };
    KineticClient_Init("stdout", 0);

    // Establish all of the connection first, so their session can all get initialized first
    for (int i = 0; i < NUM_COPIES; i++) {
    // Kick off a thread for each file to store
    for (int i = 0; i < NUM_FILES; i++) {

        // Establish connection
        status = KineticClient_Connect(&sessionConfig, &writeArgs[i].sessionHandle);
@@ -144,31 +129,24 @@ int main(int argc, char** argv)
        // Create a ByteBuffer for consuming chunks of data out of for overlapped PUTs
        writeArgs[i].data = ByteBuffer_Create(buf, dataLen, 0);

        // Configure the KineticEntry
        // Configure common entry attributes
        struct timeval now;
        gettimeofday(&now, NULL);
        snprintf(writeArgs[i].keyPrefix, sizeof(writeArgs[i].keyPrefix), "%010llu_%02d_",
            (unsigned long long)now.tv_sec, i);
        ByteBuffer keyBuf = ByteBuffer_Create(writeArgs[i].key, sizeof(writeArgs[i].key), 0);
        ByteBuffer_AppendCString(&keyBuf, writeArgs[i].keyPrefix);
        ByteBuffer verBuf = ByteBuffer_Create(writeArgs[i].version, sizeof(writeArgs[i].version), 0);
        ByteBuffer_AppendCString(&verBuf, "v1.0");
        ByteBuffer tagBuf = ByteBuffer_Create(writeArgs[i].tag, sizeof(writeArgs[i].tag), 0);
        ByteBuffer_AppendCString(&tagBuf, "some_value_tag...");
        ByteBuffer valBuf = ByteBuffer_Create(writeArgs[i].value, sizeof(writeArgs[i].value), 0);
        writeArgs[i].entry = (KineticEntry) {
            .key = keyBuf,
            // .newVersion = verBuf,
            .tag = tagBuf,
            .key = ByteBuffer_CreateAndAppendCString(
                writeArgs[i].key, sizeof(writeArgs[i].key), writeArgs[i].keyPrefix),
            // .newVersion = ByteBuffer_CreateAndAppendCString(
            //    writeArgs[i].version, sizeof(writeArgs[i].version), "v1.0"),
            .tag = ByteBuffer_CreateAndAppendCString(
                writeArgs[i].tag, sizeof(writeArgs[i].tag), "some_value_tag..."),
            .algorithm = KINETIC_ALGORITHM_SHA1,
            .value = valBuf,
        };
    }

    // Write all of the copies simultaneously (overlapped)
    for (int i = 0; i < NUM_COPIES; i++) {
        printf("  *** Overlapped PUT operations (writing copy %d of %d)"
               " on IP: %s\n", i + 1, NUM_COPIES, sessionConfig.host);
        // Store the entry
        int threadCreateStatus = pthread_create(&writeArgs[i].threadID, NULL, store_data, &writeArgs[i]);
        REPORT_ERRNO(threadCreateStatus, "pthread_create");
        if (threadCreateStatus != 0) {
@@ -177,9 +155,8 @@ int main(int argc, char** argv)
        }
    }

    // Wait for each overlapped PUT operations to complete and cleanup
    printf("  *** Waiting for PUT threads to exit...\n");
    for (int i = 0; i < NUM_COPIES; i++) {
    // Wait for all PUT operations to complete and cleanup
    for (int i = 0; i < NUM_FILES; i++) {
        int joinStatus = pthread_join(writeArgs[i].threadID, NULL);
        if (joinStatus != 0) {
            fprintf(stderr, "pthread join failed!\n");
+50 −5
Original line number Diff line number Diff line
@@ -3,6 +3,7 @@
#include <string.h>
#include <stdio.h>
#include <stdarg.h>
#include <sys/param.h>

ByteArray ByteArray_Create(void* data, size_t len)
{
@@ -43,6 +44,8 @@ void ByteBuffer_Reset(ByteBuffer* buffer)

ByteBuffer ByteBuffer_Create(void* data, size_t max_len, size_t used)
{
    assert(data != NULL);
    assert(max_len > 0);
    return (ByteBuffer) {
        .array = (ByteArray) {.data = (uint8_t*)data, .len = max_len},
        .bytesUsed = used,
@@ -54,19 +57,64 @@ ByteBuffer ByteBuffer_CreateWithArray(ByteArray array)
    return (ByteBuffer) {.array = array, .bytesUsed = 0};
}

ByteBuffer ByteBuffer_CreateAndAppend(void* data, size_t max_len, const void* value, size_t value_len)
{
    ByteBuffer buf = ByteBuffer_Create(data, max_len, 0);
    ByteBuffer_Append(&buf, value, value_len);
    return buf;
}

ByteBuffer ByteBuffer_CreateAndAppendArray(void* data, size_t max_len, const ByteArray value)
{
    ByteBuffer buf = ByteBuffer_Create(data, max_len, 0);
    ByteBuffer_AppendArray(&buf, value);
    return buf;
}

ByteBuffer ByteBuffer_CreateAndAppendCString(void* data, size_t max_len, const char* value)
{
    ByteBuffer buf = ByteBuffer_Create(data, max_len, 0);
    ByteBuffer_AppendCString(&buf, value);
    return buf;
}

ByteBuffer ByteBuffer_CreateAndAppendFormattedCString(void* data, size_t max_len, const char * format, ...)
{
    ByteBuffer buf = ByteBuffer_Create(data, max_len, 0);

    va_list args;
    va_start(args,format);

    uint8_t tmp[256];
    ByteBuffer tmpBuf = ByteBuffer_Create(tmp, sizeof(tmp), 0);

    int formattedSize = vsnprintf((void*)tmpBuf.array.data, tmpBuf.array.len, format, args);
    assert(formattedSize >= 0);
    tmpBuf.bytesUsed = (tmpBuf.array.len <= (size_t)formattedSize) ? formattedSize : tmpBuf.array.len;

    va_end(args);

    ByteBuffer_AppendBuffer(&buf, tmpBuf);

    return buf;
}

long ByteBuffer_BytesRemaining(const ByteBuffer buffer)
{
    assert(buffer.array.data != NULL);
    return ((long)buffer.array.len - (long)buffer.bytesUsed);
}

ByteArray ByteBuffer_Consume(ByteBuffer* buffer, size_t len)
ByteArray ByteBuffer_Consume(ByteBuffer* buffer, size_t max_len)
{
    assert(buffer != NULL);
    assert(buffer->array.data != NULL);
    if (buffer->bytesUsed + len > buffer->array.len) {
    if (buffer->bytesUsed >= buffer->array.len) {
        return BYTE_ARRAY_NONE;
    }
    long remaining = ByteBuffer_BytesRemaining(*buffer);
    assert(remaining >= 0);
    size_t len = MIN(max_len, (size_t)remaining);
    ByteArray slice = {
        .data = &buffer->array.data[buffer->bytesUsed],
        .len = len,
@@ -81,12 +129,10 @@ ByteBuffer* ByteBuffer_Append(ByteBuffer* buffer, const void* data, size_t len)
    assert(buffer->array.data != NULL);
    assert(data != NULL);
    if (len == 0 || ((buffer->bytesUsed + len) > buffer->array.len)) {
        // printf("Invalid parameters for buffer copy!\n");
        return NULL;
    }
    memcpy(&buffer->array.data[buffer->bytesUsed], data, len);
    buffer->bytesUsed += len;
    // printf("Appended data!\n")
    assert(buffer != NULL);
    return buffer;
}
@@ -118,7 +164,6 @@ ByteBuffer* ByteBuffer_AppendBuffer(ByteBuffer* buffer, const ByteBuffer bufferT
    return buffer;
}


ByteBuffer* ByteBuffer_AppendCString(ByteBuffer* buffer, const char* str)
{
    assert(buffer != NULL);
+18 −32
Original line number Diff line number Diff line
@@ -44,10 +44,6 @@
#include "kinetic_socket.h"
#include "kinetic_nbo.h"

#define MAX_ITERATIONS (1)
#define NUM_COPIES (1)
#define KINETIC_MAX_THREADS (10)

#define REPORT_ERRNO(en, msg) if(en != 0){errno = en; perror(msg);}

STATIC KineticSessionHandle* kinetic_client;
@@ -80,8 +76,11 @@ typedef struct {
    FileTransferProgress* currentTransfer;
} AsyncWriteClosureData;


void put_chunk_of_file_finished(KineticCompletionData* kinetic_data, void* client_data);
void update_with_status(FileTransferProgress* transfer, KineticStatus const status);
FileTransferProgress * start_file_transfer(KineticSessionHandle handle,
    char const * const filename, char const * const keyPrefix);
KineticStatus wait_for_put_finish(FileTransferProgress* const transfer);

int put_chunk_of_file(FileTransferProgress* transfer)
{
@@ -90,20 +89,17 @@ int put_chunk_of_file(FileTransferProgress* transfer)
    closureData->currentTransfer = transfer;

    size_t bytesRead = read(transfer->fd, closureData->value, sizeof(closureData->value));
    LOGF0("[chunk len=%zu]", bytesRead);

    if (bytesRead > 0) {
        transfer->currentChunk++;
        closureData->entry = (KineticEntry){
            .key = ByteBuffer_Create(closureData->key, sizeof(closureData->key), 0),
            .tag = ByteBuffer_Create(closureData->tag, sizeof(closureData->tag), 0),
            .key = ByteBuffer_CreateAndAppend(closureData->key, sizeof(closureData->key),
                transfer->keyPrefix.array.data, transfer->keyPrefix.bytesUsed),
            .tag = ByteBuffer_CreateAndAppendFormattedCString(closureData->tag, sizeof(closureData->tag),
                "some_value_tag..._%04d", transfer->currentChunk),
            .algorithm = KINETIC_ALGORITHM_SHA1,
            .value = ByteBuffer_Create(closureData->value, sizeof(closureData->value), bytesRead),
            .synchronization = KINETIC_SYNCHRONIZATION_WRITETHROUGH,
        };
        ByteBuffer_AppendBuffer(&closureData->entry.key, transfer->keyPrefix);
        ByteBuffer_AppendFormattedCString(&closureData->entry.key, "_%04d", transfer->currentChunk);
        ByteBuffer_AppendCString(&closureData->entry.tag, "some_value_tag...");
        KineticStatus status = KineticClient_Put(transfer->sessionHandle,
            &closureData->entry,
            &(KineticCompletionClosure) {
@@ -120,10 +116,9 @@ int put_chunk_of_file(FileTransferProgress* transfer)
                Kinetic_GetStatusDescription(status));
        }
    }
    else if (bytesRead == 0) { // no more data to read, but probably not done yet!
    else if (bytesRead == 0) { // EOF reached
        transfer->opsInProgress--;
        free(closureData);
        fprintf(stderr, "Failed reading data from file (0 bytes read)!\n");
    }
    else {
        transfer->opsInProgress--;
@@ -135,8 +130,6 @@ int put_chunk_of_file(FileTransferProgress* transfer)
    return bytesRead;
}

void update_with_status(FileTransferProgress* transfer, KineticStatus const status);

void put_chunk_of_file_finished(KineticCompletionData* kinetic_data, void* clientData)
{
    AsyncWriteClosureData* closureData = clientData;
@@ -145,7 +138,6 @@ void put_chunk_of_file_finished(KineticCompletionData* kinetic_data, void* clien
    currentTransfer->opsInProgress--;

    if (kinetic_data->status == KINETIC_STATUS_SUCCESS) {
        LOGF1("PUT COMPLETED: opsInProgress=%zu", currentTransfer->opsInProgress);
        if (put_chunk_of_file(closureData->currentTransfer) <= 0 && currentTransfer->opsInProgress == 0) {
            if (currentTransfer->status == KINETIC_STATUS_NOT_ATTEMPTED) {
                currentTransfer->status = KINETIC_STATUS_SUCCESS;
@@ -154,20 +146,14 @@ void put_chunk_of_file_finished(KineticCompletionData* kinetic_data, void* clien
        }
    }
    else {
        update_with_status(currentTransfer, kinetic_data->status);
        fprintf(stderr, "Failed writing chunk! PUT response reported status: %s\n",
            Kinetic_GetStatusDescription(kinetic_data->status));
    }
}

        currentTransfer->status = kinetic_data->status;
        // only signal when finished
        // keep track of outstanding operations
// if there is no more data to read (or error), and no outstanding operations, then signal
void update_with_status(FileTransferProgress* transfer, KineticStatus const status)
{
    if (status != KINETIC_STATUS_SUCCESS) {
        transfer->status = status;
        pthread_cond_signal(&transfer->completeCond);
        // if there is no more data to read (or error), and no outstanding operations,
        // then signal
        pthread_cond_signal(&currentTransfer->completeCond);
        fprintf(stderr, "Failed writing chunk! PUT response reported status: %s\n",
            Kinetic_GetStatusDescription(kinetic_data->status));
    }
}

@@ -188,9 +174,9 @@ FileTransferProgress * start_file_transfer(KineticSessionHandle handle,
        
    //start 4 async actions (fix concurrency issue)
    put_chunk_of_file(transferState);
    put_chunk_of_file(transferState);
    put_chunk_of_file(transferState);
    put_chunk_of_file(transferState);
    // put_chunk_of_file(transferState);
    // put_chunk_of_file(transferState);
    // put_chunk_of_file(transferState);
    return transferState;
}

+2 −2
Original line number Diff line number Diff line
@@ -164,8 +164,8 @@ void test_ByteBuffer_Consume(void)
    TEST_ASSERT_EQUAL(totalConsumed, buffer.bytesUsed);

    lenToConsume = 2;
    totalConsumed += lenToConsume;
    ByteArray consumed2 = ByteBuffer_Consume(&buffer, lenToConsume);
    totalConsumed += 2;
    ByteArray consumed2 = ByteBuffer_Consume(&buffer, lenToConsume+1); // request more than available
    TEST_ASSERT_EQUAL_PTR(&array.data[3], consumed2.data);
    TEST_ASSERT_EQUAL(lenToConsume, consumed2.len);
    TEST_ASSERT_EQUAL_HEX8_ARRAY(&data[3], consumed2.data, lenToConsume);