Commit dd36eaf4 authored by Greg Williams's avatar Greg Williams
Browse files

Merge branch 'develop' into protobuf_rework

parents e58b0e28 14c62385
Loading
Loading
Loading
Loading
+46 −68
Original line number Diff line number Diff line
@@ -61,7 +61,7 @@ int completion_pipe = -1;
void *unused = NULL;
#endif

bool bus_init(bus_config *config, struct bus_result *res) {
bool Bus_Init(bus_config *config, struct bus_result *res) {
    if (res == NULL) { return false; }
    if (config == NULL) {
        res->status = BUS_INIT_ERROR_NULL;
@@ -96,7 +96,7 @@ bool bus_init(bus_config *config, struct bus_result *res) {
    bus *b = calloc(1, sizeof(*b));
    if (b == NULL) { goto cleanup; }

    if (!bus_ssl_init(b)) { goto cleanup; }
    if (!BusSSL_Init(b)) { goto cleanup; }

    b->sink_cb = config->sink_cb;
    b->unpack_cb = config->unpack_cb;
@@ -105,11 +105,6 @@ bool bus_init(bus_config *config, struct bus_result *res) {
    b->log_cb = config->log_cb;
    b->log_level = config->log_level;
    b->udata = config->bus_udata;
    if (0 != pthread_mutex_init(&b->log_lock, NULL)) {
        res->status = BUS_INIT_ERROR_MUTEX_INIT_FAIL;
        goto cleanup;
    }
    locks_initialized++;
    if (0 != pthread_mutex_init(&b->fd_set_lock, NULL)) {
        res->status = BUS_INIT_ERROR_MUTEX_INIT_FAIL;
        goto cleanup;
@@ -127,7 +122,7 @@ bool bus_init(bus_config *config, struct bus_result *res) {
    }

    for (int i = 0; i < config->listener_count; i++) {
        ls[i] = listener_init(b, config);
        ls[i] = Listener_Init(b, config);
        if (ls[i] == NULL) {
            res->status = BUS_INIT_ERROR_LISTENER_INIT_FAIL;
            goto cleanup;
@@ -137,7 +132,7 @@ bool bus_init(bus_config *config, struct bus_result *res) {
        }
    }

    tp = threadpool_init(&config->threadpool_cfg);
    tp = Threadpool_Init(&config->threadpool_cfg);
    if (tp == NULL) {
        res->status = BUS_INIT_ERROR_THREADPOOL_INIT_FAIL;
        goto cleanup;
@@ -150,7 +145,7 @@ bool bus_init(bus_config *config, struct bus_result *res) {
        goto cleanup;
    }

    fd_set = yacht_init(DEF_FD_SET_SIZE2);
    fd_set = Yacht_Init(DEF_FD_SET_SIZE2);
    if (fd_set == NULL) {
        goto cleanup;
    }
@@ -178,24 +173,21 @@ bool bus_init(bus_config *config, struct bus_result *res) {
cleanup:
    if (ls) {
        for (int i = 0; i < config->listener_count; i++) {
            if (ls[i]) { listener_free(ls[i]); }
            if (ls[i]) { Listener_Free(ls[i]); }
        }
        free(ls);
    }
    if (tp) { threadpool_free(tp); }
    if (tp) { Threadpool_Free(tp); }
    if (joined) { free(joined); }
    if (b) {
        if (locks_initialized > 1) {
            pthread_mutex_destroy(&b->fd_set_lock);
        }
        if (locks_initialized > 0) {
            pthread_mutex_destroy(&b->log_lock);
        }
        free(b);
    }

    if (threads) { free(threads); }
    if (fd_set) { yacht_free(fd_set, NULL, NULL); }
    if (fd_set) { Yacht_Free(fd_set, NULL, NULL); }

    return false;
}
@@ -259,7 +251,7 @@ static boxed_msg *box_msg(struct bus *b, bus_user_msg *msg) {
    void *value = NULL;
#endif
    connection_info *ci = NULL;
    if (yacht_get(b->fd_set, box->fd, &value)) {
    if (Yacht_Get(b->fd_set, box->fd, &value)) {
        ci = (connection_info *)value;
    }
    if (0 != pthread_mutex_unlock(&b->fd_set_lock)) { assert(false); }
@@ -302,7 +294,7 @@ static boxed_msg *box_msg(struct bus *b, bus_user_msg *msg) {
    return box;
}

bool bus_send_request(struct bus *b, bus_user_msg *msg)
bool Bus_SendRequest(struct bus *b, bus_user_msg *msg)
{
    if (b == NULL || msg == NULL || msg->fd == -1) {
        return false;
@@ -315,7 +307,7 @@ bool bus_send_request(struct bus *b, bus_user_msg *msg)

    BUS_LOG_SNPRINTF(b, 3-0, LOG_SENDING_REQUEST, b->udata, 64,
        "Sending request <fd:%d, seq_id:%lld>", msg->fd, (long long)msg->seq_id);
    bool res = send_do_blocking_send(b, box);
    bool res = Send_DoBlockingSend(b, box);
    BUS_LOG_SNPRINTF(b, 3, LOG_SENDING_REQUEST, b->udata, 64,
        "...request sent, result %d", res);

@@ -335,12 +327,12 @@ static int listener_id_of_socket(struct bus *b, int fd) {
    return fd % b->listener_count;
}

struct listener *bus_get_listener_for_socket(struct bus *b, int fd) {
struct listener *Bus_GetListenerForSocket(struct bus *b, int fd) {
    return b->listeners[listener_id_of_socket(b, fd)];
}

/* Get the string key for a log event ID. */
const char *bus_log_event_str(log_event_t event) {
const char *Bus_LogEventStr(log_event_t event) {
    switch (event) {
    case LOG_INITIALIZATION: return "INITIALIZATION";
    case LOG_NEW_CLIENT: return "NEW_CLIENT";
@@ -355,7 +347,7 @@ const char *bus_log_event_str(log_event_t event) {
    }
}

bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata) {
bool Bus_RegisterSocket(struct bus *b, bus_socket_t type, int fd, void *udata) {
    /* Register a socket internally with the listener. */
    int l_id = listener_id_of_socket(b, fd);

@@ -377,7 +369,7 @@ bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata)

    SSL *ssl = NULL;
    if (type == BUS_SOCKET_SSL) {
        ssl = bus_ssl_connect(b, fd);
        ssl = BusSSL_Connect(b, fd);
        if (ssl == NULL) { goto cleanup; }
    } else {
        ssl = BUS_NO_SSL;
@@ -394,7 +386,7 @@ bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata)
    #endif
    /* Lock hash table and save whether this FD uses SSL. */
    if (0 != pthread_mutex_lock(&b->fd_set_lock)) { assert(false); }
    bool set_ok = yacht_set(b->fd_set, fd, ci, &old_value);
    bool set_ok = Yacht_Set(b->fd_set, fd, ci, &old_value);
    if (0 != pthread_mutex_unlock(&b->fd_set_lock)) { assert(false); }

    if (set_ok) {
@@ -407,11 +399,11 @@ bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata)
    #ifndef TEST
    int completion_pipe = -1;
    #endif
    res = listener_add_socket(l, ci, &completion_pipe);
    res = Listener_AddSocket(l, ci, &completion_pipe);
    if (!res) { goto cleanup; }

    BUS_LOG(b, 2, LOG_SOCKET_REGISTERED, "polling on socket add...", b->udata);
    bool completed = bus_poll_on_completion(b, completion_pipe);
    bool completed = BusPoll_OnCompletion(b, completion_pipe);
    if (!completed) { goto cleanup; }

    BUS_LOG(b, 2, LOG_SOCKET_REGISTERED, "successfully added socket", b->udata);
@@ -425,7 +417,7 @@ cleanup:
}

/* Free metadata about a socket that has been disconnected. */
bool bus_release_socket(struct bus *b, int fd, void **socket_udata_out) {
bool Bus_ReleaseSocket(struct bus *b, int fd, void **socket_udata_out) {
    int l_id = listener_id_of_socket(b, fd);

    BUS_LOG_SNPRINTF(b, 2, LOG_SOCKET_REGISTERED, b->udata, 64,
@@ -436,12 +428,12 @@ bool bus_release_socket(struct bus *b, int fd, void **socket_udata_out) {
    #ifndef TEST
    int completion_pipe = -1;
    #endif
    if (!listener_remove_socket(l, fd, &completion_pipe)) {
    if (!Listener_RemoveSocket(l, fd, &completion_pipe)) {
        return false;           /* couldn't send msg to listener */
    }

    assert(completion_pipe != -1);
    bool completed = bus_poll_on_completion(b, completion_pipe);
    bool completed = BusPoll_OnCompletion(b, completion_pipe);
    if (!completed) {           /* listener hung up while waiting */
        return false;
    }
@@ -451,7 +443,7 @@ bool bus_release_socket(struct bus *b, int fd, void **socket_udata_out) {
    void *old_value = NULL;
    #endif
    if (0 != pthread_mutex_lock(&b->fd_set_lock)) { assert(false); }
    bool rm_ok = yacht_remove(b->fd_set, fd, &old_value);
    bool rm_ok = Yacht_Remove(b->fd_set, fd, &old_value);
    if (0 != pthread_mutex_unlock(&b->fd_set_lock)) { assert(false); }
    if (!rm_ok) {
        return false;
@@ -467,18 +459,13 @@ bool bus_release_socket(struct bus *b, int fd, void **socket_udata_out) {
    if (ci->ssl == BUS_NO_SSL) {
        res = true;            /* nothing else to do */
    } else {
        res = bus_ssl_disconnect(b, ci->ssl);
        res = BusSSL_Disconnect(b, ci->ssl);
    }

    free(ci);
    return res;
}

bool bus_schedule_threadpool_task(struct bus *b, struct threadpool_task *task,
        size_t *backpressure) {
    return threadpool_schedule(b->threadpool, task, backpressure);
}

#ifndef TEST
static
#endif
@@ -492,11 +479,11 @@ void free_connection_cb(void *value, void *udata) {
    #ifndef TEST
    int completion_pipe = -1;
    #endif
    if (!listener_remove_socket(l, ci->fd, &completion_pipe)) {
    if (!Listener_RemoveSocket(l, ci->fd, &completion_pipe)) {
        return;           /* couldn't send msg to listener */
    }

    bool completed = bus_poll_on_completion(b, completion_pipe);
    bool completed = BusPoll_OnCompletion(b, completion_pipe);
    if (!completed) {
        return;
    }
@@ -504,7 +491,7 @@ void free_connection_cb(void *value, void *udata) {
    free(ci);
}

bool bus_shutdown(bus *b) {
bool Bus_Shutdown(bus *b) {
    for (;;) {
        shutdown_state_t ss = b->shutdown_state;
        /* Another thread is already shutting things down. */
@@ -517,7 +504,7 @@ bool bus_shutdown(bus *b) {

    if (b->fd_set) {
        BUS_LOG(b, 2, LOG_SHUTDOWN, "removing all connections", b->udata);
        yacht_free(b->fd_set, free_connection_cb, b);
        Yacht_Free(b->fd_set, free_connection_cb, b);
        b->fd_set = NULL;
    }

@@ -529,25 +516,25 @@ bool bus_shutdown(bus *b) {
    for (int i = 0; i < b->listener_count; i++) {
        if (!b->joined[i]) {
            BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128,
                "listener_shutdown -- %d", i);
            if (!listener_shutdown(b->listeners[i], &completion_pipe)) {
                "Listener_Shutdown -- %d", i);
            if (!Listener_Shutdown(b->listeners[i], &completion_pipe)) {
                b->shutdown_state = SHUTDOWN_STATE_RUNNING;
                return false;
            }

            if (!bus_poll_on_completion(b, completion_pipe)) {
            if (!BusPoll_OnCompletion(b, completion_pipe)) {
                b->shutdown_state = SHUTDOWN_STATE_RUNNING;
                return false;
            }

            BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128,
                "listener_shutdown -- joining %d", i);
                "Listener_Shutdown -- joining %d", i);
            #ifndef TEST
            void *unused = NULL;
            #endif
            int res = syscall_pthread_join(b->threads[i], &unused);
            BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128,
                "listener_shutdown -- joined %d", i);
                "Listener_Shutdown -- joined %d", i);
            if (res != 0) {
                b->shutdown_state = SHUTDOWN_STATE_RUNNING;
                return false;
@@ -561,7 +548,7 @@ bool bus_shutdown(bus *b) {
    return true;
}

void bus_backpressure_delay(struct bus *b, size_t backpressure, uint8_t shift) {
void Bus_BackpressureDelay(struct bus *b, size_t backpressure, uint8_t shift) {
    /* Push back if message bus is too busy. */
    backpressure >>= shift;
    
@@ -572,14 +559,6 @@ void bus_backpressure_delay(struct bus *b, size_t backpressure, uint8_t shift) {
    }
}

void bus_lock_log(struct bus *b) {
    pthread_mutex_lock(&b->log_lock);
}

void bus_unlock_log(struct bus *b) {
    pthread_mutex_unlock(&b->log_lock);
}

static void box_execute_cb(void *udata) {
    boxed_msg *box = (boxed_msg *)udata;

@@ -598,7 +577,7 @@ static void box_cleanup_cb(void *udata) {

/* Deliver a boxed message to the thread pool to execute.
 * The boxed message will be freed by the threadpool. */
bool bus_process_boxed_message(struct bus *b,
bool Bus_ProcessBoxedMessage(struct bus *b,
        struct boxed_msg *box, size_t *backpressure) {
    assert(box);
    assert(box->result.status != BUS_SEND_UNDEFINED);
@@ -611,47 +590,46 @@ bool bus_process_boxed_message(struct bus *b,

    BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128,
        "Scheduling boxed message -- %p -- where it will be freed", (void*)box);
    return bus_schedule_threadpool_task(b, &task, backpressure);
    return Threadpool_Schedule(b->threadpool, &task, backpressure);
}

/* How many seconds should it give the thread pool to shut down? */
#define THREAD_SHUTDOWN_SECONDS 5

void bus_free(bus *b) {
void Bus_Free(bus *b) {
    if (b == NULL) { return; }
    while (b->shutdown_state != SHUTDOWN_STATE_HALTED) {
        if (bus_shutdown(b)) { break; }
        if (Bus_Shutdown(b)) { break; }
        syscall_poll(NULL, 0, 10);  // sleep 10 msec
    }

    for (int i = 0; i < b->listener_count; i++) {
        BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128,
            "listener_free -- %d", i);
        listener_free(b->listeners[i]);
            "Listener_Free -- %d", i);
        Listener_Free(b->listeners[i]);
    }
    free(b->listeners);

    int limit = (1000 * THREAD_SHUTDOWN_SECONDS)/10;
    for (int i = 0; i < limit; i++) {
        BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128,
            "threadpool_shutdown -- %d", i);
        if (threadpool_shutdown(b->threadpool, false)) { break; }
            "Threadpool_Shutdown -- %d", i);
        if (Threadpool_Shutdown(b->threadpool, false)) { break; }
        (void)syscall_poll(NULL, 0, 10);

        if (i == limit - 1) {
            BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128,
                "threadpool_shutdown -- %d (forced)", i);
            threadpool_shutdown(b->threadpool, true);
                "Threadpool_Shutdown -- %d (forced)", i);
            Threadpool_Shutdown(b->threadpool, true);
        }
    }
    BUS_LOG(b, 3, LOG_SHUTDOWN, "threadpool_free", b->udata);
    threadpool_free(b->threadpool);
    BUS_LOG(b, 3, LOG_SHUTDOWN, "Threadpool_Free", b->udata);
    Threadpool_Free(b->threadpool);
    free(b->joined);
    free(b->threads);
    pthread_mutex_destroy(&b->fd_set_lock);
    pthread_mutex_destroy(&b->log_lock);

    bus_ssl_ctx_free(b);
    BusSSL_CtxFree(b);
    free(b);
}

+11 −15
Original line number Diff line number Diff line
@@ -22,28 +22,24 @@

#include "bus_types.h"

/* This opaque bus struct represents the only user-facing interface to
 * the network handling code. Callbacks are provided to react to network
 * events. */

/* Initialize a bus, based on configuration in *config. Returns a bool
/* Initialize a bus, based on configuration in *config. RetuBus_RegisterSockets a bool
 * indicating whether the construction succeeded, and the bus pointer
 * and/or a status code indicating the cause of failure in *res. */
bool bus_init(bus_config *config, struct bus_result *res);
bool Bus_Init(bus_config *config, struct bus_result *res);

/* Send a request. Blocks until the request has been transmitted.
 * 
 * Assumes the FD has been registered with bus_register_socket;
 * Assumes the FD has been registered with Bus_register_socket;
 * sending to an unregistered socket is an error.
 *
 * Returns true if the request has been accepted and the bus will
 * RetuBus_RegisterSockets true if the request has been accepted and the bus will
 * attempt to handle the request and response. They can still fail,
 * but the error status will be passed to the result handler callback.
 *
 * Returns false if the request has been rejected, due to a memory
 * RetuBus_RegisterSockets false if the request has been rejected, due to a memory
 * allocation error or invalid arguments.
 * */
bool bus_send_request(struct bus *b, bus_user_msg *msg);
bool Bus_SendRequest(struct bus *b, bus_user_msg *msg);

/* Register a socket connected to an endpoint, and data that will be passed
 * to all interactions on that socket.
@@ -53,17 +49,17 @@ bool bus_send_request(struct bus *b, bus_user_msg *msg);
 *
 * If USES_SSL is true, then the function will block until the initial
 * SSL/TLS connection handshake has completed. */
bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *socket_udata);
bool Bus_RegisterSocket(struct bus *b, bus_socket_t type, int fd, void *socket_udata);

/* Free metadata about a socket that has been disconnected. */
bool bus_release_socket(struct bus *b, int fd, void **socket_udata_out);
bool Bus_ReleaseSocket(struct bus *b, int fd, void **socket_udata_out);

/* Begin shutting the system down. Returns true once everything pending
/* Begin shutting the system down. RetuBus_RegisterSockets true once everything pending
 * has resolved. */
bool bus_shutdown(struct bus *b);
bool Bus_Shutdown(struct bus *b);

/* Free internal data structures for the bus. */
void bus_free(struct bus *b);
void Bus_Free(struct bus *b);

/* Inward facing portion of the message bus -- functions called
 * by other parts of the message bus, like the Listener thread,
+8 −8
Original line number Diff line number Diff line
@@ -88,7 +88,7 @@ static time_t get_cur_second(void);

static void log_cb(log_event_t event, int log_level, const char *msg, void *udata) {
    example_state *s = (example_state *)udata;
    const char *event_str = bus_log_event_str(event);
    const char *event_str = Bus_LogEventStr(event);
    fprintf(/*stderr*/stdout, "%ld -- %s[%d] -- %s\n",
        s->last_second, event_str, log_level, msg);
}
@@ -253,7 +253,7 @@ int main(int argc, char **argv) {
        .bus_udata = &state,
    };
    bus_result res = {0};
    if (!bus_init(&cfg, &res)) {
    if (!Bus_Init(&cfg, &res)) {
        LOG(0, "failed to init bus: %d\n", res.status);
        return 1;
    }
@@ -264,8 +264,8 @@ int main(int argc, char **argv) {

    if (b) {
        LOG(1, "shutting down\n");
        bus_shutdown(b);
        bus_free(b);
        Bus_Shutdown(b);
        Bus_Free(b);
        return 0;
    } else {
        return 1;
@@ -445,7 +445,7 @@ static void tick_handler(example_state *s) {

static time_t get_cur_second(void) {
    struct timeval tv;
    if (!util_timestamp(&tv, true)) {
    if (!Util_Timestamp(&tv, true)) {
        assert(false);
    }
    return tv.tv_sec;
@@ -456,7 +456,7 @@ static void run_bus(example_state *s, struct bus *b) {
    open_sockets(s);

    for (int i = 0; i < s->sockets_used; i++) {
        bus_register_socket(b, BUS_SOCKET_PLAIN, s->sockets[i], s->info[i]);
        Bus_RegisterSocket(b, BUS_SOCKET_PLAIN, s->sockets[i], s->info[i]);
    }

    int cur_socket_i = 0;
@@ -503,8 +503,8 @@ static void run_bus(example_state *s, struct bus *b) {
            
            s->sent_msgs++;
            payload_size++;
            if (!bus_send_request(b, &msg)) {
                LOG(1, " @@@ bus_send_request failed!\n");
            if (!Bus_SendRequest(b, &msg)) {
                LOG(1, " @@@ Bus_SendRequest failed!\n");
                dropped++;
                if (dropped >= 100) {
                    LOG(1, " @@@ more than 100 send failures, halting\n");
+0 −1
Original line number Diff line number Diff line
@@ -78,7 +78,6 @@ typedef struct bus {

    int log_level;
    bus_log_cb *log_cb;
    pthread_mutex_t log_lock;

    uint8_t listener_count;
    struct listener **listeners;
+4 −12
Original line number Diff line number Diff line
@@ -23,26 +23,18 @@
#include "bus_types.h"

/* Get the string key for a log event ID. */
const char *bus_log_event_str(log_event_t event);
const char *Bus_LogEventStr(log_event_t event);

/* For a given file descriptor, get the listener ID to use.
 * This will level sockets between multiple threads. */
struct listener *bus_get_listener_for_socket(struct bus *b, int fd);

/* Schedule a task in the bus's threadpool. */
bool bus_schedule_threadpool_task(struct bus *b, struct threadpool_task *task,
    size_t *backpressure);

/* Lock / unlock the log mutex, since logging can occur on several threads. */
void bus_lock_log(struct bus *b);
void bus_unlock_log(struct bus *b);
struct listener *Bus_GetListenerForSocket(struct bus *b, int fd);

/* Deliver a boxed message to the thread pool to execute. */
bool bus_process_boxed_message(struct bus *b,
bool Bus_ProcessBoxedMessage(struct bus *b,
    struct boxed_msg *box, size_t *backpressure);

/* Provide backpressure by sleeping for (backpressure >> shift) msec, if
 * the value is greater than 0. */
void bus_backpressure_delay(struct bus *b, size_t backpressure, uint8_t shift);
void Bus_BackpressureDelay(struct bus *b, size_t backpressure, uint8_t shift);

#endif
Loading