Loading src/lib/bus/sender.c +58 −26 Original line number Diff line number Diff line Loading @@ -77,12 +77,13 @@ struct sender *sender_init(struct bus *b, struct bus_config *cfg) { return NULL; } s->fds[i].fd = -1; s->fds[i].fd = SENDER_FD_NOT_IN_USE; s->fds[i].events = POLLOUT; int *p_id = (int *)(&s->tx_info[i].id); *p_id = i; s->tx_info[i].timeout_sec = TIMEOUT_NOT_YET_SET; s->tx_info[i].fd = SENDER_FD_NOT_IN_USE; } BUS_LOG(b, 2, LOG_SENDER, "init success", b->udata); Loading Loading @@ -122,6 +123,7 @@ bool sender_enqueue_message(struct sender *s, /* Check if FD is being watched. If info is non-NULL, then ignore that record. */ static bool is_watched(struct sender *s, int fd, tx_info_t *ignore_info) { assert(fd != SENDER_FD_NOT_IN_USE); #if USE_HASH_TABLE if (ignore_info == NULL) { return yacht_member(s->fd_hash_table, fd); Loading Loading @@ -160,6 +162,7 @@ static bool add_fd_to_watch_set(struct sender *s, int fd, SSL *ssl) { BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "adding FD %d to watch set", fd); int idx = s->active_fds; assert(s->fds[idx].fd == SENDER_FD_NOT_IN_USE); s->fds[idx].fd = fd; s->fds[idx].events |= POLLOUT; s->active_fds++; Loading Loading @@ -205,6 +208,7 @@ static bool remove_fd_from_watch_set(struct sender *s, tx_info_t *info) { if (s->active_fds > 1) { s->fds[i].fd = s->fds[s->active_fds - 1].fd; } s->fds[s->active_fds - 1].fd = SENDER_FD_NOT_IN_USE; s->active_fds--; void *old = NULL; if (!yacht_remove(s->fd_hash_table, info->fd, &old)) { Loading @@ -230,12 +234,15 @@ static bool populate_tx_info(struct sender *s, struct bus *b = s->bus; assert(info->box == NULL); info->box = box; assert(info->fd == SENDER_FD_NOT_IN_USE); info->fd = box->fd; info->retries = 0; int fd = info->box->fd; BUS_LOG_SNPRINTF(b, 10, LOG_SENDER, b->udata, 64, "set info->id %d to have new box->out_msg %p", info->id, box->out_msg); if (!add_fd_to_watch_set(s, fd, box->ssl)) { if (!add_fd_to_watch_set(s, info->fd, box->ssl)) { BUS_LOG(b, /*3*/0, LOG_SENDER, "add_fd_to_watch_set FAILED", b->udata); return false; } Loading @@ -247,29 +254,24 @@ static bool populate_tx_info(struct sender *s, } static void release_tx_info(struct sender *s, tx_info_t *info) { info->timeout_sec = TIMEOUT_NOT_YET_SET; struct bus *b = s->bus; assert(info->id < MAX_CONCURRENT_SENDS); /* the box should already be released to the listener or threadpool. */ assert(info->box == NULL); info->timeout_sec = TIMEOUT_NOT_YET_SET; info->error = TX_ERROR_NONE; remove_fd_from_watch_set(s, info); info->fd = SENDER_FD_NOT_IN_USE; for (;;) { tx_flag_t cur = s->tx_flags; cur &= ~(1 << info->id); if (ATOMIC_BOOL_COMPARE_AND_SWAP(&s->tx_flags, s->tx_flags, cur)) { for (;;) { BUS_LOG(b, 6, LOG_SENDER, "released TX info", b->udata); int16_t tiu = s->tx_info_in_use; if (ATOMIC_BOOL_COMPARE_AND_SWAP(&s->tx_info_in_use, tiu, tiu - 1)) { return; } } BUS_LOG(b, 10, LOG_SENDER, "released TX info", b->udata); break; } } } Loading @@ -287,19 +289,23 @@ static tx_info_t *get_free_tx_info(struct sender *s) { tx_flag_t cur = s->tx_flags; tx_flag_t bit = (1 << i); if ((cur & bit) == 0) { cur |= bit; if (ATOMIC_BOOL_COMPARE_AND_SWAP(&s->tx_flags, s->tx_flags, cur)) { tx_flag_t marked = cur | bit; if (ATOMIC_BOOL_COMPARE_AND_SWAP(&s->tx_flags, cur, marked)) { /* *info is now reserved. */ struct bus *b = s->bus; BUS_LOG(b, 6, LOG_SENDER, "reserving TX info", b->udata); s->tx_info_in_use++; /* *info is now reserved. */ tx_info_t *info = &s->tx_info[i]; assert(info->id == i); assert(info->fd == SENDER_FD_NOT_IN_USE); /* Note: Intentionally _not_ memsetting info to 0 here, * so as to avoid race condition with timeout. */ info->error = TX_ERROR_NONE; BUS_LOG_SNPRINTF(b, 10, LOG_SENDER, b->udata, 64, "got free info %d", info->id); assert(info->box == NULL); info->sent_size = 0; return info; Loading Loading @@ -424,7 +430,8 @@ static void attempt_write(sender *s, int available) { if (written == available) { break; } struct pollfd *pfd = &s->fds[i]; BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, assert(pfd->fd != SENDER_FD_NOT_IN_USE); BUS_LOG_SNPRINTF(b, 10, LOG_SENDER, b->udata, 64, "attempting write on %d (revents 0x%08x)", pfd->fd, pfd->revents); if (pfd->revents & POLLERR) { Loading @@ -445,12 +452,13 @@ static void attempt_write(sender *s, int available) { } assert(info); if (info->error != 0) { BUS_LOG_SNPRINTF(b, /*4*/0, LOG_SENDER, b->udata, 64, BUS_LOG_SNPRINTF(b, 2, LOG_SENDER, b->udata, 64, "socket has failed, let it time out: %d", info->error); continue; /* socket has failed, let it time out */ } assert(info->box); assert(info->box->out_msg); size_t msg_size = info->box->out_msg_size; size_t rem = msg_size - info->sent_size; SSL *ssl = info->box->ssl; Loading Loading @@ -480,6 +488,10 @@ static ssize_t socket_write_plain(sender *s, tx_info_t *info) { uint8_t *msg = info->box->out_msg; size_t msg_size = info->box->out_msg_size; size_t rem = msg_size - info->sent_size; BUS_LOG_SNPRINTF(b, 9, LOG_SENDER, b->udata, 64, "write %p to %d, %zd bytes (info %d)", &msg[info->sent_size], info->fd, rem, info->id); ssize_t wrsz = write(info->fd, &msg[info->sent_size], rem); if (wrsz == -1) { if (util_is_resumable_io_error(errno)) { Loading @@ -493,11 +505,13 @@ static ssize_t socket_write_plain(sender *s, tx_info_t *info) { errno = 0; return 0; } } else { } else if (wrsz > 0) { update_sent(b, s, info, wrsz); BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64, "sent: %zd\n", wrsz); return wrsz; } else { return 0; } } Loading Loading @@ -554,20 +568,25 @@ static ssize_t socket_write_ssl(sender *s, tx_info_t *info, SSL *ssl) { static void update_sent(struct bus *b, sender *s, tx_info_t *info, ssize_t sent) { size_t msg_size = info->box->out_msg_size; size_t rem = msg_size - info->sent_size; info->sent_size += sent; BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64, "wrote %zd", sent); if (rem - sent == 0) { /* completed! */ size_t rem = msg_size - info->sent_size; BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64, "wrote %zd, msg_size %zd (%p)", sent, msg_size, info->box->out_msg); if (rem == 0) { /* completed! */ BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64, "wrote all of %p, clearing", info->box->out_msg); attempt_to_enqueue_message_to_listener(s, info); } } static bool write_backpressure(sender *s, tx_info_t *info, uint16_t bp) { uint8_t buf[2]; buf[0] = (uint8_t)((bp & 0x00FF) >> 0); buf[1] = (uint8_t)((bp & 0xFF00) >> 8); int pipe_fd = s->pipes[info->id][1]; ssize_t res = write(pipe_fd, buf, sizeof(buf)); if (res == -1) { Loading @@ -585,9 +604,15 @@ static bool write_backpressure(sender *s, tx_info_t *info, uint16_t bp) { static void tick_handler(sender *s) { struct bus *b = s->bus; int tx_info_in_use = 0; for (int i = 0; i < MAX_CONCURRENT_SENDS; i++) { tx_flag_t bit = (1 << i); if (s->tx_flags & bit) { tx_info_in_use++; } } BUS_LOG_SNPRINTF(b, 1, LOG_SENDER, b->udata, 64, "tick... %p -- %d of %d tx_info in use, %d active FDs\n", s, s->tx_info_in_use, MAX_CONCURRENT_SENDS, s->active_fds); s, tx_info_in_use, MAX_CONCURRENT_SENDS, s->active_fds); /* Walk table and expire timeouts & items which have been sent. * Loading Loading @@ -629,16 +654,23 @@ static void attempt_to_enqueue_message_to_listener(sender *s, tx_info_t *info) { struct listener *l = bus_get_listener_for_socket(s->bus, info->fd); struct boxed_msg *box = info->box; assert(info->sent_size = box->out_msg_size); info->box = NULL; /* passed on to listener */ uint8_t *out_msg = box->out_msg; box->out_msg = NULL; /* release value, pointer will be stale after returning */ uint16_t backpressure = 0; /* If this succeeds, then this thread cannot touch the box anymore. */ if (listener_expect_response(l, box, &backpressure)) { box->out_msg = NULL; /* release value, pointer will be stale after returning */ BUS_LOG_SNPRINTF(b, 9, LOG_SENDER, b->udata, 64, "unblocking listener, releasing msg: %p, backpressure %u\n", out_msg, backpressure); write_backpressure(s, info, backpressure); /* alert blocked client thread */ BUS_LOG_SNPRINTF(b, 8, LOG_SENDER, b->udata, 128, "release_tx_info %d", __LINE__); release_tx_info(s, info); } else { BUS_LOG(b, 2, LOG_SENDER, "failed delivery", b->udata); box->out_msg = out_msg; info->box = box; /* return it since we need to keep managing it */ info->retries++; if (info->retries == SENDER_MAX_DELIVERY_RETRIES) { Loading src/lib/bus/sender_internal.h +4 −2 Original line number Diff line number Diff line Loading @@ -32,10 +32,13 @@ typedef enum { TX_ERROR_WRITE_FAILURE = -3, } tx_error_t; #define SENDER_FD_NOT_IN_USE (-1) /* Metadata for a message in-flight. */ typedef struct { /* Set during initialization only. */ const int id; /* must be SENDER_FD_NOT_IN_USE when not in use */ int fd; /* Should be either TIMEOUT_NOT_YET_SET or a number of seconds, which Loading Loading @@ -69,7 +72,6 @@ typedef struct sender { /* Outgoing message data. */ tx_flag_t tx_flags; tx_info_t tx_info[MAX_CONCURRENT_SENDS]; int16_t tx_info_in_use; int pipes[MAX_CONCURRENT_SENDS][2]; Loading Loading
src/lib/bus/sender.c +58 −26 Original line number Diff line number Diff line Loading @@ -77,12 +77,13 @@ struct sender *sender_init(struct bus *b, struct bus_config *cfg) { return NULL; } s->fds[i].fd = -1; s->fds[i].fd = SENDER_FD_NOT_IN_USE; s->fds[i].events = POLLOUT; int *p_id = (int *)(&s->tx_info[i].id); *p_id = i; s->tx_info[i].timeout_sec = TIMEOUT_NOT_YET_SET; s->tx_info[i].fd = SENDER_FD_NOT_IN_USE; } BUS_LOG(b, 2, LOG_SENDER, "init success", b->udata); Loading Loading @@ -122,6 +123,7 @@ bool sender_enqueue_message(struct sender *s, /* Check if FD is being watched. If info is non-NULL, then ignore that record. */ static bool is_watched(struct sender *s, int fd, tx_info_t *ignore_info) { assert(fd != SENDER_FD_NOT_IN_USE); #if USE_HASH_TABLE if (ignore_info == NULL) { return yacht_member(s->fd_hash_table, fd); Loading Loading @@ -160,6 +162,7 @@ static bool add_fd_to_watch_set(struct sender *s, int fd, SSL *ssl) { BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "adding FD %d to watch set", fd); int idx = s->active_fds; assert(s->fds[idx].fd == SENDER_FD_NOT_IN_USE); s->fds[idx].fd = fd; s->fds[idx].events |= POLLOUT; s->active_fds++; Loading Loading @@ -205,6 +208,7 @@ static bool remove_fd_from_watch_set(struct sender *s, tx_info_t *info) { if (s->active_fds > 1) { s->fds[i].fd = s->fds[s->active_fds - 1].fd; } s->fds[s->active_fds - 1].fd = SENDER_FD_NOT_IN_USE; s->active_fds--; void *old = NULL; if (!yacht_remove(s->fd_hash_table, info->fd, &old)) { Loading @@ -230,12 +234,15 @@ static bool populate_tx_info(struct sender *s, struct bus *b = s->bus; assert(info->box == NULL); info->box = box; assert(info->fd == SENDER_FD_NOT_IN_USE); info->fd = box->fd; info->retries = 0; int fd = info->box->fd; BUS_LOG_SNPRINTF(b, 10, LOG_SENDER, b->udata, 64, "set info->id %d to have new box->out_msg %p", info->id, box->out_msg); if (!add_fd_to_watch_set(s, fd, box->ssl)) { if (!add_fd_to_watch_set(s, info->fd, box->ssl)) { BUS_LOG(b, /*3*/0, LOG_SENDER, "add_fd_to_watch_set FAILED", b->udata); return false; } Loading @@ -247,29 +254,24 @@ static bool populate_tx_info(struct sender *s, } static void release_tx_info(struct sender *s, tx_info_t *info) { info->timeout_sec = TIMEOUT_NOT_YET_SET; struct bus *b = s->bus; assert(info->id < MAX_CONCURRENT_SENDS); /* the box should already be released to the listener or threadpool. */ assert(info->box == NULL); info->timeout_sec = TIMEOUT_NOT_YET_SET; info->error = TX_ERROR_NONE; remove_fd_from_watch_set(s, info); info->fd = SENDER_FD_NOT_IN_USE; for (;;) { tx_flag_t cur = s->tx_flags; cur &= ~(1 << info->id); if (ATOMIC_BOOL_COMPARE_AND_SWAP(&s->tx_flags, s->tx_flags, cur)) { for (;;) { BUS_LOG(b, 6, LOG_SENDER, "released TX info", b->udata); int16_t tiu = s->tx_info_in_use; if (ATOMIC_BOOL_COMPARE_AND_SWAP(&s->tx_info_in_use, tiu, tiu - 1)) { return; } } BUS_LOG(b, 10, LOG_SENDER, "released TX info", b->udata); break; } } } Loading @@ -287,19 +289,23 @@ static tx_info_t *get_free_tx_info(struct sender *s) { tx_flag_t cur = s->tx_flags; tx_flag_t bit = (1 << i); if ((cur & bit) == 0) { cur |= bit; if (ATOMIC_BOOL_COMPARE_AND_SWAP(&s->tx_flags, s->tx_flags, cur)) { tx_flag_t marked = cur | bit; if (ATOMIC_BOOL_COMPARE_AND_SWAP(&s->tx_flags, cur, marked)) { /* *info is now reserved. */ struct bus *b = s->bus; BUS_LOG(b, 6, LOG_SENDER, "reserving TX info", b->udata); s->tx_info_in_use++; /* *info is now reserved. */ tx_info_t *info = &s->tx_info[i]; assert(info->id == i); assert(info->fd == SENDER_FD_NOT_IN_USE); /* Note: Intentionally _not_ memsetting info to 0 here, * so as to avoid race condition with timeout. */ info->error = TX_ERROR_NONE; BUS_LOG_SNPRINTF(b, 10, LOG_SENDER, b->udata, 64, "got free info %d", info->id); assert(info->box == NULL); info->sent_size = 0; return info; Loading Loading @@ -424,7 +430,8 @@ static void attempt_write(sender *s, int available) { if (written == available) { break; } struct pollfd *pfd = &s->fds[i]; BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, assert(pfd->fd != SENDER_FD_NOT_IN_USE); BUS_LOG_SNPRINTF(b, 10, LOG_SENDER, b->udata, 64, "attempting write on %d (revents 0x%08x)", pfd->fd, pfd->revents); if (pfd->revents & POLLERR) { Loading @@ -445,12 +452,13 @@ static void attempt_write(sender *s, int available) { } assert(info); if (info->error != 0) { BUS_LOG_SNPRINTF(b, /*4*/0, LOG_SENDER, b->udata, 64, BUS_LOG_SNPRINTF(b, 2, LOG_SENDER, b->udata, 64, "socket has failed, let it time out: %d", info->error); continue; /* socket has failed, let it time out */ } assert(info->box); assert(info->box->out_msg); size_t msg_size = info->box->out_msg_size; size_t rem = msg_size - info->sent_size; SSL *ssl = info->box->ssl; Loading Loading @@ -480,6 +488,10 @@ static ssize_t socket_write_plain(sender *s, tx_info_t *info) { uint8_t *msg = info->box->out_msg; size_t msg_size = info->box->out_msg_size; size_t rem = msg_size - info->sent_size; BUS_LOG_SNPRINTF(b, 9, LOG_SENDER, b->udata, 64, "write %p to %d, %zd bytes (info %d)", &msg[info->sent_size], info->fd, rem, info->id); ssize_t wrsz = write(info->fd, &msg[info->sent_size], rem); if (wrsz == -1) { if (util_is_resumable_io_error(errno)) { Loading @@ -493,11 +505,13 @@ static ssize_t socket_write_plain(sender *s, tx_info_t *info) { errno = 0; return 0; } } else { } else if (wrsz > 0) { update_sent(b, s, info, wrsz); BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64, "sent: %zd\n", wrsz); return wrsz; } else { return 0; } } Loading Loading @@ -554,20 +568,25 @@ static ssize_t socket_write_ssl(sender *s, tx_info_t *info, SSL *ssl) { static void update_sent(struct bus *b, sender *s, tx_info_t *info, ssize_t sent) { size_t msg_size = info->box->out_msg_size; size_t rem = msg_size - info->sent_size; info->sent_size += sent; BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64, "wrote %zd", sent); if (rem - sent == 0) { /* completed! */ size_t rem = msg_size - info->sent_size; BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64, "wrote %zd, msg_size %zd (%p)", sent, msg_size, info->box->out_msg); if (rem == 0) { /* completed! */ BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64, "wrote all of %p, clearing", info->box->out_msg); attempt_to_enqueue_message_to_listener(s, info); } } static bool write_backpressure(sender *s, tx_info_t *info, uint16_t bp) { uint8_t buf[2]; buf[0] = (uint8_t)((bp & 0x00FF) >> 0); buf[1] = (uint8_t)((bp & 0xFF00) >> 8); int pipe_fd = s->pipes[info->id][1]; ssize_t res = write(pipe_fd, buf, sizeof(buf)); if (res == -1) { Loading @@ -585,9 +604,15 @@ static bool write_backpressure(sender *s, tx_info_t *info, uint16_t bp) { static void tick_handler(sender *s) { struct bus *b = s->bus; int tx_info_in_use = 0; for (int i = 0; i < MAX_CONCURRENT_SENDS; i++) { tx_flag_t bit = (1 << i); if (s->tx_flags & bit) { tx_info_in_use++; } } BUS_LOG_SNPRINTF(b, 1, LOG_SENDER, b->udata, 64, "tick... %p -- %d of %d tx_info in use, %d active FDs\n", s, s->tx_info_in_use, MAX_CONCURRENT_SENDS, s->active_fds); s, tx_info_in_use, MAX_CONCURRENT_SENDS, s->active_fds); /* Walk table and expire timeouts & items which have been sent. * Loading Loading @@ -629,16 +654,23 @@ static void attempt_to_enqueue_message_to_listener(sender *s, tx_info_t *info) { struct listener *l = bus_get_listener_for_socket(s->bus, info->fd); struct boxed_msg *box = info->box; assert(info->sent_size = box->out_msg_size); info->box = NULL; /* passed on to listener */ uint8_t *out_msg = box->out_msg; box->out_msg = NULL; /* release value, pointer will be stale after returning */ uint16_t backpressure = 0; /* If this succeeds, then this thread cannot touch the box anymore. */ if (listener_expect_response(l, box, &backpressure)) { box->out_msg = NULL; /* release value, pointer will be stale after returning */ BUS_LOG_SNPRINTF(b, 9, LOG_SENDER, b->udata, 64, "unblocking listener, releasing msg: %p, backpressure %u\n", out_msg, backpressure); write_backpressure(s, info, backpressure); /* alert blocked client thread */ BUS_LOG_SNPRINTF(b, 8, LOG_SENDER, b->udata, 128, "release_tx_info %d", __LINE__); release_tx_info(s, info); } else { BUS_LOG(b, 2, LOG_SENDER, "failed delivery", b->udata); box->out_msg = out_msg; info->box = box; /* return it since we need to keep managing it */ info->retries++; if (info->retries == SENDER_MAX_DELIVERY_RETRIES) { Loading
src/lib/bus/sender_internal.h +4 −2 Original line number Diff line number Diff line Loading @@ -32,10 +32,13 @@ typedef enum { TX_ERROR_WRITE_FAILURE = -3, } tx_error_t; #define SENDER_FD_NOT_IN_USE (-1) /* Metadata for a message in-flight. */ typedef struct { /* Set during initialization only. */ const int id; /* must be SENDER_FD_NOT_IN_USE when not in use */ int fd; /* Should be either TIMEOUT_NOT_YET_SET or a number of seconds, which Loading Loading @@ -69,7 +72,6 @@ typedef struct sender { /* Outgoing message data. */ tx_flag_t tx_flags; tx_info_t tx_info[MAX_CONCURRENT_SENDS]; int16_t tx_info_in_use; int pipes[MAX_CONCURRENT_SENDS][2]; Loading