Loading src/lib/bus/bus.c +40 −45 Original line number Diff line number Diff line Loading @@ -61,7 +61,7 @@ int completion_pipe = -1; void *unused = NULL; #endif bool bus_init(bus_config *config, struct bus_result *res) { bool Bus_Init(bus_config *config, struct bus_result *res) { if (res == NULL) { return false; } if (config == NULL) { res->status = BUS_INIT_ERROR_NULL; Loading Loading @@ -96,7 +96,7 @@ bool bus_init(bus_config *config, struct bus_result *res) { bus *b = calloc(1, sizeof(*b)); if (b == NULL) { goto cleanup; } if (!bus_ssl_init(b)) { goto cleanup; } if (!BusSSL_Init(b)) { goto cleanup; } b->sink_cb = config->sink_cb; b->unpack_cb = config->unpack_cb; Loading Loading @@ -127,7 +127,7 @@ bool bus_init(bus_config *config, struct bus_result *res) { } for (int i = 0; i < config->listener_count; i++) { ls[i] = listener_init(b, config); ls[i] = Listener_Init(b, config); if (ls[i] == NULL) { res->status = BUS_INIT_ERROR_LISTENER_INIT_FAIL; goto cleanup; Loading @@ -150,7 +150,7 @@ bool bus_init(bus_config *config, struct bus_result *res) { goto cleanup; } fd_set = yacht_init(DEF_FD_SET_SIZE2); fd_set = Yacht_Init(DEF_FD_SET_SIZE2); if (fd_set == NULL) { goto cleanup; } Loading Loading @@ -178,7 +178,7 @@ bool bus_init(bus_config *config, struct bus_result *res) { cleanup: if (ls) { for (int i = 0; i < config->listener_count; i++) { if (ls[i]) { listener_free(ls[i]); } if (ls[i]) { Listener_Free(ls[i]); } } free(ls); } Loading @@ -195,7 +195,7 @@ cleanup: } if (threads) { free(threads); } if (fd_set) { yacht_free(fd_set, NULL, NULL); } if (fd_set) { Yacht_Free(fd_set, NULL, NULL); } return false; } Loading Loading @@ -259,7 +259,7 @@ static boxed_msg *box_msg(struct bus *b, bus_user_msg *msg) { void *value = NULL; #endif connection_info *ci = NULL; if (yacht_get(b->fd_set, box->fd, &value)) { if (Yacht_Get(b->fd_set, box->fd, &value)) { ci = (connection_info *)value; } if (0 != pthread_mutex_unlock(&b->fd_set_lock)) { assert(false); } Loading Loading @@ -302,7 +302,7 @@ static boxed_msg *box_msg(struct bus *b, bus_user_msg *msg) { return box; } bool bus_send_request(struct bus *b, bus_user_msg *msg) bool Bus_SendRequest(struct bus *b, bus_user_msg *msg) { if (b == NULL || msg == NULL || msg->fd == -1) { return false; Loading @@ -315,7 +315,7 @@ bool bus_send_request(struct bus *b, bus_user_msg *msg) BUS_LOG_SNPRINTF(b, 3-0, LOG_SENDING_REQUEST, b->udata, 64, "Sending request <fd:%d, seq_id:%lld>", msg->fd, (long long)msg->seq_id); bool res = send_do_blocking_send(b, box); bool res = Send_DoBlockingSend(b, box); BUS_LOG_SNPRINTF(b, 3, LOG_SENDING_REQUEST, b->udata, 64, "...request sent, result %d", res); Loading @@ -335,12 +335,12 @@ static int listener_id_of_socket(struct bus *b, int fd) { return fd % b->listener_count; } struct listener *bus_get_listener_for_socket(struct bus *b, int fd) { struct listener *Bus_GetListenerForSocket(struct bus *b, int fd) { return b->listeners[listener_id_of_socket(b, fd)]; } /* Get the string key for a log event ID. */ const char *bus_log_event_str(log_event_t event) { const char *Bus_LogEventStr(log_event_t event) { switch (event) { case LOG_INITIALIZATION: return "INITIALIZATION"; case LOG_NEW_CLIENT: return "NEW_CLIENT"; Loading @@ -355,7 +355,7 @@ const char *bus_log_event_str(log_event_t event) { } } bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata) { bool Bus_RegisterSocket(struct bus *b, bus_socket_t type, int fd, void *udata) { /* Register a socket internally with the listener. */ int l_id = listener_id_of_socket(b, fd); Loading @@ -377,7 +377,7 @@ bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata) SSL *ssl = NULL; if (type == BUS_SOCKET_SSL) { ssl = bus_ssl_connect(b, fd); ssl = BusSSL_Connect(b, fd); if (ssl == NULL) { goto cleanup; } } else { ssl = BUS_NO_SSL; Loading @@ -394,7 +394,7 @@ bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata) #endif /* Lock hash table and save whether this FD uses SSL. */ if (0 != pthread_mutex_lock(&b->fd_set_lock)) { assert(false); } bool set_ok = yacht_set(b->fd_set, fd, ci, &old_value); bool set_ok = Yacht_Set(b->fd_set, fd, ci, &old_value); if (0 != pthread_mutex_unlock(&b->fd_set_lock)) { assert(false); } if (set_ok) { Loading @@ -407,11 +407,11 @@ bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata) #ifndef TEST int completion_pipe = -1; #endif res = listener_add_socket(l, ci, &completion_pipe); res = Listener_AddSocket(l, ci, &completion_pipe); if (!res) { goto cleanup; } BUS_LOG(b, 2, LOG_SOCKET_REGISTERED, "polling on socket add...", b->udata); bool completed = bus_poll_on_completion(b, completion_pipe); bool completed = BusPoll_OnCompletion(b, completion_pipe); if (!completed) { goto cleanup; } BUS_LOG(b, 2, LOG_SOCKET_REGISTERED, "successfully added socket", b->udata); Loading @@ -425,7 +425,7 @@ cleanup: } /* Free metadata about a socket that has been disconnected. */ bool bus_release_socket(struct bus *b, int fd, void **socket_udata_out) { bool Bus_ReleaseSocket(struct bus *b, int fd, void **socket_udata_out) { int l_id = listener_id_of_socket(b, fd); BUS_LOG_SNPRINTF(b, 2, LOG_SOCKET_REGISTERED, b->udata, 64, Loading @@ -436,12 +436,12 @@ bool bus_release_socket(struct bus *b, int fd, void **socket_udata_out) { #ifndef TEST int completion_pipe = -1; #endif if (!listener_remove_socket(l, fd, &completion_pipe)) { if (!Listener_RemoveSocket(l, fd, &completion_pipe)) { return false; /* couldn't send msg to listener */ } assert(completion_pipe != -1); bool completed = bus_poll_on_completion(b, completion_pipe); bool completed = BusPoll_OnCompletion(b, completion_pipe); if (!completed) { /* listener hung up while waiting */ return false; } Loading @@ -451,7 +451,7 @@ bool bus_release_socket(struct bus *b, int fd, void **socket_udata_out) { void *old_value = NULL; #endif if (0 != pthread_mutex_lock(&b->fd_set_lock)) { assert(false); } bool rm_ok = yacht_remove(b->fd_set, fd, &old_value); bool rm_ok = Yacht_Remove(b->fd_set, fd, &old_value); if (0 != pthread_mutex_unlock(&b->fd_set_lock)) { assert(false); } if (!rm_ok) { return false; Loading @@ -467,18 +467,13 @@ bool bus_release_socket(struct bus *b, int fd, void **socket_udata_out) { if (ci->ssl == BUS_NO_SSL) { res = true; /* nothing else to do */ } else { res = bus_ssl_disconnect(b, ci->ssl); res = BusSSL_Disconnect(b, ci->ssl); } free(ci); return res; } bool bus_schedule_threadpool_task(struct bus *b, struct threadpool_task *task, size_t *backpressure) { return threadpool_schedule(b->threadpool, task, backpressure); } #ifndef TEST static #endif Loading @@ -492,11 +487,11 @@ void free_connection_cb(void *value, void *udata) { #ifndef TEST int completion_pipe = -1; #endif if (!listener_remove_socket(l, ci->fd, &completion_pipe)) { if (!Listener_RemoveSocket(l, ci->fd, &completion_pipe)) { return; /* couldn't send msg to listener */ } bool completed = bus_poll_on_completion(b, completion_pipe); bool completed = BusPoll_OnCompletion(b, completion_pipe); if (!completed) { return; } Loading @@ -504,7 +499,7 @@ void free_connection_cb(void *value, void *udata) { free(ci); } bool bus_shutdown(bus *b) { bool Bus_Shutdown(bus *b) { for (;;) { shutdown_state_t ss = b->shutdown_state; /* Another thread is already shutting things down. */ Loading @@ -517,7 +512,7 @@ bool bus_shutdown(bus *b) { if (b->fd_set) { BUS_LOG(b, 2, LOG_SHUTDOWN, "removing all connections", b->udata); yacht_free(b->fd_set, free_connection_cb, b); Yacht_Free(b->fd_set, free_connection_cb, b); b->fd_set = NULL; } Loading @@ -529,25 +524,25 @@ bool bus_shutdown(bus *b) { for (int i = 0; i < b->listener_count; i++) { if (!b->joined[i]) { BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "listener_shutdown -- %d", i); if (!listener_shutdown(b->listeners[i], &completion_pipe)) { "Listener_Shutdown -- %d", i); if (!Listener_Shutdown(b->listeners[i], &completion_pipe)) { b->shutdown_state = SHUTDOWN_STATE_RUNNING; return false; } if (!bus_poll_on_completion(b, completion_pipe)) { if (!BusPoll_OnCompletion(b, completion_pipe)) { b->shutdown_state = SHUTDOWN_STATE_RUNNING; return false; } BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "listener_shutdown -- joining %d", i); "Listener_Shutdown -- joining %d", i); #ifndef TEST void *unused = NULL; #endif int res = syscall_pthread_join(b->threads[i], &unused); BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "listener_shutdown -- joined %d", i); "Listener_Shutdown -- joined %d", i); if (res != 0) { b->shutdown_state = SHUTDOWN_STATE_RUNNING; return false; Loading @@ -561,7 +556,7 @@ bool bus_shutdown(bus *b) { return true; } void bus_backpressure_delay(struct bus *b, size_t backpressure, uint8_t shift) { void Bus_BackpressureDelay(struct bus *b, size_t backpressure, uint8_t shift) { /* Push back if message bus is too busy. */ backpressure >>= shift; Loading @@ -572,11 +567,11 @@ void bus_backpressure_delay(struct bus *b, size_t backpressure, uint8_t shift) { } } void bus_lock_log(struct bus *b) { void Bus_LockLog(struct bus *b) { pthread_mutex_lock(&b->log_lock); } void bus_unlock_log(struct bus *b) { void Bus_UnlockLog(struct bus *b) { pthread_mutex_unlock(&b->log_lock); } Loading @@ -598,7 +593,7 @@ static void box_cleanup_cb(void *udata) { /* Deliver a boxed message to the thread pool to execute. * The boxed message will be freed by the threadpool. */ bool bus_process_boxed_message(struct bus *b, bool Bus_ProcessBoxedMessage(struct bus *b, struct boxed_msg *box, size_t *backpressure) { assert(box); assert(box->result.status != BUS_SEND_UNDEFINED); Loading @@ -611,23 +606,23 @@ bool bus_process_boxed_message(struct bus *b, BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "Scheduling boxed message -- %p -- where it will be freed", (void*)box); return bus_schedule_threadpool_task(b, &task, backpressure); return threadpool_schedule(b->threadpool, &task, backpressure); } /* How many seconds should it give the thread pool to shut down? */ #define THREAD_SHUTDOWN_SECONDS 5 void bus_free(bus *b) { void Bus_Free(bus *b) { if (b == NULL) { return; } while (b->shutdown_state != SHUTDOWN_STATE_HALTED) { if (bus_shutdown(b)) { break; } if (Bus_Shutdown(b)) { break; } syscall_poll(NULL, 0, 10); // sleep 10 msec } for (int i = 0; i < b->listener_count; i++) { BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "listener_free -- %d", i); listener_free(b->listeners[i]); "Listener_Free -- %d", i); Listener_Free(b->listeners[i]); } free(b->listeners); Loading @@ -651,7 +646,7 @@ void bus_free(bus *b) { pthread_mutex_destroy(&b->fd_set_lock); pthread_mutex_destroy(&b->log_lock); bus_ssl_ctx_free(b); BusSSL_CtxFree(b); free(b); } Loading src/lib/bus/bus.h +12 −12 Original line number Diff line number Diff line Loading @@ -22,24 +22,24 @@ #include "bus_types.h" /* Initialize a bus, based on configuration in *config. Returns a bool /* Initialize a bus, based on configuration in *config. RetuBus_RegisterSockets a bool * indicating whether the construction succeeded, and the bus pointer * and/or a status code indicating the cause of failure in *res. */ bool bus_init(bus_config *config, struct bus_result *res); bool Bus_Init(bus_config *config, struct bus_result *res); /* Send a request. Blocks until the request has been transmitted. * * Assumes the FD has been registered with bus_register_socket; * Assumes the FD has been registered with Bus_register_socket; * sending to an unregistered socket is an error. * * Returns true if the request has been accepted and the bus will * RetuBus_RegisterSockets true if the request has been accepted and the bus will * attempt to handle the request and response. They can still fail, * but the error status will be passed to the result handler callback. * * Returns false if the request has been rejected, due to a memory * RetuBus_RegisterSockets false if the request has been rejected, due to a memory * allocation error or invalid arguments. * */ bool bus_send_request(struct bus *b, bus_user_msg *msg); bool Bus_SendRequest(struct bus *b, bus_user_msg *msg); /* Register a socket connected to an endpoint, and data that will be passed * to all interactions on that socket. Loading @@ -49,21 +49,21 @@ bool bus_send_request(struct bus *b, bus_user_msg *msg); * * If USES_SSL is true, then the function will block until the initial * SSL/TLS connection handshake has completed. */ bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *socket_udata); bool Bus_RegisterSocket(struct bus *b, bus_socket_t type, int fd, void *socket_udata); /* Free metadata about a socket that has been disconnected. */ bool bus_release_socket(struct bus *b, int fd, void **socket_udata_out); bool Bus_ReleaseSocket(struct bus *b, int fd, void **socket_udata_out); /* Begin shutting the system down. Returns true once everything pending /* Begin shutting the system down. RetuBus_RegisterSockets true once everything pending * has resolved. */ bool bus_shutdown(struct bus *b); bool Bus_Shutdown(struct bus *b); /* Free internal data structures for the bus. */ void bus_free(struct bus *b); void Bus_Free(struct bus *b); /* Inward facing portion of the message bus -- functions called * by other parts of the message bus, like the Listener thread, * but not by code outside the bus. */ #include "bus_inward.h" #include "Bus_inward.h" #endif src/lib/bus/bus_example.c +8 −8 Original line number Diff line number Diff line Loading @@ -88,7 +88,7 @@ static time_t get_cur_second(void); static void log_cb(log_event_t event, int log_level, const char *msg, void *udata) { example_state *s = (example_state *)udata; const char *event_str = bus_log_event_str(event); const char *event_str = Bus_LogEventStr(event); fprintf(/*stderr*/stdout, "%ld -- %s[%d] -- %s\n", s->last_second, event_str, log_level, msg); } Loading Loading @@ -253,7 +253,7 @@ int main(int argc, char **argv) { .bus_udata = &state, }; bus_result res = {0}; if (!bus_init(&cfg, &res)) { if (!Bus_Init(&cfg, &res)) { LOG(0, "failed to init bus: %d\n", res.status); return 1; } Loading @@ -264,8 +264,8 @@ int main(int argc, char **argv) { if (b) { LOG(1, "shutting down\n"); bus_shutdown(b); bus_free(b); Bus_Shutdown(b); Bus_Free(b); return 0; } else { return 1; Loading Loading @@ -445,7 +445,7 @@ static void tick_handler(example_state *s) { static time_t get_cur_second(void) { struct timeval tv; if (!util_timestamp(&tv, true)) { if (!Util_Timestamp(&tv, true)) { assert(false); } return tv.tv_sec; Loading @@ -456,7 +456,7 @@ static void run_bus(example_state *s, struct bus *b) { open_sockets(s); for (int i = 0; i < s->sockets_used; i++) { bus_register_socket(b, BUS_SOCKET_PLAIN, s->sockets[i], s->info[i]); Bus_RegisterSocket(b, BUS_SOCKET_PLAIN, s->sockets[i], s->info[i]); } int cur_socket_i = 0; Loading Loading @@ -503,8 +503,8 @@ static void run_bus(example_state *s, struct bus *b) { s->sent_msgs++; payload_size++; if (!bus_send_request(b, &msg)) { LOG(1, " @@@ bus_send_request failed!\n"); if (!Bus_SendRequest(b, &msg)) { LOG(1, " @@@ Bus_SendRequest failed!\n"); dropped++; if (dropped >= 100) { LOG(1, " @@@ more than 100 send failures, halting\n"); Loading src/lib/bus/bus_inward.h +6 −10 Original line number Diff line number Diff line Loading @@ -23,26 +23,22 @@ #include "bus_types.h" /* Get the string key for a log event ID. */ const char *bus_log_event_str(log_event_t event); const char *Bus_LogEventStr(log_event_t event); /* For a given file descriptor, get the listener ID to use. * This will level sockets between multiple threads. */ struct listener *bus_get_listener_for_socket(struct bus *b, int fd); /* Schedule a task in the bus's threadpool. */ bool bus_schedule_threadpool_task(struct bus *b, struct threadpool_task *task, size_t *backpressure); struct listener *Bus_GetListenerForSocket(struct bus *b, int fd); /* Lock / unlock the log mutex, since logging can occur on several threads. */ void bus_lock_log(struct bus *b); void bus_unlock_log(struct bus *b); void Bus_LockLog(struct bus *b); void Bus_UnlockLog(struct bus *b); /* Deliver a boxed message to the thread pool to execute. */ bool bus_process_boxed_message(struct bus *b, bool Bus_ProcessBoxedMessage(struct bus *b, struct boxed_msg *box, size_t *backpressure); /* Provide backpressure by sleeping for (backpressure >> shift) msec, if * the value is greater than 0. */ void bus_backpressure_delay(struct bus *b, size_t backpressure, uint8_t shift); void Bus_BackpressureDelay(struct bus *b, size_t backpressure, uint8_t shift); #endif src/lib/bus/bus_poll.c +4 −4 Original line number Diff line number Diff line Loading @@ -32,7 +32,7 @@ int poll_errno; int read_errno; #endif bool bus_poll_on_completion(struct bus *b, int fd) { bool BusPoll_OnCompletion(struct bus *b, int fd) { /* POLL in a pipe */ #ifndef TEST struct pollfd fds[1]; Loading @@ -51,7 +51,7 @@ bool bus_poll_on_completion(struct bus *b, int fd) { BUS_LOG_SNPRINTF(b, 5, LOG_SENDING_REQUEST, b->udata, 64, "poll_on_completion for %d, res %d (errno %d)", fd, res, errno); if (res == -1) { if (util_is_resumable_io_error(errno)) { if (Util_IsResumableIOError(errno)) { BUS_LOG_SNPRINTF(b, 5, LOG_SENDING_REQUEST, b->udata, 64, "poll_on_completion, resumable IO error %d", errno); errno = 0; Loading Loading @@ -83,11 +83,11 @@ bool bus_poll_on_completion(struct bus *b, int fd) { assert(read_buf[0] == LISTENER_MSG_TAG); msec = (read_buf[1] << 0) + (read_buf[2] << 8); bus_backpressure_delay(b, msec, LISTENER_BACKPRESSURE_SHIFT); Bus_BackpressureDelay(b, msec, LISTENER_BACKPRESSURE_SHIFT); BUS_LOG(b, 4, LOG_SENDING_REQUEST, "sent!", b->udata); return true; } else if (sz == -1) { if (util_is_resumable_io_error(errno)) { if (Util_IsResumableIOError(errno)) { BUS_LOG_SNPRINTF(b, 5, LOG_SENDING_REQUEST, b->udata, 64, "poll_on_completion read, resumable IO error %d", errno); errno = 0; Loading Loading
src/lib/bus/bus.c +40 −45 Original line number Diff line number Diff line Loading @@ -61,7 +61,7 @@ int completion_pipe = -1; void *unused = NULL; #endif bool bus_init(bus_config *config, struct bus_result *res) { bool Bus_Init(bus_config *config, struct bus_result *res) { if (res == NULL) { return false; } if (config == NULL) { res->status = BUS_INIT_ERROR_NULL; Loading Loading @@ -96,7 +96,7 @@ bool bus_init(bus_config *config, struct bus_result *res) { bus *b = calloc(1, sizeof(*b)); if (b == NULL) { goto cleanup; } if (!bus_ssl_init(b)) { goto cleanup; } if (!BusSSL_Init(b)) { goto cleanup; } b->sink_cb = config->sink_cb; b->unpack_cb = config->unpack_cb; Loading Loading @@ -127,7 +127,7 @@ bool bus_init(bus_config *config, struct bus_result *res) { } for (int i = 0; i < config->listener_count; i++) { ls[i] = listener_init(b, config); ls[i] = Listener_Init(b, config); if (ls[i] == NULL) { res->status = BUS_INIT_ERROR_LISTENER_INIT_FAIL; goto cleanup; Loading @@ -150,7 +150,7 @@ bool bus_init(bus_config *config, struct bus_result *res) { goto cleanup; } fd_set = yacht_init(DEF_FD_SET_SIZE2); fd_set = Yacht_Init(DEF_FD_SET_SIZE2); if (fd_set == NULL) { goto cleanup; } Loading Loading @@ -178,7 +178,7 @@ bool bus_init(bus_config *config, struct bus_result *res) { cleanup: if (ls) { for (int i = 0; i < config->listener_count; i++) { if (ls[i]) { listener_free(ls[i]); } if (ls[i]) { Listener_Free(ls[i]); } } free(ls); } Loading @@ -195,7 +195,7 @@ cleanup: } if (threads) { free(threads); } if (fd_set) { yacht_free(fd_set, NULL, NULL); } if (fd_set) { Yacht_Free(fd_set, NULL, NULL); } return false; } Loading Loading @@ -259,7 +259,7 @@ static boxed_msg *box_msg(struct bus *b, bus_user_msg *msg) { void *value = NULL; #endif connection_info *ci = NULL; if (yacht_get(b->fd_set, box->fd, &value)) { if (Yacht_Get(b->fd_set, box->fd, &value)) { ci = (connection_info *)value; } if (0 != pthread_mutex_unlock(&b->fd_set_lock)) { assert(false); } Loading Loading @@ -302,7 +302,7 @@ static boxed_msg *box_msg(struct bus *b, bus_user_msg *msg) { return box; } bool bus_send_request(struct bus *b, bus_user_msg *msg) bool Bus_SendRequest(struct bus *b, bus_user_msg *msg) { if (b == NULL || msg == NULL || msg->fd == -1) { return false; Loading @@ -315,7 +315,7 @@ bool bus_send_request(struct bus *b, bus_user_msg *msg) BUS_LOG_SNPRINTF(b, 3-0, LOG_SENDING_REQUEST, b->udata, 64, "Sending request <fd:%d, seq_id:%lld>", msg->fd, (long long)msg->seq_id); bool res = send_do_blocking_send(b, box); bool res = Send_DoBlockingSend(b, box); BUS_LOG_SNPRINTF(b, 3, LOG_SENDING_REQUEST, b->udata, 64, "...request sent, result %d", res); Loading @@ -335,12 +335,12 @@ static int listener_id_of_socket(struct bus *b, int fd) { return fd % b->listener_count; } struct listener *bus_get_listener_for_socket(struct bus *b, int fd) { struct listener *Bus_GetListenerForSocket(struct bus *b, int fd) { return b->listeners[listener_id_of_socket(b, fd)]; } /* Get the string key for a log event ID. */ const char *bus_log_event_str(log_event_t event) { const char *Bus_LogEventStr(log_event_t event) { switch (event) { case LOG_INITIALIZATION: return "INITIALIZATION"; case LOG_NEW_CLIENT: return "NEW_CLIENT"; Loading @@ -355,7 +355,7 @@ const char *bus_log_event_str(log_event_t event) { } } bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata) { bool Bus_RegisterSocket(struct bus *b, bus_socket_t type, int fd, void *udata) { /* Register a socket internally with the listener. */ int l_id = listener_id_of_socket(b, fd); Loading @@ -377,7 +377,7 @@ bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata) SSL *ssl = NULL; if (type == BUS_SOCKET_SSL) { ssl = bus_ssl_connect(b, fd); ssl = BusSSL_Connect(b, fd); if (ssl == NULL) { goto cleanup; } } else { ssl = BUS_NO_SSL; Loading @@ -394,7 +394,7 @@ bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata) #endif /* Lock hash table and save whether this FD uses SSL. */ if (0 != pthread_mutex_lock(&b->fd_set_lock)) { assert(false); } bool set_ok = yacht_set(b->fd_set, fd, ci, &old_value); bool set_ok = Yacht_Set(b->fd_set, fd, ci, &old_value); if (0 != pthread_mutex_unlock(&b->fd_set_lock)) { assert(false); } if (set_ok) { Loading @@ -407,11 +407,11 @@ bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata) #ifndef TEST int completion_pipe = -1; #endif res = listener_add_socket(l, ci, &completion_pipe); res = Listener_AddSocket(l, ci, &completion_pipe); if (!res) { goto cleanup; } BUS_LOG(b, 2, LOG_SOCKET_REGISTERED, "polling on socket add...", b->udata); bool completed = bus_poll_on_completion(b, completion_pipe); bool completed = BusPoll_OnCompletion(b, completion_pipe); if (!completed) { goto cleanup; } BUS_LOG(b, 2, LOG_SOCKET_REGISTERED, "successfully added socket", b->udata); Loading @@ -425,7 +425,7 @@ cleanup: } /* Free metadata about a socket that has been disconnected. */ bool bus_release_socket(struct bus *b, int fd, void **socket_udata_out) { bool Bus_ReleaseSocket(struct bus *b, int fd, void **socket_udata_out) { int l_id = listener_id_of_socket(b, fd); BUS_LOG_SNPRINTF(b, 2, LOG_SOCKET_REGISTERED, b->udata, 64, Loading @@ -436,12 +436,12 @@ bool bus_release_socket(struct bus *b, int fd, void **socket_udata_out) { #ifndef TEST int completion_pipe = -1; #endif if (!listener_remove_socket(l, fd, &completion_pipe)) { if (!Listener_RemoveSocket(l, fd, &completion_pipe)) { return false; /* couldn't send msg to listener */ } assert(completion_pipe != -1); bool completed = bus_poll_on_completion(b, completion_pipe); bool completed = BusPoll_OnCompletion(b, completion_pipe); if (!completed) { /* listener hung up while waiting */ return false; } Loading @@ -451,7 +451,7 @@ bool bus_release_socket(struct bus *b, int fd, void **socket_udata_out) { void *old_value = NULL; #endif if (0 != pthread_mutex_lock(&b->fd_set_lock)) { assert(false); } bool rm_ok = yacht_remove(b->fd_set, fd, &old_value); bool rm_ok = Yacht_Remove(b->fd_set, fd, &old_value); if (0 != pthread_mutex_unlock(&b->fd_set_lock)) { assert(false); } if (!rm_ok) { return false; Loading @@ -467,18 +467,13 @@ bool bus_release_socket(struct bus *b, int fd, void **socket_udata_out) { if (ci->ssl == BUS_NO_SSL) { res = true; /* nothing else to do */ } else { res = bus_ssl_disconnect(b, ci->ssl); res = BusSSL_Disconnect(b, ci->ssl); } free(ci); return res; } bool bus_schedule_threadpool_task(struct bus *b, struct threadpool_task *task, size_t *backpressure) { return threadpool_schedule(b->threadpool, task, backpressure); } #ifndef TEST static #endif Loading @@ -492,11 +487,11 @@ void free_connection_cb(void *value, void *udata) { #ifndef TEST int completion_pipe = -1; #endif if (!listener_remove_socket(l, ci->fd, &completion_pipe)) { if (!Listener_RemoveSocket(l, ci->fd, &completion_pipe)) { return; /* couldn't send msg to listener */ } bool completed = bus_poll_on_completion(b, completion_pipe); bool completed = BusPoll_OnCompletion(b, completion_pipe); if (!completed) { return; } Loading @@ -504,7 +499,7 @@ void free_connection_cb(void *value, void *udata) { free(ci); } bool bus_shutdown(bus *b) { bool Bus_Shutdown(bus *b) { for (;;) { shutdown_state_t ss = b->shutdown_state; /* Another thread is already shutting things down. */ Loading @@ -517,7 +512,7 @@ bool bus_shutdown(bus *b) { if (b->fd_set) { BUS_LOG(b, 2, LOG_SHUTDOWN, "removing all connections", b->udata); yacht_free(b->fd_set, free_connection_cb, b); Yacht_Free(b->fd_set, free_connection_cb, b); b->fd_set = NULL; } Loading @@ -529,25 +524,25 @@ bool bus_shutdown(bus *b) { for (int i = 0; i < b->listener_count; i++) { if (!b->joined[i]) { BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "listener_shutdown -- %d", i); if (!listener_shutdown(b->listeners[i], &completion_pipe)) { "Listener_Shutdown -- %d", i); if (!Listener_Shutdown(b->listeners[i], &completion_pipe)) { b->shutdown_state = SHUTDOWN_STATE_RUNNING; return false; } if (!bus_poll_on_completion(b, completion_pipe)) { if (!BusPoll_OnCompletion(b, completion_pipe)) { b->shutdown_state = SHUTDOWN_STATE_RUNNING; return false; } BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "listener_shutdown -- joining %d", i); "Listener_Shutdown -- joining %d", i); #ifndef TEST void *unused = NULL; #endif int res = syscall_pthread_join(b->threads[i], &unused); BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "listener_shutdown -- joined %d", i); "Listener_Shutdown -- joined %d", i); if (res != 0) { b->shutdown_state = SHUTDOWN_STATE_RUNNING; return false; Loading @@ -561,7 +556,7 @@ bool bus_shutdown(bus *b) { return true; } void bus_backpressure_delay(struct bus *b, size_t backpressure, uint8_t shift) { void Bus_BackpressureDelay(struct bus *b, size_t backpressure, uint8_t shift) { /* Push back if message bus is too busy. */ backpressure >>= shift; Loading @@ -572,11 +567,11 @@ void bus_backpressure_delay(struct bus *b, size_t backpressure, uint8_t shift) { } } void bus_lock_log(struct bus *b) { void Bus_LockLog(struct bus *b) { pthread_mutex_lock(&b->log_lock); } void bus_unlock_log(struct bus *b) { void Bus_UnlockLog(struct bus *b) { pthread_mutex_unlock(&b->log_lock); } Loading @@ -598,7 +593,7 @@ static void box_cleanup_cb(void *udata) { /* Deliver a boxed message to the thread pool to execute. * The boxed message will be freed by the threadpool. */ bool bus_process_boxed_message(struct bus *b, bool Bus_ProcessBoxedMessage(struct bus *b, struct boxed_msg *box, size_t *backpressure) { assert(box); assert(box->result.status != BUS_SEND_UNDEFINED); Loading @@ -611,23 +606,23 @@ bool bus_process_boxed_message(struct bus *b, BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "Scheduling boxed message -- %p -- where it will be freed", (void*)box); return bus_schedule_threadpool_task(b, &task, backpressure); return threadpool_schedule(b->threadpool, &task, backpressure); } /* How many seconds should it give the thread pool to shut down? */ #define THREAD_SHUTDOWN_SECONDS 5 void bus_free(bus *b) { void Bus_Free(bus *b) { if (b == NULL) { return; } while (b->shutdown_state != SHUTDOWN_STATE_HALTED) { if (bus_shutdown(b)) { break; } if (Bus_Shutdown(b)) { break; } syscall_poll(NULL, 0, 10); // sleep 10 msec } for (int i = 0; i < b->listener_count; i++) { BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "listener_free -- %d", i); listener_free(b->listeners[i]); "Listener_Free -- %d", i); Listener_Free(b->listeners[i]); } free(b->listeners); Loading @@ -651,7 +646,7 @@ void bus_free(bus *b) { pthread_mutex_destroy(&b->fd_set_lock); pthread_mutex_destroy(&b->log_lock); bus_ssl_ctx_free(b); BusSSL_CtxFree(b); free(b); } Loading
src/lib/bus/bus.h +12 −12 Original line number Diff line number Diff line Loading @@ -22,24 +22,24 @@ #include "bus_types.h" /* Initialize a bus, based on configuration in *config. Returns a bool /* Initialize a bus, based on configuration in *config. RetuBus_RegisterSockets a bool * indicating whether the construction succeeded, and the bus pointer * and/or a status code indicating the cause of failure in *res. */ bool bus_init(bus_config *config, struct bus_result *res); bool Bus_Init(bus_config *config, struct bus_result *res); /* Send a request. Blocks until the request has been transmitted. * * Assumes the FD has been registered with bus_register_socket; * Assumes the FD has been registered with Bus_register_socket; * sending to an unregistered socket is an error. * * Returns true if the request has been accepted and the bus will * RetuBus_RegisterSockets true if the request has been accepted and the bus will * attempt to handle the request and response. They can still fail, * but the error status will be passed to the result handler callback. * * Returns false if the request has been rejected, due to a memory * RetuBus_RegisterSockets false if the request has been rejected, due to a memory * allocation error or invalid arguments. * */ bool bus_send_request(struct bus *b, bus_user_msg *msg); bool Bus_SendRequest(struct bus *b, bus_user_msg *msg); /* Register a socket connected to an endpoint, and data that will be passed * to all interactions on that socket. Loading @@ -49,21 +49,21 @@ bool bus_send_request(struct bus *b, bus_user_msg *msg); * * If USES_SSL is true, then the function will block until the initial * SSL/TLS connection handshake has completed. */ bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *socket_udata); bool Bus_RegisterSocket(struct bus *b, bus_socket_t type, int fd, void *socket_udata); /* Free metadata about a socket that has been disconnected. */ bool bus_release_socket(struct bus *b, int fd, void **socket_udata_out); bool Bus_ReleaseSocket(struct bus *b, int fd, void **socket_udata_out); /* Begin shutting the system down. Returns true once everything pending /* Begin shutting the system down. RetuBus_RegisterSockets true once everything pending * has resolved. */ bool bus_shutdown(struct bus *b); bool Bus_Shutdown(struct bus *b); /* Free internal data structures for the bus. */ void bus_free(struct bus *b); void Bus_Free(struct bus *b); /* Inward facing portion of the message bus -- functions called * by other parts of the message bus, like the Listener thread, * but not by code outside the bus. */ #include "bus_inward.h" #include "Bus_inward.h" #endif
src/lib/bus/bus_example.c +8 −8 Original line number Diff line number Diff line Loading @@ -88,7 +88,7 @@ static time_t get_cur_second(void); static void log_cb(log_event_t event, int log_level, const char *msg, void *udata) { example_state *s = (example_state *)udata; const char *event_str = bus_log_event_str(event); const char *event_str = Bus_LogEventStr(event); fprintf(/*stderr*/stdout, "%ld -- %s[%d] -- %s\n", s->last_second, event_str, log_level, msg); } Loading Loading @@ -253,7 +253,7 @@ int main(int argc, char **argv) { .bus_udata = &state, }; bus_result res = {0}; if (!bus_init(&cfg, &res)) { if (!Bus_Init(&cfg, &res)) { LOG(0, "failed to init bus: %d\n", res.status); return 1; } Loading @@ -264,8 +264,8 @@ int main(int argc, char **argv) { if (b) { LOG(1, "shutting down\n"); bus_shutdown(b); bus_free(b); Bus_Shutdown(b); Bus_Free(b); return 0; } else { return 1; Loading Loading @@ -445,7 +445,7 @@ static void tick_handler(example_state *s) { static time_t get_cur_second(void) { struct timeval tv; if (!util_timestamp(&tv, true)) { if (!Util_Timestamp(&tv, true)) { assert(false); } return tv.tv_sec; Loading @@ -456,7 +456,7 @@ static void run_bus(example_state *s, struct bus *b) { open_sockets(s); for (int i = 0; i < s->sockets_used; i++) { bus_register_socket(b, BUS_SOCKET_PLAIN, s->sockets[i], s->info[i]); Bus_RegisterSocket(b, BUS_SOCKET_PLAIN, s->sockets[i], s->info[i]); } int cur_socket_i = 0; Loading Loading @@ -503,8 +503,8 @@ static void run_bus(example_state *s, struct bus *b) { s->sent_msgs++; payload_size++; if (!bus_send_request(b, &msg)) { LOG(1, " @@@ bus_send_request failed!\n"); if (!Bus_SendRequest(b, &msg)) { LOG(1, " @@@ Bus_SendRequest failed!\n"); dropped++; if (dropped >= 100) { LOG(1, " @@@ more than 100 send failures, halting\n"); Loading
src/lib/bus/bus_inward.h +6 −10 Original line number Diff line number Diff line Loading @@ -23,26 +23,22 @@ #include "bus_types.h" /* Get the string key for a log event ID. */ const char *bus_log_event_str(log_event_t event); const char *Bus_LogEventStr(log_event_t event); /* For a given file descriptor, get the listener ID to use. * This will level sockets between multiple threads. */ struct listener *bus_get_listener_for_socket(struct bus *b, int fd); /* Schedule a task in the bus's threadpool. */ bool bus_schedule_threadpool_task(struct bus *b, struct threadpool_task *task, size_t *backpressure); struct listener *Bus_GetListenerForSocket(struct bus *b, int fd); /* Lock / unlock the log mutex, since logging can occur on several threads. */ void bus_lock_log(struct bus *b); void bus_unlock_log(struct bus *b); void Bus_LockLog(struct bus *b); void Bus_UnlockLog(struct bus *b); /* Deliver a boxed message to the thread pool to execute. */ bool bus_process_boxed_message(struct bus *b, bool Bus_ProcessBoxedMessage(struct bus *b, struct boxed_msg *box, size_t *backpressure); /* Provide backpressure by sleeping for (backpressure >> shift) msec, if * the value is greater than 0. */ void bus_backpressure_delay(struct bus *b, size_t backpressure, uint8_t shift); void Bus_BackpressureDelay(struct bus *b, size_t backpressure, uint8_t shift); #endif
src/lib/bus/bus_poll.c +4 −4 Original line number Diff line number Diff line Loading @@ -32,7 +32,7 @@ int poll_errno; int read_errno; #endif bool bus_poll_on_completion(struct bus *b, int fd) { bool BusPoll_OnCompletion(struct bus *b, int fd) { /* POLL in a pipe */ #ifndef TEST struct pollfd fds[1]; Loading @@ -51,7 +51,7 @@ bool bus_poll_on_completion(struct bus *b, int fd) { BUS_LOG_SNPRINTF(b, 5, LOG_SENDING_REQUEST, b->udata, 64, "poll_on_completion for %d, res %d (errno %d)", fd, res, errno); if (res == -1) { if (util_is_resumable_io_error(errno)) { if (Util_IsResumableIOError(errno)) { BUS_LOG_SNPRINTF(b, 5, LOG_SENDING_REQUEST, b->udata, 64, "poll_on_completion, resumable IO error %d", errno); errno = 0; Loading Loading @@ -83,11 +83,11 @@ bool bus_poll_on_completion(struct bus *b, int fd) { assert(read_buf[0] == LISTENER_MSG_TAG); msec = (read_buf[1] << 0) + (read_buf[2] << 8); bus_backpressure_delay(b, msec, LISTENER_BACKPRESSURE_SHIFT); Bus_BackpressureDelay(b, msec, LISTENER_BACKPRESSURE_SHIFT); BUS_LOG(b, 4, LOG_SENDING_REQUEST, "sent!", b->udata); return true; } else if (sz == -1) { if (util_is_resumable_io_error(errno)) { if (Util_IsResumableIOError(errno)) { BUS_LOG_SNPRINTF(b, 5, LOG_SENDING_REQUEST, b->udata, 64, "poll_on_completion read, resumable IO error %d", errno); errno = 0; Loading