Loading src/lib/bus/listener.c +16 −15 Original line number Diff line number Diff line Loading @@ -66,7 +66,7 @@ struct listener *listener_init(struct bus *b, struct bus_config *cfg) { for (int i = 0; i < MAX_QUEUE_MESSAGES; i++) { listener_msg *msg = &l->msgs[i]; uint16_t *p_id = (uint16_t *)&msg->id; uint8_t *p_id = &msg->id; *p_id = i; if (i < MAX_QUEUE_MESSAGES - 1) { /* forward link */ msg->next = &l->msgs[i + 1]; Loading Loading @@ -161,7 +161,7 @@ bool listener_hold_response(struct listener *l, int fd, BUS_LOG_SNPRINTF(b, 5, LOG_MEMORY, b->udata, 128, "listener_hold_response with fd %d, seq_id %lld", fd, seq_id); fd, (long long)seq_id); msg->type = MSG_HOLD_RESPONSE; msg->u.hold.fd = fd; Loading @@ -172,7 +172,7 @@ bool listener_hold_response(struct listener *l, int fd, if (!pm_res) { BUS_LOG_SNPRINTF(b, 0, LOG_MEMORY, b->udata, 128, "listener_hold_response with fd %d, seq_id %lld FAILED", fd, seq_id); fd, (long long)seq_id); } return pm_res; } Loading Loading @@ -532,7 +532,8 @@ static bool sink_socket_read(struct bus *b, BUS_LOG(b, 3, LOG_LISTENER, "calling unpack CB", b->udata); bus_unpack_cb_res_t ures = b->unpack_cb(sres.full_msg_buffer, ci->udata); BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64, "process_unpacked_message: ok? %d, seq_id %lld", ures.ok, ures.u.success.seq_id); "process_unpacked_message: ok? %d, seq_id %lld", ures.ok, (long long)ures.u.success.seq_id); process_unpacked_message(l, ci, ures); } Loading Loading @@ -565,7 +566,7 @@ static rx_info_t *find_info_by_sequence_id(listener *l, case RIS_HOLD: BUS_LOG_SNPRINTF(b, 4, LOG_MEMORY, b->udata, 128, "find_info_by_sequence_id: info (%p) at +%d: fd %d, seq_id %lld", (void*)info, info->id, fd, seq_id); (void*)info, info->id, fd, (long long)seq_id); if (info->u.hold.fd == fd && info->u.hold.seq_id == seq_id) { return info; } Loading @@ -591,7 +592,7 @@ static rx_info_t *find_info_by_sequence_id(listener *l, if (b->log_level > 5 || 1) { BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64, "==== Could not find <fd: %d, seq_id: %lld>, dumping table ====\n", fd, seq_id); fd, (long long)seq_id); dump_rx_info_table(l); } /* Not found. Probably an unsolicited status message. */ Loading Loading @@ -621,7 +622,7 @@ static void process_unpacked_message(listener *l, { BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 128, "marking info %d, seq_id %lld ready for delivery", info->id, result.u.success.seq_id); info->id, (long long)result.u.success.seq_id); info->u.expect.error = RX_ERROR_READY_FOR_DELIVERY; assert(!info->u.hold.has_result); info->u.expect.has_result = true; Loading Loading @@ -735,13 +736,13 @@ static void dump_rx_info_table(listener *l) { switch (l->rx_info[i].state) { case RIS_HOLD: printf(", fd %d, seq_id %lld, has_result? %d\n", info->u.hold.fd, info->u.hold.seq_id, info->u.hold.has_result); info->u.hold.fd, (long long)info->u.hold.seq_id, info->u.hold.has_result); break; case RIS_EXPECT: { struct boxed_msg *box = info->u.expect.box; printf(", box %p (fd:%d, seq_id:%lld), error %d, has_result? %d\n", (void *)box, box ? box->fd : -1, box ? box->out_seq_id : -1, (void *)box, box ? box->fd : -1, box ? (long long)box->out_seq_id : -1, info->u.expect.error, info->u.expect.has_result); break; } Loading @@ -768,7 +769,7 @@ static void retry_delivery(listener *l, rx_info_t *info) { if (bus_process_boxed_message(l->bus, box, &backpressure)) { BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "successfully delivered box %p (seq_id %lld) from info %d at line %d (retry)", (void*)box, box->out_seq_id, info->id, __LINE__); (void*)box, (long long)box->out_seq_id, info->id, __LINE__); info->u.expect.error = RX_ERROR_DONE; release_rx_info(l, info); } else { Loading Loading @@ -1147,7 +1148,7 @@ static void hold_response(listener *l, int fd, int64_t seq_id, int16_t timeout_s assert(info->state == RIS_INACTIVE); BUS_LOG_SNPRINTF(b, 5, LOG_LISTENER, b->udata, 128, "setting info %p(+%d) to hold response <fd:%d, seq_id:%lld>", (void *)info, info->id, fd, seq_id); (void *)info, info->id, fd, (long long)seq_id); info->state = RIS_HOLD; info->timeout_sec = timeout_sec; Loading Loading @@ -1205,7 +1206,7 @@ static void attempt_delivery(listener *l, struct rx_info_t *info) { /* success */ BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 256, "successfully delivered box %p (seq_id %lld), marking info %d as DONE", (void*)box, box->out_seq_id, info->id); (void*)box, (long long)box->out_seq_id, info->id); info->u.expect.error = RX_ERROR_DONE; BUS_LOG_SNPRINTF(b, 4, LOG_LISTENER, b->udata, 128, "initial clean-up attempt for completed RX event at info +%d", info->id); Loading @@ -1224,7 +1225,7 @@ static void expect_response(listener *l, struct boxed_msg *box) { struct bus *b = l->bus; BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 128, "notifying to expect response <box:%p, fd:%d, seq_id:%lld>", (void *)box, box->fd, box->out_seq_id); (void *)box, box->fd, (long long)box->out_seq_id); /* If there's a pending HOLD message, convert it. */ rx_info_t *info = get_hold_rx_info(l, box->fd, box->out_seq_id); Loading @@ -1235,7 +1236,7 @@ static void expect_response(listener *l, struct boxed_msg *box) { BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 256, "converting HOLD to EXPECT for info %d with result, attempting delivery <box:%p, fd:%d, seq_id:%lld>", info->id, (void *)box, info->u.hold.fd, info->u.hold.seq_id); info->id, (void *)box, info->u.hold.fd, (long long)info->u.hold.seq_id); info->state = RIS_EXPECT; info->u.expect.error = RX_ERROR_READY_FOR_DELIVERY; Loading @@ -1246,7 +1247,7 @@ static void expect_response(listener *l, struct boxed_msg *box) { } else { BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 256, "converting HOLD to EXPECT info %d, attempting delivery <box:%p, fd:%d, seq_id:%lld>", info->id, (void *)box, info->u.hold.fd, info->u.hold.seq_id); info->id, (void *)box, info->u.hold.fd, (long long)info->u.hold.seq_id); info->state = RIS_EXPECT; info->u.expect.box = box; info->u.expect.error = RX_ERROR_NONE; Loading Loading
src/lib/bus/listener.c +16 −15 Original line number Diff line number Diff line Loading @@ -66,7 +66,7 @@ struct listener *listener_init(struct bus *b, struct bus_config *cfg) { for (int i = 0; i < MAX_QUEUE_MESSAGES; i++) { listener_msg *msg = &l->msgs[i]; uint16_t *p_id = (uint16_t *)&msg->id; uint8_t *p_id = &msg->id; *p_id = i; if (i < MAX_QUEUE_MESSAGES - 1) { /* forward link */ msg->next = &l->msgs[i + 1]; Loading Loading @@ -161,7 +161,7 @@ bool listener_hold_response(struct listener *l, int fd, BUS_LOG_SNPRINTF(b, 5, LOG_MEMORY, b->udata, 128, "listener_hold_response with fd %d, seq_id %lld", fd, seq_id); fd, (long long)seq_id); msg->type = MSG_HOLD_RESPONSE; msg->u.hold.fd = fd; Loading @@ -172,7 +172,7 @@ bool listener_hold_response(struct listener *l, int fd, if (!pm_res) { BUS_LOG_SNPRINTF(b, 0, LOG_MEMORY, b->udata, 128, "listener_hold_response with fd %d, seq_id %lld FAILED", fd, seq_id); fd, (long long)seq_id); } return pm_res; } Loading Loading @@ -532,7 +532,8 @@ static bool sink_socket_read(struct bus *b, BUS_LOG(b, 3, LOG_LISTENER, "calling unpack CB", b->udata); bus_unpack_cb_res_t ures = b->unpack_cb(sres.full_msg_buffer, ci->udata); BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64, "process_unpacked_message: ok? %d, seq_id %lld", ures.ok, ures.u.success.seq_id); "process_unpacked_message: ok? %d, seq_id %lld", ures.ok, (long long)ures.u.success.seq_id); process_unpacked_message(l, ci, ures); } Loading Loading @@ -565,7 +566,7 @@ static rx_info_t *find_info_by_sequence_id(listener *l, case RIS_HOLD: BUS_LOG_SNPRINTF(b, 4, LOG_MEMORY, b->udata, 128, "find_info_by_sequence_id: info (%p) at +%d: fd %d, seq_id %lld", (void*)info, info->id, fd, seq_id); (void*)info, info->id, fd, (long long)seq_id); if (info->u.hold.fd == fd && info->u.hold.seq_id == seq_id) { return info; } Loading @@ -591,7 +592,7 @@ static rx_info_t *find_info_by_sequence_id(listener *l, if (b->log_level > 5 || 1) { BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64, "==== Could not find <fd: %d, seq_id: %lld>, dumping table ====\n", fd, seq_id); fd, (long long)seq_id); dump_rx_info_table(l); } /* Not found. Probably an unsolicited status message. */ Loading Loading @@ -621,7 +622,7 @@ static void process_unpacked_message(listener *l, { BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 128, "marking info %d, seq_id %lld ready for delivery", info->id, result.u.success.seq_id); info->id, (long long)result.u.success.seq_id); info->u.expect.error = RX_ERROR_READY_FOR_DELIVERY; assert(!info->u.hold.has_result); info->u.expect.has_result = true; Loading Loading @@ -735,13 +736,13 @@ static void dump_rx_info_table(listener *l) { switch (l->rx_info[i].state) { case RIS_HOLD: printf(", fd %d, seq_id %lld, has_result? %d\n", info->u.hold.fd, info->u.hold.seq_id, info->u.hold.has_result); info->u.hold.fd, (long long)info->u.hold.seq_id, info->u.hold.has_result); break; case RIS_EXPECT: { struct boxed_msg *box = info->u.expect.box; printf(", box %p (fd:%d, seq_id:%lld), error %d, has_result? %d\n", (void *)box, box ? box->fd : -1, box ? box->out_seq_id : -1, (void *)box, box ? box->fd : -1, box ? (long long)box->out_seq_id : -1, info->u.expect.error, info->u.expect.has_result); break; } Loading @@ -768,7 +769,7 @@ static void retry_delivery(listener *l, rx_info_t *info) { if (bus_process_boxed_message(l->bus, box, &backpressure)) { BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "successfully delivered box %p (seq_id %lld) from info %d at line %d (retry)", (void*)box, box->out_seq_id, info->id, __LINE__); (void*)box, (long long)box->out_seq_id, info->id, __LINE__); info->u.expect.error = RX_ERROR_DONE; release_rx_info(l, info); } else { Loading Loading @@ -1147,7 +1148,7 @@ static void hold_response(listener *l, int fd, int64_t seq_id, int16_t timeout_s assert(info->state == RIS_INACTIVE); BUS_LOG_SNPRINTF(b, 5, LOG_LISTENER, b->udata, 128, "setting info %p(+%d) to hold response <fd:%d, seq_id:%lld>", (void *)info, info->id, fd, seq_id); (void *)info, info->id, fd, (long long)seq_id); info->state = RIS_HOLD; info->timeout_sec = timeout_sec; Loading Loading @@ -1205,7 +1206,7 @@ static void attempt_delivery(listener *l, struct rx_info_t *info) { /* success */ BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 256, "successfully delivered box %p (seq_id %lld), marking info %d as DONE", (void*)box, box->out_seq_id, info->id); (void*)box, (long long)box->out_seq_id, info->id); info->u.expect.error = RX_ERROR_DONE; BUS_LOG_SNPRINTF(b, 4, LOG_LISTENER, b->udata, 128, "initial clean-up attempt for completed RX event at info +%d", info->id); Loading @@ -1224,7 +1225,7 @@ static void expect_response(listener *l, struct boxed_msg *box) { struct bus *b = l->bus; BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 128, "notifying to expect response <box:%p, fd:%d, seq_id:%lld>", (void *)box, box->fd, box->out_seq_id); (void *)box, box->fd, (long long)box->out_seq_id); /* If there's a pending HOLD message, convert it. */ rx_info_t *info = get_hold_rx_info(l, box->fd, box->out_seq_id); Loading @@ -1235,7 +1236,7 @@ static void expect_response(listener *l, struct boxed_msg *box) { BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 256, "converting HOLD to EXPECT for info %d with result, attempting delivery <box:%p, fd:%d, seq_id:%lld>", info->id, (void *)box, info->u.hold.fd, info->u.hold.seq_id); info->id, (void *)box, info->u.hold.fd, (long long)info->u.hold.seq_id); info->state = RIS_EXPECT; info->u.expect.error = RX_ERROR_READY_FOR_DELIVERY; Loading @@ -1246,7 +1247,7 @@ static void expect_response(listener *l, struct boxed_msg *box) { } else { BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 256, "converting HOLD to EXPECT info %d, attempting delivery <box:%p, fd:%d, seq_id:%lld>", info->id, (void *)box, info->u.hold.fd, info->u.hold.seq_id); info->id, (void *)box, info->u.hold.fd, (long long)info->u.hold.seq_id); info->state = RIS_EXPECT; info->u.expect.box = box; info->u.expect.error = RX_ERROR_NONE; Loading