Loading Makefile +1 −1 Original line number Diff line number Diff line Loading @@ -95,7 +95,7 @@ makedirs: all: default test system_tests test_internals run examples clean: makedirs update_git_submodules rm -rf ./bin/*.a ./bin/*.so rm -rf ./bin/*.a ./bin/*.so ./bin/kinetic-c-util rm -rf ./bin/**/* rm -f $(OUT_DIR)/*.o $(OUT_DIR)/*.a *.core *.log bundle exec rake clobber Loading src/lib/bus/bus.c +29 −0 Original line number Diff line number Diff line Loading @@ -26,6 +26,7 @@ #include <errno.h> #include <assert.h> #include <limits.h> #include <sys/resource.h> #include "bus.h" #include "sender.h" Loading @@ -46,6 +47,7 @@ static int listener_id_of_socket(struct bus *b, int fd); static void noop_log_cb(log_event_t event, int log_level, const char *msg, void *udata); static void noop_error_cb(bus_unpack_cb_res_t result, void *socket_udata); static bool attempt_to_increase_resource_limits(struct bus *b); static void set_defaults(bus_config *cfg) { if (cfg->sender_count == 0) { cfg->sender_count = 1; } Loading Loading @@ -110,6 +112,8 @@ bool bus_init(bus_config *config, struct bus_result *res) { log_lock_init = true; attempt_to_increase_resource_limits(b); BUS_LOG_SNPRINTF(b, 3, LOG_INITIALIZATION, b->udata, 64, "Initialized bus at %p", (void*)b); Loading Loading @@ -223,6 +227,31 @@ cleanup: return false; } static bool attempt_to_increase_resource_limits(struct bus *b) { struct rlimit info; if (-1 == getrlimit(RLIMIT_NOFILE, &info)) { fprintf(stderr, "getrlimit: %s", strerror(errno)); errno = 0; return false; } const unsigned int nval = 1024; BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 256, "Current FD resource limits, [%lu, %lu], changing to %u", (unsigned long)info.rlim_cur, (unsigned long)info.rlim_max, nval); if (info.rlim_cur < nval && info.rlim_max > nval) { info.rlim_cur = nval; if (-1 == setrlimit(RLIMIT_NOFILE, &info)) { fprintf(stderr, "getrlimit: %s", strerror(errno)); errno = 0; return false; } } return true; } /* Pack message to deliver on behalf of the user into an envelope * that can track status / routing along the way. * Loading src/lib/bus/listener.c +2 −0 Original line number Diff line number Diff line Loading @@ -854,6 +854,8 @@ static void notify_caller(int fd) { } static bool grow_read_buf(listener *l, size_t nsize) { if (nsize < l->read_buf_size) { return true; } uint8_t *nbuf = realloc(l->read_buf, nsize); if (nbuf) { struct bus *b = l->bus; Loading src/lib/kinetic_controller.c +14 −17 Original line number Diff line number Diff line Loading @@ -40,8 +40,7 @@ KineticOperation* KineticController_CreateOperation(KineticSession const * const return NULL; } LOGF1("\n" "--------------------------------------------------\n" LOGF3("--------------------------------------------------\n" "Building new operation on session @ 0x%llX", session); KineticOperation* operation = KineticAllocator_NewOperation(session->connection); Loading Loading @@ -78,20 +77,10 @@ static KineticCompletionClosure DefaultClosure(DefaultCallbackData * const data) KineticStatus KineticController_ExecuteOperation(KineticOperation* operation, KineticCompletionClosure* const closure) { assert(operation != NULL); assert(operation->connection != NULL); assert(&operation->connection->session != NULL); KineticStatus status = KINETIC_STATUS_INVALID; LOGF1("Executing operation: 0x%llX", operation); if (operation->entry != NULL && operation->entry->value.array.data != NULL && operation->entry->value.bytesUsed > 0) { LOGF1(" Sending PDU (0x%0llX) w/value (%zu bytes)", operation->request, operation->entry->value.bytesUsed); } else { LOGF1(" Sending PDU (0x%0llX) w/o value", operation->request); } if (closure != NULL) { operation->closure = *closure; Loading Loading @@ -188,6 +177,11 @@ void KineticController_HandleUnexecpectedResponse(void *msg, response->command->header != NULL && response->command->header->has_connectionID) { LOGF1("[PDU RX UNSOLICTED] pdu: 0x%0llX, session: 0x%llX, bus: 0x%llX, " "protoLen: %u, valueLen: %u", response, &connection->session, connection->messageBus, response->header.protobufLength, response->header.valueLength); // Extract connectionID from unsolicited status message connection->connectionID = response->command->header->connectionID; LOGF2("Extracted connection ID from unsolicited status PDU (id=%lld)", Loading Loading @@ -218,16 +212,19 @@ void KineticController_HandleExpectedResponse(bus_msg_result_t *res, void *udata response->command->status->has_code) { status = KineticProtoStatusCode_to_KineticStatus(response->command->status->code); LOGF2("Response PDU received w/status %s, %i", Kinetic_GetStatusDescription(status), status); KineticLogger_LogProtobuf(3, response->proto); op->response = response; } else { status = KINETIC_STATUS_INVALID; LOG0("Error: received a response with a nonexistent command or status"); } LOGF1("[PDU RX] pdu: 0x%0llX, op: 0x%llX, session: 0x%llX, bus: 0x%llX, " "protoLen: %u, valueLen: %u, status: %s", response, op, &op->connection->session, op->connection->messageBus, response->header.protobufLength, response->header.valueLength, Kinetic_GetStatusDescription(status)); } else { Loading src/lib/kinetic_operation.c +5 −1 Original line number Diff line number Diff line Loading @@ -60,6 +60,7 @@ KineticStatus KineticOperation_SendRequest(KineticOperation* const operation) static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const operation) { LOGF2("\nSending PDU via fd=%d", operation->connection->messageBus); KineticPDU* request = operation->request; KineticProto_Message* proto = &operation->request->message.message; Loading Loading @@ -124,6 +125,10 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o return KINETIC_STATUS_BUFFER_OVERRUN; } LOGF1("[PDU TX] pdu: 0x%0llX, op: 0x%llX, session: 0x%llX, bus: 0x%llX, protoLen: %u, valueLen: %u", operation->request, operation, &operation->connection->session, operation->connection->messageBus, header.protobufLength, header.valueLength); KineticLogger_LogHeader(2, &header); uint32_t nboProtoLength = KineticNBO_FromHostU32(header.protobufLength); Loading Loading @@ -154,7 +159,6 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o // Send the value/payload, if specified if (header.valueLength > 0) { LOGF2("Sending PDU Value Payload (%zu bytes)", operation->entry->value.bytesUsed); memcpy(&msg[offset], operation->entry->value.array.data, operation->entry->value.bytesUsed); offset += operation->entry->value.bytesUsed; } Loading Loading
Makefile +1 −1 Original line number Diff line number Diff line Loading @@ -95,7 +95,7 @@ makedirs: all: default test system_tests test_internals run examples clean: makedirs update_git_submodules rm -rf ./bin/*.a ./bin/*.so rm -rf ./bin/*.a ./bin/*.so ./bin/kinetic-c-util rm -rf ./bin/**/* rm -f $(OUT_DIR)/*.o $(OUT_DIR)/*.a *.core *.log bundle exec rake clobber Loading
src/lib/bus/bus.c +29 −0 Original line number Diff line number Diff line Loading @@ -26,6 +26,7 @@ #include <errno.h> #include <assert.h> #include <limits.h> #include <sys/resource.h> #include "bus.h" #include "sender.h" Loading @@ -46,6 +47,7 @@ static int listener_id_of_socket(struct bus *b, int fd); static void noop_log_cb(log_event_t event, int log_level, const char *msg, void *udata); static void noop_error_cb(bus_unpack_cb_res_t result, void *socket_udata); static bool attempt_to_increase_resource_limits(struct bus *b); static void set_defaults(bus_config *cfg) { if (cfg->sender_count == 0) { cfg->sender_count = 1; } Loading Loading @@ -110,6 +112,8 @@ bool bus_init(bus_config *config, struct bus_result *res) { log_lock_init = true; attempt_to_increase_resource_limits(b); BUS_LOG_SNPRINTF(b, 3, LOG_INITIALIZATION, b->udata, 64, "Initialized bus at %p", (void*)b); Loading Loading @@ -223,6 +227,31 @@ cleanup: return false; } static bool attempt_to_increase_resource_limits(struct bus *b) { struct rlimit info; if (-1 == getrlimit(RLIMIT_NOFILE, &info)) { fprintf(stderr, "getrlimit: %s", strerror(errno)); errno = 0; return false; } const unsigned int nval = 1024; BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 256, "Current FD resource limits, [%lu, %lu], changing to %u", (unsigned long)info.rlim_cur, (unsigned long)info.rlim_max, nval); if (info.rlim_cur < nval && info.rlim_max > nval) { info.rlim_cur = nval; if (-1 == setrlimit(RLIMIT_NOFILE, &info)) { fprintf(stderr, "getrlimit: %s", strerror(errno)); errno = 0; return false; } } return true; } /* Pack message to deliver on behalf of the user into an envelope * that can track status / routing along the way. * Loading
src/lib/bus/listener.c +2 −0 Original line number Diff line number Diff line Loading @@ -854,6 +854,8 @@ static void notify_caller(int fd) { } static bool grow_read_buf(listener *l, size_t nsize) { if (nsize < l->read_buf_size) { return true; } uint8_t *nbuf = realloc(l->read_buf, nsize); if (nbuf) { struct bus *b = l->bus; Loading
src/lib/kinetic_controller.c +14 −17 Original line number Diff line number Diff line Loading @@ -40,8 +40,7 @@ KineticOperation* KineticController_CreateOperation(KineticSession const * const return NULL; } LOGF1("\n" "--------------------------------------------------\n" LOGF3("--------------------------------------------------\n" "Building new operation on session @ 0x%llX", session); KineticOperation* operation = KineticAllocator_NewOperation(session->connection); Loading Loading @@ -78,20 +77,10 @@ static KineticCompletionClosure DefaultClosure(DefaultCallbackData * const data) KineticStatus KineticController_ExecuteOperation(KineticOperation* operation, KineticCompletionClosure* const closure) { assert(operation != NULL); assert(operation->connection != NULL); assert(&operation->connection->session != NULL); KineticStatus status = KINETIC_STATUS_INVALID; LOGF1("Executing operation: 0x%llX", operation); if (operation->entry != NULL && operation->entry->value.array.data != NULL && operation->entry->value.bytesUsed > 0) { LOGF1(" Sending PDU (0x%0llX) w/value (%zu bytes)", operation->request, operation->entry->value.bytesUsed); } else { LOGF1(" Sending PDU (0x%0llX) w/o value", operation->request); } if (closure != NULL) { operation->closure = *closure; Loading Loading @@ -188,6 +177,11 @@ void KineticController_HandleUnexecpectedResponse(void *msg, response->command->header != NULL && response->command->header->has_connectionID) { LOGF1("[PDU RX UNSOLICTED] pdu: 0x%0llX, session: 0x%llX, bus: 0x%llX, " "protoLen: %u, valueLen: %u", response, &connection->session, connection->messageBus, response->header.protobufLength, response->header.valueLength); // Extract connectionID from unsolicited status message connection->connectionID = response->command->header->connectionID; LOGF2("Extracted connection ID from unsolicited status PDU (id=%lld)", Loading Loading @@ -218,16 +212,19 @@ void KineticController_HandleExpectedResponse(bus_msg_result_t *res, void *udata response->command->status->has_code) { status = KineticProtoStatusCode_to_KineticStatus(response->command->status->code); LOGF2("Response PDU received w/status %s, %i", Kinetic_GetStatusDescription(status), status); KineticLogger_LogProtobuf(3, response->proto); op->response = response; } else { status = KINETIC_STATUS_INVALID; LOG0("Error: received a response with a nonexistent command or status"); } LOGF1("[PDU RX] pdu: 0x%0llX, op: 0x%llX, session: 0x%llX, bus: 0x%llX, " "protoLen: %u, valueLen: %u, status: %s", response, op, &op->connection->session, op->connection->messageBus, response->header.protobufLength, response->header.valueLength, Kinetic_GetStatusDescription(status)); } else { Loading
src/lib/kinetic_operation.c +5 −1 Original line number Diff line number Diff line Loading @@ -60,6 +60,7 @@ KineticStatus KineticOperation_SendRequest(KineticOperation* const operation) static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const operation) { LOGF2("\nSending PDU via fd=%d", operation->connection->messageBus); KineticPDU* request = operation->request; KineticProto_Message* proto = &operation->request->message.message; Loading Loading @@ -124,6 +125,10 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o return KINETIC_STATUS_BUFFER_OVERRUN; } LOGF1("[PDU TX] pdu: 0x%0llX, op: 0x%llX, session: 0x%llX, bus: 0x%llX, protoLen: %u, valueLen: %u", operation->request, operation, &operation->connection->session, operation->connection->messageBus, header.protobufLength, header.valueLength); KineticLogger_LogHeader(2, &header); uint32_t nboProtoLength = KineticNBO_FromHostU32(header.protobufLength); Loading Loading @@ -154,7 +159,6 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o // Send the value/payload, if specified if (header.valueLength > 0) { LOGF2("Sending PDU Value Payload (%zu bytes)", operation->entry->value.bytesUsed); memcpy(&msg[offset], operation->entry->value.array.data, operation->entry->value.bytesUsed); offset += operation->entry->value.bytesUsed; } Loading