Loading src/lib/bus/listener.c +18 −8 Original line number Diff line number Diff line Loading @@ -79,27 +79,37 @@ bool listener_close_socket(struct listener *l, int fd) { return push_message(l, msg); } /* Coefficients for backpressure based on certain conditions. */ #define MSG_BP_HALF (0.5) #define MSG_BP_3QTR (2.0) #define RX_INFO_BP_HALF (0.5) #define RX_INFO_BP_3QTR (2.0) #define THREADPOOL_BP (1.0) static uint16_t get_backpressure(struct listener *l) { uint16_t msg_fill_pressure = 0; if (l->msgs_in_use < MAX_QUEUE_MESSAGES / 2) { msg_fill_pressure = 0; } else if (l->msgs_in_use < 0.75 * MAX_QUEUE_MESSAGES) { msg_fill_pressure = l->msgs_in_use / 2; msg_fill_pressure = MSG_BP_HALF * l->msgs_in_use; } else { msg_fill_pressure = 2*l->msgs_in_use; msg_fill_pressure = MSG_BP_3QTR * l->msgs_in_use; } uint16_t rx_info_fill_pressure = 0; if (l->rx_info_in_use < MAX_PENDING_MESSAGES / 2) { rx_info_fill_pressure = 0; } else if (l->rx_info_in_use < 0.75 * MAX_PENDING_MESSAGES) { rx_info_fill_pressure = l->rx_info_in_use / 2; rx_info_fill_pressure = RX_INFO_BP_HALF * l->rx_info_in_use; } else { rx_info_fill_pressure = 2*l->rx_info_in_use; rx_info_fill_pressure = RX_INFO_BP_3QTR * l->rx_info_in_use; } return msg_fill_pressure + rx_info_fill_pressure + l->upstream_backpressure; uint16_t threadpool_fill_pressure = THREADPOOL_BP * l->upstream_backpressure; return msg_fill_pressure + rx_info_fill_pressure + threadpool_fill_pressure; } static void observe_backpressure(listener *l, size_t backpressure) { Loading Loading @@ -285,6 +295,7 @@ static void attempt_recv(listener *l, int available) { "reading %zd bytes from socket (buf is %zd)", ci->to_read_size, ci->read_buf_size); assert(ci->read_buf_size >= ci->to_read_size); ssize_t size = read(fd->fd, ci->read_buf, ci->to_read_size); if (size == -1) { Loading Loading @@ -341,7 +352,6 @@ static void attempt_recv(listener *l, int available) { static rx_info_t *find_info_by_sequence_id(listener *l, int fd, int64_t seq_id) { for (int i = 0; i < MAX_PENDING_MESSAGES; i++) { rx_info_t *info = &l->rx_info[i]; Loading @@ -351,7 +361,7 @@ static rx_info_t *find_info_by_sequence_id(listener *l, struct boxed_msg *box = info->box; struct bus *b = l->bus; BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, BUS_LOG_SNPRINTF(b, 4, LOG_MEMORY, b->udata, 128, "find_info_by_sequence_id: info (%p) at +%d [s %d]: box is %p", info, info->id, info->error, box); Loading Loading @@ -469,7 +479,7 @@ static void tick_handler(listener *l) { "notifying of rx failure -- timeout (info %p)", info); notify_message_failure(l, info, BUS_SEND_RX_TIMEOUT); } else { BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64, BUS_LOG_SNPRINTF(b, 2, LOG_LISTENER, b->udata, 64, "decrementing countdown on info %p [%u]: %ld", info, info->id, info->timeout_sec - 1); info->timeout_sec--; Loading Loading
src/lib/bus/listener.c +18 −8 Original line number Diff line number Diff line Loading @@ -79,27 +79,37 @@ bool listener_close_socket(struct listener *l, int fd) { return push_message(l, msg); } /* Coefficients for backpressure based on certain conditions. */ #define MSG_BP_HALF (0.5) #define MSG_BP_3QTR (2.0) #define RX_INFO_BP_HALF (0.5) #define RX_INFO_BP_3QTR (2.0) #define THREADPOOL_BP (1.0) static uint16_t get_backpressure(struct listener *l) { uint16_t msg_fill_pressure = 0; if (l->msgs_in_use < MAX_QUEUE_MESSAGES / 2) { msg_fill_pressure = 0; } else if (l->msgs_in_use < 0.75 * MAX_QUEUE_MESSAGES) { msg_fill_pressure = l->msgs_in_use / 2; msg_fill_pressure = MSG_BP_HALF * l->msgs_in_use; } else { msg_fill_pressure = 2*l->msgs_in_use; msg_fill_pressure = MSG_BP_3QTR * l->msgs_in_use; } uint16_t rx_info_fill_pressure = 0; if (l->rx_info_in_use < MAX_PENDING_MESSAGES / 2) { rx_info_fill_pressure = 0; } else if (l->rx_info_in_use < 0.75 * MAX_PENDING_MESSAGES) { rx_info_fill_pressure = l->rx_info_in_use / 2; rx_info_fill_pressure = RX_INFO_BP_HALF * l->rx_info_in_use; } else { rx_info_fill_pressure = 2*l->rx_info_in_use; rx_info_fill_pressure = RX_INFO_BP_3QTR * l->rx_info_in_use; } return msg_fill_pressure + rx_info_fill_pressure + l->upstream_backpressure; uint16_t threadpool_fill_pressure = THREADPOOL_BP * l->upstream_backpressure; return msg_fill_pressure + rx_info_fill_pressure + threadpool_fill_pressure; } static void observe_backpressure(listener *l, size_t backpressure) { Loading Loading @@ -285,6 +295,7 @@ static void attempt_recv(listener *l, int available) { "reading %zd bytes from socket (buf is %zd)", ci->to_read_size, ci->read_buf_size); assert(ci->read_buf_size >= ci->to_read_size); ssize_t size = read(fd->fd, ci->read_buf, ci->to_read_size); if (size == -1) { Loading Loading @@ -341,7 +352,6 @@ static void attempt_recv(listener *l, int available) { static rx_info_t *find_info_by_sequence_id(listener *l, int fd, int64_t seq_id) { for (int i = 0; i < MAX_PENDING_MESSAGES; i++) { rx_info_t *info = &l->rx_info[i]; Loading @@ -351,7 +361,7 @@ static rx_info_t *find_info_by_sequence_id(listener *l, struct boxed_msg *box = info->box; struct bus *b = l->bus; BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, BUS_LOG_SNPRINTF(b, 4, LOG_MEMORY, b->udata, 128, "find_info_by_sequence_id: info (%p) at +%d [s %d]: box is %p", info, info->id, info->error, box); Loading Loading @@ -469,7 +479,7 @@ static void tick_handler(listener *l) { "notifying of rx failure -- timeout (info %p)", info); notify_message_failure(l, info, BUS_SEND_RX_TIMEOUT); } else { BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64, BUS_LOG_SNPRINTF(b, 2, LOG_LISTENER, b->udata, 64, "decrementing countdown on info %p [%u]: %ld", info, info->id, info->timeout_sec - 1); info->timeout_sec--; Loading