Loading src/lib/bus/bus.c +5 −0 Original line number Diff line number Diff line Loading @@ -292,6 +292,11 @@ static boxed_msg *box_msg(struct bus *b, bus_user_msg *msg) { return NULL; } box->timeout_sec = (time_t)msg->timeout_sec; if (box->timeout_sec == 0) { box->timeout_sec = BUS_DEFAULT_TIMEOUT_SEC; } box->out_seq_id = msg->seq_id; box->out_msg_size = msg->msg_size; Loading src/lib/bus/bus_internal_types.h +3 −0 Original line number Diff line number Diff line Loading @@ -39,6 +39,9 @@ typedef struct boxed_msg { * has completed or failed due to timeout / unrecoverable error. */ bus_msg_result_t result; /* Message send timeout. */ time_t timeout_sec; /* Callback and userdata to which the bus_msg_result_t above will be sunk. */ bus_msg_cb *cb; void *udata; Loading src/lib/bus/bus_types.h +4 −0 Original line number Diff line number Diff line Loading @@ -32,6 +32,9 @@ struct boxed_msg; /* Max number of concurrent sends that can be active. */ #define BUS_MAX_CONCURRENT_SENDS 10 /* Default number of seconds before a message response times out. */ #define BUS_DEFAULT_TIMEOUT_SEC 10 #define BUS_LOG(B, LEVEL, EVENT_KEY, MSG, UDATA) \ do { \ struct bus *_b = (B); \ Loading Loading @@ -214,6 +217,7 @@ typedef struct { uint64_t seq_id; uint8_t *msg; size_t msg_size; uint16_t timeout_sec; bus_msg_cb *cb; void *udata; Loading src/lib/bus/listener.c +1 −2 Original line number Diff line number Diff line Loading @@ -34,7 +34,6 @@ #include "atomic.h" #define DEFAULT_READ_BUF_SIZE (1024L * 1024L) #define RESPONSE_TIMEOUT 10 static void retry_delivery(listener *l, rx_info_t *info); Loading Loading @@ -955,7 +954,7 @@ static void expect_response(listener *l, boxed_msg *box) { (void*)info, info->id, (void*)box); assert(info->box == NULL); info->box = box; info->timeout_sec = RESPONSE_TIMEOUT; info->timeout_sec = box->timeout_sec; } static void shutdown(listener *l) { Loading src/lib/bus/sender.c +2 −1 Original line number Diff line number Diff line Loading @@ -137,6 +137,7 @@ bool sender_send_request(struct sender *s, boxed_msg *box) { info->state = TIS_REQUEST_ENQUEUE; info->u.enqueue.fd = box->fd; info->u.enqueue.box = box; info->u.enqueue.timeout_sec = box->timeout_sec; BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "sending request on %d: box %p", box->fd, (void*)box); Loading Loading @@ -568,7 +569,7 @@ static void enqueue_write(struct sender *s, tx_info_t *info) { struct u_write uw = { .fd = info->u.enqueue.fd, .timeout_sec = TX_TIMEOUT, .timeout_sec = info->u.enqueue.timeout_sec, .box = info->u.enqueue.box, .fdi = fdi, }; Loading Loading
src/lib/bus/bus.c +5 −0 Original line number Diff line number Diff line Loading @@ -292,6 +292,11 @@ static boxed_msg *box_msg(struct bus *b, bus_user_msg *msg) { return NULL; } box->timeout_sec = (time_t)msg->timeout_sec; if (box->timeout_sec == 0) { box->timeout_sec = BUS_DEFAULT_TIMEOUT_SEC; } box->out_seq_id = msg->seq_id; box->out_msg_size = msg->msg_size; Loading
src/lib/bus/bus_internal_types.h +3 −0 Original line number Diff line number Diff line Loading @@ -39,6 +39,9 @@ typedef struct boxed_msg { * has completed or failed due to timeout / unrecoverable error. */ bus_msg_result_t result; /* Message send timeout. */ time_t timeout_sec; /* Callback and userdata to which the bus_msg_result_t above will be sunk. */ bus_msg_cb *cb; void *udata; Loading
src/lib/bus/bus_types.h +4 −0 Original line number Diff line number Diff line Loading @@ -32,6 +32,9 @@ struct boxed_msg; /* Max number of concurrent sends that can be active. */ #define BUS_MAX_CONCURRENT_SENDS 10 /* Default number of seconds before a message response times out. */ #define BUS_DEFAULT_TIMEOUT_SEC 10 #define BUS_LOG(B, LEVEL, EVENT_KEY, MSG, UDATA) \ do { \ struct bus *_b = (B); \ Loading Loading @@ -214,6 +217,7 @@ typedef struct { uint64_t seq_id; uint8_t *msg; size_t msg_size; uint16_t timeout_sec; bus_msg_cb *cb; void *udata; Loading
src/lib/bus/listener.c +1 −2 Original line number Diff line number Diff line Loading @@ -34,7 +34,6 @@ #include "atomic.h" #define DEFAULT_READ_BUF_SIZE (1024L * 1024L) #define RESPONSE_TIMEOUT 10 static void retry_delivery(listener *l, rx_info_t *info); Loading Loading @@ -955,7 +954,7 @@ static void expect_response(listener *l, boxed_msg *box) { (void*)info, info->id, (void*)box); assert(info->box == NULL); info->box = box; info->timeout_sec = RESPONSE_TIMEOUT; info->timeout_sec = box->timeout_sec; } static void shutdown(listener *l) { Loading
src/lib/bus/sender.c +2 −1 Original line number Diff line number Diff line Loading @@ -137,6 +137,7 @@ bool sender_send_request(struct sender *s, boxed_msg *box) { info->state = TIS_REQUEST_ENQUEUE; info->u.enqueue.fd = box->fd; info->u.enqueue.box = box; info->u.enqueue.timeout_sec = box->timeout_sec; BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "sending request on %d: box %p", box->fd, (void*)box); Loading Loading @@ -568,7 +569,7 @@ static void enqueue_write(struct sender *s, tx_info_t *info) { struct u_write uw = { .fd = info->u.enqueue.fd, .timeout_sec = TX_TIMEOUT, .timeout_sec = info->u.enqueue.timeout_sec, .box = info->u.enqueue.box, .fdi = fdi, }; Loading