Loading Makefile +3 −0 Original line number Diff line number Diff line Loading @@ -133,6 +133,9 @@ $(OUT_DIR)/socket99.o: $(SOCKET99)/socket99.c $(SOCKET99)/socket99.h $(OUT_DIR)/protobuf-c.o: $(PROTOBUFC)/protobuf-c/protobuf-c.c $(PROTOBUFC)/protobuf-c/protobuf-c.h $(CC) -c -o $@ $< -std=c99 -fPIC -g -Wall -Werror -Wno-unused-parameter $(OPTIMIZE) -I$(PROTOBUFC) ${OUT_DIR}/kinetic_types.o: ${LIB_DIR}/kinetic_types_internal.h ${OUT_DIR}/bus.o: ${LIB_DIR}/bus/bus_types.h ${OUT_DIR}/sender.o: ${LIB_DIR}/bus/sender_internal.h ${OUT_DIR}/listener.o: ${LIB_DIR}/bus/listener_internal.h $(OUT_DIR)/threadpool.o: ${LIB_DIR}/threadpool/threadpool.c ${LIB_DIR}/threadpool/threadpool.h $(CC) -o $@ -c $< $(CFLAGS) Loading src/lib/bus/bus.c +1 −0 Original line number Diff line number Diff line Loading @@ -462,6 +462,7 @@ bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata) ci->fd = fd; ci->to_read_size = 0; ci->udata = udata; ci->largest_seq_id_seen = 0; if (type == BUS_SOCKET_SSL) { if (!bus_ssl_connect(b, ci)) { goto cleanup; } Loading src/lib/bus/bus_internal_types.h +1 −0 Original line number Diff line number Diff line Loading @@ -103,6 +103,7 @@ typedef struct { int fd; rx_error_t error; size_t to_read_size; int64_t largest_seq_id_seen; SSL *ssl; /* SSL handle. Must be valid or BUS_NO_SSL. */ Loading src/lib/bus/bus_types.h +3 −0 Original line number Diff line number Diff line Loading @@ -66,6 +66,9 @@ struct boxed_msg; "snprintf failure -- " \ __FILE__, \ udata); \ char line_buf[32]; \ snprintf(line_buf, 32, "line %d\n", __LINE__); \ _b->log_cb(event_key, level, line_buf, udata); \ } else { \ _b->log_cb(event_key, level, log_buf, udata); \ } \ Loading src/lib/bus/listener.c +24 −19 Original line number Diff line number Diff line Loading @@ -160,7 +160,7 @@ bool listener_hold_response(struct listener *l, int fd, } BUS_LOG_SNPRINTF(b, 5, LOG_MEMORY, b->udata, 128, "listener_hold_response with fd %d, seq_id %lld", "listener_hold_response with <fd:%d, seq_id:%lld>", fd, (long long)seq_id); msg->type = MSG_HOLD_RESPONSE; Loading @@ -171,7 +171,7 @@ bool listener_hold_response(struct listener *l, int fd, bool pm_res = push_message(l, msg); if (!pm_res) { BUS_LOG_SNPRINTF(b, 0, LOG_MEMORY, b->udata, 128, "listener_hold_response with fd %d, seq_id %lld FAILED", "listener_hold_response with <fd:%d, seq_id:%lld> FAILED", fd, (long long)seq_id); } return pm_res; Loading @@ -188,7 +188,8 @@ bool listener_expect_response(struct listener *l, boxed_msg *box, } BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "listener_expect_response with box of %p", (void*)box); "listener_expect_response with box of %p, seq_id:%lld", (void*)box, (long long)box->out_seq_id); msg->type = MSG_EXPECT_RESPONSE; msg->u.expect.box = box; Loading Loading @@ -531,7 +532,7 @@ static bool sink_socket_read(struct bus *b, BUS_LOG(b, 3, LOG_LISTENER, "calling unpack CB", b->udata); bus_unpack_cb_res_t ures = b->unpack_cb(sres.full_msg_buffer, ci->udata); BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64, "process_unpacked_message: ok? %d, seq_id %lld", "process_unpacked_message: ok? %d, seq_id:%lld", ures.ok, (long long)ures.u.success.seq_id); process_unpacked_message(l, ci, ures); } Loading Loading @@ -564,7 +565,7 @@ static rx_info_t *find_info_by_sequence_id(listener *l, break; /* skip */ case RIS_HOLD: BUS_LOG_SNPRINTF(b, 4, LOG_MEMORY, b->udata, 128, "find_info_by_sequence_id: info (%p) at +%d: fd %d, seq_id %lld", "find_info_by_sequence_id: info (%p) at +%d: <fd:%d, seq_id:%lld>", (void*)info, info->id, fd, (long long)seq_id); if (info->u.hold.fd == fd && info->u.hold.seq_id == seq_id) { return info; Loading Loading @@ -608,11 +609,12 @@ static void process_unpacked_message(listener *l, int64_t seq_id = result.u.success.seq_id; void *opaque_msg = result.u.success.msg; if (seq_id < l->largest_seq_id_seen && l->largest_seq_id_seen != 0) { if (seq_id < ci->largest_seq_id_seen && ci->largest_seq_id_seen != 0) { BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 128, "suspicious sequence ID: largest seen is %lld, got %lld\n", (long long)l->largest_seq_id_seen, (long long)seq_id); (long long)ci->largest_seq_id_seen, (long long)seq_id); } ci->largest_seq_id_seen = seq_id; rx_info_t *info = find_info_by_sequence_id(l, ci->fd, seq_id); if (info) { Loading @@ -626,7 +628,7 @@ static void process_unpacked_message(listener *l, case RIS_EXPECT: { BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 128, "marking info %d, seq_id %lld ready for delivery", "marking info %d, seq_id:%lld ready for delivery", info->id, (long long)result.u.success.seq_id); info->u.expect.error = RX_ERROR_READY_FOR_DELIVERY; assert(!info->u.hold.has_result); Loading @@ -641,10 +643,11 @@ static void process_unpacked_message(listener *l, } } else { /* We received a response that we weren't expecting. */ if (seq_id != 0) { BUS_LOG_SNPRINTF(b, 2 - 2, LOG_LISTENER, b->udata, 128, "Couldn't find info for fd %d, seq_id %lld, msg %p", "Couldn't find info for fd:%d, seq_id:%lld, msg %p", ci->fd, (long long)seq_id, opaque_msg); } if (b->unexpected_msg_cb) { b->unexpected_msg_cb(opaque_msg, seq_id, b->udata, ci->udata); } Loading Loading @@ -1210,8 +1213,8 @@ static void attempt_delivery(listener *l, struct rx_info_t *info) { if (bus_process_boxed_message(b, box, &backpressure)) { /* success */ BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 256, "successfully delivered box %p (seq_id %lld), marking info %d as DONE", (void*)box, (long long)box->out_seq_id, info->id); "successfully delivered box %p (seq_id:%lld), marking info %d as DONE", (void*)box, (long long)seq_id, info->id); info->u.expect.error = RX_ERROR_DONE; BUS_LOG_SNPRINTF(b, 4, LOG_LISTENER, b->udata, 128, "initial clean-up attempt for completed RX event at info +%d", info->id); Loading Loading @@ -1240,8 +1243,9 @@ static void expect_response(listener *l, struct boxed_msg *box) { bus_unpack_cb_res_t result = info->u.hold.result; BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 256, "converting HOLD to EXPECT for info %d with result, attempting delivery <box:%p, fd:%d, seq_id:%lld>", info->id, (void *)box, info->u.hold.fd, (long long)info->u.hold.seq_id); "converting HOLD to EXPECT for info %d (%p) with result, attempting delivery <box:%p, fd:%d, seq_id:%lld>", info->id, (void *)info, (void *)box, info->u.hold.fd, (long long)info->u.hold.seq_id); info->state = RIS_EXPECT; info->u.expect.error = RX_ERROR_READY_FOR_DELIVERY; Loading @@ -1251,8 +1255,9 @@ static void expect_response(listener *l, struct boxed_msg *box) { attempt_delivery(l, info); } else { BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 256, "converting HOLD to EXPECT info %d, attempting delivery <box:%p, fd:%d, seq_id:%lld>", info->id, (void *)box, info->u.hold.fd, (long long)info->u.hold.seq_id); "converting HOLD to EXPECT info %d (%p), attempting delivery <box:%p, fd:%d, seq_id:%lld>", info->id, (void *)info, (void *)box, info->u.hold.fd, (long long)info->u.hold.seq_id); info->state = RIS_EXPECT; info->u.expect.box = box; info->u.expect.error = RX_ERROR_NONE; Loading Loading
Makefile +3 −0 Original line number Diff line number Diff line Loading @@ -133,6 +133,9 @@ $(OUT_DIR)/socket99.o: $(SOCKET99)/socket99.c $(SOCKET99)/socket99.h $(OUT_DIR)/protobuf-c.o: $(PROTOBUFC)/protobuf-c/protobuf-c.c $(PROTOBUFC)/protobuf-c/protobuf-c.h $(CC) -c -o $@ $< -std=c99 -fPIC -g -Wall -Werror -Wno-unused-parameter $(OPTIMIZE) -I$(PROTOBUFC) ${OUT_DIR}/kinetic_types.o: ${LIB_DIR}/kinetic_types_internal.h ${OUT_DIR}/bus.o: ${LIB_DIR}/bus/bus_types.h ${OUT_DIR}/sender.o: ${LIB_DIR}/bus/sender_internal.h ${OUT_DIR}/listener.o: ${LIB_DIR}/bus/listener_internal.h $(OUT_DIR)/threadpool.o: ${LIB_DIR}/threadpool/threadpool.c ${LIB_DIR}/threadpool/threadpool.h $(CC) -o $@ -c $< $(CFLAGS) Loading
src/lib/bus/bus.c +1 −0 Original line number Diff line number Diff line Loading @@ -462,6 +462,7 @@ bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata) ci->fd = fd; ci->to_read_size = 0; ci->udata = udata; ci->largest_seq_id_seen = 0; if (type == BUS_SOCKET_SSL) { if (!bus_ssl_connect(b, ci)) { goto cleanup; } Loading
src/lib/bus/bus_internal_types.h +1 −0 Original line number Diff line number Diff line Loading @@ -103,6 +103,7 @@ typedef struct { int fd; rx_error_t error; size_t to_read_size; int64_t largest_seq_id_seen; SSL *ssl; /* SSL handle. Must be valid or BUS_NO_SSL. */ Loading
src/lib/bus/bus_types.h +3 −0 Original line number Diff line number Diff line Loading @@ -66,6 +66,9 @@ struct boxed_msg; "snprintf failure -- " \ __FILE__, \ udata); \ char line_buf[32]; \ snprintf(line_buf, 32, "line %d\n", __LINE__); \ _b->log_cb(event_key, level, line_buf, udata); \ } else { \ _b->log_cb(event_key, level, log_buf, udata); \ } \ Loading
src/lib/bus/listener.c +24 −19 Original line number Diff line number Diff line Loading @@ -160,7 +160,7 @@ bool listener_hold_response(struct listener *l, int fd, } BUS_LOG_SNPRINTF(b, 5, LOG_MEMORY, b->udata, 128, "listener_hold_response with fd %d, seq_id %lld", "listener_hold_response with <fd:%d, seq_id:%lld>", fd, (long long)seq_id); msg->type = MSG_HOLD_RESPONSE; Loading @@ -171,7 +171,7 @@ bool listener_hold_response(struct listener *l, int fd, bool pm_res = push_message(l, msg); if (!pm_res) { BUS_LOG_SNPRINTF(b, 0, LOG_MEMORY, b->udata, 128, "listener_hold_response with fd %d, seq_id %lld FAILED", "listener_hold_response with <fd:%d, seq_id:%lld> FAILED", fd, (long long)seq_id); } return pm_res; Loading @@ -188,7 +188,8 @@ bool listener_expect_response(struct listener *l, boxed_msg *box, } BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "listener_expect_response with box of %p", (void*)box); "listener_expect_response with box of %p, seq_id:%lld", (void*)box, (long long)box->out_seq_id); msg->type = MSG_EXPECT_RESPONSE; msg->u.expect.box = box; Loading Loading @@ -531,7 +532,7 @@ static bool sink_socket_read(struct bus *b, BUS_LOG(b, 3, LOG_LISTENER, "calling unpack CB", b->udata); bus_unpack_cb_res_t ures = b->unpack_cb(sres.full_msg_buffer, ci->udata); BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64, "process_unpacked_message: ok? %d, seq_id %lld", "process_unpacked_message: ok? %d, seq_id:%lld", ures.ok, (long long)ures.u.success.seq_id); process_unpacked_message(l, ci, ures); } Loading Loading @@ -564,7 +565,7 @@ static rx_info_t *find_info_by_sequence_id(listener *l, break; /* skip */ case RIS_HOLD: BUS_LOG_SNPRINTF(b, 4, LOG_MEMORY, b->udata, 128, "find_info_by_sequence_id: info (%p) at +%d: fd %d, seq_id %lld", "find_info_by_sequence_id: info (%p) at +%d: <fd:%d, seq_id:%lld>", (void*)info, info->id, fd, (long long)seq_id); if (info->u.hold.fd == fd && info->u.hold.seq_id == seq_id) { return info; Loading Loading @@ -608,11 +609,12 @@ static void process_unpacked_message(listener *l, int64_t seq_id = result.u.success.seq_id; void *opaque_msg = result.u.success.msg; if (seq_id < l->largest_seq_id_seen && l->largest_seq_id_seen != 0) { if (seq_id < ci->largest_seq_id_seen && ci->largest_seq_id_seen != 0) { BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 128, "suspicious sequence ID: largest seen is %lld, got %lld\n", (long long)l->largest_seq_id_seen, (long long)seq_id); (long long)ci->largest_seq_id_seen, (long long)seq_id); } ci->largest_seq_id_seen = seq_id; rx_info_t *info = find_info_by_sequence_id(l, ci->fd, seq_id); if (info) { Loading @@ -626,7 +628,7 @@ static void process_unpacked_message(listener *l, case RIS_EXPECT: { BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 128, "marking info %d, seq_id %lld ready for delivery", "marking info %d, seq_id:%lld ready for delivery", info->id, (long long)result.u.success.seq_id); info->u.expect.error = RX_ERROR_READY_FOR_DELIVERY; assert(!info->u.hold.has_result); Loading @@ -641,10 +643,11 @@ static void process_unpacked_message(listener *l, } } else { /* We received a response that we weren't expecting. */ if (seq_id != 0) { BUS_LOG_SNPRINTF(b, 2 - 2, LOG_LISTENER, b->udata, 128, "Couldn't find info for fd %d, seq_id %lld, msg %p", "Couldn't find info for fd:%d, seq_id:%lld, msg %p", ci->fd, (long long)seq_id, opaque_msg); } if (b->unexpected_msg_cb) { b->unexpected_msg_cb(opaque_msg, seq_id, b->udata, ci->udata); } Loading Loading @@ -1210,8 +1213,8 @@ static void attempt_delivery(listener *l, struct rx_info_t *info) { if (bus_process_boxed_message(b, box, &backpressure)) { /* success */ BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 256, "successfully delivered box %p (seq_id %lld), marking info %d as DONE", (void*)box, (long long)box->out_seq_id, info->id); "successfully delivered box %p (seq_id:%lld), marking info %d as DONE", (void*)box, (long long)seq_id, info->id); info->u.expect.error = RX_ERROR_DONE; BUS_LOG_SNPRINTF(b, 4, LOG_LISTENER, b->udata, 128, "initial clean-up attempt for completed RX event at info +%d", info->id); Loading Loading @@ -1240,8 +1243,9 @@ static void expect_response(listener *l, struct boxed_msg *box) { bus_unpack_cb_res_t result = info->u.hold.result; BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 256, "converting HOLD to EXPECT for info %d with result, attempting delivery <box:%p, fd:%d, seq_id:%lld>", info->id, (void *)box, info->u.hold.fd, (long long)info->u.hold.seq_id); "converting HOLD to EXPECT for info %d (%p) with result, attempting delivery <box:%p, fd:%d, seq_id:%lld>", info->id, (void *)info, (void *)box, info->u.hold.fd, (long long)info->u.hold.seq_id); info->state = RIS_EXPECT; info->u.expect.error = RX_ERROR_READY_FOR_DELIVERY; Loading @@ -1251,8 +1255,9 @@ static void expect_response(listener *l, struct boxed_msg *box) { attempt_delivery(l, info); } else { BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 256, "converting HOLD to EXPECT info %d, attempting delivery <box:%p, fd:%d, seq_id:%lld>", info->id, (void *)box, info->u.hold.fd, (long long)info->u.hold.seq_id); "converting HOLD to EXPECT info %d (%p), attempting delivery <box:%p, fd:%d, seq_id:%lld>", info->id, (void *)info, (void *)box, info->u.hold.fd, (long long)info->u.hold.seq_id); info->state = RIS_EXPECT; info->u.expect.box = box; info->u.expect.error = RX_ERROR_NONE; Loading