Loading Makefile +1 −1 Original line number Diff line number Diff line Loading @@ -94,7 +94,7 @@ makedirs: all: default test system_tests test_internals run examples clean: makedirs update_git_submodules rm -rf ./bin/*.a ./bin/*.so rm -rf ./bin/*.a ./bin/*.so ./bin/kinetic-c-util rm -rf ./bin/**/* rm -f $(OUT_DIR)/*.o $(OUT_DIR)/*.a *.core *.log bundle exec rake clobber Loading src/lib/kinetic_controller.c +14 −17 Original line number Diff line number Diff line Loading @@ -40,8 +40,7 @@ KineticOperation* KineticController_CreateOperation(KineticSession const * const return NULL; } LOGF1("\n" "--------------------------------------------------\n" LOGF3("--------------------------------------------------\n" "Building new operation on session @ 0x%llX", session); KineticOperation* operation = KineticAllocator_NewOperation(session->connection); Loading Loading @@ -76,20 +75,10 @@ static KineticCompletionClosure DefaultClosure(DefaultCallbackData * const data) KineticStatus KineticController_ExecuteOperation(KineticOperation* operation, KineticCompletionClosure* const closure) { assert(operation != NULL); assert(operation->connection != NULL); assert(&operation->connection->session != NULL); KineticStatus status = KINETIC_STATUS_INVALID; LOGF1("Executing operation: 0x%llX", operation); if (operation->entry != NULL && operation->entry->value.array.data != NULL && operation->entry->value.bytesUsed > 0) { LOGF1(" Sending PDU (0x%0llX) w/value (%zu bytes)", operation->request, operation->entry->value.bytesUsed); } else { LOGF1(" Sending PDU (0x%0llX) w/o value", operation->request); } if (closure != NULL) { operation->closure = *closure; Loading Loading @@ -186,6 +175,11 @@ void KineticController_HandleUnexecpectedResponse(void *msg, response->command->header != NULL && response->command->header->has_connectionID) { LOGF1("[PDU RX UNSOLICTED] pdu: 0x%0llX, session: 0x%llX, bus: 0x%llX, " "protoLen: %u, valueLen: %u", response, &connection->session, connection->messageBus, response->header.protobufLength, response->header.valueLength); // Extract connectionID from unsolicited status message connection->connectionID = response->command->header->connectionID; LOGF2("Extracted connection ID from unsolicited status PDU (id=%lld)", Loading Loading @@ -216,16 +210,19 @@ void KineticController_HandleExpectedResponse(bus_msg_result_t *res, void *udata response->command->status->has_code) { status = KineticProtoStatusCode_to_KineticStatus(response->command->status->code); LOGF2("Response PDU received w/status %s, %i", Kinetic_GetStatusDescription(status), status); KineticLogger_LogProtobuf(3, response->proto); op->response = response; } else { status = KINETIC_STATUS_INVALID; LOG0("Error: received a response with a nonexistent command or status"); } LOGF1("[PDU RX] pdu: 0x%0llX, op: 0x%llX, session: 0x%llX, bus: 0x%llX, " "protoLen: %u, valueLen: %u, status: %s", response, op, &op->connection->session, op->connection->messageBus, response->header.protobufLength, response->header.valueLength, Kinetic_GetStatusDescription(status)); } else { Loading src/lib/kinetic_operation.c +6 −2 Original line number Diff line number Diff line Loading @@ -40,7 +40,7 @@ KineticStatus KineticOperation_SendRequest(KineticOperation* const operation) assert(operation != NULL); assert(operation->connection != NULL); assert(operation->request != NULL); LOGF2("\nSending PDU via fd=%d", operation->connection->messageBus); KineticPDU* request = operation->request; KineticProto_Message* proto = &operation->request->message.message; Loading Loading @@ -103,6 +103,11 @@ KineticStatus KineticOperation_SendRequest(KineticOperation* const operation) else { header.valueLength = 0; } LOGF1("[PDU TX] pdu: 0x%0llX, op: 0x%llX, session: 0x%llX, bus: 0x%llX, protoLen: %u, valueLen: %u", operation->request, operation, &operation->connection->session, operation->connection->messageBus, header.protobufLength, header.valueLength); KineticLogger_LogHeader(2, &header); uint32_t nboProtoLength = KineticNBO_FromHostU32(header.protobufLength); Loading Loading @@ -136,7 +141,6 @@ KineticStatus KineticOperation_SendRequest(KineticOperation* const operation) // Send the value/payload, if specified if (header.valueLength > 0) { LOGF2("Sending PDU Value Payload (%zu bytes)", operation->entry->value.bytesUsed); memcpy(&msg[offset], operation->entry->value.array.data, operation->entry->value.bytesUsed); offset += operation->entry->value.bytesUsed; } Loading src/lib/kinetic_pdu.c +3 −10 Original line number Diff line number Diff line Loading @@ -31,12 +31,6 @@ #include "bus.h" #include "kinetic_pdu_unpack.h" #ifdef TEST #define STATIC #else #define STATIC static #endif #include <time.h> STATIC void log_cb(log_event_t event, int log_level, const char *msg, void *udata) { Loading @@ -44,9 +38,8 @@ STATIC void log_cb(log_event_t event, int log_level, const char *msg, void *udat const char *event_str = bus_log_event_str(event); struct timeval tv; gettimeofday(&tv, NULL); FILE *f = stdout; /* stderr */ fprintf(f, "%ld.%06d: %s[%d] -- %s\n", tv.tv_sec, (int)tv.tv_usec, LOGF1("%ld.%06ld: %s[%d] -- %s\n", tv.tv_sec, (long)tv.tv_usec, event_str, log_level, msg); } Loading Loading @@ -216,7 +209,7 @@ bool KineticPDU_InitBus(int log_level, KineticClient * client) { bus_config cfg = { .log_cb = log_cb, .log_level = 1, .log_level = (log_level > 1) ? 1 : 0, .sink_cb = sink_cb, .unpack_cb = unpack_cb, .unexpected_msg_cb = KineticController_HandleUnexecpectedResponse, Loading test/system/test_system_async_io.c +1 −0 Original line number Diff line number Diff line Loading @@ -157,6 +157,7 @@ void test_kinetic_client_should_store_a_binary_object_split_across_entries_via_o static void put_finished(KineticCompletionData* kinetic_data, void* clientData) { // LOGF1("DONE w/ status: %s", kinetic_data->status); PutStatus * put_status = clientData; // Save PUT result status put_status->status = kinetic_data->status; Loading Loading
Makefile +1 −1 Original line number Diff line number Diff line Loading @@ -94,7 +94,7 @@ makedirs: all: default test system_tests test_internals run examples clean: makedirs update_git_submodules rm -rf ./bin/*.a ./bin/*.so rm -rf ./bin/*.a ./bin/*.so ./bin/kinetic-c-util rm -rf ./bin/**/* rm -f $(OUT_DIR)/*.o $(OUT_DIR)/*.a *.core *.log bundle exec rake clobber Loading
src/lib/kinetic_controller.c +14 −17 Original line number Diff line number Diff line Loading @@ -40,8 +40,7 @@ KineticOperation* KineticController_CreateOperation(KineticSession const * const return NULL; } LOGF1("\n" "--------------------------------------------------\n" LOGF3("--------------------------------------------------\n" "Building new operation on session @ 0x%llX", session); KineticOperation* operation = KineticAllocator_NewOperation(session->connection); Loading Loading @@ -76,20 +75,10 @@ static KineticCompletionClosure DefaultClosure(DefaultCallbackData * const data) KineticStatus KineticController_ExecuteOperation(KineticOperation* operation, KineticCompletionClosure* const closure) { assert(operation != NULL); assert(operation->connection != NULL); assert(&operation->connection->session != NULL); KineticStatus status = KINETIC_STATUS_INVALID; LOGF1("Executing operation: 0x%llX", operation); if (operation->entry != NULL && operation->entry->value.array.data != NULL && operation->entry->value.bytesUsed > 0) { LOGF1(" Sending PDU (0x%0llX) w/value (%zu bytes)", operation->request, operation->entry->value.bytesUsed); } else { LOGF1(" Sending PDU (0x%0llX) w/o value", operation->request); } if (closure != NULL) { operation->closure = *closure; Loading Loading @@ -186,6 +175,11 @@ void KineticController_HandleUnexecpectedResponse(void *msg, response->command->header != NULL && response->command->header->has_connectionID) { LOGF1("[PDU RX UNSOLICTED] pdu: 0x%0llX, session: 0x%llX, bus: 0x%llX, " "protoLen: %u, valueLen: %u", response, &connection->session, connection->messageBus, response->header.protobufLength, response->header.valueLength); // Extract connectionID from unsolicited status message connection->connectionID = response->command->header->connectionID; LOGF2("Extracted connection ID from unsolicited status PDU (id=%lld)", Loading Loading @@ -216,16 +210,19 @@ void KineticController_HandleExpectedResponse(bus_msg_result_t *res, void *udata response->command->status->has_code) { status = KineticProtoStatusCode_to_KineticStatus(response->command->status->code); LOGF2("Response PDU received w/status %s, %i", Kinetic_GetStatusDescription(status), status); KineticLogger_LogProtobuf(3, response->proto); op->response = response; } else { status = KINETIC_STATUS_INVALID; LOG0("Error: received a response with a nonexistent command or status"); } LOGF1("[PDU RX] pdu: 0x%0llX, op: 0x%llX, session: 0x%llX, bus: 0x%llX, " "protoLen: %u, valueLen: %u, status: %s", response, op, &op->connection->session, op->connection->messageBus, response->header.protobufLength, response->header.valueLength, Kinetic_GetStatusDescription(status)); } else { Loading
src/lib/kinetic_operation.c +6 −2 Original line number Diff line number Diff line Loading @@ -40,7 +40,7 @@ KineticStatus KineticOperation_SendRequest(KineticOperation* const operation) assert(operation != NULL); assert(operation->connection != NULL); assert(operation->request != NULL); LOGF2("\nSending PDU via fd=%d", operation->connection->messageBus); KineticPDU* request = operation->request; KineticProto_Message* proto = &operation->request->message.message; Loading Loading @@ -103,6 +103,11 @@ KineticStatus KineticOperation_SendRequest(KineticOperation* const operation) else { header.valueLength = 0; } LOGF1("[PDU TX] pdu: 0x%0llX, op: 0x%llX, session: 0x%llX, bus: 0x%llX, protoLen: %u, valueLen: %u", operation->request, operation, &operation->connection->session, operation->connection->messageBus, header.protobufLength, header.valueLength); KineticLogger_LogHeader(2, &header); uint32_t nboProtoLength = KineticNBO_FromHostU32(header.protobufLength); Loading Loading @@ -136,7 +141,6 @@ KineticStatus KineticOperation_SendRequest(KineticOperation* const operation) // Send the value/payload, if specified if (header.valueLength > 0) { LOGF2("Sending PDU Value Payload (%zu bytes)", operation->entry->value.bytesUsed); memcpy(&msg[offset], operation->entry->value.array.data, operation->entry->value.bytesUsed); offset += operation->entry->value.bytesUsed; } Loading
src/lib/kinetic_pdu.c +3 −10 Original line number Diff line number Diff line Loading @@ -31,12 +31,6 @@ #include "bus.h" #include "kinetic_pdu_unpack.h" #ifdef TEST #define STATIC #else #define STATIC static #endif #include <time.h> STATIC void log_cb(log_event_t event, int log_level, const char *msg, void *udata) { Loading @@ -44,9 +38,8 @@ STATIC void log_cb(log_event_t event, int log_level, const char *msg, void *udat const char *event_str = bus_log_event_str(event); struct timeval tv; gettimeofday(&tv, NULL); FILE *f = stdout; /* stderr */ fprintf(f, "%ld.%06d: %s[%d] -- %s\n", tv.tv_sec, (int)tv.tv_usec, LOGF1("%ld.%06ld: %s[%d] -- %s\n", tv.tv_sec, (long)tv.tv_usec, event_str, log_level, msg); } Loading Loading @@ -216,7 +209,7 @@ bool KineticPDU_InitBus(int log_level, KineticClient * client) { bus_config cfg = { .log_cb = log_cb, .log_level = 1, .log_level = (log_level > 1) ? 1 : 0, .sink_cb = sink_cb, .unpack_cb = unpack_cb, .unexpected_msg_cb = KineticController_HandleUnexecpectedResponse, Loading
test/system/test_system_async_io.c +1 −0 Original line number Diff line number Diff line Loading @@ -157,6 +157,7 @@ void test_kinetic_client_should_store_a_binary_object_split_across_entries_via_o static void put_finished(KineticCompletionData* kinetic_data, void* clientData) { // LOGF1("DONE w/ status: %s", kinetic_data->status); PutStatus * put_status = clientData; // Save PUT result status put_status->status = kinetic_data->status; Loading