Loading src/lib/bus/bus.c +1 −1 Original line number Diff line number Diff line Loading @@ -682,7 +682,7 @@ void bus_free(bus *b) { pthread_mutex_destroy(&b->log_lock); bus_ssl_free(b); bus_ssl_ctx_free(b); free(b); } Loading src/lib/bus/bus_ssl.c +2 −2 Original line number Diff line number Diff line Loading @@ -39,7 +39,7 @@ bool bus_ssl_connect(struct bus *b, connection_info *ci) { ci->ssl = ssl; if (!SSL_set_fd(ci->ssl, ci->fd)) { return false;; return false; } if (do_blocking_connection(b, ci)) { Loading @@ -59,7 +59,7 @@ bool bus_ssl_disconnect(struct bus *b, SSL *ssl) { } /* Free all internal data for using SSL. */ void bus_ssl_free(struct bus *b) { void bus_ssl_ctx_free(struct bus *b) { if (b && b->ssl_ctx) { SSL_CTX_free(b->ssl_ctx); b->ssl_ctx = NULL; Loading src/lib/bus/bus_ssl.h +2 −2 Original line number Diff line number Diff line Loading @@ -19,7 +19,7 @@ bool bus_ssl_connect(struct bus *b, connection_info *ci); /* Disconnect and free an individual SSL handle. */ bool bus_ssl_disconnect(struct bus *b, SSL *ssl); /* Free all internal data for using SSL. */ void bus_ssl_free(struct bus *b); /* Free all internal data for using SSL (the SSL_CTX). */ void bus_ssl_ctx_free(struct bus *b); #endif src/lib/bus/listener.c +13 −11 Original line number Diff line number Diff line Loading @@ -95,8 +95,8 @@ bool listener_remove_socket(struct listener *l, int fd) { listener_msg *msg = get_free_msg(l); if (msg == NULL) { return false; } msg->type = MSG_CLOSE_SOCKET; msg->u.close_socket.fd = fd; msg->type = MSG_REMOVE_SOCKET; msg->u.remove_socket.fd = fd; return push_message(l, msg); } Loading Loading @@ -254,8 +254,8 @@ void listener_free(struct listener *l) { } for (int i = 0; i < l->tracked_fds; i++) { /* Forget off the front to stress forget_socket. */ forget_socket(l, l->fds[0].fd); /* Remove off the front to stress remove_socket. */ remove_socket(l, l->fds[0].fd); } if (l->read_buf) { Loading Loading @@ -393,7 +393,7 @@ static bool sink_socket_read(struct bus *b, listener *l, connection_info *ci, ssize_t size); static void attempt_recv(listener *l, int available) { /* --> failure --> close socket, don't die */ /* --> failure --> set 'closed' error on socket, don't die */ struct bus *b = l->bus; int read_from = 0; BUS_LOG(b, 3, LOG_LISTENER, "attempting receive", b->udata); Loading @@ -406,11 +406,13 @@ static void attempt_recv(listener *l, int available) { if (fd->revents & (POLLERR | POLLNVAL)) { read_from++; BUS_LOG(b, 2, LOG_LISTENER, "pollfd: socket error (POLLERR | POLLNVAL)", b->udata); BUS_LOG(b, 2, LOG_LISTENER, "pollfd: socket error (POLLERR | POLLNVAL)", b->udata); set_error_for_socket(l, i, ci->fd, RX_ERROR_POLLERR); } else if (fd->revents & POLLHUP) { read_from++; BUS_LOG(b, 3, LOG_LISTENER, "pollfd: socket error POLLHUP", b->udata); BUS_LOG(b, 3, LOG_LISTENER, "pollfd: socket error POLLHUP", b->udata); set_error_for_socket(l, i, ci->fd, RX_ERROR_POLLHUP); } else if (fd->revents & POLLIN) { BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64, Loading Loading @@ -1021,8 +1023,8 @@ static void msg_handler(listener *l, listener_msg *pmsg) { case MSG_ADD_SOCKET: add_socket(l, msg.u.add_socket.info, msg.u.add_socket.notify_fd); break; case MSG_CLOSE_SOCKET: forget_socket(l, msg.u.close_socket.fd); case MSG_REMOVE_SOCKET: remove_socket(l, msg.u.remove_socket.fd); break; case MSG_HOLD_RESPONSE: hold_response(l, msg.u.hold.fd, msg.u.hold.seq_id, Loading Loading @@ -1123,10 +1125,10 @@ static void free_ci(connection_info *ci) { } } static void forget_socket(listener *l, int fd) { static void remove_socket(listener *l, int fd) { struct bus *b = l->bus; BUS_LOG_SNPRINTF(b, 2, LOG_LISTENER, b->udata, 128, "forgetting socket %d", fd); "removing socket %d", fd); /* don't really close it, just drop info about it in the listener */ for (int i = 0; i < l->tracked_fds; i++) { Loading src/lib/bus/listener_internal.h +3 −3 Original line number Diff line number Diff line Loading @@ -23,7 +23,7 @@ typedef enum { MSG_NONE, MSG_ADD_SOCKET, MSG_CLOSE_SOCKET, MSG_REMOVE_SOCKET, MSG_HOLD_RESPONSE, MSG_EXPECT_RESPONSE, MSG_SHUTDOWN, Loading @@ -41,7 +41,7 @@ typedef struct listener_msg { } add_socket; struct { int fd; } close_socket; } remove_socket; struct { int fd; int64_t seq_id; Loading Loading @@ -146,7 +146,7 @@ static void dump_rx_info_table(listener *l); static void msg_handler(listener *l, listener_msg *msg); static void add_socket(listener *l, connection_info *ci, int notify_fd); static void forget_socket(listener *l, int fd); static void remove_socket(listener *l, int fd); static void hold_response(listener *l, int fd, int64_t seq_id, int16_t timeout_sec); static void expect_response(listener *l, boxed_msg *box); static void shutdown(listener *l); Loading Loading
src/lib/bus/bus.c +1 −1 Original line number Diff line number Diff line Loading @@ -682,7 +682,7 @@ void bus_free(bus *b) { pthread_mutex_destroy(&b->log_lock); bus_ssl_free(b); bus_ssl_ctx_free(b); free(b); } Loading
src/lib/bus/bus_ssl.c +2 −2 Original line number Diff line number Diff line Loading @@ -39,7 +39,7 @@ bool bus_ssl_connect(struct bus *b, connection_info *ci) { ci->ssl = ssl; if (!SSL_set_fd(ci->ssl, ci->fd)) { return false;; return false; } if (do_blocking_connection(b, ci)) { Loading @@ -59,7 +59,7 @@ bool bus_ssl_disconnect(struct bus *b, SSL *ssl) { } /* Free all internal data for using SSL. */ void bus_ssl_free(struct bus *b) { void bus_ssl_ctx_free(struct bus *b) { if (b && b->ssl_ctx) { SSL_CTX_free(b->ssl_ctx); b->ssl_ctx = NULL; Loading
src/lib/bus/bus_ssl.h +2 −2 Original line number Diff line number Diff line Loading @@ -19,7 +19,7 @@ bool bus_ssl_connect(struct bus *b, connection_info *ci); /* Disconnect and free an individual SSL handle. */ bool bus_ssl_disconnect(struct bus *b, SSL *ssl); /* Free all internal data for using SSL. */ void bus_ssl_free(struct bus *b); /* Free all internal data for using SSL (the SSL_CTX). */ void bus_ssl_ctx_free(struct bus *b); #endif
src/lib/bus/listener.c +13 −11 Original line number Diff line number Diff line Loading @@ -95,8 +95,8 @@ bool listener_remove_socket(struct listener *l, int fd) { listener_msg *msg = get_free_msg(l); if (msg == NULL) { return false; } msg->type = MSG_CLOSE_SOCKET; msg->u.close_socket.fd = fd; msg->type = MSG_REMOVE_SOCKET; msg->u.remove_socket.fd = fd; return push_message(l, msg); } Loading Loading @@ -254,8 +254,8 @@ void listener_free(struct listener *l) { } for (int i = 0; i < l->tracked_fds; i++) { /* Forget off the front to stress forget_socket. */ forget_socket(l, l->fds[0].fd); /* Remove off the front to stress remove_socket. */ remove_socket(l, l->fds[0].fd); } if (l->read_buf) { Loading Loading @@ -393,7 +393,7 @@ static bool sink_socket_read(struct bus *b, listener *l, connection_info *ci, ssize_t size); static void attempt_recv(listener *l, int available) { /* --> failure --> close socket, don't die */ /* --> failure --> set 'closed' error on socket, don't die */ struct bus *b = l->bus; int read_from = 0; BUS_LOG(b, 3, LOG_LISTENER, "attempting receive", b->udata); Loading @@ -406,11 +406,13 @@ static void attempt_recv(listener *l, int available) { if (fd->revents & (POLLERR | POLLNVAL)) { read_from++; BUS_LOG(b, 2, LOG_LISTENER, "pollfd: socket error (POLLERR | POLLNVAL)", b->udata); BUS_LOG(b, 2, LOG_LISTENER, "pollfd: socket error (POLLERR | POLLNVAL)", b->udata); set_error_for_socket(l, i, ci->fd, RX_ERROR_POLLERR); } else if (fd->revents & POLLHUP) { read_from++; BUS_LOG(b, 3, LOG_LISTENER, "pollfd: socket error POLLHUP", b->udata); BUS_LOG(b, 3, LOG_LISTENER, "pollfd: socket error POLLHUP", b->udata); set_error_for_socket(l, i, ci->fd, RX_ERROR_POLLHUP); } else if (fd->revents & POLLIN) { BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64, Loading Loading @@ -1021,8 +1023,8 @@ static void msg_handler(listener *l, listener_msg *pmsg) { case MSG_ADD_SOCKET: add_socket(l, msg.u.add_socket.info, msg.u.add_socket.notify_fd); break; case MSG_CLOSE_SOCKET: forget_socket(l, msg.u.close_socket.fd); case MSG_REMOVE_SOCKET: remove_socket(l, msg.u.remove_socket.fd); break; case MSG_HOLD_RESPONSE: hold_response(l, msg.u.hold.fd, msg.u.hold.seq_id, Loading Loading @@ -1123,10 +1125,10 @@ static void free_ci(connection_info *ci) { } } static void forget_socket(listener *l, int fd) { static void remove_socket(listener *l, int fd) { struct bus *b = l->bus; BUS_LOG_SNPRINTF(b, 2, LOG_LISTENER, b->udata, 128, "forgetting socket %d", fd); "removing socket %d", fd); /* don't really close it, just drop info about it in the listener */ for (int i = 0; i < l->tracked_fds; i++) { Loading
src/lib/bus/listener_internal.h +3 −3 Original line number Diff line number Diff line Loading @@ -23,7 +23,7 @@ typedef enum { MSG_NONE, MSG_ADD_SOCKET, MSG_CLOSE_SOCKET, MSG_REMOVE_SOCKET, MSG_HOLD_RESPONSE, MSG_EXPECT_RESPONSE, MSG_SHUTDOWN, Loading @@ -41,7 +41,7 @@ typedef struct listener_msg { } add_socket; struct { int fd; } close_socket; } remove_socket; struct { int fd; int64_t seq_id; Loading Loading @@ -146,7 +146,7 @@ static void dump_rx_info_table(listener *l); static void msg_handler(listener *l, listener_msg *msg); static void add_socket(listener *l, connection_info *ci, int notify_fd); static void forget_socket(listener *l, int fd); static void remove_socket(listener *l, int fd); static void hold_response(listener *l, int fd, int64_t seq_id, int16_t timeout_sec); static void expect_response(listener *l, boxed_msg *box); static void shutdown(listener *l); Loading