Loading src/lib/bus/sender.c +16 −14 Original line number Diff line number Diff line Loading @@ -111,11 +111,11 @@ bool sender_register_socket(struct sender *s, int fd, SSL *ssl) { info->u.add_socket.ssl = ssl; BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "registering socket %d with SSL %p", fd, (void*)ssl); bool res = commit_event_and_block(s, info); tx_error_t res = commit_event_and_block(s, info); release_tx_info(s, info); BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "registering socket %d: res %d", fd, res); return res; return res == TX_ERROR_NONE; } bool sender_remove_socket(struct sender *s, int fd) { Loading @@ -124,9 +124,9 @@ bool sender_remove_socket(struct sender *s, int fd) { info->state = TIS_RM_SOCKET; info->u.rm_socket.fd = fd; bool res = commit_event_and_block(s, info); tx_error_t res = commit_event_and_block(s, info); release_tx_info(s, info); return res; return res == TX_ERROR_NONE; } bool sender_send_request(struct sender *s, boxed_msg *box) { Loading @@ -141,11 +141,11 @@ bool sender_send_request(struct sender *s, boxed_msg *box) { BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "sending request on %d: box %p", box->fd, (void*)box); bool res = commit_event_and_block(s, info); tx_error_t res = commit_event_and_block(s, info); BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "sending request: releasing tx_info, res %d", res); release_tx_info(s, info); return res; return res == TX_ERROR_NONE; } bool sender_shutdown(struct sender *s) { Loading @@ -157,11 +157,11 @@ bool sender_shutdown(struct sender *s) { info->state = TIS_SHUTDOWN; BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "sending shutdown request on %d", info->id); bool res = commit_event_and_block(s, info); tx_error_t res = commit_event_and_block(s, info); release_tx_info(s, info); BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "shutdown request: %d", res); return res; return res == TX_ERROR_NONE; } void sender_free(struct sender *s) { Loading Loading @@ -245,8 +245,10 @@ static bool write_commit(struct sender *s, tx_info_t *info) { } } static bool commit_event_and_block(struct sender *s, tx_info_t *info) { if (!write_commit(s, info)) { return false; } static tx_error_t commit_event_and_block(struct sender *s, tx_info_t *info) { if (!write_commit(s, info)) { return TX_ERROR_WRITE_FAILURE; } struct bus *b = s->bus; struct pollfd fds[1]; Loading @@ -265,7 +267,7 @@ static bool commit_event_and_block(struct sender *s, tx_info_t *info) { if (ev & (POLLHUP | POLLERR | POLLNVAL)) { /* We've been hung up on due to a shutdown event. */ close(info->done_pipe); return true; return TX_ERROR_CLOSED; } else if (ev & POLLIN) { uint16_t backpressure = 0; uint8_t buf[sizeof(bool) + sizeof(backpressure)]; Loading @@ -288,7 +290,7 @@ static bool commit_event_and_block(struct sender *s, tx_info_t *info) { BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "reading done_pipe: success %d", 1); return success; return success ? TX_ERROR_NONE : TX_ERROR_WRITE_FAILURE; } else if (rd == -1) { if (errno == EINTR) { errno = 0; Loading @@ -297,7 +299,7 @@ static bool commit_event_and_block(struct sender *s, tx_info_t *info) { BUS_LOG_SNPRINTF(b, 10, LOG_SENDER, b->udata, 64, "blocking read on done_pipe: errno %d", errno); errno = 0; return false; return TX_ERROR_CLOSED; } } } else { Loading @@ -310,7 +312,7 @@ static bool commit_event_and_block(struct sender *s, tx_info_t *info) { BUS_LOG_SNPRINTF(b, 10, LOG_SENDER, b->udata, 64, "blocking poll for done_pipe: errno %d", errno); errno = 0; return false; return TX_ERROR_CLOSED; } else { /* Shouldn't happen -- blocking. */ assert(false); Loading src/lib/bus/sender_internal.h +1 −1 Original line number Diff line number Diff line Loading @@ -164,7 +164,7 @@ static tx_info_t *get_free_tx_info(struct sender *s); static void release_tx_info(struct sender *s, tx_info_t *info); static int get_notify_pipe(struct sender *s, int id); static bool write_commit(struct sender *s, tx_info_t *info); static bool commit_event_and_block(struct sender *s, tx_info_t *info); static tx_error_t commit_event_and_block(struct sender *s, tx_info_t *info); static bool register_socket_info(sender *s, int fd, SSL *ssl); static void increment_fd_refcount(sender *s, fd_info *fdi); Loading Loading
src/lib/bus/sender.c +16 −14 Original line number Diff line number Diff line Loading @@ -111,11 +111,11 @@ bool sender_register_socket(struct sender *s, int fd, SSL *ssl) { info->u.add_socket.ssl = ssl; BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "registering socket %d with SSL %p", fd, (void*)ssl); bool res = commit_event_and_block(s, info); tx_error_t res = commit_event_and_block(s, info); release_tx_info(s, info); BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "registering socket %d: res %d", fd, res); return res; return res == TX_ERROR_NONE; } bool sender_remove_socket(struct sender *s, int fd) { Loading @@ -124,9 +124,9 @@ bool sender_remove_socket(struct sender *s, int fd) { info->state = TIS_RM_SOCKET; info->u.rm_socket.fd = fd; bool res = commit_event_and_block(s, info); tx_error_t res = commit_event_and_block(s, info); release_tx_info(s, info); return res; return res == TX_ERROR_NONE; } bool sender_send_request(struct sender *s, boxed_msg *box) { Loading @@ -141,11 +141,11 @@ bool sender_send_request(struct sender *s, boxed_msg *box) { BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "sending request on %d: box %p", box->fd, (void*)box); bool res = commit_event_and_block(s, info); tx_error_t res = commit_event_and_block(s, info); BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "sending request: releasing tx_info, res %d", res); release_tx_info(s, info); return res; return res == TX_ERROR_NONE; } bool sender_shutdown(struct sender *s) { Loading @@ -157,11 +157,11 @@ bool sender_shutdown(struct sender *s) { info->state = TIS_SHUTDOWN; BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "sending shutdown request on %d", info->id); bool res = commit_event_and_block(s, info); tx_error_t res = commit_event_and_block(s, info); release_tx_info(s, info); BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "shutdown request: %d", res); return res; return res == TX_ERROR_NONE; } void sender_free(struct sender *s) { Loading Loading @@ -245,8 +245,10 @@ static bool write_commit(struct sender *s, tx_info_t *info) { } } static bool commit_event_and_block(struct sender *s, tx_info_t *info) { if (!write_commit(s, info)) { return false; } static tx_error_t commit_event_and_block(struct sender *s, tx_info_t *info) { if (!write_commit(s, info)) { return TX_ERROR_WRITE_FAILURE; } struct bus *b = s->bus; struct pollfd fds[1]; Loading @@ -265,7 +267,7 @@ static bool commit_event_and_block(struct sender *s, tx_info_t *info) { if (ev & (POLLHUP | POLLERR | POLLNVAL)) { /* We've been hung up on due to a shutdown event. */ close(info->done_pipe); return true; return TX_ERROR_CLOSED; } else if (ev & POLLIN) { uint16_t backpressure = 0; uint8_t buf[sizeof(bool) + sizeof(backpressure)]; Loading @@ -288,7 +290,7 @@ static bool commit_event_and_block(struct sender *s, tx_info_t *info) { BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "reading done_pipe: success %d", 1); return success; return success ? TX_ERROR_NONE : TX_ERROR_WRITE_FAILURE; } else if (rd == -1) { if (errno == EINTR) { errno = 0; Loading @@ -297,7 +299,7 @@ static bool commit_event_and_block(struct sender *s, tx_info_t *info) { BUS_LOG_SNPRINTF(b, 10, LOG_SENDER, b->udata, 64, "blocking read on done_pipe: errno %d", errno); errno = 0; return false; return TX_ERROR_CLOSED; } } } else { Loading @@ -310,7 +312,7 @@ static bool commit_event_and_block(struct sender *s, tx_info_t *info) { BUS_LOG_SNPRINTF(b, 10, LOG_SENDER, b->udata, 64, "blocking poll for done_pipe: errno %d", errno); errno = 0; return false; return TX_ERROR_CLOSED; } else { /* Shouldn't happen -- blocking. */ assert(false); Loading
src/lib/bus/sender_internal.h +1 −1 Original line number Diff line number Diff line Loading @@ -164,7 +164,7 @@ static tx_info_t *get_free_tx_info(struct sender *s); static void release_tx_info(struct sender *s, tx_info_t *info); static int get_notify_pipe(struct sender *s, int id); static bool write_commit(struct sender *s, tx_info_t *info); static bool commit_event_and_block(struct sender *s, tx_info_t *info); static tx_error_t commit_event_and_block(struct sender *s, tx_info_t *info); static bool register_socket_info(sender *s, int fd, SSL *ssl); static void increment_fd_refcount(sender *s, fd_info *fdi); Loading