Commit 73e1ebc0 authored by Scott Vokes's avatar Scott Vokes
Browse files

Use sizeof(pipe write buffer), not hardcoded size. Add comment about blocking.

parent d120d67a
Loading
Loading
Loading
Loading
+15 −7
Original line number Diff line number Diff line
@@ -279,7 +279,7 @@ bool bus_send_request(struct bus *b, bus_user_msg *msg)
        return false;
    }

    int complete_fd = 0;
    int complete_fd = -1;

    int s_id = sender_id_of_socket(b, msg->fd);
    struct sender *s = b->senders[s_id];
@@ -312,25 +312,28 @@ static bool poll_on_completion(struct bus *b, int fd) {
        if (res == -1) {
            if (is_resumable_io_error(errno)) {
                BUS_LOG(b, 3, LOG_SENDING_REQUEST, "Polling on completion...EAGAIN", b->udata);
                if (errno == EINTR && i > 0) { i--; }
                errno = 0;
            } else {
                assert(false);
                break;
            }
        } else if (res > 0) {
            uint8_t read_buf[2];
            uint16_t msec = 0;
            uint8_t read_buf[sizeof(msec)];
            
            BUS_LOG(b, 3, LOG_SENDING_REQUEST, "Reading alert pipe...", b->udata);
            ssize_t sz = read(fd, read_buf, 2);
            ssize_t sz = read(fd, read_buf, sizeof(read_buf));

            if (sz == 2) {
            if (sz == sizeof(read_buf)) {
                /* Payload: little-endian uint16_t, msec of backpressure. */
                uint16_t msec = (read_buf[0] << 0) + (read_buf[1] << 8);
                msec = (read_buf[0] << 0) + (read_buf[1] << 8);
                if (msec > 0) {
                    BUS_LOG_SNPRINTF(b, 5, LOG_SENDING_REQUEST, b->udata, 64,
                        " -- backpressure of %d msec", msec);
                        " -- awakening client thread with backpressure of %d msec", msec);
                    (void)poll(fds, 0, msec);
                }

                BUS_LOG(b, 3, LOG_SENDING_REQUEST, "sent!", b->udata);
                return true;
            } else if (sz == -1) {
@@ -343,7 +346,12 @@ static bool poll_on_completion(struct bus *b, int fd) {
            }
        }
    }
    BUS_LOG(b, 2, LOG_SENDING_REQUEST, "failed to send (timeout)", b->udata);
    BUS_LOG(b, 2, LOG_SENDING_REQUEST, "failed to send (timeout on sender)", b->udata);

    #if 0
    assert(false);
    #endif

    return false;
}

+3 −3
Original line number Diff line number Diff line
@@ -830,11 +830,11 @@ static void msg_handler(listener *l, listener_msg *pmsg) {
}

static void notify_caller(int fd) {
    uint8_t reply_buf[2] = {0x00, 0x00};
    uint8_t reply_buf[sizeof(uint16_t)] = {0x00};

    for (;;) {
        ssize_t wres = write(fd, reply_buf, 2);
        if (wres == 2) { break; }
        ssize_t wres = write(fd, reply_buf, sizeof(reply_buf));
        if (wres == sizeof(reply_buf)) { break; }
        if (wres == -1) {
            if (errno == EINTR) {
                errno = 0;
+11 −4
Original line number Diff line number Diff line
@@ -105,7 +105,9 @@ bool sender_enqueue_message(struct sender *s,
        BUS_LOG(b, 3, LOG_SENDER, "enqueue: no messages left", b->udata);
        return false;
    } else {
        assert(*response_fd == SENDER_FD_NOT_IN_USE);
        *response_fd = s->pipes[tx_info->id][0];
        assert(*response_fd != SENDER_FD_NOT_IN_USE);
        /* Message is queued for delivery once TX info is populated. */
        BUS_LOG(b, 4, LOG_SENDER, "enqueue: messages enqueued", b->udata);
        return populate_tx_info(s, tx_info, box);
@@ -489,7 +491,7 @@ static ssize_t socket_write_plain(sender *s, tx_info_t *info) {
    size_t msg_size = info->box->out_msg_size;
    size_t rem = msg_size - info->sent_size;

    BUS_LOG_SNPRINTF(b, 9, LOG_SENDER, b->udata, 64,
    BUS_LOG_SNPRINTF(b, 10, LOG_SENDER, b->udata, 64,
        "write %p to %d, %zd bytes (info %d)",
        &msg[info->sent_size], info->fd, rem, info->id);
    ssize_t wrsz = write(info->fd, &msg[info->sent_size], rem);
@@ -582,9 +584,10 @@ static void update_sent(struct bus *b, sender *s, tx_info_t *info, ssize_t sent)
}

static bool write_backpressure(sender *s, tx_info_t *info, uint16_t bp) {
    uint8_t buf[2];
    uint8_t buf[sizeof(bp)];
    buf[0] = (uint8_t)((bp & 0x00FF) >> 0);
    buf[1] = (uint8_t)((bp & 0xFF00) >> 8);

    int pipe_fd = s->pipes[info->id][1];

    ssize_t res = write(pipe_fd, buf, sizeof(buf));
@@ -594,7 +597,7 @@ static bool write_backpressure(sender *s, tx_info_t *info, uint16_t bp) {
         *     But, under what circumstances can it even fail? */
        errno = 0;
        return false;
    } else if (res == 2) {
    } else if (res == sizeof(buf)) {
        return true;
    } else {
        assert(false);
@@ -663,10 +666,14 @@ static void attempt_to_enqueue_message_to_listener(sender *s, tx_info_t *info) {
    uint16_t backpressure = 0;
    /* If this succeeds, then this thread cannot touch the box anymore. */
    if (listener_expect_response(l, box, &backpressure)) {
        BUS_LOG_SNPRINTF(b, 9, LOG_SENDER, b->udata, 64,
        BUS_LOG_SNPRINTF(b, 10, LOG_SENDER, b->udata, 64,
            "unblocking listener, releasing msg: %p, backpressure %u\n", out_msg, backpressure);

        write_backpressure(s, info, backpressure);   /* alert blocked client thread */
        BUS_LOG_SNPRINTF(b, 8, LOG_SENDER, b->udata, 128, "release_tx_info %d", __LINE__);
        /* Note: We are not releasing the TX INFO until _after_ we've
         * done the write to unblock the backpressure, because we need to prevent
         * one write from unblocking another. */
        release_tx_info(s, info);
    } else {
        BUS_LOG(b, 2, LOG_SENDER, "failed delivery", b->udata);