Commit 3887106f authored by Scott Vokes's avatar Scott Vokes
Browse files

Reject new requests on FDs that have already errored.

parent dd6b8684
Loading
Loading
Loading
Loading
+15 −2
Original line number Diff line number Diff line
@@ -434,7 +434,7 @@ static void cleanup(sender *s) {

static bool register_socket_info(sender *s, int fd, SSL *ssl) {
    if (s->shutdown) { return false; }
    fd_info *info = malloc(sizeof(*info));
    fd_info *info = calloc(1, sizeof(*info));
    if (info == NULL) { 
        return false;
    }
@@ -503,6 +503,7 @@ static bool release_socket_info(sender *s, int fd) {
    fd_info *info = (fd_info *)old;
    if (info) {
        assert(fd == info->fd);
        info->errored = true;
        /* Expire any pending events on this socket. */
        if (info->refcount > 0) {
            set_error_for_socket(s, fd, TX_ERROR_CLOSED);            
@@ -574,6 +575,11 @@ static void enqueue_write(struct sender *s, tx_info_t *info) {
    if (yacht_get(s->fd_hash_table, fd, (void **)&fdi)) {
        assert(fdi);

        if (fdi->errored) {
            set_error_for_socket(s, fd, TX_ERROR_CLOSED);
            return;
        }

        if (fdi->largest_seq_id_seen > out_seq_id && fdi->largest_seq_id_seen > 0) {
            BUS_LOG_SNPRINTF(b, 0 , LOG_SENDER, b->udata, 64,
                "suspicious outgoing sequence ID on %d: got %lld, already sent up to %lld",
@@ -684,6 +690,11 @@ static void set_error_for_socket(sender *s, int fd, tx_error_t error) {
        break;
    }

    fd_info *fdi = NULL;
    if (yacht_get(s->fd_hash_table, fd, (void **)&fdi)) {
        fdi->errored = true;
    }

    for (int i = 0; i < MAX_CONCURRENT_SENDS; i++) {
        tx_flag_t cur = 1 << i;
        if (s->tx_flags & cur) {
@@ -791,6 +802,7 @@ static ssize_t socket_write_plain(sender *s, tx_info_t *info) {
    BUS_LOG_SNPRINTF(b, 10, LOG_SENDER, b->udata, 64,
        "write %p to %d, %zd bytes (info %d)",
        (void*)&msg[sent_size], fd, rem, info->id);

    ssize_t wrsz = write(fd, &msg[sent_size], rem);
    if (wrsz == -1) {
        if (util_is_resumable_io_error(errno)) {
@@ -805,6 +817,7 @@ static ssize_t socket_write_plain(sender *s, tx_info_t *info) {
            return 0;
        }
    } else if (wrsz > 0) {

        update_sent(b, s, info, wrsz);
        BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64,
            "sent: %zd\n", wrsz);
+1 −0
Original line number Diff line number Diff line
@@ -69,6 +69,7 @@ typedef struct {
    SSL *ssl;                   /* SSL handle. Can be NULL. */
    int refcount;
    int64_t largest_seq_id_seen;
    bool errored;
} fd_info;

/* Metadata for a message in-flight. */