Commit 3c584872 authored by Greg Williams's avatar Greg Williams
Browse files

Merge branch 'develop' into remote_hangup_handling

parents 1c7e3f18 3e15f88e
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -234,8 +234,8 @@ static void expect_response(listener *l, struct boxed_msg *box) {
        (void *)box, box->fd, (long long)box->out_seq_id);

    /* If there's a pending HOLD message, convert it. */
    rx_info_t *info = listener_helper_get_hold_rx_info(l, box->fd, box->out_seq_id);
    if (info) {
    rx_info_t *info = listener_helper_find_info_by_sequence_id(l, box->fd, box->out_seq_id);
    if (info && info->state == RIS_HOLD) {
        BUS_ASSERT(b, b->udata, info->state == RIS_HOLD);
        if (info->u.hold.has_result) {
            bus_unpack_cb_res_t result = info->u.hold.result;
+7 −13
Original line number Diff line number Diff line
@@ -24,6 +24,10 @@

#include <assert.h>

#ifdef TEST
uint8_t msg_buf[sizeof(uint8_t)];
#endif

listener_msg *listener_helper_get_free_msg(listener *l) {
    struct bus *b = l->bus;

@@ -65,7 +69,9 @@ bool listener_helper_push_message(struct listener *l, listener_msg *msg, int *re
    struct bus *b = l->bus;
    BUS_ASSERT(b, b->udata, msg);
  
    uint8_t msg_buf[sizeof(msg->id)];
    #ifndef TEST
    uint8_t msg_buf[sizeof(uint8_t)];
    #endif
    msg_buf[0] = msg->id;

    if (reply_fd) { *reply_fd = msg->pipes[0]; }
@@ -116,18 +122,6 @@ rx_info_t *listener_helper_get_free_rx_info(struct listener *l) {
    }
}

rx_info_t *listener_helper_get_hold_rx_info(listener *l, int fd, int64_t seq_id) {
    for (int i = 0; i <= l->rx_info_max_used; i++) {
        rx_info_t *info = &l->rx_info[i];
        if (info->state == RIS_HOLD &&
            info->u.hold.fd == fd &&
            info->u.hold.seq_id == seq_id) {
            return info;
        }
    }
    return NULL;
}

rx_info_t *listener_helper_find_info_by_sequence_id(listener *l,
        int fd, int64_t seq_id) {
    struct bus *b = l->bus;    
+0 −1
Original line number Diff line number Diff line
@@ -28,7 +28,6 @@ listener_msg *listener_helper_get_free_msg(listener *l);
bool listener_helper_push_message(struct listener *l, listener_msg *msg, int *reply_fd);

rx_info_t *listener_helper_get_free_rx_info(listener *l);
rx_info_t *listener_helper_get_hold_rx_info(listener *l, int fd, int64_t seq_id);
rx_info_t *listener_helper_find_info_by_sequence_id(listener *l,
    int fd, int64_t seq_id);

+3 −0
Original line number Diff line number Diff line
@@ -68,6 +68,9 @@ typedef struct listener_msg {
    } u;
} listener_msg;

/* How long the listener should wait for responses before blocking. */
#define LISTENER_TASK_TIMEOUT_DELAY 100

typedef enum {
    RIS_HOLD = 1,
    RIS_EXPECT = 2,
+14 −28
Original line number Diff line number Diff line
@@ -28,8 +28,6 @@
#include "syscall.h"
#include "util.h"

#define YELP(...) //fprintf(stderr, __VA_ARGS__)

void ListenerIO_AttemptRecv(listener *l, int available) {
    /*   --> failure --> set 'closed' error on socket, don't die */
    struct bus *b = l->bus;
@@ -46,19 +44,7 @@ void ListenerIO_AttemptRecv(listener *l, int available) {
         * (POLLHUP | POLLERR | POLLNVAL), so if we get a status message
         * with a reason for a hangup we can still pass it along. */

        YELP("checking l->fds[%d]: revents 0x%04x\n", i + 1, l->fds[i + 1].revents); 

        if (fd->revents & (POLLERR | POLLNVAL)) {
            read_from++;
            BUS_LOG(b, 2, LOG_LISTENER,
                "pollfd: socket error (POLLERR | POLLNVAL)", b->udata);
            set_error_for_socket(l, i, ci->fd, RX_ERROR_POLLERR);
        } else if (fd->revents & POLLHUP) {
            read_from++;
            BUS_LOG(b, 3, LOG_LISTENER, "pollfd: socket error POLLHUP",
                b->udata);
            set_error_for_socket(l, i, ci->fd, RX_ERROR_POLLHUP);
        } else if (fd->revents & POLLIN) {
        if (fd->revents & POLLIN) {
            BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64,
                "reading %zd bytes from socket (buf is %zd)",
                ci->to_read_size, l->read_buf_size);
@@ -76,13 +62,24 @@ void ListenerIO_AttemptRecv(listener *l, int available) {
                BUS_ASSERT(b, b->udata, false);
            }
        }

        if (fd->revents & (POLLERR | POLLNVAL)) {
            read_from++;
            BUS_LOG(b, 2, LOG_LISTENER,
                "pollfd: socket error (POLLERR | POLLNVAL)", b->udata);
            set_error_for_socket(l, i, ci->fd, RX_ERROR_POLLERR);
        } else if (fd->revents & POLLHUP) {
            read_from++;
            BUS_LOG(b, 3, LOG_LISTENER, "pollfd: socket error POLLHUP",
                b->udata);
            set_error_for_socket(l, i, ci->fd, RX_ERROR_POLLHUP);
        }
    }
}
    
static bool socket_read_plain(struct bus *b, listener *l, int pfd_i, connection_info *ci) {
    for (;;) {
        ssize_t size = syscall_read(ci->fd, l->read_buf, ci->to_read_size);
        YELP("read %zd on fd %d, to_read_size %zd\n", size, ci->fd, ci->to_read_size);
        if (size == -1) {
            if (util_is_resumable_io_error(errno)) {
                errno = 0;
@@ -192,11 +189,9 @@ static bool sink_socket_read(struct bus *b,
    bus_unlock_log(b);
#endif
    
    YELP("sinking read, %zd bytes\n", size);
    bus_sink_cb_res_t sres = b->sink_cb(l->read_buf, size, ci->udata);
    if (sres.full_msg_buffer) {
        BUS_LOG(b, 3, LOG_LISTENER, "calling unpack CB", b->udata);
        YELP("calling unpack_cb\n");
        bus_unpack_cb_res_t ures = b->unpack_cb(sres.full_msg_buffer, ci->udata);
        BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64,
            "process_unpacked_message: ok? %d, seq_id:%lld",
@@ -210,7 +205,6 @@ static bool sink_socket_read(struct bus *b,
        "expecting next read to have %zd bytes", ci->to_read_size);
    
    /* Grow read buffer if necessary. */
    YELP("grow buffer? %zd => %zd\n", ci->to_read_size, l->read_buf_size);
    if (ci->to_read_size > l->read_buf_size) {
        if (!ListenerTask_GrowReadBuf(l, ci->to_read_size)) {
            BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128,
@@ -227,7 +221,6 @@ static void set_error_for_socket(listener *l, int id, int fd, rx_error_t err) {
    struct bus *b = l->bus;
    BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64,
        "set_error_for_socket %d, err %d", fd, err);
    YELP("set_error_for_socket: fd %d, error %d\n", fd, err);

    for (int i = 0; i <= l->rx_info_max_used; i++) {
        rx_info_t *info = &l->rx_info[i];
@@ -264,20 +257,13 @@ static void process_unpacked_message(listener *l,

    /* NOTE: message may be an unsolicited status message */

    YELP("processing unpacked message. ok? %d\n", result.ok);

    if (result.ok) {
        int64_t seq_id = result.u.success.seq_id;
        void *opaque_msg = result.u.success.msg;

        YELP("msg: seq_id %lld, opaque_msg %p\n", seq_id, opaque_msg);

        rx_info_t *info = listener_helper_find_info_by_sequence_id(l, ci->fd, seq_id);
        YELP("got: %p (sz %zd)\n", info, sizeof(info));
        YELP("listener_helper_find_info_by_sequence_id <fd:%d, seq_id:%lld> => %p\n",
            ci->fd, seq_id, info);

        if (info) {
            YELP("info->state %d\n", info->state);

            switch (info->state) {
            case RIS_HOLD:
Loading