Commit b70a8106 authored by Scott Vokes's avatar Scott Vokes
Browse files

Don't block on poll if timeouts are pending.

parent 18121e14
Loading
Loading
Loading
Loading
+13 −4
Original line number Diff line number Diff line
@@ -287,6 +287,8 @@ void listener_free(struct listener *l) {

// ==================================================

#define TIMEOUT_DELAY 100

void *listener_mainloop(void *arg) {
    listener *self = (listener *)arg;
    assert(self);
@@ -309,7 +311,8 @@ void *listener_mainloop(void *arg) {
            last_sec = cur_sec;
        }

        int res = poll(self->fds, self->tracked_fds + INCOMING_MSG_PIPE, -1);
        int delay = (self->is_idle ? -1 : TIMEOUT_DELAY);
        int res = poll(self->fds, self->tracked_fds + INCOMING_MSG_PIPE, delay);
        BUS_LOG_SNPRINTF(b, (res == 0 ? 6 : 4), LOG_LISTENER, b->udata, 64,
            "poll res %d", res);

@@ -343,12 +346,12 @@ void *listener_mainloop(void *arg) {

static void check_and_flush_incoming_msg_pipe(listener *l, int *res) {
    struct bus *b = l->bus;
    short ev = l->fds[INCOMING_MSG_PIPE_ID].revents;
    if (ev & (POLLERR | POLLHUP | POLLNVAL)) {  /* hangup/error */
    short events = l->fds[INCOMING_MSG_PIPE_ID].revents;
    if (events & (POLLERR | POLLHUP | POLLNVAL)) {  /* hangup/error */
        return;
    }

    if (ev & POLLIN) {
    if (events & POLLIN) {
        char buf[64];
        for (;;) {
            ssize_t rd = read(l->fds[INCOMING_MSG_PIPE_ID].fd, buf, sizeof(buf));
@@ -697,6 +700,7 @@ static void process_unpacked_message(listener *l,

static void tick_handler(listener *l) {
    struct bus *b = l->bus;
    bool any_work = false;

    BUS_LOG_SNPRINTF(b, 2, LOG_LISTENER, b->udata, 128,
        "tick... %p: %d of %d msgs in use, %d of %d rx_info in use, %d tracked_fds",
@@ -718,6 +722,7 @@ static void tick_handler(listener *l) {
        case RIS_INACTIVE:
            break;
        case RIS_HOLD:
            any_work = true;
            /* Check timeout */
            if (info->timeout_sec == 1) {
                struct timeval tv;
@@ -743,6 +748,7 @@ static void tick_handler(listener *l) {
            }
            break;
        case RIS_EXPECT:
            any_work = true;
            if (info->u.expect.error == RX_ERROR_READY_FOR_DELIVERY) {
                BUS_LOG(b, 4, LOG_LISTENER,
                    "retrying RX event delivery", b->udata);
@@ -786,6 +792,7 @@ static void tick_handler(listener *l) {
            BUS_ASSERT(b, b->udata, false);
        }
    }
    if (!any_work) { l->is_idle = true; }
}

static void dump_rx_info_table(listener *l) {
@@ -1106,6 +1113,8 @@ static void msg_handler(listener *l, listener_msg *pmsg) {
    BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 128,
        "Handling message -- %p", (void*)pmsg);

    l->is_idle = false;

    listener_msg msg = *pmsg;
    switch (msg.type) {

+1 −0
Original line number Diff line number Diff line
@@ -111,6 +111,7 @@ typedef struct listener {
    /* Pipes used to wake the sleeping listener on queue input. */
    int commit_pipe;
    int incoming_msg_pipe;
    bool is_idle;

    rx_info_t rx_info[MAX_PENDING_MESSAGES];
    rx_info_t *rx_info_freelist;
+15 −7
Original line number Diff line number Diff line
@@ -267,14 +267,14 @@ static tx_error_t commit_event_and_block(struct sender *s, tx_info_t *info) {
        BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64,
            "polling done_pipe: %d", res);
        if (res == 1) {
            short ev = fds[0].revents;
            short events = fds[0].revents;
            BUS_LOG_SNPRINTF(b, 8, LOG_SENDER, b->udata, 64,
                "poll: ev %d, errno %d", ev, errno);
            if (ev & (POLLHUP | POLLERR | POLLNVAL)) {
                "poll: events %d, errno %d", events, errno);
            if (events & (POLLHUP | POLLERR | POLLNVAL)) {
                /* We've been hung up on due to a shutdown event. */
                close(info->done_pipe);
                return TX_ERROR_CLOSED;
            } else if (ev & POLLIN) {
            } else if (events & POLLIN) {
                uint16_t backpressure = 0;
                uint8_t buf[sizeof(bool) + sizeof(backpressure)];
                ssize_t rd = read(info->done_pipe, buf, sizeof(buf));
@@ -311,7 +311,7 @@ static tx_error_t commit_event_and_block(struct sender *s, tx_info_t *info) {
            } else {
                /* Shouldn't happen -- blocking. */
                BUS_LOG_SNPRINTF(b, 1, LOG_SENDER, b->udata, 64,
                    "shouldn't happen: ev %d, errno %d", ev, errno);
                    "shouldn't happen: events %d, errno %d", events, errno);
                assert(false);
            }
        } else if (res == -1) {
@@ -326,6 +326,8 @@ static tx_error_t commit_event_and_block(struct sender *s, tx_info_t *info) {
    }
}

#define TIMEOUT_DELAY 100

void *sender_mainloop(void *arg) {
    sender *self = (sender *)arg;
    assert(self);
@@ -337,7 +339,6 @@ void *sender_mainloop(void *arg) {
    
    BUS_LOG(b, 5, LOG_SENDER, "entering main loop", b->udata);
    while (!self->shutdown) {
        
        gettimeofday(&tv, NULL);  // TODO: clock_gettime
        time_t cur_sec = tv.tv_sec;
        if (cur_sec != last_sec) {
@@ -353,7 +354,8 @@ void *sender_mainloop(void *arg) {
         *     sense to use poll here -- self->active_fds will be small. */
        BUS_LOG(b, 7, LOG_SENDER, "polling", b->udata);
        
        int res = poll(self->fds, self->active_fds + CMD_FD, -1);
        int delay = self->is_idle ? -1 : TIMEOUT_DELAY;
        int res = poll(self->fds, self->active_fds + CMD_FD, delay);

        BUS_LOG_SNPRINTF(b, (res == 0 ? 6 : 4), LOG_SENDER, b->udata, 64,
            "poll res %d, active fds %d", res, self->active_fds);
@@ -515,6 +517,8 @@ static void handle_command(sender *s, int id) {
    assert(id < MAX_CONCURRENT_SENDS);
    tx_info_t *info = &s->tx_info[id];

    s->is_idle = false;
    
    switch (info->state) {
    case TIS_ADD_SOCKET:
    {
@@ -975,6 +979,8 @@ static void notify_caller(sender *s, tx_info_t *info, bool success) {
static void tick_handler(sender *s) {
    struct bus *b = s->bus;
    int tx_info_in_use = 0;
    bool any_work = false;

    for (int i = 0; i < MAX_CONCURRENT_SENDS; i++) {
        tx_flag_t bit = (1 << i);
        if (s->tx_flags & bit) {
@@ -993,6 +999,7 @@ static void tick_handler(sender *s) {
    for (int i = 0; i < MAX_CONCURRENT_SENDS; i++) {
        tx_flag_t bit = (1 << i);
        if (s->tx_flags & bit) {  /* if info is in use */
            any_work = true;
            tx_info_t *info = &s->tx_info[i];
            switch (info->state) {
            case TIS_REQUEST_WRITE:
@@ -1012,6 +1019,7 @@ static void tick_handler(sender *s) {
            }
        }
    }
    if (!any_work) { s->is_idle = true; }
}

static void tick_timeout(sender *s, tx_info_t *info) {
+1 −0
Original line number Diff line number Diff line
@@ -154,6 +154,7 @@ typedef struct sender {
     * for write. */
    uint8_t active_fds;
    struct pollfd fds[MAX_CONCURRENT_SENDS + 1];
    bool is_idle;

    /* Hash table for file descriptors in use -> fd_info.
     * This tracks which file descriptors are registered