Loading src/lib/threadpool/threadpool.c +0 −3 Original line number Diff line number Diff line Loading @@ -46,7 +46,6 @@ static void set_defaults(struct threadpool_config *cfg) { cfg->task_ringbuf_size2 = DEFAULT_TASK_RINGBUF_SIZE2; } if (cfg->max_delay == 0) { cfg->max_delay = DEFAULT_MAX_DELAY; } if (cfg->max_threads == 0) { cfg->max_threads = DEFAULT_MAX_THREADS; } } Loading Loading @@ -88,7 +87,6 @@ struct threadpool *threadpool_init(struct threadpool_config *cfg) { t->task_ringbuf_size2 = cfg->task_ringbuf_size2; t->task_ringbuf_mask = t->task_ringbuf_size - 1; t->max_threads = cfg->max_threads; t->max_delay = cfg->max_delay; return t; cleanup: Loading @@ -107,7 +105,6 @@ bool threadpool_schedule(struct threadpool *t, struct threadpool_task *task, * shutting down. */ if (t->shutting_down) { return false; } //size_t queue_size = (1 << t->task_ringbuf_size2) - 1; size_t queue_size = t->task_ringbuf_size - 1; size_t mask = queue_size; Loading src/lib/threadpool/threadpool_internals.h +21 −17 Original line number Diff line number Diff line Loading @@ -33,10 +33,10 @@ typedef enum { /* Info retained by a thread while working. */ struct thread_info { pthread_t t; int parent_fd; int child_fd; thread_status_t status; pthread_t t; /* thread */ int parent_fd; /* alert pipe parent writes into */ int child_fd; /* alert pipe child reads from */ thread_status_t status; /* current worker thread status */ }; /* Thread_info, plus pointer back to main threadpool manager. */ Loading @@ -50,7 +50,7 @@ struct marked_task { threadpool_task_cb *task; threadpool_task_cleanup_cb *cleanup; void *udata; /* This mark is used to indicate which tasks can have the comit_head /* This mark is used to indicate which tasks can have the commit_head * and release_head counters advanced past them. * * mark == (task_commit_head that points at tast): commit Loading @@ -61,35 +61,39 @@ struct marked_task { size_t mark; }; /* Internal threadpool state */ /* Internal threadpool state. */ struct threadpool { /* reserve -> commit -> request -> release */ size_t task_reserve_head; /* reserved for write */ size_t task_commit_head; /* done with write */ size_t task_request_head; /* requested for read */ size_t task_release_head; /* done processing task, can overwrite */ size_t task_release_head; /* done processing task, can be overwritten */ struct marked_task *tasks; /* user stats */ size_t task_ringbuf_size; size_t task_ringbuf_mask; struct marked_task *tasks; /* ring buffer for tasks */ size_t max_delay; /* ceil of exponential sleep back-off */ uint8_t task_ringbuf_size2; /* Size and mask. These can be derived from task_ringbuf_size2, * but are cached to reduce CPU. */ size_t task_ringbuf_size; /* size of ring buffer */ size_t task_ringbuf_mask; /* mask to fit counter within ring buffer */ uint8_t task_ringbuf_size2; /* log2 of size of ring buffer */ bool shutting_down; uint8_t live_threads; uint8_t max_threads; bool shutting_down; /* shutdown has been called */ uint8_t live_threads; /* currently live threads */ uint8_t max_threads; /* max number of threads to start */ struct thread_info *threads; }; /* Do an atomic compare-and-swap, changing *PTR from OLD to NEW. Returns * true if the swap succeeded, false if it failed (generally because * another thread updated the memory first). */ #define ATOMIC_BOOL_COMPARE_AND_SWAP(PTR, OLD, NEW) \ (__sync_bool_compare_and_swap(PTR, OLD, NEW)) /* Message sent to wake up a thread. The message contents are currently unimportant. */ #define NOTIFY_MSG "!" #define NOTIFY_MSG_LEN 1 /* Spin attempting to atomically adjust F by ADJ until successful */ /* Spin attempting to atomically adjust F by ADJ until successful. */ #define SPIN_ADJ(F, ADJ) \ do { \ for (;;) { \ Loading Loading
src/lib/threadpool/threadpool.c +0 −3 Original line number Diff line number Diff line Loading @@ -46,7 +46,6 @@ static void set_defaults(struct threadpool_config *cfg) { cfg->task_ringbuf_size2 = DEFAULT_TASK_RINGBUF_SIZE2; } if (cfg->max_delay == 0) { cfg->max_delay = DEFAULT_MAX_DELAY; } if (cfg->max_threads == 0) { cfg->max_threads = DEFAULT_MAX_THREADS; } } Loading Loading @@ -88,7 +87,6 @@ struct threadpool *threadpool_init(struct threadpool_config *cfg) { t->task_ringbuf_size2 = cfg->task_ringbuf_size2; t->task_ringbuf_mask = t->task_ringbuf_size - 1; t->max_threads = cfg->max_threads; t->max_delay = cfg->max_delay; return t; cleanup: Loading @@ -107,7 +105,6 @@ bool threadpool_schedule(struct threadpool *t, struct threadpool_task *task, * shutting down. */ if (t->shutting_down) { return false; } //size_t queue_size = (1 << t->task_ringbuf_size2) - 1; size_t queue_size = t->task_ringbuf_size - 1; size_t mask = queue_size; Loading
src/lib/threadpool/threadpool_internals.h +21 −17 Original line number Diff line number Diff line Loading @@ -33,10 +33,10 @@ typedef enum { /* Info retained by a thread while working. */ struct thread_info { pthread_t t; int parent_fd; int child_fd; thread_status_t status; pthread_t t; /* thread */ int parent_fd; /* alert pipe parent writes into */ int child_fd; /* alert pipe child reads from */ thread_status_t status; /* current worker thread status */ }; /* Thread_info, plus pointer back to main threadpool manager. */ Loading @@ -50,7 +50,7 @@ struct marked_task { threadpool_task_cb *task; threadpool_task_cleanup_cb *cleanup; void *udata; /* This mark is used to indicate which tasks can have the comit_head /* This mark is used to indicate which tasks can have the commit_head * and release_head counters advanced past them. * * mark == (task_commit_head that points at tast): commit Loading @@ -61,35 +61,39 @@ struct marked_task { size_t mark; }; /* Internal threadpool state */ /* Internal threadpool state. */ struct threadpool { /* reserve -> commit -> request -> release */ size_t task_reserve_head; /* reserved for write */ size_t task_commit_head; /* done with write */ size_t task_request_head; /* requested for read */ size_t task_release_head; /* done processing task, can overwrite */ size_t task_release_head; /* done processing task, can be overwritten */ struct marked_task *tasks; /* user stats */ size_t task_ringbuf_size; size_t task_ringbuf_mask; struct marked_task *tasks; /* ring buffer for tasks */ size_t max_delay; /* ceil of exponential sleep back-off */ uint8_t task_ringbuf_size2; /* Size and mask. These can be derived from task_ringbuf_size2, * but are cached to reduce CPU. */ size_t task_ringbuf_size; /* size of ring buffer */ size_t task_ringbuf_mask; /* mask to fit counter within ring buffer */ uint8_t task_ringbuf_size2; /* log2 of size of ring buffer */ bool shutting_down; uint8_t live_threads; uint8_t max_threads; bool shutting_down; /* shutdown has been called */ uint8_t live_threads; /* currently live threads */ uint8_t max_threads; /* max number of threads to start */ struct thread_info *threads; }; /* Do an atomic compare-and-swap, changing *PTR from OLD to NEW. Returns * true if the swap succeeded, false if it failed (generally because * another thread updated the memory first). */ #define ATOMIC_BOOL_COMPARE_AND_SWAP(PTR, OLD, NEW) \ (__sync_bool_compare_and_swap(PTR, OLD, NEW)) /* Message sent to wake up a thread. The message contents are currently unimportant. */ #define NOTIFY_MSG "!" #define NOTIFY_MSG_LEN 1 /* Spin attempting to atomically adjust F by ADJ until successful */ /* Spin attempting to atomically adjust F by ADJ until successful. */ #define SPIN_ADJ(F, ADJ) \ do { \ for (;;) { \ Loading