Commit 92f7b40c authored by Scott Vokes's avatar Scott Vokes
Browse files

Ensure counterpressure value in listener's response is initialized/set.

parent a15e2460
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -212,10 +212,10 @@ void listener_free(struct listener *l) {
            listener_msg *msg = &l->msgs[i];
            switch (msg->type) {
            case MSG_ADD_SOCKET:
                ListenerCmd_NotifyCaller(msg->u.add_socket.notify_fd);
                ListenerCmd_NotifyCaller(l, msg->u.add_socket.notify_fd);
                break;
            case MSG_REMOVE_SOCKET:
                ListenerCmd_NotifyCaller(msg->u.remove_socket.notify_fd);
                ListenerCmd_NotifyCaller(l, msg->u.remove_socket.notify_fd);
                break;
            case MSG_EXPECT_RESPONSE:
                if (msg->u.expect.box) { free(msg->u.expect.box); }
+10 −7
Original line number Diff line number Diff line
@@ -39,14 +39,17 @@ uint8_t reply_buf[sizeof(uint8_t) + sizeof(uint16_t)];
uint8_t cmd_buf[LISTENER_CMD_BUF_SIZE];
#endif

void ListenerCmd_NotifyCaller(int fd) {
void ListenerCmd_NotifyCaller(listener *l, int fd) {
    if (fd == -1) { return; }
    #ifndef TEST
    uint8_t reply_buf[sizeof(uint8_t) + sizeof(uint16_t)];
    #endif
    reply_buf[0] = LISTENER_MSG_TAG;

    /* TODO: reply_buf[1:2] can be little-endian backpressure.  */
    /* reply_buf[1:2] is little-endian backpressure.  */
    uint16_t backpressure = ListenerTask_GetBackpressure(l);
    reply_buf[1] = (uint8_t)(backpressure & 0xff);
    reply_buf[2] = (uint8_t)((backpressure >> 8) & 0xff);

    for (;;) {
        ssize_t wres = syscall_write(fd, reply_buf, sizeof(reply_buf));
@@ -142,13 +145,13 @@ static void add_socket(listener *l, connection_info *ci, int notify_fd) {
    if (l->tracked_fds == MAX_FDS) {
        /* error: full */
        BUS_LOG(b, 3, LOG_LISTENER, "FULL", b->udata);
        ListenerCmd_NotifyCaller(notify_fd);
        ListenerCmd_NotifyCaller(l, notify_fd);
        return;
    }
    for (int i = 0; i < l->tracked_fds; i++) {
        if (l->fds[i + INCOMING_MSG_PIPE].fd == ci->fd) {
            free(ci);
            ListenerCmd_NotifyCaller(notify_fd);
            ListenerCmd_NotifyCaller(l, notify_fd);
            return;             /* already present */
        }
    }
@@ -166,12 +169,12 @@ static void add_socket(listener *l, connection_info *ci, int notify_fd) {

    if (!ListenerTask_GrowReadBuf(l, ci->to_read_size)) {
        free(ci);
        ListenerCmd_NotifyCaller(notify_fd);
        ListenerCmd_NotifyCaller(l, notify_fd);
        return;             /* alloc failure */
    }

    BUS_LOG(b, 3, LOG_LISTENER, "added socket", b->udata);
    ListenerCmd_NotifyCaller(notify_fd);
    ListenerCmd_NotifyCaller(l, notify_fd);
}

static void remove_socket(listener *l, int fd, int notify_fd) {
@@ -196,7 +199,7 @@ static void remove_socket(listener *l, int fd, int notify_fd) {
        }
    }
    /* CI will be freed by the client thread. */
    ListenerCmd_NotifyCaller(notify_fd);
    ListenerCmd_NotifyCaller(l, notify_fd);
}

static void hold_response(listener *l, int fd, int64_t seq_id, int16_t timeout_sec) {
+1 −1
Original line number Diff line number Diff line
@@ -23,7 +23,7 @@
#include "listener_internal_types.h"

/* Notify the listener's caller that a command has completed. */
void ListenerCmd_NotifyCaller(int fd);
void ListenerCmd_NotifyCaller(listener *l, int fd);

/* Process incoming commands, if any. */
void ListenerCmd_CheckIncomingMessages(listener *l, int *res);
+1 −1
Original line number Diff line number Diff line
@@ -88,7 +88,7 @@ void *ListenerTask_MainLoop(void *arg) {
            "%d connections still open!", self->tracked_fds);
    }

    ListenerCmd_NotifyCaller(self->shutdown_notify_fd);
    ListenerCmd_NotifyCaller(self, self->shutdown_notify_fd);
    self->shutdown_notify_fd = LISTENER_SHUTDOWN_COMPLETE_FD;
    return NULL;
}
+2 −2
Original line number Diff line number Diff line
@@ -203,9 +203,9 @@ void test_listener_free_should_unblock_pending_callers_and_close_file_handles(vo
    /* test cleanup */
    for (int i = 0; i < MAX_QUEUE_MESSAGES; i++) {
        if (i == 4) {
            ListenerCmd_NotifyCaller_Expect(1234);
            ListenerCmd_NotifyCaller_Expect(nl, 1234);
        } else if (i == 7) {
            ListenerCmd_NotifyCaller_Expect(1237);
            ListenerCmd_NotifyCaller_Expect(nl, 1237);
        }

        syscall_close_ExpectAndReturn(i, 0);
Loading