Commit 491deb21 authored by Scott Vokes's avatar Scott Vokes
Browse files

Mocked-out IO tests for listener.

parent ff34c8a6
Loading
Loading
Loading
Loading
+0 −1
Original line number Diff line number Diff line
@@ -387,7 +387,6 @@ bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata)
    *(bus_socket_t *)&ci->type = type;
    ci->ssl = ssl;
    ci->udata = udata;
    ci->largest_rd_seq_id_seen = BUS_NO_SEQ_ID;
    ci->largest_wr_seq_id_seen = BUS_NO_SEQ_ID;

    #ifndef TEST
+0 −1
Original line number Diff line number Diff line
@@ -124,7 +124,6 @@ typedef struct {
    /* Set by listener thread */
    rx_error_t error;
    size_t to_read_size;
    int64_t largest_rd_seq_id_seen;
} connection_info;

/* Arbitrary byte used to tag writes from the listener. */
+45 −0
Original line number Diff line number Diff line
@@ -126,3 +126,48 @@ rx_info_t *listener_helper_get_hold_rx_info(listener *l, int fd, int64_t seq_id)
    }
    return NULL;
}

rx_info_t *listener_helper_find_info_by_sequence_id(listener *l,
        int fd, int64_t seq_id) {
    struct bus *b = l->bus;    
    for (int i = 0; i <= l->rx_info_max_used; i++) {
        rx_info_t *info = &l->rx_info[i];

        switch (info->state) {
        case RIS_INACTIVE:
            break;            /* skip */
        case RIS_HOLD:
            BUS_LOG_SNPRINTF(b, 4, LOG_MEMORY, b->udata, 128,
                "find_info_by_sequence_id: info (%p) at +%d: <fd:%d, seq_id:%lld>",
                (void*)info, info->id, fd, (long long)seq_id);
            if (info->u.hold.fd == fd && info->u.hold.seq_id == seq_id) {
                return info;
            }
            break;
        case RIS_EXPECT:
        {
            struct boxed_msg *box = info->u.expect.box;
            BUS_LOG_SNPRINTF(b, 4, LOG_MEMORY, b->udata, 128,
                "find_info_by_sequence_id: info (%p) at +%d [s %d]: box is %p",
                (void*)info, info->id, info->u.expect.error, (void*)box);
            if (box != NULL && box->out_seq_id == seq_id && box->fd == fd) {
                return info;
            }
            break;
        }
        default:
            BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64,
                "match fail %d on line %d", info->state, __LINE__);
            BUS_ASSERT(b, b->udata, false);
        }
    }

    if (b->log_level > 5 || 0) {
        BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64,
            "==== Could not find <fd:%d, seq_id:%lld>, dumping table ====\n", 
            fd, (long long)seq_id);
        ListenerTask_DumpRXInfoTable(l);
    }
    /* Not found. Probably an unsolicited status message. */
    return NULL;
}
+2 −1
Original line number Diff line number Diff line
@@ -29,6 +29,7 @@ bool listener_helper_push_message(struct listener *l, listener_msg *msg, int *re

rx_info_t *listener_helper_get_free_rx_info(listener *l);
rx_info_t *listener_helper_get_hold_rx_info(listener *l, int fd, int64_t seq_id);

rx_info_t *listener_helper_find_info_by_sequence_id(listener *l,
    int fd, int64_t seq_id);

#endif
+40 −73
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@
*/
#include "listener_io.h"
#include "listener_io_internal.h"
#include "listener_helper.h"

#include <unistd.h>
#include <assert.h>
@@ -27,6 +28,8 @@
#include "syscall.h"
#include "util.h"

#define YELP(...) //fprintf(stderr, __VA_ARGS__)

void ListenerIO_AttemptRecv(listener *l, int available) {
    /*   --> failure --> set 'closed' error on socket, don't die */
    struct bus *b = l->bus;
@@ -43,6 +46,8 @@ void ListenerIO_AttemptRecv(listener *l, int available) {
         * (POLLHUP | POLLERR | POLLNVAL), so if we get a status message
         * with a reason for a hangup we can still pass it along. */

        YELP("checking l->fds[%d]: revents 0x%04x\n", i + 1, l->fds[i + 1].revents); 

        if (fd->revents & (POLLERR | POLLNVAL)) {
            read_from++;
            BUS_LOG(b, 2, LOG_LISTENER,
@@ -75,10 +80,13 @@ void ListenerIO_AttemptRecv(listener *l, int available) {
}
    
static bool socket_read_plain(struct bus *b, listener *l, int pfd_i, connection_info *ci) {
    for (;;) {
        ssize_t size = syscall_read(ci->fd, l->read_buf, ci->to_read_size);
        YELP("read %zd on fd %d, to_read_size %zd\n", size, ci->fd, ci->to_read_size);
        if (size == -1) {
            if (util_is_resumable_io_error(errno)) {
                errno = 0;
                continue;
            } else {
                BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64,
                    "read: socket error reading, %d", errno);
@@ -90,12 +98,12 @@ static bool socket_read_plain(struct bus *b, listener *l, int pfd_i, connection_
        if (size > 0) {
            BUS_LOG_SNPRINTF(b, 5, LOG_LISTENER, b->udata, 64,
                "read: %zd", size);
        
            return sink_socket_read(b, l, ci, size);
        } else {
            return false;
        }
    }
}

static void print_SSL_error(struct bus *b, connection_info *ci, int lvl, const char *prefix) {
    unsigned long errval = ERR_get_error();
@@ -112,7 +120,7 @@ static void print_SSL_error(struct bus *b, connection_info *ci, int lvl, const c

static bool socket_read_ssl(struct bus *b, listener *l, int pfd_i, connection_info *ci) {
    BUS_ASSERT(b, b->udata, ci->ssl);
    for (;;) {
    while (ci->to_read_size > 0) {
        // ssize_t pending = SSL_pending(ci->ssl);
        ssize_t size = (ssize_t)syscall_SSL_read(ci->ssl, l->read_buf, ci->to_read_size);
        
@@ -157,6 +165,7 @@ static bool socket_read_ssl(struct bus *b, listener *l, int pfd_i, connection_in
            }
        } else if (size > 0) {
            sink_socket_read(b, l, ci, size);
            if ((size_t)size == ci->to_read_size) { break; }
        }
    }
    return true;
@@ -180,9 +189,11 @@ static bool sink_socket_read(struct bus *b,
    bus_unlock_log(b);
#endif
    
    YELP("sinking read, %zd bytes\n", size);
    bus_sink_cb_res_t sres = b->sink_cb(l->read_buf, size, ci->udata);
    if (sres.full_msg_buffer) {
        BUS_LOG(b, 3, LOG_LISTENER, "calling unpack CB", b->udata);
        YELP("calling unpack_cb\n");
        bus_unpack_cb_res_t ures = b->unpack_cb(sres.full_msg_buffer, ci->udata);
        BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64,
            "process_unpacked_message: ok? %d, seq_id:%lld",
@@ -196,6 +207,7 @@ static bool sink_socket_read(struct bus *b,
        "expecting next read to have %zd bytes", ci->to_read_size);
    
    /* Grow read buffer if necessary. */
    YELP("grow buffer? %zd => %zd\n", ci->to_read_size, l->read_buf_size);
    if (ci->to_read_size > l->read_buf_size) {
        if (!ListenerTask_GrowReadBuf(l, ci->to_read_size)) {
            BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128,
@@ -212,6 +224,7 @@ static void set_error_for_socket(listener *l, int id, int fd, rx_error_t err) {
    struct bus *b = l->bus;
    BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64,
        "set_error_for_socket %d, err %d", fd, err);
    YELP("set_error_for_socket: fd %d, error %d\n", fd, err);

    for (int i = 0; i <= l->rx_info_max_used; i++) {
        rx_info_t *info = &l->rx_info[i];
@@ -242,73 +255,27 @@ static void set_error_for_socket(listener *l, int id, int fd, rx_error_t err) {
    l->fds[id + INCOMING_MSG_PIPE].events &= ~POLLIN;
}


static rx_info_t *find_info_by_sequence_id(listener *l,
        int fd, int64_t seq_id) {
    struct bus *b = l->bus;    
    for (int i = 0; i <= l->rx_info_max_used; i++) {
        rx_info_t *info = &l->rx_info[i];

        switch (info->state) {
        case RIS_INACTIVE:
            break;            /* skip */
        case RIS_HOLD:
            BUS_LOG_SNPRINTF(b, 4, LOG_MEMORY, b->udata, 128,
                "find_info_by_sequence_id: info (%p) at +%d: <fd:%d, seq_id:%lld>",
                (void*)info, info->id, fd, (long long)seq_id);
            if (info->u.hold.fd == fd && info->u.hold.seq_id == seq_id) {
                return info;
            }
            break;
        case RIS_EXPECT:
        {
            struct boxed_msg *box = info->u.expect.box;
            BUS_LOG_SNPRINTF(b, 4, LOG_MEMORY, b->udata, 128,
                "find_info_by_sequence_id: info (%p) at +%d [s %d]: box is %p",
                (void*)info, info->id, info->u.expect.error, (void*)box);
            if (box != NULL && box->out_seq_id == seq_id && box->fd == fd) {
                return info;
            }
            break;
        }
        default:
            BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64,
                "match fail %d on line %d", info->state, __LINE__);
            BUS_ASSERT(b, b->udata, false);
        }
    }

    if (b->log_level > 5 || 0) {
        BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64,
            "==== Could not find <fd:%d, seq_id:%lld>, dumping table ====\n", 
            fd, (long long)seq_id);
        ListenerTask_DumpRXInfoTable(l);
    }
    /* Not found. Probably an unsolicited status message. */
    return NULL;
}

static void process_unpacked_message(listener *l,
        connection_info *ci, bus_unpack_cb_res_t result) {
    struct bus *b = l->bus;

    /* NOTE: message may be an unsolicited status message */

    YELP("processing unpacked message. ok? %d\n", result.ok);

    if (result.ok) {
        int64_t seq_id = result.u.success.seq_id;
        void *opaque_msg = result.u.success.msg;

        if ((seq_id < ci->largest_rd_seq_id_seen) &&
                (ci->largest_rd_seq_id_seen != BUS_NO_SEQ_ID) &&
                (seq_id != BUS_NO_SEQ_ID)) {
            BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 128,
                "suspicious sequence ID on %d: largest seen is %lld, got %lld\n",
                ci->fd, (long long)ci->largest_rd_seq_id_seen, (long long)seq_id);
        }
        ci->largest_rd_seq_id_seen = seq_id;
        YELP("msg: seq_id %lld, opaque_msg %p\n", seq_id, opaque_msg);

        rx_info_t *info = find_info_by_sequence_id(l, ci->fd, seq_id);
        rx_info_t *info = listener_helper_find_info_by_sequence_id(l, ci->fd, seq_id);
        YELP("got: %p (sz %zd)\n", info, sizeof(info));
        YELP("listener_helper_find_info_by_sequence_id <fd:%d, seq_id:%lld> => %p\n",
            ci->fd, seq_id, info);
        if (info) {
            YELP("info->state %d\n", info->state);

            switch (info->state) {
            case RIS_HOLD:
                /* Just save result, to match up later. */
Loading