Commit b37e95db authored by Job Vranish's avatar Job Vranish
Browse files

added non-blocking GET example

parent 002d5d49
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -458,6 +458,7 @@ setup_examples: $(example_executables) \
examples: setup_examples \
	start_simulator \
	run_example_put_nonblocking \
	run_example_get_nonblocking \
	run_example_write_file_blocking \
	run_example_write_file_blocking_threads \
	run_example_write_file_nonblocking \
@@ -468,6 +469,7 @@ examples: setup_examples \
valgrind_examples: setup_examples \
	start_simulator \
	valgrind_put_nonblocking \
	valgrind_get_nonblocking \
	valgrind_example_write_file_blocking \
	valgrind_example_write_file_blocking_threads \
	valgrind_example_write_file_nonblocking \
+145 −0
Original line number Diff line number Diff line
#include "kinetic_client.h"
#include "kinetic_types.h"
#include "kinetic_semaphore.h"
#include <stdlib.h>
#include <sys/param.h>
#include <openssl/sha.h>
#include <pthread.h>

typedef struct {
    KineticSemaphore * sem;
    KineticStatus status;
} GetStatus;

static void get_finished(KineticCompletionData* kinetic_data, void* clientData);

int main(int argc, char** argv)
{
    (void)argc;
    (void)argv;

    // Initialize kinetic-c and establish session
    KineticClient * client = KineticClient_Init("stdout", 0);
    if (client == NULL) { return 1; }
    const char HmacKeyString[] = "asdfasdf";
    KineticSession session = {.config = {
        .host = "localhost",
        .port = KINETIC_PORT,
        .clusterVersion = 0,
        .identity = 1,
        .hmacKey = ByteArray_CreateWithCString(HmacKeyString),
    }};

    KineticStatus connect_status = KineticClient_CreateConnection(&session, client);
    if (connect_status != KINETIC_STATUS_SUCCESS) {
        fprintf(stderr, "Failed connecting to the Kinetic device w/status: %s\n",
            Kinetic_GetStatusDescription(connect_status));
        return 1;
    }

    // some dummy data to PUT
    uint8_t value_data[] = { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F };
    ByteBuffer value = ByteBuffer_MallocAndAppend(value_data, sizeof(value_data));

    // a dummy key
    uint8_t key_data[] = {0x00, 0x01, 0x02, 0x03, 0x04};
    ByteBuffer key = ByteBuffer_MallocAndAppend(key_data, sizeof(key_data));

    // Populate tag with SHA1
    ByteBuffer tag = ByteBuffer_Malloc(20);
    uint8_t sha1[20];
    SHA1(value.array.data, value.bytesUsed, &sha1[0]);
    ByteBuffer_Append(&tag, sha1, sizeof(sha1));

    KineticEntry entry = {
        .key = key,
        .tag = tag,
        .algorithm = KINETIC_ALGORITHM_SHA1,
        .value = value,
        .synchronization = KINETIC_SYNCHRONIZATION_WRITETHROUGH,
    };

    // Do a blocking put to make sure there is something there to read back
    KineticStatus put_status = KineticClient_Put(&session, &entry, NULL);

    if (put_status != KINETIC_STATUS_SUCCESS) {
        fprintf(stderr, "Put failed w/status: %s\n", Kinetic_GetStatusDescription(put_status));
        return 1;
    }

    // Create structure to populate with GET status in callback
    //   a semaphore is used to notify the main thread that it's
    //   safe to proceed.
    GetStatus get_status = {
        .sem = KineticSemaphore_Create(),
        .status = KINETIC_STATUS_INVALID,
    };

    ByteBuffer getTag = ByteBuffer_Malloc(tag.bytesUsed);
    ByteBuffer getValue = ByteBuffer_Malloc(value.bytesUsed);

    // Because I'm passing a pointer to this entry to KineticClient_Put(), this entry must not
    //   go out of scope until the GET completes
    KineticEntry get_entry = {
        .key = key,
        .tag = getTag,
        .algorithm = KINETIC_ALGORITHM_SHA1,
        .value = getValue,
        .force = true,
    };

    KineticStatus status = KineticClient_Get(
        &session,
        &get_entry,
        &(KineticCompletionClosure) {
            .callback = get_finished,
            .clientData = &get_status,
        }
    );
    if (status != KINETIC_STATUS_SUCCESS) {
        fprintf(stderr, "Get failed w/status: %s\n", Kinetic_GetStatusDescription(status));
        return 1;
    }

    // Wait for GET to finish
    KineticSemaphore_WaitForSignalAndDestroy(get_status.sem);

    if (get_status.status != KINETIC_STATUS_SUCCESS) {
        fprintf(stderr, "GET failed w/status: %s\n", Kinetic_GetStatusDescription(get_status.status));
        return 1;
    }

    if ((value.bytesUsed == getValue.bytesUsed) &&
        (memcmp(value.array.data, getValue.array.data, getValue.bytesUsed) != 0)) {
        fprintf(stderr, "GET completed but returned unexpected value");
        return 1;
    }
    printf("GET completed successfully!\n");

    // Free malloc'd buffers
    ByteBuffer_Free(value);
    ByteBuffer_Free(key);
    ByteBuffer_Free(tag);

    ByteBuffer_Free(getValue);
    ByteBuffer_Free(getTag);
    

    // Shutdown client connection and cleanup
    KineticClient_DestroyConnection(&session);
    KineticClient_Shutdown(client);

    return 0;
}

static void get_finished(KineticCompletionData* kinetic_data, void* clientData)
{
    GetStatus * get_status = clientData;

    KineticSemaphore_Lock(get_status->sem);
    // Save GET result status
    get_status->status = kinetic_data->status;
    // Signal that we're done
    KineticSemaphore_Signal(get_status->sem);
    KineticSemaphore_Unlock(get_status->sem);
}
+4 −4
Original line number Diff line number Diff line
@@ -78,18 +78,18 @@ int main(int argc, char** argv)
    );

    if (status != KINETIC_STATUS_SUCCESS) {
        fprintf(stderr, "Put failed w/status: %s\n", Kinetic_GetStatusDescription(status));
        fprintf(stderr, "PUT failed w/status: %s\n", Kinetic_GetStatusDescription(status));
        return 1;
    }

    // Wait for put to finish
    // Wait for PUT to finish
    KineticSemaphore_WaitForSignalAndDestroy(put_status.sem);

    if (put_status.status != KINETIC_STATUS_SUCCESS) {
        fprintf(stderr, "Transfer failed w/status: %s\n", Kinetic_GetStatusDescription(put_status.status));
        fprintf(stderr, "PUT failed w/status: %s\n", Kinetic_GetStatusDescription(put_status.status));
        return 1;
    }
    printf("Transfer completed successfully!\n");
    printf("PUT completed successfully!\n");

    // Free malloc'd buffers
    ByteBuffer_Free(value);