Commit bbef064a authored by Scott Vokes's avatar Scott Vokes
Browse files

Make threadpool_shutdown logic monotonic. Error on new tasks after shutdown.

parent 345003c9
Loading
Loading
Loading
Loading
+12 −5
Original line number Diff line number Diff line
@@ -94,6 +94,10 @@ bool threadpool_schedule(struct threadpool *t, struct threadpool_task *task,
    if (t == NULL) { return false; }
    if (task == NULL || task->task == NULL) { return false; }

    /* New tasks must not be scheduled after the threadpool starts
     * shutting down. */
    if (t->shutting_down) { return false; }

    //size_t queue_size = (1 << t->task_ringbuf_size2) - 1;
    size_t queue_size = t->task_ringbuf_size - 1;
    size_t mask = queue_size;
@@ -153,6 +157,7 @@ void threadpool_stats(struct threadpool *t, struct threadpool_info *ti) {
}

bool threadpool_shutdown(struct threadpool *t, bool kill_all) {
    t->shutting_down = true;
    size_t mask = t->task_ringbuf_mask;

    if (kill_all) {
@@ -282,7 +287,7 @@ static void *thread_task(void *arg) {

    size_t delay = 1;

    for (;;) {
    while (ti->status < STATUS_SHUTDOWN) {
        if (t->task_request_head == t->task_commit_head) {
            if (ti->status == STATUS_AWAKE) {
                if (delay > 1) { ti->status = STATUS_ASLEEP; }
@@ -303,9 +308,7 @@ static void *thread_task(void *arg) {
                    /* TODO: HUP should be distinct from ERR -- hup is
                     * intentional shutdown, ERR probably isn't. */
                    ti->status = STATUS_SHUTDOWN;
                    close(ti->child_fd);
                    free(tc);
                    return NULL;
                    break;
                } else if (pfd[0].revents & POLLIN) {
                    if (ti->status == STATUS_ASLEEP) { ti->status = STATUS_AWAKE; }
                    delay = 0;
@@ -318,7 +321,7 @@ static void *thread_task(void *arg) {
            }
        }

        for (;;) {
        while (ti->status < STATUS_SHUTDOWN) {
            size_t ch = t->task_commit_head;
            size_t rh = t->task_request_head;
            if (rh > ch - 1) {
@@ -341,6 +344,10 @@ static void *thread_task(void *arg) {
            }
        }
    }

    close(ti->child_fd);
    free(tc);
    return NULL;
}

static void release_current_task(struct threadpool *t, struct marked_task *task, size_t rh) {
+2 −1
Original line number Diff line number Diff line
@@ -62,7 +62,8 @@ struct threadpool_info {
struct threadpool *threadpool_init(struct threadpool_config *cfg);

/* Schedule a task in the threadpool. Returns whether the task was successfully
 * registered or not.
 * registered or not. If threadpool_shutdown has been called, this
 * function will always return false, due to API misuse.
 *
 * If *pushback is non-NULL, it will be set to the number of tasks
 * in the backlog, so code upstream can provide counterpressure.
+1 −0
Original line number Diff line number Diff line
@@ -57,6 +57,7 @@ struct threadpool {
    size_t max_delay;           /* ceil of exponential sleep back-off */
    uint8_t task_ringbuf_size2;

    bool shutting_down;
    uint8_t live_threads;
    uint8_t max_threads;
    struct thread_info *threads;