Commit 5c533b03 authored by Scott Vokes's avatar Scott Vokes
Browse files

Rearrange FDs which have errored out in listener's FD info arrays to avoid polling.

The process for removing FDs has been adjusted so that active and
inactive FDs remain contiguous at the beginning and end (respectively)
of the l->fds and l->fd_info arrays.
parent d6553780
Loading
Loading
Loading
Loading
+38 −9
Original line number Diff line number Diff line
@@ -184,6 +184,20 @@ static void add_socket(listener *l, connection_info *ci, int notify_fd) {
    ListenerCmd_NotifyCaller(l, notify_fd);
}

/* Swap poll and connection info for tracked sockets, by array offset. */
static void swap(listener *l, int a, int b) {
    struct pollfd a_pfd = l->fds[a + INCOMING_MSG_PIPE];
    struct pollfd b_pfd = l->fds[b + INCOMING_MSG_PIPE];
    connection_info *a_ci = l->fd_info[a];
    connection_info *b_ci = l->fd_info[b];

    l->fds[b + INCOMING_MSG_PIPE] = a_pfd;
    l->fds[a + INCOMING_MSG_PIPE] = b_pfd;

    l->fd_info[a] = b_ci;
    l->fd_info[b] = a_ci;
}

static void remove_socket(listener *l, int fd, int notify_fd) {
    struct bus *b = l->bus;
    BUS_LOG_SNPRINTF(b, 2, LOG_LISTENER, b->udata, 128,
@@ -191,18 +205,33 @@ static void remove_socket(listener *l, int fd, int notify_fd) {

    /* Don't really close it, just drop info about it in the listener.
     * The client thread will actually free the structure, close SSL, etc. */
    for (int i = 0; i < l->tracked_fds; i++) {
        if (l->fds[i + INCOMING_MSG_PIPE].fd == fd) {
    for (int id = 0; id < l->tracked_fds; id++) {
        struct pollfd removing_pfd = l->fds[id + INCOMING_MSG_PIPE];
        if (removing_pfd.fd == fd) {
            bool is_active = (removing_pfd.events & POLLIN) > 0;
            if (l->tracked_fds > 1) {
                /* Swap pollfd CI and last ones. */
                struct pollfd pfd = l->fds[i + INCOMING_MSG_PIPE];
                l->fds[i + INCOMING_MSG_PIPE] = l->fds[l->tracked_fds - 1 + INCOMING_MSG_PIPE];
                l->fds[l->tracked_fds - 1 + INCOMING_MSG_PIPE] = pfd;
                connection_info *ci = l->fd_info[i];
                l->fd_info[i] = l->fd_info[l->tracked_fds - 1];
                l->fd_info[l->tracked_fds - 1] = ci;
                int last_active = l->tracked_fds - l->inactive_fds - 1;

                /* If removing active node and it isn't the last active one, swap them */
                if (is_active && id != last_active) {
                    assert(id < last_active);
                    swap(l, id, last_active);
                    id = last_active;
                }

                /* If node (which is either last active node or inactive) is not at the end,
                 * and there are inactive nodes, swap it with the last.*/
                int last = l->tracked_fds - 1;
                if (id < last) {
                    swap(l, id, last);
                    id = last;
                }

                /* The node is now at the end of the array. */
            }
            
            l->tracked_fds--;
            if (!is_active) { l->inactive_fds--; }
        }
    }
    /* CI will be freed by the client thread. */
+18 −2
Original line number Diff line number Diff line
@@ -151,11 +151,27 @@ typedef struct listener {

    size_t upstream_backpressure;

    uint16_t tracked_fds;
    /* tracked_fds + incoming_msg_pipe */
    uint16_t tracked_fds;       ///< FDs currently tracked by listener
    /** File descriptors that are inactive due to errors, but have not
     * yet been explicitly removed/closed by the client. */
    uint16_t inactive_fds;

    /** Tracked file descriptors, for polling.
     * 
     * fds[INCOMING_MSG_PIPE_ID (0)] is the incoming_msg_pipe, so the
     * listener's poll is awakened by incoming commands. fds[1] through
     * fds[l->tracked_fds - l->inactive_fds] are the file descriptors
     * which should be polled, and the remaining ones (if any) have been
     * moved to the end so poll() will not touch them. */
    struct pollfd fds[MAX_FDS + 1];

    /** The connection info, corresponding to the the file descriptors tracked in
     * l->fds. Unlike l->fds, these are not offset by one for the incoming message
     * pipe, i.e. l->fd_info[3] correspons to l->fds[3 + INCOMING_MSG_PIPE]. */
    connection_info *fd_info[MAX_FDS];

    bool error_occured;         ///< Flag indicating post-poll handling is necessary.

    /* Read buffer and it's size. Will be grown on demand. */
    size_t read_buf_size;
    uint8_t *read_buf;
+42 −2
Original line number Diff line number Diff line
@@ -39,6 +39,7 @@ static void set_error_for_socket(listener *l, int id,
    int fd, rx_error_t err);
static void process_unpacked_message(listener *l,
    connection_info *ci, bus_unpack_cb_res_t result);
static void move_errored_active_sockets_to_end(listener *l);

void ListenerIO_AttemptRecv(listener *l, int available) {
    /*   --> failure --> set 'closed' error on socket, don't die */
@@ -103,6 +104,13 @@ void ListenerIO_AttemptRecv(listener *l, int available) {
            set_error_for_socket(l, i, ci->fd, RX_ERROR_POLLHUP);
        }
    }

    if (l->error_occured) {  // only conditionally do this to avoid wasting CPU
        /* This is done outside of the polling loop, to avoid erroneously repeat-polling
         * or skipping any individual file descriptors. */
        move_errored_active_sockets_to_end(l);
        l->error_occured = false;
    }        
}
    
static ssize_t socket_read_plain(struct bus *b, listener *l, int pfd_i, connection_info *ci) {
@@ -255,6 +263,8 @@ static bool sink_socket_read(struct bus *b,
}

static void set_error_for_socket(listener *l, int id, int fd, rx_error_t err) {
    l->error_occured = true;

    /* Mark all pending messages on this socket as being failed due to error. */
    struct bus *b = l->bus;
    BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64,
@@ -288,7 +298,37 @@ static void set_error_for_socket(listener *l, int id, int fd, rx_error_t err) {
        }
        }
    }
    l->fds[id + INCOMING_MSG_PIPE].events &= ~POLLIN;

    connection_info *newly_inactive_ci = l->fd_info[id];
    newly_inactive_ci->error = err;
}

static void move_errored_active_sockets_to_end(listener *l) {
    for (uint16_t id = 0; id < l->tracked_fds - l->inactive_fds; id++) {
        connection_info *ci = l->fd_info[id];
        struct pollfd *pfd = &l->fds[id + INCOMING_MSG_PIPE];
        int fd = pfd->fd;
        if (ci->error < 0 && pfd->events & POLLIN) {
            pfd->events &= ~POLLIN;
            /* move socket to end, so it won't be poll'd and get repeated POLLHUP. */
            int last_active = l->tracked_fds - l->inactive_fds - 1;
            if (id != last_active) {
                fprintf(stderr, "swapping %u and %u\n", id, last_active);
                assert(l->fds[last_active + INCOMING_MSG_PIPE].fd != fd);
                struct pollfd newly_inactive_fd = l->fds[id + INCOMING_MSG_PIPE];
                struct pollfd last_active_fd = l->fds[last_active + INCOMING_MSG_PIPE];
                connection_info *last_active_ci = l->fd_info[last_active];
                /* Swap pollfds */
                l->fds[id + INCOMING_MSG_PIPE] = last_active_fd;
                l->fds[last_active + INCOMING_MSG_PIPE] = newly_inactive_fd;
                /* Swap connection_info pointers */
                l->fd_info[last_active] = ci;
                l->fd_info[id] = last_active_ci;
            }
            l->inactive_fds++;
            assert(l->inactive_fds <= l->tracked_fds);
        }
    }
}

static void process_unpacked_message(listener *l,
+3 −2
Original line number Diff line number Diff line
@@ -32,7 +32,6 @@ struct timeval now;
struct timeval cur;
size_t backpressure = 0;
int poll_res = 0;

#define WHILE if
#else
#define WHILE while
@@ -76,7 +75,9 @@ void *ListenerTask_MainLoop(void *arg) {
        int poll_res = 0;
        #endif

        poll_res = syscall_poll(self->fds, self->tracked_fds + INCOMING_MSG_PIPE, delay);
        int to_poll = self->tracked_fds - self->inactive_fds + INCOMING_MSG_PIPE;

        poll_res = syscall_poll(self->fds, to_poll, delay);
        BUS_LOG_SNPRINTF(b, (poll_res == 0 ? 6 : 4), LOG_LISTENER, b->udata, 64,
            "poll res %d", poll_res);

+201 −15
Original line number Diff line number Diff line
@@ -62,12 +62,14 @@ void setUp(void) {
    memset(&Listener, 0, sizeof(Listener));
    l = &Listener;
    l->bus = &B;
    l->tracked_fds = 0;
    l->inactive_fds = 0;
    box = &Box;
}

void tearDown(void) {}

void test_listener_ListenerCmd_NotifyCaller_should_write_tag_to_caller_fd(void) {
void test_ListenerCmd_NotifyCaller_should_write_tag_to_caller_fd(void) {
    int fd = 5;
    ListenerTask_GetBackpressure_ExpectAndReturn(l, 0);
    syscall_write_ExpectAndReturn(fd, reply_buf, sizeof(reply_buf), 3);
@@ -77,7 +79,7 @@ void test_listener_ListenerCmd_NotifyCaller_should_write_tag_to_caller_fd(void)
    TEST_ASSERT_EQUAL(0, reply_buf[2]);
}

void test_listener_ListenerCmd_NotifyCaller_should_write_backpressure_to_reply_buffer(void) {
void test_ListenerCmd_NotifyCaller_should_write_backpressure_to_reply_buffer(void) {
    int fd = 5;
    ListenerTask_GetBackpressure_ExpectAndReturn(l, 0x1234);
    syscall_write_ExpectAndReturn(fd, reply_buf, sizeof(reply_buf), 3);
@@ -86,7 +88,7 @@ void test_listener_ListenerCmd_NotifyCaller_should_write_backpressure_to_reply_b
    TEST_ASSERT_EQUAL(0x12, reply_buf[2]);
}

void test_listener_ListenerCmd_CheckIncomingMessages_should_return_on_error(void) {
void test_ListenerCmd_CheckIncomingMessages_should_return_on_error(void) {
    int res = 1;
    l->fds[INCOMING_MSG_PIPE].revents = POLLHUP;
    ListenerCmd_CheckIncomingMessages(l, &res);
@@ -106,7 +108,7 @@ static void expect_notify_caller(listener *l, int fd) {
    syscall_write_ExpectAndReturn(fd, reply_buf, sizeof(reply_buf), 3);
}

void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_redundant_ADD_SOCKET_command(void) {
void test_ListenerCmd_CheckIncomingMessages_should_handle_redundant_ADD_SOCKET_command(void) {
    l->fds[INCOMING_MSG_PIPE_ID].fd = 5;
    l->fds[INCOMING_MSG_PIPE_ID].revents = POLLIN;
    l->read_buf = malloc(256);
@@ -156,7 +158,7 @@ static void setup_command(listener_msg *pmsg, rx_info_t *info) {
    }
}

void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_ADD_SOCKET_command(void) {
void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_ADD_SOCKET_command(void) {
    connection_info *ci = calloc(1, sizeof(*ci));
    *(int *)&ci->fd = 91;

@@ -183,7 +185,7 @@ void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_ADD_
    TEST_ASSERT_EQUAL(4, l->tracked_fds);
}

void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_REMOVE_SOCKET_command(void) {
void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_REMOVE_SOCKET_command_freeing_single_fd(void) {
    listener_msg msg = {
        .id = 4,
        .type = MSG_REMOVE_SOCKET,
@@ -196,19 +198,203 @@ void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_REMO
    setup_command(&msg, NULL);
    expect_notify_caller(l, 100);

    connection_info *ci = calloc(1, sizeof(*ci));
    l->tracked_fds = 1;
    l->fds[0 + INCOMING_MSG_PIPE].fd = 50;
    l->fds[0 + INCOMING_MSG_PIPE].events = POLLIN;
    connection_info *ci0 = calloc(1, sizeof(*ci0));
    l->fd_info[0] = ci0;
    
    int res = 1;
    ListenerTask_ReleaseMsg_Expect(l, &l->msgs[0]);
    ListenerCmd_CheckIncomingMessages(l, &res);

    TEST_ASSERT_EQUAL(0, l->tracked_fds);
    TEST_ASSERT_EQUAL(0, res);
}

void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_REMOVE_SOCKET_command_freeing_single_fd_when_inactive(void) {
    listener_msg msg = {
        .id = 4,
        .type = MSG_REMOVE_SOCKET,
        .pipes = {7, 8},
        .u.remove_socket = {
            .fd = 50,
            .notify_fd = 100,
        },
    };
    setup_command(&msg, NULL);
    expect_notify_caller(l, 100);

    l->tracked_fds = 1;
    l->inactive_fds = 1;
    l->fds[0 + INCOMING_MSG_PIPE].fd = 50;
    l->fds[0 + INCOMING_MSG_PIPE].events = 0;  // no POLLIN -> inactive
    connection_info *ci0 = calloc(1, sizeof(*ci0));
    l->fd_info[0] = ci0;
    
    int res = 1;
    ListenerTask_ReleaseMsg_Expect(l, &l->msgs[0]);
    ListenerCmd_CheckIncomingMessages(l, &res);

    TEST_ASSERT_EQUAL(0, l->tracked_fds);
    TEST_ASSERT_EQUAL(0, l->inactive_fds);
    TEST_ASSERT_EQUAL(0, res);
}

void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_REMOVE_SOCKET_command_freeing_first_of_two(void) {
    listener_msg msg = {
        .id = 4,
        .type = MSG_REMOVE_SOCKET,
        .pipes = {7, 8},
        .u.remove_socket = {
            .fd = 50,  // free first fds
            .notify_fd = 100,
        },
    };
    setup_command(&msg, NULL);
    expect_notify_caller(l, 100);

    l->tracked_fds = 2;
    l->fds[1 + INCOMING_MSG_PIPE].fd = 50;
    l->fd_info[1] = ci;
    l->fds[0 + INCOMING_MSG_PIPE].fd = 50;
    l->fds[0 + INCOMING_MSG_PIPE].events = POLLIN;
    connection_info *ci0 = calloc(1, sizeof(*ci0));
    l->fd_info[0] = ci0;

    l->fds[1 + INCOMING_MSG_PIPE].fd = 150;
    l->fds[1 + INCOMING_MSG_PIPE].events = POLLIN;
    connection_info *ci1 = calloc(1, sizeof(*ci1));
    l->fd_info[1] = ci1;
    
    int res = 1;
    ListenerTask_ReleaseMsg_Expect(l, &l->msgs[0]);
    ListenerCmd_CheckIncomingMessages(l, &res);

    TEST_ASSERT_EQUAL(1, l->tracked_fds);
    TEST_ASSERT_EQUAL(0, res);
    TEST_ASSERT_EQUAL(150, l->fds[0 + INCOMING_MSG_PIPE].fd);
    TEST_ASSERT_EQUAL(POLLIN, l->fds[0 + INCOMING_MSG_PIPE].events);
}

void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_REMOVE_SOCKET_command_freeing_second_of_two(void) {
    listener_msg msg = {
        .id = 4,
        .type = MSG_REMOVE_SOCKET,
        .pipes = {7, 8},
        .u.remove_socket = {
            .fd = 150,  // free second fds
            .notify_fd = 100,
        },
    };
    setup_command(&msg, NULL);
    expect_notify_caller(l, 100);

    l->tracked_fds = 2;
    l->fds[0 + INCOMING_MSG_PIPE].fd = 50;
    l->fds[0 + INCOMING_MSG_PIPE].events = POLLIN;
    connection_info *ci0 = calloc(1, sizeof(*ci0));
    l->fd_info[0] = ci0;

    l->fds[1 + INCOMING_MSG_PIPE].fd = 150;
    l->fds[1 + INCOMING_MSG_PIPE].events = POLLIN;
    connection_info *ci1 = calloc(1, sizeof(*ci1));
    l->fd_info[1] = ci1;
    
    int res = 1;
    ListenerTask_ReleaseMsg_Expect(l, &l->msgs[0]);
    ListenerCmd_CheckIncomingMessages(l, &res);

    TEST_ASSERT_EQUAL(1, l->tracked_fds);
    TEST_ASSERT_EQUAL(0, res);
    TEST_ASSERT_EQUAL(50, l->fds[0 + INCOMING_MSG_PIPE].fd);
    TEST_ASSERT_EQUAL(POLLIN, l->fds[0 + INCOMING_MSG_PIPE].events);
}

static void handle_remove_with_active_and_inactive_mix(int tracked, int inactive, int remove_nth) {
    listener_msg msg = {
        .id = 4,
        .type = MSG_REMOVE_SOCKET,
        .pipes = {7, 8},
        .u.remove_socket = {
            .fd = remove_nth,
            .notify_fd = 100,
        },
    };
    setup_command(&msg, NULL);
    expect_notify_caller(l, 100);

    l->tracked_fds = tracked;
    l->inactive_fds = inactive;
    TEST_ASSERT(inactive <= tracked);

    for (int i = 0; i < tracked; i++) {
        struct pollfd *pfd = &l->fds[i + INCOMING_MSG_PIPE];
        pfd->fd = i;
        if (i < tracked - inactive) {
            pfd->events = POLLIN;
        } else {
            pfd->events = 0;
        }

        connection_info *ci = calloc(1, sizeof(*ci));
        l->fd_info[i] = ci;
        *(int *)&ci->fd = i;  // cast to set const field
        TEST_ASSERT(ci);
    }

    int res = 1;
    ListenerTask_ReleaseMsg_Expect(l, &l->msgs[0]);
    ListenerCmd_CheckIncomingMessages(l, &res);
    TEST_ASSERT_EQUAL(0, res);

    if (remove_nth != tracked - 1) {
        TEST_ASSERT_NOT_EQUAL(remove_nth, l->fds[remove_nth + INCOMING_MSG_PIPE].fd);
        TEST_ASSERT_NOT_EQUAL(remove_nth, l->fd_info[remove_nth]->fd);
    }

    // The removed one should now be last
    TEST_ASSERT_EQUAL(tracked - 1, l->tracked_fds);
    TEST_ASSERT_EQUAL(remove_nth, l->fds[tracked - 1 + INCOMING_MSG_PIPE].fd);

    bool removing_inactive = (remove_nth >= tracked - inactive);
    int last_active = tracked - inactive - (removing_inactive ? 0 : 1);

    int now_active = 0;
    int now_inactive = 0;
    for (int i = 0; i < tracked - 1; i++) {
        struct pollfd *pfd = &l->fds[i + INCOMING_MSG_PIPE];
        if (pfd->events & POLLIN) {
            now_active++;
            // all active FDs must be contiguous at the beginning
            TEST_ASSERT(i < last_active);
        } else {
            now_inactive++;
        }
    }

    if (removing_inactive) {
        TEST_ASSERT_EQUAL(inactive - 1, now_inactive);
        TEST_ASSERT_EQUAL(tracked - inactive, now_active);
    } else {
        TEST_ASSERT_EQUAL(inactive, now_inactive);
        TEST_ASSERT_EQUAL(tracked - inactive - 1, now_active);
    }
}

/* Check invariants when deleting each socket R out of N sockets,
 * when M out of N are inactive. Try all cases up to N == limit. */
void test_remove_nth_parametric(void) {
    const int limit = 7;  // cmock uses excessive memory if > than this

    for (int tracked = 0; tracked < limit; tracked++) {
        for (int inactive = 0; inactive <= tracked; inactive++) {
            for (int remove_nth = 0; remove_nth < tracked; remove_nth++) {
                handle_remove_with_active_and_inactive_mix(tracked, inactive, remove_nth);
            }
        }
    }
}

void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_NULL_info_failure_case(void) {
void test_ListenerCmd_CheckIncomingMessages_should_handle_NULL_info_failure_case(void) {
    listener_msg msg = {
        .id = 1,
        .type = MSG_HOLD_RESPONSE,
@@ -230,7 +416,7 @@ void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_NULL_info_fai
    TEST_ASSERT_EQUAL(0, res);
}

void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_HOLD_RESPONSE_command(void) {
void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_HOLD_RESPONSE_command(void) {
    listener_msg msg = {
        .id = 1,
        .type = MSG_HOLD_RESPONSE,
@@ -260,7 +446,7 @@ void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_HOLD
    TEST_ASSERT_EQUAL(false, info.u.hold.has_result);
}

void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_EXPECT_command_when_result_is_saved(void) {
void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_EXPECT_command_when_result_is_saved(void) {
    listener_msg msg = {
        .id = 1,
        .type = MSG_EXPECT_RESPONSE,
@@ -304,7 +490,7 @@ void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_EXPE
    TEST_ASSERT_EQUAL(12345, hold_info.u.expect.result.u.success.seq_id);
}

void test_listener_ListenerCmd_CheckIncomingMessages_should_immediately_fail_incoming_EXPECT_command_when_corresponding_HOLD_has_an_error(void) {
void test_ListenerCmd_CheckIncomingMessages_should_immediately_fail_incoming_EXPECT_command_when_corresponding_HOLD_has_an_error(void) {
    listener_msg msg = {
        .id = 1,
        .type = MSG_EXPECT_RESPONSE,
@@ -347,7 +533,7 @@ void test_listener_ListenerCmd_CheckIncomingMessages_should_immediately_fail_inc
    
}

void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_EXPECT_command_when_no_result_is_saved(void) {
void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_EXPECT_command_when_no_result_is_saved(void) {
    listener_msg msg = {
        .id = 1,
        .type = MSG_EXPECT_RESPONSE,
@@ -378,7 +564,7 @@ void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_EXPE
    TEST_ASSERT_EQUAL(11, hold_info.timeout_sec);
}
    
void test_listener_ListenerCmd_CheckIncomingMessages_should_handle_incoming_SHUTDOWN_command(void) {
void test_ListenerCmd_CheckIncomingMessages_should_handle_incoming_SHUTDOWN_command(void) {
    listener_msg msg = {
        .id = 1,
        .type = MSG_SHUTDOWN,
Loading