Loading src/lib/bus/bus.c +4 −1 Original line number Diff line number Diff line Loading @@ -281,7 +281,10 @@ bool bus_send_request(struct bus *b, bus_user_msg *msg) struct sender *s = b->senders[s_id]; BUS_LOG(b, 3, LOG_SENDING_REQUEST, "Sending request...", b->udata); return sender_send_request(s, box); bool res = sender_send_request(s, box); BUS_LOG_SNPRINTF(b, 3, LOG_SENDING_REQUEST, b->udata, 64, "...request sent, result %d", res); return res; } static bool poll_on_completion(struct bus *b, int fd) { Loading src/lib/bus/listener.c +7 −2 Original line number Diff line number Diff line Loading @@ -100,10 +100,10 @@ bool listener_remove_socket(struct listener *l, int fd) { } /* Coefficients for backpressure based on certain conditions. */ #define MSG_BP_1QTR (0) #define MSG_BP_1QTR (0.25) #define MSG_BP_HALF (0.5) #define MSG_BP_3QTR (2.0) #define RX_INFO_BP_1QTR (0) #define RX_INFO_BP_1QTR (0.5) #define RX_INFO_BP_HALF (0.5) #define RX_INFO_BP_3QTR (2.0) #define THREADPOOL_BP (1.0) Loading Loading @@ -134,6 +134,11 @@ static uint16_t get_backpressure(struct listener *l) { uint16_t threadpool_fill_pressure = THREADPOOL_BP * l->upstream_backpressure; struct bus *b = l->bus; BUS_LOG_SNPRINTF(b, 6, LOG_SENDER, b->udata, 64, "lbp: %u, %u (iu %u), %u", msg_fill_pressure, rx_info_fill_pressure, l->rx_info_in_use, threadpool_fill_pressure); return msg_fill_pressure + rx_info_fill_pressure + threadpool_fill_pressure; } Loading src/lib/bus/sender.c +8 −5 Original line number Diff line number Diff line Loading @@ -269,10 +269,13 @@ static bool commit_event_and_block(struct sender *s, tx_info_t *info) { backpressure += (buf[2] << 8); /* Push back if message bus is too busy. */ backpressure >>= 0; // TODO: tuning BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, backpressure >>= 7; // TODO: further tuning if (backpressure > 0) { BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64, "reading done_pipe: backpressure %d", backpressure); if (backpressure > 0) { poll(NULL, 0, backpressure); } poll(NULL, 0, backpressure); } BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "reading done_pipe: success %d", 1); Loading Loading @@ -843,6 +846,7 @@ static void update_sent(struct bus *b, sender *s, tx_info_t *info, ssize_t sent) } static void notify_caller(sender *s, tx_info_t *info, bool success) { struct bus *b = s->bus; uint16_t bp = 0; switch (info->state) { Loading @@ -866,7 +870,6 @@ static void notify_caller(sender *s, tx_info_t *info, bool success) { ssize_t res = 0; struct bus *b = s->bus; for (;;) { /* for loop because of EINTR */ res = write(pipe_fd, buf, sizeof(buf)); BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64, Loading src/lib/kinetic_pdu.c +7 −1 Original line number Diff line number Diff line Loading @@ -37,10 +37,16 @@ #define STATIC static #endif #include <time.h> STATIC void log_cb(log_event_t event, int log_level, const char *msg, void *udata) { (void)udata; const char *event_str = bus_log_event_str(event); fprintf(stderr, "%s[%d] -- %s\n", struct timeval tv; gettimeofday(&tv, NULL); FILE *f = stdout; /* stderr */ fprintf(f, "%ld.%06d: %s[%d] -- %s\n", tv.tv_sec, tv.tv_usec, event_str, log_level, msg); } Loading src/lib/kinetic_socket.c +9 −2 Original line number Diff line number Diff line Loading @@ -77,6 +77,8 @@ int KineticSocket_Connect(const char* host, int port) return KINETIC_SOCKET_DESCRIPTOR_INVALID; } KineticSocket_EnableTCPNoDelay(result.fd); for (ai = ai_result; ai != NULL; ai = ai->ai_next) { int setsockopt_result; int buffer_size = KINETIC_OBJ_SIZE; Loading Loading @@ -151,6 +153,11 @@ void KineticSocket_Close(int socket) } } void KineticSocket_EnableTCPNoDelay(int socket) { int on = 1; setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)); } void KineticSocket_BeginPacket(int socket) { Loading @@ -167,7 +174,7 @@ void KineticSocket_FinishPacket(int socket) #if !defined(__APPLE__) /* TCP_CORK is NOT available on OSX */ int off = 0; setsockopt(socket, IPPROTO_TCP, TCP_CORK, &off, sizeof(off)); #else (void)socket; #endif int on = 1; setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)); } Loading
src/lib/bus/bus.c +4 −1 Original line number Diff line number Diff line Loading @@ -281,7 +281,10 @@ bool bus_send_request(struct bus *b, bus_user_msg *msg) struct sender *s = b->senders[s_id]; BUS_LOG(b, 3, LOG_SENDING_REQUEST, "Sending request...", b->udata); return sender_send_request(s, box); bool res = sender_send_request(s, box); BUS_LOG_SNPRINTF(b, 3, LOG_SENDING_REQUEST, b->udata, 64, "...request sent, result %d", res); return res; } static bool poll_on_completion(struct bus *b, int fd) { Loading
src/lib/bus/listener.c +7 −2 Original line number Diff line number Diff line Loading @@ -100,10 +100,10 @@ bool listener_remove_socket(struct listener *l, int fd) { } /* Coefficients for backpressure based on certain conditions. */ #define MSG_BP_1QTR (0) #define MSG_BP_1QTR (0.25) #define MSG_BP_HALF (0.5) #define MSG_BP_3QTR (2.0) #define RX_INFO_BP_1QTR (0) #define RX_INFO_BP_1QTR (0.5) #define RX_INFO_BP_HALF (0.5) #define RX_INFO_BP_3QTR (2.0) #define THREADPOOL_BP (1.0) Loading Loading @@ -134,6 +134,11 @@ static uint16_t get_backpressure(struct listener *l) { uint16_t threadpool_fill_pressure = THREADPOOL_BP * l->upstream_backpressure; struct bus *b = l->bus; BUS_LOG_SNPRINTF(b, 6, LOG_SENDER, b->udata, 64, "lbp: %u, %u (iu %u), %u", msg_fill_pressure, rx_info_fill_pressure, l->rx_info_in_use, threadpool_fill_pressure); return msg_fill_pressure + rx_info_fill_pressure + threadpool_fill_pressure; } Loading
src/lib/bus/sender.c +8 −5 Original line number Diff line number Diff line Loading @@ -269,10 +269,13 @@ static bool commit_event_and_block(struct sender *s, tx_info_t *info) { backpressure += (buf[2] << 8); /* Push back if message bus is too busy. */ backpressure >>= 0; // TODO: tuning BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, backpressure >>= 7; // TODO: further tuning if (backpressure > 0) { BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64, "reading done_pipe: backpressure %d", backpressure); if (backpressure > 0) { poll(NULL, 0, backpressure); } poll(NULL, 0, backpressure); } BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "reading done_pipe: success %d", 1); Loading Loading @@ -843,6 +846,7 @@ static void update_sent(struct bus *b, sender *s, tx_info_t *info, ssize_t sent) } static void notify_caller(sender *s, tx_info_t *info, bool success) { struct bus *b = s->bus; uint16_t bp = 0; switch (info->state) { Loading @@ -866,7 +870,6 @@ static void notify_caller(sender *s, tx_info_t *info, bool success) { ssize_t res = 0; struct bus *b = s->bus; for (;;) { /* for loop because of EINTR */ res = write(pipe_fd, buf, sizeof(buf)); BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64, Loading
src/lib/kinetic_pdu.c +7 −1 Original line number Diff line number Diff line Loading @@ -37,10 +37,16 @@ #define STATIC static #endif #include <time.h> STATIC void log_cb(log_event_t event, int log_level, const char *msg, void *udata) { (void)udata; const char *event_str = bus_log_event_str(event); fprintf(stderr, "%s[%d] -- %s\n", struct timeval tv; gettimeofday(&tv, NULL); FILE *f = stdout; /* stderr */ fprintf(f, "%ld.%06d: %s[%d] -- %s\n", tv.tv_sec, tv.tv_usec, event_str, log_level, msg); } Loading
src/lib/kinetic_socket.c +9 −2 Original line number Diff line number Diff line Loading @@ -77,6 +77,8 @@ int KineticSocket_Connect(const char* host, int port) return KINETIC_SOCKET_DESCRIPTOR_INVALID; } KineticSocket_EnableTCPNoDelay(result.fd); for (ai = ai_result; ai != NULL; ai = ai->ai_next) { int setsockopt_result; int buffer_size = KINETIC_OBJ_SIZE; Loading Loading @@ -151,6 +153,11 @@ void KineticSocket_Close(int socket) } } void KineticSocket_EnableTCPNoDelay(int socket) { int on = 1; setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)); } void KineticSocket_BeginPacket(int socket) { Loading @@ -167,7 +174,7 @@ void KineticSocket_FinishPacket(int socket) #if !defined(__APPLE__) /* TCP_CORK is NOT available on OSX */ int off = 0; setsockopt(socket, IPPROTO_TCP, TCP_CORK, &off, sizeof(off)); #else (void)socket; #endif int on = 1; setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)); }