Commit 1704d925 authored by Scott Vokes's avatar Scott Vokes
Browse files

Restructure sender thread to eliminate race on message buffer free().

Now, the sender has complete control over its own execution. tx_info_t
event structs are atomically reserved / released as part of client
commands. Commands cause the client caller to block until they have
been processed.

Events which represent outgoing requests can cause the relevant FD to be
added to the set of sockets watched for write access, and polling checks
for commands that have been committed and for writeable sockets. Writes
proceed through states (enqueue, write, notify, release) in a tagged
union, and errored events are expired immediately.

All CI tests pass.

There is still some work necessary for cleanup on failed allocations,
and a race condition related to error handling as requests are being
added has been noted.
parent 175aca76
Loading
Loading
Loading
Loading
+28 −22
Original line number Diff line number Diff line
@@ -33,6 +33,7 @@
#include "threadpool.h"
#include "bus_internal_types.h"
#include "bus_ssl.h"
#include "util.h"
#include "yacht.h"

/* Function pointers for pthreads. */
@@ -192,6 +193,7 @@ bool bus_init(bus_config *config, struct bus_result *res) {
    res->bus = b;
    BUS_LOG(b, 1, LOG_INITIALIZATION, "initialized", config->bus_udata);
    return true;

cleanup:
    if (ss) {
        for (int i = 0; i < config->sender_count; i++) {
@@ -264,10 +266,6 @@ static boxed_msg *box_msg(struct bus *b, bus_user_msg *msg) {
    return box;
}

static bool is_resumable_io_error(int errno_) {
    return errno_ == EAGAIN || errno_ == EINTR || errno_ == EWOULDBLOCK;
}

bool bus_send_request(struct bus *b, bus_user_msg *msg)
{
    if (b == NULL || msg == NULL || msg->fd == 0) {
@@ -279,20 +277,11 @@ bool bus_send_request(struct bus *b, bus_user_msg *msg)
        return false;
    }

    int complete_fd = -1;

    int s_id = sender_id_of_socket(b, msg->fd);
    struct sender *s = b->senders[s_id];

    /* Pass boxed message to the sender */
    if (!sender_enqueue_message(s, box, &complete_fd)) {
        BUS_LOG(b, 3, LOG_SENDING_REQUEST, "sender_enqueue_message failed", b->udata);
        return false;
    }

    /* block until delivery status */
    BUS_LOG(b, 3, LOG_SENDING_REQUEST, "Sending request...", b->udata);
    return poll_on_completion(b, complete_fd);
    return sender_send_request(s, box);
}

static bool poll_on_completion(struct bus *b, int fd) {
@@ -301,6 +290,10 @@ static bool poll_on_completion(struct bus *b, int fd) {
    fds[0].fd = fd;
    fds[0].events = POLLIN;

    /* TODO REFACTOR this should be reused between bus, sender, and listener,
     *     or be moved into the listener.
     *     The sender has its own blocking polling for commands. */

    /* FIXME: compare this to TCP timeouts -- try to prevent sender
     * succeeding but failing to notify client. */
    const int TIMEOUT_SECONDS = 10;
@@ -310,7 +303,7 @@ static bool poll_on_completion(struct bus *b, int fd) {
        BUS_LOG(b, 5, LOG_SENDING_REQUEST, "Polling on completion...tick...", b->udata);
        int res = poll(fds, 1, ONE_SECOND);
        if (res == -1) {
            if (is_resumable_io_error(errno)) {
            if (util_is_resumable_io_error(errno)) {
                BUS_LOG(b, 3, LOG_SENDING_REQUEST, "Polling on completion...EAGAIN", b->udata);
                if (errno == EINTR && i > 0) { i--; }
                errno = 0;
@@ -337,7 +330,7 @@ static bool poll_on_completion(struct bus *b, int fd) {
                BUS_LOG(b, 3, LOG_SENDING_REQUEST, "sent!", b->udata);
                return true;
            } else if (sz == -1) {
                if (is_resumable_io_error(errno)) {
                if (util_is_resumable_io_error(errno)) {
                    errno = 0;
                } else {
                    assert(false);
@@ -346,7 +339,7 @@ static bool poll_on_completion(struct bus *b, int fd) {
            }
        }
    }
    BUS_LOG(b, 2, LOG_SENDING_REQUEST, "failed to send (timeout on sender)", b->udata);
    BUS_LOG(b, 2, LOG_SENDING_REQUEST, "failed (timeout)", b->udata);

    #if 0
    assert(false);
@@ -391,13 +384,15 @@ const char *bus_log_event_str(log_event_t event) {
}

bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata) {
    /* Register a socket internally with a listener. */
    /* Register a socket internally with a sender and listener. */
    int s_id = sender_id_of_socket(b, fd);
    int l_id = listener_id_of_socket(b, fd);

    BUS_LOG_SNPRINTF(b, 2, LOG_SOCKET_REGISTERED, b->udata, 64,
        "registering socket %d", fd);

    /* Spread sockets throughout the different listener processes. */
    /* Spread sockets throughout the different sender & listener processes. */
    struct sender *s = b->senders[s_id];
    struct listener *l = b->listeners[l_id];
    
    int pipes[2];
@@ -436,9 +431,14 @@ bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata)
        goto cleanup;
    }

    bool res = listener_add_socket(l, ci, pipe_in);
    bool res = false;
    res = sender_register_socket(s, fd, ci->ssl);
    if (!res) { goto cleanup; }

    res = listener_add_socket(l, ci, pipe_in);
    if (!res) { goto cleanup; }

    /* FIXME: Move this into listener_add_socket? */
    BUS_LOG(b, 2, LOG_SOCKET_REGISTERED, "polling on socket add...", b->udata);
    bool completed = poll_on_completion(b, pipe_out);
    if (!completed) { goto cleanup; }
@@ -460,15 +460,19 @@ cleanup:

/* Free metadata about a socket that has been disconnected. */
bool bus_release_socket(struct bus *b, int fd) {
    /* Register a socket internally with a listener. */
    int s_id = sender_id_of_socket(b, fd);
    int l_id = listener_id_of_socket(b, fd);

    BUS_LOG_SNPRINTF(b, 2, LOG_SOCKET_REGISTERED, b->udata, 64,
        "forgetting socket %d", fd);

    /* Spread sockets throughout the different listener processes. */
    struct sender *s = b->senders[s_id];
    struct listener *l = b->listeners[l_id];

    if (!sender_remove_socket(s, fd)) {
        return false;
    }

    if (!listener_remove_socket(l, fd)) {
        return false;           /* couldn't send msg to listener */
    }
@@ -500,7 +504,9 @@ bool bus_shutdown(bus *b) {
    for (int i = 0; i < b->sender_count; i++) {
        int off = 0;
        if (!b->joined[i + off]) {
            BUS_LOG(b, 2, LOG_SHUTDOWN, "sender_shutdown...", b->udata);
            while (!sender_shutdown(b->senders[i])) {
                BUS_LOG(b, 2, LOG_SHUTDOWN, "sender_shutdown... (retry)", b->udata);
                sleep(1);
            }
            void *unused = NULL;
+1 −0
Original line number Diff line number Diff line
@@ -177,6 +177,7 @@ typedef enum {
    BUS_SEND_RX_TIMEOUT = -3,
    BUS_SEND_RX_FAILURE = -4,
    BUS_SEND_BAD_RESPONSE = -5,
    BUS_SEND_UNREGISTERED_SOCKET = -6,
} bus_send_status_t;

/* Result from attempting to configure a message bus. */
+1 −1
Original line number Diff line number Diff line
@@ -283,7 +283,6 @@ void *listener_mainloop(void *arg) {

static void set_error_for_socket(listener *l, int id, int fd, rx_error_t err) {
    /* Mark all pending messages on this socket as being failed due to error. */

    for (int i = 0; i < MAX_PENDING_MESSAGES; i++) {
        rx_info_t *info = &l->rx_info[i];
        if (!info->active) {
@@ -544,6 +543,7 @@ static void process_unpacked_message(listener *l,
             *     not flush them out as unsolicited until the queue of messages
             *     is fully processed. Otherwise, they sit in the queue stuck and
             *     slowing things down. */
            /* TODO: Do unsolicited status messages always have a seq_id of 0? */
            BUS_LOG_SNPRINTF(b, 1, LOG_LISTENER, b->udata, 128,
                "Couldn't find info for seq_id %lld, msg %p",
                seq_id, opaque_msg);
+715 −392

File changed.

Preview size limit exceeded, changes collapsed.

+12 −2
Original line number Diff line number Diff line
@@ -28,12 +28,22 @@ struct sender;

struct sender *sender_init(struct bus *b, struct bus_config *cfg);

/* Sender commands.
 *
 * These all block until the message has been processed (a command
 * has been sent over an outgoing socket, a socket has been registered
 * or unregistered, or the sender has shut down) or delivery has
 * failed.
 *
 * Backpressure is handled internally. */
bool sender_register_socket(struct sender *s, int fd, SSL *ssl);
bool sender_remove_socket(struct sender *s, int fd);

/* Send an outgoing message.
 * 
 * This blocks until the message has either been sent over a outgoing
 * socket or delivery has failed, to provide counterpressure. */
bool sender_enqueue_message(struct sender *s,
    boxed_msg *msg, int *response_fd);
bool sender_send_request(struct sender *s, boxed_msg *box);

bool sender_shutdown(struct sender *s);

Loading