Commit 67ba561b authored by Scott Vokes

Fix race condition and improve threadpool throughput.

1) Use an explicit mark on each task cell to indicate whether the
   task_commit_head and task_release_head counters may advance past it.
   The mark must equal the cell's ring buffer offset to allow a commit,
   or the bitwise complement (~) of that offset to allow a release; since
   the required value changes on every trip around the ring, a properly
   initialized buffer cannot produce false positives. Once a task is
   committed/released, the corresponding counter is advanced as far as
   the marks will allow. (Sketched below.)

2) Copy a task onto the call stack and release its queue cell before
   running the callback, rather than keeping the task in the queue while
   the callback executes. This improves throughput. (Sketched below.)

3) Change the edge trigger for switching worker threads between their
   asleep and awake states -- rather than marking a thread as asleep as
   soon as it fails to receive work in its awake-state loop, wait until
   it has failed to receive work twice (delay > 1). This substantially
   improves queue throughput, raising the no-op benchmark from roughly
   100K tasks/sec to roughly 5M tasks/sec. (Sketched below.)

4) Add several asserts throughout.
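
A minimal, self-contained sketch of the mark-based ring buffer from points
1 and 2 above. This is not the threadpool.c code in the diff below: the
names (marked_cell, ring, ring_push, ring_pop_and_run) and the fixed ring
size are made up for illustration, and thread wake-up, cleanup callbacks,
and error handling are left out.

    #include <stdbool.h>
    #include <stddef.h>
    #include <string.h>

    #define RING_SIZE2 8
    #define RING_SIZE ((size_t)1 << RING_SIZE2)
    #define RING_MASK (RING_SIZE - 1)
    #define CAS(PTR, OLD, NEW) __sync_bool_compare_and_swap(PTR, OLD, NEW)

    struct marked_cell {
        void (*task)(void *udata);
        void *udata;
        size_t mark;            /* == offset: committed; == ~offset: released */
    };

    struct ring {
        size_t reserve_head;    /* reserved for write */
        size_t commit_head;     /* done with write */
        size_t request_head;    /* claimed for read */
        size_t release_head;    /* done with cell, safe to overwrite */
        struct marked_cell cells[RING_SIZE];
    };

    /* Poison the marks so no cell starts out looking committed or released. */
    void ring_init(struct ring *r) {
        memset(r->cells, 0xFF, sizeof(r->cells));
        r->reserve_head = r->commit_head = 0;
        r->request_head = r->release_head = 0;
    }

    /* Producer: reserve a cell with CAS, fill it, mark it committed, then
     * advance commit_head over every consecutively marked cell (point 1). */
    bool ring_push(struct ring *r, void (*task)(void *), void *udata) {
        for (;;) {
            size_t wh = r->reserve_head;
            if (wh - r->release_head >= RING_SIZE - 1) { return false; } /* full */
            if (CAS(&r->reserve_head, wh, wh + 1)) {
                struct marked_cell *c = &r->cells[wh & RING_MASK];
                c->task = task;
                c->udata = udata;
                c->mark = wh;               /* this cell is now committed */
                for (;;) {                  /* advance as far as marks allow */
                    size_t ch = r->commit_head;
                    if (r->cells[ch & RING_MASK].mark != ch) { break; }
                    CAS(&r->commit_head, ch, ch + 1);
                }
                return true;
            }
        }
    }

    /* Consumer: claim a cell with CAS, copy the task onto the call stack,
     * mark the cell released, advance release_head, and only then run the
     * callback, so the cell can be reused while the callback runs (point 2). */
    bool ring_pop_and_run(struct ring *r) {
        for (;;) {
            size_t rh = r->request_head;
            if (rh == r->commit_head) { return false; } /* empty */
            if (CAS(&r->request_head, rh, rh + 1)) {
                struct marked_cell *c = &r->cells[rh & RING_MASK];
                struct marked_cell copy = *c;   /* copy onto the call stack */
                c->mark = ~rh;                  /* this cell is now released */
                for (;;) {                      /* advance as far as marks allow */
                    size_t relh = r->release_head;
                    if (r->cells[relh & RING_MASK].mark != ~relh) { break; }
                    CAS(&r->release_head, relh, relh + 1);
                }
                copy.task(copy.udata);          /* cell may already be reused */
                return true;
            }
        }
    }

The property that matters is that the value a cell's mark must hold changes
on every lap around the ring, so a stale mark from an earlier lap can never
let commit_head or release_head advance past a cell prematurely; the 0xFF
memset in threadpool_init gives the very first lap the same guarantee.
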
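
A minimal sketch of the awake/asleep edge trigger from point 3 above, again
with hypothetical names (worker_wait_for_work, have_work) rather than the
actual thread_task() code; draining the notification pipe and shutdown
handling are left out. The worker only flips to the asleep state once its
poll back-off delay has grown past 1, i.e. after it has come up empty at
least twice since it last saw work.

    #include <poll.h>
    #include <stdbool.h>

    enum worker_status { WORKER_AWAKE, WORKER_ASLEEP };

    struct worker {
        enum worker_status status;
        int delay;              /* current poll timeout, msec */
        int max_delay;          /* ceiling for the exponential back-off */
        int notify_fd;          /* read end of the wake-up pipe */
    };

    /* Block until have_work() reports a non-empty queue, backing off
     * exponentially. have_work() stands in for the
     * task_request_head == task_commit_head comparison. */
    void worker_wait_for_work(struct worker *w, bool (*have_work)(void)) {
        struct pollfd pfd = { .fd = w->notify_fd, .events = POLLIN };
        while (!have_work()) {
            if (w->status == WORKER_AWAKE && w->delay > 1) {
                w->status = WORKER_ASLEEP;  /* already missed twice: go dormant */
            }
            w->delay = (w->delay == 0) ? 1 : (w->delay << 1);
            if (w->delay > w->max_delay) { w->delay = w->max_delay; }
            if (poll(&pfd, 1, w->delay) > 0 && (pfd.revents & POLLIN)) {
                w->status = WORKER_AWAKE;   /* a task was scheduled */
                w->delay = 0;               /* reset the back-off */
            }
        }
    }

The previous code flipped to STATUS_ASLEEP on the first empty pass; delaying
that transition is what the message above credits for the ~100K/sec to
~5M/sec change in the no-op benchmark.
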
parent 68d0701a
.gitignore +2 −0
*.a
test_threadpool
+test_threadpool_sequencing
+test_threadpool_stress
*.o
*.dSYM/
test_threadpool_sequencing.c +6 −5
@@ -29,14 +29,15 @@

#include "threadpool.h"

/* Start a task that starts another task that starts another task that ... */

+static size_t completed_count = 0;
#define MAX_TASKS 32

static void dump_stats(const char *prefix, struct threadpool_info *stats, size_t ticks) {
-    printf("%s  -- %8ld thread tasks / sec -- (at %d, dt %d, ta %zd, tc %zd, bl %zd) -- %zd\n",
-        prefix, stats->tasks_completed / ticks,
-        stats->active_threads, stats->dormant_threads,
-        stats->tasks_assigned, stats->tasks_completed, stats->backlog_size,
+    printf("%s  -- %8ld thread tasks / sec -- (at %d, dt %d, bl %zd) -- %zd\n",
+        prefix, completed_count / ticks,
+        stats->active_threads, stats->dormant_threads, stats->backlog_size,
+        completed_count);
}

@@ -60,7 +61,7 @@ typedef struct {
    size_t count;
} env;

-static size_t limit = 10000000;
+static size_t limit = 1000000;

static void task_cb(void *udata) {
    env *e = (env *)udata;
test_threadpool_stress.c +17 −11
@@ -27,13 +27,18 @@

#include "threadpool.h"

+/* Stress maximum throughput of no-op tasks */
+
static size_t task_count = 0;
+static size_t last_count = 0;

static void dump_stats(const char *prefix, struct threadpool_info *stats, size_t ticks) {
-    printf("%s  -- %8ld thread tasks / sec -- (at %d, dt %d, ta %zd, tc %zd, bl %zd)\n",
-        prefix, stats->tasks_completed / ticks,
+    size_t delta = task_count - last_count;
+    printf("%s  -- %8ld thread tasks / sec -- (at %d, dt %d, bl %zd) -- delta %zd\n",
+        prefix, task_count / ticks,
        stats->active_threads, stats->dormant_threads,
-        stats->tasks_assigned, stats->tasks_completed, stats->backlog_size);
+        stats->backlog_size, delta);
+    last_count = task_count;
}

#define ATOMIC_BOOL_COMPARE_AND_SWAP(PTR, OLD, NEW)     \
@@ -56,16 +61,14 @@ static size_t fibs(size_t arg) {
}

static void task_cb(void *udata) {
-    //SPIN_ADJ(task_count, 1);
-    //task_count++;
    /* fibs(10); */
+    SPIN_ADJ(task_count, 1);
    (void)fibs;
    (void)udata;
-    (void)task_count;
}

int main(int argc, char **argv) {
-    uint8_t sz2 = 8;
+    uint8_t sz2 = 12;
    uint8_t max_threads = 8;

    char *sz2_env = getenv("SZ2");
@@ -101,11 +104,14 @@ int main(int argc, char **argv) {
            dump_stats("tick...", &stats, ticks);
        }

        for (size_t i = 0; i < 1000; i++) {
            if (!threadpool_schedule(t, &task, &counterpressure)) {
-            size_t msec = 1 * 1000 * counterpressure;
-            usleep(msec);
+                size_t msec = i * 1000 * counterpressure;
+                usleep(msec >> 12);
+            } else {
+                break;
            }
        }

    }

    return 0;
threadpool.c +100 −119
@@ -26,77 +26,9 @@
#include <poll.h>
#include <errno.h>

#include "threadpool.h"

-typedef enum {
-    STATUS_NONE,                /* undefined status */
-    STATUS_ASLEEP,              /* thread is poll-sleeping to reduce CPU */
-    STATUS_AWAKE,               /* thread is active */
-    STATUS_SHUTDOWN,            /* thread has been notified about shutdown */
-    STATUS_JOINED,              /* thread has been pthread_join'd */
-} thread_status_t;
-
-/* Info retained by a thread while working. */
-struct thread_info {
-    pthread_t t;
-    int parent_fd;
-    int child_fd;
-    thread_status_t status;
-};
-
-/* Thread_info, plus pointer back to main threadpool manager. */
-struct thread_context {
-    struct threadpool *t;
-    struct thread_info *ti;
-};
-
-/* Internal threadpool state */
-struct threadpool {
-    /* reserve -> commit -> request -> release */
-    size_t task_reserve_head;   /* reserved for write */
-    size_t task_commit_head;    /* done with write */
-    size_t task_request_head;   /* requested for read */
-    size_t task_release_head;   /* done processing task, can overwrite */
-
-    struct threadpool_task *tasks;
-    /* user stats */
-    size_t tasks_assigned;
-    size_t tasks_completed;
-
-    size_t max_delay;           /* ceil of exponential sleep back-off */
-
-    uint8_t task_ringbuf_size2;
-    uint8_t max_threads;
-
-    uint8_t active_threads;
-    uint8_t live_threads;
-    struct thread_info *threads;
-};
-
-#define ATOMIC_BOOL_COMPARE_AND_SWAP(PTR, OLD, NEW)     \
-    (__sync_bool_compare_and_swap(PTR, OLD, NEW))
-
-#define NOTIFY_MSG "!"
-#define NOTIFY_MSG_LEN 1
+#include "threadpool_internals.h"
-
-/* Spin attempting to atomically adjust F by ADJ until successful */
-#define SPIN_ADJ(F, ADJ)                                                \
-    do {                                                                \
-        for (;;) {                                                      \
-            size_t v = F;                                               \
-            if (ATOMIC_BOOL_COMPARE_AND_SWAP(&F, v, v + ADJ)) {         \
-                break;                                                  \
-            }                                                           \
-        }                                                               \
-    } while (0)
-
-
-static void notify_new_task(struct threadpool *t);
-static bool notify_shutdown(struct threadpool *t);
-static bool spawn(struct threadpool *t);
-static void *thread_task(void *thread_info);
-
-#define DEFAULT_MAX_DELAY 100   /* msec */
+#define DEFAULT_MAX_DELAY 1000   /* msec */
#define DEFAULT_TASK_RINGBUF_SIZE2 8
#define DEFAULT_MAX_THREADS 4

@@ -118,13 +50,13 @@ struct threadpool *threadpool_init(struct threadpool_config *cfg) {
    if (cfg->max_threads < 1) { return NULL; }

    struct threadpool *t = NULL;
-    struct threadpool_task *tasks = NULL;
+    struct marked_task *tasks = NULL;
    struct thread_info *threads = NULL;

    t = malloc(sizeof(*t));
    if (t == NULL) { goto cleanup; }

-    size_t tasks_sz = (1 << cfg->task_ringbuf_size2) * sizeof(struct threadpool_task);
+    size_t tasks_sz = (1 << cfg->task_ringbuf_size2) * sizeof(*tasks);
    size_t threads_sz = cfg->max_threads * sizeof(struct thread_info);

    tasks = malloc(tasks_sz);
@@ -134,12 +66,18 @@ struct threadpool *threadpool_init(struct threadpool_config *cfg) {
    if (threads == NULL) { goto cleanup; }

    memset(t, 0, sizeof(*t));
-    memset(tasks, 0, tasks_sz);
    memset(threads, 0, threads_sz);

+    /* Note: tasks is memset to a non-0 value so that the first slot,
+     * tasks[0].mark, will not match its ID and leave it in a
+     * prematurely commit-able state. */
+    memset(tasks, 0xFF, tasks_sz);
+
    t->tasks = tasks;
    t->threads = threads;
+    t->task_ringbuf_size = 1 << cfg->task_ringbuf_size2;
    t->task_ringbuf_size2 = cfg->task_ringbuf_size2;
+    t->task_ringbuf_mask = t->task_ringbuf_size - 1;
    t->max_threads = cfg->max_threads;
    t->max_delay = cfg->max_delay;
    return t;
@@ -156,49 +94,71 @@ bool threadpool_schedule(struct threadpool *t, struct threadpool_task *task,
    if (t == NULL) { return false; }
    if (task == NULL || task->task == NULL) { return false; }

-    size_t queue_size = (1 << t->task_ringbuf_size2);
-    size_t mask = queue_size - 1;
+    //size_t queue_size = (1 << t->task_ringbuf_size2) - 1;
+    size_t queue_size = t->task_ringbuf_size - 1;
+    size_t mask = queue_size;

    for (;;) {
        size_t wh = t->task_reserve_head;
        size_t rh = t->task_release_head;

-        size_t rem = ((wh + queue_size) - rh) & mask;
-        if (rem == mask) {
-            if (pushback) { *pushback = rem; }
+        if (wh - rh >= queue_size - 1) {
+            if (pushback) { *pushback = wh - rh; }
+            //printf("FULL, %zd, %zd\n", wh - rh, t->task_commit_head - t->task_request_head);
            return false;       /* full, cannot schedule */
        }
-        struct threadpool_task *tbuf = &t->tasks[wh & mask];

        if (ATOMIC_BOOL_COMPARE_AND_SWAP(&t->task_reserve_head, wh, wh + 1)) {
-            memcpy(tbuf, task, sizeof(*task));
+            assert(t->task_reserve_head - t->task_release_head < queue_size);
+            struct marked_task *tbuf = &t->tasks[wh & mask];
+            tbuf->task = task->task;
+            tbuf->cleanup = task->cleanup;
+            tbuf->udata = task->udata;

            notify_new_task(t);
-            if (pushback) { *pushback = rem; }
-            SPIN_ADJ(t->task_commit_head, 1);
-            SPIN_ADJ(t->tasks_assigned, 1);
+            if (pushback) { *pushback = wh - rh; }

+            commit_current_task(t, tbuf, wh);
+            //printf("delta %zd\n", wh);
            return true;
        }
    }
}

+static void commit_current_task(struct threadpool *t, struct marked_task *task, size_t wh) {
+    size_t mask = t->task_ringbuf_mask;
+    task->mark = wh;
+    for (;;) {
+        size_t ch = t->task_commit_head;
+        task = &t->tasks[ch & mask];
+        if (ch != task->mark) { break; }
+        assert(ch < t->task_reserve_head);
+        if (ATOMIC_BOOL_COMPARE_AND_SWAP(&t->task_commit_head, ch, ch + 1)) {
+            assert(t->task_request_head <= t->task_commit_head);
+        }
+    }
+}

void threadpool_stats(struct threadpool *t, struct threadpool_info *ti) {
    if (ti) {
-        ti->active_threads = t->active_threads;
-        ti->dormant_threads = t->live_threads - t->active_threads;
-        ti->tasks_assigned = t->tasks_assigned;
-        ti->tasks_completed = t->tasks_completed;
+        uint8_t at = 0;
+        for (int i = 0; i < t->live_threads; i++) {
+            struct thread_info *ti = &t->threads[i];
+            if (ti->status == STATUS_AWAKE) { at++; }
+        }
+        ti->active_threads = at;
+
+        ti->dormant_threads = t->live_threads - at;
        ti->backlog_size = t->task_commit_head - t->task_request_head;
    }
}

bool threadpool_shutdown(struct threadpool *t, bool kill_all) {
-    size_t mask = (1 << t->task_ringbuf_size2) - 1;
+    size_t mask = t->task_ringbuf_mask;

    while (t->task_commit_head > t->task_request_head) {
        size_t rh = t->task_request_head;

-        struct threadpool_task *tbuf = &t->tasks[rh & mask];
+        struct marked_task *tbuf = &t->tasks[rh & mask];
        if (ATOMIC_BOOL_COMPARE_AND_SWAP(&t->task_request_head, rh, rh + 1)) {
            if (tbuf->cleanup) {
                tbuf->cleanup(tbuf->udata);
@@ -210,7 +170,7 @@ bool threadpool_shutdown(struct threadpool *t, bool kill_all) {

    notify_shutdown(t);
    if (kill_all) {
-        /* TODO: pthread_kill threads and set STATUS_SHUTDOWN ... */
+        /* TODO: pthread_cancel threads and set STATUS_SHUTDOWN ... */
    }
    return true;
}
@@ -225,7 +185,6 @@ void threadpool_free(struct threadpool *t) {

static void notify_new_task(struct threadpool *t) {
    /* FIXME: should this be 'if any are sleeping'? needs benchmarking. */
-    if (t->active_threads < t->live_threads) { /* wake up */
    for (int i = 0; i < t->live_threads; i++) {
        struct thread_info *ti = &t->threads[i];
        if (ti->status == STATUS_ASLEEP) {
@@ -240,11 +199,9 @@ static void notify_new_task(struct threadpool *t) {
            }
        }
    }
-    }

    if (t->live_threads < t->max_threads) { /* spawn */
        if (spawn(t)) {
-            SPIN_ADJ(t->active_threads, 1);
            SPIN_ADJ(t->live_threads, 1);
        }
    } else {
@@ -310,7 +267,7 @@ static void *thread_task(void *arg) {
    struct threadpool *t = tc->t;
    struct thread_info *ti = tc->ti;

-    size_t mask = (1 << t->task_ringbuf_size2) - 1;
+    size_t mask = t->task_ringbuf_mask;
    struct pollfd pfd[1] = { { .fd=ti->child_fd, .events=POLLIN }, };
    uint8_t read_buf[NOTIFY_MSG_LEN];

@@ -319,10 +276,13 @@ static void *thread_task(void *arg) {
    for (;;) {
        if (t->task_request_head == t->task_commit_head) {
            if (ti->status == STATUS_AWAKE) {
-                ti->status = STATUS_ASLEEP;
-                SPIN_ADJ(t->active_threads, -1);
+                if (delay > 1) { ti->status = STATUS_ASLEEP; }
            } else {
+                if (delay == 0) {
+                    delay = 1;
+                } else {
+                    delay <<= 1;
+                }
                if (delay > t->max_delay) {
                    delay = t->max_delay;
                }
@@ -338,9 +298,9 @@ static void *thread_task(void *arg) {
                    free(tc);
                    return NULL;
                } else if (pfd[0].revents & POLLIN) {
-                    ti->status = STATUS_AWAKE;
-                    delay = 1;
-                    SPIN_ADJ(t->active_threads, 1);
+                    if (ti->status == STATUS_ASLEEP) { ti->status = STATUS_AWAKE; }
+                    delay = 0;
+                    //SPIN_ADJ(t->active_threads, 1);
                    ssize_t rres = read(ti->child_fd, read_buf, NOTIFY_MSG_LEN);
                    if (rres < 0) {
                        assert(0);
@@ -350,18 +310,39 @@ static void *thread_task(void *arg) {
        }

        for (;;) {
-            size_t wh = t->task_commit_head;
+            size_t ch = t->task_commit_head;
            size_t rh = t->task_request_head;
-            if (rh == wh) {
+            if (rh > ch - 1) {
                break;          /* nothing to do */
            }
-            struct threadpool_task *task = &t->tasks[rh & mask];
            if (ATOMIC_BOOL_COMPARE_AND_SWAP(&t->task_request_head, rh, rh + 1)) {
-                task->task(task->udata);
-                SPIN_ADJ(t->task_release_head, 1);
-                SPIN_ADJ(t->tasks_completed, 1);
+                struct marked_task *ptask = &t->tasks[rh & mask];
+                assert(ptask->mark == rh);
+
+                struct marked_task task = {
+                    .task = ptask->task,
+                    .cleanup = ptask->cleanup,
+                    .udata = ptask->udata,
+                };
+
+                release_current_task(t, ptask, rh);
+                ptask = NULL;
+                task.task(task.udata);
                break;
            }
        }
    }
}

+static void release_current_task(struct threadpool *t, struct marked_task *task, size_t rh) {
+    size_t mask = t->task_ringbuf_mask;
+    task->mark = ~rh;
+    for (;;) {
+        size_t relh = t->task_release_head;
+        task = &t->tasks[relh & mask];
+        if (task->mark != ~relh) { break; }
+        if (ATOMIC_BOOL_COMPARE_AND_SWAP(&t->task_release_head, relh, relh + 1)) {
+            assert(relh < t->task_commit_head);
+        }
+    }
+}
threadpool.h +0 −2
@@ -55,8 +55,6 @@ struct threadpool_task {
struct threadpool_info {
    uint8_t active_threads;
    uint8_t dormant_threads;
-    size_t tasks_assigned;
-    size_t tasks_completed;
    size_t backlog_size;
};
