Commit e9cfd4e7 authored by Scott Vokes's avatar Scott Vokes
Browse files

Update socket control flow to expose hangups / KINETIC_STATUS_SESSION_TERMINATED.

Also, rename KINTEIC_STATUS_SESSION_TERMINATED to
KINETIC_STATUS_SESSION_TERMINATED throughout.
parent adf3af6f
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -193,7 +193,7 @@ typedef enum {
    KINETIC_STATUS_DEVICE_NAME_REQUIRED,    ///< A device name is required, but was empty
    KINETIC_STATUS_INVALID_LOG_TYPE,        ///< The device log type specified was invalid
    KINETIC_STATUS_HMAC_FAILURE,            ///< An HMAC validation error was detected
    KINTEIC_STATUS_SESSION_TERMINATED,      ///< The session has been terminated by the Kinetic device
    KINETIC_STATUS_SESSION_TERMINATED,      ///< The session has been terminated by the Kinetic device
    KINETIC_STATUS_COUNT                    ///< Number of status codes in KineticStatusDescriptor
} KineticStatus;

+1 −0
Original line number Diff line number Diff line
@@ -440,6 +440,7 @@ bool bus_release_socket(struct bus *b, int fd, void **socket_udata_out) {
        return false;           /* couldn't send msg to listener */
    }

    assert(completion_pipe != -1);
    bool completed = bus_poll_on_completion(b, completion_pipe);
    if (!completed) {           /* listener hung up while waiting */
        return false;
+4 −0
Original line number Diff line number Diff line
@@ -45,7 +45,11 @@ bool bus_poll_on_completion(struct bus *b, int fd) {
        #ifdef TEST
        errno = poll_errno;
        #endif
        BUS_LOG_SNPRINTF(b, 5, LOG_SENDING_REQUEST, b->udata, 64,
            "poll_on_completion, polling %d", fd);
        int res = syscall_poll(fds, 1, -1);
        BUS_LOG_SNPRINTF(b, 5, LOG_SENDING_REQUEST, b->udata, 64,
            "poll_on_completion for %d, res %d (errno %d)", fd, res, errno);
        if (res == -1) {
            if (util_is_resumable_io_error(errno)) {
                BUS_LOG_SNPRINTF(b, 5, LOG_SENDING_REQUEST, b->udata, 64,
+22 −4
Original line number Diff line number Diff line
@@ -50,8 +50,13 @@ void ListenerCmd_NotifyCaller(listener *l, int fd) {
    uint16_t backpressure = ListenerTask_GetBackpressure(l);
    reply_buf[1] = (uint8_t)(backpressure & 0xff);
    reply_buf[2] = (uint8_t)((backpressure >> 8) & 0xff);
    struct bus *b = l->bus;

    for (;;) {
        BUS_LOG_SNPRINTF(b, 6, LOG_LISTENER, b->udata, 128,
            "NotifyCaller on %d with backpressure %u",
            fd, backpressure);;
        
        ssize_t wres = syscall_write(fd, reply_buf, sizeof(reply_buf));
        if (wres == sizeof(reply_buf)) { break; }
        if (wres == -1) {
@@ -70,6 +75,8 @@ void ListenerCmd_CheckIncomingMessages(listener *l, int *res) {
    short events = l->fds[INCOMING_MSG_PIPE_ID].revents;

    if (events & (POLLERR | POLLHUP | POLLNVAL)) {  /* hangup/error */
        BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 128,
            "hangup on listener incoming command pipe: %d", events);
        return;
    }

@@ -105,7 +112,7 @@ void ListenerCmd_CheckIncomingMessages(listener *l, int *res) {
static void msg_handler(listener *l, listener_msg *pmsg) {
    struct bus *b = l->bus;
    BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 128,
        "Handling message -- %p", (void*)pmsg);
        "Handling message -- %p, type %d", (void*)pmsg, pmsg->type);

    l->is_idle = false;

@@ -237,7 +244,7 @@ static void expect_response(listener *l, struct boxed_msg *box) {
    rx_info_t *info = listener_helper_find_info_by_sequence_id(l, box->fd, box->out_seq_id);
    if (info && info->state == RIS_HOLD) {
        BUS_ASSERT(b, b->udata, info->state == RIS_HOLD);
        if (info->u.hold.has_result) {
        if (info->u.hold.error == RX_ERROR_NONE && info->u.hold.has_result) {
            bus_unpack_cb_res_t result = info->u.hold.result;

            BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 256,
@@ -251,6 +258,14 @@ static void expect_response(listener *l, struct boxed_msg *box) {
            info->u.expect.has_result = true;
            info->u.expect.result = result;
            ListenerTask_AttemptDelivery(l, info);
        } else if (info->u.hold.error != RX_ERROR_NONE) {
            rx_error_t error = info->u.hold.error;
            bus_unpack_cb_res_t result = info->u.hold.result;
            info->state = RIS_EXPECT;
            info->u.expect.error = error;
            info->u.expect.result = result;
            info->u.expect.box = box;
            ListenerTask_NotifyMessageFailure(l, info, BUS_SEND_RX_FAILURE);
        } else {
            BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 256,
                "converting HOLD to EXPECT info %d (%p), attempting delivery <box:%p, fd:%d, seq_id:%lld>",
@@ -263,13 +278,16 @@ static void expect_response(listener *l, struct boxed_msg *box) {
            /* Switch over to client's transferred timeout */
            info->timeout_sec = box->timeout_sec;
        }
    } else {                    /* use free info */
    } else if (info && info->state == RIS_EXPECT) {                    /* use free info */
        /* If we get here, the listener thinks the HOLD message timed out,
         * but the client doesn't think things timed out badly enough to
         * itself expose an error. We also don't know if we're going to
         * get a response or not. */

        // FIXME: should we just assert false for this case now?
        /* FIXME: should we just assert(false) for this case now?
         * This should never happen, there is a large extra timeout added to
         * the HOLD messages to avoid a window where HOLDs could time out
         * just before EXPECTs arrive. */

        BUS_LOG_SNPRINTF(b, 0, LOG_MEMORY, b->udata, 128,
            "get_hold_rx_info FAILED: fd %d, seq_id %lld",
+1 −1
Original line number Diff line number Diff line
@@ -85,7 +85,7 @@ bool listener_helper_push_message(struct listener *l, listener_msg *msg, int *re
                errno = 0;
                continue;
            } else {
                BUS_LOG_SNPRINTF(b, 10, LOG_LISTENER, b->udata, 64,
                BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64,
                    "write_commit error, errno %d", errno);
                errno = 0;
                ListenerTask_ReleaseMsg(l, msg);
Loading