Loading src/lib/bus/.gitignore +1 −0 Original line number Diff line number Diff line Loading @@ -4,3 +4,4 @@ bus_example echosrv notes test_casq test_yacht src/lib/bus/Makefile +13 −5 Original line number Diff line number Diff line SOCKET99_PATH= ../../../vendor/socket99 THREADPOOL_PATH= ../threadpool OPT= -O0 OPT= -O3 LIB_INC = -I${SOCKET99_PATH} -I${THREADPOOL_PATH} CFLAGS += -std=c99 ${OPT} -g ${LIB_INC} LDFLAGS += -L. -lcasq -L${SOCKET99_PATH} -lsocket99 -L${THREADPOOL_PATH} -lthreadpool Loading @@ -11,15 +11,17 @@ BUS_OBJS = \ listener.o \ sender.o \ util.o \ yacht.o \ ECHOSRV_OBJS = \ echosrv.o \ util.o \ all: bus.png libcasq.a test_casq echosrv bus_example all: bus.png libcasq.a test_casq test_yacht echosrv bus_example test: test_casq test: test_casq test_yacht ./test_casq ./test_yacht %.png: %.dot dot -Tpng -o $@ $^ Loading @@ -30,6 +32,9 @@ libcasq.a: casq.o test_casq: libcasq.a test_casq.o ${CC} -o $@ ${LDFLAGS} $^ test_yacht: test_yacht.o yacht.o ${CC} -o $@ ${LDFLAGS} $^ echosrv: libcasq.a ${ECHOSRV_OBJS} ${CC} -o $@ ${LDFLAGS} $^ Loading @@ -44,4 +49,7 @@ tags: TAGS TAGS: *.[ch] etags *.[ch] *.o: bus_types.h *.o: bus_types.h bus_internal_types.h Makefile sender.o: sender_internal.h listener.o: listener_internal.h %.o: %.h src/lib/bus/bus.c +5 −3 Original line number Diff line number Diff line Loading @@ -236,6 +236,7 @@ bool bus_send_request(struct bus *b, bus_user_msg *msg) /* Pass boxed message to the sender */ if (!sender_enqueue_message(s, box, &complete_fd)) { BUS_LOG(b, 3, LOG_SENDING_REQUEST, "sender_enqueue_message failed", b->udata); return false; } Loading @@ -256,10 +257,11 @@ static bool poll_on_completion(struct bus *b, int fd) { const int ONE_SECOND = 1000; // msec for (int i = 0; i < TIMEOUT_SECONDS; i++) { BUS_LOG(b, 3, LOG_SENDING_REQUEST, "Polling on completion...tick...", b->udata); BUS_LOG(b, 5, LOG_SENDING_REQUEST, "Polling on completion...tick...", b->udata); int res = poll(fds, 1, ONE_SECOND); if (res == -1) { if (is_resumable_io_error(errno)) { BUS_LOG(b, 3, LOG_SENDING_REQUEST, "Polling on completion...EAGAIN", b->udata); errno = 0; } else { assert(false); Loading @@ -275,8 +277,8 @@ static bool poll_on_completion(struct bus *b, int fd) { /* Payload: little-endian uint16_t, msec of backpressure. */ uint16_t msec = (read_buf[0] << 0) + (read_buf[1] << 8); if (msec > 0) { BUS_LOG_SNPRINTF(b, 0, LOG_SENDING_REQUEST, b->udata, 64, " -- countpressure of %d msec", msec); BUS_LOG_SNPRINTF(b, 5, LOG_SENDING_REQUEST, b->udata, 64, " -- backpressure of %d msec", msec); usleep(1000L * msec); //(void)poll(fds, 0, msec); } Loading src/lib/bus/bus_example.c +5 −2 Original line number Diff line number Diff line Loading @@ -353,6 +353,10 @@ static size_t construct_msg(uint8_t *buf, size_t buf_size, size_t payload_size, return header_size + payload_size; } /* Should it CAS on the completion counter? * This should account for nearly all CPU usage. */ #define INCREMENT_COMPLETION_COUNTER 1 static void completion_cb(bus_msg_result_t *res, void *udata) { example_state *s = &state; socket_info *si = (socket_info *)udata; Loading @@ -360,8 +364,7 @@ static void completion_cb(bus_msg_result_t *res, void *udata) { switch (res->status) { case BUS_SEND_SUCCESS: { #if 1 /* CAS completion? or what? */ #if INCREMENT_COMPLETION_COUNTER size_t cur = s->completed_deliveries; for (;;) { if (ATOMIC_BOOL_COMPARE_AND_SWAP(&s->completed_deliveries, cur, cur + 1)) { Loading src/lib/bus/listener.c +8 −2 Original line number Diff line number Diff line Loading @@ -99,8 +99,10 @@ bool listener_close_socket(struct listener *l, int fd) { } /* Coefficients for backpressure based on certain conditions. */ #define MSG_BP_1QTR (0) #define MSG_BP_HALF (0.5) #define MSG_BP_3QTR (2.0) #define RX_INFO_BP_1QTR (0) #define RX_INFO_BP_HALF (0.5) #define RX_INFO_BP_3QTR (2.0) #define THREADPOOL_BP (1.0) Loading @@ -108,8 +110,10 @@ bool listener_close_socket(struct listener *l, int fd) { static uint16_t get_backpressure(struct listener *l) { uint16_t msg_fill_pressure = 0; if (l->msgs_in_use < MAX_QUEUE_MESSAGES / 2) { if (l->msgs_in_use < 0.25 * MAX_QUEUE_MESSAGES) { msg_fill_pressure = 0; } else if (l->msgs_in_use < 0.5 * MAX_QUEUE_MESSAGES) { msg_fill_pressure = MSG_BP_1QTR * l->msgs_in_use; } else if (l->msgs_in_use < 0.75 * MAX_QUEUE_MESSAGES) { msg_fill_pressure = MSG_BP_HALF * l->msgs_in_use; } else { Loading @@ -117,8 +121,10 @@ static uint16_t get_backpressure(struct listener *l) { } uint16_t rx_info_fill_pressure = 0; if (l->rx_info_in_use < MAX_PENDING_MESSAGES / 2) { if (l->rx_info_in_use < 0.25 * MAX_PENDING_MESSAGES) { rx_info_fill_pressure = 0; } else if (l->rx_info_in_use < 0.5 * MAX_PENDING_MESSAGES) { rx_info_fill_pressure = RX_INFO_BP_1QTR * l->rx_info_in_use; } else if (l->rx_info_in_use < 0.75 * MAX_PENDING_MESSAGES) { rx_info_fill_pressure = RX_INFO_BP_HALF * l->rx_info_in_use; } else { Loading Loading
src/lib/bus/.gitignore +1 −0 Original line number Diff line number Diff line Loading @@ -4,3 +4,4 @@ bus_example echosrv notes test_casq test_yacht
src/lib/bus/Makefile +13 −5 Original line number Diff line number Diff line SOCKET99_PATH= ../../../vendor/socket99 THREADPOOL_PATH= ../threadpool OPT= -O0 OPT= -O3 LIB_INC = -I${SOCKET99_PATH} -I${THREADPOOL_PATH} CFLAGS += -std=c99 ${OPT} -g ${LIB_INC} LDFLAGS += -L. -lcasq -L${SOCKET99_PATH} -lsocket99 -L${THREADPOOL_PATH} -lthreadpool Loading @@ -11,15 +11,17 @@ BUS_OBJS = \ listener.o \ sender.o \ util.o \ yacht.o \ ECHOSRV_OBJS = \ echosrv.o \ util.o \ all: bus.png libcasq.a test_casq echosrv bus_example all: bus.png libcasq.a test_casq test_yacht echosrv bus_example test: test_casq test: test_casq test_yacht ./test_casq ./test_yacht %.png: %.dot dot -Tpng -o $@ $^ Loading @@ -30,6 +32,9 @@ libcasq.a: casq.o test_casq: libcasq.a test_casq.o ${CC} -o $@ ${LDFLAGS} $^ test_yacht: test_yacht.o yacht.o ${CC} -o $@ ${LDFLAGS} $^ echosrv: libcasq.a ${ECHOSRV_OBJS} ${CC} -o $@ ${LDFLAGS} $^ Loading @@ -44,4 +49,7 @@ tags: TAGS TAGS: *.[ch] etags *.[ch] *.o: bus_types.h *.o: bus_types.h bus_internal_types.h Makefile sender.o: sender_internal.h listener.o: listener_internal.h %.o: %.h
src/lib/bus/bus.c +5 −3 Original line number Diff line number Diff line Loading @@ -236,6 +236,7 @@ bool bus_send_request(struct bus *b, bus_user_msg *msg) /* Pass boxed message to the sender */ if (!sender_enqueue_message(s, box, &complete_fd)) { BUS_LOG(b, 3, LOG_SENDING_REQUEST, "sender_enqueue_message failed", b->udata); return false; } Loading @@ -256,10 +257,11 @@ static bool poll_on_completion(struct bus *b, int fd) { const int ONE_SECOND = 1000; // msec for (int i = 0; i < TIMEOUT_SECONDS; i++) { BUS_LOG(b, 3, LOG_SENDING_REQUEST, "Polling on completion...tick...", b->udata); BUS_LOG(b, 5, LOG_SENDING_REQUEST, "Polling on completion...tick...", b->udata); int res = poll(fds, 1, ONE_SECOND); if (res == -1) { if (is_resumable_io_error(errno)) { BUS_LOG(b, 3, LOG_SENDING_REQUEST, "Polling on completion...EAGAIN", b->udata); errno = 0; } else { assert(false); Loading @@ -275,8 +277,8 @@ static bool poll_on_completion(struct bus *b, int fd) { /* Payload: little-endian uint16_t, msec of backpressure. */ uint16_t msec = (read_buf[0] << 0) + (read_buf[1] << 8); if (msec > 0) { BUS_LOG_SNPRINTF(b, 0, LOG_SENDING_REQUEST, b->udata, 64, " -- countpressure of %d msec", msec); BUS_LOG_SNPRINTF(b, 5, LOG_SENDING_REQUEST, b->udata, 64, " -- backpressure of %d msec", msec); usleep(1000L * msec); //(void)poll(fds, 0, msec); } Loading
src/lib/bus/bus_example.c +5 −2 Original line number Diff line number Diff line Loading @@ -353,6 +353,10 @@ static size_t construct_msg(uint8_t *buf, size_t buf_size, size_t payload_size, return header_size + payload_size; } /* Should it CAS on the completion counter? * This should account for nearly all CPU usage. */ #define INCREMENT_COMPLETION_COUNTER 1 static void completion_cb(bus_msg_result_t *res, void *udata) { example_state *s = &state; socket_info *si = (socket_info *)udata; Loading @@ -360,8 +364,7 @@ static void completion_cb(bus_msg_result_t *res, void *udata) { switch (res->status) { case BUS_SEND_SUCCESS: { #if 1 /* CAS completion? or what? */ #if INCREMENT_COMPLETION_COUNTER size_t cur = s->completed_deliveries; for (;;) { if (ATOMIC_BOOL_COMPARE_AND_SWAP(&s->completed_deliveries, cur, cur + 1)) { Loading
src/lib/bus/listener.c +8 −2 Original line number Diff line number Diff line Loading @@ -99,8 +99,10 @@ bool listener_close_socket(struct listener *l, int fd) { } /* Coefficients for backpressure based on certain conditions. */ #define MSG_BP_1QTR (0) #define MSG_BP_HALF (0.5) #define MSG_BP_3QTR (2.0) #define RX_INFO_BP_1QTR (0) #define RX_INFO_BP_HALF (0.5) #define RX_INFO_BP_3QTR (2.0) #define THREADPOOL_BP (1.0) Loading @@ -108,8 +110,10 @@ bool listener_close_socket(struct listener *l, int fd) { static uint16_t get_backpressure(struct listener *l) { uint16_t msg_fill_pressure = 0; if (l->msgs_in_use < MAX_QUEUE_MESSAGES / 2) { if (l->msgs_in_use < 0.25 * MAX_QUEUE_MESSAGES) { msg_fill_pressure = 0; } else if (l->msgs_in_use < 0.5 * MAX_QUEUE_MESSAGES) { msg_fill_pressure = MSG_BP_1QTR * l->msgs_in_use; } else if (l->msgs_in_use < 0.75 * MAX_QUEUE_MESSAGES) { msg_fill_pressure = MSG_BP_HALF * l->msgs_in_use; } else { Loading @@ -117,8 +121,10 @@ static uint16_t get_backpressure(struct listener *l) { } uint16_t rx_info_fill_pressure = 0; if (l->rx_info_in_use < MAX_PENDING_MESSAGES / 2) { if (l->rx_info_in_use < 0.25 * MAX_PENDING_MESSAGES) { rx_info_fill_pressure = 0; } else if (l->rx_info_in_use < 0.5 * MAX_PENDING_MESSAGES) { rx_info_fill_pressure = RX_INFO_BP_1QTR * l->rx_info_in_use; } else if (l->rx_info_in_use < 0.75 * MAX_PENDING_MESSAGES) { rx_info_fill_pressure = RX_INFO_BP_HALF * l->rx_info_in_use; } else { Loading