Loading Makefile +8 −0 Original line number Diff line number Diff line Loading @@ -83,12 +83,15 @@ LIB_OBJS = \ $(OUT_DIR)/kinetic_admin_client.o \ $(OUT_DIR)/threadpool.o \ $(OUT_DIR)/bus.o \ $(OUT_DIR)/bus_poll.o \ $(OUT_DIR)/bus_ssl.o \ $(OUT_DIR)/listener.o \ $(OUT_DIR)/listener_cmd.o \ $(OUT_DIR)/listener_helper.o \ $(OUT_DIR)/listener_io.o \ $(OUT_DIR)/listener_task.o \ $(OUT_DIR)/send.o \ $(OUT_DIR)/send_helper.o \ $(OUT_DIR)/syscall.o \ $(OUT_DIR)/util.o \ $(OUT_DIR)/yacht.o \ Loading Loading @@ -143,7 +146,12 @@ $(OUT_DIR)/protobuf-c.o: $(PROTOBUFC)/protobuf-c/protobuf-c.c $(PROTOBUFC)/proto ${OUT_DIR}/kinetic_types.o: ${LIB_DIR}/kinetic_types_internal.h ${OUT_DIR}/bus.o: ${LIB_DIR}/bus/bus_types.h ${OUT_DIR}/sender.o: ${LIB_DIR}/bus/sender_internal.h ${OUT_DIR}/sender_helper.o: ${LIB_DIR}/bus/sender_internal.h ${OUT_DIR}/listener.o: ${LIB_DIR}/bus/listener_internal.h ${OUT_DIR}/listener_cmd.o: ${LIB_DIR}/bus/listener_internal.h ${OUT_DIR}/listener_helper.o: ${LIB_DIR}/bus/listener_internal.h ${OUT_DIR}/listener_io.o: ${LIB_DIR}/bus/listener_internal.h ${OUT_DIR}/listener_task.o: ${LIB_DIR}/bus/listener_internal.h $(OUT_DIR)/threadpool.o: ${LIB_DIR}/threadpool/threadpool.c ${LIB_DIR}/threadpool/threadpool.h $(CC) -o $@ -c $< $(CFLAGS) Loading src/lib/bus/Makefile +5 −1 Original line number Diff line number Diff line SOCKET99_PATH= ../../../vendor/socket99 THREADPOOL_PATH= ../threadpool LIB_PATH= ../../../obj BASE_PROJECT_INC= -I.. -I../../../include -I../../../vendor/protobuf-c OPT= -O3 LIB_INC = -I${SOCKET99_PATH} -I${THREADPOOL_PATH} -I${OPENSSL_PATH}/include CFLAGS += -std=c99 ${OPT} -Wall -g ${LIB_INC} CFLAGS += -std=c99 ${OPT} -Wall -g ${LIB_INC} ${BASE_PROJECT_INC} LDFLAGS += -L. -L${LIB_PATH} -L${THREADPOOL_PATH} -L${OPENSSL_PATH}/lib -L${SOCKET99_PATH} -lsocket99 -lssl -lcrypto BUS_OBJS = \ bus.o \ bus_poll.o \ bus_ssl.o \ listener.o \ listener_cmd.o \ listener_helper.o \ listener_io.o \ listener_task.o \ send.o \ send_helper.o \ syscall.o \ util.o \ yacht.o \ Loading src/lib/bus/bus.c +67 −81 Original line number Diff line number Diff line Loading @@ -28,6 +28,7 @@ #include <sys/resource.h> #include "bus.h" #include "bus_poll.h" #include "send.h" #include "listener.h" #include "threadpool.h" Loading @@ -38,9 +39,9 @@ #include "syscall.h" #include "atomic.h" #include "kinetic_types_internal.h" #include "listener_task.h" static bool poll_on_completion(struct bus *b, int fd); static int listener_id_of_socket(struct bus *b, int fd); static void noop_log_cb(log_event_t event, int log_level, const char *msg, void *udata); Loading @@ -51,6 +52,15 @@ static void set_defaults(bus_config *cfg) { if (cfg->listener_count == 0) { cfg->listener_count = 1; } } #ifdef TEST boxed_msg *test_box = NULL; void *value = NULL; void *old_value = NULL; connection_info *test_ci = NULL; int completion_pipe = -1; void *unused = NULL; #endif bool bus_init(bus_config *config, struct bus_result *res) { if (res == NULL) { return false; } if (config == NULL) { Loading Loading @@ -229,7 +239,12 @@ static bool attempt_to_increase_resource_limits(struct bus *b) { * * The box should only ever be accessible on a single thread at a time. */ static boxed_msg *box_msg(struct bus *b, bus_user_msg *msg) { boxed_msg *box = calloc(1, sizeof(*box)); boxed_msg *box = NULL; #ifdef TEST box = test_box; #else box = calloc(1, sizeof(*box)); #endif if (box == NULL) { return NULL; } BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 64, Loading @@ -240,7 +255,9 @@ static boxed_msg *box_msg(struct bus *b, bus_user_msg *msg) { /* Lock hash table and check whether this FD uses SSL. */ if (0 != pthread_mutex_lock(&b->fd_set_lock)) { assert(false); } #ifndef TEST void *value = NULL; #endif connection_info *ci = NULL; if (yacht_get(b->fd_set, box->fd, &value)) { ci = (connection_info *)value; Loading Loading @@ -313,71 +330,6 @@ bool bus_send_request(struct bus *b, bus_user_msg *msg) return res; } static bool poll_on_completion(struct bus *b, int fd) { /* POLL in a pipe */ struct pollfd fds[1]; fds[0].fd = fd; fds[0].events = POLLIN; for (;;) { BUS_LOG(b, 5, LOG_SENDING_REQUEST, "Polling on completion...tick...", b->udata); int res = syscall_poll(fds, 1, -1); if (res == -1) { if (util_is_resumable_io_error(errno)) { BUS_LOG_SNPRINTF(b, 5, LOG_SENDING_REQUEST, b->udata, 64, "poll_on_completion, resumable IO error %d", errno); errno = 0; continue; } else { BUS_LOG_SNPRINTF(b, 1, LOG_SENDING_REQUEST, b->udata, 64, "poll_on_completion, non-resumable IO error %d", errno); return false; } } else if (res == 1) { uint16_t msec = 0; uint8_t read_buf[sizeof(uint8_t) + sizeof(msec)]; if (fds[0].revents & (POLLERR | POLLHUP | POLLNVAL)) { BUS_LOG(b, 1, LOG_SENDING_REQUEST, "failed (broken alert pipe)", b->udata); return false; } BUS_LOG(b, 3, LOG_SENDING_REQUEST, "Reading alert pipe...", b->udata); ssize_t sz = syscall_read(fd, read_buf, sizeof(read_buf)); if (sz == sizeof(read_buf)) { /* Payload: little-endian uint16_t, msec of backpressure. */ assert(read_buf[0] == LISTENER_MSG_TAG); msec = (read_buf[1] << 0) + (read_buf[2] << 8); bus_backpressure_delay(b, msec, LISTENER_BACKPRESSURE_SHIFT); BUS_LOG(b, 4, LOG_SENDING_REQUEST, "sent!", b->udata); return true; } else if (sz == -1) { if (util_is_resumable_io_error(errno)) { BUS_LOG_SNPRINTF(b, 5, LOG_SENDING_REQUEST, b->udata, 64, "poll_on_completion read, resumable IO error %d", errno); errno = 0; continue; } else { BUS_LOG_SNPRINTF(b, 2, LOG_SENDING_REQUEST, b->udata, 64, "poll_on_completion read, non-resumable IO error %d", errno); errno = 0; return false; } } else { BUS_LOG_SNPRINTF(b, 1, LOG_SENDING_REQUEST, b->udata, 64, "poll_on_completion bad read size %zd", sz); return false; } } else { BUS_LOG_SNPRINTF(b, 0, LOG_SENDING_REQUEST, b->udata, 64, "poll_on_completion, blocking forever returned 0, errno %d", errno); assert(false); } } } static int listener_id_of_socket(struct bus *b, int fd) { /* Just evenly divide sockets between listeners by file descriptor. */ return fd % b->listener_count; Loading Loading @@ -416,7 +368,11 @@ bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata) /* Metadata about the connection. Note: This will be shared by the * client thread and the listener thread, but each will only modify * some of the fields. The client thread will free this. */ #ifdef TEST connection_info *ci = test_ci; #else connection_info *ci = calloc(1, sizeof(*ci)); #endif if (ci == NULL) { goto cleanup; } SSL *ssl = NULL; Loading @@ -434,7 +390,9 @@ bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata) ci->largest_rd_seq_id_seen = BUS_NO_SEQ_ID; ci->largest_wr_seq_id_seen = BUS_NO_SEQ_ID; #ifndef TEST void *old_value = NULL; #endif /* Lock hash table and save whether this FD uses SSL. */ if (0 != pthread_mutex_lock(&b->fd_set_lock)) { assert(false); } bool set_ok = yacht_set(b->fd_set, fd, ci, &old_value); Loading @@ -447,12 +405,14 @@ bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata) } bool res = false; #ifndef TEST int completion_pipe = -1; #endif res = listener_add_socket(l, ci, &completion_pipe); if (!res) { goto cleanup; } BUS_LOG(b, 2, LOG_SOCKET_REGISTERED, "polling on socket add...", b->udata); bool completed = poll_on_completion(b, completion_pipe); bool completed = bus_poll_on_completion(b, completion_pipe); if (!completed) { goto cleanup; } BUS_LOG(b, 2, LOG_SOCKET_REGISTERED, "successfully added socket", b->udata); Loading @@ -474,22 +434,28 @@ bool bus_release_socket(struct bus *b, int fd, void **socket_udata_out) { struct listener *l = b->listeners[l_id]; int completion_fd = -1; if (!listener_remove_socket(l, fd, &completion_fd)) { #ifndef TEST int completion_pipe = -1; #endif if (!listener_remove_socket(l, fd, &completion_pipe)) { return false; /* couldn't send msg to listener */ } bool completed = poll_on_completion(b, completion_fd); bool completed = bus_poll_on_completion(b, completion_pipe); if (!completed) { /* listener hung up while waiting */ return false; } /* Lock hash table and forget whether this FD uses SSL. */ #ifndef TEST void *old_value = NULL; #endif if (0 != pthread_mutex_lock(&b->fd_set_lock)) { assert(false); } bool rm_ok = yacht_remove(b->fd_set, fd, &old_value); if (0 != pthread_mutex_unlock(&b->fd_set_lock)) { assert(false); } assert(rm_ok); if (!rm_ok) { return false; } connection_info *ci = (connection_info *)old_value; assert(ci != NULL); Loading @@ -513,19 +479,24 @@ bool bus_schedule_threadpool_task(struct bus *b, struct threadpool_task *task, return threadpool_schedule(b->threadpool, task, backpressure); } static void free_connection_cb(void *value, void *udata) { #ifndef TEST static #endif void free_connection_cb(void *value, void *udata) { struct bus *b = (struct bus *)udata; connection_info *ci = (connection_info *)value; int l_id = listener_id_of_socket(b, ci->fd); struct listener *l = b->listeners[l_id]; int completion_fd = -1; if (!listener_remove_socket(l, ci->fd, &completion_fd)) { #ifndef TEST int completion_pipe = -1; #endif if (!listener_remove_socket(l, ci->fd, &completion_pipe)) { return; /* couldn't send msg to listener */ } bool completed = poll_on_completion(b, completion_fd); bool completed = bus_poll_on_completion(b, completion_pipe); if (!completed) { return; } Loading @@ -550,22 +521,37 @@ bool bus_shutdown(bus *b) { b->fd_set = NULL; } #ifndef TEST int completion_pipe = -1; #endif BUS_LOG(b, 2, LOG_SHUTDOWN, "shutting down listener threads", b->udata); for (int i = 0; i < b->listener_count; i++) { if (!b->joined[i]) { BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "listener_shutdown -- %d", i); int completion_fd = -1; listener_shutdown(b->listeners[i], &completion_fd); poll_on_completion(b, completion_fd); if (!listener_shutdown(b->listeners[i], &completion_pipe)) { b->shutdown_state = SHUTDOWN_STATE_RUNNING; return false; } if (!bus_poll_on_completion(b, completion_pipe)) { b->shutdown_state = SHUTDOWN_STATE_RUNNING; return false; } BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "listener_shutdown -- joining %d", i); #ifndef TEST void *unused = NULL; int res = pthread_join(b->threads[i], &unused); #endif int res = syscall_pthread_join(b->threads[i], &unused); BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "listener_shutdown -- joined %d", i); assert(res == 0); if (res != 0) { b->shutdown_state = SHUTDOWN_STATE_RUNNING; return false; } b->joined[i] = true; } } Loading src/lib/bus/bus.h +4 −20 Original line number Diff line number Diff line Loading @@ -45,9 +45,6 @@ bool bus_init(bus_config *config, struct bus_result *res); * */ bool bus_send_request(struct bus *b, bus_user_msg *msg); /* Get the string key for a log event ID. */ const char *bus_log_event_str(log_event_t event); /* Register a socket connected to an endpoint, and data that will be passed * to all interactions on that socket. * Loading @@ -65,25 +62,12 @@ bool bus_release_socket(struct bus *b, int fd, void **socket_udata_out); * has resolved. */ bool bus_shutdown(struct bus *b); /* For a given file descriptor, get the listener ID to use. * This will level sockets between multiple threads. */ struct listener *bus_get_listener_for_socket(struct bus *b, int fd); /* Schedule a task in the bus's threadpool. */ bool bus_schedule_threadpool_task(struct bus *b, struct threadpool_task *task, size_t *backpressure); /* Lock / unlock the log mutex, since logging can occur on several threads. */ void bus_lock_log(struct bus *b); void bus_unlock_log(struct bus *b); /* Free internal data structures for the bus. */ void bus_free(struct bus *b); /* Deliver a boxed message to the thread pool to execute. */ bool bus_process_boxed_message(struct bus *b, struct boxed_msg *box, size_t *backpressure); void bus_backpressure_delay(struct bus *b, size_t backpressure, uint8_t shift); /* Inward facing portion of the message bus -- functions called * by other parts of the message bus, like the Listener thread, * but not by code outside the bus. */ #include "bus_inward.h" #endif src/lib/bus/bus_inward.h 0 → 100644 +29 −0 Original line number Diff line number Diff line #ifndef BUS_INWARD_H #define BUS_INWARD_H #include "bus_types.h" /* Get the string key for a log event ID. */ const char *bus_log_event_str(log_event_t event); /* For a given file descriptor, get the listener ID to use. * This will level sockets between multiple threads. */ struct listener *bus_get_listener_for_socket(struct bus *b, int fd); /* Schedule a task in the bus's threadpool. */ bool bus_schedule_threadpool_task(struct bus *b, struct threadpool_task *task, size_t *backpressure); /* Lock / unlock the log mutex, since logging can occur on several threads. */ void bus_lock_log(struct bus *b); void bus_unlock_log(struct bus *b); /* Deliver a boxed message to the thread pool to execute. */ bool bus_process_boxed_message(struct bus *b, struct boxed_msg *box, size_t *backpressure); /* Provide backpressure by sleeping for (backpressure >> shift) msec, if * the value is greater than 0. */ void bus_backpressure_delay(struct bus *b, size_t backpressure, uint8_t shift); #endif Loading
Makefile +8 −0 Original line number Diff line number Diff line Loading @@ -83,12 +83,15 @@ LIB_OBJS = \ $(OUT_DIR)/kinetic_admin_client.o \ $(OUT_DIR)/threadpool.o \ $(OUT_DIR)/bus.o \ $(OUT_DIR)/bus_poll.o \ $(OUT_DIR)/bus_ssl.o \ $(OUT_DIR)/listener.o \ $(OUT_DIR)/listener_cmd.o \ $(OUT_DIR)/listener_helper.o \ $(OUT_DIR)/listener_io.o \ $(OUT_DIR)/listener_task.o \ $(OUT_DIR)/send.o \ $(OUT_DIR)/send_helper.o \ $(OUT_DIR)/syscall.o \ $(OUT_DIR)/util.o \ $(OUT_DIR)/yacht.o \ Loading Loading @@ -143,7 +146,12 @@ $(OUT_DIR)/protobuf-c.o: $(PROTOBUFC)/protobuf-c/protobuf-c.c $(PROTOBUFC)/proto ${OUT_DIR}/kinetic_types.o: ${LIB_DIR}/kinetic_types_internal.h ${OUT_DIR}/bus.o: ${LIB_DIR}/bus/bus_types.h ${OUT_DIR}/sender.o: ${LIB_DIR}/bus/sender_internal.h ${OUT_DIR}/sender_helper.o: ${LIB_DIR}/bus/sender_internal.h ${OUT_DIR}/listener.o: ${LIB_DIR}/bus/listener_internal.h ${OUT_DIR}/listener_cmd.o: ${LIB_DIR}/bus/listener_internal.h ${OUT_DIR}/listener_helper.o: ${LIB_DIR}/bus/listener_internal.h ${OUT_DIR}/listener_io.o: ${LIB_DIR}/bus/listener_internal.h ${OUT_DIR}/listener_task.o: ${LIB_DIR}/bus/listener_internal.h $(OUT_DIR)/threadpool.o: ${LIB_DIR}/threadpool/threadpool.c ${LIB_DIR}/threadpool/threadpool.h $(CC) -o $@ -c $< $(CFLAGS) Loading
src/lib/bus/Makefile +5 −1 Original line number Diff line number Diff line SOCKET99_PATH= ../../../vendor/socket99 THREADPOOL_PATH= ../threadpool LIB_PATH= ../../../obj BASE_PROJECT_INC= -I.. -I../../../include -I../../../vendor/protobuf-c OPT= -O3 LIB_INC = -I${SOCKET99_PATH} -I${THREADPOOL_PATH} -I${OPENSSL_PATH}/include CFLAGS += -std=c99 ${OPT} -Wall -g ${LIB_INC} CFLAGS += -std=c99 ${OPT} -Wall -g ${LIB_INC} ${BASE_PROJECT_INC} LDFLAGS += -L. -L${LIB_PATH} -L${THREADPOOL_PATH} -L${OPENSSL_PATH}/lib -L${SOCKET99_PATH} -lsocket99 -lssl -lcrypto BUS_OBJS = \ bus.o \ bus_poll.o \ bus_ssl.o \ listener.o \ listener_cmd.o \ listener_helper.o \ listener_io.o \ listener_task.o \ send.o \ send_helper.o \ syscall.o \ util.o \ yacht.o \ Loading
src/lib/bus/bus.c +67 −81 Original line number Diff line number Diff line Loading @@ -28,6 +28,7 @@ #include <sys/resource.h> #include "bus.h" #include "bus_poll.h" #include "send.h" #include "listener.h" #include "threadpool.h" Loading @@ -38,9 +39,9 @@ #include "syscall.h" #include "atomic.h" #include "kinetic_types_internal.h" #include "listener_task.h" static bool poll_on_completion(struct bus *b, int fd); static int listener_id_of_socket(struct bus *b, int fd); static void noop_log_cb(log_event_t event, int log_level, const char *msg, void *udata); Loading @@ -51,6 +52,15 @@ static void set_defaults(bus_config *cfg) { if (cfg->listener_count == 0) { cfg->listener_count = 1; } } #ifdef TEST boxed_msg *test_box = NULL; void *value = NULL; void *old_value = NULL; connection_info *test_ci = NULL; int completion_pipe = -1; void *unused = NULL; #endif bool bus_init(bus_config *config, struct bus_result *res) { if (res == NULL) { return false; } if (config == NULL) { Loading Loading @@ -229,7 +239,12 @@ static bool attempt_to_increase_resource_limits(struct bus *b) { * * The box should only ever be accessible on a single thread at a time. */ static boxed_msg *box_msg(struct bus *b, bus_user_msg *msg) { boxed_msg *box = calloc(1, sizeof(*box)); boxed_msg *box = NULL; #ifdef TEST box = test_box; #else box = calloc(1, sizeof(*box)); #endif if (box == NULL) { return NULL; } BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 64, Loading @@ -240,7 +255,9 @@ static boxed_msg *box_msg(struct bus *b, bus_user_msg *msg) { /* Lock hash table and check whether this FD uses SSL. */ if (0 != pthread_mutex_lock(&b->fd_set_lock)) { assert(false); } #ifndef TEST void *value = NULL; #endif connection_info *ci = NULL; if (yacht_get(b->fd_set, box->fd, &value)) { ci = (connection_info *)value; Loading Loading @@ -313,71 +330,6 @@ bool bus_send_request(struct bus *b, bus_user_msg *msg) return res; } static bool poll_on_completion(struct bus *b, int fd) { /* POLL in a pipe */ struct pollfd fds[1]; fds[0].fd = fd; fds[0].events = POLLIN; for (;;) { BUS_LOG(b, 5, LOG_SENDING_REQUEST, "Polling on completion...tick...", b->udata); int res = syscall_poll(fds, 1, -1); if (res == -1) { if (util_is_resumable_io_error(errno)) { BUS_LOG_SNPRINTF(b, 5, LOG_SENDING_REQUEST, b->udata, 64, "poll_on_completion, resumable IO error %d", errno); errno = 0; continue; } else { BUS_LOG_SNPRINTF(b, 1, LOG_SENDING_REQUEST, b->udata, 64, "poll_on_completion, non-resumable IO error %d", errno); return false; } } else if (res == 1) { uint16_t msec = 0; uint8_t read_buf[sizeof(uint8_t) + sizeof(msec)]; if (fds[0].revents & (POLLERR | POLLHUP | POLLNVAL)) { BUS_LOG(b, 1, LOG_SENDING_REQUEST, "failed (broken alert pipe)", b->udata); return false; } BUS_LOG(b, 3, LOG_SENDING_REQUEST, "Reading alert pipe...", b->udata); ssize_t sz = syscall_read(fd, read_buf, sizeof(read_buf)); if (sz == sizeof(read_buf)) { /* Payload: little-endian uint16_t, msec of backpressure. */ assert(read_buf[0] == LISTENER_MSG_TAG); msec = (read_buf[1] << 0) + (read_buf[2] << 8); bus_backpressure_delay(b, msec, LISTENER_BACKPRESSURE_SHIFT); BUS_LOG(b, 4, LOG_SENDING_REQUEST, "sent!", b->udata); return true; } else if (sz == -1) { if (util_is_resumable_io_error(errno)) { BUS_LOG_SNPRINTF(b, 5, LOG_SENDING_REQUEST, b->udata, 64, "poll_on_completion read, resumable IO error %d", errno); errno = 0; continue; } else { BUS_LOG_SNPRINTF(b, 2, LOG_SENDING_REQUEST, b->udata, 64, "poll_on_completion read, non-resumable IO error %d", errno); errno = 0; return false; } } else { BUS_LOG_SNPRINTF(b, 1, LOG_SENDING_REQUEST, b->udata, 64, "poll_on_completion bad read size %zd", sz); return false; } } else { BUS_LOG_SNPRINTF(b, 0, LOG_SENDING_REQUEST, b->udata, 64, "poll_on_completion, blocking forever returned 0, errno %d", errno); assert(false); } } } static int listener_id_of_socket(struct bus *b, int fd) { /* Just evenly divide sockets between listeners by file descriptor. */ return fd % b->listener_count; Loading Loading @@ -416,7 +368,11 @@ bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata) /* Metadata about the connection. Note: This will be shared by the * client thread and the listener thread, but each will only modify * some of the fields. The client thread will free this. */ #ifdef TEST connection_info *ci = test_ci; #else connection_info *ci = calloc(1, sizeof(*ci)); #endif if (ci == NULL) { goto cleanup; } SSL *ssl = NULL; Loading @@ -434,7 +390,9 @@ bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata) ci->largest_rd_seq_id_seen = BUS_NO_SEQ_ID; ci->largest_wr_seq_id_seen = BUS_NO_SEQ_ID; #ifndef TEST void *old_value = NULL; #endif /* Lock hash table and save whether this FD uses SSL. */ if (0 != pthread_mutex_lock(&b->fd_set_lock)) { assert(false); } bool set_ok = yacht_set(b->fd_set, fd, ci, &old_value); Loading @@ -447,12 +405,14 @@ bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata) } bool res = false; #ifndef TEST int completion_pipe = -1; #endif res = listener_add_socket(l, ci, &completion_pipe); if (!res) { goto cleanup; } BUS_LOG(b, 2, LOG_SOCKET_REGISTERED, "polling on socket add...", b->udata); bool completed = poll_on_completion(b, completion_pipe); bool completed = bus_poll_on_completion(b, completion_pipe); if (!completed) { goto cleanup; } BUS_LOG(b, 2, LOG_SOCKET_REGISTERED, "successfully added socket", b->udata); Loading @@ -474,22 +434,28 @@ bool bus_release_socket(struct bus *b, int fd, void **socket_udata_out) { struct listener *l = b->listeners[l_id]; int completion_fd = -1; if (!listener_remove_socket(l, fd, &completion_fd)) { #ifndef TEST int completion_pipe = -1; #endif if (!listener_remove_socket(l, fd, &completion_pipe)) { return false; /* couldn't send msg to listener */ } bool completed = poll_on_completion(b, completion_fd); bool completed = bus_poll_on_completion(b, completion_pipe); if (!completed) { /* listener hung up while waiting */ return false; } /* Lock hash table and forget whether this FD uses SSL. */ #ifndef TEST void *old_value = NULL; #endif if (0 != pthread_mutex_lock(&b->fd_set_lock)) { assert(false); } bool rm_ok = yacht_remove(b->fd_set, fd, &old_value); if (0 != pthread_mutex_unlock(&b->fd_set_lock)) { assert(false); } assert(rm_ok); if (!rm_ok) { return false; } connection_info *ci = (connection_info *)old_value; assert(ci != NULL); Loading @@ -513,19 +479,24 @@ bool bus_schedule_threadpool_task(struct bus *b, struct threadpool_task *task, return threadpool_schedule(b->threadpool, task, backpressure); } static void free_connection_cb(void *value, void *udata) { #ifndef TEST static #endif void free_connection_cb(void *value, void *udata) { struct bus *b = (struct bus *)udata; connection_info *ci = (connection_info *)value; int l_id = listener_id_of_socket(b, ci->fd); struct listener *l = b->listeners[l_id]; int completion_fd = -1; if (!listener_remove_socket(l, ci->fd, &completion_fd)) { #ifndef TEST int completion_pipe = -1; #endif if (!listener_remove_socket(l, ci->fd, &completion_pipe)) { return; /* couldn't send msg to listener */ } bool completed = poll_on_completion(b, completion_fd); bool completed = bus_poll_on_completion(b, completion_pipe); if (!completed) { return; } Loading @@ -550,22 +521,37 @@ bool bus_shutdown(bus *b) { b->fd_set = NULL; } #ifndef TEST int completion_pipe = -1; #endif BUS_LOG(b, 2, LOG_SHUTDOWN, "shutting down listener threads", b->udata); for (int i = 0; i < b->listener_count; i++) { if (!b->joined[i]) { BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "listener_shutdown -- %d", i); int completion_fd = -1; listener_shutdown(b->listeners[i], &completion_fd); poll_on_completion(b, completion_fd); if (!listener_shutdown(b->listeners[i], &completion_pipe)) { b->shutdown_state = SHUTDOWN_STATE_RUNNING; return false; } if (!bus_poll_on_completion(b, completion_pipe)) { b->shutdown_state = SHUTDOWN_STATE_RUNNING; return false; } BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "listener_shutdown -- joining %d", i); #ifndef TEST void *unused = NULL; int res = pthread_join(b->threads[i], &unused); #endif int res = syscall_pthread_join(b->threads[i], &unused); BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "listener_shutdown -- joined %d", i); assert(res == 0); if (res != 0) { b->shutdown_state = SHUTDOWN_STATE_RUNNING; return false; } b->joined[i] = true; } } Loading
src/lib/bus/bus.h +4 −20 Original line number Diff line number Diff line Loading @@ -45,9 +45,6 @@ bool bus_init(bus_config *config, struct bus_result *res); * */ bool bus_send_request(struct bus *b, bus_user_msg *msg); /* Get the string key for a log event ID. */ const char *bus_log_event_str(log_event_t event); /* Register a socket connected to an endpoint, and data that will be passed * to all interactions on that socket. * Loading @@ -65,25 +62,12 @@ bool bus_release_socket(struct bus *b, int fd, void **socket_udata_out); * has resolved. */ bool bus_shutdown(struct bus *b); /* For a given file descriptor, get the listener ID to use. * This will level sockets between multiple threads. */ struct listener *bus_get_listener_for_socket(struct bus *b, int fd); /* Schedule a task in the bus's threadpool. */ bool bus_schedule_threadpool_task(struct bus *b, struct threadpool_task *task, size_t *backpressure); /* Lock / unlock the log mutex, since logging can occur on several threads. */ void bus_lock_log(struct bus *b); void bus_unlock_log(struct bus *b); /* Free internal data structures for the bus. */ void bus_free(struct bus *b); /* Deliver a boxed message to the thread pool to execute. */ bool bus_process_boxed_message(struct bus *b, struct boxed_msg *box, size_t *backpressure); void bus_backpressure_delay(struct bus *b, size_t backpressure, uint8_t shift); /* Inward facing portion of the message bus -- functions called * by other parts of the message bus, like the Listener thread, * but not by code outside the bus. */ #include "bus_inward.h" #endif
src/lib/bus/bus_inward.h 0 → 100644 +29 −0 Original line number Diff line number Diff line #ifndef BUS_INWARD_H #define BUS_INWARD_H #include "bus_types.h" /* Get the string key for a log event ID. */ const char *bus_log_event_str(log_event_t event); /* For a given file descriptor, get the listener ID to use. * This will level sockets between multiple threads. */ struct listener *bus_get_listener_for_socket(struct bus *b, int fd); /* Schedule a task in the bus's threadpool. */ bool bus_schedule_threadpool_task(struct bus *b, struct threadpool_task *task, size_t *backpressure); /* Lock / unlock the log mutex, since logging can occur on several threads. */ void bus_lock_log(struct bus *b); void bus_unlock_log(struct bus *b); /* Deliver a boxed message to the thread pool to execute. */ bool bus_process_boxed_message(struct bus *b, struct boxed_msg *box, size_t *backpressure); /* Provide backpressure by sleeping for (backpressure >> shift) msec, if * the value is greater than 0. */ void bus_backpressure_delay(struct bus *b, size_t backpressure, uint8_t shift); #endif