Commit ef7691d1 authored by Greg Williams's avatar Greg Williams
Browse files

Finished updating code to account for change to session/connection and corresponding API changes.

parent 575c5206
Loading
Loading
Loading
Loading
+13 −10
Original line number Diff line number Diff line
@@ -99,20 +99,23 @@ int main(int argc, char** argv)
    // Establish connection
    KineticStatus status;
    const char HmacKeyString[] = "asdfasdf";
    const KineticSession session = {
    KineticSession session = {
        .config = (KineticSessionConfig) {
            .host = "localhost",
            .port = KINETIC_PORT,
            .clusterVersion = 0,
            .identity = 1,
        .hmacKey = ByteArray_CreateWithCString(HmacKeyString),
            .hmacKey = ByteArray_CreateWithCString(HmacKeyString)
        }
    };
    write_args* writeArgs = calloc(1, sizeof(write_args));
    KineticClient_Init("stdout", 0);
    status = KineticClient_CreateConnection(&session);
    if (status != KINETIC_STATUS_SUCCESS) {
        fprintf(stderr, "Connection to host '%s' failed w/ status: %s",
            session.host, Kinetic_GetStatusDescription(status));
            session.config.host, Kinetic_GetStatusDescription(status));
    }
    writeArgs->session = &session;
    
    // Create a ByteBuffer for consuming chunks of data out of for overlapped PUTs
    writeArgs->data = ByteBuffer_Create(buf, dataLen, 0);
@@ -133,14 +136,14 @@ int main(int argc, char** argv)
        .value = ByteBuffer_Create(writeArgs->value, sizeof(writeArgs->value), 0),
        .synchronization = KINETIC_SYNCHRONIZATION_WRITEBACK,
    };
    strcpy(writeArgs->ip, session.host);
    strcpy(writeArgs->ip, session.config.host);

    // Store the data
    printf("\nWriting data file to the Kinetic device...\n");
    store_data(writeArgs);

    // Shutdown client connection and cleanup
    KineticClient_DestroyConnection(&writeArgs->session);
    KineticClient_DestroyConnection(writeArgs->session);
    KineticClient_Shutdown();
    free(writeArgs);
    free(buf);
+10 −8
Original line number Diff line number Diff line
@@ -37,7 +37,7 @@
typedef struct {
    pthread_t threadID;
    char ip[16];
    KineticSession* session;
    KineticSession session;
    char keyPrefix[KINETIC_DEFAULT_KEY_LEN];
    uint8_t key[KINETIC_DEFAULT_KEY_LEN];
    uint8_t version[KINETIC_DEFAULT_KEY_LEN];
@@ -66,7 +66,7 @@ void* store_data(void* args)
        ByteBuffer_AppendArray(&entry->value, ByteBuffer_Consume(&thread_args->data, KINETIC_OBJ_SIZE));

        // Store the data slice
        KineticStatus status = KineticClient_Put(thread_args->session, entry, NULL);
        KineticStatus status = KineticClient_Put(&thread_args->session, entry, NULL);
        if (status != KINETIC_STATUS_SUCCESS) {
            fprintf(stderr, "Failed writing entry %d to disk w/status: %s",
                objIndex+1, Kinetic_GetStatusDescription(status));
@@ -98,30 +98,32 @@ int main(int argc, char** argv)

    // Initialize kinetic-c and configure sessions
    KineticClient_Init("stdout", 0);

    write_args* writeArgs = calloc(NUM_FILES, sizeof(write_args));
    if (writeArgs == NULL) {
        fprintf(stderr, "Failed allocating overlapped thread arguments!\n");
    }
    const char HmacKeyString[] = "asdfasdf";
    const KineticSession session = {
    const KineticSessionConfig config = {
        .host = "localhost",
        .port = KINETIC_PORT,
        .clusterVersion = 0,
        .identity = 1,
        .hmacKey = ByteArray_CreateWithCString(HmacKeyString),
    };
    write_args* writeArgs = calloc(NUM_FILES, sizeof(write_args));
    if (writeArgs == NULL) {
        fprintf(stderr, "Failed allocating overlapped thread arguments!\n");
    }

    // Kick off a thread for each file to store
    for (int i = 0; i < NUM_FILES; i++) {

        // Establish connection
        writeArgs[i].session = (KineticSession){.config = config};
        status = KineticClient_CreateConnection(&writeArgs[i].session);
        if (status != KINETIC_STATUS_SUCCESS) {
            fprintf(stderr, "Failed connecting to the Kinetic device w/status: %s\n",
                Kinetic_GetStatusDescription(status));
            return -1;
        }
        strcpy(writeArgs[i].ip, writeArgs[i].session.host);
        strcpy(writeArgs[i].ip, writeArgs[i].session.config.host);

        // Create a ByteBuffer for consuming chunks of data out of for overlapped PUTs
        writeArgs[i].data = ByteBuffer_Create(buf, dataLen, 0);
+9 −8
Original line number Diff line number Diff line
@@ -73,17 +73,18 @@ int main(int argc, char** argv)

    // Initialize kinetic-c and configure sessions
    const char HmacKeyString[] = "asdfasdf";
    const KineticSession session = {
    KineticSession session = {
            .config = (KineticSessionConfig) {
            .host = "localhost",
            .port = KINETIC_PORT,
            .clusterVersion = 0,
            .identity = 1,
            .hmacKey = ByteArray_CreateWithCString(HmacKeyString),
        }
    };
    KineticClient_Init("stdout", 0);

    // Establish connection
    KineticSession session;
    KineticStatus status = KineticClient_CreateConnection(&session);
    if (status != KINETIC_STATUS_SUCCESS) {
        fprintf(stderr, "Failed connecting to the Kinetic device w/status: %s\n",
@@ -99,7 +100,7 @@ int main(int argc, char** argv)
    // Kick off the chained write/PUT operations and wait for completion
    const uint32_t maxOverlappedChunks = 4;
    const char* dataFile = "test/support/data/test.data";
    FileTransferProgress* transfer = start_file_transfer(session, dataFile, prefix, maxOverlappedChunks);
    FileTransferProgress* transfer = start_file_transfer(&session, dataFile, prefix, maxOverlappedChunks);
    printf("Waiting for transfer to complete...\n");
    status = wait_for_put_finish(transfer);
    if (status != KINETIC_STATUS_SUCCESS) {
+14 −13
Original line number Diff line number Diff line
@@ -83,12 +83,14 @@ int main(int argc, char** argv)

    // Initialize kinetic-c and configure session
    const char HmacKeyString[] = "asdfasdf";
    const KineticSession session = {
    KineticSession session = {
        .config = (KineticSessionConfig) {
            .host = "localhost",
            .port = KINETIC_PORT,
            .clusterVersion = 0,
            .identity = 1,
            .hmacKey = ByteArray_CreateWithCString(HmacKeyString),
        }
    };
    KineticClient_Init("stdout", 0);

@@ -109,19 +111,19 @@ int main(int argc, char** argv)
    bool success = true;
    StoreFileOperation ops[] = {
        {
            .handle = handle,
            .session = &session,
            .filename = "./test/support/data/file_a.png",
            .keyPrefix = prefix,
            .maxOverlappedChunks = 4,
        },
        {
            .handle = handle,
            .session = &session,
            .filename = "./test/support/data/file_b.png",
            .keyPrefix = prefix,
            .maxOverlappedChunks = 4,
        },
        {
            .handle = handle,
            .session = &session,
            .filename = "./test/support/data/file_c.png",
            .keyPrefix = prefix,
            .maxOverlappedChunks = 4,
@@ -152,7 +154,7 @@ int main(int argc, char** argv)
    printf("Complete!\n");
    
    // Shutdown client connection and cleanup
    KineticClient_DestroyConnection(&handle);
    KineticClient_DestroyConnection(&session);
    KineticClient_Shutdown();

    return success ? 0 : -1;
@@ -163,7 +165,7 @@ void* store_file_thread(void* storeArgs)
    // Kick off the chained write/PUT operations and wait for completion
    StoreFileOperation* op = storeArgs;
    FileTransferProgress* transfer =
        start_file_transfer(op->handle, op->filename, op->keyPrefix, op->maxOverlappedChunks);
        start_file_transfer(op->session, op->filename, op->keyPrefix, op->maxOverlappedChunks);
    op->status = wait_for_transfer_complete(transfer);
    if (op->status != KINETIC_STATUS_SUCCESS) {
        fprintf(stderr, "Transfer failed w/status: %s\n", Kinetic_GetStatusDescription(op->status));
@@ -233,8 +235,7 @@ int put_chunk_of_file(FileTransferProgress* transfer)
            .value = ByteBuffer_Create(closureData->value, sizeof(closureData->value), (size_t)bytesRead),
            .synchronization = KINETIC_SYNCHRONIZATION_WRITETHROUGH,
        };
        KineticStatus status = KineticClient_Put(transfer->sessionHandle,
            &closureData->entry,
        KineticStatus status = KineticClient_Put(transfer->session, &closureData->entry,
            &(KineticCompletionClosure) {
                .callback = put_chunk_of_file_finished,
                .clientData = closureData,
+1 −0
Original line number Diff line number Diff line
@@ -43,6 +43,7 @@ KineticStatus KineticSession_Create(KineticSession * const session)
        return KINETIC_STATUS_MEMORY_ERROR;
    }
    KINETIC_CONNECTION_INIT(session->connection);
    session->connection->session = *session; // TODO: KILL ME!!!
    return KINETIC_STATUS_SUCCESS;
}

Loading