Commit bddaf9f3 authored by Scott Vokes's avatar Scott Vokes
Browse files

Add HOLD message from sender to listener to handle immediate responses.

Previously, there was a window between the sender writing messages out
to a socket, notifying the listener to expect a response with a given
sequence ID, and the listener getting a response. Now, before sending,
the sender notifies the listener to hold a response with a given <FD,
sequence ID> pair until the sender finishes its send and transfers the
result-handling callback to the listener. (In the case of timeouts, the
sender sinks the error into the client library's callback, and the
listener discards its HOLD record.)

If the listener gets a full response before the second (EXPECT) message
from the sender, it will be held and processed when the EXPECT message
arrives.

Also:

. Added backpressure on the sender -> listener queue, because the sender
  is now sending two messages to the listener (HOLD and EXPECT) per
  outgoing request, and when the pool of messages used to relay between
  them is nearly full, there is a major performance hit.

. The listener erroneously used a uint8_t for the ID instead of a
  uint16_t while building the freelist. Fix it, so the freelist
  does not end up with cycles when e.g. record 255's next is 0.
parent 050db34a
Loading
Loading
Loading
Loading
+42 −10
Original line number Diff line number Diff line
@@ -87,7 +87,7 @@ static time_t get_cur_second(void);
static void log_cb(log_event_t event, int log_level, const char *msg, void *udata) {
    example_state *s = (example_state *)udata;
    const char *event_str = bus_log_event_str(event);
    fprintf(stderr, "%ld -- %s[%d] -- %s\n",
    fprintf(/*stderr*/stdout, "%ld -- %s[%d] -- %s\n",
        s->last_second, event_str, log_level, msg);
}

@@ -103,6 +103,7 @@ static bus_sink_cb_res_t reset_transfer(socket_info *si) {
    };
    
    si->state = STATE_AWAITING_HEADER;
    si->used = 0;
    return res;
}

@@ -121,9 +122,38 @@ static bus_sink_cb_res_t sink_cb(uint8_t *read_buf,
    case STATE_AWAITING_HEADER:
    {
        bool valid_header = true;
        bool split_header = false;

        size_t header_rem = sizeof(prot_header_t) - si->used;
        if (read_size > header_rem) {
            printf("surplus read_size %zd\n", read_size);
            printf("header_rem %zd (sizeof(prot_header_t) %zd)\n", header_rem, sizeof(prot_header_t));
            assert(false);
        } else if (read_size < sizeof(prot_header_t)) {
            //printf("split header, %zd\n", read_size);
            split_header = true;
        }

        size_t copied = read_size;
        if (copied > header_rem) { copied = header_rem; }

        memcpy(&si->buf[si->used], read_buf, copied);
        si->used += copied;

        if (si->used < sizeof(prot_header_t)) {
            bus_sink_cb_res_t res = {
                .next_read = sizeof(prot_header_t) - si->used,
            };
            si->state = STATE_AWAITING_HEADER;
            return res;
        }

        assert(si->used == sizeof(prot_header_t));

        prot_header_t *header = (prot_header_t *)&si->buf[0];

        prot_header_t *header = (prot_header_t *)read_buf;
        if (read_size != sizeof(prot_header_t)) {
        if (si->used < sizeof(prot_header_t)) {
            printf("INVALID HEADER A: read_size %zd\n", si->used);
            valid_header = false;
        } else if (header->magic_number != MAGIC_NUMBER) {
            printf("INVALID HEADER B: magic number 0x%08x\n", header->magic_number);
@@ -131,18 +161,17 @@ static bus_sink_cb_res_t sink_cb(uint8_t *read_buf,
        }

        if (valid_header) {
            uint8_t *buf = si->buf;
            prot_header_t *header = (prot_header_t *)read_buf;
            prot_header_t *header = (prot_header_t *)&si->buf[0];
            si->cur_payload_size = header->size;
            memcpy(buf, header, sizeof(*header));
            si->used = sizeof(*header);

            memcpy(&si->buf[si->used], read_buf + copied, read_size - copied);
            si->used += read_size - copied;
            bus_sink_cb_res_t res = {
                .next_read = header->size,
            };
            si->state = STATE_AWAITING_BODY;
            return res;
        } else {
            assert(false);
            return reset_transfer(si);
        }
        
@@ -153,6 +182,7 @@ static bus_sink_cb_res_t sink_cb(uint8_t *read_buf,
        assert(DEFAULT_BUF_SIZE - si->used >= read_size);
        memcpy(&si->buf[si->used], read_buf, read_size);
        si->used += read_size;
        assert(si->used <= si->cur_payload_size + sizeof(prot_header_t));
        size_t rem = si->cur_payload_size + sizeof(prot_header_t) - si->used;

        if (rem == 0) {
@@ -202,6 +232,8 @@ static void unexpected_msg_cb(void *msg,
        int64_t seq_id, void *bus_udata, void *socket_udata) {
    printf("\n\n\nUNEXPECTED MESSAGE: %p, seq_id %lld, bus_udata %p, socket_udata %p\n\n\n\n",
        msg, seq_id, bus_udata, socket_udata);

    assert(false);
}

int main(int argc, char **argv) {
@@ -327,7 +359,7 @@ static void open_sockets(example_state *s) {
        socket99_result res;

        if (!socket99_open(&cfg, &res)) {
            // socket99_fprintf(stderr, &res);
            socket99_fprintf(stderr, &res);
            exit(1);
        }

@@ -446,7 +478,7 @@ static void run_bus(example_state *s, struct bus *b) {
        if (should_send) {
            should_send = false;
            size_t msg_size = construct_msg(msg_buf, buf_size,
                10 * payload_size /* * 1024L*/, seq_id);
                100 * /*payload_size * */ 1024L, seq_id);
            LOG(3, " @@ sending message with %zd bytes\n", msg_size);
            bus_user_msg msg = {
                .fd = s->sockets[cur_socket_i],
+1 −0
Original line number Diff line number Diff line
@@ -76,6 +76,7 @@ static void reverse(struct casq *q) {
    casq_link *to_reverse = NULL;
    for (;;) {                  /* spin, unlink */
        to_reverse = q->accum;
        if (q->accum == NULL) { return; }
        if (ATOMIC_BOOL_COMPARE_AND_SWAP(&q->accum, to_reverse, NULL)) {
            break;
        }
+4 −1
Original line number Diff line number Diff line
@@ -119,6 +119,7 @@ static void parse_args(int argc, char **argv, config *cfg) {
    if (cfg->port_low == 0) { cfg->port_low = cfg->port_high; }
    if (cfg->port_high == 0) { cfg->port_high = cfg->port_low; }
    if (cfg->port_high < cfg->port_low || cfg->port_low == 0) { usage(); }
    if (cfg->verbosity > 0) { printf("verbosity: %d\n", cfg->verbosity); }
}

int main(int argc, char **argv) {
@@ -178,6 +179,7 @@ static void open_ports(config *cfg) {
        } else {
            cfg->accept_fds[i].fd = res.fd;
            cfg->accept_fds[i].events = (POLLIN);
            LOG(2, " -- Accepting on %s:%d\n", scfg.host, scfg.port);
        }
    }
}
@@ -336,6 +338,7 @@ static void handle_client_io(config *cfg, int available) {
        LOG(4, "fd[%d]->events 0x%08x ==> revents: 0x%08x\n", i, fd->events, fd->revents);
        
        if ((fd->revents & POLLERR) || (fd->revents & POLLHUP)) {
            LOG(3, "Disconnecting client %d\n", fd->fd);
            disconnect_client(cfg, fd->fd);
        } else if (fd->revents & POLLOUT) {
            checked++;
@@ -380,7 +383,7 @@ static void handle_client_io(config *cfg, int available) {
                    cfg->last_second, rres);
                enqueue_write(cfg, buf->fd, read_buf, rres);
            } else {

                LOG(2, "else, rres %zd\n", rres);
            }
        }
    }
+459 −152

File changed.

Preview size limit exceeded, changes collapsed.

+7 −0
Original line number Diff line number Diff line
@@ -34,6 +34,13 @@ struct listener *listener_init(struct bus *b, struct bus_config *cfg);
bool listener_add_socket(struct listener *l, connection_info *ci, int notify_fd);
bool listener_remove_socket(struct listener *l, int fd);

/* The sender is about to start a write, the sender should hold on to
 * the response (with timeout) if it arrives before receiving further
 * instructions from the sender. */
bool listener_hold_response(struct listener *l, int fd,
    int64_t seq_id, int16_t timeout_sec);

/* The sender has finished a write, the listener should expect a response. */
bool listener_expect_response(struct listener *l, boxed_msg *box,
    uint16_t *backpressure);

Loading