Loading src/lib/bus/listener_cmd.c +55 −9 Original line number Diff line number Diff line Loading @@ -144,6 +144,20 @@ static void msg_handler(listener *l, listener_msg *pmsg) { ListenerTask_ReleaseMsg(l, pmsg); } /* Swap poll and connection info for tracked sockets, by array offset. */ static void swap(listener *l, int a, int b) { struct pollfd a_pfd = l->fds[a + INCOMING_MSG_PIPE]; struct pollfd b_pfd = l->fds[b + INCOMING_MSG_PIPE]; connection_info *a_ci = l->fd_info[a]; connection_info *b_ci = l->fd_info[b]; l->fds[b + INCOMING_MSG_PIPE] = a_pfd; l->fds[a + INCOMING_MSG_PIPE] = b_pfd; l->fd_info[a] = b_ci; l->fd_info[b] = a_ci; } static void add_socket(listener *l, connection_info *ci, int notify_fd) { /* TODO: if epoll, just register with the OS. */ struct bus *b = l->bus; Loading @@ -167,8 +181,25 @@ static void add_socket(listener *l, connection_info *ci, int notify_fd) { l->fd_info[id] = ci; l->fds[id + INCOMING_MSG_PIPE].fd = ci->fd; l->fds[id + INCOMING_MSG_PIPE].events = POLLIN; /* If there are any inactive FDs, we need to swap the new last FD * and the first inactive FD so that the active and inactive FDs * remain contiguous. */ if (l->inactive_fds > 0) { int first_inactive = l->tracked_fds - l->inactive_fds; swap(l, id, first_inactive); } l->tracked_fds++; for (int i = 0; i < l->tracked_fds; i++) { if (l->fds[i + INCOMING_MSG_PIPE].events & POLLIN) { assert(i < l->tracked_fds - l->inactive_fds); } else { assert(i >= l->tracked_fds - l->inactive_fds); } } /* Prime the pump by sinking 0 bytes and getting a size to expect. */ bus_sink_cb_res_t sink_res = b->sink_cb(l->read_buf, 0, ci->udata); BUS_ASSERT(b, b->udata, sink_res.full_msg_buffer == NULL); // should have nothing to handle yet Loading @@ -191,18 +222,33 @@ static void remove_socket(listener *l, int fd, int notify_fd) { /* Don't really close it, just drop info about it in the listener. * The client thread will actually free the structure, close SSL, etc. */ for (int i = 0; i < l->tracked_fds; i++) { if (l->fds[i + INCOMING_MSG_PIPE].fd == fd) { for (int id = 0; id < l->tracked_fds; id++) { struct pollfd removing_pfd = l->fds[id + INCOMING_MSG_PIPE]; if (removing_pfd.fd == fd) { bool is_active = (removing_pfd.events & POLLIN) > 0; if (l->tracked_fds > 1) { /* Swap pollfd CI and last ones. */ struct pollfd pfd = l->fds[i + INCOMING_MSG_PIPE]; l->fds[i + INCOMING_MSG_PIPE] = l->fds[l->tracked_fds - 1 + INCOMING_MSG_PIPE]; l->fds[l->tracked_fds - 1 + INCOMING_MSG_PIPE] = pfd; connection_info *ci = l->fd_info[i]; l->fd_info[i] = l->fd_info[l->tracked_fds - 1]; l->fd_info[l->tracked_fds - 1] = ci; int last_active = l->tracked_fds - l->inactive_fds - 1; /* If removing active node and it isn't the last active one, swap them */ if (is_active && id != last_active) { assert(id < last_active); swap(l, id, last_active); id = last_active; } /* If node (which is either last active node or inactive) is not at the end, * and there are inactive nodes, swap it with the last.*/ int last = l->tracked_fds - 1; if (id < last) { swap(l, id, last); id = last; } /* The node is now at the end of the array. */ } l->tracked_fds--; if (!is_active) { l->inactive_fds--; } } } /* CI will be freed by the client thread. */ Loading src/lib/bus/listener_internal_types.h +18 −2 Original line number Diff line number Diff line Loading @@ -160,11 +160,27 @@ typedef struct listener { size_t upstream_backpressure; uint16_t tracked_fds; /* tracked_fds + incoming_msg_pipe */ uint16_t tracked_fds; ///< FDs currently tracked by listener /** File descriptors that are inactive due to errors, but have not * yet been explicitly removed/closed by the client. */ uint16_t inactive_fds; /** Tracked file descriptors, for polling. * * fds[INCOMING_MSG_PIPE_ID (0)] is the incoming_msg_pipe, so the * listener's poll is awakened by incoming commands. fds[1] through * fds[l->tracked_fds - l->inactive_fds] are the file descriptors * which should be polled, and the remaining ones (if any) have been * moved to the end so poll() will not touch them. */ struct pollfd fds[MAX_FDS + 1]; /** The connection info, corresponding to the the file descriptors tracked in * l->fds. Unlike l->fds, these are not offset by one for the incoming message * pipe, i.e. l->fd_info[3] correspons to l->fds[3 + INCOMING_MSG_PIPE]. */ connection_info *fd_info[MAX_FDS]; bool error_occured; ///< Flag indicating post-poll handling is necessary. /* Read buffer and it's size. Will be grown on demand. */ size_t read_buf_size; uint8_t *read_buf; Loading src/lib/bus/listener_io.c +42 −2 Original line number Diff line number Diff line Loading @@ -39,6 +39,7 @@ static void set_error_for_socket(listener *l, int id, int fd, rx_error_t err); static void process_unpacked_message(listener *l, connection_info *ci, bus_unpack_cb_res_t result); static void move_errored_active_sockets_to_end(listener *l); void ListenerIO_AttemptRecv(listener *l, int available) { /* --> failure --> set 'closed' error on socket, don't die */ Loading Loading @@ -103,6 +104,13 @@ void ListenerIO_AttemptRecv(listener *l, int available) { set_error_for_socket(l, i, ci->fd, RX_ERROR_POLLHUP); } } if (l->error_occured) { // only conditionally do this to avoid wasting CPU /* This is done outside of the polling loop, to avoid erroneously repeat-polling * or skipping any individual file descriptors. */ move_errored_active_sockets_to_end(l); l->error_occured = false; } } static ssize_t socket_read_plain(struct bus *b, listener *l, int pfd_i, connection_info *ci) { Loading Loading @@ -255,6 +263,8 @@ static bool sink_socket_read(struct bus *b, } static void set_error_for_socket(listener *l, int id, int fd, rx_error_t err) { l->error_occured = true; /* Mark all pending messages on this socket as being failed due to error. */ struct bus *b = l->bus; BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64, Loading Loading @@ -288,7 +298,37 @@ static void set_error_for_socket(listener *l, int id, int fd, rx_error_t err) { } } } l->fds[id + INCOMING_MSG_PIPE].events &= ~POLLIN; connection_info *newly_inactive_ci = l->fd_info[id]; newly_inactive_ci->error = err; } static void move_errored_active_sockets_to_end(listener *l) { for (uint16_t id = 0; id < l->tracked_fds - l->inactive_fds; id++) { connection_info *ci = l->fd_info[id]; struct pollfd *pfd = &l->fds[id + INCOMING_MSG_PIPE]; int fd = pfd->fd; if (ci->error < 0 && pfd->events & POLLIN) { pfd->events &= ~POLLIN; /* move socket to end, so it won't be poll'd and get repeated POLLHUP. */ int last_active = l->tracked_fds - l->inactive_fds - 1; if (id != last_active) { fprintf(stderr, "swapping %u and %u\n", id, last_active); assert(l->fds[last_active + INCOMING_MSG_PIPE].fd != fd); struct pollfd newly_inactive_fd = l->fds[id + INCOMING_MSG_PIPE]; struct pollfd last_active_fd = l->fds[last_active + INCOMING_MSG_PIPE]; connection_info *last_active_ci = l->fd_info[last_active]; /* Swap pollfds */ l->fds[id + INCOMING_MSG_PIPE] = last_active_fd; l->fds[last_active + INCOMING_MSG_PIPE] = newly_inactive_fd; /* Swap connection_info pointers */ l->fd_info[last_active] = ci; l->fd_info[id] = last_active_ci; } l->inactive_fds++; assert(l->inactive_fds <= l->tracked_fds); } } } static void process_unpacked_message(listener *l, Loading src/lib/bus/listener_task.c +3 −2 Original line number Diff line number Diff line Loading @@ -32,7 +32,6 @@ struct timeval now; struct timeval cur; size_t backpressure = 0; int poll_res = 0; #define WHILE if #else #define WHILE while Loading Loading @@ -76,7 +75,9 @@ void *ListenerTask_MainLoop(void *arg) { int poll_res = 0; #endif poll_res = syscall_poll(self->fds, self->tracked_fds + INCOMING_MSG_PIPE, delay); int to_poll = self->tracked_fds - self->inactive_fds + INCOMING_MSG_PIPE; poll_res = syscall_poll(self->fds, to_poll, delay); BUS_LOG_SNPRINTF(b, (poll_res == 0 ? 6 : 4), LOG_LISTENER, b->udata, 64, "poll res %d", poll_res); Loading test/unit/test_bus_listener_cmd.c +245 −15 Original line number Diff line number Diff line Loading @@ -62,12 +62,14 @@ void setUp(void) { memset(&Listener, 0, sizeof(Listener)); l = &Listener; l->bus = &B; l->tracked_fds = 0; l->inactive_fds = 0; box = &Box; } void tearDown(void) {} void test_listener_ListenerCmd_NotifyCaller_should_write_tag_to_caller_fd(void) { void test_ListenerCmd_NotifyCaller_should_write_tag_to_caller_fd(void) { int fd = 5; ListenerTask_GetBackpressure_ExpectAndReturn(l, 0); syscall_write_ExpectAndReturn(fd, reply_buf, sizeof(reply_buf), 3); Loading @@ -77,7 +79,7 @@ void test_listener_ListenerCmd_NotifyCaller_should_write_tag_to_caller_fd(void) TEST_ASSERT_EQUAL(0, reply_buf[2]); } void test_listener_ListenerCmd_NotifyCaller_should_write_backpressure_to_reply_buffer(void) { void test_ListenerCmd_NotifyCaller_should_write_backpressure_to_reply_buffer(void) { int fd = 5; ListenerTask_GetBackpressure_ExpectAndReturn(l, 0x1234); syscall_write_ExpectAndReturn(fd, reply_buf, sizeof(reply_buf), 3); Loading @@ -86,7 +88,7 @@ void test_listener_ListenerCmd_NotifyCaller_should_write_backpressure_to_reply_b TEST_ASSERT_EQUAL(0x12, reply_buf[2]); } void test_listener_ListenerCmd_CheckIncomingMessages_should_return_on_error(void) { void test_ListenerCmd_CheckIncomingMessages_should_return_on_error(void) { int res = 1; l->fds[INCOMING_MSG_PIPE].revents = POLLHUP; ListenerCmd_CheckIncomingMessages(l, &res); Loading @@ -106,7 +108,7 @@ static void expect_notify_caller(listener *l, int fd) { syscall_write_ExpectAndReturn(fd, reply_buf, sizeof(reply_buf), 3); } void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_redundant_ADD_SOCKET_command(void) { void test_ListenerCmd_CheckIncomingMessages_should_handle_redundant_ADD_SOCKET_command(void) { l->fds[INCOMING_MSG_PIPE_ID].fd = 5; l->fds[INCOMING_MSG_PIPE_ID].revents = POLLIN; l->read_buf = malloc(256); Loading Loading @@ -156,7 +158,7 @@ static void setup_command(listener_msg *pmsg, rx_info_t *info) { } } void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_ADD_SOCKET_command(void) { void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_ADD_SOCKET_command(void) { connection_info *ci = calloc(1, sizeof(*ci)); *(int *)&ci->fd = 91; Loading @@ -172,6 +174,11 @@ void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_ADD_ int res = 1; l->tracked_fds = 3; for (int i = 0; i < l->tracked_fds; i++) { l->fds[i + INCOMING_MSG_PIPE].fd = i; l->fds[i + INCOMING_MSG_PIPE].events = POLLIN; } expect_notify_caller(l, 7); ListenerTask_ReleaseMsg_Expect(l, &l->msgs[0]); ListenerCmd_CheckIncomingMessages(l, &res); Loading @@ -183,7 +190,46 @@ void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_ADD_ TEST_ASSERT_EQUAL(4, l->tracked_fds); } void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_REMOVE_SOCKET_command(void) { void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_ADD_SOCKET_command_correctly_with_inactive_sockets(void) { connection_info *ci = calloc(1, sizeof(*ci)); *(int *)&ci->fd = 91; listener_msg msg = { .type = MSG_ADD_SOCKET, .u.add_socket = { .info = ci, .notify_fd = 7, }, }; setup_command(&msg, NULL); int res = 1; l->tracked_fds = 3; // [0 1 | 2] l->inactive_fds = 1; for (int i = 0; i < l->tracked_fds; i++) { l->fds[i + INCOMING_MSG_PIPE].fd = i; if (i < l->tracked_fds - l->inactive_fds) { l->fds[i + INCOMING_MSG_PIPE].events = POLLIN; } else { l->fds[i + INCOMING_MSG_PIPE].events = 0; } } expect_notify_caller(l, 7); ListenerTask_ReleaseMsg_Expect(l, &l->msgs[0]); ListenerCmd_CheckIncomingMessages(l, &res); TEST_ASSERT_EQUAL(4, l->tracked_fds); TEST_ASSERT_EQUAL(ci, l->fd_info[2]); TEST_ASSERT_EQUAL(ci->fd, l->fds[2 + INCOMING_MSG_PIPE].fd); TEST_ASSERT_EQUAL(31, ci->to_read_size); TEST_ASSERT_EQUAL(POLLIN, l->fds[2 + INCOMING_MSG_PIPE].events); TEST_ASSERT_EQUAL(0, l->fds[3 + INCOMING_MSG_PIPE].events); TEST_ASSERT_EQUAL(2, l->fds[3 + INCOMING_MSG_PIPE].fd); } void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_REMOVE_SOCKET_command_freeing_single_fd(void) { listener_msg msg = { .id = 4, .type = MSG_REMOVE_SOCKET, Loading @@ -196,19 +242,203 @@ void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_REMO setup_command(&msg, NULL); expect_notify_caller(l, 100); connection_info *ci = calloc(1, sizeof(*ci)); l->tracked_fds = 1; l->fds[0 + INCOMING_MSG_PIPE].fd = 50; l->fds[0 + INCOMING_MSG_PIPE].events = POLLIN; connection_info *ci0 = calloc(1, sizeof(*ci0)); l->fd_info[0] = ci0; int res = 1; ListenerTask_ReleaseMsg_Expect(l, &l->msgs[0]); ListenerCmd_CheckIncomingMessages(l, &res); TEST_ASSERT_EQUAL(0, l->tracked_fds); TEST_ASSERT_EQUAL(0, res); } void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_REMOVE_SOCKET_command_freeing_single_fd_when_inactive(void) { listener_msg msg = { .id = 4, .type = MSG_REMOVE_SOCKET, .pipes = {7, 8}, .u.remove_socket = { .fd = 50, .notify_fd = 100, }, }; setup_command(&msg, NULL); expect_notify_caller(l, 100); l->tracked_fds = 1; l->inactive_fds = 1; l->fds[0 + INCOMING_MSG_PIPE].fd = 50; l->fds[0 + INCOMING_MSG_PIPE].events = 0; // no POLLIN -> inactive connection_info *ci0 = calloc(1, sizeof(*ci0)); l->fd_info[0] = ci0; int res = 1; ListenerTask_ReleaseMsg_Expect(l, &l->msgs[0]); ListenerCmd_CheckIncomingMessages(l, &res); TEST_ASSERT_EQUAL(0, l->tracked_fds); TEST_ASSERT_EQUAL(0, l->inactive_fds); TEST_ASSERT_EQUAL(0, res); } void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_REMOVE_SOCKET_command_freeing_first_of_two(void) { listener_msg msg = { .id = 4, .type = MSG_REMOVE_SOCKET, .pipes = {7, 8}, .u.remove_socket = { .fd = 50, // free first fds .notify_fd = 100, }, }; setup_command(&msg, NULL); expect_notify_caller(l, 100); l->tracked_fds = 2; l->fds[0 + INCOMING_MSG_PIPE].fd = 50; l->fds[0 + INCOMING_MSG_PIPE].events = POLLIN; connection_info *ci0 = calloc(1, sizeof(*ci0)); l->fd_info[0] = ci0; l->fds[1 + INCOMING_MSG_PIPE].fd = 150; l->fds[1 + INCOMING_MSG_PIPE].events = POLLIN; connection_info *ci1 = calloc(1, sizeof(*ci1)); l->fd_info[1] = ci1; int res = 1; ListenerTask_ReleaseMsg_Expect(l, &l->msgs[0]); ListenerCmd_CheckIncomingMessages(l, &res); TEST_ASSERT_EQUAL(1, l->tracked_fds); TEST_ASSERT_EQUAL(0, res); TEST_ASSERT_EQUAL(150, l->fds[0 + INCOMING_MSG_PIPE].fd); TEST_ASSERT_EQUAL(POLLIN, l->fds[0 + INCOMING_MSG_PIPE].events); } void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_REMOVE_SOCKET_command_freeing_second_of_two(void) { listener_msg msg = { .id = 4, .type = MSG_REMOVE_SOCKET, .pipes = {7, 8}, .u.remove_socket = { .fd = 150, // free second fds .notify_fd = 100, }, }; setup_command(&msg, NULL); expect_notify_caller(l, 100); l->tracked_fds = 2; l->fds[1 + INCOMING_MSG_PIPE].fd = 50; l->fd_info[1] = ci; l->fds[0 + INCOMING_MSG_PIPE].fd = 50; l->fds[0 + INCOMING_MSG_PIPE].events = POLLIN; connection_info *ci0 = calloc(1, sizeof(*ci0)); l->fd_info[0] = ci0; l->fds[1 + INCOMING_MSG_PIPE].fd = 150; l->fds[1 + INCOMING_MSG_PIPE].events = POLLIN; connection_info *ci1 = calloc(1, sizeof(*ci1)); l->fd_info[1] = ci1; int res = 1; ListenerTask_ReleaseMsg_Expect(l, &l->msgs[0]); ListenerCmd_CheckIncomingMessages(l, &res); TEST_ASSERT_EQUAL(1, l->tracked_fds); TEST_ASSERT_EQUAL(0, res); TEST_ASSERT_EQUAL(50, l->fds[0 + INCOMING_MSG_PIPE].fd); TEST_ASSERT_EQUAL(POLLIN, l->fds[0 + INCOMING_MSG_PIPE].events); } static void handle_remove_with_active_and_inactive_mix(int tracked, int inactive, int remove_nth) { listener_msg msg = { .id = 4, .type = MSG_REMOVE_SOCKET, .pipes = {7, 8}, .u.remove_socket = { .fd = remove_nth, .notify_fd = 100, }, }; setup_command(&msg, NULL); expect_notify_caller(l, 100); l->tracked_fds = tracked; l->inactive_fds = inactive; TEST_ASSERT(inactive <= tracked); for (int i = 0; i < tracked; i++) { struct pollfd *pfd = &l->fds[i + INCOMING_MSG_PIPE]; pfd->fd = i; if (i < tracked - inactive) { pfd->events = POLLIN; } else { pfd->events = 0; } connection_info *ci = calloc(1, sizeof(*ci)); l->fd_info[i] = ci; *(int *)&ci->fd = i; // cast to set const field TEST_ASSERT(ci); } int res = 1; ListenerTask_ReleaseMsg_Expect(l, &l->msgs[0]); ListenerCmd_CheckIncomingMessages(l, &res); TEST_ASSERT_EQUAL(0, res); if (remove_nth != tracked - 1) { TEST_ASSERT_NOT_EQUAL(remove_nth, l->fds[remove_nth + INCOMING_MSG_PIPE].fd); TEST_ASSERT_NOT_EQUAL(remove_nth, l->fd_info[remove_nth]->fd); } // The removed one should now be last TEST_ASSERT_EQUAL(tracked - 1, l->tracked_fds); TEST_ASSERT_EQUAL(remove_nth, l->fds[tracked - 1 + INCOMING_MSG_PIPE].fd); bool removing_inactive = (remove_nth >= tracked - inactive); int last_active = tracked - inactive - (removing_inactive ? 0 : 1); int now_active = 0; int now_inactive = 0; for (int i = 0; i < tracked - 1; i++) { struct pollfd *pfd = &l->fds[i + INCOMING_MSG_PIPE]; if (pfd->events & POLLIN) { now_active++; // all active FDs must be contiguous at the beginning TEST_ASSERT(i < last_active); } else { now_inactive++; } } if (removing_inactive) { TEST_ASSERT_EQUAL(inactive - 1, now_inactive); TEST_ASSERT_EQUAL(tracked - inactive, now_active); } else { TEST_ASSERT_EQUAL(inactive, now_inactive); TEST_ASSERT_EQUAL(tracked - inactive - 1, now_active); } } /* Check invariants when deleting each socket R out of N sockets, * when M out of N are inactive. Try all cases up to N == limit. */ void test_remove_nth_parametric(void) { const int limit = 7; // cmock uses excessive memory if > than this for (int tracked = 0; tracked < limit; tracked++) { for (int inactive = 0; inactive <= tracked; inactive++) { for (int remove_nth = 0; remove_nth < tracked; remove_nth++) { handle_remove_with_active_and_inactive_mix(tracked, inactive, remove_nth); } } } } void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_NULL_info_failure_case(void) { void test_ListenerCmd_CheckIncomingMessages_should_handle_NULL_info_failure_case(void) { listener_msg msg = { .id = 1, .type = MSG_HOLD_RESPONSE, Loading @@ -230,7 +460,7 @@ void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_NULL_info_fai TEST_ASSERT_EQUAL(0, res); } void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_HOLD_RESPONSE_command(void) { void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_HOLD_RESPONSE_command(void) { listener_msg msg = { .id = 1, .type = MSG_HOLD_RESPONSE, Loading Loading @@ -260,7 +490,7 @@ void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_HOLD TEST_ASSERT_EQUAL(false, info.u.hold.has_result); } void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_EXPECT_command_when_result_is_saved(void) { void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_EXPECT_command_when_result_is_saved(void) { listener_msg msg = { .id = 1, .type = MSG_EXPECT_RESPONSE, Loading Loading @@ -304,7 +534,7 @@ void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_EXPE TEST_ASSERT_EQUAL(12345, hold_info.u.expect.result.u.success.seq_id); } void test_listener_ListenerCmd_CheckIncomingMessages_should_immediately_fail_incoming_EXPECT_command_when_corresponding_HOLD_has_an_error(void) { void test_ListenerCmd_CheckIncomingMessages_should_immediately_fail_incoming_EXPECT_command_when_corresponding_HOLD_has_an_error(void) { listener_msg msg = { .id = 1, .type = MSG_EXPECT_RESPONSE, Loading Loading @@ -347,7 +577,7 @@ void test_listener_ListenerCmd_CheckIncomingMessages_should_immediately_fail_inc } void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_EXPECT_command_when_no_result_is_saved(void) { void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_EXPECT_command_when_no_result_is_saved(void) { listener_msg msg = { .id = 1, .type = MSG_EXPECT_RESPONSE, Loading Loading @@ -378,7 +608,7 @@ void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_EXPE TEST_ASSERT_EQUAL(11, hold_info.timeout_sec); } void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_SHUTDOWN_command(void) { void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_SHUTDOWN_command(void) { listener_msg msg = { .id = 1, .type = MSG_SHUTDOWN, Loading Loading
src/lib/bus/listener_cmd.c +55 −9 Original line number Diff line number Diff line Loading @@ -144,6 +144,20 @@ static void msg_handler(listener *l, listener_msg *pmsg) { ListenerTask_ReleaseMsg(l, pmsg); } /* Swap poll and connection info for tracked sockets, by array offset. */ static void swap(listener *l, int a, int b) { struct pollfd a_pfd = l->fds[a + INCOMING_MSG_PIPE]; struct pollfd b_pfd = l->fds[b + INCOMING_MSG_PIPE]; connection_info *a_ci = l->fd_info[a]; connection_info *b_ci = l->fd_info[b]; l->fds[b + INCOMING_MSG_PIPE] = a_pfd; l->fds[a + INCOMING_MSG_PIPE] = b_pfd; l->fd_info[a] = b_ci; l->fd_info[b] = a_ci; } static void add_socket(listener *l, connection_info *ci, int notify_fd) { /* TODO: if epoll, just register with the OS. */ struct bus *b = l->bus; Loading @@ -167,8 +181,25 @@ static void add_socket(listener *l, connection_info *ci, int notify_fd) { l->fd_info[id] = ci; l->fds[id + INCOMING_MSG_PIPE].fd = ci->fd; l->fds[id + INCOMING_MSG_PIPE].events = POLLIN; /* If there are any inactive FDs, we need to swap the new last FD * and the first inactive FD so that the active and inactive FDs * remain contiguous. */ if (l->inactive_fds > 0) { int first_inactive = l->tracked_fds - l->inactive_fds; swap(l, id, first_inactive); } l->tracked_fds++; for (int i = 0; i < l->tracked_fds; i++) { if (l->fds[i + INCOMING_MSG_PIPE].events & POLLIN) { assert(i < l->tracked_fds - l->inactive_fds); } else { assert(i >= l->tracked_fds - l->inactive_fds); } } /* Prime the pump by sinking 0 bytes and getting a size to expect. */ bus_sink_cb_res_t sink_res = b->sink_cb(l->read_buf, 0, ci->udata); BUS_ASSERT(b, b->udata, sink_res.full_msg_buffer == NULL); // should have nothing to handle yet Loading @@ -191,18 +222,33 @@ static void remove_socket(listener *l, int fd, int notify_fd) { /* Don't really close it, just drop info about it in the listener. * The client thread will actually free the structure, close SSL, etc. */ for (int i = 0; i < l->tracked_fds; i++) { if (l->fds[i + INCOMING_MSG_PIPE].fd == fd) { for (int id = 0; id < l->tracked_fds; id++) { struct pollfd removing_pfd = l->fds[id + INCOMING_MSG_PIPE]; if (removing_pfd.fd == fd) { bool is_active = (removing_pfd.events & POLLIN) > 0; if (l->tracked_fds > 1) { /* Swap pollfd CI and last ones. */ struct pollfd pfd = l->fds[i + INCOMING_MSG_PIPE]; l->fds[i + INCOMING_MSG_PIPE] = l->fds[l->tracked_fds - 1 + INCOMING_MSG_PIPE]; l->fds[l->tracked_fds - 1 + INCOMING_MSG_PIPE] = pfd; connection_info *ci = l->fd_info[i]; l->fd_info[i] = l->fd_info[l->tracked_fds - 1]; l->fd_info[l->tracked_fds - 1] = ci; int last_active = l->tracked_fds - l->inactive_fds - 1; /* If removing active node and it isn't the last active one, swap them */ if (is_active && id != last_active) { assert(id < last_active); swap(l, id, last_active); id = last_active; } /* If node (which is either last active node or inactive) is not at the end, * and there are inactive nodes, swap it with the last.*/ int last = l->tracked_fds - 1; if (id < last) { swap(l, id, last); id = last; } /* The node is now at the end of the array. */ } l->tracked_fds--; if (!is_active) { l->inactive_fds--; } } } /* CI will be freed by the client thread. */ Loading
src/lib/bus/listener_internal_types.h +18 −2 Original line number Diff line number Diff line Loading @@ -160,11 +160,27 @@ typedef struct listener { size_t upstream_backpressure; uint16_t tracked_fds; /* tracked_fds + incoming_msg_pipe */ uint16_t tracked_fds; ///< FDs currently tracked by listener /** File descriptors that are inactive due to errors, but have not * yet been explicitly removed/closed by the client. */ uint16_t inactive_fds; /** Tracked file descriptors, for polling. * * fds[INCOMING_MSG_PIPE_ID (0)] is the incoming_msg_pipe, so the * listener's poll is awakened by incoming commands. fds[1] through * fds[l->tracked_fds - l->inactive_fds] are the file descriptors * which should be polled, and the remaining ones (if any) have been * moved to the end so poll() will not touch them. */ struct pollfd fds[MAX_FDS + 1]; /** The connection info, corresponding to the the file descriptors tracked in * l->fds. Unlike l->fds, these are not offset by one for the incoming message * pipe, i.e. l->fd_info[3] correspons to l->fds[3 + INCOMING_MSG_PIPE]. */ connection_info *fd_info[MAX_FDS]; bool error_occured; ///< Flag indicating post-poll handling is necessary. /* Read buffer and it's size. Will be grown on demand. */ size_t read_buf_size; uint8_t *read_buf; Loading
src/lib/bus/listener_io.c +42 −2 Original line number Diff line number Diff line Loading @@ -39,6 +39,7 @@ static void set_error_for_socket(listener *l, int id, int fd, rx_error_t err); static void process_unpacked_message(listener *l, connection_info *ci, bus_unpack_cb_res_t result); static void move_errored_active_sockets_to_end(listener *l); void ListenerIO_AttemptRecv(listener *l, int available) { /* --> failure --> set 'closed' error on socket, don't die */ Loading Loading @@ -103,6 +104,13 @@ void ListenerIO_AttemptRecv(listener *l, int available) { set_error_for_socket(l, i, ci->fd, RX_ERROR_POLLHUP); } } if (l->error_occured) { // only conditionally do this to avoid wasting CPU /* This is done outside of the polling loop, to avoid erroneously repeat-polling * or skipping any individual file descriptors. */ move_errored_active_sockets_to_end(l); l->error_occured = false; } } static ssize_t socket_read_plain(struct bus *b, listener *l, int pfd_i, connection_info *ci) { Loading Loading @@ -255,6 +263,8 @@ static bool sink_socket_read(struct bus *b, } static void set_error_for_socket(listener *l, int id, int fd, rx_error_t err) { l->error_occured = true; /* Mark all pending messages on this socket as being failed due to error. */ struct bus *b = l->bus; BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64, Loading Loading @@ -288,7 +298,37 @@ static void set_error_for_socket(listener *l, int id, int fd, rx_error_t err) { } } } l->fds[id + INCOMING_MSG_PIPE].events &= ~POLLIN; connection_info *newly_inactive_ci = l->fd_info[id]; newly_inactive_ci->error = err; } static void move_errored_active_sockets_to_end(listener *l) { for (uint16_t id = 0; id < l->tracked_fds - l->inactive_fds; id++) { connection_info *ci = l->fd_info[id]; struct pollfd *pfd = &l->fds[id + INCOMING_MSG_PIPE]; int fd = pfd->fd; if (ci->error < 0 && pfd->events & POLLIN) { pfd->events &= ~POLLIN; /* move socket to end, so it won't be poll'd and get repeated POLLHUP. */ int last_active = l->tracked_fds - l->inactive_fds - 1; if (id != last_active) { fprintf(stderr, "swapping %u and %u\n", id, last_active); assert(l->fds[last_active + INCOMING_MSG_PIPE].fd != fd); struct pollfd newly_inactive_fd = l->fds[id + INCOMING_MSG_PIPE]; struct pollfd last_active_fd = l->fds[last_active + INCOMING_MSG_PIPE]; connection_info *last_active_ci = l->fd_info[last_active]; /* Swap pollfds */ l->fds[id + INCOMING_MSG_PIPE] = last_active_fd; l->fds[last_active + INCOMING_MSG_PIPE] = newly_inactive_fd; /* Swap connection_info pointers */ l->fd_info[last_active] = ci; l->fd_info[id] = last_active_ci; } l->inactive_fds++; assert(l->inactive_fds <= l->tracked_fds); } } } static void process_unpacked_message(listener *l, Loading
src/lib/bus/listener_task.c +3 −2 Original line number Diff line number Diff line Loading @@ -32,7 +32,6 @@ struct timeval now; struct timeval cur; size_t backpressure = 0; int poll_res = 0; #define WHILE if #else #define WHILE while Loading Loading @@ -76,7 +75,9 @@ void *ListenerTask_MainLoop(void *arg) { int poll_res = 0; #endif poll_res = syscall_poll(self->fds, self->tracked_fds + INCOMING_MSG_PIPE, delay); int to_poll = self->tracked_fds - self->inactive_fds + INCOMING_MSG_PIPE; poll_res = syscall_poll(self->fds, to_poll, delay); BUS_LOG_SNPRINTF(b, (poll_res == 0 ? 6 : 4), LOG_LISTENER, b->udata, 64, "poll res %d", poll_res); Loading
test/unit/test_bus_listener_cmd.c +245 −15 Original line number Diff line number Diff line Loading @@ -62,12 +62,14 @@ void setUp(void) { memset(&Listener, 0, sizeof(Listener)); l = &Listener; l->bus = &B; l->tracked_fds = 0; l->inactive_fds = 0; box = &Box; } void tearDown(void) {} void test_listener_ListenerCmd_NotifyCaller_should_write_tag_to_caller_fd(void) { void test_ListenerCmd_NotifyCaller_should_write_tag_to_caller_fd(void) { int fd = 5; ListenerTask_GetBackpressure_ExpectAndReturn(l, 0); syscall_write_ExpectAndReturn(fd, reply_buf, sizeof(reply_buf), 3); Loading @@ -77,7 +79,7 @@ void test_listener_ListenerCmd_NotifyCaller_should_write_tag_to_caller_fd(void) TEST_ASSERT_EQUAL(0, reply_buf[2]); } void test_listener_ListenerCmd_NotifyCaller_should_write_backpressure_to_reply_buffer(void) { void test_ListenerCmd_NotifyCaller_should_write_backpressure_to_reply_buffer(void) { int fd = 5; ListenerTask_GetBackpressure_ExpectAndReturn(l, 0x1234); syscall_write_ExpectAndReturn(fd, reply_buf, sizeof(reply_buf), 3); Loading @@ -86,7 +88,7 @@ void test_listener_ListenerCmd_NotifyCaller_should_write_backpressure_to_reply_b TEST_ASSERT_EQUAL(0x12, reply_buf[2]); } void test_listener_ListenerCmd_CheckIncomingMessages_should_return_on_error(void) { void test_ListenerCmd_CheckIncomingMessages_should_return_on_error(void) { int res = 1; l->fds[INCOMING_MSG_PIPE].revents = POLLHUP; ListenerCmd_CheckIncomingMessages(l, &res); Loading @@ -106,7 +108,7 @@ static void expect_notify_caller(listener *l, int fd) { syscall_write_ExpectAndReturn(fd, reply_buf, sizeof(reply_buf), 3); } void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_redundant_ADD_SOCKET_command(void) { void test_ListenerCmd_CheckIncomingMessages_should_handle_redundant_ADD_SOCKET_command(void) { l->fds[INCOMING_MSG_PIPE_ID].fd = 5; l->fds[INCOMING_MSG_PIPE_ID].revents = POLLIN; l->read_buf = malloc(256); Loading Loading @@ -156,7 +158,7 @@ static void setup_command(listener_msg *pmsg, rx_info_t *info) { } } void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_ADD_SOCKET_command(void) { void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_ADD_SOCKET_command(void) { connection_info *ci = calloc(1, sizeof(*ci)); *(int *)&ci->fd = 91; Loading @@ -172,6 +174,11 @@ void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_ADD_ int res = 1; l->tracked_fds = 3; for (int i = 0; i < l->tracked_fds; i++) { l->fds[i + INCOMING_MSG_PIPE].fd = i; l->fds[i + INCOMING_MSG_PIPE].events = POLLIN; } expect_notify_caller(l, 7); ListenerTask_ReleaseMsg_Expect(l, &l->msgs[0]); ListenerCmd_CheckIncomingMessages(l, &res); Loading @@ -183,7 +190,46 @@ void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_ADD_ TEST_ASSERT_EQUAL(4, l->tracked_fds); } void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_REMOVE_SOCKET_command(void) { void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_ADD_SOCKET_command_correctly_with_inactive_sockets(void) { connection_info *ci = calloc(1, sizeof(*ci)); *(int *)&ci->fd = 91; listener_msg msg = { .type = MSG_ADD_SOCKET, .u.add_socket = { .info = ci, .notify_fd = 7, }, }; setup_command(&msg, NULL); int res = 1; l->tracked_fds = 3; // [0 1 | 2] l->inactive_fds = 1; for (int i = 0; i < l->tracked_fds; i++) { l->fds[i + INCOMING_MSG_PIPE].fd = i; if (i < l->tracked_fds - l->inactive_fds) { l->fds[i + INCOMING_MSG_PIPE].events = POLLIN; } else { l->fds[i + INCOMING_MSG_PIPE].events = 0; } } expect_notify_caller(l, 7); ListenerTask_ReleaseMsg_Expect(l, &l->msgs[0]); ListenerCmd_CheckIncomingMessages(l, &res); TEST_ASSERT_EQUAL(4, l->tracked_fds); TEST_ASSERT_EQUAL(ci, l->fd_info[2]); TEST_ASSERT_EQUAL(ci->fd, l->fds[2 + INCOMING_MSG_PIPE].fd); TEST_ASSERT_EQUAL(31, ci->to_read_size); TEST_ASSERT_EQUAL(POLLIN, l->fds[2 + INCOMING_MSG_PIPE].events); TEST_ASSERT_EQUAL(0, l->fds[3 + INCOMING_MSG_PIPE].events); TEST_ASSERT_EQUAL(2, l->fds[3 + INCOMING_MSG_PIPE].fd); } void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_REMOVE_SOCKET_command_freeing_single_fd(void) { listener_msg msg = { .id = 4, .type = MSG_REMOVE_SOCKET, Loading @@ -196,19 +242,203 @@ void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_REMO setup_command(&msg, NULL); expect_notify_caller(l, 100); connection_info *ci = calloc(1, sizeof(*ci)); l->tracked_fds = 1; l->fds[0 + INCOMING_MSG_PIPE].fd = 50; l->fds[0 + INCOMING_MSG_PIPE].events = POLLIN; connection_info *ci0 = calloc(1, sizeof(*ci0)); l->fd_info[0] = ci0; int res = 1; ListenerTask_ReleaseMsg_Expect(l, &l->msgs[0]); ListenerCmd_CheckIncomingMessages(l, &res); TEST_ASSERT_EQUAL(0, l->tracked_fds); TEST_ASSERT_EQUAL(0, res); } void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_REMOVE_SOCKET_command_freeing_single_fd_when_inactive(void) { listener_msg msg = { .id = 4, .type = MSG_REMOVE_SOCKET, .pipes = {7, 8}, .u.remove_socket = { .fd = 50, .notify_fd = 100, }, }; setup_command(&msg, NULL); expect_notify_caller(l, 100); l->tracked_fds = 1; l->inactive_fds = 1; l->fds[0 + INCOMING_MSG_PIPE].fd = 50; l->fds[0 + INCOMING_MSG_PIPE].events = 0; // no POLLIN -> inactive connection_info *ci0 = calloc(1, sizeof(*ci0)); l->fd_info[0] = ci0; int res = 1; ListenerTask_ReleaseMsg_Expect(l, &l->msgs[0]); ListenerCmd_CheckIncomingMessages(l, &res); TEST_ASSERT_EQUAL(0, l->tracked_fds); TEST_ASSERT_EQUAL(0, l->inactive_fds); TEST_ASSERT_EQUAL(0, res); } void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_REMOVE_SOCKET_command_freeing_first_of_two(void) { listener_msg msg = { .id = 4, .type = MSG_REMOVE_SOCKET, .pipes = {7, 8}, .u.remove_socket = { .fd = 50, // free first fds .notify_fd = 100, }, }; setup_command(&msg, NULL); expect_notify_caller(l, 100); l->tracked_fds = 2; l->fds[0 + INCOMING_MSG_PIPE].fd = 50; l->fds[0 + INCOMING_MSG_PIPE].events = POLLIN; connection_info *ci0 = calloc(1, sizeof(*ci0)); l->fd_info[0] = ci0; l->fds[1 + INCOMING_MSG_PIPE].fd = 150; l->fds[1 + INCOMING_MSG_PIPE].events = POLLIN; connection_info *ci1 = calloc(1, sizeof(*ci1)); l->fd_info[1] = ci1; int res = 1; ListenerTask_ReleaseMsg_Expect(l, &l->msgs[0]); ListenerCmd_CheckIncomingMessages(l, &res); TEST_ASSERT_EQUAL(1, l->tracked_fds); TEST_ASSERT_EQUAL(0, res); TEST_ASSERT_EQUAL(150, l->fds[0 + INCOMING_MSG_PIPE].fd); TEST_ASSERT_EQUAL(POLLIN, l->fds[0 + INCOMING_MSG_PIPE].events); } void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_REMOVE_SOCKET_command_freeing_second_of_two(void) { listener_msg msg = { .id = 4, .type = MSG_REMOVE_SOCKET, .pipes = {7, 8}, .u.remove_socket = { .fd = 150, // free second fds .notify_fd = 100, }, }; setup_command(&msg, NULL); expect_notify_caller(l, 100); l->tracked_fds = 2; l->fds[1 + INCOMING_MSG_PIPE].fd = 50; l->fd_info[1] = ci; l->fds[0 + INCOMING_MSG_PIPE].fd = 50; l->fds[0 + INCOMING_MSG_PIPE].events = POLLIN; connection_info *ci0 = calloc(1, sizeof(*ci0)); l->fd_info[0] = ci0; l->fds[1 + INCOMING_MSG_PIPE].fd = 150; l->fds[1 + INCOMING_MSG_PIPE].events = POLLIN; connection_info *ci1 = calloc(1, sizeof(*ci1)); l->fd_info[1] = ci1; int res = 1; ListenerTask_ReleaseMsg_Expect(l, &l->msgs[0]); ListenerCmd_CheckIncomingMessages(l, &res); TEST_ASSERT_EQUAL(1, l->tracked_fds); TEST_ASSERT_EQUAL(0, res); TEST_ASSERT_EQUAL(50, l->fds[0 + INCOMING_MSG_PIPE].fd); TEST_ASSERT_EQUAL(POLLIN, l->fds[0 + INCOMING_MSG_PIPE].events); } static void handle_remove_with_active_and_inactive_mix(int tracked, int inactive, int remove_nth) { listener_msg msg = { .id = 4, .type = MSG_REMOVE_SOCKET, .pipes = {7, 8}, .u.remove_socket = { .fd = remove_nth, .notify_fd = 100, }, }; setup_command(&msg, NULL); expect_notify_caller(l, 100); l->tracked_fds = tracked; l->inactive_fds = inactive; TEST_ASSERT(inactive <= tracked); for (int i = 0; i < tracked; i++) { struct pollfd *pfd = &l->fds[i + INCOMING_MSG_PIPE]; pfd->fd = i; if (i < tracked - inactive) { pfd->events = POLLIN; } else { pfd->events = 0; } connection_info *ci = calloc(1, sizeof(*ci)); l->fd_info[i] = ci; *(int *)&ci->fd = i; // cast to set const field TEST_ASSERT(ci); } int res = 1; ListenerTask_ReleaseMsg_Expect(l, &l->msgs[0]); ListenerCmd_CheckIncomingMessages(l, &res); TEST_ASSERT_EQUAL(0, res); if (remove_nth != tracked - 1) { TEST_ASSERT_NOT_EQUAL(remove_nth, l->fds[remove_nth + INCOMING_MSG_PIPE].fd); TEST_ASSERT_NOT_EQUAL(remove_nth, l->fd_info[remove_nth]->fd); } // The removed one should now be last TEST_ASSERT_EQUAL(tracked - 1, l->tracked_fds); TEST_ASSERT_EQUAL(remove_nth, l->fds[tracked - 1 + INCOMING_MSG_PIPE].fd); bool removing_inactive = (remove_nth >= tracked - inactive); int last_active = tracked - inactive - (removing_inactive ? 0 : 1); int now_active = 0; int now_inactive = 0; for (int i = 0; i < tracked - 1; i++) { struct pollfd *pfd = &l->fds[i + INCOMING_MSG_PIPE]; if (pfd->events & POLLIN) { now_active++; // all active FDs must be contiguous at the beginning TEST_ASSERT(i < last_active); } else { now_inactive++; } } if (removing_inactive) { TEST_ASSERT_EQUAL(inactive - 1, now_inactive); TEST_ASSERT_EQUAL(tracked - inactive, now_active); } else { TEST_ASSERT_EQUAL(inactive, now_inactive); TEST_ASSERT_EQUAL(tracked - inactive - 1, now_active); } } /* Check invariants when deleting each socket R out of N sockets, * when M out of N are inactive. Try all cases up to N == limit. */ void test_remove_nth_parametric(void) { const int limit = 7; // cmock uses excessive memory if > than this for (int tracked = 0; tracked < limit; tracked++) { for (int inactive = 0; inactive <= tracked; inactive++) { for (int remove_nth = 0; remove_nth < tracked; remove_nth++) { handle_remove_with_active_and_inactive_mix(tracked, inactive, remove_nth); } } } } void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_NULL_info_failure_case(void) { void test_ListenerCmd_CheckIncomingMessages_should_handle_NULL_info_failure_case(void) { listener_msg msg = { .id = 1, .type = MSG_HOLD_RESPONSE, Loading @@ -230,7 +460,7 @@ void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_NULL_info_fai TEST_ASSERT_EQUAL(0, res); } void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_HOLD_RESPONSE_command(void) { void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_HOLD_RESPONSE_command(void) { listener_msg msg = { .id = 1, .type = MSG_HOLD_RESPONSE, Loading Loading @@ -260,7 +490,7 @@ void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_HOLD TEST_ASSERT_EQUAL(false, info.u.hold.has_result); } void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_EXPECT_command_when_result_is_saved(void) { void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_EXPECT_command_when_result_is_saved(void) { listener_msg msg = { .id = 1, .type = MSG_EXPECT_RESPONSE, Loading Loading @@ -304,7 +534,7 @@ void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_EXPE TEST_ASSERT_EQUAL(12345, hold_info.u.expect.result.u.success.seq_id); } void test_listener_ListenerCmd_CheckIncomingMessages_should_immediately_fail_incoming_EXPECT_command_when_corresponding_HOLD_has_an_error(void) { void test_ListenerCmd_CheckIncomingMessages_should_immediately_fail_incoming_EXPECT_command_when_corresponding_HOLD_has_an_error(void) { listener_msg msg = { .id = 1, .type = MSG_EXPECT_RESPONSE, Loading Loading @@ -347,7 +577,7 @@ void test_listener_ListenerCmd_CheckIncomingMessages_should_immediately_fail_inc } void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_EXPECT_command_when_no_result_is_saved(void) { void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_EXPECT_command_when_no_result_is_saved(void) { listener_msg msg = { .id = 1, .type = MSG_EXPECT_RESPONSE, Loading Loading @@ -378,7 +608,7 @@ void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_EXPE TEST_ASSERT_EQUAL(11, hold_info.timeout_sec); } void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_SHUTDOWN_command(void) { void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_SHUTDOWN_command(void) { listener_msg msg = { .id = 1, .type = MSG_SHUTDOWN, Loading