Loading include/byte_array.h +1 −0 Original line number Diff line number Diff line Loading @@ -51,6 +51,7 @@ ByteBuffer ByteBuffer_CreateAndAppend(void* data, size_t max_len, const void* va ByteBuffer ByteBuffer_CreateAndAppendArray(void* data, size_t max_len, const ByteArray value); ByteBuffer ByteBuffer_CreateAndAppendCString(void* data, size_t max_len, const char* value); ByteBuffer ByteBuffer_CreateAndAppendFormattedCString(void* data, size_t max_len, const char * format, ...); ByteBuffer ByteBuffer_CreateAndAppendDummyData(void* data, size_t max_len, size_t len); void ByteBuffer_Reset(ByteBuffer* buffer); long ByteBuffer_BytesRemaining(const ByteBuffer buffer); ByteArray ByteBuffer_Consume(ByteBuffer* buffer, size_t max_len); Loading src/lib/byte_array.c +7 −0 Original line number Diff line number Diff line Loading @@ -81,6 +81,13 @@ ByteBuffer ByteBuffer_CreateAndAppendCString(void* data, size_t max_len, const c return buf; } ByteBuffer ByteBuffer_CreateAndAppendDummyData(void* data, size_t max_len, size_t len) { ByteBuffer buf = ByteBuffer_Create(data, max_len, 0); ByteBuffer_AppendDummyData(&buf, len); return buf; } long ByteBuffer_BytesRemaining(const ByteBuffer buffer) { assert(buffer.array.data != NULL); Loading test/system/test_system_async_io.c +2 −0 Original line number Diff line number Diff line Loading @@ -24,6 +24,8 @@ #include <sys/time.h> #include <errno.h> STATIC const int TestDataSize = 100 * (1024*1024); struct kinetic_thread_arg { char ip[16]; struct kinetic_put_arg* opArgs; Loading test/system/test_system_overlapped_io.c +11 −15 Original line number Diff line number Diff line Loading @@ -28,14 +28,12 @@ #define MAX_ITERATIONS (2) #define NUM_COPIES (3) #define BUFSIZE (128 * KINETIC_OBJ_SIZE) #define KINETIC_MAX_THREADS (10) #define MAX_OBJ_SIZE (KINETIC_OBJ_SIZE) #define REPORT_ERRNO(en, msg) if(en != 0){errno = en; perror(msg);} STATIC const char HmacKeyString[] = "asdfasdf"; STATIC int SourceDataSize; STATIC const int TestDataSize = 50 * (1024*1024); struct kinetic_thread_arg { char ip[16]; Loading Loading @@ -81,18 +79,16 @@ void test_kinetic_client_should_be_able_to_store_an_arbitrarily_large_binary_obj printf("\n*** Overlapped PUT operation (iteration %d of %d)\n", iteration + 1, MAX_ITERATIONS); char* buf = malloc(sizeof(char) * BUFSIZE); int fd = open("test/support/data/test.data", O_RDONLY); SourceDataSize = read(fd, buf, BUFSIZE); close(fd); TEST_ASSERT_MESSAGE(SourceDataSize > 0, "read error"); TEST_ASSERT_MESSAGE(TestDataSize > 0, "read error"); // Allocate session/thread data struct kinetic_thread_arg* kt_arg; pthread_t thread_id[KINETIC_MAX_THREADS]; pthread_t thread_id[NUM_COPIES]; kt_arg = malloc(sizeof(struct kinetic_thread_arg) * NUM_COPIES); TEST_ASSERT_NOT_NULL_MESSAGE(kt_arg, "kinetic_thread_arg malloc failed"); uint8_t* testData = malloc(TestDataSize); ByteBuffer testBuf = ByteBuffer_CreateAndAppendDummyData(testData, TestDataSize, TestDataSize); ByteBuffer_Reset(&testBuf); // Establish all of the connection first, so their session can all get initialized first for (int i = 0; i < NUM_COPIES; i++) { Loading @@ -102,9 +98,7 @@ void test_kinetic_client_should_be_able_to_store_an_arbitrarily_large_binary_obj KINETIC_STATUS_SUCCESS, KineticClient_CreateConnection(&kt_arg[i].session, client)); strcpy(kt_arg[i].ip, sessionConfig.host); // Create a ByteBuffer for consuming chunks of data out of for overlapped PUTs kt_arg[i].data = ByteBuffer_Create(buf, SourceDataSize, 0); kt_arg[i].data = testBuf; // Configure the KineticEntry struct timeval now; Loading Loading @@ -157,7 +151,7 @@ void test_kinetic_client_should_be_able_to_store_an_arbitrarily_large_binary_obj // Cleanup the rest of the reources free(kt_arg); free(buf); free(testData); fflush(stdout); printf(" *** Iteration complete!\n"); Loading Loading @@ -205,7 +199,7 @@ static void* kinetic_put(void* kinetic_arg) ByteBuffer_Reset(&entry->value); ByteBuffer_AppendArray( &entry->value, ByteBuffer_Consume(&arg->data, MIN(ByteBuffer_BytesRemaining(arg->data), MAX_OBJ_SIZE)) ByteBuffer_Consume(&arg->data, MAX_OBJ_SIZE) ); // Set operation-specific attributes Loading @@ -227,8 +221,10 @@ static void* kinetic_put(void* kinetic_arg) gettimeofday(&stopTime, NULL); int64_t elapsed_us = ((stopTime.tv_sec - startTime.tv_sec) * 1000000) + (stopTime.tv_usec - startTime.tv_usec); LOGF0("elapsed us = %lu", elapsed_us); float elapsed_ms = elapsed_us / 1000.0f; arg->bandwidth = (arg->data.array.len * 1000.0f) / (elapsed_ms * 1024 * 1024); fflush(stdout); Loading Loading
include/byte_array.h +1 −0 Original line number Diff line number Diff line Loading @@ -51,6 +51,7 @@ ByteBuffer ByteBuffer_CreateAndAppend(void* data, size_t max_len, const void* va ByteBuffer ByteBuffer_CreateAndAppendArray(void* data, size_t max_len, const ByteArray value); ByteBuffer ByteBuffer_CreateAndAppendCString(void* data, size_t max_len, const char* value); ByteBuffer ByteBuffer_CreateAndAppendFormattedCString(void* data, size_t max_len, const char * format, ...); ByteBuffer ByteBuffer_CreateAndAppendDummyData(void* data, size_t max_len, size_t len); void ByteBuffer_Reset(ByteBuffer* buffer); long ByteBuffer_BytesRemaining(const ByteBuffer buffer); ByteArray ByteBuffer_Consume(ByteBuffer* buffer, size_t max_len); Loading
src/lib/byte_array.c +7 −0 Original line number Diff line number Diff line Loading @@ -81,6 +81,13 @@ ByteBuffer ByteBuffer_CreateAndAppendCString(void* data, size_t max_len, const c return buf; } ByteBuffer ByteBuffer_CreateAndAppendDummyData(void* data, size_t max_len, size_t len) { ByteBuffer buf = ByteBuffer_Create(data, max_len, 0); ByteBuffer_AppendDummyData(&buf, len); return buf; } long ByteBuffer_BytesRemaining(const ByteBuffer buffer) { assert(buffer.array.data != NULL); Loading
test/system/test_system_async_io.c +2 −0 Original line number Diff line number Diff line Loading @@ -24,6 +24,8 @@ #include <sys/time.h> #include <errno.h> STATIC const int TestDataSize = 100 * (1024*1024); struct kinetic_thread_arg { char ip[16]; struct kinetic_put_arg* opArgs; Loading
test/system/test_system_overlapped_io.c +11 −15 Original line number Diff line number Diff line Loading @@ -28,14 +28,12 @@ #define MAX_ITERATIONS (2) #define NUM_COPIES (3) #define BUFSIZE (128 * KINETIC_OBJ_SIZE) #define KINETIC_MAX_THREADS (10) #define MAX_OBJ_SIZE (KINETIC_OBJ_SIZE) #define REPORT_ERRNO(en, msg) if(en != 0){errno = en; perror(msg);} STATIC const char HmacKeyString[] = "asdfasdf"; STATIC int SourceDataSize; STATIC const int TestDataSize = 50 * (1024*1024); struct kinetic_thread_arg { char ip[16]; Loading Loading @@ -81,18 +79,16 @@ void test_kinetic_client_should_be_able_to_store_an_arbitrarily_large_binary_obj printf("\n*** Overlapped PUT operation (iteration %d of %d)\n", iteration + 1, MAX_ITERATIONS); char* buf = malloc(sizeof(char) * BUFSIZE); int fd = open("test/support/data/test.data", O_RDONLY); SourceDataSize = read(fd, buf, BUFSIZE); close(fd); TEST_ASSERT_MESSAGE(SourceDataSize > 0, "read error"); TEST_ASSERT_MESSAGE(TestDataSize > 0, "read error"); // Allocate session/thread data struct kinetic_thread_arg* kt_arg; pthread_t thread_id[KINETIC_MAX_THREADS]; pthread_t thread_id[NUM_COPIES]; kt_arg = malloc(sizeof(struct kinetic_thread_arg) * NUM_COPIES); TEST_ASSERT_NOT_NULL_MESSAGE(kt_arg, "kinetic_thread_arg malloc failed"); uint8_t* testData = malloc(TestDataSize); ByteBuffer testBuf = ByteBuffer_CreateAndAppendDummyData(testData, TestDataSize, TestDataSize); ByteBuffer_Reset(&testBuf); // Establish all of the connection first, so their session can all get initialized first for (int i = 0; i < NUM_COPIES; i++) { Loading @@ -102,9 +98,7 @@ void test_kinetic_client_should_be_able_to_store_an_arbitrarily_large_binary_obj KINETIC_STATUS_SUCCESS, KineticClient_CreateConnection(&kt_arg[i].session, client)); strcpy(kt_arg[i].ip, sessionConfig.host); // Create a ByteBuffer for consuming chunks of data out of for overlapped PUTs kt_arg[i].data = ByteBuffer_Create(buf, SourceDataSize, 0); kt_arg[i].data = testBuf; // Configure the KineticEntry struct timeval now; Loading Loading @@ -157,7 +151,7 @@ void test_kinetic_client_should_be_able_to_store_an_arbitrarily_large_binary_obj // Cleanup the rest of the reources free(kt_arg); free(buf); free(testData); fflush(stdout); printf(" *** Iteration complete!\n"); Loading Loading @@ -205,7 +199,7 @@ static void* kinetic_put(void* kinetic_arg) ByteBuffer_Reset(&entry->value); ByteBuffer_AppendArray( &entry->value, ByteBuffer_Consume(&arg->data, MIN(ByteBuffer_BytesRemaining(arg->data), MAX_OBJ_SIZE)) ByteBuffer_Consume(&arg->data, MAX_OBJ_SIZE) ); // Set operation-specific attributes Loading @@ -227,8 +221,10 @@ static void* kinetic_put(void* kinetic_arg) gettimeofday(&stopTime, NULL); int64_t elapsed_us = ((stopTime.tv_sec - startTime.tv_sec) * 1000000) + (stopTime.tv_usec - startTime.tv_usec); LOGF0("elapsed us = %lu", elapsed_us); float elapsed_ms = elapsed_us / 1000.0f; arg->bandwidth = (arg->data.array.len * 1000.0f) / (elapsed_ms * 1024 * 1024); fflush(stdout); Loading