Commit 8c1615c7 authored by Scott Vokes's avatar Scott Vokes
Browse files

Track largest seen sequence ID in a connection-specific way.

parent bbdae10b
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -462,6 +462,7 @@ bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata)
    ci->fd = fd;
    ci->to_read_size = 0;
    ci->udata = udata;
    ci->largest_seq_id_seen = 0;

    if (type == BUS_SOCKET_SSL) {
        if (!bus_ssl_connect(b, ci)) { goto cleanup; }
+1 −0
Original line number Diff line number Diff line
@@ -103,6 +103,7 @@ typedef struct {
    int fd;
    rx_error_t error;
    size_t to_read_size;
    int64_t largest_seq_id_seen;

    SSL *ssl;                   /* SSL handle. Must be valid or BUS_NO_SSL. */

+9 −7
Original line number Diff line number Diff line
@@ -608,11 +608,12 @@ static void process_unpacked_message(listener *l,
        int64_t seq_id = result.u.success.seq_id;
        void *opaque_msg = result.u.success.msg;

        if (seq_id < l->largest_seq_id_seen && l->largest_seq_id_seen != 0) {
        if (seq_id < ci->largest_seq_id_seen && ci->largest_seq_id_seen != 0) {
            BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 128,
                "suspicious sequence ID: largest seen is %lld, got %lld\n",
                (long long)l->largest_seq_id_seen, (long long)seq_id);
                (long long)ci->largest_seq_id_seen, (long long)seq_id);
        }
        ci->largest_seq_id_seen = seq_id;

        rx_info_t *info = find_info_by_sequence_id(l, ci->fd, seq_id);
        if (info) {
@@ -641,10 +642,11 @@ static void process_unpacked_message(listener *l,
            }
        } else {
            /* We received a response that we weren't expecting. */
            if (seq_id != 0) {
                BUS_LOG_SNPRINTF(b, 2 - 2, LOG_LISTENER, b->udata, 128,
                    "Couldn't find info for fd %d, seq_id %lld, msg %p",
                    ci->fd, (long long)seq_id, opaque_msg);
   
            }   
            if (b->unexpected_msg_cb) {
                b->unexpected_msg_cb(opaque_msg, seq_id, b->udata, ci->udata);
            }
@@ -1211,7 +1213,7 @@ static void attempt_delivery(listener *l, struct rx_info_t *info) {
        /* success */
        BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 256,
            "successfully delivered box %p (seq_id %lld), marking info %d as DONE",
            (void*)box, (long long)box->out_seq_id, info->id);
            (void*)box, (long long)seq_id, info->id);
        info->u.expect.error = RX_ERROR_DONE;
        BUS_LOG_SNPRINTF(b, 4, LOG_LISTENER, b->udata, 128,
            "initial clean-up attempt for completed RX event at info +%d", info->id);
+13 −8
Original line number Diff line number Diff line
@@ -439,6 +439,7 @@ static bool register_socket_info(sender *s, int fd, SSL *ssl) {
    info->fd = fd;
    info->ssl = ssl;
    info->refcount = 0;
    info->largest_seq_id_seen = 0;

    void *old = NULL;
    if (!yacht_set(s->fd_hash_table, fd, info, &old)) {
@@ -568,14 +569,18 @@ static void enqueue_write(struct sender *s, tx_info_t *info) {
    int fd = info->u.enqueue.fd;
    fd_info *fdi = NULL;
    int64_t out_seq_id = info->u.enqueue.box->out_seq_id;
    if (s->largest_seq_id_seen >= out_seq_id && s->largest_seq_id_seen > 0) {
    if (yacht_get(s->fd_hash_table, fd, (void **)&fdi)) {
        assert(fdi);

        if (fdi->largest_seq_id_seen > out_seq_id && fdi->largest_seq_id_seen > 0) {
            BUS_LOG_SNPRINTF(b, 0 , LOG_SENDER, b->udata, 64,
                "suspicious outgoing sequence ID: got %lld, already sent up to %lld",
            (long long)out_seq_id, (long long)s->largest_seq_id_seen);
                (long long)out_seq_id, (long long)fdi->largest_seq_id_seen);
            set_error_for_socket(s, fd, TX_ERROR_BAD_SEQUENCE_ID);
    } else if (yacht_get(s->fd_hash_table, fd, (void **)&fdi)) {
        assert(fdi);
        s->largest_seq_id_seen = out_seq_id;
            return;
        }
        fdi->largest_seq_id_seen = out_seq_id;

        /* Notify the listener that we're about to start writing to a drive,
         * because (in rare cases) the response may arrive between finishing
         * the write and the listener processing the notification. In that
+1 −0
Original line number Diff line number Diff line
@@ -68,6 +68,7 @@ typedef struct {
    int fd;
    SSL *ssl;                   /* SSL handle. Can be NULL. */
    int refcount;
    int64_t largest_seq_id_seen;
} fd_info;

/* Metadata for a message in-flight. */