Commit 3fef92ad authored by Scott Vokes's avatar Scott Vokes
Browse files

Use notification pipe hangup as shutdown event notification response.

This now treats POLLHUP and POLLNVAL on the shutdown command response
pipe as shutdown notifications. That closes a race in which the client
thread called poll(2) just after the sender thread had closed the
notification pipe: the resulting POLLNVAL went unnoticed, so the client
kept retrying and blocked forever.
(This race condition was easiest to reproduce by setting up a
KineticClient, then shutting it down without doing anything, in a loop.)

Also, add some shutdown logging statements (with high log levels) and
make calls to sender_register_socket and sender_remove_socket be ignored
once sender_shutdown has been called.
parent 4509db80
Loading
Loading
Loading
Loading
+21 −2
Original line number Diff line number Diff line
@@ -536,13 +536,17 @@ bool bus_shutdown(bus *b) {
    for (int i = 0; i < b->sender_count; i++) {
        int off = 0;
        if (!b->joined[i + off]) {
            BUS_LOG(b, 2, LOG_SHUTDOWN, "sender_shutdown...", b->udata);
            BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128,
                "sender_shutdown -- %d", i);
            while (!sender_shutdown(b->senders[i])) {
                BUS_LOG(b, 2, LOG_SHUTDOWN, "sender_shutdown... (retry)", b->udata);
                BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128,
                    "sender_shutdown -- retry %d", i);
                sleep(1);
            }
            void *unused = NULL;
            int res = pthread_join(b->threads[i + off], &unused);
            BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128,
                "sender_shutdown -- joined %d", i);
            assert(res == 0);
            b->joined[i + off] = true;
        }
@@ -552,11 +556,17 @@ bool bus_shutdown(bus *b) {
    for (int i = 0; i < b->listener_count; i++) {
        int off = b->sender_count;
        if (!b->joined[i + off]) {
            BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128,
                "listener_shutdown -- %d", i);
            while (!listener_shutdown(b->listeners[i])) {
                sleep(1);
            }
            BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128,
                "listener_shutdown -- joining %d", i);
            void *unused = NULL;
            int res = pthread_join(b->threads[i + off], &unused);
            BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128,
                "listener_shutdown -- joined %d", i);
            assert(res == 0);
            b->joined[i + off] = true;
        }
@@ -616,24 +626,33 @@ void bus_free(bus *b) {
    bus_shutdown(b);

    for (int i = 0; i < b->sender_count; i++) {
        BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128,
            "sender_free -- %d", i);
        sender_free(b->senders[i]);
    }
    free(b->senders);

    for (int i = 0; i < b->listener_count; i++) {
        BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128,
            "listener_free -- %d", i);
        listener_free(b->listeners[i]);
    }
    free(b->listeners);

    int limit = (1000 * THREAD_SHUTDOWN_SECONDS)/10;
    for (int i = 0; i < limit; i++) {
        BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128,
            "threadpool_shutdown -- %d", i);
        if (threadpool_shutdown(b->threadpool, false)) { break; }
        (void)poll(NULL, 0, 10);

        if (i == limit - 1) {
            BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128,
                "threadpool_shutdown -- %d (forced)", i);
            threadpool_shutdown(b->threadpool, true);
        }
    }
    BUS_LOG(b, 3, LOG_SHUTDOWN, "threadpool_free", b->udata);
    threadpool_free(b->threadpool);

    free(b->joined);
+56 −42
Original line number Diff line number Diff line
@@ -258,6 +258,14 @@ static bool commit_event_and_block(struct sender *s, tx_info_t *info) {
        BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64,
            "polling done_pipe: %d", res);
        if (res == 1) {
            short ev = fds[0].revents;
            BUS_LOG_SNPRINTF(b, 8, LOG_SENDER, b->udata, 64,
                "poll: ev %d, errno %d", ev, errno);
            if ((ev & POLLHUP) || (ev & POLLERR) || (ev & POLLNVAL)) {
                /* We've been hung up on due to a shutdown event. */
                close(info->done_pipe);
                return true;
            } else if (ev & POLLIN) {
                uint16_t backpressure = 0;
                uint8_t buf[sizeof(bool) + sizeof(backpressure)];
                ssize_t rd = read(info->done_pipe, buf, sizeof(buf));
@@ -291,6 +299,12 @@ static bool commit_event_and_block(struct sender *s, tx_info_t *info) {
                        return false;
                    }
                }
            } else {
                /* Shouldn't happen -- blocking. */
                BUS_LOG_SNPRINTF(b, 1, LOG_SENDER, b->udata, 64,
                    "shouldn't happen: ev %d, errno %d", ev, errno);
                assert(false);
            }
        } else if (res == -1) {
            BUS_LOG_SNPRINTF(b, 10, LOG_SENDER, b->udata, 64,
                "blocking poll for done_pipe: errno %d", errno);
@@ -348,11 +362,12 @@ void *sender_mainloop(void *arg) {
            }
        } else if (res > 0) {
            if (self->fds[0].revents & POLLIN) {
                BUS_LOG(b, 5, LOG_SENDER, "got command(s)", b->udata);
                work = check_incoming_commands(self);
                res--;
            }

            if (self->shutdown) { break; }

            /* If the incoming command pipe isn't the only active FD: */
            if (res > 0) {
                attempt_write(self, res);
@@ -368,7 +383,8 @@ void *sender_mainloop(void *arg) {
        }
    }
    
    BUS_LOG(b, 4, LOG_SENDER, "shutting down", b->udata);
    BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64,
        "shutting down, shutdown == %d", self->shutdown);
    cleanup(self);

    return NULL;
@@ -401,12 +417,9 @@ static void cleanup(sender *s) {
        BUS_LOG_SNPRINTF(b, 6 , LOG_SENDER, b->udata, 64,
            "shutdown_id: %d", shutdown_id);
        if (shutdown_id != -1) {
            /* TODO: The sender_shutdown caller should be notified
             * and unblocked from here, but it doesn't appear to unblock
             * correctly. */
            //notify_caller(s, &s->tx_info[shutdown_id], true);

            close(s->pipes[shutdown_id][0]);
            /* Notify the sender_shutdown caller about the shutdown via POLLHUP / POLLNVAL. */
            BUS_LOG(b, 3, LOG_SENDER, "closing to notify shutdown", b->udata);
            /* Client thread will close the other end. */
            close(s->pipes[shutdown_id][1]);
        }
        
@@ -415,6 +428,7 @@ static void cleanup(sender *s) {
}

static bool register_socket_info(sender *s, int fd, SSL *ssl) {
    if (s->shutdown) { return false; }
    fd_info *info = malloc(sizeof(*info));
    if (info == NULL) { 
        return false;
@@ -474,6 +488,8 @@ static void decrement_fd_refcount(sender *s, fd_info *fdi) {
}

static bool release_socket_info(sender *s, int fd) {
    if (s->shutdown) { return false; }

    void *old = NULL;
    if (!yacht_remove(s->fd_hash_table, fd, &old)) {
        return false;
@@ -521,10 +537,8 @@ static void handle_command(sender *s, int id) {
    case TIS_SHUTDOWN:
    {
        s->shutdown = true;
        notify_caller(s, info, true);

        /* TODO: caller should be notified from cleanup(), blocking
         * until shutdown is complete. */
        /* Caller should be notified from cleanup(), by closing the
         * notification pipe. Block until shutdown is complete. */
        break;
    }
    case TIS_REQUEST_ENQUEUE: