Loading src/lib/bus/listener.c +80 −19 Original line number Diff line number Diff line Loading @@ -35,6 +35,8 @@ #include "atomic.h" #define DEFAULT_READ_BUF_SIZE (1024L * 1024L) #define INCOMING_MSG_PIPE 1 #define INCOMING_MSG_PIPE_ID 0 static void retry_delivery(listener *l, rx_info_t *info); Loading @@ -45,13 +47,26 @@ struct listener *listener_init(struct bus *b, struct bus_config *cfg) { l->bus = b; BUS_LOG(b, 2, LOG_LISTENER, "init", b->udata); int pipes[2]; if (0 != pipe(pipes)) { free(l); return NULL; } struct casq *q = casq_new(); if (q == NULL) { free(l); close(pipes[0]); close(pipes[1]); return NULL; } l->q = q; l->commit_pipe = pipes[1]; l->incoming_msg_pipe = pipes[0]; l->fds[INCOMING_MSG_PIPE_ID].fd = l->incoming_msg_pipe; l->fds[INCOMING_MSG_PIPE_ID].events = POLLIN; for (int i = 0; i < MAX_PENDING_MESSAGES; i++) { rx_info_t *info = &l->rx_info[i]; info->state = RIS_INACTIVE; Loading Loading @@ -255,13 +270,16 @@ void listener_free(struct listener *l) { for (int i = 0; i < l->tracked_fds; i++) { /* Forget off the front to stress forget_socket. */ forget_socket(l, l->fds[0].fd); forget_socket(l, l->fds[INCOMING_MSG_PIPE].fd); } if (l->read_buf) { free(l->read_buf); } close(l->commit_pipe); close(l->incoming_msg_pipe); free(l); } } Loading @@ -269,13 +287,11 @@ void listener_free(struct listener *l) { // ================================================== #define MAX_TIMEOUT 100 void *listener_mainloop(void *arg) { listener *self = (listener *)arg; assert(self); struct bus *b = self->bus; int timeout = 1; int timeout = MIN_DELAY; struct timeval tv; Loading @@ -296,7 +312,7 @@ void *listener_mainloop(void *arg) { last_sec = cur_sec; } int res = poll(self->fds, self->tracked_fds, timeout); int res = poll(self->fds, self->tracked_fds + INCOMING_MSG_PIPE, timeout); BUS_LOG_SNPRINTF(b, (res == 0 ? 6 : 4), LOG_LISTENER, b->udata, 64, "poll res %d", res); Loading @@ -319,6 +335,7 @@ void *listener_mainloop(void *arg) { assert(false); } } else if (res > 0) { check_and_flush_incoming_msg_pipe(self, &res); attempt_recv(self, res); work_done = true; } else { Loading @@ -328,11 +345,11 @@ void *listener_mainloop(void *arg) { if (work_done) { timeout = 0; } else if (timeout == 0) { timeout = 1; timeout = MIN_DELAY; } else { timeout <<= 1; if (timeout > MAX_TIMEOUT) { timeout = MAX_TIMEOUT; if (timeout > MAX_DELAY) { timeout = INFINITE_DELAY; } } } Loading @@ -341,6 +358,36 @@ void *listener_mainloop(void *arg) { return NULL; } static void check_and_flush_incoming_msg_pipe(listener *l, int *res) { struct bus *b = l->bus; short ev = l->fds[INCOMING_MSG_PIPE_ID].revents; if (ev & (POLLERR | POLLHUP | POLLNVAL)) { /* hangup/error */ return; } if (ev & POLLIN) { char buf[64]; for (;;) { ssize_t rd = read(l->fds[INCOMING_MSG_PIPE_ID].fd, buf, sizeof(buf)); if (rd == -1) { if (errno == EINTR) { errno = 0; continue; } else { BUS_LOG_SNPRINTF(b, 6, LOG_LISTENER, b->udata, 128, "check_and_flush_incoming_msg_pipe: %s", strerror(errno)); errno = 0; break; } } else { /* no-op, msg is unused */ (*res)--; break; } } } } #define RX_INFO_MAX_USED i <= l->rx_info_max_used //#define RX_INFO_MAX_USED i < MAX_PENDING_MESSAGES Loading Loading @@ -374,7 +421,7 @@ static void set_error_for_socket(listener *l, int id, int fd, rx_error_t err) { } } } l->fds[id].events &= ~POLLIN; l->fds[id + INCOMING_MSG_PIPE].events &= ~POLLIN; } static void print_SSL_error(struct bus *b, connection_info *ci, int lvl, const char *prefix) { Loading @@ -400,7 +447,7 @@ static void attempt_recv(listener *l, int available) { for (int i = 0; i < l->tracked_fds; i++) { if (read_from == available) { break; } struct pollfd *fd = &l->fds[i]; struct pollfd *fd = &l->fds[i + INCOMING_MSG_PIPE]; connection_info *ci = l->fd_info[i]; assert(ci->fd == fd->fd); Loading Loading @@ -1019,10 +1066,24 @@ static bool push_message(struct listener *l, listener_msg *msg) { struct bus *b = l->bus; if (casq_push(l->q, msg)) { retry: BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 128, "Pushed message -- %p -- of type %d", (void*)msg, msg->type); /* TODO: write a wake message to a pipe? */ ssize_t wr = write(l->commit_pipe, "!", 1); if (wr == 1) { return true; } else { assert(wr == -1); if (errno == EINTR) { errno = 0; goto retry; } else { BUS_LOG_SNPRINTF(b, 10, LOG_LISTENER, b->udata, 64, "write_commit errno %d", errno); errno = 0; return false; } } } else { BUS_LOG_SNPRINTF(b, 3 - 3, LOG_LISTENER, b->udata, 128, "Failed to pushed message -- %p", (void*)msg); Loading Loading @@ -1109,7 +1170,7 @@ static void add_socket(listener *l, connection_info *ci, int notify_fd) { BUS_LOG(b, 3, LOG_LISTENER, "FULL", b->udata); } for (int i = 0; i < l->tracked_fds; i++) { if (l->fds[i].fd == ci->fd) { if (l->fds[i + INCOMING_MSG_PIPE].fd == ci->fd) { free(ci); notify_caller(notify_fd); return; /* already present */ Loading @@ -1118,8 +1179,8 @@ static void add_socket(listener *l, connection_info *ci, int notify_fd) { int id = l->tracked_fds; l->fd_info[id] = ci; l->fds[id].fd = ci->fd; l->fds[id].events = POLLIN; l->fds[id + INCOMING_MSG_PIPE].fd = ci->fd; l->fds[id + INCOMING_MSG_PIPE].events = POLLIN; l->tracked_fds++; /* Prime the pump by sinking 0 bytes and getting a size to expect. */ Loading Loading @@ -1152,12 +1213,12 @@ static void forget_socket(listener *l, int fd) { /* don't really close it, just drop info about it in the listener */ for (int i = 0; i < l->tracked_fds; i++) { if (l->fds[i].fd == fd) { if (l->fds[i + INCOMING_MSG_PIPE].fd == fd) { if (l->tracked_fds > 1) { /* Swap pollfd CI and last ones. */ struct pollfd pfd = l->fds[i]; l->fds[i] = l->fds[l->tracked_fds - 1]; l->fds[l->tracked_fds - 1] = pfd; struct pollfd pfd = l->fds[i + INCOMING_MSG_PIPE]; l->fds[i + INCOMING_MSG_PIPE] = l->fds[l->tracked_fds - 1 + INCOMING_MSG_PIPE]; l->fds[l->tracked_fds - 1 + INCOMING_MSG_PIPE] = pfd; connection_info *ci = l->fd_info[i]; l->fd_info[i] = l->fd_info[l->tracked_fds - 1]; l->fd_info[l->tracked_fds - 1] = ci; Loading src/lib/bus/listener_internal.h +12 −1 Original line number Diff line number Diff line Loading @@ -97,12 +97,21 @@ typedef struct rx_info_t { #define MAX_QUEUE_MESSAGES 32 typedef uint32_t msg_flag_t; /* Minimum and maximum poll() delays for listener, before going dormant. */ #define MIN_DELAY 10 #define MAX_DELAY 100 #define INFINITE_DELAY (-1) /* Receiver of responses */ typedef struct listener { struct bus *bus; struct casq *q; bool shutdown; /* Pipes used to wake the sleeping listener on queue input. */ int commit_pipe; int incoming_msg_pipe; rx_info_t rx_info[MAX_PENDING_MESSAGES]; rx_info_t *rx_info_freelist; uint16_t rx_info_in_use; Loading @@ -116,7 +125,8 @@ typedef struct listener { size_t upstream_backpressure; uint16_t tracked_fds; struct pollfd fds[MAX_FDS]; /* tracked_fds + incoming_msg_pipe */ struct pollfd fds[MAX_FDS + 1]; connection_info *fd_info[MAX_FDS]; /* Read buffer and it's size. Will be grown on demand. */ Loading @@ -140,6 +150,7 @@ static void attempt_delivery(listener *l, struct rx_info_t *info); static void clean_up_completed_info(listener *l, rx_info_t *info); static void observe_backpressure(listener *l, size_t backpressure); static bool grow_read_buf(listener *l, size_t nsize); static void check_and_flush_incoming_msg_pipe(listener *l, int *res); static rx_info_t *get_hold_rx_info(listener *l, int fd, int64_t seq_id); static void dump_rx_info_table(listener *l); Loading Loading
src/lib/bus/listener.c +80 −19 Original line number Diff line number Diff line Loading @@ -35,6 +35,8 @@ #include "atomic.h" #define DEFAULT_READ_BUF_SIZE (1024L * 1024L) #define INCOMING_MSG_PIPE 1 #define INCOMING_MSG_PIPE_ID 0 static void retry_delivery(listener *l, rx_info_t *info); Loading @@ -45,13 +47,26 @@ struct listener *listener_init(struct bus *b, struct bus_config *cfg) { l->bus = b; BUS_LOG(b, 2, LOG_LISTENER, "init", b->udata); int pipes[2]; if (0 != pipe(pipes)) { free(l); return NULL; } struct casq *q = casq_new(); if (q == NULL) { free(l); close(pipes[0]); close(pipes[1]); return NULL; } l->q = q; l->commit_pipe = pipes[1]; l->incoming_msg_pipe = pipes[0]; l->fds[INCOMING_MSG_PIPE_ID].fd = l->incoming_msg_pipe; l->fds[INCOMING_MSG_PIPE_ID].events = POLLIN; for (int i = 0; i < MAX_PENDING_MESSAGES; i++) { rx_info_t *info = &l->rx_info[i]; info->state = RIS_INACTIVE; Loading Loading @@ -255,13 +270,16 @@ void listener_free(struct listener *l) { for (int i = 0; i < l->tracked_fds; i++) { /* Forget off the front to stress forget_socket. */ forget_socket(l, l->fds[0].fd); forget_socket(l, l->fds[INCOMING_MSG_PIPE].fd); } if (l->read_buf) { free(l->read_buf); } close(l->commit_pipe); close(l->incoming_msg_pipe); free(l); } } Loading @@ -269,13 +287,11 @@ void listener_free(struct listener *l) { // ================================================== #define MAX_TIMEOUT 100 void *listener_mainloop(void *arg) { listener *self = (listener *)arg; assert(self); struct bus *b = self->bus; int timeout = 1; int timeout = MIN_DELAY; struct timeval tv; Loading @@ -296,7 +312,7 @@ void *listener_mainloop(void *arg) { last_sec = cur_sec; } int res = poll(self->fds, self->tracked_fds, timeout); int res = poll(self->fds, self->tracked_fds + INCOMING_MSG_PIPE, timeout); BUS_LOG_SNPRINTF(b, (res == 0 ? 6 : 4), LOG_LISTENER, b->udata, 64, "poll res %d", res); Loading @@ -319,6 +335,7 @@ void *listener_mainloop(void *arg) { assert(false); } } else if (res > 0) { check_and_flush_incoming_msg_pipe(self, &res); attempt_recv(self, res); work_done = true; } else { Loading @@ -328,11 +345,11 @@ void *listener_mainloop(void *arg) { if (work_done) { timeout = 0; } else if (timeout == 0) { timeout = 1; timeout = MIN_DELAY; } else { timeout <<= 1; if (timeout > MAX_TIMEOUT) { timeout = MAX_TIMEOUT; if (timeout > MAX_DELAY) { timeout = INFINITE_DELAY; } } } Loading @@ -341,6 +358,36 @@ void *listener_mainloop(void *arg) { return NULL; } static void check_and_flush_incoming_msg_pipe(listener *l, int *res) { struct bus *b = l->bus; short ev = l->fds[INCOMING_MSG_PIPE_ID].revents; if (ev & (POLLERR | POLLHUP | POLLNVAL)) { /* hangup/error */ return; } if (ev & POLLIN) { char buf[64]; for (;;) { ssize_t rd = read(l->fds[INCOMING_MSG_PIPE_ID].fd, buf, sizeof(buf)); if (rd == -1) { if (errno == EINTR) { errno = 0; continue; } else { BUS_LOG_SNPRINTF(b, 6, LOG_LISTENER, b->udata, 128, "check_and_flush_incoming_msg_pipe: %s", strerror(errno)); errno = 0; break; } } else { /* no-op, msg is unused */ (*res)--; break; } } } } #define RX_INFO_MAX_USED i <= l->rx_info_max_used //#define RX_INFO_MAX_USED i < MAX_PENDING_MESSAGES Loading Loading @@ -374,7 +421,7 @@ static void set_error_for_socket(listener *l, int id, int fd, rx_error_t err) { } } } l->fds[id].events &= ~POLLIN; l->fds[id + INCOMING_MSG_PIPE].events &= ~POLLIN; } static void print_SSL_error(struct bus *b, connection_info *ci, int lvl, const char *prefix) { Loading @@ -400,7 +447,7 @@ static void attempt_recv(listener *l, int available) { for (int i = 0; i < l->tracked_fds; i++) { if (read_from == available) { break; } struct pollfd *fd = &l->fds[i]; struct pollfd *fd = &l->fds[i + INCOMING_MSG_PIPE]; connection_info *ci = l->fd_info[i]; assert(ci->fd == fd->fd); Loading Loading @@ -1019,10 +1066,24 @@ static bool push_message(struct listener *l, listener_msg *msg) { struct bus *b = l->bus; if (casq_push(l->q, msg)) { retry: BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 128, "Pushed message -- %p -- of type %d", (void*)msg, msg->type); /* TODO: write a wake message to a pipe? */ ssize_t wr = write(l->commit_pipe, "!", 1); if (wr == 1) { return true; } else { assert(wr == -1); if (errno == EINTR) { errno = 0; goto retry; } else { BUS_LOG_SNPRINTF(b, 10, LOG_LISTENER, b->udata, 64, "write_commit errno %d", errno); errno = 0; return false; } } } else { BUS_LOG_SNPRINTF(b, 3 - 3, LOG_LISTENER, b->udata, 128, "Failed to pushed message -- %p", (void*)msg); Loading Loading @@ -1109,7 +1170,7 @@ static void add_socket(listener *l, connection_info *ci, int notify_fd) { BUS_LOG(b, 3, LOG_LISTENER, "FULL", b->udata); } for (int i = 0; i < l->tracked_fds; i++) { if (l->fds[i].fd == ci->fd) { if (l->fds[i + INCOMING_MSG_PIPE].fd == ci->fd) { free(ci); notify_caller(notify_fd); return; /* already present */ Loading @@ -1118,8 +1179,8 @@ static void add_socket(listener *l, connection_info *ci, int notify_fd) { int id = l->tracked_fds; l->fd_info[id] = ci; l->fds[id].fd = ci->fd; l->fds[id].events = POLLIN; l->fds[id + INCOMING_MSG_PIPE].fd = ci->fd; l->fds[id + INCOMING_MSG_PIPE].events = POLLIN; l->tracked_fds++; /* Prime the pump by sinking 0 bytes and getting a size to expect. */ Loading Loading @@ -1152,12 +1213,12 @@ static void forget_socket(listener *l, int fd) { /* don't really close it, just drop info about it in the listener */ for (int i = 0; i < l->tracked_fds; i++) { if (l->fds[i].fd == fd) { if (l->fds[i + INCOMING_MSG_PIPE].fd == fd) { if (l->tracked_fds > 1) { /* Swap pollfd CI and last ones. */ struct pollfd pfd = l->fds[i]; l->fds[i] = l->fds[l->tracked_fds - 1]; l->fds[l->tracked_fds - 1] = pfd; struct pollfd pfd = l->fds[i + INCOMING_MSG_PIPE]; l->fds[i + INCOMING_MSG_PIPE] = l->fds[l->tracked_fds - 1 + INCOMING_MSG_PIPE]; l->fds[l->tracked_fds - 1 + INCOMING_MSG_PIPE] = pfd; connection_info *ci = l->fd_info[i]; l->fd_info[i] = l->fd_info[l->tracked_fds - 1]; l->fd_info[l->tracked_fds - 1] = ci; Loading
src/lib/bus/listener_internal.h +12 −1 Original line number Diff line number Diff line Loading @@ -97,12 +97,21 @@ typedef struct rx_info_t { #define MAX_QUEUE_MESSAGES 32 typedef uint32_t msg_flag_t; /* Minimum and maximum poll() delays for listener, before going dormant. */ #define MIN_DELAY 10 #define MAX_DELAY 100 #define INFINITE_DELAY (-1) /* Receiver of responses */ typedef struct listener { struct bus *bus; struct casq *q; bool shutdown; /* Pipes used to wake the sleeping listener on queue input. */ int commit_pipe; int incoming_msg_pipe; rx_info_t rx_info[MAX_PENDING_MESSAGES]; rx_info_t *rx_info_freelist; uint16_t rx_info_in_use; Loading @@ -116,7 +125,8 @@ typedef struct listener { size_t upstream_backpressure; uint16_t tracked_fds; struct pollfd fds[MAX_FDS]; /* tracked_fds + incoming_msg_pipe */ struct pollfd fds[MAX_FDS + 1]; connection_info *fd_info[MAX_FDS]; /* Read buffer and it's size. Will be grown on demand. */ Loading @@ -140,6 +150,7 @@ static void attempt_delivery(listener *l, struct rx_info_t *info); static void clean_up_completed_info(listener *l, rx_info_t *info); static void observe_backpressure(listener *l, size_t backpressure); static bool grow_read_buf(listener *l, size_t nsize); static void check_and_flush_incoming_msg_pipe(listener *l, int *res); static rx_info_t *get_hold_rx_info(listener *l, int fd, int64_t seq_id); static void dump_rx_info_table(listener *l); Loading