Commit 5d814b74 authored by Scott Vokes's avatar Scott Vokes
Browse files

Make HOLD notification blocking; only do multi-reads when closing connection.

. Make the HOLD notifications blocking, to close a potential window
  where a HOLD message has been queued to the listener, but not yet
  received because the listener is busy reading from one or more
  very active connections. This ensures that a received response will
  ALWAYS have its handling info available, unless the respones has
  timed out.

. Fix yesterday's change for receiving full messages on hangup (which
  are likely to be UNSOLICITEDSTATUS messages with error info). Always
  reading connections until exhausted could lead to the listener
  spending excessively large amounts of time reading between checking
  its command queue, leading to responses arriving with
  not-yet-processed HOLD messages in-queue and getting flagged as
  unexpected.

. Also, add some additional constraints to ensure the listener's
  msg and rx_info structs are managed properly.
parent 24fc5f13
Loading
Loading
Loading
Loading
+8 −12
Original line number Diff line number Diff line
@@ -56,17 +56,15 @@ struct listener *listener_init(struct bus *b, struct bus_config *cfg) {
    l->fds[INCOMING_MSG_PIPE_ID].events = POLLIN;
    l->shutdown_notify_fd = LISTENER_NO_FD;

    for (int i = 0; i < MAX_PENDING_MESSAGES; i++) {
    for (int i = MAX_PENDING_MESSAGES - 1; i >= 0; i--) {
        rx_info_t *info = &l->rx_info[i];
        info->state = RIS_INACTIVE;

        uint16_t *p_id = (uint16_t *)&info->id;
        if (i < MAX_PENDING_MESSAGES - 1) {
            info->next = &l->rx_info[i + 1];
        }
        info->next = l->rx_info_freelist;
        l->rx_info_freelist = info;
        *p_id = i;
    }
    l->rx_info_freelist = &l->rx_info[0];

    for (int pipe_count = 0; pipe_count < MAX_QUEUE_MESSAGES; pipe_count++) {
        listener_msg *msg = &l->msgs[pipe_count];
@@ -84,12 +82,9 @@ struct listener *listener_init(struct bus *b, struct bus_config *cfg) {
            free(l);
            return NULL;
        }

        if (pipe_count < MAX_QUEUE_MESSAGES - 1) { /* forward link */
            msg->next = &l->msgs[pipe_count + 1];
        }
        msg->next = l->msg_freelist;
        l->msg_freelist = msg;
    }
    l->msg_freelist = &l->msgs[0];
    l->rx_info_max_used = 0;

    (void)cfg;
@@ -118,7 +113,7 @@ bool listener_remove_socket(struct listener *l, int fd, int *notify_fd) {
}

bool listener_hold_response(struct listener *l, int fd,
        int64_t seq_id, int16_t timeout_sec) {
        int64_t seq_id, int16_t timeout_sec, int *notify_fd) {
    listener_msg *msg = listener_helper_get_free_msg(l);
    struct bus *b = l->bus;
    if (msg == NULL) {
@@ -134,8 +129,9 @@ bool listener_hold_response(struct listener *l, int fd,
    msg->u.hold.fd = fd;
    msg->u.hold.seq_id = seq_id;
    msg->u.hold.timeout_sec = timeout_sec;
    msg->u.hold.notify_fd = msg->pipes[1];

    bool pm_res = listener_helper_push_message(l, msg, NULL);
    bool pm_res = listener_helper_push_message(l, msg, notify_fd);
    if (!pm_res) {
        BUS_LOG_SNPRINTF(b, 0, LOG_MEMORY, b->udata, 128,
            "listener_hold_response with <fd:%d, seq_id:%lld> FAILED",
+1 −1
Original line number Diff line number Diff line
@@ -45,7 +45,7 @@ bool listener_remove_socket(struct listener *l, int fd, int *notify_fd);
 * the response (with timeout) if it arrives before receiving further
 * instructions from the client. */
bool listener_hold_response(struct listener *l, int fd,
    int64_t seq_id, int16_t timeout_sec);
    int64_t seq_id, int16_t timeout_sec, int *notify_fd);

/* The client has finished a write, the listener should expect a response. */
bool listener_expect_response(struct listener *l, boxed_msg *box,
+13 −3
Original line number Diff line number Diff line
@@ -30,7 +30,7 @@
static void msg_handler(listener *l, listener_msg *pmsg);
static void add_socket(listener *l, connection_info *ci, int notify_fd);
static void remove_socket(listener *l, int fd, int notify_fd);
static void hold_response(listener *l, int fd, int64_t seq_id, int16_t timeout_sec);
static void hold_response(listener *l, int fd, int64_t seq_id, int16_t timeout_sec, int notify_fd);
static void expect_response(listener *l, boxed_msg *box);
static void shutdown(listener *l, int notify_fd);

@@ -127,7 +127,7 @@ static void msg_handler(listener *l, listener_msg *pmsg) {
        break;
    case MSG_HOLD_RESPONSE:
        hold_response(l, msg.u.hold.fd, msg.u.hold.seq_id,
            msg.u.hold.timeout_sec);
            msg.u.hold.timeout_sec, msg.u.hold.notify_fd);
        break;
    case MSG_EXPECT_RESPONSE:
        expect_response(l, msg.u.expect.box);
@@ -209,14 +209,19 @@ static void remove_socket(listener *l, int fd, int notify_fd) {
    ListenerCmd_NotifyCaller(l, notify_fd);
}

static void hold_response(listener *l, int fd, int64_t seq_id, int16_t timeout_sec) {
static void hold_response(listener *l, int fd, int64_t seq_id,
        int16_t timeout_sec, int notify_fd) {
    struct bus *b = l->bus;
    
    BUS_LOG_SNPRINTF(b, 5, LOG_LISTENER, b->udata, 128,
        "hold_response <fd:%d, seq_id:%lld>", fd, (long long)seq_id);

    rx_info_t *info = listener_helper_get_free_rx_info(l);
    if (info == NULL) {
        BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 128,
            "failed to get free rx_info for <fd:%d, seq_id:%lld>, dropping it",
            fd, (long long)seq_id);
        ListenerCmd_NotifyCaller(l, notify_fd);
        return;
    }
    BUS_ASSERT(b, b->udata, info);
@@ -231,6 +236,7 @@ static void hold_response(listener *l, int fd, int64_t seq_id, int16_t timeout_s
    info->u.hold.seq_id = seq_id;
    info->u.hold.has_result = false;
    memset(&info->u.hold.result, 0, sizeof(info->u.hold.result));
    ListenerCmd_NotifyCaller(l, notify_fd);
}

static void expect_response(listener *l, struct boxed_msg *box) {
@@ -262,6 +268,10 @@ static void expect_response(listener *l, struct boxed_msg *box) {

            ListenerTask_AttemptDelivery(l, info);
        } else if (info->u.hold.error != RX_ERROR_NONE) {
            BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 256,
                "info %p (%d) with <box:%p, fd:%d, seq_id:%lld> has error %d",
                (void *)info, info->id, 
                (void *)box, info->u.hold.fd, (long long)info->u.hold.seq_id, info->u.hold.error);
            rx_error_t error = info->u.hold.error;
            bus_unpack_cb_res_t result = info->u.hold.result;
            info->state = RIS_EXPECT;
+4 −2
Original line number Diff line number Diff line
@@ -42,9 +42,11 @@ listener_msg *listener_helper_get_free_msg(listener *l) {
        } else if (ATOMIC_BOOL_COMPARE_AND_SWAP(&l->msg_freelist, head, head->next)) {
            for (;;) {
                int16_t miu = l->msgs_in_use;
                assert(miu < MAX_QUEUE_MESSAGES);

                if (ATOMIC_BOOL_COMPARE_AND_SWAP(&l->msgs_in_use, miu, miu + 1)) {
                    BUS_LOG(l->bus, 5, LOG_LISTENER, "got free msg", l->bus->udata);
                    BUS_LOG_SNPRINTF(b, 5, LOG_LISTENER, b->udata, 64,
                        "got free msg: %u", head->id);

                    /* Add counterpressure between the client and the listener.
                     * 10 * ((n >> 1) ** 2) microseconds */
+2 −1
Original line number Diff line number Diff line
@@ -58,6 +58,7 @@ typedef struct listener_msg {
            int fd;
            int64_t seq_id;
            int16_t timeout_sec;
            int notify_fd;
        } hold;
        struct {
            boxed_msg *box;
@@ -110,7 +111,7 @@ typedef struct rx_info_t {
#define MAX_PENDING_MESSAGES (1024)

/* Max number of unprocessed queue messages */
#define MAX_QUEUE_MESSAGES 32
#define MAX_QUEUE_MESSAGES (32)
typedef uint32_t msg_flag_t;

/* Minimum and maximum poll() delays for listener, before going dormant. */
Loading