Commit 05dc2513 authored by Scott Vokes's avatar Scott Vokes
Browse files

Track largest sequence IDs seen, reject & warn if earlier ones are used.

parent b8f81aaf
Loading
Loading
Loading
Loading
+6 −0
Original line number Diff line number Diff line
@@ -608,6 +608,12 @@ static void process_unpacked_message(listener *l,
        int64_t seq_id = result.u.success.seq_id;
        void *opaque_msg = result.u.success.msg;

        if (seq_id < l->largest_seq_id_seen && l->largest_seq_id_seen != 0) {
            BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 128,
                "suspicious sequence ID: largest seen is %lld, got %lld\n",
                (long long)l->largest_seq_id_seen, seq_id);
        }

        rx_info_t *info = find_info_by_sequence_id(l, ci->fd, seq_id);
        if (info) {
            switch (info->state) {
+1 −0
Original line number Diff line number Diff line
@@ -111,6 +111,7 @@ typedef struct listener {
    listener_msg msgs[MAX_QUEUE_MESSAGES];
    listener_msg *msg_freelist;
    int16_t msgs_in_use;
    int64_t largest_seq_id_seen;

    size_t upstream_backpressure;

+9 −4
Original line number Diff line number Diff line
@@ -564,13 +564,18 @@ static void handle_command(sender *s, int id) {
static void enqueue_write(struct sender *s, tx_info_t *info) {
    assert(info->state == TIS_REQUEST_ENQUEUE);

    struct bus *b = s->bus;
    int fd = info->u.enqueue.fd;
    fd_info *fdi = NULL;
    if (yacht_get(s->fd_hash_table, fd, (void **)&fdi)) {
        assert(fdi);

    int64_t out_seq_id = info->u.enqueue.box->out_seq_id;

    if (s->largest_seq_id_seen >= out_seq_id && s->largest_seq_id_seen > 0) {
        BUS_LOG_SNPRINTF(b, 0 , LOG_SENDER, b->udata, 64,
            "suspicious outgoing sequence ID: got %lld, already sent up to %lld",
            (long long)out_seq_id, (long long)s->largest_seq_id_seen);
        set_error_for_socket(s, fd, TX_ERROR_BAD_SEQUENCE_ID);
    } else if (yacht_get(s->fd_hash_table, fd, (void **)&fdi)) {
        assert(fdi);
        s->largest_seq_id_seen = out_seq_id;
        /* Notify the listener that we're about to start writing to a drive,
         * because (in rare cases) the response may arrive between finishing
         * the write and the listener processing the notification. In that
+3 −0
Original line number Diff line number Diff line
@@ -44,6 +44,7 @@ typedef enum {
    TX_ERROR_NOTIFY_LISTENER_FAILURE = -6,
    TX_ERROR_WRITE_TIMEOUT = -7,
    TX_ERROR_NOTIFY_TIMEOUT = -8,
    TX_ERROR_BAD_SEQUENCE_ID = -9,
} tx_error_t;

typedef enum {
@@ -142,6 +143,8 @@ typedef struct sender {
    int commit_pipe;
    int incoming_command_pipe;

    int64_t largest_seq_id_seen;

    /* Set of file descriptors to poll.
     * fds[0] is the incoming command pipe, and is polled for
     * read; the rest (fds[1] through fds[1 + active_fds] are