Commit 35730d38 authored by Job Vranish's avatar Job Vranish
Browse files

added simple non-blocking put example

parent a84e415c
Loading
Loading
Loading
Loading
+9 −1
Original line number Diff line number Diff line
@@ -70,6 +70,7 @@ LIB_OBJS = \
	$(OUT_DIR)/kinetic_types_internal.o \
	$(OUT_DIR)/kinetic_types.o \
	$(OUT_DIR)/kinetic_memory.o \
	$(OUT_DIR)/kinetic_semaphore.o \
	$(OUT_DIR)/byte_array.o \
	$(OUT_DIR)/kinetic_client.o \
	$(OUT_DIR)/threadpool.o \
@@ -225,6 +226,8 @@ install: $(KINETIC_LIB) $(KINETIC_SO_DEV)
	$(INSTALL) -d $(PREFIX)/include/
	$(INSTALL) -c $(PUB_INC)/$(API_NAME).h $(PREFIX)/include/
	$(INSTALL) -c $(PUB_INC)/kinetic_types.h $(PREFIX)/include/
	$(INSTALL) -c $(PUB_INC)/kinetic_semaphore.h $(PREFIX)/include/
	$(INSTALL) -c $(PUB_INC)/byte_array.h $(PREFIX)/include/

uninstall:
	@echo
@@ -236,6 +239,8 @@ uninstall:
	$(RM) -f $(PREFIX)${LIBDIR}/lib$(PROJECT)*.so
	$(RM) -f $(PREFIX)/include/${API_NAME}.h
	$(RM) -f $(PREFIX)/include/kinetic_types.h
	$(RM) -f $(PREFIX)/include/kinetic_semaphore.h
	$(RM) -f $(PREFIX)/include/byte_array.h
	$(RM) -f $(PREFIX)/include/kinetic_proto.h
	$(RM) -f $(PREFIX)/include/protobuf-c/protobuf-c.h
	$(RM) -f $(PREFIX)/include/protobuf-c.h
@@ -399,6 +404,7 @@ run: $(UTIL_EXEC) start_simulator

EXAMPLE_SRC = ./src/examples
EXAMPLE_LDFLAGS += -lm -l ssl $(KINETIC_LIB) -l crypto -l pthread
EXAMPLE_CFLAGS += -Wno-deprecated-declarations
EXAMPLES = write_file_blocking
VALGRIND = valgrind
VALGRIND_ARGS = --track-origins=yes #--leak-check=full
@@ -411,7 +417,7 @@ $(BIN_DIR)/examples/%: $(EXAMPLE_SRC)/%.c $(KINETIC_LIB)
	@echo ================================================================================
	@echo Building example: '$<'
	@echo --------------------------------------------------------------------------------
	$(CC) -o $@ $< $(CFLAGS) -I$(PUB_INC) $(UTIL_LDFLAGS) $(KINETIC_LIB)
	$(CC) -o $@ $< $(CFLAGS) $(EXAMPLE_CFLAGS) -I$(PUB_INC) $(UTIL_LDFLAGS) $(KINETIC_LIB)
	@echo ================================================================================
	@echo

@@ -451,6 +457,7 @@ setup_examples: $(example_executables) \

examples: setup_examples \
	start_simulator \
	run_example_put_nonblocking \
	run_example_write_file_blocking \
	run_example_write_file_blocking_threads \
	run_example_write_file_nonblocking \
@@ -460,6 +467,7 @@ examples: setup_examples \

valgrind_examples: setup_examples \
	start_simulator \
	valgrind_put_nonblocking \
	valgrind_example_write_file_blocking \
	valgrind_example_write_file_blocking_threads \
	valgrind_example_write_file_nonblocking \
+3 −0
Original line number Diff line number Diff line
@@ -61,5 +61,8 @@ ByteBuffer* ByteBuffer_AppendCString(ByteBuffer* buffer, const char* data);
ByteBuffer* ByteBuffer_AppendFormattedCString(ByteBuffer* buffer, const char * format, ...);
ByteBuffer* ByteBuffer_AppendDummyData(ByteBuffer* buffer, size_t len);
bool ByteBuffer_IsNull(ByteBuffer const buffer);
ByteBuffer ByteBuffer_Malloc(size_t size);
ByteBuffer ByteBuffer_MallocAndAppend(const void* data, size_t len);
void ByteBuffer_Free(ByteBuffer buffer);

#endif // _BYTE_ARRAY_H
+14 −0
Original line number Diff line number Diff line
#ifndef _KINETIC_SEMAPHORE_H
#define _KINETIC_SEMAPHORE_H

struct _KineticSemaphore;
typedef struct _KineticSemaphore KineticSemaphore;

KineticSemaphore * KineticSemaphore_Create(void);
void KineticSemaphore_Lock(KineticSemaphore * sem);
void KineticSemaphore_Unlock(KineticSemaphore * sem);
void KineticSemaphore_Signal(KineticSemaphore * sem);
void KineticSemaphore_WaitForSignalAndDestroy(KineticSemaphore * sem);

#endif // _KINETIC_SEMAPHORE_H
+106 −0
Original line number Diff line number Diff line
#include "kinetic_client.h"
#include "kinetic_types.h"
#include "kinetic_semaphore.h"
#include <stdlib.h>
#include <openssl/sha.h>
#include <pthread.h>

typedef struct {
    KineticSemaphore * sem;
    KineticStatus status;
} PutStatus;

static void put_finished(KineticCompletionData* kinetic_data, void* clientData);

int main(int argc, char** argv)
{
    (void)argc;
    (void)argv;

    // Initialize kinetic-c and establish session
    KineticClient * client = KineticClient_Init("stdout", 0);
    if (client == NULL) { return 1; }
    const char HmacKeyString[] = "asdfasdf";
    KineticSession session = {.config = {
        .host = "localhost",
        .port = KINETIC_PORT,
        .clusterVersion = 0,
        .identity = 1,
        .hmacKey = ByteArray_CreateWithCString(HmacKeyString),
    }};

    KineticStatus connect_status = KineticClient_CreateConnection(&session, client);
    if (connect_status != KINETIC_STATUS_SUCCESS) {
        fprintf(stderr, "Failed connecting to the Kinetic device w/status: %s\n",
            Kinetic_GetStatusDescription(connect_status));
        return 1;
    }

    PutStatus put_status = {
        .sem = KineticSemaphore_Create(),
        .status = KINETIC_STATUS_INVALID,
    };

    // some dummy data to put
    uint8_t value_data[] = { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F };
    ByteBuffer value = ByteBuffer_MallocAndAppend(value_data, sizeof(value_data));

    // a dummy key
    uint8_t key_data[] = {0x00, 0x01, 0x02, 0x03, 0x04};
    ByteBuffer key = ByteBuffer_MallocAndAppend(key_data, sizeof(key_data));

    ByteBuffer tag = ByteBuffer_Malloc(20);
    uint8_t sha1[20];
    SHA1(value.array.data, value.bytesUsed, &sha1[0]);
    ByteBuffer_Append(&tag, sha1, sizeof(sha1));

    KineticEntry entry = {
        .key = key,
        .tag = tag,
        .algorithm = KINETIC_ALGORITHM_SHA1,
        .value = value,
        .synchronization = KINETIC_SYNCHRONIZATION_WRITETHROUGH,
    };
    
    KineticStatus status = KineticClient_Put(
        &session,
        &entry,
        &(KineticCompletionClosure) {
            .callback = put_finished,
            .clientData = &put_status,
        }
    );

    if (status != KINETIC_STATUS_SUCCESS) {
        fprintf(stderr, "Put failed w/status: %s\n", Kinetic_GetStatusDescription(status));
        return 1;
    }

    KineticSemaphore_WaitForSignalAndDestroy(put_status.sem);

    if (put_status.status != KINETIC_STATUS_SUCCESS) {
        fprintf(stderr, "Transfer failed w/status: %s\n", Kinetic_GetStatusDescription(put_status.status));
        return 1;
    }
    printf("Transfer completed successfully!\n");

    ByteBuffer_Free(value);
    ByteBuffer_Free(key);
    ByteBuffer_Free(tag);

    // Shutdown client connection and cleanup
    KineticClient_DestroyConnection(&session);
    KineticClient_Shutdown(client);

    return 0;
}

static void put_finished(KineticCompletionData* kinetic_data, void* clientData)
{
    PutStatus * put_status = clientData;

    KineticSemaphore_Lock(put_status->sem);
    put_status->status = kinetic_data->status;
    KineticSemaphore_Signal(put_status->sem);
    KineticSemaphore_Unlock(put_status->sem);
}
+27 −2
Original line number Diff line number Diff line
@@ -3,6 +3,7 @@
#include <string.h>
#include <stdio.h>
#include <stdarg.h>
#include <stdlib.h>
#include <sys/param.h>

static ByteBuffer* append_formatted_cstring_va_list(ByteBuffer* buffer,
@@ -110,7 +111,7 @@ ByteBuffer* ByteBuffer_Append(ByteBuffer* buffer, const void* data, size_t len)
    assert(buffer != NULL);
    assert(buffer->array.data != NULL);
    assert(data != NULL);
    if (len == 0 || ((buffer->bytesUsed + len) > buffer->array.len)) {
    if ((buffer->bytesUsed + len) > buffer->array.len) {
        return NULL;
    }
    memcpy(&buffer->array.data[buffer->bytesUsed], data, len);
@@ -124,7 +125,7 @@ ByteBuffer* ByteBuffer_AppendArray(ByteBuffer* buffer, const ByteArray array)
    assert(buffer != NULL);
    assert(buffer->array.data != NULL);
    assert(array.data != NULL);
    if (array.len == 0 || ((buffer->bytesUsed + array.len) > buffer->array.len)) {
    if ((buffer->bytesUsed + array.len) > buffer->array.len) {
        return NULL;
    }
    memcpy(&buffer->array.data[buffer->bytesUsed], array.data, array.len);
@@ -223,3 +224,27 @@ bool ByteBuffer_IsNull(ByteBuffer const buffer)
{
    return buffer.array.data == NULL;
}

ByteBuffer ByteBuffer_Malloc(size_t size)
{
    // There is not check on the return value from calloc by design
    //  the intention is that you can check for malloc failure by
    //  calling ByteBuffer_IsNull on the returned ByteBuffer
    return ByteBuffer_Create(calloc(1, size), size, 0);
}

ByteBuffer ByteBuffer_MallocAndAppend(const void* data, size_t len)
{
    assert(data != NULL);
    assert(len != 0);
    ByteBuffer buffer = ByteBuffer_Malloc(len);
    if (ByteBuffer_IsNull(buffer)) { return buffer; }
    ByteBuffer_Append(&buffer, data, len);
    return buffer;
}

void ByteBuffer_Free(ByteBuffer buffer)
{
    assert(buffer.array.data != NULL);
    free(buffer.array.data);
}
Loading