Loading src/lib/bus/bus.c +2 −2 Original line number Diff line number Diff line Loading @@ -323,8 +323,8 @@ bool bus_send_request(struct bus *b, bus_user_msg *msg) int s_id = sender_id_of_socket(b, msg->fd); struct sender *s = b->senders[s_id]; BUS_LOG_SNPRINTF(b, 3, LOG_SENDING_REQUEST, b->udata, 64, "Sending request <fd:%d, seq_id:%lld>", msg->fd, msg->seq_id); BUS_LOG_SNPRINTF(b, 3-3, LOG_SENDING_REQUEST, b->udata, 64, "Sending request <fd:%d, seq_id:%lld>", msg->fd, (long long)msg->seq_id); bool res = sender_send_request(s, box); BUS_LOG_SNPRINTF(b, 3, LOG_SENDING_REQUEST, b->udata, 64, "...request sent, result %d", res); Loading src/lib/bus/sender.c +15 −2 Original line number Diff line number Diff line Loading @@ -434,7 +434,7 @@ static void cleanup(sender *s) { static bool register_socket_info(sender *s, int fd, SSL *ssl) { if (s->shutdown) { return false; } fd_info *info = malloc(sizeof(*info)); fd_info *info = calloc(1, sizeof(*info)); if (info == NULL) { return false; } Loading Loading @@ -503,6 +503,7 @@ static bool release_socket_info(sender *s, int fd) { fd_info *info = (fd_info *)old; if (info) { assert(fd == info->fd); info->errored = true; /* Expire any pending events on this socket. */ if (info->refcount > 0) { set_error_for_socket(s, fd, TX_ERROR_CLOSED); Loading Loading @@ -574,6 +575,11 @@ static void enqueue_write(struct sender *s, tx_info_t *info) { if (yacht_get(s->fd_hash_table, fd, (void **)&fdi)) { assert(fdi); if (fdi->errored) { set_error_for_socket(s, fd, TX_ERROR_CLOSED); return; } if (fdi->largest_seq_id_seen > out_seq_id && fdi->largest_seq_id_seen > 0) { BUS_LOG_SNPRINTF(b, 0 , LOG_SENDER, b->udata, 64, "suspicious outgoing sequence ID on %d: got %lld, already sent up to %lld", Loading Loading @@ -684,6 +690,11 @@ static void set_error_for_socket(sender *s, int fd, tx_error_t error) { break; } fd_info *fdi = NULL; if (yacht_get(s->fd_hash_table, fd, (void **)&fdi)) { fdi->errored = true; } for (int i = 0; i < MAX_CONCURRENT_SENDS; i++) { tx_flag_t cur = 1 << i; if (s->tx_flags & cur) { Loading Loading @@ -791,6 +802,7 @@ static ssize_t socket_write_plain(sender *s, tx_info_t *info) { BUS_LOG_SNPRINTF(b, 10, LOG_SENDER, b->udata, 64, "write %p to %d, %zd bytes (info %d)", (void*)&msg[sent_size], fd, rem, info->id); ssize_t wrsz = write(fd, &msg[sent_size], rem); if (wrsz == -1) { if (util_is_resumable_io_error(errno)) { Loading @@ -805,6 +817,7 @@ static ssize_t socket_write_plain(sender *s, tx_info_t *info) { return 0; } } else if (wrsz > 0) { update_sent(b, s, info, wrsz); BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64, "sent: %zd\n", wrsz); Loading src/lib/bus/sender_internal.h +1 −0 Original line number Diff line number Diff line Loading @@ -69,6 +69,7 @@ typedef struct { SSL *ssl; /* SSL handle. Can be NULL. */ int refcount; int64_t largest_seq_id_seen; bool errored; } fd_info; /* Metadata for a message in-flight. */ Loading Loading
src/lib/bus/bus.c +2 −2 Original line number Diff line number Diff line Loading @@ -323,8 +323,8 @@ bool bus_send_request(struct bus *b, bus_user_msg *msg) int s_id = sender_id_of_socket(b, msg->fd); struct sender *s = b->senders[s_id]; BUS_LOG_SNPRINTF(b, 3, LOG_SENDING_REQUEST, b->udata, 64, "Sending request <fd:%d, seq_id:%lld>", msg->fd, msg->seq_id); BUS_LOG_SNPRINTF(b, 3-3, LOG_SENDING_REQUEST, b->udata, 64, "Sending request <fd:%d, seq_id:%lld>", msg->fd, (long long)msg->seq_id); bool res = sender_send_request(s, box); BUS_LOG_SNPRINTF(b, 3, LOG_SENDING_REQUEST, b->udata, 64, "...request sent, result %d", res); Loading
src/lib/bus/sender.c +15 −2 Original line number Diff line number Diff line Loading @@ -434,7 +434,7 @@ static void cleanup(sender *s) { static bool register_socket_info(sender *s, int fd, SSL *ssl) { if (s->shutdown) { return false; } fd_info *info = malloc(sizeof(*info)); fd_info *info = calloc(1, sizeof(*info)); if (info == NULL) { return false; } Loading Loading @@ -503,6 +503,7 @@ static bool release_socket_info(sender *s, int fd) { fd_info *info = (fd_info *)old; if (info) { assert(fd == info->fd); info->errored = true; /* Expire any pending events on this socket. */ if (info->refcount > 0) { set_error_for_socket(s, fd, TX_ERROR_CLOSED); Loading Loading @@ -574,6 +575,11 @@ static void enqueue_write(struct sender *s, tx_info_t *info) { if (yacht_get(s->fd_hash_table, fd, (void **)&fdi)) { assert(fdi); if (fdi->errored) { set_error_for_socket(s, fd, TX_ERROR_CLOSED); return; } if (fdi->largest_seq_id_seen > out_seq_id && fdi->largest_seq_id_seen > 0) { BUS_LOG_SNPRINTF(b, 0 , LOG_SENDER, b->udata, 64, "suspicious outgoing sequence ID on %d: got %lld, already sent up to %lld", Loading Loading @@ -684,6 +690,11 @@ static void set_error_for_socket(sender *s, int fd, tx_error_t error) { break; } fd_info *fdi = NULL; if (yacht_get(s->fd_hash_table, fd, (void **)&fdi)) { fdi->errored = true; } for (int i = 0; i < MAX_CONCURRENT_SENDS; i++) { tx_flag_t cur = 1 << i; if (s->tx_flags & cur) { Loading Loading @@ -791,6 +802,7 @@ static ssize_t socket_write_plain(sender *s, tx_info_t *info) { BUS_LOG_SNPRINTF(b, 10, LOG_SENDER, b->udata, 64, "write %p to %d, %zd bytes (info %d)", (void*)&msg[sent_size], fd, rem, info->id); ssize_t wrsz = write(fd, &msg[sent_size], rem); if (wrsz == -1) { if (util_is_resumable_io_error(errno)) { Loading @@ -805,6 +817,7 @@ static ssize_t socket_write_plain(sender *s, tx_info_t *info) { return 0; } } else if (wrsz > 0) { update_sent(b, s, info, wrsz); BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64, "sent: %zd\n", wrsz); Loading
src/lib/bus/sender_internal.h +1 −0 Original line number Diff line number Diff line Loading @@ -69,6 +69,7 @@ typedef struct { SSL *ssl; /* SSL handle. Can be NULL. */ int refcount; int64_t largest_seq_id_seen; bool errored; } fd_info; /* Metadata for a message in-flight. */ Loading