Commit c88a7ee6 authored by Job Vranish's avatar Job Vranish
Browse files

added support for threadless async

parent 8a560813
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -62,6 +62,8 @@ KineticStatus KineticClient_Connect(const KineticSession* config,
 */
KineticStatus KineticClient_Disconnect(KineticSessionHandle* const handle);

KineticStatus KineticClient_AsyncRun(KineticSessionHandle* handle);

/**
 * @brief Executes a NOOP command to test whether the Kinetic Device is operational.
 *
+4 −0
Original line number Diff line number Diff line
@@ -106,6 +106,10 @@ typedef struct _KineticSession {
    // Set to true to enable non-blocking/asynchronous I/O
    bool    nonBlocking;

    // Don't start worker threads, the user will need to call
    //   KineticClient_AsyncRun() in order for async callbacks to fire
    bool    no_threads; 

    // The version number of this cluster definition. If this is not equal to
    // the value on the Kinetic Device, the request is rejected and will return
    // `KINETIC_STATUS_VERSION_FAILURE`
+79 −109
Original line number Diff line number Diff line
@@ -26,26 +26,15 @@
#include <sys/param.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <sys/mman.h>
#include <pthread.h>
#include <errno.h>

#define REPORT_ERRNO(en, msg) if(en != 0){errno = en; perror(msg);}

struct kinetic_thread_arg {
    char ip[16];
    struct kinetic_put_arg* opArgs;
    int opCount;
};

typedef struct {
    size_t opsInProgress;
    size_t currentChunk;
    size_t maxOverlappedChunks;
    int fd;
    uint64_t keyPrefix;
    pthread_mutex_t transferMutex;
    pthread_mutex_t completeMutex;
    pthread_cond_t completeCond;
    KineticStatus status;
    KineticSessionHandle sessionHandle;
} FileTransferProgress;
@@ -53,17 +42,15 @@ typedef struct {
typedef struct {
    KineticEntry entry;
    uint8_t key[KINETIC_DEFAULT_KEY_LEN];
    uint8_t value[KINETIC_OBJ_SIZE];
    uint8_t tag[KINETIC_DEFAULT_KEY_LEN];
    FileTransferProgress* currentTransfer;
} AsyncWriteClosureData;

FileTransferProgress * start_file_transfer(KineticSessionHandle handle,
    char const * const filename, uint64_t keyPrefix, uint32_t maxOverlappedChunks);
KineticStatus wait_for_put_finish(FileTransferProgress* const transfer);
static FileTransferProgress * start_file_transfer(KineticSessionHandle handle,
    char const * const filename, uint64_t keyPrefix);
static KineticStatus wait_for_transfer_complete(FileTransferProgress* const transfer);

static int put_chunk_of_file(FileTransferProgress* transfer);
static void put_chunk_of_file_finished(KineticCompletionData* kinetic_data, void* client_data);
static void put_complete(KineticCompletionData* kinetic_data, void* client_data);


int main(int argc, char** argv)
@@ -78,6 +65,7 @@ int main(int argc, char** argv)
        .port = KINETIC_PORT,
        .clusterVersion = 0,
        .identity = 1,
        .no_threads = true,
        .hmacKey = ByteArray_CreateWithCString(HmacKeyString),
    };
    KineticClient_Init("stdout", 0);
@@ -94,19 +82,20 @@ int main(int argc, char** argv)
    // Create a unique/common key prefix
    struct timeval now;
    gettimeofday(&now, NULL);
    uint64_t prefix = (uint64_t)now.tv_sec << sizeof(8);
    uint64_t prefix = (uint64_t)now.tv_sec << 8;

    // Kick off the chained write/PUT operations and wait for completion
    const uint32_t maxOverlappedChunks = 4;
    // Kick off the write/PUT operations and wait for completion
    const char* dataFile = "test/support/data/test.data";
    FileTransferProgress* transfer = start_file_transfer(sessionHandle, dataFile, prefix, maxOverlappedChunks);
    FileTransferProgress* transfer = start_file_transfer(sessionHandle, dataFile, prefix);
    if (transfer != NULL) {
        printf("Waiting for transfer to complete...\n");
    status = wait_for_put_finish(transfer);
        status = wait_for_transfer_complete(transfer);
        if (status != KINETIC_STATUS_SUCCESS) {
            fprintf(stderr, "Transfer failed w/status: %s\n", Kinetic_GetStatusDescription(status));
            return -2;
        }
        printf("Transfer completed successfully!\n");
    }

    // Shutdown client connection and cleanup
    KineticClient_Disconnect(&sessionHandle);
@@ -115,111 +104,92 @@ int main(int argc, char** argv)
    return 0;
}

static int put_chunk_of_file(FileTransferProgress* transfer)
static void put_complete(KineticCompletionData* kinetic_data, void* clientData)
{
    AsyncWriteClosureData* closureData = clientData;
    FileTransferProgress* currentTransfer = closureData->currentTransfer;
    free(closureData);
    currentTransfer->opsInProgress--;

    // make sure the first error (if there is one) is the one we keep
    if (currentTransfer->status == KINETIC_STATUS_SUCCESS) {
        currentTransfer->status = kinetic_data->status;
    }

    if (kinetic_data->status != KINETIC_STATUS_SUCCESS) {
        fprintf(stderr, "Failed writing chunk! PUT response reported status: %s\n",
            Kinetic_GetStatusDescription(kinetic_data->status));
    }
}

static FileTransferProgress * start_file_transfer(KineticSessionHandle handle,
    char const * const filename, uint64_t keyPrefix)
{
    FileTransferProgress * transferState = calloc(1, sizeof(FileTransferProgress));
    int file = open(filename, O_RDONLY);
    *transferState = (FileTransferProgress) {
        .opsInProgress = 0,
        .status = KINETIC_STATUS_SUCCESS,
        .sessionHandle = handle,
        .fd = file,
    };

    if (file < 0) {
        printf("Unable to open %s\n", filename);
        return NULL;
    }

    struct stat inputfile_stat;
    fstat(file, &inputfile_stat);
    char* inputfile_data = (char*)mmap(0, inputfile_stat.st_size, PROT_READ, MAP_SHARED, file, 0);
   
    for (off_t i = 0; i < inputfile_stat.st_size; i += 1024*1024) {
       int value_size = 1024*1024;
       if (i + value_size > inputfile_stat.st_size) {
           value_size = inputfile_stat.st_size - i + 1;
       }
       AsyncWriteClosureData* closureData = calloc(1, sizeof(AsyncWriteClosureData));
    transfer->opsInProgress++;
    closureData->currentTransfer = transfer;
       int32_t currentChunk = (i >> 20);
       uint64_t key = keyPrefix + currentChunk;

    int bytesRead = read(transfer->fd, closureData->value, sizeof(closureData->value));
    if (bytesRead > 0) {
        transfer->currentChunk++;
       closureData->entry = (KineticEntry){
           .key = ByteBuffer_CreateAndAppend(closureData->key, sizeof(closureData->key),
                &transfer->keyPrefix, sizeof(transfer->keyPrefix)),
               &key, sizeof(key)),
           .tag = ByteBuffer_CreateAndAppendFormattedCString(closureData->tag, sizeof(closureData->tag),
                "some_value_tag..._%04d", transfer->currentChunk),
               "some_value_tag..._%04d", currentChunk),
           .algorithm = KINETIC_ALGORITHM_SHA1,
            .value = ByteBuffer_Create(closureData->value, sizeof(closureData->value), (size_t)bytesRead),
           .value = ByteBuffer_Create(&inputfile_data[i], value_size, value_size),
           .synchronization = KINETIC_SYNCHRONIZATION_WRITETHROUGH,
       };
        KineticStatus status = KineticClient_Put(transfer->sessionHandle,
       closureData->currentTransfer = transferState;
       transferState->opsInProgress++;
       KineticStatus status = KineticClient_Put(handle,
           &closureData->entry,
           &(KineticCompletionClosure) {
                .callback = put_chunk_of_file_finished,
               .callback = put_complete,
               .clientData = closureData,
           });
       if (status != KINETIC_STATUS_SUCCESS) {
            transfer->opsInProgress--;
           transferState->opsInProgress--;
           free(closureData);
           fprintf(stderr, "Failed writing chunk! PUT request reported status: %s\n",
               Kinetic_GetStatusDescription(status));
       }
    }
    else if (bytesRead == 0) { // EOF reached
        transfer->opsInProgress--;
        free(closureData);
    }
    else {
        transfer->opsInProgress--;
        free(closureData);
        fprintf(stderr, "Failed reading data from file!\n");
        REPORT_ERRNO(bytesRead, "read");
    }

    return bytesRead;
}

static void put_chunk_of_file_finished(KineticCompletionData* kinetic_data, void* clientData)
{
    AsyncWriteClosureData* closureData = clientData;
    FileTransferProgress* currentTransfer = closureData->currentTransfer;
    free(closureData);
    currentTransfer->opsInProgress--;

    if (kinetic_data->status == KINETIC_STATUS_SUCCESS) {
        int bytesPut = put_chunk_of_file(currentTransfer);
        if (bytesPut <= 0 && currentTransfer->opsInProgress == 0) {
            if (currentTransfer->status == KINETIC_STATUS_NOT_ATTEMPTED) {
                currentTransfer->status = KINETIC_STATUS_SUCCESS;
            }
            pthread_cond_signal(&currentTransfer->completeCond);
        }
    }
    else {
        currentTransfer->status = kinetic_data->status;
        // only signal when finished
        // keep track of outstanding operations
        // if there is no more data to read (or error), and no outstanding operations,
        // then signal
        pthread_cond_signal(&currentTransfer->completeCond);
        fprintf(stderr, "Failed writing chunk! PUT response reported status: %s\n",
            Kinetic_GetStatusDescription(kinetic_data->status));
    }
    return transferState;
}

FileTransferProgress * start_file_transfer(KineticSessionHandle handle,
    char const * const filename, uint64_t keyPrefix, uint32_t maxOverlappedChunks)
static KineticStatus wait_for_transfer_complete(FileTransferProgress* const transfer)
{
    FileTransferProgress * transferState = malloc(sizeof(FileTransferProgress));
    *transferState = (FileTransferProgress) {
        .sessionHandle = handle,
        .maxOverlappedChunks = maxOverlappedChunks,
        .keyPrefix = keyPrefix,
        .fd = open(filename, O_RDONLY),
    };
    pthread_mutex_init(&transferState->transferMutex, NULL);
    pthread_mutex_init(&transferState->completeMutex, NULL);
    pthread_cond_init(&transferState->completeCond, NULL);
        
    // Start max overlapped PUT operations
    for (size_t i = 0; i < transferState->maxOverlappedChunks; i++) {
        put_chunk_of_file(transferState);
    while (transfer->opsInProgress > 0) {
        if (KineticClient_AsyncRun(&transfer->sessionHandle) != KINETIC_STATUS_SUCCESS) {
            break;
        }
    return transferState;
    }

KineticStatus wait_for_put_finish(FileTransferProgress* const transfer)
{
    pthread_mutex_lock(&transfer->completeMutex);
    pthread_cond_wait(&transfer->completeCond, &transfer->completeMutex);
    pthread_mutex_unlock(&transfer->completeMutex);

    KineticStatus status = transfer->status;

    pthread_mutex_destroy(&transfer->completeMutex);
    pthread_cond_destroy(&transfer->completeCond);

    close(transfer->fd);

    free(transfer);
+83 −130
Original line number Diff line number Diff line
@@ -26,27 +26,15 @@
#include <sys/param.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <sys/mman.h>
#include <pthread.h>
#include <errno.h>

#define REPORT_ERRNO(en, msg) if(en != 0){errno = en; perror(msg);}

struct kinetic_thread_arg {
    char ip[16];
    struct kinetic_put_arg* opArgs;
    int opCount;
};

typedef struct {
    size_t opsInProgress;
    size_t currentChunk;
    size_t maxOverlappedChunks;
    int fd;
    ByteBuffer keyPrefix;
    uint8_t keyPrefixBuffer[KINETIC_DEFAULT_KEY_LEN];
    pthread_mutex_t transferMutex;
    pthread_mutex_t completeMutex;
    pthread_cond_t completeCond;
    KineticStatus status;
    KineticSessionHandle sessionHandle;
} FileTransferProgress;
@@ -54,27 +42,25 @@ typedef struct {
typedef struct {
    KineticEntry entry;
    uint8_t key[KINETIC_DEFAULT_KEY_LEN];
    uint8_t value[KINETIC_OBJ_SIZE];
    uint8_t tag[KINETIC_DEFAULT_KEY_LEN];
    FileTransferProgress* currentTransfer;
} AsyncWriteClosureData;

static FileTransferProgress * start_file_transfer(KineticSessionHandle handle,
    char const * const filename, uint64_t keyPrefix);
static KineticStatus wait_for_transfer_complete(FileTransferProgress* const transfer);

static void put_complete(KineticCompletionData* kinetic_data, void* client_data);

typedef struct {
    KineticSessionHandle handle;
    const char* filename;
    const size_t maxOverlappedChunks;
    uint64_t keyPrefix;
    pthread_t thread;
    KineticStatus status;
} StoreFileOperation;

void* store_file_thread(void* storeArgs);
FileTransferProgress* start_file_transfer(KineticSessionHandle handle,
    char const * const filename, uint64_t prefix, uint32_t maxOverlappedChunks);
KineticStatus wait_for_transfer_complete(FileTransferProgress* const transfer);

static int put_chunk_of_file(FileTransferProgress* transfer);
static void put_chunk_of_file_finished(KineticCompletionData* kinetic_data, void* client_data);

int main(int argc, char** argv)
{
@@ -89,6 +75,7 @@ int main(int argc, char** argv)
        .clusterVersion = 0,
        .identity = 1,
        .hmacKey = ByteArray_CreateWithCString(HmacKeyString),
        .no_threads = true,
    };
    KineticClient_Init("stdout", 0);

@@ -113,19 +100,16 @@ int main(int argc, char** argv)
            .handle = handle,
            .filename = "./test/support/data/file_a.png",
            .keyPrefix = prefix,
            .maxOverlappedChunks = 4,
        },
        {
            .handle = handle,
            .filename = "./test/support/data/file_b.png",
            .keyPrefix = prefix,
            .maxOverlappedChunks = 4,
        },
        {
            .handle = handle,
            .filename = "./test/support/data/file_c.png",
            .keyPrefix = prefix,
            .maxOverlappedChunks = 4,
        },
    };
    const int numFiles = sizeof(ops) / sizeof(StoreFileOperation);
@@ -164,7 +148,7 @@ void* store_file_thread(void* storeArgs)
    // Kick off the chained write/PUT operations and wait for completion
    StoreFileOperation* op = storeArgs;
    FileTransferProgress* transfer =
        start_file_transfer(op->handle, op->filename, op->keyPrefix, op->maxOverlappedChunks);
        start_file_transfer(op->handle, op->filename, op->keyPrefix);
    op->status = wait_for_transfer_complete(transfer);
    if (op->status != KINETIC_STATUS_SUCCESS) {
        fprintf(stderr, "Transfer failed w/status: %s\n", Kinetic_GetStatusDescription(op->status));
@@ -172,126 +156,95 @@ void* store_file_thread(void* storeArgs)
    return (void*)storeArgs;
}

FileTransferProgress * start_file_transfer(KineticSessionHandle handle,
    char const * const filename, uint64_t prefix, uint32_t maxOverlappedChunks)
static void put_complete(KineticCompletionData* kinetic_data, void* clientData)
{
    FileTransferProgress * transferState = malloc(sizeof(FileTransferProgress));
    *transferState = (FileTransferProgress) {
        .sessionHandle = handle,
        .maxOverlappedChunks = maxOverlappedChunks,
        .keyPrefix = ByteBuffer_CreateAndAppend(transferState->keyPrefixBuffer,
            sizeof(transferState->keyPrefixBuffer), &prefix, sizeof(prefix)),
        .fd = open(filename, O_RDONLY),
    };
    pthread_mutex_init(&transferState->transferMutex, NULL); 
    pthread_mutex_init(&transferState->completeMutex, NULL); 
    pthread_cond_init(&transferState->completeCond, NULL);
    AsyncWriteClosureData* closureData = clientData;
    FileTransferProgress* currentTransfer = closureData->currentTransfer;
    free(closureData);
    currentTransfer->opsInProgress--;

    // Start max overlapped PUT operations
    for (size_t i = 0; i < transferState->maxOverlappedChunks; i++) {
        put_chunk_of_file(transferState);
    // make sure the first error (if there is one) is the one we keep
    if (currentTransfer->status == KINETIC_STATUS_SUCCESS) {
        currentTransfer->status = kinetic_data->status;
    }

    if (kinetic_data->status != KINETIC_STATUS_SUCCESS) {
        fprintf(stderr, "Failed writing chunk! PUT response reported status: %s\n",
            Kinetic_GetStatusDescription(kinetic_data->status));
    }
    return transferState;
}

KineticStatus wait_for_transfer_complete(FileTransferProgress* const transfer)
static FileTransferProgress * start_file_transfer(KineticSessionHandle handle,
    char const * const filename, uint64_t keyPrefix)
{
    pthread_mutex_lock(&transfer->completeMutex);
    pthread_cond_wait(&transfer->completeCond, &transfer->completeMutex);
    pthread_mutex_unlock(&transfer->completeMutex);

    KineticStatus status = transfer->status;

    pthread_mutex_destroy(&transfer->completeMutex);
    pthread_cond_destroy(&transfer->completeCond);
    FileTransferProgress * transferState = calloc(1, sizeof(FileTransferProgress));
    int file = open(filename, O_RDONLY);
    *transferState = (FileTransferProgress) {
        .opsInProgress = 0,
        .status = KINETIC_STATUS_SUCCESS,
        .sessionHandle = handle,
        .fd = file,
    };

    close(transfer->fd);
    if (file < 0) {
        printf("Unable to open %s\n", filename);
        return NULL;
    }

    pthread_mutex_destroy(&transfer->transferMutex);
    free(transfer);
    struct stat inputfile_stat;
    fstat(file, &inputfile_stat);
    char* inputfile_data = (char*)mmap(0, inputfile_stat.st_size, PROT_READ, MAP_SHARED, file, 0);
   
    return status;
    for (off_t i = 0; i < inputfile_stat.st_size; i += 1024*1024) {
       int value_size = 1024*1024;
       if (i + value_size > inputfile_stat.st_size) {
           value_size = inputfile_stat.st_size - i + 1;
       }

int put_chunk_of_file(FileTransferProgress* transfer)
{
       AsyncWriteClosureData* closureData = calloc(1, sizeof(AsyncWriteClosureData));
       int32_t currentChunk = (i >> 20);
       uint64_t key = keyPrefix + currentChunk;

    pthread_mutex_lock(&transfer->transferMutex);

    transfer->opsInProgress++;
    closureData->currentTransfer = transfer;

    int bytesRead = read(transfer->fd, closureData->value, sizeof(closureData->value));
    if (bytesRead > 0) {
        transfer->currentChunk++;
       closureData->entry = (KineticEntry){
           .key = ByteBuffer_CreateAndAppend(closureData->key, sizeof(closureData->key),
                transfer->keyPrefix.array.data, transfer->keyPrefix.bytesUsed),
               &key, sizeof(key)),
           .tag = ByteBuffer_CreateAndAppendFormattedCString(closureData->tag, sizeof(closureData->tag),
                "some_value_tag..._%04d", transfer->currentChunk),
               "some_value_tag..._%04d", currentChunk),
           .algorithm = KINETIC_ALGORITHM_SHA1,
            .value = ByteBuffer_Create(closureData->value, sizeof(closureData->value), (size_t)bytesRead),
           .value = ByteBuffer_Create(&inputfile_data[i], value_size, value_size),
           .synchronization = KINETIC_SYNCHRONIZATION_WRITETHROUGH,
       };
        KineticStatus status = KineticClient_Put(transfer->sessionHandle,
       closureData->currentTransfer = transferState;
       transferState->opsInProgress++;
       KineticStatus status = KineticClient_Put(handle,
           &closureData->entry,
           &(KineticCompletionClosure) {
                .callback = put_chunk_of_file_finished,
               .callback = put_complete,
               .clientData = closureData,
           });
       if (status != KINETIC_STATUS_SUCCESS) {
            transfer->opsInProgress--;
           transferState->opsInProgress--;
           free(closureData);
           fprintf(stderr, "Failed writing chunk! PUT request reported status: %s\n",
               Kinetic_GetStatusDescription(status));
       }

    }
    else if (bytesRead == 0) { // EOF reached
        transfer->opsInProgress--;
        free(closureData);
    }
    else {
        transfer->opsInProgress--;
        free(closureData);
        fprintf(stderr, "Failed reading data from file!\n");
        REPORT_ERRNO(bytesRead, "read");
    return transferState;
}

    pthread_mutex_unlock(&transfer->transferMutex);
    
    return bytesRead;
static KineticStatus wait_for_transfer_complete(FileTransferProgress* const transfer)
{
    while (transfer->opsInProgress > 0) {
        if (KineticClient_AsyncRun(&transfer->sessionHandle) != KINETIC_STATUS_SUCCESS) {
            break;
        }
    }

void put_chunk_of_file_finished(KineticCompletionData* kinetic_data, void* clientData)
{
    AsyncWriteClosureData* closureData = clientData;
    FileTransferProgress* transfer = closureData->currentTransfer;
    free(closureData);
    KineticStatus status = transfer->status;

    pthread_mutex_lock(&transfer->transferMutex);
    transfer->opsInProgress--;
    pthread_mutex_unlock(&transfer->transferMutex);
    close(transfer->fd);

    if (kinetic_data->status == KINETIC_STATUS_SUCCESS) {
        int bytesPut = put_chunk_of_file(transfer);
        if (bytesPut <= 0 && transfer->opsInProgress == 0) {
            if (transfer->status == KINETIC_STATUS_NOT_ATTEMPTED) {
                transfer->status = KINETIC_STATUS_SUCCESS;
            }
            pthread_cond_signal(&transfer->completeCond);
        }
    }
    else {
        transfer->status = kinetic_data->status;
        // only signal when finished
        // keep track of outstanding operations
        // if there is no more data to read (or error), and no outstanding operations,
        // then signal
        pthread_cond_signal(&transfer->completeCond);
        pthread_mutex_destroy(&transfer->transferMutex);
        pthread_mutex_destroy(&transfer->completeMutex);
        fprintf(stderr, "Failed writing chunk! PUT response reported status: %s\n",
            Kinetic_GetStatusDescription(kinetic_data->status));
    }
    free(transfer);

    return status;
}
+12 −1
Original line number Diff line number Diff line
@@ -83,11 +83,22 @@ KineticStatus KineticClient_Connect(const KineticSession* config,
    }

    // Wait for initial unsolicited status to be received in order to obtain connectionID
    while(connection->connectionID == 0) {sleep(1);}
    while(connection->connectionID == 0) {
        if (!KineticClient_AsyncRun(handle))
        {
            break;
        }
    }

    return status;
}

KineticStatus KineticClient_AsyncRun(KineticSessionHandle* handle)
{
    KineticConnection* connection = KineticConnection_FromHandle(*handle);
    return KineticController_Run(connection);
}

KineticStatus KineticClient_Disconnect(KineticSessionHandle* const handle)
{
    if (*handle == KINETIC_HANDLE_INVALID) {
Loading