Loading src/lib/bus/listener_cmd.c +2 −2 Original line number Diff line number Diff line Loading @@ -234,8 +234,8 @@ static void expect_response(listener *l, struct boxed_msg *box) { (void *)box, box->fd, (long long)box->out_seq_id); /* If there's a pending HOLD message, convert it. */ rx_info_t *info = listener_helper_get_hold_rx_info(l, box->fd, box->out_seq_id); if (info) { rx_info_t *info = listener_helper_find_info_by_sequence_id(l, box->fd, box->out_seq_id); if (info && info->state == RIS_HOLD) { BUS_ASSERT(b, b->udata, info->state == RIS_HOLD); if (info->u.hold.has_result) { bus_unpack_cb_res_t result = info->u.hold.result; Loading src/lib/bus/listener_helper.c +7 −13 Original line number Diff line number Diff line Loading @@ -24,6 +24,10 @@ #include <assert.h> #ifdef TEST uint8_t msg_buf[sizeof(uint8_t)]; #endif listener_msg *listener_helper_get_free_msg(listener *l) { struct bus *b = l->bus; Loading Loading @@ -65,7 +69,9 @@ bool listener_helper_push_message(struct listener *l, listener_msg *msg, int *re struct bus *b = l->bus; BUS_ASSERT(b, b->udata, msg); uint8_t msg_buf[sizeof(msg->id)]; #ifndef TEST uint8_t msg_buf[sizeof(uint8_t)]; #endif msg_buf[0] = msg->id; if (reply_fd) { *reply_fd = msg->pipes[0]; } Loading Loading @@ -116,18 +122,6 @@ rx_info_t *listener_helper_get_free_rx_info(struct listener *l) { } } rx_info_t *listener_helper_get_hold_rx_info(listener *l, int fd, int64_t seq_id) { for (int i = 0; i <= l->rx_info_max_used; i++) { rx_info_t *info = &l->rx_info[i]; if (info->state == RIS_HOLD && info->u.hold.fd == fd && info->u.hold.seq_id == seq_id) { return info; } } return NULL; } rx_info_t *listener_helper_find_info_by_sequence_id(listener *l, int fd, int64_t seq_id) { struct bus *b = l->bus; Loading src/lib/bus/listener_helper.h +0 −1 Original line number Diff line number Diff line Loading @@ -28,7 +28,6 @@ listener_msg *listener_helper_get_free_msg(listener *l); bool listener_helper_push_message(struct listener *l, listener_msg *msg, int *reply_fd); rx_info_t *listener_helper_get_free_rx_info(listener *l); rx_info_t *listener_helper_get_hold_rx_info(listener *l, int fd, int64_t seq_id); rx_info_t *listener_helper_find_info_by_sequence_id(listener *l, int fd, int64_t seq_id); Loading src/lib/bus/listener_internal_types.h +3 −0 Original line number Diff line number Diff line Loading @@ -68,6 +68,9 @@ typedef struct listener_msg { } u; } listener_msg; /* How long the listener should wait for responses before blocking. */ #define LISTENER_TASK_TIMEOUT_DELAY 100 typedef enum { RIS_HOLD = 1, RIS_EXPECT = 2, Loading src/lib/bus/listener_task.c +64 −41 Original line number Diff line number Diff line Loading @@ -27,20 +27,31 @@ #include "listener_io.h" #include "atomic.h" #define TIMEOUT_DELAY 100 #ifdef TEST struct timeval now; struct timeval cur; size_t backpressure = 0; int poll_res = 0; #define WHILE if #else #define WHILE while #endif static void tick_handler(listener *l); static void clean_up_completed_info(listener *l, rx_info_t *info); static void retry_delivery(listener *l, rx_info_t *info); static void observe_backpressure(listener *l, size_t backpressure); void *ListenerTask_MainLoop(void *arg) { listener *self = (listener *)arg; assert(self); struct bus *b = self->bus; struct timeval tv; if (!util_timestamp(&tv, true)) { BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64, "timestamp failure: %d", errno); } time_t last_sec = tv.tv_sec; #ifndef TEST struct timeval now; #endif time_t last_sec = (time_t)-1; // always trigger first time /* The listener thread has full control over its execution -- the * only thing other threads can do is reserve messages from l->msgs, Loading @@ -49,23 +60,26 @@ void *ListenerTask_MainLoop(void *arg) { * communication is managed at the command interface, so it doesn't * need any internal locking. */ while (self->shutdown_notify_fd == LISTENER_NO_FD) { if (!util_timestamp(&tv, true)) { WHILE (self->shutdown_notify_fd == LISTENER_NO_FD) { if (!util_timestamp(&now, true)) { BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64, "timestamp failure: %d", errno); } time_t cur_sec = tv.tv_sec; time_t cur_sec = now.tv_sec; if (cur_sec != last_sec) { tick_handler(self); last_sec = cur_sec; } int delay = (self->is_idle ? -1 : TIMEOUT_DELAY); int res = syscall_poll(self->fds, self->tracked_fds + INCOMING_MSG_PIPE, delay); BUS_LOG_SNPRINTF(b, (res == 0 ? 6 : 4), LOG_LISTENER, b->udata, 64, "poll res %d", res); int delay = (self->is_idle ? -1 : LISTENER_TASK_TIMEOUT_DELAY); #ifndef TEST int poll_res = 0; #endif poll_res = syscall_poll(self->fds, self->tracked_fds + INCOMING_MSG_PIPE, delay); BUS_LOG_SNPRINTF(b, (poll_res == 0 ? 6 : 4), LOG_LISTENER, b->udata, 64, "poll res %d", poll_res); if (res < 0) { if (poll_res < 0) { if (util_is_resumable_io_error(errno)) { errno = 0; } else { Loading @@ -73,14 +87,16 @@ void *ListenerTask_MainLoop(void *arg) { * or FDS is a bad pointer. */ BUS_ASSERT(b, b->udata, false); } } else if (res > 0) { ListenerCmd_CheckIncomingMessages(self, &res); ListenerIO_AttemptRecv(self, res); } else if (poll_res > 0) { ListenerCmd_CheckIncomingMessages(self, &poll_res); ListenerIO_AttemptRecv(self, poll_res); } else { /* nothing to do */ } } /* (This will always be true, except when testing.) */ if (self->shutdown_notify_fd != LISTENER_NO_FD) { BUS_LOG(b, 3, LOG_LISTENER, "shutting down", b->udata); if (self->tracked_fds > 0) { Loading @@ -90,6 +106,7 @@ void *ListenerTask_MainLoop(void *arg) { ListenerCmd_NotifyCaller(self, self->shutdown_notify_fd); self->shutdown_notify_fd = LISTENER_SHUTDOWN_COMPLETE_FD; } return NULL; } Loading Loading @@ -120,8 +137,10 @@ static void tick_handler(listener *l) { any_work = true; /* Check timeout */ if (info->timeout_sec == 1) { struct timeval tv; if (!util_timestamp(&tv, false)) { #ifndef TEST struct timeval cur; #endif if (!util_timestamp(&cur, false)) { BUS_LOG(b, 0, LOG_LISTENER, "gettimeofday failure in tick_handler!", b->udata); continue; Loading @@ -132,7 +151,7 @@ static void tick_handler(listener *l) { BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 128, "timing out hold info %p -- <fd:%d, seq_id:%lld> at (%ld.%ld)", (void*)info, info->u.hold.fd, (long long)info->u.hold.seq_id, (long)tv.tv_sec, (long)tv.tv_usec); (long)cur.tv_sec, (long)cur.tv_usec); ListenerTask_ReleaseRXInfo(l, info); } else { Loading @@ -158,8 +177,10 @@ static void tick_handler(listener *l) { info->u.expect.error, (void*)info); ListenerTask_NotifyMessageFailure(l, info, BUS_SEND_RX_FAILURE); } else if (info->timeout_sec == 1) { struct timeval tv; if (!util_timestamp(&tv, false)) { #ifndef TEST struct timeval cur; #endif if (!util_timestamp(&cur, false)) { BUS_LOG(b, 0, LOG_LISTENER, "gettimeofday failure in tick_handler!", b->udata); continue; Loading @@ -171,7 +192,8 @@ static void tick_handler(listener *l) { (void*)info, box->fd, (long long)box->out_seq_id, (long)box->tv_send_start.tv_sec, (long)box->tv_send_start.tv_usec, (long)box->tv_send_done.tv_sec, (long)box->tv_send_done.tv_usec, (long)tv.tv_sec, (long)tv.tv_usec); (long)cur.tv_sec, (long)cur.tv_usec); (void)box; ListenerTask_NotifyMessageFailure(l, info, BUS_SEND_RX_TIMEOUT); } else { Loading Loading @@ -228,7 +250,9 @@ static void retry_delivery(listener *l, rx_info_t *info) { "releasing box %p at line %d", (void*)box, __LINE__); BUS_ASSERT(b, b->udata, box->result.status == BUS_SEND_SUCCESS); #ifndef TEST size_t backpressure = 0; #endif if (bus_process_boxed_message(l->bus, box, &backpressure)) { BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "successfully delivered box %p (seq_id %lld) from info %d at line %d (retry)", Loading @@ -251,7 +275,10 @@ static void clean_up_completed_info(listener *l, rx_info_t *info) { BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "info %p, box is %p at line %d", (void*)info, (void*)info->u.expect.box, __LINE__); #ifndef TEST size_t backpressure = 0; #endif if (info->u.expect.box) { struct boxed_msg *box = info->u.expect.box; if (box->result.status != BUS_SEND_SUCCESS) { Loading Loading @@ -285,7 +312,9 @@ static void clean_up_completed_info(listener *l, rx_info_t *info) { void ListenerTask_NotifyMessageFailure(listener *l, rx_info_t *info, bus_send_status_t status) { #ifndef TEST size_t backpressure = 0; #endif struct bus *b = l->bus; BUS_ASSERT(b, b->udata, info->state == RIS_EXPECT); BUS_ASSERT(b, b->udata, info->u.expect.box); Loading Loading @@ -463,7 +492,9 @@ void ListenerTask_AttemptDelivery(listener *l, struct rx_info_t *info) { result->u.response.seq_id = seq_id; result->u.response.opaque_msg = opaque_msg; #ifndef TEST size_t backpressure = 0; #endif if (bus_process_boxed_message(b, box, &backpressure)) { /* success */ BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 256, Loading @@ -487,14 +518,6 @@ static void observe_backpressure(listener *l, size_t backpressure) { l->upstream_backpressure = (cur + backpressure) / 2; } /* Coefficients for backpressure based on certain conditions. */ #define MSG_BP_1QTR (0.25) #define MSG_BP_HALF (0.5) #define MSG_BP_3QTR (2.0) #define RX_INFO_BP_1QTR (0.5) #define RX_INFO_BP_HALF (0.5) #define RX_INFO_BP_3QTR (2.0) #define THREADPOOL_BP (1.0) uint16_t ListenerTask_GetBackpressure(struct listener *l) { uint16_t msg_fill_pressure = 0; Loading Loading
src/lib/bus/listener_cmd.c +2 −2 Original line number Diff line number Diff line Loading @@ -234,8 +234,8 @@ static void expect_response(listener *l, struct boxed_msg *box) { (void *)box, box->fd, (long long)box->out_seq_id); /* If there's a pending HOLD message, convert it. */ rx_info_t *info = listener_helper_get_hold_rx_info(l, box->fd, box->out_seq_id); if (info) { rx_info_t *info = listener_helper_find_info_by_sequence_id(l, box->fd, box->out_seq_id); if (info && info->state == RIS_HOLD) { BUS_ASSERT(b, b->udata, info->state == RIS_HOLD); if (info->u.hold.has_result) { bus_unpack_cb_res_t result = info->u.hold.result; Loading
src/lib/bus/listener_helper.c +7 −13 Original line number Diff line number Diff line Loading @@ -24,6 +24,10 @@ #include <assert.h> #ifdef TEST uint8_t msg_buf[sizeof(uint8_t)]; #endif listener_msg *listener_helper_get_free_msg(listener *l) { struct bus *b = l->bus; Loading Loading @@ -65,7 +69,9 @@ bool listener_helper_push_message(struct listener *l, listener_msg *msg, int *re struct bus *b = l->bus; BUS_ASSERT(b, b->udata, msg); uint8_t msg_buf[sizeof(msg->id)]; #ifndef TEST uint8_t msg_buf[sizeof(uint8_t)]; #endif msg_buf[0] = msg->id; if (reply_fd) { *reply_fd = msg->pipes[0]; } Loading Loading @@ -116,18 +122,6 @@ rx_info_t *listener_helper_get_free_rx_info(struct listener *l) { } } rx_info_t *listener_helper_get_hold_rx_info(listener *l, int fd, int64_t seq_id) { for (int i = 0; i <= l->rx_info_max_used; i++) { rx_info_t *info = &l->rx_info[i]; if (info->state == RIS_HOLD && info->u.hold.fd == fd && info->u.hold.seq_id == seq_id) { return info; } } return NULL; } rx_info_t *listener_helper_find_info_by_sequence_id(listener *l, int fd, int64_t seq_id) { struct bus *b = l->bus; Loading
src/lib/bus/listener_helper.h +0 −1 Original line number Diff line number Diff line Loading @@ -28,7 +28,6 @@ listener_msg *listener_helper_get_free_msg(listener *l); bool listener_helper_push_message(struct listener *l, listener_msg *msg, int *reply_fd); rx_info_t *listener_helper_get_free_rx_info(listener *l); rx_info_t *listener_helper_get_hold_rx_info(listener *l, int fd, int64_t seq_id); rx_info_t *listener_helper_find_info_by_sequence_id(listener *l, int fd, int64_t seq_id); Loading
src/lib/bus/listener_internal_types.h +3 −0 Original line number Diff line number Diff line Loading @@ -68,6 +68,9 @@ typedef struct listener_msg { } u; } listener_msg; /* How long the listener should wait for responses before blocking. */ #define LISTENER_TASK_TIMEOUT_DELAY 100 typedef enum { RIS_HOLD = 1, RIS_EXPECT = 2, Loading
src/lib/bus/listener_task.c +64 −41 Original line number Diff line number Diff line Loading @@ -27,20 +27,31 @@ #include "listener_io.h" #include "atomic.h" #define TIMEOUT_DELAY 100 #ifdef TEST struct timeval now; struct timeval cur; size_t backpressure = 0; int poll_res = 0; #define WHILE if #else #define WHILE while #endif static void tick_handler(listener *l); static void clean_up_completed_info(listener *l, rx_info_t *info); static void retry_delivery(listener *l, rx_info_t *info); static void observe_backpressure(listener *l, size_t backpressure); void *ListenerTask_MainLoop(void *arg) { listener *self = (listener *)arg; assert(self); struct bus *b = self->bus; struct timeval tv; if (!util_timestamp(&tv, true)) { BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64, "timestamp failure: %d", errno); } time_t last_sec = tv.tv_sec; #ifndef TEST struct timeval now; #endif time_t last_sec = (time_t)-1; // always trigger first time /* The listener thread has full control over its execution -- the * only thing other threads can do is reserve messages from l->msgs, Loading @@ -49,23 +60,26 @@ void *ListenerTask_MainLoop(void *arg) { * communication is managed at the command interface, so it doesn't * need any internal locking. */ while (self->shutdown_notify_fd == LISTENER_NO_FD) { if (!util_timestamp(&tv, true)) { WHILE (self->shutdown_notify_fd == LISTENER_NO_FD) { if (!util_timestamp(&now, true)) { BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64, "timestamp failure: %d", errno); } time_t cur_sec = tv.tv_sec; time_t cur_sec = now.tv_sec; if (cur_sec != last_sec) { tick_handler(self); last_sec = cur_sec; } int delay = (self->is_idle ? -1 : TIMEOUT_DELAY); int res = syscall_poll(self->fds, self->tracked_fds + INCOMING_MSG_PIPE, delay); BUS_LOG_SNPRINTF(b, (res == 0 ? 6 : 4), LOG_LISTENER, b->udata, 64, "poll res %d", res); int delay = (self->is_idle ? -1 : LISTENER_TASK_TIMEOUT_DELAY); #ifndef TEST int poll_res = 0; #endif poll_res = syscall_poll(self->fds, self->tracked_fds + INCOMING_MSG_PIPE, delay); BUS_LOG_SNPRINTF(b, (poll_res == 0 ? 6 : 4), LOG_LISTENER, b->udata, 64, "poll res %d", poll_res); if (res < 0) { if (poll_res < 0) { if (util_is_resumable_io_error(errno)) { errno = 0; } else { Loading @@ -73,14 +87,16 @@ void *ListenerTask_MainLoop(void *arg) { * or FDS is a bad pointer. */ BUS_ASSERT(b, b->udata, false); } } else if (res > 0) { ListenerCmd_CheckIncomingMessages(self, &res); ListenerIO_AttemptRecv(self, res); } else if (poll_res > 0) { ListenerCmd_CheckIncomingMessages(self, &poll_res); ListenerIO_AttemptRecv(self, poll_res); } else { /* nothing to do */ } } /* (This will always be true, except when testing.) */ if (self->shutdown_notify_fd != LISTENER_NO_FD) { BUS_LOG(b, 3, LOG_LISTENER, "shutting down", b->udata); if (self->tracked_fds > 0) { Loading @@ -90,6 +106,7 @@ void *ListenerTask_MainLoop(void *arg) { ListenerCmd_NotifyCaller(self, self->shutdown_notify_fd); self->shutdown_notify_fd = LISTENER_SHUTDOWN_COMPLETE_FD; } return NULL; } Loading Loading @@ -120,8 +137,10 @@ static void tick_handler(listener *l) { any_work = true; /* Check timeout */ if (info->timeout_sec == 1) { struct timeval tv; if (!util_timestamp(&tv, false)) { #ifndef TEST struct timeval cur; #endif if (!util_timestamp(&cur, false)) { BUS_LOG(b, 0, LOG_LISTENER, "gettimeofday failure in tick_handler!", b->udata); continue; Loading @@ -132,7 +151,7 @@ static void tick_handler(listener *l) { BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 128, "timing out hold info %p -- <fd:%d, seq_id:%lld> at (%ld.%ld)", (void*)info, info->u.hold.fd, (long long)info->u.hold.seq_id, (long)tv.tv_sec, (long)tv.tv_usec); (long)cur.tv_sec, (long)cur.tv_usec); ListenerTask_ReleaseRXInfo(l, info); } else { Loading @@ -158,8 +177,10 @@ static void tick_handler(listener *l) { info->u.expect.error, (void*)info); ListenerTask_NotifyMessageFailure(l, info, BUS_SEND_RX_FAILURE); } else if (info->timeout_sec == 1) { struct timeval tv; if (!util_timestamp(&tv, false)) { #ifndef TEST struct timeval cur; #endif if (!util_timestamp(&cur, false)) { BUS_LOG(b, 0, LOG_LISTENER, "gettimeofday failure in tick_handler!", b->udata); continue; Loading @@ -171,7 +192,8 @@ static void tick_handler(listener *l) { (void*)info, box->fd, (long long)box->out_seq_id, (long)box->tv_send_start.tv_sec, (long)box->tv_send_start.tv_usec, (long)box->tv_send_done.tv_sec, (long)box->tv_send_done.tv_usec, (long)tv.tv_sec, (long)tv.tv_usec); (long)cur.tv_sec, (long)cur.tv_usec); (void)box; ListenerTask_NotifyMessageFailure(l, info, BUS_SEND_RX_TIMEOUT); } else { Loading Loading @@ -228,7 +250,9 @@ static void retry_delivery(listener *l, rx_info_t *info) { "releasing box %p at line %d", (void*)box, __LINE__); BUS_ASSERT(b, b->udata, box->result.status == BUS_SEND_SUCCESS); #ifndef TEST size_t backpressure = 0; #endif if (bus_process_boxed_message(l->bus, box, &backpressure)) { BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "successfully delivered box %p (seq_id %lld) from info %d at line %d (retry)", Loading @@ -251,7 +275,10 @@ static void clean_up_completed_info(listener *l, rx_info_t *info) { BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "info %p, box is %p at line %d", (void*)info, (void*)info->u.expect.box, __LINE__); #ifndef TEST size_t backpressure = 0; #endif if (info->u.expect.box) { struct boxed_msg *box = info->u.expect.box; if (box->result.status != BUS_SEND_SUCCESS) { Loading Loading @@ -285,7 +312,9 @@ static void clean_up_completed_info(listener *l, rx_info_t *info) { void ListenerTask_NotifyMessageFailure(listener *l, rx_info_t *info, bus_send_status_t status) { #ifndef TEST size_t backpressure = 0; #endif struct bus *b = l->bus; BUS_ASSERT(b, b->udata, info->state == RIS_EXPECT); BUS_ASSERT(b, b->udata, info->u.expect.box); Loading Loading @@ -463,7 +492,9 @@ void ListenerTask_AttemptDelivery(listener *l, struct rx_info_t *info) { result->u.response.seq_id = seq_id; result->u.response.opaque_msg = opaque_msg; #ifndef TEST size_t backpressure = 0; #endif if (bus_process_boxed_message(b, box, &backpressure)) { /* success */ BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 256, Loading @@ -487,14 +518,6 @@ static void observe_backpressure(listener *l, size_t backpressure) { l->upstream_backpressure = (cur + backpressure) / 2; } /* Coefficients for backpressure based on certain conditions. */ #define MSG_BP_1QTR (0.25) #define MSG_BP_HALF (0.5) #define MSG_BP_3QTR (2.0) #define RX_INFO_BP_1QTR (0.5) #define RX_INFO_BP_HALF (0.5) #define RX_INFO_BP_3QTR (2.0) #define THREADPOOL_BP (1.0) uint16_t ListenerTask_GetBackpressure(struct listener *l) { uint16_t msg_fill_pressure = 0; Loading