Commit 2227d634 authored by Scott Vokes's avatar Scott Vokes
Browse files

WIP: Cleanup / clarification of dataflow and shutdown process.

parent 4ec0a88f
Loading
Loading
Loading
Loading
+39 −27
Original line number Diff line number Diff line
@@ -36,9 +36,7 @@
#include "bus_ssl.h"
#include "util.h"
#include "yacht.h"

/* Function pointers for pthreads. */
void *listener_mainloop(void *arg);
#include "atomic.h"

static bool poll_on_completion(struct bus *b, int fd);
static int listener_id_of_socket(struct bus *b, int fd);
@@ -47,12 +45,13 @@ static void noop_log_cb(log_event_t event,
static void noop_error_cb(bus_unpack_cb_res_t result, void *socket_udata);
static bool attempt_to_increase_resource_limits(struct bus *b);

/* Function pointer for pthread start function. */
void *listener_mainloop(void *arg);

/* Fill in defaults for config fields the caller left unset.
 * Currently this only covers listener_count: a value of 0 is
 * replaced with 1; any other value is left untouched. */
static void set_defaults(bus_config *cfg) {
    if (cfg->listener_count != 0) { return; }
    cfg->listener_count = 1;
}

#define DEF_FD_SET_SIZE2 4

bool bus_init(bus_config *config, struct bus_result *res) {
    if (res == NULL) { return false; }
    if (config == NULL) {
@@ -78,7 +77,7 @@ bool bus_init(bus_config *config, struct bus_result *res) {

    res->status = BUS_INIT_ERROR_ALLOC_FAIL;

    bool log_lock_init = false;
    uint8_t locks_initialized = 0;
    struct listener **ls = NULL;     /* listeners */
    struct threadpool *tp = NULL;
    bool *joined = NULL;
@@ -101,12 +100,12 @@ bool bus_init(bus_config *config, struct bus_result *res) {
        res->status = BUS_INIT_ERROR_MUTEX_INIT_FAIL;
        goto cleanup;
    }
    locks_initialized++;
    if (0 != pthread_rwlock_init(&b->fd_set_lock, NULL)) {
        res->status = BUS_INIT_ERROR_MUTEX_INIT_FAIL;
        goto cleanup;
    }

    log_lock_init = true;
    locks_initialized++;

    attempt_to_increase_resource_limits(b);

@@ -177,8 +176,10 @@ cleanup:
    if (tp) { threadpool_free(tp); }
    if (joined) { free(joined); }
    if (b) {
        if (log_lock_init) {
        if (locks_initialized > 1) {
            pthread_rwlock_destroy(&b->fd_set_lock);
        }
        if (locks_initialized > 0) {
            pthread_mutex_destroy(&b->log_lock);
        }
        free(b);
@@ -265,8 +266,8 @@ static boxed_msg *box_msg(struct bus *b, bus_user_msg *msg) {
    box->out_seq_id = msg->seq_id;
    box->out_msg_size = msg->msg_size;

    /* Store message by pointer, since the client thread using it is blocked
     * until we are done sending. */
    /* Store message by pointer, since the client code calling in is
     * blocked until we are done sending. */
    box->out_msg = msg->msg;

    box->cb = msg->cb;
@@ -304,12 +305,12 @@ static bool poll_on_completion(struct bus *b, int fd) {
        int res = poll(fds, 1, -1);
        if (res == -1) {
            if (util_is_resumable_io_error(errno)) {
                BUS_LOG_SNPRINTF(b, 3, LOG_SENDING_REQUEST, b->udata, 64,
                BUS_LOG_SNPRINTF(b, 5, LOG_SENDING_REQUEST, b->udata, 64,
                    "poll_on_completion, resumable IO error %d", errno);
                errno = 0;
                continue;
            } else {
                BUS_LOG_SNPRINTF(b, 3, LOG_SENDING_REQUEST, b->udata, 64,
                BUS_LOG_SNPRINTF(b, 1, LOG_SENDING_REQUEST, b->udata, 64,
                    "poll_on_completion, non-resumable IO error %d", errno);
                return false;
            }
@@ -330,25 +331,28 @@ static bool poll_on_completion(struct bus *b, int fd) {
                assert(read_buf[0] == LISTENER_MSG_TAG);

                msec = (read_buf[1] << 0) + (read_buf[2] << 8);
                bus_backpressure_delay(b, msec, 3);
                bus_backpressure_delay(b, msec, LISTENER_BACKPRESSURE_SHIFT);
                BUS_LOG(b, 4, LOG_SENDING_REQUEST, "sent!", b->udata);
                return true;
            } else if (sz == -1) {
                if (util_is_resumable_io_error(errno)) {
                    BUS_LOG_SNPRINTF(b, 5, LOG_SENDING_REQUEST, b->udata, 64,
                        "poll_on_completion read, resumable IO error %d", errno);
                    errno = 0;
                    continue;
                } else {
                    BUS_LOG_SNPRINTF(b, 3, LOG_SENDING_REQUEST, b->udata, 64,
                    BUS_LOG_SNPRINTF(b, 2, LOG_SENDING_REQUEST, b->udata, 64,
                        "poll_on_completion read, non-resumable IO error %d", errno);
                    errno = 0;
                    return false;
                }
            } else {
                BUS_LOG_SNPRINTF(b, 3, LOG_SENDING_REQUEST, b->udata, 64,
                BUS_LOG_SNPRINTF(b, 1, LOG_SENDING_REQUEST, b->udata, 64,
                    "poll_on_completion bad read size %zd", sz);
                return false;
            }
        } else {
            BUS_LOG_SNPRINTF(b, 3, LOG_SENDING_REQUEST, b->udata, 64,
            BUS_LOG_SNPRINTF(b, 0, LOG_SENDING_REQUEST, b->udata, 64,
                "poll_on_completion, blocking forever returned 0, errno %d", errno);
            assert(false);
        }
@@ -516,14 +520,20 @@ static void free_connection_cb(void *value, void *udata) {
}

bool bus_shutdown(bus *b) {
    /* TODO: thread safety for shutdown being called concurrently on several client threads?
     * Maybe use CAS-ing the fd_set to NULL as a flag.*/
    struct yacht *fd_set = b->fd_set;
    b->fd_set = NULL;
    for (;;) {
        shutdown_state_t ss = b->shutdown_state;
        /* Another thread is already shutting things down. */
        if (ss != SHUTDOWN_STATE_RUNNING) { return false; }
        if (ATOMIC_BOOL_COMPARE_AND_SWAP(&b->shutdown_state,
                SHUTDOWN_STATE_RUNNING, SHUTDOWN_STATE_SHUTTING_DOWN)) {
            break;
        }
    }

    if (fd_set) {
    if (b->fd_set) {
        BUS_LOG(b, 2, LOG_SHUTDOWN, "removing all connections", b->udata);
        yacht_free(fd_set, free_connection_cb, b);
        yacht_free(b->fd_set, free_connection_cb, b);
        b->fd_set = NULL;
    }

    BUS_LOG(b, 2, LOG_SHUTDOWN, "shutting down listener threads", b->udata);
@@ -547,6 +557,7 @@ bool bus_shutdown(bus *b) {
    }

    BUS_LOG(b, 2, LOG_SHUTDOWN, "done with shutdown", b->udata);
    b->shutdown_state = SHUTDOWN_STATE_HALTED;
    return true;
}

@@ -608,7 +619,10 @@ bool bus_process_boxed_message(struct bus *b,

void bus_free(bus *b) {
    if (b == NULL) { return; }
    bus_shutdown(b);
    while (b->shutdown_state != SHUTDOWN_STATE_HALTED) {
        if (bus_shutdown(b)) { break; }
        poll(NULL, 0, 10);  // sleep 10 msec
    }

    for (int i = 0; i < b->listener_count; i++) {
        BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128,
@@ -632,14 +646,12 @@ void bus_free(bus *b) {
    }
    BUS_LOG(b, 3, LOG_SHUTDOWN, "threadpool_free", b->udata);
    threadpool_free(b->threadpool);

    free(b->joined);
    free(b->threads);

    pthread_rwlock_destroy(&b->fd_set_lock);
    pthread_mutex_destroy(&b->log_lock);

    bus_ssl_ctx_free(b);

    free(b);
}

+11 −0
Original line number Diff line number Diff line
@@ -59,8 +59,15 @@ typedef struct boxed_msg {
    size_t out_sent_size;
} boxed_msg;

// Special "NO SSL" value, to distinguish from a NULL SSL handle.
#define BUS_NO_SSL ((SSL *)-2)

/* Shutdown progress of a bus, stored in its shutdown_state field. */
typedef enum {
    /* Normal operation; no shutdown has been claimed yet. */
    SHUTDOWN_STATE_RUNNING = 0,
    /* A caller of bus_shutdown has swapped the state away from RUNNING
     * and is tearing the bus down; other callers back off. */
    SHUTDOWN_STATE_SHUTTING_DOWN = 1,
    /* bus_shutdown has run to completion. */
    SHUTDOWN_STATE_HALTED = 2
} shutdown_state_t;

/* Message bus. */
typedef struct bus {
    bus_sink_cb *sink_cb;
@@ -78,6 +85,7 @@ typedef struct bus {

    bool *joined;
    pthread_t *threads;
    shutdown_state_t shutdown_state;

    struct threadpool *threadpool;
    SSL_CTX *ssl_ctx;
@@ -122,4 +130,7 @@ typedef struct {
/* Arbitrary byte used to tag writes from the listener. */
#define LISTENER_MSG_TAG 0x15

/* Starting size of the file descriptor hash table; presumably a power-of-two exponent (log2 of the size), not the size squared -- TODO confirm. */
#define DEF_FD_SET_SIZE2 4

#endif
+5 −5
Original line number Diff line number Diff line
@@ -45,7 +45,7 @@ bool bus_ssl_init(struct bus *b) {
    return true;
}

/* Do an SSL / TLS shake for a connection. Blocking.
/* Do an SSL / TLS handshake for a connection. Blocking.
 * Returns the SSL handle for the connection (NOTE(review): presumably NULL on failure -- confirm). */
SSL *bus_ssl_connect(struct bus *b, int fd) {
    SSL *ssl = NULL;
@@ -181,7 +181,7 @@ static bool do_blocking_connection(struct bus *b, SSL *ssl, int fd) {
                        } else {
                            unsigned long errval = ERR_get_error();
                            char ebuf[256];
                            BUS_LOG_SNPRINTF(b, 5, LOG_SOCKET_REGISTERED, b->udata, 128,
                            BUS_LOG_SNPRINTF(b, 1, LOG_SOCKET_REGISTERED, b->udata, 128,
                                "socket %d: ERROR -- %s", fd, ERR_error_string(errval, ebuf));
                        }
                    }
@@ -190,7 +190,7 @@ static bool do_blocking_connection(struct bus *b, SSL *ssl, int fd) {
                    {
                        unsigned long errval = ERR_get_error();
                        char ebuf[256];
                        BUS_LOG_SNPRINTF(b, 5, LOG_SOCKET_REGISTERED, b->udata, 128,
                        BUS_LOG_SNPRINTF(b, 1, LOG_SOCKET_REGISTERED, b->udata, 128,
                            "socket %d: ERROR %d -- %s", fd, reason, ERR_error_string(errval, ebuf));
                        assert(false);
                    }
@@ -203,11 +203,11 @@ static bool do_blocking_connection(struct bus *b, SSL *ssl, int fd) {
                    fds[0].events = (POLLIN | POLLOUT);
                }
            } else if (fds[0].revents & POLLHUP) {
                BUS_LOG_SNPRINTF(b, 2, LOG_SOCKET_REGISTERED, b->udata, 128,
                BUS_LOG_SNPRINTF(b, 1, LOG_SOCKET_REGISTERED, b->udata, 128,
                    "SSL_Connect: HUP on %d", fd);
                return false;
            } else if (fds[0].revents & POLLERR) {
                BUS_LOG_SNPRINTF(b, 2, LOG_SOCKET_REGISTERED, b->udata, 128,
                BUS_LOG_SNPRINTF(b,12, LOG_SOCKET_REGISTERED, b->udata, 128,
                    "SSL_Connect: ERR on %d", fd);
                return false;
            }
+24 −15
Original line number Diff line number Diff line
@@ -42,10 +42,10 @@ struct listener *listener_init(struct bus *b, struct bus_config *cfg) {
    struct listener *l = calloc(1, sizeof(*l));
    if (l == NULL) { return NULL; }

    assert(b);
    l->bus = b;
    BUS_LOG(b, 2, LOG_LISTENER, "init", b->udata);

    int pipe_count = 0;
    int pipes[2];
    if (0 != pipe(pipes)) {
        free(l);
@@ -56,7 +56,7 @@ struct listener *listener_init(struct bus *b, struct bus_config *cfg) {
    l->incoming_msg_pipe = pipes[0];
    l->fds[INCOMING_MSG_PIPE_ID].fd = l->incoming_msg_pipe;
    l->fds[INCOMING_MSG_PIPE_ID].events = POLLIN;
    l->shutdown_notify_fd = SHUTDOWN_NO_FD;
    l->shutdown_notify_fd = LISTENER_NO_FD;

    for (int i = 0; i < MAX_PENDING_MESSAGES; i++) {
        rx_info_t *info = &l->rx_info[i];
@@ -70,7 +70,7 @@ struct listener *listener_init(struct bus *b, struct bus_config *cfg) {
    }
    l->rx_info_freelist = &l->rx_info[0];

    for (pipe_count = 0; pipe_count < MAX_QUEUE_MESSAGES; pipe_count++) {
    for (int pipe_count = 0; pipe_count < MAX_QUEUE_MESSAGES; pipe_count++) {
        listener_msg *msg = &l->msgs[pipe_count];
        uint8_t *p_id = (uint8_t *)&msg->id;
        *p_id = pipe_count;  /* Set (const) ID. */
@@ -81,8 +81,8 @@ struct listener *listener_init(struct bus *b, struct bus_config *cfg) {
                close(msg->pipes[0]);
                close(msg->pipes[1]);
            }
            close(pipes[0]);
            close(pipes[1]);
            close(l->commit_pipe);
            close(l->incoming_msg_pipe);
            free(l);
            return NULL;
        }
@@ -232,7 +232,9 @@ bool listener_shutdown(struct listener *l, int *notify_fd) {

void listener_free(struct listener *l) {
    struct bus *b = l->bus;
    /* assert: pthread must be join'd. */
    /* Thread has joined but data has not been freed yet. */
    assert(l->shutdown_notify_fd == LISTENER_SHUTDOWN_COMPLETE_FD);

    if (l) {
        for (int i = 0; i < MAX_PENDING_MESSAGES; i++) {
            rx_info_t *info = &l->rx_info[i];
@@ -244,6 +246,9 @@ void listener_free(struct listener *l) {
                break;
            case RIS_EXPECT:
                if (info->u.expect.box) {
                    /* TODO: This can leak memory, since the caller's
                     * callback is not being called. It should be called
                     * with BUS_SEND_RX_FAILURE, if it's safe to do so. */
                    free(info->u.expect.box);
                    info->u.expect.box = NULL;
                }
@@ -270,6 +275,9 @@ void listener_free(struct listener *l) {
            default:
                break;
            }

            close(msg->pipes[0]);
            close(msg->pipes[1]);
        }

        if (l->read_buf) {
@@ -304,7 +312,7 @@ void *listener_mainloop(void *arg) {
     * communication is managed at the command interface, so it doesn't
     * need any internal locking. */

    while (self->shutdown_notify_fd == SHUTDOWN_NO_FD) {
    while (self->shutdown_notify_fd == LISTENER_NO_FD) {
        gettimeofday(&tv, NULL);  // TODO: clock_gettime
        time_t cur_sec = tv.tv_sec;
        if (cur_sec != last_sec) {
@@ -341,6 +349,7 @@ void *listener_mainloop(void *arg) {
    }

    notify_caller(self->shutdown_notify_fd);
    self->shutdown_notify_fd = LISTENER_SHUTDOWN_COMPLETE_FD;
    return NULL;
}

@@ -967,10 +976,10 @@ static void release_rx_info(struct listener *l, rx_info_t *info) {
    switch (info->state) {
    case RIS_HOLD:
        if (info->u.hold.has_result) {
            /* FIXME: If we have a message that timed out, we need to
             * free it, but don't know how. We should never get here,
             * because it means the sender finished sending the message,
             * but the listener never got the handler callback. */
            /* If we have a message that timed out, we need to free it,
             * but don't know how. We should never get here, because it
             * means the sender finished sending the message, but the
             * listener never got the handler callback. */

            if (info->u.hold.result.ok) {
                void *msg = info->u.hold.result.u.success.msg;
@@ -985,7 +994,6 @@ static void release_rx_info(struct listener *l, rx_info_t *info) {
                    BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 128,
                        "LEAKING RESULT %p", (void *)&info->u.hold.result);
                }                
                
            }
        }
        break;
@@ -1052,6 +1060,7 @@ static listener_msg *get_free_msg(listener *l) {
                        nanosleep(&ts, NULL);
                    }
                    BUS_ASSERT(b, b->udata, head->type == MSG_NONE);
                    memset(&head->u, 0, sizeof(head->u));
                    return head;
                }
            }
@@ -1143,9 +1152,10 @@ static void msg_handler(listener *l, listener_msg *pmsg) {
}

static void notify_caller(int fd) {
    if (fd == -1) { return; }
    uint8_t reply_buf[sizeof(uint8_t) + sizeof(uint16_t)] = {LISTENER_MSG_TAG};

    /* TODO: reply_buf[1:2] can be little-endian backpressure */
    /* TODO: reply_buf[1:2] can be little-endian backpressure.  */

    for (;;) {
        ssize_t wres = write(fd, reply_buf, sizeof(reply_buf));
@@ -1389,7 +1399,6 @@ static void expect_response(listener *l, struct boxed_msg *box) {
        info->u.expect.error = RX_ERROR_NONE;
        info->u.expect.has_result = false;
        /* Switch over to sender's transferred timeout */
        info->timeout_sec = box->timeout_sec;
        notify_message_failure(l, info, BUS_SEND_RX_TIMEOUT_EXPECT);
    }
}
+8 −0
Original line number Diff line number Diff line
@@ -23,6 +23,14 @@
#include "bus_types.h"
#include "bus_internal_types.h"

/* How many bits to >> the backpressure value from commands
 * delivered to the listener. */
#define LISTENER_BACKPRESSURE_SHIFT 0 /* TODO */

/* How many bits to >> the backpressure value from the listener when a
 * send has completed. */
#define LISTENER_EXPECT_BACKPRESSURE_SHIFT 7

/* Manager of incoming messages from drives, both responses and
 * unsolicited status updates. */
struct listener;
Loading