Loading src/lib/bus/listener.c +1 −18 Original line number Diff line number Diff line Loading @@ -291,8 +291,6 @@ void *listener_mainloop(void *arg) { listener *self = (listener *)arg; assert(self); struct bus *b = self->bus; int timeout = MIN_DELAY; struct timeval tv; gettimeofday(&tv, NULL); Loading @@ -304,7 +302,6 @@ void *listener_mainloop(void *arg) { * internal locking. */ while (!self->shutdown) { bool work_done = false; gettimeofday(&tv, NULL); // TODO: clock_gettime time_t cur_sec = tv.tv_sec; if (cur_sec != last_sec) { Loading @@ -312,7 +309,7 @@ void *listener_mainloop(void *arg) { last_sec = cur_sec; } int res = poll(self->fds, self->tracked_fds + INCOMING_MSG_PIPE, timeout); int res = poll(self->fds, self->tracked_fds + INCOMING_MSG_PIPE, -1); BUS_LOG_SNPRINTF(b, (res == 0 ? 6 : 4), LOG_LISTENER, b->udata, 64, "poll res %d", res); Loading @@ -322,8 +319,6 @@ void *listener_mainloop(void *arg) { msg_handler(self, msg); listener_msg *nmsg = casq_pop(self->q); msg = nmsg; timeout = 0; work_done = true; } if (res < 0) { Loading @@ -337,21 +332,9 @@ void *listener_mainloop(void *arg) { } else if (res > 0) { check_and_flush_incoming_msg_pipe(self, &res); attempt_recv(self, res); work_done = true; } else { /* nothing to do */ } if (work_done) { timeout = 0; } else if (timeout == 0) { timeout = MIN_DELAY; } else { timeout <<= 1; if (timeout > MAX_DELAY) { timeout = INFINITE_DELAY; } } } BUS_LOG(b, 3, LOG_LISTENER, "shutting down", b->udata); Loading src/lib/bus/sender.c +3 −15 Original line number Diff line number Diff line Loading @@ -36,9 +36,6 @@ #include "yacht.h" #include "sender_internal.h" #define MIN_DELAY 100 /* msec */ #define INFINITE_DELAY -1 /* poll will only return upon an event */ /* Offset for s->fds[0], which is the command pipe. */ #define CMD_FD (1) Loading Loading @@ -333,7 +330,6 @@ void *sender_mainloop(void *arg) { sender *self = (sender *)arg; assert(self); struct bus *b = self->bus; int delay = MIN_DELAY; struct timeval tv; gettimeofday(&tv, NULL); Loading @@ -341,7 +337,6 @@ void *sender_mainloop(void *arg) { BUS_LOG(b, 5, LOG_SENDER, "entering main loop", b->udata); while (!self->shutdown) { bool work = false; gettimeofday(&tv, NULL); // TODO: clock_gettime time_t cur_sec = tv.tv_sec; Loading @@ -358,7 +353,7 @@ void *sender_mainloop(void *arg) { * sense to use poll here -- self->active_fds will be small. */ BUS_LOG(b, 7, LOG_SENDER, "polling", b->udata); int res = poll(self->fds, self->active_fds + CMD_FD, delay); int res = poll(self->fds, self->active_fds + CMD_FD, -1); BUS_LOG_SNPRINTF(b, (res == 0 ? 6 : 4), LOG_SENDER, b->udata, 64, "poll res %d, active fds %d", res, self->active_fds); Loading @@ -373,7 +368,7 @@ void *sender_mainloop(void *arg) { } } else if (res > 0) { if (self->fds[0].revents & POLLIN) { work = check_incoming_commands(self); (void)check_incoming_commands(self); res--; } Loading @@ -383,14 +378,7 @@ void *sender_mainloop(void *arg) { if (res > 0) { attempt_write(self, res); } work = true; } if (work) { delay = MIN_DELAY; } else if (delay != INFINITE_DELAY) { delay <<= 1; if (delay > MAX_TIMEOUT) { delay = INFINITE_DELAY; } // work = true; } } Loading src/lib/threadpool/threadpool.c +4 −20 Original line number Diff line number Diff line Loading @@ -204,7 +204,7 @@ void threadpool_free(struct threadpool *t) { static void notify_new_task(struct threadpool *t) { for (int i = 0; i < t->live_threads; i++) { struct thread_info *ti = &t->threads[i]; if (ti->status == STATUS_ASLEEP) { if (ti->status == STATUS_ASLEEP || true) { ssize_t res = write(ti->parent_fd, NOTIFY_MSG, NOTIFY_MSG_LEN); if (2 == res) { Loading Loading @@ -290,26 +290,11 @@ static void *thread_task(void *arg) { size_t mask = t->task_ringbuf_mask; struct pollfd pfd[1] = { { .fd=ti->child_fd, .events=POLLIN }, }; uint8_t read_buf[NOTIFY_MSG_LEN]; size_t delay = MIN_DELAY; uint8_t read_buf[NOTIFY_MSG_LEN*32]; while (ti->status < STATUS_SHUTDOWN) { if (t->task_request_head == t->task_commit_head) { if (ti->status == STATUS_AWAKE) { if (delay > MIN_DELAY) { ti->status = STATUS_ASLEEP; } } else { if (delay == 0) { delay = MIN_DELAY; } else { delay <<= 1; } if ((size_t)delay > t->max_delay) { delay = INFINITE_DELAY; } } int res = poll(pfd, 1, delay); int res = poll(pfd, 1, -1); if (res == 1) { if (pfd[0].revents & (POLLHUP | POLLERR | POLLNVAL)) { /* TODO: HUP should be distinct from ERR -- hup is Loading @@ -318,9 +303,8 @@ static void *thread_task(void *arg) { break; } else if (pfd[0].revents & POLLIN) { if (ti->status == STATUS_ASLEEP) { ti->status = STATUS_AWAKE; } delay = MIN_DELAY; //SPIN_ADJ(t->active_threads, 1); ssize_t rres = read(ti->child_fd, read_buf, NOTIFY_MSG_LEN); ssize_t rres = read(ti->child_fd, read_buf, sizeof(read_buf)); if (rres < 0) { assert(0); } Loading Loading
src/lib/bus/listener.c +1 −18 Original line number Diff line number Diff line Loading @@ -291,8 +291,6 @@ void *listener_mainloop(void *arg) { listener *self = (listener *)arg; assert(self); struct bus *b = self->bus; int timeout = MIN_DELAY; struct timeval tv; gettimeofday(&tv, NULL); Loading @@ -304,7 +302,6 @@ void *listener_mainloop(void *arg) { * internal locking. */ while (!self->shutdown) { bool work_done = false; gettimeofday(&tv, NULL); // TODO: clock_gettime time_t cur_sec = tv.tv_sec; if (cur_sec != last_sec) { Loading @@ -312,7 +309,7 @@ void *listener_mainloop(void *arg) { last_sec = cur_sec; } int res = poll(self->fds, self->tracked_fds + INCOMING_MSG_PIPE, timeout); int res = poll(self->fds, self->tracked_fds + INCOMING_MSG_PIPE, -1); BUS_LOG_SNPRINTF(b, (res == 0 ? 6 : 4), LOG_LISTENER, b->udata, 64, "poll res %d", res); Loading @@ -322,8 +319,6 @@ void *listener_mainloop(void *arg) { msg_handler(self, msg); listener_msg *nmsg = casq_pop(self->q); msg = nmsg; timeout = 0; work_done = true; } if (res < 0) { Loading @@ -337,21 +332,9 @@ void *listener_mainloop(void *arg) { } else if (res > 0) { check_and_flush_incoming_msg_pipe(self, &res); attempt_recv(self, res); work_done = true; } else { /* nothing to do */ } if (work_done) { timeout = 0; } else if (timeout == 0) { timeout = MIN_DELAY; } else { timeout <<= 1; if (timeout > MAX_DELAY) { timeout = INFINITE_DELAY; } } } BUS_LOG(b, 3, LOG_LISTENER, "shutting down", b->udata); Loading
src/lib/bus/sender.c +3 −15 Original line number Diff line number Diff line Loading @@ -36,9 +36,6 @@ #include "yacht.h" #include "sender_internal.h" #define MIN_DELAY 100 /* msec */ #define INFINITE_DELAY -1 /* poll will only return upon an event */ /* Offset for s->fds[0], which is the command pipe. */ #define CMD_FD (1) Loading Loading @@ -333,7 +330,6 @@ void *sender_mainloop(void *arg) { sender *self = (sender *)arg; assert(self); struct bus *b = self->bus; int delay = MIN_DELAY; struct timeval tv; gettimeofday(&tv, NULL); Loading @@ -341,7 +337,6 @@ void *sender_mainloop(void *arg) { BUS_LOG(b, 5, LOG_SENDER, "entering main loop", b->udata); while (!self->shutdown) { bool work = false; gettimeofday(&tv, NULL); // TODO: clock_gettime time_t cur_sec = tv.tv_sec; Loading @@ -358,7 +353,7 @@ void *sender_mainloop(void *arg) { * sense to use poll here -- self->active_fds will be small. */ BUS_LOG(b, 7, LOG_SENDER, "polling", b->udata); int res = poll(self->fds, self->active_fds + CMD_FD, delay); int res = poll(self->fds, self->active_fds + CMD_FD, -1); BUS_LOG_SNPRINTF(b, (res == 0 ? 6 : 4), LOG_SENDER, b->udata, 64, "poll res %d, active fds %d", res, self->active_fds); Loading @@ -373,7 +368,7 @@ void *sender_mainloop(void *arg) { } } else if (res > 0) { if (self->fds[0].revents & POLLIN) { work = check_incoming_commands(self); (void)check_incoming_commands(self); res--; } Loading @@ -383,14 +378,7 @@ void *sender_mainloop(void *arg) { if (res > 0) { attempt_write(self, res); } work = true; } if (work) { delay = MIN_DELAY; } else if (delay != INFINITE_DELAY) { delay <<= 1; if (delay > MAX_TIMEOUT) { delay = INFINITE_DELAY; } // work = true; } } Loading
src/lib/threadpool/threadpool.c +4 −20 Original line number Diff line number Diff line Loading @@ -204,7 +204,7 @@ void threadpool_free(struct threadpool *t) { static void notify_new_task(struct threadpool *t) { for (int i = 0; i < t->live_threads; i++) { struct thread_info *ti = &t->threads[i]; if (ti->status == STATUS_ASLEEP) { if (ti->status == STATUS_ASLEEP || true) { ssize_t res = write(ti->parent_fd, NOTIFY_MSG, NOTIFY_MSG_LEN); if (2 == res) { Loading Loading @@ -290,26 +290,11 @@ static void *thread_task(void *arg) { size_t mask = t->task_ringbuf_mask; struct pollfd pfd[1] = { { .fd=ti->child_fd, .events=POLLIN }, }; uint8_t read_buf[NOTIFY_MSG_LEN]; size_t delay = MIN_DELAY; uint8_t read_buf[NOTIFY_MSG_LEN*32]; while (ti->status < STATUS_SHUTDOWN) { if (t->task_request_head == t->task_commit_head) { if (ti->status == STATUS_AWAKE) { if (delay > MIN_DELAY) { ti->status = STATUS_ASLEEP; } } else { if (delay == 0) { delay = MIN_DELAY; } else { delay <<= 1; } if ((size_t)delay > t->max_delay) { delay = INFINITE_DELAY; } } int res = poll(pfd, 1, delay); int res = poll(pfd, 1, -1); if (res == 1) { if (pfd[0].revents & (POLLHUP | POLLERR | POLLNVAL)) { /* TODO: HUP should be distinct from ERR -- hup is Loading @@ -318,9 +303,8 @@ static void *thread_task(void *arg) { break; } else if (pfd[0].revents & POLLIN) { if (ti->status == STATUS_ASLEEP) { ti->status = STATUS_AWAKE; } delay = MIN_DELAY; //SPIN_ADJ(t->active_threads, 1); ssize_t rres = read(ti->child_fd, read_buf, NOTIFY_MSG_LEN); ssize_t rres = read(ti->child_fd, read_buf, sizeof(read_buf)); if (rres < 0) { assert(0); } Loading