Loading src/lib/bus/listener_cmd.c +31 −14 Original line number Diff line number Diff line Loading @@ -144,6 +144,20 @@ static void msg_handler(listener *l, listener_msg *pmsg) { ListenerTask_ReleaseMsg(l, pmsg); } /* Swap poll and connection info for tracked sockets, by array offset. */ static void swap(listener *l, int a, int b) { struct pollfd a_pfd = l->fds[a + INCOMING_MSG_PIPE]; struct pollfd b_pfd = l->fds[b + INCOMING_MSG_PIPE]; connection_info *a_ci = l->fd_info[a]; connection_info *b_ci = l->fd_info[b]; l->fds[b + INCOMING_MSG_PIPE] = a_pfd; l->fds[a + INCOMING_MSG_PIPE] = b_pfd; l->fd_info[a] = b_ci; l->fd_info[b] = a_ci; } static void add_socket(listener *l, connection_info *ci, int notify_fd) { /* TODO: if epoll, just register with the OS. */ struct bus *b = l->bus; Loading @@ -167,8 +181,25 @@ static void add_socket(listener *l, connection_info *ci, int notify_fd) { l->fd_info[id] = ci; l->fds[id + INCOMING_MSG_PIPE].fd = ci->fd; l->fds[id + INCOMING_MSG_PIPE].events = POLLIN; /* If there are any inactive FDs, we need to swap the new last FD * and the first inactive FD so that the active and inactive FDs * remain contiguous. */ if (l->inactive_fds > 0) { int first_inactive = l->tracked_fds - l->inactive_fds; swap(l, id, first_inactive); } l->tracked_fds++; for (int i = 0; i < l->tracked_fds; i++) { if (l->fds[i + INCOMING_MSG_PIPE].events & POLLIN) { assert(i < l->tracked_fds - l->inactive_fds); } else { assert(i >= l->tracked_fds - l->inactive_fds); } } /* Prime the pump by sinking 0 bytes and getting a size to expect. */ bus_sink_cb_res_t sink_res = b->sink_cb(l->read_buf, 0, ci->udata); BUS_ASSERT(b, b->udata, sink_res.full_msg_buffer == NULL); // should have nothing to handle yet Loading @@ -184,20 +215,6 @@ static void add_socket(listener *l, connection_info *ci, int notify_fd) { ListenerCmd_NotifyCaller(l, notify_fd); } /* Swap poll and connection info for tracked sockets, by array offset. */ static void swap(listener *l, int a, int b) { struct pollfd a_pfd = l->fds[a + INCOMING_MSG_PIPE]; struct pollfd b_pfd = l->fds[b + INCOMING_MSG_PIPE]; connection_info *a_ci = l->fd_info[a]; connection_info *b_ci = l->fd_info[b]; l->fds[b + INCOMING_MSG_PIPE] = a_pfd; l->fds[a + INCOMING_MSG_PIPE] = b_pfd; l->fd_info[a] = b_ci; l->fd_info[b] = a_ci; } static void remove_socket(listener *l, int fd, int notify_fd) { struct bus *b = l->bus; BUS_LOG_SNPRINTF(b, 2, LOG_LISTENER, b->udata, 128, Loading test/unit/test_bus_listener_cmd.c +44 −0 Original line number Diff line number Diff line Loading @@ -174,6 +174,11 @@ void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_ADD_SOCKET_co int res = 1; l->tracked_fds = 3; for (int i = 0; i < l->tracked_fds; i++) { l->fds[i + INCOMING_MSG_PIPE].fd = i; l->fds[i + INCOMING_MSG_PIPE].events = POLLIN; } expect_notify_caller(l, 7); ListenerTask_ReleaseMsg_Expect(l, &l->msgs[0]); ListenerCmd_CheckIncomingMessages(l, &res); Loading @@ -185,6 +190,45 @@ void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_ADD_SOCKET_co TEST_ASSERT_EQUAL(4, l->tracked_fds); } void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_ADD_SOCKET_command_correctly_with_inactive_sockets(void) { connection_info *ci = calloc(1, sizeof(*ci)); *(int *)&ci->fd = 91; listener_msg msg = { .type = MSG_ADD_SOCKET, .u.add_socket = { .info = ci, .notify_fd = 7, }, }; setup_command(&msg, NULL); int res = 1; l->tracked_fds = 3; // [0 1 | 2] l->inactive_fds = 1; for (int i = 0; i < l->tracked_fds; i++) { l->fds[i + INCOMING_MSG_PIPE].fd = i; if (i < l->tracked_fds - l->inactive_fds) { l->fds[i + INCOMING_MSG_PIPE].events = POLLIN; } else { l->fds[i + INCOMING_MSG_PIPE].events = 0; } } expect_notify_caller(l, 7); ListenerTask_ReleaseMsg_Expect(l, &l->msgs[0]); ListenerCmd_CheckIncomingMessages(l, &res); TEST_ASSERT_EQUAL(4, l->tracked_fds); TEST_ASSERT_EQUAL(ci, l->fd_info[2]); TEST_ASSERT_EQUAL(ci->fd, l->fds[2 + INCOMING_MSG_PIPE].fd); TEST_ASSERT_EQUAL(31, ci->to_read_size); TEST_ASSERT_EQUAL(POLLIN, l->fds[2 + INCOMING_MSG_PIPE].events); TEST_ASSERT_EQUAL(0, l->fds[3 + INCOMING_MSG_PIPE].events); TEST_ASSERT_EQUAL(2, l->fds[3 + INCOMING_MSG_PIPE].fd); } void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_REMOVE_SOCKET_command_freeing_single_fd(void) { listener_msg msg = { .id = 4, Loading Loading
src/lib/bus/listener_cmd.c +31 −14 Original line number Diff line number Diff line Loading @@ -144,6 +144,20 @@ static void msg_handler(listener *l, listener_msg *pmsg) { ListenerTask_ReleaseMsg(l, pmsg); } /* Swap poll and connection info for tracked sockets, by array offset. */ static void swap(listener *l, int a, int b) { struct pollfd a_pfd = l->fds[a + INCOMING_MSG_PIPE]; struct pollfd b_pfd = l->fds[b + INCOMING_MSG_PIPE]; connection_info *a_ci = l->fd_info[a]; connection_info *b_ci = l->fd_info[b]; l->fds[b + INCOMING_MSG_PIPE] = a_pfd; l->fds[a + INCOMING_MSG_PIPE] = b_pfd; l->fd_info[a] = b_ci; l->fd_info[b] = a_ci; } static void add_socket(listener *l, connection_info *ci, int notify_fd) { /* TODO: if epoll, just register with the OS. */ struct bus *b = l->bus; Loading @@ -167,8 +181,25 @@ static void add_socket(listener *l, connection_info *ci, int notify_fd) { l->fd_info[id] = ci; l->fds[id + INCOMING_MSG_PIPE].fd = ci->fd; l->fds[id + INCOMING_MSG_PIPE].events = POLLIN; /* If there are any inactive FDs, we need to swap the new last FD * and the first inactive FD so that the active and inactive FDs * remain contiguous. */ if (l->inactive_fds > 0) { int first_inactive = l->tracked_fds - l->inactive_fds; swap(l, id, first_inactive); } l->tracked_fds++; for (int i = 0; i < l->tracked_fds; i++) { if (l->fds[i + INCOMING_MSG_PIPE].events & POLLIN) { assert(i < l->tracked_fds - l->inactive_fds); } else { assert(i >= l->tracked_fds - l->inactive_fds); } } /* Prime the pump by sinking 0 bytes and getting a size to expect. */ bus_sink_cb_res_t sink_res = b->sink_cb(l->read_buf, 0, ci->udata); BUS_ASSERT(b, b->udata, sink_res.full_msg_buffer == NULL); // should have nothing to handle yet Loading @@ -184,20 +215,6 @@ static void add_socket(listener *l, connection_info *ci, int notify_fd) { ListenerCmd_NotifyCaller(l, notify_fd); } /* Swap poll and connection info for tracked sockets, by array offset. */ static void swap(listener *l, int a, int b) { struct pollfd a_pfd = l->fds[a + INCOMING_MSG_PIPE]; struct pollfd b_pfd = l->fds[b + INCOMING_MSG_PIPE]; connection_info *a_ci = l->fd_info[a]; connection_info *b_ci = l->fd_info[b]; l->fds[b + INCOMING_MSG_PIPE] = a_pfd; l->fds[a + INCOMING_MSG_PIPE] = b_pfd; l->fd_info[a] = b_ci; l->fd_info[b] = a_ci; } static void remove_socket(listener *l, int fd, int notify_fd) { struct bus *b = l->bus; BUS_LOG_SNPRINTF(b, 2, LOG_LISTENER, b->udata, 128, Loading
test/unit/test_bus_listener_cmd.c +44 −0 Original line number Diff line number Diff line Loading @@ -174,6 +174,11 @@ void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_ADD_SOCKET_co int res = 1; l->tracked_fds = 3; for (int i = 0; i < l->tracked_fds; i++) { l->fds[i + INCOMING_MSG_PIPE].fd = i; l->fds[i + INCOMING_MSG_PIPE].events = POLLIN; } expect_notify_caller(l, 7); ListenerTask_ReleaseMsg_Expect(l, &l->msgs[0]); ListenerCmd_CheckIncomingMessages(l, &res); Loading @@ -185,6 +190,45 @@ void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_ADD_SOCKET_co TEST_ASSERT_EQUAL(4, l->tracked_fds); } void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_ADD_SOCKET_command_correctly_with_inactive_sockets(void) { connection_info *ci = calloc(1, sizeof(*ci)); *(int *)&ci->fd = 91; listener_msg msg = { .type = MSG_ADD_SOCKET, .u.add_socket = { .info = ci, .notify_fd = 7, }, }; setup_command(&msg, NULL); int res = 1; l->tracked_fds = 3; // [0 1 | 2] l->inactive_fds = 1; for (int i = 0; i < l->tracked_fds; i++) { l->fds[i + INCOMING_MSG_PIPE].fd = i; if (i < l->tracked_fds - l->inactive_fds) { l->fds[i + INCOMING_MSG_PIPE].events = POLLIN; } else { l->fds[i + INCOMING_MSG_PIPE].events = 0; } } expect_notify_caller(l, 7); ListenerTask_ReleaseMsg_Expect(l, &l->msgs[0]); ListenerCmd_CheckIncomingMessages(l, &res); TEST_ASSERT_EQUAL(4, l->tracked_fds); TEST_ASSERT_EQUAL(ci, l->fd_info[2]); TEST_ASSERT_EQUAL(ci->fd, l->fds[2 + INCOMING_MSG_PIPE].fd); TEST_ASSERT_EQUAL(31, ci->to_read_size); TEST_ASSERT_EQUAL(POLLIN, l->fds[2 + INCOMING_MSG_PIPE].events); TEST_ASSERT_EQUAL(0, l->fds[3 + INCOMING_MSG_PIPE].events); TEST_ASSERT_EQUAL(2, l->fds[3 + INCOMING_MSG_PIPE].fd); } void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_REMOVE_SOCKET_command_freeing_single_fd(void) { listener_msg msg = { .id = 4, Loading