Loading src/lib/bus/listener.c +7 −2 Original line number Diff line number Diff line Loading @@ -100,10 +100,10 @@ bool listener_remove_socket(struct listener *l, int fd) { } /* Coefficients for backpressure based on certain conditions. */ #define MSG_BP_1QTR (0) #define MSG_BP_1QTR (0.25) #define MSG_BP_HALF (0.5) #define MSG_BP_3QTR (2.0) #define RX_INFO_BP_1QTR (0) #define RX_INFO_BP_1QTR (0.5) #define RX_INFO_BP_HALF (0.5) #define RX_INFO_BP_3QTR (2.0) #define THREADPOOL_BP (1.0) Loading Loading @@ -134,6 +134,11 @@ static uint16_t get_backpressure(struct listener *l) { uint16_t threadpool_fill_pressure = THREADPOOL_BP * l->upstream_backpressure; struct bus *b = l->bus; BUS_LOG_SNPRINTF(b, 6, LOG_SENDER, b->udata, 64, "lbp: %u, %u (iu %u), %u", msg_fill_pressure, rx_info_fill_pressure, l->rx_info_in_use, threadpool_fill_pressure); return msg_fill_pressure + rx_info_fill_pressure + threadpool_fill_pressure; } Loading src/lib/bus/sender.c +8 −5 Original line number Diff line number Diff line Loading @@ -269,10 +269,13 @@ static bool commit_event_and_block(struct sender *s, tx_info_t *info) { backpressure += (buf[2] << 8); /* Push back if message bus is too busy. */ backpressure >>= 0; // TODO: tuning BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, backpressure >>= 7; // TODO: further tuning if (backpressure > 0) { BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64, "reading done_pipe: backpressure %d", backpressure); if (backpressure > 0) { poll(NULL, 0, backpressure); } poll(NULL, 0, backpressure); } BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "reading done_pipe: success %d", 1); Loading Loading @@ -843,6 +846,7 @@ static void update_sent(struct bus *b, sender *s, tx_info_t *info, ssize_t sent) } static void notify_caller(sender *s, tx_info_t *info, bool success) { struct bus *b = s->bus; uint16_t bp = 0; switch (info->state) { Loading @@ -866,7 +870,6 @@ static void notify_caller(sender *s, tx_info_t *info, bool success) { ssize_t res = 0; struct bus *b = s->bus; for (;;) { /* for loop because of EINTR */ res = write(pipe_fd, buf, sizeof(buf)); BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64, Loading Loading
src/lib/bus/listener.c +7 −2 Original line number Diff line number Diff line Loading @@ -100,10 +100,10 @@ bool listener_remove_socket(struct listener *l, int fd) { } /* Coefficients for backpressure based on certain conditions. */ #define MSG_BP_1QTR (0) #define MSG_BP_1QTR (0.25) #define MSG_BP_HALF (0.5) #define MSG_BP_3QTR (2.0) #define RX_INFO_BP_1QTR (0) #define RX_INFO_BP_1QTR (0.5) #define RX_INFO_BP_HALF (0.5) #define RX_INFO_BP_3QTR (2.0) #define THREADPOOL_BP (1.0) Loading Loading @@ -134,6 +134,11 @@ static uint16_t get_backpressure(struct listener *l) { uint16_t threadpool_fill_pressure = THREADPOOL_BP * l->upstream_backpressure; struct bus *b = l->bus; BUS_LOG_SNPRINTF(b, 6, LOG_SENDER, b->udata, 64, "lbp: %u, %u (iu %u), %u", msg_fill_pressure, rx_info_fill_pressure, l->rx_info_in_use, threadpool_fill_pressure); return msg_fill_pressure + rx_info_fill_pressure + threadpool_fill_pressure; } Loading
src/lib/bus/sender.c +8 −5 Original line number Diff line number Diff line Loading @@ -269,10 +269,13 @@ static bool commit_event_and_block(struct sender *s, tx_info_t *info) { backpressure += (buf[2] << 8); /* Push back if message bus is too busy. */ backpressure >>= 0; // TODO: tuning BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, backpressure >>= 7; // TODO: further tuning if (backpressure > 0) { BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64, "reading done_pipe: backpressure %d", backpressure); if (backpressure > 0) { poll(NULL, 0, backpressure); } poll(NULL, 0, backpressure); } BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "reading done_pipe: success %d", 1); Loading Loading @@ -843,6 +846,7 @@ static void update_sent(struct bus *b, sender *s, tx_info_t *info, ssize_t sent) } static void notify_caller(sender *s, tx_info_t *info, bool success) { struct bus *b = s->bus; uint16_t bp = 0; switch (info->state) { Loading @@ -866,7 +870,6 @@ static void notify_caller(sender *s, tx_info_t *info, bool success) { ssize_t res = 0; struct bus *b = s->bus; for (;;) { /* for loop because of EINTR */ res = write(pipe_fd, buf, sizeof(buf)); BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64, Loading