Commit a605a120 authored by Scott Vokes's avatar Scott Vokes
Browse files

Switch listener to same incoming command handling as sender.

Rather than using a thread-safe queue, threads sending messages to
listeners atomically reserve a message struct, write a command into
it, and commit the command by writing the message ID into the listener
thread's incoming command pipe. It polls on network IO and the command
pipe, so commands wake it up. It reads the ID, processes the command
written into l->msgs[ID], and atomically releases the message struct.
parent e0e37e04
Loading
Loading
Loading
Loading
+0 −1
Original line number Diff line number Diff line
@@ -82,7 +82,6 @@ LIB_OBJS = \
	$(OUT_DIR)/threadpool.o \
	$(OUT_DIR)/bus.o \
	$(OUT_DIR)/bus_ssl.o \
	$(OUT_DIR)/casq.o \
	$(OUT_DIR)/listener.o \
	$(OUT_DIR)/sender.o \
	$(OUT_DIR)/util.o \
+2 −7
Original line number Diff line number Diff line
@@ -10,7 +10,6 @@ LDFLAGS += -L. -L${LIB_PATH} -L${THREADPOOL_PATH} -L${OPENSSL_PATH}/lib -L${SOCK
BUS_OBJS = \
	bus.o \
	bus_ssl.o \
	casq.o \
	listener.o \
	sender.o \
	util.o \
@@ -20,10 +19,9 @@ ECHOSRV_OBJS = \
	echosrv.o \
	util.o \

all: bus.png test_casq test_yacht echosrv bus_example
all: bus.png test_yacht echosrv bus_example

test: test_casq test_yacht
	./test_casq
	./test_yacht

%.png: %.dot
@@ -32,9 +30,6 @@ test: test_casq test_yacht
libbus.a: ${BUS_OBJS}
	ar -rcs $@ $^

test_casq: test_casq.o casq.o
	${CC} -o $@ $^ ${LDFLAGS}

test_yacht: test_yacht.o yacht.o
	${CC} -o $@ $^ ${LDFLAGS}

@@ -45,7 +40,7 @@ bus_example: bus_example.o libbus.a
	${CC} -o $@ $^ ${LDFLAGS} -lbus -lthreadpool

clean:
	rm -f *.a *.o test_casq echosrv bus_example
	rm -f *.a *.o echosrv bus_example

tags: TAGS

src/lib/bus/casq.c

deleted100644 → 0
+0 −171
Original line number Diff line number Diff line
/*
* kinetic-c-client
* Copyright (C) 2014 Seagate Technology.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
*/
#include <stdlib.h>

#include "casq.h"
#include "atomic.h"

typedef struct casq_link {
    void *data;
    struct casq_link *next;
} casq_link;

struct casq {
    casq_link *head;
    casq_link *accum;
    casq_link *free_links;
    int is_reversing;
};

struct casq *casq_new(void) {
    struct casq *q = calloc(1, sizeof(*q));
    return q;
}

static casq_link *get_link(struct casq *q) {
    for (;;) {
        if (q->free_links == NULL) {
            casq_link *nl = calloc(1, sizeof(*nl));
            return nl;

        } else {
            casq_link *n = q->free_links;
            if (ATOMIC_BOOL_COMPARE_AND_SWAP(&q->free_links, n, n->next)) {
                n->next = NULL;
                return n;
            }
        }
    }
}

bool casq_push(struct casq *q, void *data) {
    casq_link *l = get_link(q);
    if (l == NULL) {
        return false;
    }

    l->data = data;

    for (;;) {                  /* spin, push */
        casq_link *cur_head = q->accum;
        l->next = cur_head;
        if (ATOMIC_BOOL_COMPARE_AND_SWAP(&q->accum, cur_head, l)) {
            return true;
        }
    }
}

static void reverse(struct casq *q) {
    casq_link *to_reverse = NULL;
    for (;;) {                  /* spin, unlink */
        to_reverse = q->accum;
        if (q->accum == NULL) { return; }
        if (ATOMIC_BOOL_COMPARE_AND_SWAP(&q->accum, to_reverse, NULL)) {
            break;
        }
    }
    
    casq_link *new_head = q->head;
    casq_link *next = NULL;

    for (casq_link *cur = to_reverse; cur; cur = next) {
        next = cur->next;
        cur->next = new_head;
        new_head = cur;
    }
    
    q->head = new_head;
}

bool casq_empty(struct casq *q) {
    bool res = q->head == NULL && q->accum == NULL;
    return res;
}

void *casq_pop(struct casq *q) {
    for (;;) {
        if (q->head != NULL) {
            casq_link *l = NULL;
            for (;;) {              /* spin, pop */
                l = q->head;
                if (ATOMIC_BOOL_COMPARE_AND_SWAP(&q->head, l, l->next)) {
                    break;
                }            
            }
            void *res = l->data;
            l->data = NULL;
            for (;;) {              /* spin, push */
                l->next = q->free_links;
                if (ATOMIC_BOOL_COMPARE_AND_SWAP(&q->free_links, l->next, l)) {
                    break;
                }
            }
            return res;
        } else if (q->accum != NULL) {
            /* If no more links are available, reverse the accumulated ones on demand. */
            if (ATOMIC_BOOL_COMPARE_AND_SWAP(&q->is_reversing, 0, 1)) { /* spin, lock */
                reverse(q);
                q->is_reversing = 0;
            } else {
                continue;
            }
        } else {                    /* empty */
            return NULL;
        }
    }
}

static void free_ll(casq_link *l, casq_free_cb *cb, void *udata) {
    casq_link *cur = NULL;
    casq_link *next = NULL;
    for (cur = l; cur; cur = next) {
        next = cur->next;
        if (cur->data) { cb(cur->data, udata); }
        free(cur);
    }
}

void casq_free(struct casq *q, casq_free_cb *cb, void *udata) {
    casq_link *head = NULL;
    casq_link *accum = NULL;
    casq_link *free_links = NULL;

    for (;;) {
        head = q->head;
        if (ATOMIC_BOOL_COMPARE_AND_SWAP(&q->head, head, NULL)) { break; }
    }

    for (;;) {
        accum = q->accum;
        if (ATOMIC_BOOL_COMPARE_AND_SWAP(&q->accum, accum, NULL)) { break; }
    }

    for (;;) {
        free_links = q->free_links;
        if (ATOMIC_BOOL_COMPARE_AND_SWAP(&q->free_links, free_links, NULL)) { break; }
    }

    free_ll(head, cb, udata);
    free_ll(accum, cb, udata);
    free_ll(free_links, cb, udata);

    free(q);
}

src/lib/bus/casq.h

deleted100644 → 0
+0 −38
Original line number Diff line number Diff line
/*
* kinetic-c-client
* Copyright (C) 2014 Seagate Technology.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
*/
#ifndef CASQ_H
#define CASQ_H

#include <stdbool.h>

/* Atomic queue. */
struct casq;

struct casq *casq_new(void);

bool casq_push(struct casq *q, void *data);
void *casq_pop(struct casq *q);
bool casq_empty(struct casq *q);

typedef void (casq_free_cb)(void *data, void *udata);

void casq_free(struct casq *q, casq_free_cb *cb, void *udata);

#endif
+34 −55
Original line number Diff line number Diff line
@@ -53,15 +53,6 @@ struct listener *listener_init(struct bus *b, struct bus_config *cfg) {
        return NULL;
    }

    struct casq *q = casq_new();
    if (q == NULL) {
        free(l);
        close(pipes[0]);
        close(pipes[1]);
        return NULL;
    }
    l->q = q;

    l->commit_pipe = pipes[1];
    l->incoming_msg_pipe = pipes[0];
    l->fds[INCOMING_MSG_PIPE_ID].fd = l->incoming_msg_pipe;
@@ -226,27 +217,10 @@ bool listener_shutdown(struct listener *l) {
    return push_message(l, msg);
}

static void free_queue_cb(void *data, void *udata) {
    listener_msg *msg = (listener_msg *)data;
    switch (msg->type) {
    case MSG_ADD_SOCKET:
        if (msg->u.add_socket.info) { free_ci(msg->u.add_socket.info); }
        break;
    case MSG_EXPECT_RESPONSE:
        if (msg->u.expect.box) { free(msg->u.expect.box); }
        break;
    default:
        break;
    }
    (void)udata;
}

void listener_free(struct listener *l) {
    struct bus *b = l->bus;
    /* assert: pthread must be join'd. */
    if (l) {
        casq_free(l->q, free_queue_cb, l);

        for (int i = 0; i < MAX_PENDING_MESSAGES; i++) {
            rx_info_t *info = &l->rx_info[i];

@@ -268,6 +242,20 @@ void listener_free(struct listener *l) {
            }
        }

        for (int i = 0; i < MAX_QUEUE_MESSAGES; i++) {
            listener_msg *msg = &l->msgs[i];
            switch (msg->type) {
            case MSG_ADD_SOCKET:
                if (msg->u.add_socket.info) { free_ci(msg->u.add_socket.info); }
                break;
            case MSG_EXPECT_RESPONSE:
                if (msg->u.expect.box) { free(msg->u.expect.box); }
                break;
            default:
                break;
            }
        }

        while (l->tracked_fds > 0) {
            /* Remove off the front to stress remove_socket. */
            remove_socket(l, l->fds[0 + INCOMING_MSG_PIPE].fd);
@@ -297,9 +285,11 @@ void *listener_mainloop(void *arg) {
    time_t last_sec = tv.tv_sec;

    /* The listener thread has full control over its execution -- the
     * only thing other threads can do is put messages into its
     * thread-safe queue to be processed, so it doesn't need any
     * internal locking. */
     * only thing other threads can do is reserve messages from l->msgs,
     * write commands into them, and then commit them by writing their
     * msg->id into the incoming command ID pipe. All cross-thread
     * communication is managed at the command interface, so it doesn't
     * need any internal locking. */

    while (!self->shutdown) {
        gettimeofday(&tv, NULL);  // TODO: clock_gettime
@@ -313,14 +303,6 @@ void *listener_mainloop(void *arg) {
        BUS_LOG_SNPRINTF(b, (res == 0 ? 6 : 4), LOG_LISTENER, b->udata, 64,
            "poll res %d", res);

        /* Pop queue for incoming events, if able to handle them. */
        listener_msg *msg = casq_pop(self->q);
        while (msg && self->rx_info_in_use < MAX_PENDING_MESSAGES) {
            msg_handler(self, msg);
            listener_msg *nmsg = casq_pop(self->q);
            msg = nmsg;
        }

        if (res < 0) {
            if (util_is_resumable_io_error(errno)) {
                errno = 0;
@@ -363,7 +345,11 @@ static void check_and_flush_incoming_msg_pipe(listener *l, int *res) {
                    break;
                }
            } else {
                /* no-op, msg is unused */
                for (ssize_t i = 0; i < rd; i++) {
                    uint8_t msg_id = buf[i];
                    listener_msg *msg = &l->msgs[msg_id];
                    msg_handler(l, msg);
                }
                (*res)--;
                break;
            }
@@ -1072,31 +1058,24 @@ static bool push_message(struct listener *l, listener_msg *msg) {
    struct bus *b = l->bus;
    BUS_ASSERT(b, b->udata, msg);
  
    if (casq_push(l->q, msg)) {
retry:
        BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 128,
            "Pushed message -- %p -- of type %d", (void*)msg, msg->type);
        ssize_t wr = write(l->commit_pipe, "!", 1);
        if (wr == 1) {
            return true;
    uint8_t msg_buf[sizeof(msg->id)];
    msg_buf[0] = msg->id;

    for (;;) {
        ssize_t wr = write(l->commit_pipe, msg_buf, sizeof(msg_buf));
        if (wr == sizeof(msg_buf)) {
            return true;  // committed
        } else {
            BUS_ASSERT(b, b->udata, wr == -1);
            if (errno == EINTR) {
            if (errno == EINTR) { /* signal interrupted; retry */
                errno = 0;
                goto retry;
            } else {
                BUS_LOG_SNPRINTF(b, 10, LOG_LISTENER, b->udata, 64,
                    "write_commit errno %d", errno);
                    "write_commit error, errno %d", errno);
                errno = 0;
                release_msg(l, msg);
                return false;
            }
        }
    } else {
        BUS_LOG_SNPRINTF(b, 3 - 3, LOG_LISTENER, b->udata, 128,
            "Failed to pushed message -- %p", (void*)msg);
        BUS_ASSERT(b, b->udata, false);
        release_msg(l, msg);
        return false;
    }
}

Loading