Commit 80ea9595 authored by Scott Vokes's avatar Scott Vokes
Browse files

Merge branch 'bus_sender_rework' into develop

Conflicts:
	src/lib/bus/sender.c
	src/lib/kinetic_controller.c
parents 19226a9a c1b69730
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -3,9 +3,9 @@ THREADPOOL_PATH= ../threadpool
LIB_PATH=		../../../obj

OPT=		-O3
LIB_INC =	-I${SOCKET99_PATH} -I${THREADPOOL_PATH}
LIB_INC =	-I${SOCKET99_PATH} -I${THREADPOOL_PATH} -I${OPENSSL_PATH}/include
CFLAGS +=	-std=c99 ${OPT} -Wall -g ${LIB_INC}
LDFLAGS +=	-L. -lsocket99 -L${LIB_PATH}
LDFLAGS +=	-L. -lsocket99 -L${LIB_PATH} -L${THREADPOOL_PATH} -L${OPENSSL_PATH}/lib -lssl -lcrypto

BUS_OBJS = \
	bus.o \
+28 −22
Original line number Diff line number Diff line
@@ -33,6 +33,7 @@
#include "threadpool.h"
#include "bus_internal_types.h"
#include "bus_ssl.h"
#include "util.h"
#include "yacht.h"

/* Function pointers for pthreads. */
@@ -192,6 +193,7 @@ bool bus_init(bus_config *config, struct bus_result *res) {
    res->bus = b;
    BUS_LOG(b, 1, LOG_INITIALIZATION, "initialized", config->bus_udata);
    return true;

cleanup:
    if (ss) {
        for (int i = 0; i < config->sender_count; i++) {
@@ -264,10 +266,6 @@ static boxed_msg *box_msg(struct bus *b, bus_user_msg *msg) {
    return box;
}

static bool is_resumable_io_error(int errno_) {
    return errno_ == EAGAIN || errno_ == EINTR || errno_ == EWOULDBLOCK;
}

bool bus_send_request(struct bus *b, bus_user_msg *msg)
{
    if (b == NULL || msg == NULL || msg->fd == 0) {
@@ -279,20 +277,11 @@ bool bus_send_request(struct bus *b, bus_user_msg *msg)
        return false;
    }

    int complete_fd = -1;

    int s_id = sender_id_of_socket(b, msg->fd);
    struct sender *s = b->senders[s_id];

    /* Pass boxed message to the sender */
    if (!sender_enqueue_message(s, box, &complete_fd)) {
        BUS_LOG(b, 3, LOG_SENDING_REQUEST, "sender_enqueue_message failed", b->udata);
        return false;
    }

    /* block until delivery status */
    BUS_LOG(b, 3, LOG_SENDING_REQUEST, "Sending request...", b->udata);
    return poll_on_completion(b, complete_fd);
    return sender_send_request(s, box);
}

static bool poll_on_completion(struct bus *b, int fd) {
@@ -301,6 +290,10 @@ static bool poll_on_completion(struct bus *b, int fd) {
    fds[0].fd = fd;
    fds[0].events = POLLIN;

    /* TODO REFACTOR this should be reused between bus, sender, and listener,
     *     or be moved into the listener.
     *     The sender has its own blocking polling for commands. */

    /* FIXME: compare this to TCP timeouts -- try to prevent sender
     * succeeding but failing to notify client. */
    const int TIMEOUT_SECONDS = 10;
@@ -310,7 +303,7 @@ static bool poll_on_completion(struct bus *b, int fd) {
        BUS_LOG(b, 5, LOG_SENDING_REQUEST, "Polling on completion...tick...", b->udata);
        int res = poll(fds, 1, ONE_SECOND);
        if (res == -1) {
            if (is_resumable_io_error(errno)) {
            if (util_is_resumable_io_error(errno)) {
                BUS_LOG(b, 3, LOG_SENDING_REQUEST, "Polling on completion...EAGAIN", b->udata);
                if (errno == EINTR && i > 0) { i--; }
                errno = 0;
@@ -337,7 +330,7 @@ static bool poll_on_completion(struct bus *b, int fd) {
                BUS_LOG(b, 3, LOG_SENDING_REQUEST, "sent!", b->udata);
                return true;
            } else if (sz == -1) {
                if (is_resumable_io_error(errno)) {
                if (util_is_resumable_io_error(errno)) {
                    errno = 0;
                } else {
                    assert(false);
@@ -346,7 +339,7 @@ static bool poll_on_completion(struct bus *b, int fd) {
            }
        }
    }
    BUS_LOG(b, 2, LOG_SENDING_REQUEST, "failed to send (timeout on sender)", b->udata);
    BUS_LOG(b, 2, LOG_SENDING_REQUEST, "failed (timeout)", b->udata);

    #if 0
    assert(false);
@@ -391,13 +384,15 @@ const char *bus_log_event_str(log_event_t event) {
}

bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata) {
    /* Register a socket internally with a listener. */
    /* Register a socket internally with a sender and listener. */
    int s_id = sender_id_of_socket(b, fd);
    int l_id = listener_id_of_socket(b, fd);

    BUS_LOG_SNPRINTF(b, 2, LOG_SOCKET_REGISTERED, b->udata, 64,
        "registering socket %d", fd);

    /* Spread sockets throughout the different listener processes. */
    /* Spread sockets throughout the different sender & listener processes. */
    struct sender *s = b->senders[s_id];
    struct listener *l = b->listeners[l_id];
    
    int pipes[2];
@@ -436,9 +431,14 @@ bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata)
        goto cleanup;
    }

    bool res = listener_add_socket(l, ci, pipe_in);
    bool res = false;
    res = sender_register_socket(s, fd, ci->ssl);
    if (!res) { goto cleanup; }

    res = listener_add_socket(l, ci, pipe_in);
    if (!res) { goto cleanup; }

    /* FIXME: Move this into listener_add_socket? */
    BUS_LOG(b, 2, LOG_SOCKET_REGISTERED, "polling on socket add...", b->udata);
    bool completed = poll_on_completion(b, pipe_out);
    if (!completed) { goto cleanup; }
@@ -460,15 +460,19 @@ cleanup:

/* Free metadata about a socket that has been disconnected. */
bool bus_release_socket(struct bus *b, int fd) {
    /* Register a socket internally with a listener. */
    int s_id = sender_id_of_socket(b, fd);
    int l_id = listener_id_of_socket(b, fd);

    BUS_LOG_SNPRINTF(b, 2, LOG_SOCKET_REGISTERED, b->udata, 64,
        "forgetting socket %d", fd);

    /* Spread sockets throughout the different listener processes. */
    struct sender *s = b->senders[s_id];
    struct listener *l = b->listeners[l_id];

    if (!sender_remove_socket(s, fd)) {
        return false;
    }

    if (!listener_remove_socket(l, fd)) {
        return false;           /* couldn't send msg to listener */
    }
@@ -500,7 +504,9 @@ bool bus_shutdown(bus *b) {
    for (int i = 0; i < b->sender_count; i++) {
        int off = 0;
        if (!b->joined[i + off]) {
            BUS_LOG(b, 2, LOG_SHUTDOWN, "sender_shutdown...", b->udata);
            while (!sender_shutdown(b->senders[i])) {
                BUS_LOG(b, 2, LOG_SHUTDOWN, "sender_shutdown... (retry)", b->udata);
                sleep(1);
            }
            void *unused = NULL;
+1 −0
Original line number Diff line number Diff line
@@ -177,6 +177,7 @@ typedef enum {
    BUS_SEND_RX_TIMEOUT = -3,
    BUS_SEND_RX_FAILURE = -4,
    BUS_SEND_BAD_RESPONSE = -5,
    BUS_SEND_UNREGISTERED_SOCKET = -6,
} bus_send_status_t;

/* Result from attempting to configure a message bus. */
+1 −1
Original line number Diff line number Diff line
@@ -283,7 +283,6 @@ void *listener_mainloop(void *arg) {

static void set_error_for_socket(listener *l, int id, int fd, rx_error_t err) {
    /* Mark all pending messages on this socket as being failed due to error. */

    for (int i = 0; i < MAX_PENDING_MESSAGES; i++) {
        rx_info_t *info = &l->rx_info[i];
        if (!info->active) {
@@ -544,6 +543,7 @@ static void process_unpacked_message(listener *l,
             *     not flush them out as unsolicited until the queue of messages
             *     is fully processed. Otherwise, they sit in the queue stuck and
             *     slowing things down. */
            /* TODO: Do unsolicited status messages always have a seq_id of 0? */
            BUS_LOG_SNPRINTF(b, 1, LOG_LISTENER, b->udata, 128,
                "Couldn't find info for seq_id %lld, msg %p",
                (long long)seq_id, opaque_msg);
+713 −391

File changed.

Preview size limit exceeded, changes collapsed.

Loading