Commit c7550d7a authored by Scott Vokes's avatar Scott Vokes
Browse files

Finish eliminating sender thread, update error handling process.

bus_send_request now returns false if the request has been rejected
(due to a bad argument or an internal allocation failure), otherwise
it returns true and handles subsequent errors during the request or
response via the status callback.
parent 2227d634
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -193,6 +193,7 @@ typedef enum {
    KINETIC_STATUS_DEVICE_LOCKED,           ///< The operation failed because the device is sercurely locked. An UNLOCK must be issued to unlock for use.
    KINETIC_STATUS_ACL_ERROR,               ///< A security operation failed due to bad ACL(s)
    KINETIC_STATUS_NOT_AUTHORIZED,          ///< Authorization failure
    KINETIC_STATUS_REQUEST_REJECTED,        ///< No request was attempted.
    KINETIC_STATUS_COUNT                    ///< Number of status codes in KineticStatusDescriptor
} KineticStatus;

+10 −1
Original line number Diff line number Diff line
@@ -277,7 +277,7 @@ static boxed_msg *box_msg(struct bus *b, bus_user_msg *msg) {

bool bus_send_request(struct bus *b, bus_user_msg *msg)
{
    if (b == NULL || msg == NULL || msg->fd == 0) {
    if (b == NULL || msg == NULL || msg->fd == -1) {
        return false;
    }

@@ -291,6 +291,15 @@ bool bus_send_request(struct bus *b, bus_user_msg *msg)
    bool res = sender_do_blocking_send(b, box);
    BUS_LOG_SNPRINTF(b, 3, LOG_SENDING_REQUEST, b->udata, 64,
        "...request sent, result %d", res);

    /* The send was rejected -- free the box, but don't call the error
     * handling callback. */
    if (!res) {
        BUS_LOG_SNPRINTF(b, 3, LOG_SENDING_REQUEST, b->udata, 64,
            "Freeing box since request was rejected: %p", box);
        free(box);
    }

    return res;
}

+9 −1
Original line number Diff line number Diff line
@@ -34,7 +34,15 @@ bool bus_init(bus_config *config, struct bus_result *res);
/* Send a request. Blocks until the request has been transmitted.
 * 
 * Assumes the FD has been registered with bus_register_socket;
 * sending to an unregistered socket is an error. */
 * sending to an unregistered socket is an error.
 *
 * Returns true if the request has been accepted and the bus will
 * attempt to handle the request and response. They can still fail,
 * but the error status will be passed to the result handler callback.
 *
 * Returns false if the request has been rejected, due to a memory
 * allocation error or invalid arguments.
 * */
bool bus_send_request(struct bus *b, bus_user_msg *msg);

/* Get the string key for a log event ID. */
+52 −16
Original line number Diff line number Diff line
@@ -35,12 +35,20 @@
#include "yacht.h"
#include "sender_internal.h"

/* Do a blocking send.
 *
 * Returning true indicates that the message has been queued up for
 * delivery, but the request or response may still fail. Those errors
 * are handled by giving an error status code to the callback.
 * Returning false means that the send was rejected outright, and
 * the callback-based error handling will not be used. */
bool sender_do_blocking_send(bus *b, boxed_msg *box) {
    /* Note: assumes that all locking and thread-safe seq_id allocation
     * has been handled upstream. If multiple client requests are queued
     * up to go out at the same time, they must go out in monotonic order,
     * with only a single thread writing to the socket at once. */
    assert(b);
    assert(box);

    BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 256,
        "doing blocking send of box %p, with <fd:%d, seq_id %lld>, msg[%zd]: %p",
@@ -67,8 +75,10 @@ bool sender_do_blocking_send(bus *b, boxed_msg *box) {
     * EXPECT hasn't, leading to ambiguity about what to do with
     * the response (which may or may not have arrived).
     * */
    attempt_to_enqueue_sending_request_message_to_listener(b,
        box->fd, box->out_seq_id, box->timeout_sec + 5);
    if (!attempt_to_enqueue_sending_request_message_to_listener(b,
            box->fd, box->out_seq_id, box->timeout_sec + 5)) {
        return false;
    }
    assert(box->out_sent_size == 0);

    int rem_msec = timeout_msec;
@@ -83,7 +93,7 @@ bool sender_do_blocking_send(bus *b, boxed_msg *box) {
                continue;
            } else {
                handle_failure(b, box, BUS_SEND_TX_FAILURE);
                return false;
                return true;
            }
        } else if (res == 1) {
            short revents = fds[0].revents;
@@ -91,13 +101,13 @@ bool sender_do_blocking_send(bus *b, boxed_msg *box) {
                BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 256,
                    "do_blocking_send on %d: POLLNVAL => UNREGISTERED_SOCKET", box->fd);
                handle_failure(b, box, BUS_SEND_UNREGISTERED_SOCKET);
                return false;
                return true;
            } else if (revents & (POLLERR | POLLHUP)) {
                BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 256,
                    "do_blocking_send on %d: POLLERR/POLLHUP => TX_FAILURE (%d)",
                    box->fd, revents);
                handle_failure(b, box, BUS_SEND_TX_FAILURE);
                return false;
                return true;
            } else if (revents & POLLOUT) {
                /* TODO: use gettimeofday to figure out actual elapsed time?
                 * We're more concerned with timing out on the response than
@@ -113,7 +123,7 @@ bool sender_do_blocking_send(bus *b, boxed_msg *box) {
                case HW_DONE:
                    return true;
                case HW_ERROR:
                    return false;
                    return true;
                }
            } else {
                BUS_LOG_SNPRINTF(b, 0, LOG_SENDER, b->udata, 256,
@@ -124,12 +134,12 @@ bool sender_do_blocking_send(bus *b, boxed_msg *box) {
            BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 256,
                "do_blocking_send on %d: TX_TIMEOUT", box->fd);
            handle_failure(b, box, BUS_SEND_TX_TIMEOUT);
            return false;
            return true;
        }
    }
}

static void attempt_to_enqueue_sending_request_message_to_listener(struct bus *b,
static bool attempt_to_enqueue_sending_request_message_to_listener(struct bus *b,
    int fd, int64_t seq_id, int16_t timeout_sec) {
    BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 128,
      "telling listener to expect response, with <fd%d, seq_id:%lld>",
@@ -138,9 +148,10 @@ static void attempt_to_enqueue_sending_request_message_to_listener(struct bus *b
    struct listener *l = bus_get_listener_for_socket(b, fd);

    int delay = 1;
    for (;;) {
    const int max_retries = 10;
    for (int try = 0; try < max_retries; try++) {
        if (listener_hold_response(l, fd, seq_id, timeout_sec)) {
            return;
            return true;
        } else {
            /* Don't apply much backpressure here since the client
             * thread will get it when the message is done sending. */
@@ -148,18 +159,39 @@ static void attempt_to_enqueue_sending_request_message_to_listener(struct bus *b
            if (delay < 5) { delay++; }
        }
    }
    assert(false);
    return false;
}

static void handle_failure(struct bus *b, boxed_msg *box, bus_send_status_t status) {
    BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64,
        "handle_failure: box %p, status %d",
        (void*)box, status);
        "handle_failure: box %p, <fd:%d, seq_id:%lld>, status %d",
        (void*)box, box->fd, (long long)box->out_seq_id, status);
    
    box->result = (bus_msg_result_t){
        .status = status,
    };
    /* FIXME - make distinction upstream between sending fail and rejected send */
    
    size_t backpressure = 0;

    /* Retry until it succeeds. */
    size_t retries = 0;
    for (;;) {
        if (bus_process_boxed_message(b, box, &backpressure)) {
            BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64,
                "deleted box %p", (void*)box);
            bus_backpressure_delay(b, backpressure,
                LISTENER_EXPECT_BACKPRESSURE_SHIFT);
            return;
        } else {
            retries++;
            const int delay = 5;
            poll(NULL, 0, delay);
            if (retries > 0 && (retries & 255) == 0) {
                BUS_LOG_SNPRINTF(b, 0, LOG_SENDER, b->udata, 64,
                    "looping on handle_failure retry: %zd", retries);
            }
        }
    }
}

static ssize_t write_plain(bus *b, boxed_msg *box);
@@ -187,6 +219,9 @@ static handle_write_res handle_write(bus *b, boxed_msg *box) {
        /* If the OS set POLLOUT but we can't actually write, then
         * just go back to the poll() loop with no progress.
         * If we busywait on this, something is deeply wrong. */
        BUS_LOG_SNPRINTF(b, 0, LOG_SENDER, b->udata, 128,
            "suspicious: wrote %zd bytes to <fd:%d, seq_id%lld>",
            wrsz, box->fd, (long long)box->out_seq_id);
    } else {
        /* Update amount written so far */
        box->out_sent_size += wrsz;
@@ -233,7 +268,7 @@ static ssize_t write_plain(struct bus *b, boxed_msg *box) {
                errno = 0;
                continue;
            } else {
                /* FIXME: notify about closed socket */
                /* will notify about closed socket upstream */
                BUS_LOG_SNPRINTF(b, 1, LOG_SENDER, b->udata, 64,
                    "write: socket error writing, %s", strerror(errno));
                errno = 0;
@@ -288,6 +323,7 @@ static ssize_t write_ssl(struct bus *b, boxed_msg *box, SSL *ssl) {
                     * the socket for too long */
                    continue;
                } else {
                    /* will notify about socket error upstream */
                    BUS_LOG_SNPRINTF(b, 1, LOG_SENDER, b->udata, 64,
                        "SSL_write on fd %d: SSL_ERROR_SYSCALL -- %s",
                        fd, strerror(errno));
@@ -313,7 +349,7 @@ static ssize_t write_ssl(struct bus *b, boxed_msg *box, SSL *ssl) {
            {
                BUS_LOG_SNPRINTF(b, 1, LOG_SENDER, b->udata, 64,
                    "SSL_write on socket %d: match fail: error %d", box->fd, reason);
                break;
                return -1;
            }
            }
        } else {
+7 −0
Original line number Diff line number Diff line
@@ -23,6 +23,13 @@
#include "bus_types.h"
#include "bus_internal_types.h"

/* Do a blocking send.
 *
 * Returning true indicates that the message has been queued up for
 * delivery, but the request or response may still fail. Those errors
 * are handled by giving an error status code to the callback.
 * Returning false means that the send was rejected outright, and
 * the callback-based error handling will not be used. */
bool sender_do_blocking_send(struct bus *b, boxed_msg *box);

#endif
Loading