Commit 2ad9637c authored by Scott Vokes's avatar Scott Vokes
Browse files

Merge branch 'master' of github.com:Seagate/kinetic-c

parents 14ff108b 0f3366ea
Loading
Loading
Loading
Loading
+1 −18
Original line number Diff line number Diff line
@@ -291,8 +291,6 @@ void *listener_mainloop(void *arg) {
    listener *self = (listener *)arg;
    assert(self);
    struct bus *b = self->bus;
    int timeout = MIN_DELAY;

    struct timeval tv;
    
    gettimeofday(&tv, NULL);
@@ -304,7 +302,6 @@ void *listener_mainloop(void *arg) {
     * internal locking. */

    while (!self->shutdown) {
        bool work_done = false;
        gettimeofday(&tv, NULL);  // TODO: clock_gettime
        time_t cur_sec = tv.tv_sec;
        if (cur_sec != last_sec) {
@@ -312,7 +309,7 @@ void *listener_mainloop(void *arg) {
            last_sec = cur_sec;
        }

        int res = poll(self->fds, self->tracked_fds + INCOMING_MSG_PIPE, timeout);
        int res = poll(self->fds, self->tracked_fds + INCOMING_MSG_PIPE, -1);
        BUS_LOG_SNPRINTF(b, (res == 0 ? 6 : 4), LOG_LISTENER, b->udata, 64,
            "poll res %d", res);

@@ -322,8 +319,6 @@ void *listener_mainloop(void *arg) {
            msg_handler(self, msg);
            listener_msg *nmsg = casq_pop(self->q);
            msg = nmsg;
            timeout = 0;
            work_done = true;
        }

        if (res < 0) {
@@ -337,21 +332,9 @@ void *listener_mainloop(void *arg) {
        } else if (res > 0) {
            check_and_flush_incoming_msg_pipe(self, &res);
            attempt_recv(self, res);
            work_done = true;
        } else {
            /* nothing to do */
        }

        if (work_done) {
            timeout = 0;
        } else if (timeout == 0) {
            timeout = MIN_DELAY;
        } else {
            timeout <<= 1;
            if (timeout > MAX_DELAY) {
                timeout = INFINITE_DELAY;
            }
        }
    }

    BUS_LOG(b, 3, LOG_LISTENER, "shutting down", b->udata);
+3 −15
Original line number Diff line number Diff line
@@ -36,9 +36,6 @@
#include "yacht.h"
#include "sender_internal.h"

#define MIN_DELAY 100 /* msec */
#define INFINITE_DELAY -1  /* poll will only return upon an event */

/* Offset for s->fds[0], which is the command pipe. */
#define CMD_FD (1)

@@ -333,7 +330,6 @@ void *sender_mainloop(void *arg) {
    sender *self = (sender *)arg;
    assert(self);
    struct bus *b = self->bus;
    int delay = MIN_DELAY;
    
    struct timeval tv;
    gettimeofday(&tv, NULL);
@@ -341,7 +337,6 @@ void *sender_mainloop(void *arg) {
    
    BUS_LOG(b, 5, LOG_SENDER, "entering main loop", b->udata);
    while (!self->shutdown) {
        bool work = false;
        
        gettimeofday(&tv, NULL);  // TODO: clock_gettime
        time_t cur_sec = tv.tv_sec;
@@ -358,7 +353,7 @@ void *sender_mainloop(void *arg) {
         *     sense to use poll here -- self->active_fds will be small. */
        BUS_LOG(b, 7, LOG_SENDER, "polling", b->udata);
        
        int res = poll(self->fds, self->active_fds + CMD_FD, delay);
        int res = poll(self->fds, self->active_fds + CMD_FD, -1);

        BUS_LOG_SNPRINTF(b, (res == 0 ? 6 : 4), LOG_SENDER, b->udata, 64,
            "poll res %d, active fds %d", res, self->active_fds);
@@ -373,7 +368,7 @@ void *sender_mainloop(void *arg) {
            }
        } else if (res > 0) {
            if (self->fds[0].revents & POLLIN) {
                work = check_incoming_commands(self);
                (void)check_incoming_commands(self);
                res--;
            }

@@ -383,14 +378,7 @@ void *sender_mainloop(void *arg) {
            if (res > 0) {
                attempt_write(self, res);
            }
            work = true;
        }
        
        if (work) {
            delay = MIN_DELAY;
        } else if (delay != INFINITE_DELAY) {
            delay <<= 1;
            if (delay > MAX_TIMEOUT) { delay = INFINITE_DELAY; }
            // work = true;
        }
    }
    
+4 −20
Original line number Diff line number Diff line
@@ -204,7 +204,7 @@ void threadpool_free(struct threadpool *t) {
static void notify_new_task(struct threadpool *t) {
    for (int i = 0; i < t->live_threads; i++) {
        struct thread_info *ti = &t->threads[i];
        if (ti->status == STATUS_ASLEEP) {
        if (ti->status == STATUS_ASLEEP || true) {
            ssize_t res = write(ti->parent_fd,
                NOTIFY_MSG, NOTIFY_MSG_LEN);
            if (2 == res) {
@@ -290,26 +290,11 @@ static void *thread_task(void *arg) {

    size_t mask = t->task_ringbuf_mask;
    struct pollfd pfd[1] = { { .fd=ti->child_fd, .events=POLLIN }, };
    uint8_t read_buf[NOTIFY_MSG_LEN];
    size_t delay = MIN_DELAY;
    uint8_t read_buf[NOTIFY_MSG_LEN*32];

    while (ti->status < STATUS_SHUTDOWN) {
        if (t->task_request_head == t->task_commit_head) {

            if (ti->status == STATUS_AWAKE) {
                if (delay > MIN_DELAY) { ti->status = STATUS_ASLEEP; }
            } else {
                if (delay == 0) {
                    delay = MIN_DELAY;
                } else {
                    delay <<= 1;
                }
                if ((size_t)delay > t->max_delay) {
                    delay = INFINITE_DELAY;
                }
            }

            int res = poll(pfd, 1, delay);
            int res = poll(pfd, 1, -1);
            if (res == 1) {
                if (pfd[0].revents & (POLLHUP | POLLERR | POLLNVAL)) {
                    /* TODO: HUP should be distinct from ERR -- hup is
@@ -318,9 +303,8 @@ static void *thread_task(void *arg) {
                    break;
                } else if (pfd[0].revents & POLLIN) {
                    if (ti->status == STATUS_ASLEEP) { ti->status = STATUS_AWAKE; }
                    delay = MIN_DELAY;
                    //SPIN_ADJ(t->active_threads, 1);
                    ssize_t rres = read(ti->child_fd, read_buf, NOTIFY_MSG_LEN);
                    ssize_t rres = read(ti->child_fd, read_buf, sizeof(read_buf));
                    if (rres < 0) {
                        assert(0);
                    }