Commit ec2659d5 authored by Greg Williams's avatar Greg Williams
Browse files

Completed async IO system test

parent efe7e19c
Loading
Loading
Loading
Loading
+0 −1
Original line number Diff line number Diff line
@@ -164,7 +164,6 @@ int main(int argc, char** argv)
            .value = valBuf,
        };
    }
    sleep(2); // Give a generous chunk of time for session to be initialized by the target device

    // Write all of the copies simultaneously (overlapped)
    for (int i = 0; i < NUM_COPIES; i++) {
+68 −237
Original line number Diff line number Diff line
@@ -54,23 +54,12 @@ STATIC KineticSessionHandle* kinetic_client;
STATIC const char HmacKeyString[] = "asdfasdf";
STATIC int SourceDataSize;


struct kinetic_thread_arg {
    char ip[16];
    struct kinetic_put_arg* opArgs;
    int opCount;
};

void setUp(void)
{
    KineticClient_Init("stdout", 3);
}

void tearDown(void)
{
    KineticClient_Shutdown();
}

typedef struct {
    size_t opsInProgress;
    size_t currentChunk;
@@ -98,11 +87,12 @@ int put_chunk_of_file(FileTransferProgress* transfer)
{
    AsyncWriteClosureData* closureData = calloc(1, sizeof(AsyncWriteClosureData));
    transfer->opsInProgress++;
    closureData->currentTransfer = transfer;

    ssize_t bytesRead = read(transfer->fd, closureData->value, sizeof(closureData->value));
    size_t bytesRead = read(transfer->fd, closureData->value, sizeof(closureData->value));
    LOGF0("[chunk len=%zu]", bytesRead);

    if (bytesRead > 0)
    {
    if (bytesRead > 0) {
        transfer->currentChunk++;
        closureData->entry = (KineticEntry){
            .key = ByteBuffer_Create(closureData->key, sizeof(closureData->key), 0),
@@ -120,34 +110,33 @@ int put_chunk_of_file(FileTransferProgress* transfer)
                .callback = put_chunk_of_file_finished,
                .clientData = closureData,
            });
        if (status == KINETIC_STATUS_SUCCESS)
        {
        if (status == KINETIC_STATUS_SUCCESS) {
            printf("Wrote chunk successfully!\n");
        }
        else
        {
        else {
            transfer->opsInProgress--;
            free(closureData);

            fprintf(stderr, "Failed writing chunk! PUT request reported status: %s\n",
                Kinetic_GetStatusDescription(status));
        }
    }
    else if (bytesRead == 0)
    {
        // no more data to read
        // but we're probably not done yet!
    else if (bytesRead == 0) { // no more data to read, but probably not done yet!
        transfer->opsInProgress--;
        free(closureData);
        fprintf(stderr, "Failed reading data from file (0 bytes read)!\n");
    }
    else
    {
    else {
        transfer->opsInProgress--;
        free(closureData);
        fprintf(stderr, "Failed reading data from file!\n");
        // REPORT_ERRNO(pthreadStatus, "read");
    }
    

    
    return bytesRead;
}

void update_with_status(FileTransferProgress* transfer, KineticStatus const status);

void put_chunk_of_file_finished(KineticCompletionData* kinetic_data, void* clientData)
{
    AsyncWriteClosureData* closureData = clientData;
@@ -155,32 +144,35 @@ void put_chunk_of_file_finished(KineticCompletionData* kinetic_data, void* clien
    free(closureData);
    currentTransfer->opsInProgress--;

    if (kinetic_data->status == KINETIC_STATUS_SUCCESS)
    {
        if (put_chunk_of_file() <= 0 && currentTransfer->opsInProgress == 0)
        {
            // pthread_cond_signal(&cdata->completeCond);
    if (kinetic_data->status == KINETIC_STATUS_SUCCESS) {
        LOGF1("PUT COMPLETED: opsInProgress=%zu", currentTransfer->opsInProgress);
        if (put_chunk_of_file(closureData->currentTransfer) <= 0 && currentTransfer->opsInProgress == 0) {
            if (currentTransfer->status == KINETIC_STATUS_NOT_ATTEMPTED) {
                currentTransfer->status = KINETIC_STATUS_SUCCESS;
            }

            pthread_cond_signal(&currentTransfer->completeCond);
        }
    else
    {
        update_with_status(&currentTransfer->status, kinetic_data->status);
    }
    else {
        update_with_status(currentTransfer, kinetic_data->status);
        fprintf(stderr, "Failed writing chunk! PUT response reported status: %s\n",
            Kinetic_GetStatusDescription(kinetic_data->status));
    }
}


// only signal when finished
// keep track of outstanding operations
// if there is no more data to read (or error), and no outstanding operations, then signal

void update_with_status(FileTransferProgress* transfer, KineticStatus const status)
{

    if (status != KINETIC_STATUS_SUCCESS) {
        transfer->status = status;
        pthread_cond_signal(&transfer->completeCond);
    }
}


FileTransferProgress * start_file_transfer(KineticSessionHandle handle, char const * const filename, char const * const keyPrefix)
FileTransferProgress * start_file_transfer(KineticSessionHandle handle,
    char const * const filename, char const * const keyPrefix)
{
    FileTransferProgress * transferState = calloc(1, sizeof(FileTransferProgress));
    transferState->sessionHandle = handle;
@@ -190,11 +182,7 @@ FileTransferProgress * start_file_transfer(KineticSessionHandle handle, char con
    pthread_cond_init(&transferState->completeCond, NULL); 
    
    transferState->keyPrefix = ByteBuffer_Create(transferState->keyPrefixBuffer, sizeof(transferState->keyPrefixBuffer), 0);
    struct timeval now;
    gettimeofday(&now, NULL);
    ByteBuffer_AppendCString(&transferState->keyPrefix, filename);
    ByteBuffer_AppendCString(&transferState->keyPrefix, keyPrefix);
    ByteBuffer_AppendFormattedCString(&transferState->keyPrefix, "_%010llu", (unsigned long long)now.tv_sec);

    transferState->fd = open(filename, O_RDONLY);
        
@@ -224,204 +212,47 @@ KineticStatus wait_for_put_finish(FileTransferProgress* const transfer)
    return status;
}

#if 0
void* kinetic_put(void* kinetic_arg)
{
    struct kinetic_thread_arg* arg = kinetic_arg;
    KineticEntry* entry = &(arg->entry);
    int32_t objIndex = 0;
    struct timeval startTime, stopTime;
    gettimeofday(&startTime, NULL);

    size_t totalLen = ByteBuffer_BytesRemaining(arg->data);
    size_t opCount = totalLen / MAX_OBJ_SIZE;
    if (totalLen % MAX_OBJ_SIZE) {
        opCount++;
    }

    TestPutClientDataStruct clientData = {
        .opsInProgress = opCount,
        .opsFailed = 0,
    };
    KineticCompletionClosure closure = {
        .callback = put_closure,
        .clientData = &clientData,
    };

    while (ByteBuffer_BytesRemaining(arg->data) > 0) {

        // Configure meta-data
        char keySuffix[8];
        snprintf(keySuffix, sizeof(keySuffix), "%02d", objIndex);
        entry->key.bytesUsed = strlen(arg->keyPrefix);
        ByteBuffer_AppendCString(&entry->key, keySuffix);

        // Prepare the next chunk of data to store
        ByteBuffer_Reset(&entry->value);
        ByteBuffer_AppendArray(
            &entry->value,
            ByteBuffer_Consume(&arg->data, MIN(ByteBuffer_BytesRemaining(arg->data), MAX_OBJ_SIZE))
        );

        // Set operation-specific attributes
        entry->synchronization = KINETIC_SYNCHRONIZATION_WRITETHROUGH;

        // Store the data slice
        LOGF1("  *** Storing a data slice (%zu bytes)", entry->value.bytesUsed);

        KineticStatus status = KineticClient_Put(arg->sessionHandle, entry, closure);
        TEST_ASSERT_EQUAL_KineticStatus(KINETIC_STATUS_SUCCESS, status);

        objIndex++;
    }

    // Wait for all write operations to complete...
    while(clientData.opsInProgress > 0) {
        sleep(0);
    }
    gettimeofday(&stopTime, NULL);
    LOG0("All PUT operations completed!");
    TEST_ASSERT_EQUAL_SIZET(0, clientData.opsFailed);

    int64_t elapsed_us = ((stopTime.tv_sec - startTime.tv_sec) * 1000000)
        + (stopTime.tv_usec - startTime.tv_usec);
    float elapsed_ms = elapsed_us / 1000.0f;
    arg->bandwidth = (arg->data.array.len * 1000.0f) / (elapsed_ms * 1024 * 1024);
    fflush(stdout);
    printf("\n"
        "Write/Put Performance:\n"
        "----------------------------------------\n"
        "wrote:      %.1f kB\n"
        "duration:   %.3f seconds\n"
        "entries:    %d entries\n"
        "throughput: %.2f MB/sec\n\n",
        arg->data.array.len / 1024.0f,
        elapsed_ms / 1000.0f,
        objIndex,
        arg->bandwidth);
    fflush(stdout);

    return (void*)0;
}

void test_kinetic_client_should_store_a_binary_object_split_across_entries_via_ovelapped_asynchronous_IO_operations(void)
{
    // Initialize kinetic-c and configure sessions
    const char HmacKeyString[] = "asdfasdf";
    const KineticSession sessionConfig = {
        .host = SYSTEM_TEST_HOST,
        .host = "localhost",
        .port = KINETIC_PORT,
        .clusterVersion = 0,
        .identity = 1,
        .nonBlocking = false,
        .hmacKey = ByteArray_CreateWithCString(HmacKeyString),
    };

    float bandwidthAccumulator = 0.0f, minBandwidth = 1000000000.0f, maxBandwidth = -1000000000.0f;
    float aggregateBandwidthPerIteration[MAX_ITERATIONS];

    for (int iteration = 0; iteration < MAX_ITERATIONS; iteration++) {

        printf("\n*** Overlapped PUT operation (iteration %d of %d)\n",
               iteration + 1, MAX_ITERATIONS);

        char* buf = malloc(sizeof(char) * BUFSIZE);
        int fd = open("test/support/data/test.data", O_RDONLY);
        SourceDataSize = read(fd, buf, BUFSIZE);
        close(fd);
        TEST_ASSERT_MESSAGE(SourceDataSize > 0, "read error");

        // Allocate session/thread data
        struct kinetic_thread_arg* kt_arg;
        pthread_t thread_id[KINETIC_MAX_THREADS];
        kinetic_client = malloc(sizeof(KineticSessionHandle) * NUM_COPIES);
        TEST_ASSERT_NOT_NULL_MESSAGE(kinetic_client, "kinetic_client malloc failed");
        kt_arg = malloc(sizeof(struct kinetic_thread_arg) * NUM_COPIES);
        TEST_ASSERT_NOT_NULL_MESSAGE(kt_arg, "kinetic_thread_arg malloc failed");
    KineticClient_Init("stdout", 2);

    // Establish connection
        TEST_ASSERT_EQUAL_KineticStatus(
            KINETIC_STATUS_SUCCESS,
            KineticClient_Connect(&sessionConfig, &kinetic_client[i]));
        strcpy(kt_arg[i].ip, sessionConfig.host);

        for (int i = 0; i < NUM_COPIES; i++) {

            printf("  *** Overlapped PUT operations (writing copy %d of %d)"
                   " on IP (iteration %d of %d):%s\n",
                   i + 1, NUM_COPIES, iteration + 1,
                   MAX_ITERATIONS, sessionConfig.host);

            // Create a ByteBuffer for consuming chunks of data out of for overlapped PUTs
            kt_arg[i].data = ByteBuffer_Create(buf, SourceDataSize, 0);
    KineticSessionHandle sessionHandle;
    KineticStatus status = KineticClient_Connect(&sessionConfig, &sessionHandle);
    if (status != KINETIC_STATUS_SUCCESS) {
        fprintf(stderr, "Failed connecting to the Kinetic device w/status: %s\n",
            Kinetic_GetStatusDescription(status));
        TEST_FAIL();
    }

            // Configure the KineticEntry
    // Create a unique/common key prefix
    struct timeval now;
    gettimeofday(&now, NULL);
            snprintf(kt_arg[i].keyPrefix, sizeof(kt_arg[i].keyPrefix), "%010llu_%02d%02d_",
                (unsigned long long)now.tv_sec, iteration, i);
            ByteBuffer keyBuf = ByteBuffer_Create(kt_arg[i].key, sizeof(kt_arg[i].key), 0);
            ByteBuffer_AppendCString(&keyBuf, kt_arg[i].keyPrefix);
            ByteBuffer verBuf = ByteBuffer_Create(kt_arg[i].version, sizeof(kt_arg[i].version), 0);
            ByteBuffer_AppendCString(&verBuf, "v1.0");
            ByteBuffer tagBuf = ByteBuffer_Create(kt_arg[i].tag, sizeof(kt_arg[i].tag), 0);
            ByteBuffer_AppendCString(&tagBuf, "some_value_tag...");
            ByteBuffer valBuf = ByteBuffer_Create(kt_arg[i].value, sizeof(kt_arg[i].value), 0);
            kt_arg[i].entry = (KineticEntry) {
                .key = keyBuf,
                // .newVersion = verBuf,
                .tag = tagBuf,
                .algorithm = KINETIC_ALGORITHM_SHA1,
                .value = valBuf,
            };

            // Spawn the thread
            kt_arg[i].sessionHandle = kinetic_client[i];
            int pthreadStatus = pthread_create(&thread_id[i], NULL, kinetic_put, &kt_arg[i]);
            REPORT_ERRNO(pthreadStatus, "pthread_create");
            TEST_ASSERT_EQUAL_MESSAGE(0, pthreadStatus, "pthread create failed");
    char keyPrefix[64];
    snprintf(keyPrefix, sizeof(keyPrefix), "%010ld_", now.tv_sec);

    // Kick off the chained write/PUT operations and wait for completion
    const char* dataFile = "test/support/data/test.data";
    FileTransferProgress* transfer = start_file_transfer(sessionHandle, dataFile, keyPrefix);
    printf("Waiting for transfer to complete...\n");
    status = wait_for_put_finish(transfer);
    if (status != KINETIC_STATUS_SUCCESS) {
        fprintf(stderr, "Transfer failed w/status: %s\n",
            Kinetic_GetStatusDescription(status));
        TEST_FAIL();
    }

        // Wait for each overlapped PUT operations to complete and cleanup
        printf("  *** Waiting for PUT threads to exit...\n");
        aggregateBandwidthPerIteration[iteration] = 0.0f; 
        for (int i = 0; i < NUM_COPIES; i++) {
            int join_status = pthread_join(thread_id[i], NULL);
            TEST_ASSERT_EQUAL_MESSAGE(0, join_status, "pthread join failed");

            // Update results for summary
            bandwidthAccumulator += kt_arg[i].bandwidth;
            aggregateBandwidthPerIteration[iteration] += kt_arg[i].bandwidth;
            minBandwidth = MIN(kt_arg[i].bandwidth, minBandwidth);
            maxBandwidth = MAX(kt_arg[i].bandwidth, maxBandwidth);
        }

        KineticClient_Disconnect(&kinetic_client[i]);

        // Cleanup the rest of the reources
        free(kinetic_client);
        free(kt_arg);
        free(buf);

        fflush(stdout);
        printf("  *** Iteration complete!\n");
        fflush(stdout);
    }

    fflush(stdout);
    printf("\n*** Overlapped PUT operation test complete!\n\n");
    double meanBandwidth = bandwidthAccumulator / (MAX_ITERATIONS * NUM_COPIES);
    double meanAggregateBandwidth = 0.0f;
    for (int iteration = 0; iteration < MAX_ITERATIONS; iteration++) {
        meanAggregateBandwidth += aggregateBandwidthPerIteration[iteration];
    }
    meanAggregateBandwidth /= MAX_ITERATIONS;
    printf("========================================\n");
    printf("=         Performance Summary          =\n");
    printf("========================================\n");
    printf("Min write bandwidth:      %.2f (MB/sec)\n", minBandwidth);
    printf("Max write bandwidth:      %.2f (MB/sec)\n", maxBandwidth);
    printf("Mean write bandwidth:     %.2f (MB/sec)\n", meanBandwidth);
    printf("Mean aggregate bandwidth: %.2f (MB/sec)\n", meanAggregateBandwidth);
    printf("\n");
    fflush(stdout);
    // Shutdown client connection and cleanup
    KineticClient_Disconnect(&sessionHandle);
    KineticClient_Shutdown();
}
#endif