Loading Gemfile.lock +1 −0 Original line number Diff line number Diff line Loading @@ -3,6 +3,7 @@ GEM specs: beefcake (0.5.0) ceedling (0.15.6) ceedling (0.16.0) rake (>= 0.8.7) thor (>= 0.14.5) diff-lcs (1.2.5) Loading src/lib/bus/listener.c +25 −25 Original line number Diff line number Diff line Loading @@ -150,7 +150,7 @@ bool listener_expect_response(struct listener *l, boxed_msg *box, if (msg == NULL) { return false; } BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "listener_expect_response with box of %p", box); "listener_expect_response with box of %p", (void*)box); msg->type = MSG_EXPECT_RESPONSE; msg->u.expect.box = box; Loading Loading @@ -486,7 +486,7 @@ static rx_info_t *find_info_by_sequence_id(listener *l, struct bus *b = l->bus; BUS_LOG_SNPRINTF(b, 4, LOG_MEMORY, b->udata, 128, "find_info_by_sequence_id: info (%p) at +%d [s %d]: box is %p", info, info->id, info->error, box); (void*)info, info->id, info->error, (void*)box); if (box == NULL) { continue; Loading Loading @@ -517,7 +517,7 @@ static void process_unpacked_message(listener *l, info->error = RX_ERROR_READY_FOR_DELIVERY; BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "releasing box %p at line %d", box, __LINE__); "releasing box %p at line %d", (void*)box, __LINE__); info->box = NULL; /* release */ bus_msg_result_t *result = &box->result; result->status = BUS_SEND_SUCCESS; Loading @@ -526,12 +526,12 @@ static void process_unpacked_message(listener *l, if (bus_process_boxed_message(b, box, &backpressure)) { /* success */ BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "successfully delivered box %p, marking info %p as DONE", box, info); "successfully delivered box %p, marking info %p as DONE", (void*)box, (void*)info); info->error = RX_ERROR_DONE; assert(info->box == NULL); } else { BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "returning box %p at line %d", box, __LINE__); "returning box %p at line %d", (void*)box, __LINE__); info->box = box; /* retry in tick_handler */ } Loading Loading @@ -570,7 +570,7 @@ static void tick_handler(listener *l) { BUS_LOG_SNPRINTF(b, 1, LOG_LISTENER, b->udata, 128, "tick... %p: %d of %d msgs in use, %d of %d rx_info in use, %d tracked_fds", l, l->msgs_in_use, MAX_QUEUE_MESSAGES, (void*)l, l->msgs_in_use, MAX_QUEUE_MESSAGES, l->rx_info_in_use, MAX_PENDING_MESSAGES, l->tracked_fds); if (b->log_level > 5 || 0) { /* dump rx_info table */ Loading @@ -578,7 +578,7 @@ static void tick_handler(listener *l) { printf(" -- in_use: %d, info[%d]: timeout %ld, error %d, box %p\n", l->rx_info[i].active, l->rx_info[i].id, l->rx_info[i].timeout_sec, l->rx_info[i].error, l->rx_info[i].box); l->rx_info[i].error, (void*)l->rx_info[i].box); } } Loading @@ -593,20 +593,20 @@ static void tick_handler(listener *l) { retry_delivery(l, info); } else if (info->error == RX_ERROR_DONE) { BUS_LOG_SNPRINTF(b, 4, LOG_LISTENER, b->udata, 64, "cleaning up completed RX event at info %p", info); "cleaning up completed RX event at info %p", (void*)info); clean_up_completed_info(l, info); } else if (info->error != RX_ERROR_NONE) { BUS_LOG_SNPRINTF(b, 1, LOG_LISTENER, b->udata, 64, "notifying of rx failure -- error %d (info %p)", info->error, info); "notifying of rx failure -- error %d (info %p)", info->error, (void*)info); notify_message_failure(l, info, BUS_SEND_RX_FAILURE); } else if (info->timeout_sec == 1) { BUS_LOG_SNPRINTF(b, 2, LOG_LISTENER, b->udata, 64, "notifying of rx failure -- timeout (info %p)", info); "notifying of rx failure -- timeout (info %p)", (void*)info); notify_message_failure(l, info, BUS_SEND_RX_TIMEOUT); } else { BUS_LOG_SNPRINTF(b, 2, LOG_LISTENER, b->udata, 64, "decrementing countdown on info %p [%u]: %ld", info, info->id, info->timeout_sec - 1); (void*)info, info->id, info->timeout_sec - 1); info->timeout_sec--; } } Loading @@ -621,18 +621,18 @@ static void retry_delivery(listener *l, rx_info_t *info) { assert(info->box->result.status == BUS_SEND_SUCCESS); struct boxed_msg *box = info->box; BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "releasing box %p at line %d", box, __LINE__); "releasing box %p at line %d", (void*)box, __LINE__); info->box = NULL; /* release */ size_t backpressure = 0; if (bus_process_boxed_message(l->bus, box, &backpressure)) { BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "successfully delivered box %p from info %p at line %d (retry)", box, info, __LINE__); (void*)box, (void*)info, __LINE__); info->error = RX_ERROR_DONE; release_rx_info(l, info); } else { BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "returning box %p at line %d", box, __LINE__); "returning box %p at line %d", (void*)box, __LINE__); info->box = box; /* retry in tick_handler */ } Loading @@ -643,18 +643,18 @@ static void clean_up_completed_info(listener *l, rx_info_t *info) { assert(info->error == RX_ERROR_DONE); struct bus *b = l->bus; BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "info %p, box is %p at line %d", info, info->box, __LINE__); "info %p, box is %p at line %d", (void*)info, (void*)info->box, __LINE__); size_t backpressure = 0; if (info->box) { if (info->box->result.status != BUS_SEND_SUCCESS) { printf("*** info %d: info->timeout %ld\n", info->id, info->timeout_sec); printf(" info->error %d\n", info->error); printf(" info->box == %p\n", info->box); printf(" info->box == %p\n", (void*)info->box); printf(" info->box->result.status == %d\n", info->box->result.status); printf(" info->box->fd %d\n", info->box->fd); printf(" info->box->out_seq_id %lld\n", (long long)info->box->out_seq_id); printf(" info->box->out_msg %p\n", info->box->out_msg); printf(" info->box->out_msg %p\n", (void*)info->box->out_msg); } assert(info->box == NULL); Loading @@ -662,13 +662,13 @@ static void clean_up_completed_info(listener *l, rx_info_t *info) { assert(info->box->result.status == BUS_SEND_SUCCESS); struct boxed_msg *box = info->box; BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "releasing box %p at line %d", box, __LINE__); "releasing box %p at line %d", (void*)box, __LINE__); info->box = NULL; /* release */ if (bus_process_boxed_message(l->bus, box, &backpressure)) { release_rx_info(l, info); } else { BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "returning box %p at line %d", box, __LINE__); "returning box %p at line %d", (void*)box, __LINE__); info->box = box; /* retry in tick_handler */ } } else { /* already processed, just release it */ Loading @@ -687,12 +687,12 @@ static void notify_message_failure(listener *l, boxed_msg *box = info->box; BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "releasing box %p at line %d", box, __LINE__); "releasing box %p at line %d", (void*)box, __LINE__); info->box = NULL; if (bus_process_boxed_message(l->bus, box, &backpressure)) { BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "delivered box %p with failure message %d at line %d (info %p)", box, status, __LINE__, info); (void*)box, status, __LINE__, (void*)info); info->error = RX_ERROR_DONE; release_rx_info(l, info); } else { Loading Loading @@ -789,12 +789,12 @@ static bool push_message(struct listener *l, listener_msg *msg) { if (casq_push(l->q, msg)) { BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 128, "Pushed message -- %p -- of type %d", msg, msg->type); "Pushed message -- %p -- of type %d", (void*)msg, msg->type); /* TODO: write a wake message to a pipe? */ return true; } else { BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 128, "Failed to pushed message -- %p", msg); "Failed to pushed message -- %p", (void*)msg); return false; } } Loading @@ -802,7 +802,7 @@ static bool push_message(struct listener *l, listener_msg *msg) { static void msg_handler(listener *l, listener_msg *pmsg) { struct bus *b = l->bus; BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 128, "Handling message -- %p", pmsg); "Handling message -- %p", (void*)pmsg); listener_msg msg = *pmsg; release_msg(l, pmsg); Loading Loading @@ -942,7 +942,7 @@ static void expect_response(listener *l, boxed_msg *box) { assert(info); BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "Setting info %p (+%d)'s box to %p", info, info->id, box); (void*)info, info->id, (void*)box); assert(info->box == NULL); info->box = box; info->timeout_sec = RESPONSE_TIMEOUT; Loading src/lib/bus/sender.c +3 −3 Original line number Diff line number Diff line Loading @@ -587,7 +587,7 @@ static void tick_handler(sender *s) { struct bus *b = s->bus; BUS_LOG_SNPRINTF(b, 1, LOG_SENDER, b->udata, 64, "tick... %p -- %d of %d tx_info in use, %d active FDs\n", s, s->tx_info_in_use, MAX_CONCURRENT_SENDS, s->active_fds); (void*)s, s->tx_info_in_use, MAX_CONCURRENT_SENDS, s->active_fds); /* Walk table and expire timeouts & items which have been sent. * Loading @@ -613,7 +613,7 @@ static void tick_handler(sender *s) { info->timeout_sec--; BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64, "decrementing tx_info %d -- %ld (status %d, box %p)", info->id, info->timeout_sec, info->error, info->box); info->id, info->timeout_sec, info->error, (void*)info->box); } } } Loading @@ -624,7 +624,7 @@ static void attempt_to_enqueue_message_to_listener(sender *s, tx_info_t *info) { /* Notify listener that it should expect a response to a * successfully sent message. */ BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 128, "telling listener to expect response, with box %p", info->box); "telling listener to expect response, with box %p", (void*)info->box); struct listener *l = bus_get_listener_for_socket(s->bus, info->fd); Loading Loading
Gemfile.lock +1 −0 Original line number Diff line number Diff line Loading @@ -3,6 +3,7 @@ GEM specs: beefcake (0.5.0) ceedling (0.15.6) ceedling (0.16.0) rake (>= 0.8.7) thor (>= 0.14.5) diff-lcs (1.2.5) Loading
src/lib/bus/listener.c +25 −25 Original line number Diff line number Diff line Loading @@ -150,7 +150,7 @@ bool listener_expect_response(struct listener *l, boxed_msg *box, if (msg == NULL) { return false; } BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "listener_expect_response with box of %p", box); "listener_expect_response with box of %p", (void*)box); msg->type = MSG_EXPECT_RESPONSE; msg->u.expect.box = box; Loading Loading @@ -486,7 +486,7 @@ static rx_info_t *find_info_by_sequence_id(listener *l, struct bus *b = l->bus; BUS_LOG_SNPRINTF(b, 4, LOG_MEMORY, b->udata, 128, "find_info_by_sequence_id: info (%p) at +%d [s %d]: box is %p", info, info->id, info->error, box); (void*)info, info->id, info->error, (void*)box); if (box == NULL) { continue; Loading Loading @@ -517,7 +517,7 @@ static void process_unpacked_message(listener *l, info->error = RX_ERROR_READY_FOR_DELIVERY; BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "releasing box %p at line %d", box, __LINE__); "releasing box %p at line %d", (void*)box, __LINE__); info->box = NULL; /* release */ bus_msg_result_t *result = &box->result; result->status = BUS_SEND_SUCCESS; Loading @@ -526,12 +526,12 @@ static void process_unpacked_message(listener *l, if (bus_process_boxed_message(b, box, &backpressure)) { /* success */ BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "successfully delivered box %p, marking info %p as DONE", box, info); "successfully delivered box %p, marking info %p as DONE", (void*)box, (void*)info); info->error = RX_ERROR_DONE; assert(info->box == NULL); } else { BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "returning box %p at line %d", box, __LINE__); "returning box %p at line %d", (void*)box, __LINE__); info->box = box; /* retry in tick_handler */ } Loading Loading @@ -570,7 +570,7 @@ static void tick_handler(listener *l) { BUS_LOG_SNPRINTF(b, 1, LOG_LISTENER, b->udata, 128, "tick... %p: %d of %d msgs in use, %d of %d rx_info in use, %d tracked_fds", l, l->msgs_in_use, MAX_QUEUE_MESSAGES, (void*)l, l->msgs_in_use, MAX_QUEUE_MESSAGES, l->rx_info_in_use, MAX_PENDING_MESSAGES, l->tracked_fds); if (b->log_level > 5 || 0) { /* dump rx_info table */ Loading @@ -578,7 +578,7 @@ static void tick_handler(listener *l) { printf(" -- in_use: %d, info[%d]: timeout %ld, error %d, box %p\n", l->rx_info[i].active, l->rx_info[i].id, l->rx_info[i].timeout_sec, l->rx_info[i].error, l->rx_info[i].box); l->rx_info[i].error, (void*)l->rx_info[i].box); } } Loading @@ -593,20 +593,20 @@ static void tick_handler(listener *l) { retry_delivery(l, info); } else if (info->error == RX_ERROR_DONE) { BUS_LOG_SNPRINTF(b, 4, LOG_LISTENER, b->udata, 64, "cleaning up completed RX event at info %p", info); "cleaning up completed RX event at info %p", (void*)info); clean_up_completed_info(l, info); } else if (info->error != RX_ERROR_NONE) { BUS_LOG_SNPRINTF(b, 1, LOG_LISTENER, b->udata, 64, "notifying of rx failure -- error %d (info %p)", info->error, info); "notifying of rx failure -- error %d (info %p)", info->error, (void*)info); notify_message_failure(l, info, BUS_SEND_RX_FAILURE); } else if (info->timeout_sec == 1) { BUS_LOG_SNPRINTF(b, 2, LOG_LISTENER, b->udata, 64, "notifying of rx failure -- timeout (info %p)", info); "notifying of rx failure -- timeout (info %p)", (void*)info); notify_message_failure(l, info, BUS_SEND_RX_TIMEOUT); } else { BUS_LOG_SNPRINTF(b, 2, LOG_LISTENER, b->udata, 64, "decrementing countdown on info %p [%u]: %ld", info, info->id, info->timeout_sec - 1); (void*)info, info->id, info->timeout_sec - 1); info->timeout_sec--; } } Loading @@ -621,18 +621,18 @@ static void retry_delivery(listener *l, rx_info_t *info) { assert(info->box->result.status == BUS_SEND_SUCCESS); struct boxed_msg *box = info->box; BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "releasing box %p at line %d", box, __LINE__); "releasing box %p at line %d", (void*)box, __LINE__); info->box = NULL; /* release */ size_t backpressure = 0; if (bus_process_boxed_message(l->bus, box, &backpressure)) { BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "successfully delivered box %p from info %p at line %d (retry)", box, info, __LINE__); (void*)box, (void*)info, __LINE__); info->error = RX_ERROR_DONE; release_rx_info(l, info); } else { BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "returning box %p at line %d", box, __LINE__); "returning box %p at line %d", (void*)box, __LINE__); info->box = box; /* retry in tick_handler */ } Loading @@ -643,18 +643,18 @@ static void clean_up_completed_info(listener *l, rx_info_t *info) { assert(info->error == RX_ERROR_DONE); struct bus *b = l->bus; BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "info %p, box is %p at line %d", info, info->box, __LINE__); "info %p, box is %p at line %d", (void*)info, (void*)info->box, __LINE__); size_t backpressure = 0; if (info->box) { if (info->box->result.status != BUS_SEND_SUCCESS) { printf("*** info %d: info->timeout %ld\n", info->id, info->timeout_sec); printf(" info->error %d\n", info->error); printf(" info->box == %p\n", info->box); printf(" info->box == %p\n", (void*)info->box); printf(" info->box->result.status == %d\n", info->box->result.status); printf(" info->box->fd %d\n", info->box->fd); printf(" info->box->out_seq_id %lld\n", (long long)info->box->out_seq_id); printf(" info->box->out_msg %p\n", info->box->out_msg); printf(" info->box->out_msg %p\n", (void*)info->box->out_msg); } assert(info->box == NULL); Loading @@ -662,13 +662,13 @@ static void clean_up_completed_info(listener *l, rx_info_t *info) { assert(info->box->result.status == BUS_SEND_SUCCESS); struct boxed_msg *box = info->box; BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "releasing box %p at line %d", box, __LINE__); "releasing box %p at line %d", (void*)box, __LINE__); info->box = NULL; /* release */ if (bus_process_boxed_message(l->bus, box, &backpressure)) { release_rx_info(l, info); } else { BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "returning box %p at line %d", box, __LINE__); "returning box %p at line %d", (void*)box, __LINE__); info->box = box; /* retry in tick_handler */ } } else { /* already processed, just release it */ Loading @@ -687,12 +687,12 @@ static void notify_message_failure(listener *l, boxed_msg *box = info->box; BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "releasing box %p at line %d", box, __LINE__); "releasing box %p at line %d", (void*)box, __LINE__); info->box = NULL; if (bus_process_boxed_message(l->bus, box, &backpressure)) { BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "delivered box %p with failure message %d at line %d (info %p)", box, status, __LINE__, info); (void*)box, status, __LINE__, (void*)info); info->error = RX_ERROR_DONE; release_rx_info(l, info); } else { Loading Loading @@ -789,12 +789,12 @@ static bool push_message(struct listener *l, listener_msg *msg) { if (casq_push(l->q, msg)) { BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 128, "Pushed message -- %p -- of type %d", msg, msg->type); "Pushed message -- %p -- of type %d", (void*)msg, msg->type); /* TODO: write a wake message to a pipe? */ return true; } else { BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 128, "Failed to pushed message -- %p", msg); "Failed to pushed message -- %p", (void*)msg); return false; } } Loading @@ -802,7 +802,7 @@ static bool push_message(struct listener *l, listener_msg *msg) { static void msg_handler(listener *l, listener_msg *pmsg) { struct bus *b = l->bus; BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 128, "Handling message -- %p", pmsg); "Handling message -- %p", (void*)pmsg); listener_msg msg = *pmsg; release_msg(l, pmsg); Loading Loading @@ -942,7 +942,7 @@ static void expect_response(listener *l, boxed_msg *box) { assert(info); BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "Setting info %p (+%d)'s box to %p", info, info->id, box); (void*)info, info->id, (void*)box); assert(info->box == NULL); info->box = box; info->timeout_sec = RESPONSE_TIMEOUT; Loading
src/lib/bus/sender.c +3 −3 Original line number Diff line number Diff line Loading @@ -587,7 +587,7 @@ static void tick_handler(sender *s) { struct bus *b = s->bus; BUS_LOG_SNPRINTF(b, 1, LOG_SENDER, b->udata, 64, "tick... %p -- %d of %d tx_info in use, %d active FDs\n", s, s->tx_info_in_use, MAX_CONCURRENT_SENDS, s->active_fds); (void*)s, s->tx_info_in_use, MAX_CONCURRENT_SENDS, s->active_fds); /* Walk table and expire timeouts & items which have been sent. * Loading @@ -613,7 +613,7 @@ static void tick_handler(sender *s) { info->timeout_sec--; BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64, "decrementing tx_info %d -- %ld (status %d, box %p)", info->id, info->timeout_sec, info->error, info->box); info->id, info->timeout_sec, info->error, (void*)info->box); } } } Loading @@ -624,7 +624,7 @@ static void attempt_to_enqueue_message_to_listener(sender *s, tx_info_t *info) { /* Notify listener that it should expect a response to a * successfully sent message. */ BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 128, "telling listener to expect response, with box %p", info->box); "telling listener to expect response, with box %p", (void*)info->box); struct listener *l = bus_get_listener_for_socket(s->bus, info->fd); Loading