Commit 9891837b authored by Scott Vokes's avatar Scott Vokes
Browse files

Fix buffering for large messages in echo server. (for testing throughput)

parent 43521efa
Loading
Loading
Loading
Loading
+26 −16
Original line number Diff line number Diff line
@@ -35,8 +35,16 @@
#include "atomic.h"
#include "socket99.h"

typedef struct {
    uint32_t magic_number;
    uint32_t size;
    int64_t seq_id;
} prot_header_t;

#define MAGIC_NUMBER 3

#define MAX_SOCKETS 1000
#define DEFAULT_BUF_SIZE (32 * 1024)
#define DEFAULT_BUF_SIZE (1024 * 1024 + sizeof(prot_header_t))
#define PRINT_RESPONSES 0

enum socket_state {
@@ -86,14 +94,6 @@ static void log_cb(log_event_t event, int log_level, const char *msg, void *udat
#define LOG(VERBOSITY, ...)                     \
    do { if (state.verbosity >= VERBOSITY) { printf(__VA_ARGS__); } } while(0)

typedef struct {
    uint32_t magic_number;
    uint32_t size;
    int64_t seq_id;
} prot_header_t;

#define MAGIC_NUMBER 3

static const char *executable_name = NULL;

static bus_sink_cb_res_t reset_transfer(socket_info *si) {
@@ -126,6 +126,7 @@ static bus_sink_cb_res_t sink_cb(uint8_t *read_buf,
        if (read_size != sizeof(prot_header_t)) {
            valid_header = false;
        } else if (header->magic_number != MAGIC_NUMBER) {
            printf("INVALID HEADER B: magic number 0x%08x\n", header->magic_number);
            valid_header = false;
        }

@@ -133,7 +134,7 @@ static bus_sink_cb_res_t sink_cb(uint8_t *read_buf,
            uint8_t *buf = si->buf;
            prot_header_t *header = (prot_header_t *)read_buf;
            si->cur_payload_size = header->size;
            memcpy(buf, read_buf, sizeof(prot_header_t));
            memcpy(buf, header, sizeof(*header));
            si->used = sizeof(*header);

            bus_sink_cb_res_t res = {
@@ -149,17 +150,24 @@ static bus_sink_cb_res_t sink_cb(uint8_t *read_buf,
    }
    case STATE_AWAITING_BODY:
    {
        if (read_size == si->cur_payload_size) {
        assert(DEFAULT_BUF_SIZE - si->used >= read_size);
        memcpy(&si->buf[si->used], read_buf, read_size);
        si->used += read_size;
        size_t rem = si->cur_payload_size + sizeof(prot_header_t) - si->used;

        if (rem == 0) {
            bus_sink_cb_res_t res = {
                .next_read = sizeof(prot_header_t),
                .full_msg_buffer = read_buf,
            };
            memcpy(&si->buf[si->used], read_buf, read_size);
            si->used += read_size;
            si->state = STATE_AWAITING_HEADER;
            si->used = 0;
            return res;
        } else {
            return reset_transfer(si);
            bus_sink_cb_res_t res = {
                .next_read = rem,
            };
            return res;
        }
    }
    default:
@@ -364,7 +372,7 @@ static void completion_cb(bus_msg_result_t *res, void *udata) {
        size_t cur = s->completed_deliveries;
        for (;;) {
            if (ATOMIC_BOOL_COMPARE_AND_SWAP(&s->completed_deliveries, cur, cur + 1)) {
                LOG(4, " -- ! got %zd bytes, seq_id 0x%08llx, %p\n",
                LOG(3, " -- ! got %zd bytes, seq_id 0x%08llx, %p\n",
                    si->cur_payload_size, res->u.response.seq_id,
                    res->u.response.opaque_msg);
                break;
@@ -430,13 +438,15 @@ static void run_bus(example_state *s, struct bus *b) {
            tick_handler(s);
            s->last_second = cur_second;
            payload_size = 8;
            should_send = true;
        } else {
            should_send = true;
        }

        if (should_send) {
            should_send = false;
            size_t msg_size = construct_msg(msg_buf, buf_size, 10*payload_size, seq_id);
            size_t msg_size = construct_msg(msg_buf, buf_size,
                10 * payload_size /* * 1024L*/, seq_id);
            LOG(3, " @@ sending message with %zd bytes\n", msg_size);
            bus_user_msg msg = {
                .fd = s->sockets[cur_socket_i],
+8 −5
Original line number Diff line number Diff line
@@ -36,8 +36,8 @@
/* TODO: Linux => epoll */
#include <poll.h>

#define BUF_SZ (16 * 1024)
#define MAX_CLIENTS 100
#define BUF_SZ (2 * 1024L * 1024)
#define MAX_CLIENTS 10

#define NO_CLIENT ((int)-1)

@@ -56,7 +56,7 @@ typedef struct {
    int fd;
    size_t out_bytes;
    size_t written_bytes;
    uint8_t buf[BUF_SZ];
    uint8_t buf[2*BUF_SZ];
} out_buf;

typedef struct {
@@ -397,8 +397,11 @@ static void enqueue_write(config *cfg, int fd,
            LOG(2, "%ld -- enqueing write of %zd bytes\n",
                cfg->last_second, write_size);

            memcpy(out->buf, buf, write_size);
            out->out_bytes = write_size;
            size_t free_space = BUF_SZ - out->out_bytes;
            assert(free_space >= write_size);
            memcpy(&out->buf[out->out_bytes], buf, write_size);
            out->out_bytes += write_size;

            cfg->client_fds[i].events = POLLOUT;   /* write only */
            return;
        }
+3 −0
Original line number Diff line number Diff line
@@ -369,6 +369,9 @@ static bool socket_read_plain(struct bus *b, listener *l, int pfd_i, connection_
    }
    
    if (size > 0) {
        BUS_LOG_SNPRINTF(b, 5, LOG_LISTENER, b->udata, 64,
            "read: %zd", size);
        
        return sink_socket_read(b, l, ci, size);
    } else {
        return false;
+3 −1
Original line number Diff line number Diff line
@@ -495,6 +495,8 @@ static ssize_t socket_write_plain(sender *s, tx_info_t *info) {
        }
    } else {
        update_sent(b, s, info, wrsz);
        BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64,
                "sent: %zd\n", wrsz);
        return wrsz;
    }
}
@@ -631,9 +633,9 @@ static void attempt_to_enqueue_message_to_listener(sender *s, tx_info_t *info) {

    uint16_t backpressure = 0;
    if (listener_expect_response(l, box, &backpressure)) {
        box->out_msg = NULL;    /* release value, pointer will be stale after returning */
        write_backpressure(s, info, backpressure);   /* alert blocked client thread */
        BUS_LOG_SNPRINTF(b, 8, LOG_SENDER, b->udata, 128, "release_tx_info %d", __LINE__);
        box->out_msg = NULL;    /* release value, pointer will be stale after returning */
        release_tx_info(s, info);
    } else {
        BUS_LOG(b, 2, LOG_SENDER, "failed delivery", b->udata);