Commit c483c21b authored by Scott Vokes's avatar Scott Vokes
Browse files

Eliminate send timeout race between sender and listener threads.

Previously, a window could occur where:

1) The sender delivers a HOLD[fd, seq_id] message to the listener.

2) The sender successfully sends its payload, but due to a stressed
   network, overloaded simulator, etc., the payload takes a while to
   finish sending.

3) The listener times out the HOLD message, which had a timeout of the
   normal message timeout + 1 (intended to close the window caused by
   one thread reacting crossing the second boundary before the other,
   but insufficinet).

4) The listener receives the EXPECT message from the sender, but doesn't
   have a corresponding HOLD message to pair it with, because the HOLD
   has just timed out.

In this case, the sender would not trigger normal timeout error handling
because it still successfully passed the handler callback to the listener
(albeit just barely), but the listener is missing its hold message (which
may have already saved a response), so it waits further for a response
which never arrives. This code path did not appear to trigger the timeout
properly, leading to a possible deadlock.

Now, if the hold message is not found, the listener constructs an EXPECT
event message, but immediately times it out with a new error case --
BUS_SEND_RX_TIMEOUT_EXPECT. (This is a recoverable timeout error.)

Also, eliminate a secondary error handling path for retries for messages
between the sender and listener. This has probably not been triggered,
but adds ambiguity to the error handling -- the normal code path for
timeouts is sufficient.

Rename KineticController_HandleExpectedResponse to
KineticController_HandleResult (since it also handles error codes) and
KineticController_HandleUnexecpectedResponse to
KineticController_HandleUnexpectedResponse (fix typo), as part of
clarifying the overall error handling dataflow.
parent c2e3dd07
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -184,6 +184,7 @@ typedef enum {
    BUS_SEND_RX_FAILURE = -54,
    BUS_SEND_BAD_RESPONSE = -55,
    BUS_SEND_UNREGISTERED_SOCKET = -56,
    BUS_SEND_RX_TIMEOUT_EXPECT = -57,
} bus_send_status_t;

/* Result from attempting to configure a message bus. */
+40 −6
Original line number Diff line number Diff line
@@ -881,6 +881,15 @@ static rx_info_t *get_free_rx_info(struct listener *l) {
    }
}

static connection_info *get_connection_info(struct listener *l, int fd) {
    for (int i = 0; i < l->tracked_fds; i++) {
        connection_info *ci = l->fd_info[i];
        assert(ci);
        if (ci->fd == fd) { return ci; }
    }
    return NULL;
}

static void release_rx_info(struct listener *l, rx_info_t *info) {
    assert(info);
    struct bus *b = l->bus;
@@ -896,9 +905,22 @@ static void release_rx_info(struct listener *l, rx_info_t *info) {
             * free it, but don't know how. We should never get here,
             * because it means the sender finished sending the message,
             * but the listener never got the handler callback. */

            if (info->u.hold.result.ok) {
                void *msg = info->u.hold.result.u.success.msg;
                int64_t seq_id = info->u.hold.result.u.success.seq_id;

                connection_info *ci = get_connection_info(l, info->u.hold.fd);
                if (ci && b->unexpected_msg_cb) {
                    BUS_LOG_SNPRINTF(b, 1, LOG_LISTENER, b->udata, 128,
                        "CALLING UNEXPECTED_MSG_CB ON RESULT %p", (void *)&info->u.hold.result);
                    b->unexpected_msg_cb(msg, seq_id, b->udata, ci->udata);
                } else {
                    BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 128,
                        "LEAKING RESULT %p", (void *)&info->u.hold.result);
            //assert(false);
                }
                
            }
        }
        break;
    case RIS_EXPECT:
@@ -1266,12 +1288,23 @@ static void expect_response(listener *l, struct boxed_msg *box) {
            info->timeout_sec = box->timeout_sec;
        }
    } else {                    /* use free info */
        /* If we get here, the listener thinks the HOLD message timed out,
         * but the sender doesn't think things timed out badly enough to
         * itself expose an error. We also don't know if we're going to
         * get a response or not. */

        BUS_LOG_SNPRINTF(b, 3-3, LOG_MEMORY, b->udata, 128,
            "get_hold_rx_info FAILED: fd %d, seq_id %lld",
            box->fd, (long long)box->out_seq_id);

        /* This should be treated like a send timeout. */
        info = get_free_rx_info(l);
        assert(info);
        assert(info->state == RIS_INACTIVE);
        BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128,
            "Setting info %p (+%d)'s box to %p",
            (void*)info, info->id, (void*)box);

        BUS_LOG_SNPRINTF(b, 3-3, LOG_MEMORY, b->udata, 256,
            "Setting info %p (+%d)'s box to %p, which will be expired immediately (timeout %lld)",
            (void*)info, info->id, (void*)box, (long long)box->timeout_sec);
        
        info->state = RIS_EXPECT;
        assert(info->u.expect.box == NULL);
@@ -1280,6 +1313,7 @@ static void expect_response(listener *l, struct boxed_msg *box) {
        info->u.expect.has_result = false;
        /* Switch over to sender's transferred timeout */
        info->timeout_sec = box->timeout_sec;
        notify_message_failure(l, info, BUS_SEND_RX_TIMEOUT_EXPECT);
    }
}

+9 −18
Original line number Diff line number Diff line
@@ -593,9 +593,15 @@ static void enqueue_write(struct sender *s, tx_info_t *info) {
         * because (in rare cases) the response may arrive between finishing
         * the write and the listener processing the notification. In that
         * case, it should hold onto the unrecognized response until the
         * sender notifies it (and passes it the callback). */
         * sender notifies it (and passes it the callback).
         *
         * This timeout is several extra seconds so that we don't have
         * a window where the HOLD message has timed out, but the
         * EXPECT hasn't, leading to ambiguity about what to do with
         * the response (which may or may not have arrived).
         * */
        attempt_to_enqueue_sending_request_message_to_listener(s,
            fd, out_seq_id, info->u.enqueue.timeout_sec + 1);
            fd, out_seq_id, info->u.enqueue.timeout_sec + 5);
        
        /* Increment the refcount. This will cause poll to watch for the
         * socket being writable, if it isn't already being watched. */
@@ -898,7 +904,6 @@ static void update_sent(struct bus *b, sender *s, tx_info_t *info, ssize_t sent)
            .fd = info->u.write.fd,
            .timeout_sec = info->u.write.timeout_sec,
            .box = info->u.write.box,
            .retries = info->u.write.retries,
        };

        /* Message has been sent, so release - caller may free it. */
@@ -1095,26 +1100,12 @@ static void attempt_to_enqueue_request_sent_message_to_listener(sender *s, tx_in
        info->state = TIS_REQUEST_RELEASE;
        info->u.release.backpressure = backpressure;

        notify_caller(s, info, true);
        notify_caller(s, info, true);   // unblock sender
    } else {
        BUS_LOG(b, 2, LOG_SENDER, "failed delivery", b->udata);

        /* Put it back, since we need to keep managing it */
        info->u.notify.box = box;

        info->u.notify.retries++;
        if (info->u.notify.retries == SENDER_MAX_DELIVERY_RETRIES) {

            info->state = TIS_ERROR;
            struct u_error ue = {
                .error = TX_ERROR_NOTIFY_LISTENER_FAILURE,
                .box = box,
            };
            info->u.error = ue;

            notify_message_failure(s, info, BUS_SEND_RX_FAILURE);
            BUS_LOG(b, 2, LOG_SENDER, "failed delivery, several retries", b->udata);
        }
    }
}

+2 −2
Original line number Diff line number Diff line
@@ -25,7 +25,9 @@
#define MAX_TIMEOUT 100
#define TX_TIMEOUT 9

#if 0
#define SENDER_MAX_DELIVERY_RETRIES 5
#endif

/* Max number of in-flight messagse. */
#define MAX_CONCURRENT_SENDS 32
@@ -103,7 +105,6 @@ typedef struct {
            time_t timeout_sec;
            boxed_msg *box;
            size_t sent_size;
            uint8_t retries;
            fd_info *fdi;
        } write;

@@ -111,7 +112,6 @@ typedef struct {
            int fd;
            time_t timeout_sec;
            boxed_msg *box;
            uint8_t retries;
        } notify;

        struct {
+16 −8
Original line number Diff line number Diff line
@@ -145,6 +145,9 @@ KineticStatus bus_to_kinetic_status(bus_send_status_t const status)
        case BUS_SEND_UNREGISTERED_SOCKET:
            res = KINETIC_STATUS_SOCKET_ERROR;
            break;
        case BUS_SEND_RX_TIMEOUT_EXPECT:
            res = KINETIC_STATUS_OPERATION_TIMEDOUT;
            break;
        case BUS_SEND_UNDEFINED:
        default:
        {
@@ -176,10 +179,14 @@ static const char *bus_error_string(bus_send_status_t t) {
        return "rx_failure";
    case BUS_SEND_BAD_RESPONSE:
        return "bad_response";
    case BUS_SEND_UNREGISTERED_SOCKET:
        return "unregistered socket";
    case BUS_SEND_RX_TIMEOUT_EXPECT:
        return "internal timeout";
    }
}

void KineticController_HandleUnexecpectedResponse(void *msg,
void KineticController_HandleUnexpectedResponse(void *msg,
                                                int64_t seq_id,
                                                void *bus_udata,
                                                void *socket_udata)
@@ -192,9 +199,10 @@ void KineticController_HandleUnexecpectedResponse(void *msg,
    (void)bus_udata;

    LOGF2("[PDU RX UNSOLICITED] pdu: 0x%0llX, session: 0x%llX, bus: 0x%llX, "
                "fd: %6d, protoLen: %u, valueLen: %u",
                "fd: %6d, protoLen: %u, valueLen: %u, seq_id: %08llx",
                response, connection->pSession, connection->messageBus, connection->socket,
                response->header.protobufLength, response->header.valueLength);
                response->header.protobufLength, response->header.valueLength,
                (long long)seq_id);

    // Handle unsolicited status PDUs
    if (response->proto->authType == KINETIC_PROTO_MESSAGE_AUTH_TYPE_UNSOLICITEDSTATUS) {
@@ -226,7 +234,7 @@ void KineticController_HandleUnexecpectedResponse(void *msg,
    }
}

void KineticController_HandleExpectedResponse(bus_msg_result_t *res, void *udata)
void KineticController_HandleResult(bus_msg_result_t *res, void *udata)
{
    KineticOperation* op = udata;

@@ -257,7 +265,7 @@ void KineticController_HandleExpectedResponse(bus_msg_result_t *res, void *udata
    else
    {
        // pull out bus error?
        LOGF1("Error receiving response, got message bus error: %s", bus_error_string(res->status));
        LOGF0("Error receiving response, got message bus error: %s", bus_error_string(res->status));
    }

    // Call operation-specific callback, if configured
Loading