Loading src/lib/bus/bus_internal_types.h +4 −0 Original line number Diff line number Diff line Loading @@ -46,6 +46,10 @@ typedef struct boxed_msg { bus_msg_cb *cb; void *udata; /* Event timestamps to track timeouts. */ struct timeval tv_send_start; struct timeval tv_send_done; /* Destination filename and message body. */ int fd; SSL *ssl; /* valid pointer or BUS_BOXED_MSG_NO_SSL */ Loading src/lib/bus/listener.c +27 −4 Original line number Diff line number Diff line Loading @@ -737,10 +737,20 @@ static void tick_handler(listener *l) { case RIS_HOLD: /* Check timeout */ if (info->timeout_sec == 1) { struct timeval tv; if (-1 == gettimeofday(&tv, NULL)) { BUS_LOG(b, 0, LOG_LISTENER, "gettimeofday failure in tick_handler!", b->udata); continue; } /* never got a response, but we don't have the callback * either -- the sender will notify about the timeout. */ BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64, "timing out hold info %p", (void*)info); BUS_LOG_SNPRINTF(b, 1, LOG_LISTENER, b->udata, 64, "timing out hold info %p -- <fd:%d, seq_id:%lld> at (%ld.%ld)", (void*)info, info->u.hold.fd, (long long)info->u.hold.seq_id, (long)tv.tv_sec, (long)tv.tv_usec); release_rx_info(l, info); } else { BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64, Loading @@ -764,8 +774,21 @@ static void tick_handler(listener *l) { info->u.expect.error, (void*)info); notify_message_failure(l, info, BUS_SEND_RX_FAILURE); } else if (info->timeout_sec == 1) { BUS_LOG_SNPRINTF(b, 2, LOG_LISTENER, b->udata, 64, "notifying of rx failure -- timeout (info %p)", (void*)info); struct timeval tv; if (-1 == gettimeofday(&tv, NULL)) { BUS_LOG(b, 0, LOG_LISTENER, "gettimeofday failure in tick_handler!", b->udata); continue; } struct boxed_msg *box = info->u.expect.box; BUS_LOG_SNPRINTF(b, 1, LOG_LISTENER, b->udata, 256, "notifying of rx failure -- timeout (info %p) -- " "<fd:%d, seq_id:%lld>, from time (%ld.%ld) to (%ld.%ld) to (%ld.%ld)", (void*)info, box->fd, box->out_seq_id, (long)box->tv_send_start.tv_sec, (long)box->tv_send_start.tv_usec, (long)box->tv_send_done.tv_sec, (long)box->tv_send_done.tv_usec, (long)tv.tv_sec, (long)tv.tv_usec); notify_message_failure(l, info, BUS_SEND_RX_TIMEOUT); } else { BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64, Loading src/lib/bus/sender.c +33 −0 Original line number Diff line number Diff line Loading @@ -142,6 +142,12 @@ bool sender_send_request(struct sender *s, boxed_msg *box) { info->u.enqueue.box = box; info->u.enqueue.timeout_sec = box->timeout_sec; if (-1 == gettimeofday(&box->tv_send_start, NULL)) { BUS_LOG(b, 0, LOG_SENDER, "gettimeofday failure in sender_send_request!", b->udata); return false; } BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "sending request on %d: box %p", box->fd, (void*)box); tx_error_t res = commit_event_and_block(s, info); Loading Loading @@ -910,6 +916,11 @@ static void update_sent(struct bus *b, sender *s, tx_info_t *info, ssize_t sent) /* Message has been sent, so release - caller may free it. */ un.box->out_msg = NULL; if (-1 == gettimeofday(&un.box->tv_send_done, NULL)) { BUS_LOG(b, 0, LOG_SENDER, "gettimeofday failure in update_sent!", b->udata); return; } info->state = TIS_RESPONSE_NOTIFY; info->u.notify = un; Loading Loading @@ -1016,6 +1027,7 @@ static void tick_handler(sender *s) { } static void tick_timeout(sender *s, tx_info_t *info) { struct bus *b = s->bus; switch (info->state) { case TIS_REQUEST_WRITE: if (info->u.write.timeout_sec == 1) { /* timed out */ Loading @@ -1026,6 +1038,12 @@ static void tick_timeout(sender *s, tx_info_t *info) { .backpressure = 0, }; info->u.error = ue; BUS_LOG_SNPRINTF(b, 1, LOG_SENDER, b->udata, 128, "timeout during network send (%ld.%ld - %ld.%ld)", (long)ue.box->tv_send_start.tv_sec, (long)ue.box->tv_send_start.tv_usec, (long)ue.box->tv_send_done.tv_sec, (long)ue.box->tv_send_done.tv_usec); notify_message_failure(s, info, BUS_SEND_TX_TIMEOUT); } else { info->u.write.timeout_sec--; Loading @@ -1040,6 +1058,21 @@ static void tick_timeout(sender *s, tx_info_t *info) { .backpressure = 0, }; info->u.error = ue; struct timeval tv; if (-1 == gettimeofday(&tv, NULL)) { BUS_LOG(b, 0, LOG_SENDER, "gettimeofday failure in tick_timeout!", b->udata); } BUS_LOG_SNPRINTF(b, 1, LOG_SENDER, b->udata, 256, "timeout waiting to notify of pending response on <fd:%d, seq_id:%lld>:" "(%ld.%ld - %ld.%ld - %ld.%ld)", info->u.notify.fd, info->u.notify.box->out_seq_id, (long)ue.box->tv_send_start.tv_sec, (long)ue.box->tv_send_start.tv_usec, (long)ue.box->tv_send_done.tv_sec, (long)ue.box->tv_send_done.tv_usec, (long)tv.tv_sec, (long)tv.tv_usec); notify_message_failure(s, info, BUS_SEND_TX_TIMEOUT); } else { info->u.notify.timeout_sec--; Loading Loading
src/lib/bus/bus_internal_types.h +4 −0 Original line number Diff line number Diff line Loading @@ -46,6 +46,10 @@ typedef struct boxed_msg { bus_msg_cb *cb; void *udata; /* Event timestamps to track timeouts. */ struct timeval tv_send_start; struct timeval tv_send_done; /* Destination filename and message body. */ int fd; SSL *ssl; /* valid pointer or BUS_BOXED_MSG_NO_SSL */ Loading
src/lib/bus/listener.c +27 −4 Original line number Diff line number Diff line Loading @@ -737,10 +737,20 @@ static void tick_handler(listener *l) { case RIS_HOLD: /* Check timeout */ if (info->timeout_sec == 1) { struct timeval tv; if (-1 == gettimeofday(&tv, NULL)) { BUS_LOG(b, 0, LOG_LISTENER, "gettimeofday failure in tick_handler!", b->udata); continue; } /* never got a response, but we don't have the callback * either -- the sender will notify about the timeout. */ BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64, "timing out hold info %p", (void*)info); BUS_LOG_SNPRINTF(b, 1, LOG_LISTENER, b->udata, 64, "timing out hold info %p -- <fd:%d, seq_id:%lld> at (%ld.%ld)", (void*)info, info->u.hold.fd, (long long)info->u.hold.seq_id, (long)tv.tv_sec, (long)tv.tv_usec); release_rx_info(l, info); } else { BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64, Loading @@ -764,8 +774,21 @@ static void tick_handler(listener *l) { info->u.expect.error, (void*)info); notify_message_failure(l, info, BUS_SEND_RX_FAILURE); } else if (info->timeout_sec == 1) { BUS_LOG_SNPRINTF(b, 2, LOG_LISTENER, b->udata, 64, "notifying of rx failure -- timeout (info %p)", (void*)info); struct timeval tv; if (-1 == gettimeofday(&tv, NULL)) { BUS_LOG(b, 0, LOG_LISTENER, "gettimeofday failure in tick_handler!", b->udata); continue; } struct boxed_msg *box = info->u.expect.box; BUS_LOG_SNPRINTF(b, 1, LOG_LISTENER, b->udata, 256, "notifying of rx failure -- timeout (info %p) -- " "<fd:%d, seq_id:%lld>, from time (%ld.%ld) to (%ld.%ld) to (%ld.%ld)", (void*)info, box->fd, box->out_seq_id, (long)box->tv_send_start.tv_sec, (long)box->tv_send_start.tv_usec, (long)box->tv_send_done.tv_sec, (long)box->tv_send_done.tv_usec, (long)tv.tv_sec, (long)tv.tv_usec); notify_message_failure(l, info, BUS_SEND_RX_TIMEOUT); } else { BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64, Loading
src/lib/bus/sender.c +33 −0 Original line number Diff line number Diff line Loading @@ -142,6 +142,12 @@ bool sender_send_request(struct sender *s, boxed_msg *box) { info->u.enqueue.box = box; info->u.enqueue.timeout_sec = box->timeout_sec; if (-1 == gettimeofday(&box->tv_send_start, NULL)) { BUS_LOG(b, 0, LOG_SENDER, "gettimeofday failure in sender_send_request!", b->udata); return false; } BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "sending request on %d: box %p", box->fd, (void*)box); tx_error_t res = commit_event_and_block(s, info); Loading Loading @@ -910,6 +916,11 @@ static void update_sent(struct bus *b, sender *s, tx_info_t *info, ssize_t sent) /* Message has been sent, so release - caller may free it. */ un.box->out_msg = NULL; if (-1 == gettimeofday(&un.box->tv_send_done, NULL)) { BUS_LOG(b, 0, LOG_SENDER, "gettimeofday failure in update_sent!", b->udata); return; } info->state = TIS_RESPONSE_NOTIFY; info->u.notify = un; Loading Loading @@ -1016,6 +1027,7 @@ static void tick_handler(sender *s) { } static void tick_timeout(sender *s, tx_info_t *info) { struct bus *b = s->bus; switch (info->state) { case TIS_REQUEST_WRITE: if (info->u.write.timeout_sec == 1) { /* timed out */ Loading @@ -1026,6 +1038,12 @@ static void tick_timeout(sender *s, tx_info_t *info) { .backpressure = 0, }; info->u.error = ue; BUS_LOG_SNPRINTF(b, 1, LOG_SENDER, b->udata, 128, "timeout during network send (%ld.%ld - %ld.%ld)", (long)ue.box->tv_send_start.tv_sec, (long)ue.box->tv_send_start.tv_usec, (long)ue.box->tv_send_done.tv_sec, (long)ue.box->tv_send_done.tv_usec); notify_message_failure(s, info, BUS_SEND_TX_TIMEOUT); } else { info->u.write.timeout_sec--; Loading @@ -1040,6 +1058,21 @@ static void tick_timeout(sender *s, tx_info_t *info) { .backpressure = 0, }; info->u.error = ue; struct timeval tv; if (-1 == gettimeofday(&tv, NULL)) { BUS_LOG(b, 0, LOG_SENDER, "gettimeofday failure in tick_timeout!", b->udata); } BUS_LOG_SNPRINTF(b, 1, LOG_SENDER, b->udata, 256, "timeout waiting to notify of pending response on <fd:%d, seq_id:%lld>:" "(%ld.%ld - %ld.%ld - %ld.%ld)", info->u.notify.fd, info->u.notify.box->out_seq_id, (long)ue.box->tv_send_start.tv_sec, (long)ue.box->tv_send_start.tv_usec, (long)ue.box->tv_send_done.tv_sec, (long)ue.box->tv_send_done.tv_usec, (long)tv.tv_sec, (long)tv.tv_usec); notify_message_failure(s, info, BUS_SEND_TX_TIMEOUT); } else { info->u.notify.timeout_sec--; Loading