Loading Makefile +1 −0 Original line number Diff line number Diff line Loading @@ -89,6 +89,7 @@ LIB_OBJS = \ $(OUT_DIR)/listener_io.o \ $(OUT_DIR)/listener_task.o \ $(OUT_DIR)/send.o \ $(OUT_DIR)/syscall.o \ $(OUT_DIR)/util.o \ $(OUT_DIR)/yacht.o \ Loading src/lib/bus/Makefile +1 −0 Original line number Diff line number Diff line Loading @@ -15,6 +15,7 @@ BUS_OBJS = \ listener_io.o \ listener_task.o \ send.o \ syscall.o \ util.o \ yacht.o \ Loading src/lib/bus/bus.c +6 −6 Original line number Diff line number Diff line Loading @@ -22,7 +22,6 @@ #include <string.h> #include <unistd.h> #include <pthread.h> #include <poll.h> #include <errno.h> #include <assert.h> #include <limits.h> Loading @@ -36,6 +35,7 @@ #include "bus_ssl.h" #include "util.h" #include "yacht.h" #include "syscall.h" #include "atomic.h" #include "listener_task.h" Loading Loading @@ -320,7 +320,7 @@ static bool poll_on_completion(struct bus *b, int fd) { for (;;) { BUS_LOG(b, 5, LOG_SENDING_REQUEST, "Polling on completion...tick...", b->udata); int res = poll(fds, 1, -1); int res = syscall_poll(fds, 1, -1); if (res == -1) { if (util_is_resumable_io_error(errno)) { BUS_LOG_SNPRINTF(b, 5, LOG_SENDING_REQUEST, b->udata, 64, Loading @@ -342,7 +342,7 @@ static bool poll_on_completion(struct bus *b, int fd) { } BUS_LOG(b, 3, LOG_SENDING_REQUEST, "Reading alert pipe...", b->udata); ssize_t sz = read(fd, read_buf, sizeof(read_buf)); ssize_t sz = syscall_read(fd, read_buf, sizeof(read_buf)); if (sz == sizeof(read_buf)) { /* Payload: little-endian uint16_t, msec of backpressure. */ Loading Loading @@ -587,7 +587,7 @@ void bus_backpressure_delay(struct bus *b, size_t backpressure, uint8_t shift) { if (backpressure > 0) { BUS_LOG_SNPRINTF(b, 8, LOG_SENDER, b->udata, 64, "backpressure %zd", backpressure); poll(NULL, 0, backpressure); syscall_poll(NULL, 0, backpressure); } } Loading Loading @@ -640,7 +640,7 @@ void bus_free(bus *b) { if (b == NULL) { return; } while (b->shutdown_state != SHUTDOWN_STATE_HALTED) { if (bus_shutdown(b)) { break; } poll(NULL, 0, 10); // sleep 10 msec syscall_poll(NULL, 0, 10); // sleep 10 msec } for (int i = 0; i < b->listener_count; i++) { Loading @@ -655,7 +655,7 @@ void bus_free(bus *b) { BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "threadpool_shutdown -- %d", i); if (threadpool_shutdown(b->threadpool, false)) { break; } (void)poll(NULL, 0, 10); (void)syscall_poll(NULL, 0, 10); if (i == limit - 1) { BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, Loading src/lib/bus/bus_ssl.c +2 −1 Original line number Diff line number Diff line Loading @@ -21,6 +21,7 @@ #include <assert.h> #include "bus_ssl.h" #include "syscall.h" #include "util.h" #define TIMEOUT_MSEC 100 Loading Loading @@ -138,7 +139,7 @@ static bool do_blocking_connection(struct bus *b, SSL *ssl, int fd) { size_t elapsed = 0; while (!connected) { int pres = poll(fds, 1, TIMEOUT_MSEC); int pres = syscall_poll(fds, 1, TIMEOUT_MSEC); BUS_LOG_SNPRINTF(b, 5, LOG_SOCKET_REGISTERED, b->udata, 128, "SSL_Connect handshake for socket %d, poll res %d", fd, pres); Loading src/lib/bus/listener.c +10 −9 Original line number Diff line number Diff line Loading @@ -33,6 +33,7 @@ #include "listener_cmd.h" #include "listener_task.h" #include "listener_internal.h" #include "syscall.h" #include "util.h" #include "atomic.h" Loading Loading @@ -76,11 +77,11 @@ struct listener *listener_init(struct bus *b, struct bus_config *cfg) { if (0 != pipe(msg->pipes)) { for (int i = 0; i < pipe_count; i++) { listener_msg *msg = &l->msgs[i]; close(msg->pipes[0]); close(msg->pipes[1]); syscall_close(msg->pipes[0]); syscall_close(msg->pipes[1]); } close(l->commit_pipe); close(l->incoming_msg_pipe); syscall_close(l->commit_pipe); syscall_close(l->incoming_msg_pipe); free(l); return NULL; } Loading Loading @@ -225,16 +226,16 @@ void listener_free(struct listener *l) { break; } close(msg->pipes[0]); close(msg->pipes[1]); syscall_close(msg->pipes[0]); syscall_close(msg->pipes[1]); } if (l->read_buf) { free(l->read_buf); } close(l->commit_pipe); close(l->incoming_msg_pipe); syscall_close(l->commit_pipe); syscall_close(l->incoming_msg_pipe); free(l); } Loading @@ -250,7 +251,7 @@ static bool push_message(struct listener *l, listener_msg *msg, int *reply_fd) { if (reply_fd) { *reply_fd = msg->pipes[0]; } for (;;) { ssize_t wr = write(l->commit_pipe, msg_buf, sizeof(msg_buf)); ssize_t wr = syscall_write(l->commit_pipe, msg_buf, sizeof(msg_buf)); if (wr == sizeof(msg_buf)) { return true; // committed } else { Loading Loading
Makefile +1 −0 Original line number Diff line number Diff line Loading @@ -89,6 +89,7 @@ LIB_OBJS = \ $(OUT_DIR)/listener_io.o \ $(OUT_DIR)/listener_task.o \ $(OUT_DIR)/send.o \ $(OUT_DIR)/syscall.o \ $(OUT_DIR)/util.o \ $(OUT_DIR)/yacht.o \ Loading
src/lib/bus/Makefile +1 −0 Original line number Diff line number Diff line Loading @@ -15,6 +15,7 @@ BUS_OBJS = \ listener_io.o \ listener_task.o \ send.o \ syscall.o \ util.o \ yacht.o \ Loading
src/lib/bus/bus.c +6 −6 Original line number Diff line number Diff line Loading @@ -22,7 +22,6 @@ #include <string.h> #include <unistd.h> #include <pthread.h> #include <poll.h> #include <errno.h> #include <assert.h> #include <limits.h> Loading @@ -36,6 +35,7 @@ #include "bus_ssl.h" #include "util.h" #include "yacht.h" #include "syscall.h" #include "atomic.h" #include "listener_task.h" Loading Loading @@ -320,7 +320,7 @@ static bool poll_on_completion(struct bus *b, int fd) { for (;;) { BUS_LOG(b, 5, LOG_SENDING_REQUEST, "Polling on completion...tick...", b->udata); int res = poll(fds, 1, -1); int res = syscall_poll(fds, 1, -1); if (res == -1) { if (util_is_resumable_io_error(errno)) { BUS_LOG_SNPRINTF(b, 5, LOG_SENDING_REQUEST, b->udata, 64, Loading @@ -342,7 +342,7 @@ static bool poll_on_completion(struct bus *b, int fd) { } BUS_LOG(b, 3, LOG_SENDING_REQUEST, "Reading alert pipe...", b->udata); ssize_t sz = read(fd, read_buf, sizeof(read_buf)); ssize_t sz = syscall_read(fd, read_buf, sizeof(read_buf)); if (sz == sizeof(read_buf)) { /* Payload: little-endian uint16_t, msec of backpressure. */ Loading Loading @@ -587,7 +587,7 @@ void bus_backpressure_delay(struct bus *b, size_t backpressure, uint8_t shift) { if (backpressure > 0) { BUS_LOG_SNPRINTF(b, 8, LOG_SENDER, b->udata, 64, "backpressure %zd", backpressure); poll(NULL, 0, backpressure); syscall_poll(NULL, 0, backpressure); } } Loading Loading @@ -640,7 +640,7 @@ void bus_free(bus *b) { if (b == NULL) { return; } while (b->shutdown_state != SHUTDOWN_STATE_HALTED) { if (bus_shutdown(b)) { break; } poll(NULL, 0, 10); // sleep 10 msec syscall_poll(NULL, 0, 10); // sleep 10 msec } for (int i = 0; i < b->listener_count; i++) { Loading @@ -655,7 +655,7 @@ void bus_free(bus *b) { BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "threadpool_shutdown -- %d", i); if (threadpool_shutdown(b->threadpool, false)) { break; } (void)poll(NULL, 0, 10); (void)syscall_poll(NULL, 0, 10); if (i == limit - 1) { BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, Loading
src/lib/bus/bus_ssl.c +2 −1 Original line number Diff line number Diff line Loading @@ -21,6 +21,7 @@ #include <assert.h> #include "bus_ssl.h" #include "syscall.h" #include "util.h" #define TIMEOUT_MSEC 100 Loading Loading @@ -138,7 +139,7 @@ static bool do_blocking_connection(struct bus *b, SSL *ssl, int fd) { size_t elapsed = 0; while (!connected) { int pres = poll(fds, 1, TIMEOUT_MSEC); int pres = syscall_poll(fds, 1, TIMEOUT_MSEC); BUS_LOG_SNPRINTF(b, 5, LOG_SOCKET_REGISTERED, b->udata, 128, "SSL_Connect handshake for socket %d, poll res %d", fd, pres); Loading
src/lib/bus/listener.c +10 −9 Original line number Diff line number Diff line Loading @@ -33,6 +33,7 @@ #include "listener_cmd.h" #include "listener_task.h" #include "listener_internal.h" #include "syscall.h" #include "util.h" #include "atomic.h" Loading Loading @@ -76,11 +77,11 @@ struct listener *listener_init(struct bus *b, struct bus_config *cfg) { if (0 != pipe(msg->pipes)) { for (int i = 0; i < pipe_count; i++) { listener_msg *msg = &l->msgs[i]; close(msg->pipes[0]); close(msg->pipes[1]); syscall_close(msg->pipes[0]); syscall_close(msg->pipes[1]); } close(l->commit_pipe); close(l->incoming_msg_pipe); syscall_close(l->commit_pipe); syscall_close(l->incoming_msg_pipe); free(l); return NULL; } Loading Loading @@ -225,16 +226,16 @@ void listener_free(struct listener *l) { break; } close(msg->pipes[0]); close(msg->pipes[1]); syscall_close(msg->pipes[0]); syscall_close(msg->pipes[1]); } if (l->read_buf) { free(l->read_buf); } close(l->commit_pipe); close(l->incoming_msg_pipe); syscall_close(l->commit_pipe); syscall_close(l->incoming_msg_pipe); free(l); } Loading @@ -250,7 +251,7 @@ static bool push_message(struct listener *l, listener_msg *msg, int *reply_fd) { if (reply_fd) { *reply_fd = msg->pipes[0]; } for (;;) { ssize_t wr = write(l->commit_pipe, msg_buf, sizeof(msg_buf)); ssize_t wr = syscall_write(l->commit_pipe, msg_buf, sizeof(msg_buf)); if (wr == sizeof(msg_buf)) { return true; // committed } else { Loading