Commit ecaa60eb authored by Scott Vokes's avatar Scott Vokes
Browse files

Make stress test periodically sleep, to stress threadpool sleep/wake-up cycle.

Also, make naming consistent within threadpool module.
parent 8f77add6
Loading
Loading
Loading
Loading
+9 −9
Original line number Diff line number Diff line
@@ -137,7 +137,7 @@ bool Bus_Init(bus_config *config, struct bus_result *res) {
        }
    }

    tp = threadpool_init(&config->threadpool_cfg);
    tp = Threadpool_Init(&config->threadpool_cfg);
    if (tp == NULL) {
        res->status = BUS_INIT_ERROR_THREADPOOL_INIT_FAIL;
        goto cleanup;
@@ -182,7 +182,7 @@ cleanup:
        }
        free(ls);
    }
    if (tp) { threadpool_free(tp); }
    if (tp) { Threadpool_Free(tp); }
    if (joined) { free(joined); }
    if (b) {
        if (locks_initialized > 1) {
@@ -606,7 +606,7 @@ bool Bus_ProcessBoxedMessage(struct bus *b,

    BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128,
        "Scheduling boxed message -- %p -- where it will be freed", (void*)box);
    return threadpool_schedule(b->threadpool, &task, backpressure);
    return Threadpool_Schedule(b->threadpool, &task, backpressure);
}

/* How many seconds should it give the thread pool to shut down? */
@@ -629,18 +629,18 @@ void Bus_Free(bus *b) {
    int limit = (1000 * THREAD_SHUTDOWN_SECONDS)/10;
    for (int i = 0; i < limit; i++) {
        BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128,
            "threadpool_shutdown -- %d", i);
        if (threadpool_shutdown(b->threadpool, false)) { break; }
            "Threadpool_Shutdown -- %d", i);
        if (Threadpool_Shutdown(b->threadpool, false)) { break; }
        (void)syscall_poll(NULL, 0, 10);

        if (i == limit - 1) {
            BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128,
                "threadpool_shutdown -- %d (forced)", i);
            threadpool_shutdown(b->threadpool, true);
                "Threadpool_Shutdown -- %d (forced)", i);
            Threadpool_Shutdown(b->threadpool, true);
        }
    }
    BUS_LOG(b, 3, LOG_SHUTDOWN, "threadpool_free", b->udata);
    threadpool_free(b->threadpool);
    BUS_LOG(b, 3, LOG_SHUTDOWN, "Threadpool_Free", b->udata);
    Threadpool_Free(b->threadpool);
    free(b->joined);
    free(b->threads);
    pthread_mutex_destroy(&b->fd_set_lock);
+9 −9
Original line number Diff line number Diff line
@@ -61,7 +61,7 @@ static void task_cb(void *udata) {
    printf("%zd -- fibs(%zd) => %zd", tc, arg, res);
    
    struct threadpool_info stats;
    threadpool_stats(t, &stats);
    Threadpool_Stats(t, &stats);
    dump_stats("", &stats);
}

@@ -85,7 +85,7 @@ int main(int argc, char **argv) {
        .task_ringbuf_size2 = sz2,
        .max_threads = max_threads,
    };
    struct threadpool *t = threadpool_init(&cfg);
    struct threadpool *t = Threadpool_Init(&cfg);
    assert(t);

    struct threadpool_task task = {
@@ -99,7 +99,7 @@ int main(int argc, char **argv) {
        for (int j = 0; j < 40; j++) {
            for (;;) {
                size_t counterpressure = 0;
                if (threadpool_schedule(t, &task, &counterpressure)) {
                if (Threadpool_Schedule(t, &task, &counterpressure)) {
                    break;
                } else {
                    size_t msec = 10 * 1000 * counterpressure;
@@ -108,10 +108,10 @@ int main(int argc, char **argv) {
            }
        }

        threadpool_stats(t, &stats);
        Threadpool_Stats(t, &stats);
        dump_stats("sleeping...", &stats);
        sleep(1);
        threadpool_stats(t, &stats);
        Threadpool_Stats(t, &stats);
        dump_stats("waking...", &stats);
    }

@@ -121,7 +121,7 @@ int main(int argc, char **argv) {

    task.task = inf_loop_cb;
    size_t counterpressure = 0;
    while (!threadpool_schedule(t, &task, &counterpressure)) {
    while (!Threadpool_Schedule(t, &task, &counterpressure)) {
        usleep(10 * 1000);
    }

@@ -132,15 +132,15 @@ int main(int argc, char **argv) {

    int limit = (1000 * THREAD_SHUTDOWN_SECONDS)/10;
    for (int i = 0; i < limit; i++) {
        if (threadpool_shutdown(t, false)) { break; }
        if (Threadpool_Shutdown(t, false)) { break; }
        (void)poll(NULL, 0, 10);

        if (i == limit - 1) {
            printf("cancelling thread in intentional infinite loop\n");
            threadpool_shutdown(t, true);
            Threadpool_Shutdown(t, true);
        }
    }
    threadpool_free(t);
    Threadpool_Free(t);

    return 0;
}
+4 −4
Original line number Diff line number Diff line
@@ -73,7 +73,7 @@ static void task_cb(void *udata) {
            printf("count: %zd on %p\n", e->count, (void *)pthread_self());
        }
        for (;;) {
            if (threadpool_schedule(e->t, e->task, NULL)) {
            if (Threadpool_Schedule(e->t, e->task, NULL)) {
                break;
            }
            usleep(1 * 1000);
@@ -101,7 +101,7 @@ int main(int argc, char **argv) {
        .task_ringbuf_size2 = sz2,
        .max_threads = max_threads,
    };
    struct threadpool *t = threadpool_init(&cfg);
    struct threadpool *t = Threadpool_Init(&cfg);
    assert(t);

    struct threadpool_info stats;
@@ -121,7 +121,7 @@ int main(int argc, char **argv) {
        tasks[i] = (struct threadpool_task){ .task = task_cb, .udata = &envs[i], };

        for (;;) {
            if (threadpool_schedule(t, &tasks[i], &counterpressure)) {
            if (Threadpool_Schedule(t, &tasks[i], &counterpressure)) {
                break;
            }
        }        
@@ -133,7 +133,7 @@ int main(int argc, char **argv) {
        gettimeofday(&tv, NULL);
        if (tv.tv_sec > last_sec) {
            last_sec = tv.tv_sec;
            threadpool_stats(t, &stats);
            Threadpool_Stats(t, &stats);
            ticks++;
            dump_stats("tick...", &stats, ticks);
        }
+9 −3
Original line number Diff line number Diff line
@@ -80,7 +80,7 @@ int main(int argc, char **argv) {
        .task_ringbuf_size2 = sz2,
        .max_threads = max_threads,
    };
    struct threadpool *t = threadpool_init(&cfg);
    struct threadpool *t = Threadpool_Init(&cfg);
    assert(t);

    struct threadpool_task task = {
@@ -99,13 +99,19 @@ int main(int argc, char **argv) {
        gettimeofday(&tv, NULL);
        if (tv.tv_sec > last_sec) {
            last_sec = tv.tv_sec;
            threadpool_stats(t, &stats);
            Threadpool_Stats(t, &stats);
            ticks++;
            dump_stats("tick...", &stats, ticks);
        }

        /* Every 16 seconds, pause scheduling for 5 seconds to test
         * thread sleep/wake-up alerts. */
        if ((ticks & 15) == 0) {
            sleep(5);
        }

        for (size_t i = 0; i < 1000; i++) {
            if (!threadpool_schedule(t, &task, &counterpressure)) {
            if (!Threadpool_Schedule(t, &task, &counterpressure)) {
                size_t msec = i * 1000 * counterpressure;
                usleep(msec >> 12);
            } else {
+5 −5
Original line number Diff line number Diff line
@@ -49,7 +49,7 @@ static void set_defaults(struct threadpool_config *cfg) {
    if (cfg->max_threads == 0) { cfg->max_threads = DEFAULT_MAX_THREADS; }
}

struct threadpool *threadpool_init(struct threadpool_config *cfg) {
struct threadpool *Threadpool_Init(struct threadpool_config *cfg) {
    set_defaults(cfg);

    if (cfg->task_ringbuf_size2 > THREADPOOL_MAX_RINGBUF_SIZE2) {
@@ -96,7 +96,7 @@ cleanup:
    return NULL;
}

bool threadpool_schedule(struct threadpool *t, struct threadpool_task *task,
bool Threadpool_Schedule(struct threadpool *t, struct threadpool_task *task,
        size_t *pushback) {
    if (t == NULL) { return false; }
    if (task == NULL || task->task == NULL) { return false; }
@@ -146,7 +146,7 @@ static void commit_current_task(struct threadpool *t, struct marked_task *task,
    }
}

void threadpool_stats(struct threadpool *t, struct threadpool_info *info) {
void Threadpool_Stats(struct threadpool *t, struct threadpool_info *info) {
    if (info) {
        uint8_t at = 0;
        for (int i = 0; i < t->live_threads; i++) {
@@ -160,7 +160,7 @@ void threadpool_stats(struct threadpool *t, struct threadpool_info *info) {
    }
}

bool threadpool_shutdown(struct threadpool *t, bool kill_all) {
bool Threadpool_Shutdown(struct threadpool *t, bool kill_all) {
    t->shutting_down = true;
    size_t mask = t->task_ringbuf_mask;

@@ -197,7 +197,7 @@ bool threadpool_shutdown(struct threadpool *t, bool kill_all) {
    return notify_shutdown(t);
}

void threadpool_free(struct threadpool *t) {
void Threadpool_Free(struct threadpool *t) {
    free(t->tasks);
    t->tasks = NULL;
    free(t->threads);
Loading