Loading src/lib/bus/bus.c +87 −8 Original line number Diff line number Diff line Loading @@ -33,6 +33,7 @@ #include "threadpool.h" #include "bus_internal_types.h" #include "bus_ssl.h" #include "yacht.h" /* Function pointers for pthreads. */ void *listener_mainloop(void *arg); Loading @@ -50,6 +51,8 @@ static void set_defaults(bus_config *cfg) { if (cfg->listener_count == 0) { cfg->listener_count = 1; } } #define DEF_FD_SET_SIZE2 4 bool bus_init(bus_config *config, struct bus_result *res) { if (res == NULL) { return false; } if (config == NULL) { Loading Loading @@ -81,6 +84,7 @@ bool bus_init(bus_config *config, struct bus_result *res) { struct threadpool *tp = NULL; bool *j = NULL; pthread_t *pts = NULL; struct yacht *fd_set = NULL; bus *b = calloc(1, sizeof(*b)); if (b == NULL) { goto cleanup; } Loading @@ -98,14 +102,16 @@ bool bus_init(bus_config *config, struct bus_result *res) { res->status = BUS_INIT_ERROR_MUTEX_INIT_FAIL; goto cleanup; } if (0 != pthread_mutex_init(&b->fd_set_lock, NULL)) { res->status = BUS_INIT_ERROR_MUTEX_INIT_FAIL; goto cleanup; } log_lock_init = true; BUS_LOG_SNPRINTF(b, 3, LOG_INITIALIZATION, b->udata, 64, "Initialized bus at %p", b); pthread_mutex_lock(&b->log_lock); pthread_mutex_unlock(&b->log_lock); ss = calloc(config->sender_count, sizeof(*ss)); if (ss == NULL) { goto cleanup; Loading @@ -126,6 +132,7 @@ bool bus_init(bus_config *config, struct bus_result *res) { if (ls == NULL) { goto cleanup; } for (int i = 0; i < config->listener_count; i++) { ls[i] = listener_init(b, config); if (ls[i] == NULL) { Loading @@ -150,6 +157,11 @@ bool bus_init(bus_config *config, struct bus_result *res) { goto cleanup; } fd_set = yacht_init(DEF_FD_SET_SIZE2); if (fd_set == NULL) { goto cleanup; } b->sender_count = config->sender_count; b->senders = ss; b->listener_count = config->listener_count; Loading @@ -176,6 +188,7 @@ bool bus_init(bus_config *config, struct bus_result *res) { } } b->fd_set = fd_set; res->bus = b; BUS_LOG(b, 1, LOG_INITIALIZATION, "initialized", config->bus_udata); return true; Loading @@ -196,12 +209,14 @@ cleanup: if (j) { free(j); } if (b) { if (log_lock_init) { pthread_mutex_destroy(&b->fd_set_lock); pthread_mutex_destroy(&b->log_lock); } free(b); } if (pts) { free(pts); } if (fd_set) { yacht_free(fd_set, NULL, NULL); } return false; } Loading @@ -219,9 +234,29 @@ static boxed_msg *box_msg(struct bus *b, bus_user_msg *msg) { box->fd = msg->fd; assert(msg->fd != 0); /* Lock hash table and check whether this FD uses SSL. */ if (0 != pthread_mutex_lock(&b->fd_set_lock)) { assert(false); } void *value = NULL; SSL *ssl = NULL; if (yacht_get(b->fd_set, box->fd, &value)) { ssl = (SSL *)value; assert(ssl != NULL); box->ssl = ssl; } if (0 != pthread_mutex_unlock(&b->fd_set_lock)) { assert(false); } if (ssl == NULL) { /* socket isn't registered, fail out */ free(box); return NULL; } box->out_seq_id = msg->seq_id; box->out_msg_size = msg->msg_size; /* FIXME: should this be copied by pointer or value? */ /* Store message by pointer, since the client thread using it is blocked * until we are done sending. */ box->out_msg = msg->msg; box->cb = msg->cb; Loading Loading @@ -375,10 +410,22 @@ bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata) ci->udata = udata; if (type == BUS_SOCKET_SSL) { if (!bus_ssl_connect(b, ci)) { free(ci); return false; if (!bus_ssl_connect(b, ci)) { goto cleanup; } } else { ci->ssl = BUS_NO_SSL; } void *old_value = NULL; /* Lock hash table and save whether this FD uses SSL. */ if (0 != pthread_mutex_lock(&b->fd_set_lock)) { assert(false); } bool set_ok = yacht_set(b->fd_set, fd, (void *)ci->ssl, &old_value); if (0 != pthread_mutex_unlock(&b->fd_set_lock)) { assert(false); } if (set_ok) { assert(old_value == NULL); } else { goto cleanup; } bool res = listener_add_socket(l, ci, pipe_in); Loading @@ -403,6 +450,38 @@ cleanup: return false; } /* Free metadata about a socket that has been disconnected. */ bool bus_release_socket(struct bus *b, int fd) { /* Register a socket internally with a listener. */ int l_id = listener_id_of_socket(b, fd); BUS_LOG_SNPRINTF(b, 2, LOG_SOCKET_REGISTERED, b->udata, 64, "forgetting socket %d", fd); /* Spread sockets throughout the different listener processes. */ struct listener *l = b->listeners[l_id]; if (!listener_remove_socket(l, fd)) { return false; /* couldn't send msg to listener */ } /* Lock hash table and forget whether this FD uses SSL. */ void *old_value = NULL; if (0 != pthread_mutex_lock(&b->fd_set_lock)) { assert(false); } bool rm_ok = yacht_remove(b->fd_set, fd, &old_value); if (0 != pthread_mutex_unlock(&b->fd_set_lock)) { assert(false); } assert(rm_ok); SSL *ssl = (SSL *)old_value; assert(ssl != NULL); if (ssl == BUS_NO_SSL) { return true; /* nothing else to do */ } else { return bus_ssl_disconnect(b, ssl); } } bool bus_schedule_threadpool_task(struct bus *b, struct threadpool_task *task, size_t *backpressure) { return threadpool_schedule(b->threadpool, task, backpressure); Loading src/lib/bus/bus.h +3 −2 Original line number Diff line number Diff line Loading @@ -32,8 +32,6 @@ bool bus_init(bus_config *config, struct bus_result *res); /* Send a request. Blocks until the request has been transmitted. * * TODO: liveness of msg: copy or take ownership? * * Assumes the FD has been registered with bus_register_socket; * sending to an unregistered socket is an error. */ Loading @@ -52,6 +50,9 @@ const char *bus_log_event_str(log_event_t event); * SSL/TLS connection handshake has completed. */ bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *socket_udata); /* Free metadata about a socket that has been disconnected. */ bool bus_release_socket(struct bus *b, int fd); /* Begin shutting the system down. Returns true once everything pending * has resolved. */ bool bus_shutdown(struct bus *b); Loading src/lib/bus/bus_internal_types.h +9 −1 Original line number Diff line number Diff line Loading @@ -28,6 +28,7 @@ #include <openssl/err.h> #include "bus.h" #include "yacht.h" /* Struct for a message that will be passed from sender to listener to * threadpool, proceeding directly to the threadpool if there is an error Loading @@ -44,11 +45,14 @@ typedef struct boxed_msg { /* Destination filename and message body. */ int fd; SSL *ssl; /* valid pointer or BUS_BOXED_MSG_NO_SSL */ int64_t out_seq_id; uint8_t *out_msg; size_t out_msg_size; } boxed_msg; #define BUS_NO_SSL ((SSL *)-2) /* Message bus. */ typedef struct bus { bus_sink_cb *sink_cb; Loading @@ -72,6 +76,10 @@ typedef struct bus { struct threadpool *threadpool; SSL_CTX *ssl_ctx; /* Locked hash table for fd -> (SSL * | BUS_NO_SSL) */ struct yacht *fd_set; pthread_mutex_t fd_set_lock; } bus; /* Special timeout value indicating UNBOUND. */ Loading @@ -93,7 +101,7 @@ typedef struct { rx_error_t error; size_t to_read_size; SSL *ssl; /* SSL handle. Can be NULL. */ SSL *ssl; /* SSL handle. Must be valid or BUS_NO_SSL. */ bus_socket_t type; void *udata; /* user connection data */ Loading src/lib/bus/bus_ssl.c +8 −0 Original line number Diff line number Diff line Loading @@ -51,6 +51,14 @@ bool bus_ssl_connect(struct bus *b, connection_info *ci) { } } /* Disconnect and free an individual SSL handle. */ bool bus_ssl_disconnect(struct bus *b, SSL *ssl) { SSL_free(ssl); (void)b; return true; } /* Free all internal data for using SSL. */ void bus_ssl_free(struct bus *b) { if (b && b->ssl_ctx) { SSL_CTX_free(b->ssl_ctx); Loading src/lib/bus/bus_ssl.h +4 −0 Original line number Diff line number Diff line Loading @@ -14,6 +14,10 @@ bool bus_ssl_init(struct bus *b); * Returns whether the connection succeeded. */ bool bus_ssl_connect(struct bus *b, connection_info *ci); /* Disconnect and free an individual SSL handle. */ bool bus_ssl_disconnect(struct bus *b, SSL *ssl); /* Free all internal data for using SSL. */ void bus_ssl_free(struct bus *b); #endif Loading
src/lib/bus/bus.c +87 −8 Original line number Diff line number Diff line Loading @@ -33,6 +33,7 @@ #include "threadpool.h" #include "bus_internal_types.h" #include "bus_ssl.h" #include "yacht.h" /* Function pointers for pthreads. */ void *listener_mainloop(void *arg); Loading @@ -50,6 +51,8 @@ static void set_defaults(bus_config *cfg) { if (cfg->listener_count == 0) { cfg->listener_count = 1; } } #define DEF_FD_SET_SIZE2 4 bool bus_init(bus_config *config, struct bus_result *res) { if (res == NULL) { return false; } if (config == NULL) { Loading Loading @@ -81,6 +84,7 @@ bool bus_init(bus_config *config, struct bus_result *res) { struct threadpool *tp = NULL; bool *j = NULL; pthread_t *pts = NULL; struct yacht *fd_set = NULL; bus *b = calloc(1, sizeof(*b)); if (b == NULL) { goto cleanup; } Loading @@ -98,14 +102,16 @@ bool bus_init(bus_config *config, struct bus_result *res) { res->status = BUS_INIT_ERROR_MUTEX_INIT_FAIL; goto cleanup; } if (0 != pthread_mutex_init(&b->fd_set_lock, NULL)) { res->status = BUS_INIT_ERROR_MUTEX_INIT_FAIL; goto cleanup; } log_lock_init = true; BUS_LOG_SNPRINTF(b, 3, LOG_INITIALIZATION, b->udata, 64, "Initialized bus at %p", b); pthread_mutex_lock(&b->log_lock); pthread_mutex_unlock(&b->log_lock); ss = calloc(config->sender_count, sizeof(*ss)); if (ss == NULL) { goto cleanup; Loading @@ -126,6 +132,7 @@ bool bus_init(bus_config *config, struct bus_result *res) { if (ls == NULL) { goto cleanup; } for (int i = 0; i < config->listener_count; i++) { ls[i] = listener_init(b, config); if (ls[i] == NULL) { Loading @@ -150,6 +157,11 @@ bool bus_init(bus_config *config, struct bus_result *res) { goto cleanup; } fd_set = yacht_init(DEF_FD_SET_SIZE2); if (fd_set == NULL) { goto cleanup; } b->sender_count = config->sender_count; b->senders = ss; b->listener_count = config->listener_count; Loading @@ -176,6 +188,7 @@ bool bus_init(bus_config *config, struct bus_result *res) { } } b->fd_set = fd_set; res->bus = b; BUS_LOG(b, 1, LOG_INITIALIZATION, "initialized", config->bus_udata); return true; Loading @@ -196,12 +209,14 @@ cleanup: if (j) { free(j); } if (b) { if (log_lock_init) { pthread_mutex_destroy(&b->fd_set_lock); pthread_mutex_destroy(&b->log_lock); } free(b); } if (pts) { free(pts); } if (fd_set) { yacht_free(fd_set, NULL, NULL); } return false; } Loading @@ -219,9 +234,29 @@ static boxed_msg *box_msg(struct bus *b, bus_user_msg *msg) { box->fd = msg->fd; assert(msg->fd != 0); /* Lock hash table and check whether this FD uses SSL. */ if (0 != pthread_mutex_lock(&b->fd_set_lock)) { assert(false); } void *value = NULL; SSL *ssl = NULL; if (yacht_get(b->fd_set, box->fd, &value)) { ssl = (SSL *)value; assert(ssl != NULL); box->ssl = ssl; } if (0 != pthread_mutex_unlock(&b->fd_set_lock)) { assert(false); } if (ssl == NULL) { /* socket isn't registered, fail out */ free(box); return NULL; } box->out_seq_id = msg->seq_id; box->out_msg_size = msg->msg_size; /* FIXME: should this be copied by pointer or value? */ /* Store message by pointer, since the client thread using it is blocked * until we are done sending. */ box->out_msg = msg->msg; box->cb = msg->cb; Loading Loading @@ -375,10 +410,22 @@ bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *udata) ci->udata = udata; if (type == BUS_SOCKET_SSL) { if (!bus_ssl_connect(b, ci)) { free(ci); return false; if (!bus_ssl_connect(b, ci)) { goto cleanup; } } else { ci->ssl = BUS_NO_SSL; } void *old_value = NULL; /* Lock hash table and save whether this FD uses SSL. */ if (0 != pthread_mutex_lock(&b->fd_set_lock)) { assert(false); } bool set_ok = yacht_set(b->fd_set, fd, (void *)ci->ssl, &old_value); if (0 != pthread_mutex_unlock(&b->fd_set_lock)) { assert(false); } if (set_ok) { assert(old_value == NULL); } else { goto cleanup; } bool res = listener_add_socket(l, ci, pipe_in); Loading @@ -403,6 +450,38 @@ cleanup: return false; } /* Free metadata about a socket that has been disconnected. */ bool bus_release_socket(struct bus *b, int fd) { /* Register a socket internally with a listener. */ int l_id = listener_id_of_socket(b, fd); BUS_LOG_SNPRINTF(b, 2, LOG_SOCKET_REGISTERED, b->udata, 64, "forgetting socket %d", fd); /* Spread sockets throughout the different listener processes. */ struct listener *l = b->listeners[l_id]; if (!listener_remove_socket(l, fd)) { return false; /* couldn't send msg to listener */ } /* Lock hash table and forget whether this FD uses SSL. */ void *old_value = NULL; if (0 != pthread_mutex_lock(&b->fd_set_lock)) { assert(false); } bool rm_ok = yacht_remove(b->fd_set, fd, &old_value); if (0 != pthread_mutex_unlock(&b->fd_set_lock)) { assert(false); } assert(rm_ok); SSL *ssl = (SSL *)old_value; assert(ssl != NULL); if (ssl == BUS_NO_SSL) { return true; /* nothing else to do */ } else { return bus_ssl_disconnect(b, ssl); } } bool bus_schedule_threadpool_task(struct bus *b, struct threadpool_task *task, size_t *backpressure) { return threadpool_schedule(b->threadpool, task, backpressure); Loading
src/lib/bus/bus.h +3 −2 Original line number Diff line number Diff line Loading @@ -32,8 +32,6 @@ bool bus_init(bus_config *config, struct bus_result *res); /* Send a request. Blocks until the request has been transmitted. * * TODO: liveness of msg: copy or take ownership? * * Assumes the FD has been registered with bus_register_socket; * sending to an unregistered socket is an error. */ Loading @@ -52,6 +50,9 @@ const char *bus_log_event_str(log_event_t event); * SSL/TLS connection handshake has completed. */ bool bus_register_socket(struct bus *b, bus_socket_t type, int fd, void *socket_udata); /* Free metadata about a socket that has been disconnected. */ bool bus_release_socket(struct bus *b, int fd); /* Begin shutting the system down. Returns true once everything pending * has resolved. */ bool bus_shutdown(struct bus *b); Loading
src/lib/bus/bus_internal_types.h +9 −1 Original line number Diff line number Diff line Loading @@ -28,6 +28,7 @@ #include <openssl/err.h> #include "bus.h" #include "yacht.h" /* Struct for a message that will be passed from sender to listener to * threadpool, proceeding directly to the threadpool if there is an error Loading @@ -44,11 +45,14 @@ typedef struct boxed_msg { /* Destination filename and message body. */ int fd; SSL *ssl; /* valid pointer or BUS_BOXED_MSG_NO_SSL */ int64_t out_seq_id; uint8_t *out_msg; size_t out_msg_size; } boxed_msg; #define BUS_NO_SSL ((SSL *)-2) /* Message bus. */ typedef struct bus { bus_sink_cb *sink_cb; Loading @@ -72,6 +76,10 @@ typedef struct bus { struct threadpool *threadpool; SSL_CTX *ssl_ctx; /* Locked hash table for fd -> (SSL * | BUS_NO_SSL) */ struct yacht *fd_set; pthread_mutex_t fd_set_lock; } bus; /* Special timeout value indicating UNBOUND. */ Loading @@ -93,7 +101,7 @@ typedef struct { rx_error_t error; size_t to_read_size; SSL *ssl; /* SSL handle. Can be NULL. */ SSL *ssl; /* SSL handle. Must be valid or BUS_NO_SSL. */ bus_socket_t type; void *udata; /* user connection data */ Loading
src/lib/bus/bus_ssl.c +8 −0 Original line number Diff line number Diff line Loading @@ -51,6 +51,14 @@ bool bus_ssl_connect(struct bus *b, connection_info *ci) { } } /* Disconnect and free an individual SSL handle. */ bool bus_ssl_disconnect(struct bus *b, SSL *ssl) { SSL_free(ssl); (void)b; return true; } /* Free all internal data for using SSL. */ void bus_ssl_free(struct bus *b) { if (b && b->ssl_ctx) { SSL_CTX_free(b->ssl_ctx); Loading
src/lib/bus/bus_ssl.h +4 −0 Original line number Diff line number Diff line Loading @@ -14,6 +14,10 @@ bool bus_ssl_init(struct bus *b); * Returns whether the connection succeeded. */ bool bus_ssl_connect(struct bus *b, connection_info *ci); /* Disconnect and free an individual SSL handle. */ bool bus_ssl_disconnect(struct bus *b, SSL *ssl); /* Free all internal data for using SSL. */ void bus_ssl_free(struct bus *b); #endif