Commit 18cc1b53 authored by Scott Vokes's avatar Scott Vokes
Browse files

Make threadpool shutdown idempotent and cancel threads on timeout.

parent e928899e
Loading
Loading
Loading
Loading
+12 −0
Original line number Diff line number Diff line
@@ -562,6 +562,9 @@ bool bus_process_boxed_message(struct bus *b,
    return bus_schedule_threadpool_task(b, &task, backpressure);
}

/* How many seconds should it give the thread pool to shut down? */
#define THREAD_SHUTDOWN_SECONDS 5

void bus_free(bus *b) {
    if (b == NULL) { return; }
    bus_shutdown(b);
@@ -576,6 +579,15 @@ void bus_free(bus *b) {
    }
    free(b->listeners);

    int limit = (1000 * THREAD_SHUTDOWN_SECONDS)/10;
    for (int i = 0; i < limit; i++) {
        if (threadpool_shutdown(b->threadpool, false)) { break; }
        (void)poll(NULL, 0, 10);

        if (i == limit - 1) {
            threadpool_shutdown(b->threadpool, true);
        }
    }
    threadpool_free(b->threadpool);

    free(b->joined);
+15 −5
Original line number Diff line number Diff line
@@ -155,6 +155,20 @@ void threadpool_stats(struct threadpool *t, struct threadpool_info *ti) {
bool threadpool_shutdown(struct threadpool *t, bool kill_all) {
    size_t mask = t->task_ringbuf_mask;

    if (kill_all) {
        for (int i = 0; i < t->live_threads; i++) {
            struct thread_info *ti = &t->threads[i];
            if (ti->status < STATUS_SHUTDOWN) {
                ti->status = STATUS_SHUTDOWN;
                if (0 != pthread_cancel(ti->t)) {
                    assert(false);
                }
            }
        }
    }

    notify_shutdown(t);

    while (t->task_commit_head > t->task_request_head) {
        size_t rh = t->task_request_head;

@@ -168,11 +182,7 @@ bool threadpool_shutdown(struct threadpool *t, bool kill_all) {
        }
    }

    notify_shutdown(t);
    if (kill_all) {
        /* TODO: pthread_cancel threads and set STATUS_SHUTDOWN ... */
    }
    return true;
    return notify_shutdown(t);
}

void threadpool_free(struct threadpool *t) {