Loading test/system/test_system_async_io.c +13 −18 Original line number Diff line number Diff line Loading @@ -54,18 +54,6 @@ STATIC KineticSessionHandle* kinetic_client; STATIC const char HmacKeyString[] = "asdfasdf"; STATIC int SourceDataSize; // struct kinetic_put_arg { // KineticSessionHandle sessionHandle; // char keyPrefix[KINETIC_DEFAULT_KEY_LEN]; // uint8_t key[KINETIC_DEFAULT_KEY_LEN]; // uint8_t version[KINETIC_DEFAULT_KEY_LEN]; // uint8_t tag[KINETIC_DEFAULT_KEY_LEN]; // uint8_t value[KINETIC_OBJ_SIZE]; // KineticEntry entry; // ByteBuffer data; // KineticStatus status; // float bandwidth; // }; struct kinetic_thread_arg { char ip[16]; Loading Loading @@ -137,6 +125,8 @@ int put_chunk_of_file(FileTransferProgress* transfer) } else { transfer->opsInProgress--; free(closureData); } } Loading @@ -144,14 +134,17 @@ int put_chunk_of_file(FileTransferProgress* transfer) { // no more data to read // but we're probably not done yet! transfer->opsInProgress--; free(closureData); } else { } transfer->opsInProgress--; free(closureData); } return bytesRead; } Loading Loading @@ -187,7 +180,7 @@ void update_with_status(FileTransferProgress* transfer, KineticStatus const stat } void start_file_transfer(KineticSessionHandle handle, char const * const filename, char const * const keyPrefix) FileTransferProgress * start_file_transfer(KineticSessionHandle handle, char const * const filename, char const * const keyPrefix) { FileTransferProgress * transferState = calloc(1, sizeof(FileTransferProgress)); transferState->sessionHandle = handle; Loading @@ -199,16 +192,18 @@ void start_file_transfer(KineticSessionHandle handle, char const * const filenam transferState->keyPrefix = ByteBuffer_Create(transferState->keyPrefixBuffer, sizeof(transferState->keyPrefixBuffer), 0); struct timeval now; gettimeofday(&now, NULL); ByteBuffer_AppendCString(&transferState->keyPrefix, filename); ByteBuffer_AppendCString(&transferState->keyPrefix, keyPrefix); ByteBuffer_AppendFormattedCString(&transferState->keyPrefix, "_%010llu", (unsigned long long)now.tv_sec); transferState->fd = open(filename, O_RDONLY); //start 4 async actions //start 4 async actions (fix concurrency issue) put_chunk_of_file(transferState); put_chunk_of_file(transferState); put_chunk_of_file(transferState); put_chunk_of_file(transferState); return transferState; } KineticStatus wait_for_put_finish(FileTransferProgress* const transfer) Loading Loading
test/system/test_system_async_io.c +13 −18 Original line number Diff line number Diff line Loading @@ -54,18 +54,6 @@ STATIC KineticSessionHandle* kinetic_client; STATIC const char HmacKeyString[] = "asdfasdf"; STATIC int SourceDataSize; // struct kinetic_put_arg { // KineticSessionHandle sessionHandle; // char keyPrefix[KINETIC_DEFAULT_KEY_LEN]; // uint8_t key[KINETIC_DEFAULT_KEY_LEN]; // uint8_t version[KINETIC_DEFAULT_KEY_LEN]; // uint8_t tag[KINETIC_DEFAULT_KEY_LEN]; // uint8_t value[KINETIC_OBJ_SIZE]; // KineticEntry entry; // ByteBuffer data; // KineticStatus status; // float bandwidth; // }; struct kinetic_thread_arg { char ip[16]; Loading Loading @@ -137,6 +125,8 @@ int put_chunk_of_file(FileTransferProgress* transfer) } else { transfer->opsInProgress--; free(closureData); } } Loading @@ -144,14 +134,17 @@ int put_chunk_of_file(FileTransferProgress* transfer) { // no more data to read // but we're probably not done yet! transfer->opsInProgress--; free(closureData); } else { } transfer->opsInProgress--; free(closureData); } return bytesRead; } Loading Loading @@ -187,7 +180,7 @@ void update_with_status(FileTransferProgress* transfer, KineticStatus const stat } void start_file_transfer(KineticSessionHandle handle, char const * const filename, char const * const keyPrefix) FileTransferProgress * start_file_transfer(KineticSessionHandle handle, char const * const filename, char const * const keyPrefix) { FileTransferProgress * transferState = calloc(1, sizeof(FileTransferProgress)); transferState->sessionHandle = handle; Loading @@ -199,16 +192,18 @@ void start_file_transfer(KineticSessionHandle handle, char const * const filenam transferState->keyPrefix = ByteBuffer_Create(transferState->keyPrefixBuffer, sizeof(transferState->keyPrefixBuffer), 0); struct timeval now; gettimeofday(&now, NULL); ByteBuffer_AppendCString(&transferState->keyPrefix, filename); ByteBuffer_AppendCString(&transferState->keyPrefix, keyPrefix); ByteBuffer_AppendFormattedCString(&transferState->keyPrefix, "_%010llu", (unsigned long long)now.tv_sec); transferState->fd = open(filename, O_RDONLY); //start 4 async actions //start 4 async actions (fix concurrency issue) put_chunk_of_file(transferState); put_chunk_of_file(transferState); put_chunk_of_file(transferState); put_chunk_of_file(transferState); return transferState; } KineticStatus wait_for_put_finish(FileTransferProgress* const transfer) Loading