Commit de7e5648 authored by Greg Williams's avatar Greg Williams
Browse files

Merge branch 'develop' into protobuf_rework

parents d7c8d2f9 d4979ba3
Loading
Loading
Loading
Loading
+72.3 KiB
Loading image diff...
+43 −0
Original line number Diff line number Diff line
title Kinetic C message handling, full request response cycle

User -> Client: KineticClient_Put (command)

Client -> Client: KineticOperation_SendRequest
note over Client: lock

Client -> Client: Reserve sequence ID
Client -> Client: Pack command & message
Client -> Client: Send request
note over Client: unlock

Client -> Client: pack request
activate Client

Client -> Listener: hold responses for <FD, SEQ_ID>
activate Listener

Client -> Drive: send request
activate Drive

Client -> Listener: expect response for <FD, SEQ_ID>
Listener -> Client: acknowledgement with backpressure

parallel {
    Client -> Client: backpressure
    deactivate Client
    Listener -> Listener: wait
}

Drive -> Listener: response
deactivate Drive

Listener -> Listener: unpack response

Listener -> Threadpool: Transfer callback with result code
activate Threadpool
deactivate Listener

Threadpool -->> Client: callback(status)
deactivate Threadpool

Client -> User: status
+55 −9
Original line number Diff line number Diff line
@@ -144,6 +144,20 @@ static void msg_handler(listener *l, listener_msg *pmsg) {
    ListenerTask_ReleaseMsg(l, pmsg);
}

/* Swap poll and connection info for tracked sockets, by array offset. */
static void swap(listener *l, int a, int b) {
    struct pollfd a_pfd = l->fds[a + INCOMING_MSG_PIPE];
    struct pollfd b_pfd = l->fds[b + INCOMING_MSG_PIPE];
    connection_info *a_ci = l->fd_info[a];
    connection_info *b_ci = l->fd_info[b];

    l->fds[b + INCOMING_MSG_PIPE] = a_pfd;
    l->fds[a + INCOMING_MSG_PIPE] = b_pfd;

    l->fd_info[a] = b_ci;
    l->fd_info[b] = a_ci;
}

static void add_socket(listener *l, connection_info *ci, int notify_fd) {
    /* TODO: if epoll, just register with the OS. */
    struct bus *b = l->bus;
@@ -167,8 +181,25 @@ static void add_socket(listener *l, connection_info *ci, int notify_fd) {
    l->fd_info[id] = ci;
    l->fds[id + INCOMING_MSG_PIPE].fd = ci->fd;
    l->fds[id + INCOMING_MSG_PIPE].events = POLLIN;

    /* If there are any inactive FDs, we need to swap the new last FD
     * and the first inactive FD so that the active and inactive FDs
     * remain contiguous. */
    if (l->inactive_fds > 0) {
        int first_inactive = l->tracked_fds - l->inactive_fds;
        swap(l, id, first_inactive);
    }

    l->tracked_fds++;

    for (int i = 0; i < l->tracked_fds; i++) {
        if (l->fds[i + INCOMING_MSG_PIPE].events & POLLIN) {
            assert(i < l->tracked_fds - l->inactive_fds);
        } else {
            assert(i >= l->tracked_fds - l->inactive_fds);
        }
    }

    /* Prime the pump by sinking 0 bytes and getting a size to expect. */
    bus_sink_cb_res_t sink_res = b->sink_cb(l->read_buf, 0, ci->udata);
    BUS_ASSERT(b, b->udata, sink_res.full_msg_buffer == NULL);  // should have nothing to handle yet
@@ -191,18 +222,33 @@ static void remove_socket(listener *l, int fd, int notify_fd) {

    /* Don't really close it, just drop info about it in the listener.
     * The client thread will actually free the structure, close SSL, etc. */
    for (int i = 0; i < l->tracked_fds; i++) {
        if (l->fds[i + INCOMING_MSG_PIPE].fd == fd) {
    for (int id = 0; id < l->tracked_fds; id++) {
        struct pollfd removing_pfd = l->fds[id + INCOMING_MSG_PIPE];
        if (removing_pfd.fd == fd) {
            bool is_active = (removing_pfd.events & POLLIN) > 0;
            if (l->tracked_fds > 1) {
                /* Swap pollfd CI and last ones. */
                struct pollfd pfd = l->fds[i + INCOMING_MSG_PIPE];
                l->fds[i + INCOMING_MSG_PIPE] = l->fds[l->tracked_fds - 1 + INCOMING_MSG_PIPE];
                l->fds[l->tracked_fds - 1 + INCOMING_MSG_PIPE] = pfd;
                connection_info *ci = l->fd_info[i];
                l->fd_info[i] = l->fd_info[l->tracked_fds - 1];
                l->fd_info[l->tracked_fds - 1] = ci;
                int last_active = l->tracked_fds - l->inactive_fds - 1;

                /* If removing active node and it isn't the last active one, swap them */
                if (is_active && id != last_active) {
                    assert(id < last_active);
                    swap(l, id, last_active);
                    id = last_active;
                }

                /* If node (which is either last active node or inactive) is not at the end,
                 * and there are inactive nodes, swap it with the last.*/
                int last = l->tracked_fds - 1;
                if (id < last) {
                    swap(l, id, last);
                    id = last;
                }

                /* The node is now at the end of the array. */
            }
            
            l->tracked_fds--;
            if (!is_active) { l->inactive_fds--; }
        }
    }
    /* CI will be freed by the client thread. */
+18 −2
Original line number Diff line number Diff line
@@ -151,11 +151,27 @@ typedef struct listener {

    size_t upstream_backpressure;

    uint16_t tracked_fds;
    /* tracked_fds + incoming_msg_pipe */
    uint16_t tracked_fds;       ///< FDs currently tracked by listener
    /** File descriptors that are inactive due to errors, but have not
     * yet been explicitly removed/closed by the client. */
    uint16_t inactive_fds;

    /** Tracked file descriptors, for polling.
     * 
     * fds[INCOMING_MSG_PIPE_ID (0)] is the incoming_msg_pipe, so the
     * listener's poll is awakened by incoming commands. fds[1] through
     * fds[l->tracked_fds - l->inactive_fds] are the file descriptors
     * which should be polled, and the remaining ones (if any) have been
     * moved to the end so poll() will not touch them. */
    struct pollfd fds[MAX_FDS + 1];

    /** The connection info, corresponding to the the file descriptors tracked in
     * l->fds. Unlike l->fds, these are not offset by one for the incoming message
     * pipe, i.e. l->fd_info[3] correspons to l->fds[3 + INCOMING_MSG_PIPE]. */
    connection_info *fd_info[MAX_FDS];

    bool error_occured;         ///< Flag indicating post-poll handling is necessary.

    /* Read buffer and it's size. Will be grown on demand. */
    size_t read_buf_size;
    uint8_t *read_buf;
+42 −2
Original line number Diff line number Diff line
@@ -39,6 +39,7 @@ static void set_error_for_socket(listener *l, int id,
    int fd, rx_error_t err);
static void process_unpacked_message(listener *l,
    connection_info *ci, bus_unpack_cb_res_t result);
static void move_errored_active_sockets_to_end(listener *l);

void ListenerIO_AttemptRecv(listener *l, int available) {
    /*   --> failure --> set 'closed' error on socket, don't die */
@@ -103,6 +104,13 @@ void ListenerIO_AttemptRecv(listener *l, int available) {
            set_error_for_socket(l, i, ci->fd, RX_ERROR_POLLHUP);
        }
    }

    if (l->error_occured) {  // only conditionally do this to avoid wasting CPU
        /* This is done outside of the polling loop, to avoid erroneously repeat-polling
         * or skipping any individual file descriptors. */
        move_errored_active_sockets_to_end(l);
        l->error_occured = false;
    }        
}
    
static ssize_t socket_read_plain(struct bus *b, listener *l, int pfd_i, connection_info *ci) {
@@ -255,6 +263,8 @@ static bool sink_socket_read(struct bus *b,
}

static void set_error_for_socket(listener *l, int id, int fd, rx_error_t err) {
    l->error_occured = true;

    /* Mark all pending messages on this socket as being failed due to error. */
    struct bus *b = l->bus;
    BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64,
@@ -288,7 +298,37 @@ static void set_error_for_socket(listener *l, int id, int fd, rx_error_t err) {
        }
        }
    }
    l->fds[id + INCOMING_MSG_PIPE].events &= ~POLLIN;

    connection_info *newly_inactive_ci = l->fd_info[id];
    newly_inactive_ci->error = err;
}

static void move_errored_active_sockets_to_end(listener *l) {
    for (uint16_t id = 0; id < l->tracked_fds - l->inactive_fds; id++) {
        connection_info *ci = l->fd_info[id];
        struct pollfd *pfd = &l->fds[id + INCOMING_MSG_PIPE];
        int fd = pfd->fd;
        if (ci->error < 0 && pfd->events & POLLIN) {
            pfd->events &= ~POLLIN;
            /* move socket to end, so it won't be poll'd and get repeated POLLHUP. */
            int last_active = l->tracked_fds - l->inactive_fds - 1;
            if (id != last_active) {
                fprintf(stderr, "swapping %u and %u\n", id, last_active);
                assert(l->fds[last_active + INCOMING_MSG_PIPE].fd != fd);
                struct pollfd newly_inactive_fd = l->fds[id + INCOMING_MSG_PIPE];
                struct pollfd last_active_fd = l->fds[last_active + INCOMING_MSG_PIPE];
                connection_info *last_active_ci = l->fd_info[last_active];
                /* Swap pollfds */
                l->fds[id + INCOMING_MSG_PIPE] = last_active_fd;
                l->fds[last_active + INCOMING_MSG_PIPE] = newly_inactive_fd;
                /* Swap connection_info pointers */
                l->fd_info[last_active] = ci;
                l->fd_info[id] = last_active_ci;
            }
            l->inactive_fds++;
            assert(l->inactive_fds <= l->tracked_fds);
        }
    }
}

static void process_unpacked_message(listener *l,
Loading