Commit 0064b683 authored by Scott Vokes's avatar Scott Vokes
Browse files

Switch to using shared read buffer for whole listener thread.

parent 03b7c932
Loading
Loading
Loading
Loading
+2 −9
Original line number Diff line number Diff line
@@ -21,8 +21,6 @@ static bool poll_on_completion(struct bus *b, int fd);
static int sender_id_of_socket(struct bus *b, int fd);
static int listener_id_of_socket(struct bus *b, int fd);

static int live_box_count = 0; // NOCOMMIT

static void set_defaults(bus_config *cfg) {
    if (cfg->sender_count == 0) { cfg->sender_count = 1; }
    if (cfg->listener_count == 0) { cfg->listener_count = 1; }
@@ -185,8 +183,6 @@ static boxed_msg *box_msg(struct bus *b, bus_user_msg *msg) {
    BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 64,
        "Allocated boxed message -- %p", box);

    live_box_count++;

    box->fd = msg->fd;
    assert(msg->fd != 0);
    box->out_seq_id = msg->seq_id;
@@ -247,7 +243,7 @@ static bool poll_on_completion(struct bus *b, int fd) {
            if (is_resumable_io_error(errno)) {
                errno = 0;
            } else {
                assert(false);  /* FIXME: classify error conditions */
                assert(false);
                break;
            }
        } else if (res > 0) {
@@ -271,7 +267,7 @@ static bool poll_on_completion(struct bus *b, int fd) {
                if (is_resumable_io_error(errno)) {
                    errno = 0;
                } else {
                    assert(false);  /* FIXME: classify error conditions */
                    assert(false);
                    break;
                }
            }
@@ -340,8 +336,6 @@ bool bus_register_socket(struct bus *b, int fd, void *udata) {

    ci->fd = fd;
    ci->to_read_size = 0;
    ci->read_buf_size = 0;      /* will be realloc'd on demand */
    ci->read_buf = NULL;
    ci->udata = udata;

    bool res = listener_add_socket(l, ci, pipe_in);
@@ -420,7 +414,6 @@ static void box_execute_cb(void *udata) {
    bus_msg_cb *cb = box->cb;

    free(box);
    live_box_count--;
    cb(&res, out_udata);
}

+1 −4
Original line number Diff line number Diff line
@@ -14,15 +14,12 @@ bool bus_init(bus_config *config, struct bus_result *res);

/* Send a request. Blocks until the request has been transmitted.
 *
 * TODO: liveness of msg
 * TODO: liveness of msg: copy or take ownership?
 * 
 * Assumes the FD has been registered with bus_register_socket;
 * sending to an unregistered socket is an error. */
bool bus_send_request(struct bus *b, bus_user_msg *msg);

/* FIXME: config callback for status msgs */


/* Get the string key for a log event ID. */
const char *bus_log_event_str(log_event_t event);

+4 −1
Original line number Diff line number Diff line
@@ -18,6 +18,7 @@

#define MAX_SOCKETS 1000
#define DEFAULT_BUF_SIZE (32 * 1024)
#define PRINT_RESPONSES 0

enum socket_state {
    STATE_UNINIT = 0,
@@ -153,7 +154,7 @@ static bus_unpack_cb_res_t unpack_cb(void *msg, void *socket_udata) {
    prot_header_t *header = (prot_header_t *)&si->buf[0];
    uint8_t *payload = (uint8_t *)&si->buf[sizeof(prot_header_t)];

#if 0
#if PRINT_RESPONSES
    for (int i = 0; i < si->used; i++) {
        if ((i & 15) == 0 && i > 0) { printf("\n"); }
        printf("%02x ", si->buf[i]);
@@ -340,6 +341,7 @@ static void completion_cb(bus_msg_result_t *res, void *udata) {
    switch (res->status) {
    case BUS_SEND_SUCCESS:
    {
#if 1
        /* CAS completion? or what? */
        size_t cur = s->completed_deliveries;
        for (;;) {
@@ -350,6 +352,7 @@ static void completion_cb(bus_msg_result_t *res, void *udata) {
                break;
            }
        }
#endif
    }
    break;
    case BUS_SEND_TX_TIMEOUT:
+0 −4
Original line number Diff line number Diff line
@@ -70,10 +70,6 @@ typedef struct {

    size_t to_read_size;

    /* Read buffer and it's size. Will be grown on demand. */
    size_t read_buf_size;
    uint8_t *read_buf;

    void *udata;                /* user connection data */
} connection_info;

+0 −27
Original line number Diff line number Diff line
#include <stdlib.h>
#include <pthread.h>

#include "casq.h"
#include "atomic.h"
@@ -14,24 +13,10 @@ struct casq {
    casq_link *accum;
    casq_link *free_links;
    int is_reversing;
    pthread_mutex_t lock;
};

#if 0
#define LOCK(X) pthread_mutex_lock(&X->lock)
#define UNLOCK(X) pthread_mutex_unlock(&X->lock)
#else
#define LOCK(X)
#define UNLOCK(X)
#endif

struct casq *casq_new(void) {
    struct casq *q = calloc(1, sizeof(*q));
    if (q) {
        if (0 != pthread_mutex_init(&q->lock, NULL)) {
            free(q);
        }
    }
    return q;
}

@@ -52,11 +37,8 @@ static casq_link *get_link(struct casq *q) {
}

bool casq_push(struct casq *q, void *data) {
    LOCK(q);

    casq_link *l = get_link(q);
    if (l == NULL) {
        UNLOCK(q);
        return false;
    }

@@ -66,7 +48,6 @@ bool casq_push(struct casq *q, void *data) {
        casq_link *cur_head = q->accum;
        l->next = cur_head;
        if (ATOMIC_BOOL_COMPARE_AND_SWAP(&q->accum, cur_head, l)) {
            UNLOCK(q);
            return true;
        }
    }
@@ -94,14 +75,11 @@ static void reverse(struct casq *q) {
}

bool casq_empty(struct casq *q) {
    LOCK(q);
    bool res = q->head == NULL && q->accum == NULL;
    UNLOCK(q);
    return res;
}

void *casq_pop(struct casq *q) {
    LOCK(q);
    for (;;) {
        if (q->head != NULL) {
            casq_link *l = NULL;
@@ -119,7 +97,6 @@ void *casq_pop(struct casq *q) {
                    break;
                }
            }
            UNLOCK(q);
            return res;
        } else if (q->accum != NULL) {
            /* If no more links are available, reverse the accumulated ones on demand. */
@@ -130,7 +107,6 @@ void *casq_pop(struct casq *q) {
                continue;
            }
        } else {                    /* empty */
            UNLOCK(q);
            return NULL;
        }
    }
@@ -147,7 +123,6 @@ static void free_ll(casq_link *l, casq_free_cb *cb, void *udata) {
}

void casq_free(struct casq *q, casq_free_cb *cb, void *udata) {
    LOCK(q);
    casq_link *head = NULL;
    casq_link *accum = NULL;
    casq_link *free_links = NULL;
@@ -171,8 +146,6 @@ void casq_free(struct casq *q, casq_free_cb *cb, void *udata) {
    free_ll(accum, cb, udata);
    free_ll(free_links, cb, udata);

    UNLOCK(q);
    pthread_mutex_destroy(&q->lock);
    free(q);
}
Loading