Loading src/lib/bus/atomic.h +4 −0 Original line number Diff line number Diff line Loading @@ -20,6 +20,10 @@ #ifndef ATOMIC_H #define ATOMIC_H /* Compiler intrinsic for an atomic compare-and-swap. * Returns true if *PTR has been atomically changed from * OLD to NEW, or false if OLD is no longer the current value * (typically, because another thread changed it first). */ #define ATOMIC_BOOL_COMPARE_AND_SWAP(PTR, OLD, NEW) \ (__sync_bool_compare_and_swap(PTR, OLD, NEW)) Loading src/lib/bus/bus.h +9 −9 Original line number Diff line number Diff line Loading @@ -22,26 +22,26 @@ #include "bus_types.h" /* Initialize a bus, based on configuration in *config. RetuBus_RegisterSockets a bool /** Initialize a bus, based on configuration in *config. Returns a bool * indicating whether the construction succeeded, and the bus pointer * and/or a status code indicating the cause of failure in *res. */ bool Bus_Init(bus_config *config, struct bus_result *res); /* Send a request. Blocks until the request has been transmitted. /** Send a request. Blocks until the request has been transmitted. * * Assumes the FD has been registered with Bus_register_socket; * sending to an unregistered socket is an error. * * RetuBus_RegisterSockets true if the request has been accepted and the bus will * Returns true if the request has been accepted and the bus will * attempt to handle the request and response. They can still fail, * but the error status will be passed to the result handler callback. * * RetuBus_RegisterSockets false if the request has been rejected, due to a memory * Returns false if the request has been rejected, due to a memory * allocation error or invalid arguments. * */ bool Bus_SendRequest(struct bus *b, bus_user_msg *msg); /* Register a socket connected to an endpoint, and data that will be passed /** Register a socket connected to an endpoint, and data that will be passed * to all interactions on that socket. * * The socket will have request -> response messages with timeouts, as Loading @@ -51,17 +51,17 @@ bool Bus_SendRequest(struct bus *b, bus_user_msg *msg); * SSL/TLS connection handshake has completed. */ bool Bus_RegisterSocket(struct bus *b, bus_socket_t type, int fd, void *socket_udata); /* Free metadata about a socket that has been disconnected. */ /** Free metadata about a socket that has been disconnected. */ bool Bus_ReleaseSocket(struct bus *b, int fd, void **socket_udata_out); /* Begin shutting the system down. RetuBus_RegisterSockets true once everything pending /** Begin shutting the system down. Returns true once everything pending * has resolved. */ bool Bus_Shutdown(struct bus *b); /* Free internal data structures for the bus. */ /** Free internal data structures for the bus. */ void Bus_Free(struct bus *b); /* Inward facing portion of the message bus -- functions called /** Inward facing portion of the message bus -- functions called * by other parts of the message bus, like the Listener thread, * but not by code outside the bus. */ #include "bus_inward.h" Loading src/lib/bus/bus_internal_types.h +30 −30 Original line number Diff line number Diff line Loading @@ -35,31 +35,31 @@ * along the way. This must only have a single owner at a time. */ typedef struct boxed_msg { /* Result message, constructed in place after the request/response cycle /** Result message, constructed in place after the request/response cycle * has completed or failed due to timeout / unrecoverable error. */ bus_msg_result_t result; /* Message send timeout. */ /** Message send timeout. */ time_t timeout_sec; /* Callback and userdata to which the bus_msg_result_t above will be sunk. */ /** Callback and userdata to which the bus_msg_result_t above will be sunk. */ bus_msg_cb *cb; void *udata; /* Event timestamps to track timeouts. */ /** Event timestamps to track timeouts. */ struct timeval tv_send_start; struct timeval tv_send_done; /* Destination filename and message body. */ /** Destination filename and message body. */ int fd; SSL *ssl; /* valid pointer or BUS_BOXED_MSG_NO_SSL */ SSL *ssl; ///< valid pointer or BUS_BOXED_MSG_NO_SSL int64_t out_seq_id; uint8_t *out_msg; size_t out_msg_size; size_t out_sent_size; } boxed_msg; // Special "NO SSL" value, to distinguish from a NULL SSL handle. /** Special "NO SSL" value, to distinguish from a NULL SSL handle. */ #define BUS_NO_SSL ((SSL *)-2) typedef enum { Loading @@ -68,33 +68,33 @@ typedef enum { SHUTDOWN_STATE_HALTED, } shutdown_state_t; /* Message bus. */ /** Message bus. */ typedef struct bus { bus_sink_cb *sink_cb; bus_unpack_cb *unpack_cb; bus_unexpected_msg_cb *unexpected_msg_cb; bus_error_cb *error_cb; void *udata; bus_sink_cb *sink_cb; ///< IO sink callback bus_unpack_cb *unpack_cb; ///< Message unpacking callback bus_unexpected_msg_cb *unexpected_msg_cb; //< Unexpected message callback bus_error_cb *error_cb; ///< Error handling callback void *udata; ///< User data for callbacks int log_level; bus_log_cb *log_cb; int log_level; ///< Log level bus_log_cb *log_cb; ///< Logging callback uint8_t listener_count; struct listener **listeners; uint8_t listener_count; ///< Number of listeners struct listener **listeners; ///< Listener array bool *joined; pthread_t *threads; shutdown_state_t shutdown_state; bool *joined; ///< Which threads have joined pthread_t *threads; ///< Threads shutdown_state_t shutdown_state; ///< Current shutdown state struct threadpool *threadpool; SSL_CTX *ssl_ctx; struct threadpool *threadpool; ///< Thread pool SSL_CTX *ssl_ctx; ///< SSL context /* Locked hash table for fd -> connection_info */ /** Locked hash table for fd -> connection_info */ struct yacht *fd_set; pthread_mutex_t fd_set_lock; } bus; /* Special timeout value indicating UNBOUND. */ /** Special timeout value indicating UNBOUND. */ #define TIMEOUT_NOT_YET_SET ((time_t)(-1)) typedef enum { Loading @@ -107,17 +107,17 @@ typedef enum { RX_ERROR_TIMEOUT = -34, } rx_error_t; /* Per-socket connection context. (Owned by the listener.) */ /** Per-socket connection context. (Owned by the listener.) */ typedef struct { /* Shared */ const int fd; const bus_socket_t type; void *udata; /* user connection data */ void *udata; ///< user connection data /* Shared, cleaned up by client */ SSL *ssl; /* SSL handle. Must be valid or BUS_NO_SSL. */ SSL *ssl; ///< SSL handle. Must be valid or BUS_NO_SSL. /* Set by client thread */ /** Set by client thread. Monotonically increasing max sequence ID. */ int64_t largest_wr_seq_id_seen; /* Set by listener thread */ Loading @@ -125,10 +125,10 @@ typedef struct { size_t to_read_size; } connection_info; /* Arbitrary byte used to tag writes from the listener. */ /** Arbitrary byte used to tag writes from the listener. */ #define LISTENER_MSG_TAG 0x15 /* Starting size^2 for file descriptor hash table. */ /** Starting size^2 for file descriptor hash table. */ #define DEF_FD_SET_SIZE2 4 #endif src/lib/bus/bus_inward.h +4 −4 Original line number Diff line number Diff line Loading @@ -22,18 +22,18 @@ #include "bus_types.h" /* Get the string key for a log event ID. */ /** Get the string key for a log event ID. */ const char *Bus_LogEventStr(log_event_t event); /* For a given file descriptor, get the listener ID to use. /** For a given file descriptor, get the listener ID to use. * This will level sockets between multiple threads. */ struct listener *Bus_GetListenerForSocket(struct bus *b, int fd); /* Deliver a boxed message to the thread pool to execute. */ /** Deliver a boxed message to the thread pool to execute. */ bool Bus_ProcessBoxedMessage(struct bus *b, struct boxed_msg *box, size_t *backpressure); /* Provide backpressure by sleeping for (backpressure >> shift) msec, if /** Provide backpressure by sleeping for (backpressure >> shift) msec, if * the value is greater than 0. */ void Bus_BackpressureDelay(struct bus *b, size_t backpressure, uint8_t shift); Loading src/lib/bus/bus_poll.h +1 −1 Original line number Diff line number Diff line Loading @@ -23,7 +23,7 @@ #include <stdbool.h> #include "bus_types.h" /* Poll on fd until complete, return true on success or false on IO /** Poll on fd until complete, return true on success or false on IO * error. (This is mostly in a distinct module to add a testing seam.) */ bool BusPoll_OnCompletion(struct bus *b, int fd); Loading Loading
src/lib/bus/atomic.h +4 −0 Original line number Diff line number Diff line Loading @@ -20,6 +20,10 @@ #ifndef ATOMIC_H #define ATOMIC_H /* Compiler intrinsic for an atomic compare-and-swap. * Returns true if *PTR has been atomically changed from * OLD to NEW, or false if OLD is no longer the current value * (typically, because another thread changed it first). */ #define ATOMIC_BOOL_COMPARE_AND_SWAP(PTR, OLD, NEW) \ (__sync_bool_compare_and_swap(PTR, OLD, NEW)) Loading
src/lib/bus/bus.h +9 −9 Original line number Diff line number Diff line Loading @@ -22,26 +22,26 @@ #include "bus_types.h" /* Initialize a bus, based on configuration in *config. RetuBus_RegisterSockets a bool /** Initialize a bus, based on configuration in *config. Returns a bool * indicating whether the construction succeeded, and the bus pointer * and/or a status code indicating the cause of failure in *res. */ bool Bus_Init(bus_config *config, struct bus_result *res); /* Send a request. Blocks until the request has been transmitted. /** Send a request. Blocks until the request has been transmitted. * * Assumes the FD has been registered with Bus_register_socket; * sending to an unregistered socket is an error. * * RetuBus_RegisterSockets true if the request has been accepted and the bus will * Returns true if the request has been accepted and the bus will * attempt to handle the request and response. They can still fail, * but the error status will be passed to the result handler callback. * * RetuBus_RegisterSockets false if the request has been rejected, due to a memory * Returns false if the request has been rejected, due to a memory * allocation error or invalid arguments. * */ bool Bus_SendRequest(struct bus *b, bus_user_msg *msg); /* Register a socket connected to an endpoint, and data that will be passed /** Register a socket connected to an endpoint, and data that will be passed * to all interactions on that socket. * * The socket will have request -> response messages with timeouts, as Loading @@ -51,17 +51,17 @@ bool Bus_SendRequest(struct bus *b, bus_user_msg *msg); * SSL/TLS connection handshake has completed. */ bool Bus_RegisterSocket(struct bus *b, bus_socket_t type, int fd, void *socket_udata); /* Free metadata about a socket that has been disconnected. */ /** Free metadata about a socket that has been disconnected. */ bool Bus_ReleaseSocket(struct bus *b, int fd, void **socket_udata_out); /* Begin shutting the system down. RetuBus_RegisterSockets true once everything pending /** Begin shutting the system down. Returns true once everything pending * has resolved. */ bool Bus_Shutdown(struct bus *b); /* Free internal data structures for the bus. */ /** Free internal data structures for the bus. */ void Bus_Free(struct bus *b); /* Inward facing portion of the message bus -- functions called /** Inward facing portion of the message bus -- functions called * by other parts of the message bus, like the Listener thread, * but not by code outside the bus. */ #include "bus_inward.h" Loading
src/lib/bus/bus_internal_types.h +30 −30 Original line number Diff line number Diff line Loading @@ -35,31 +35,31 @@ * along the way. This must only have a single owner at a time. */ typedef struct boxed_msg { /* Result message, constructed in place after the request/response cycle /** Result message, constructed in place after the request/response cycle * has completed or failed due to timeout / unrecoverable error. */ bus_msg_result_t result; /* Message send timeout. */ /** Message send timeout. */ time_t timeout_sec; /* Callback and userdata to which the bus_msg_result_t above will be sunk. */ /** Callback and userdata to which the bus_msg_result_t above will be sunk. */ bus_msg_cb *cb; void *udata; /* Event timestamps to track timeouts. */ /** Event timestamps to track timeouts. */ struct timeval tv_send_start; struct timeval tv_send_done; /* Destination filename and message body. */ /** Destination filename and message body. */ int fd; SSL *ssl; /* valid pointer or BUS_BOXED_MSG_NO_SSL */ SSL *ssl; ///< valid pointer or BUS_BOXED_MSG_NO_SSL int64_t out_seq_id; uint8_t *out_msg; size_t out_msg_size; size_t out_sent_size; } boxed_msg; // Special "NO SSL" value, to distinguish from a NULL SSL handle. /** Special "NO SSL" value, to distinguish from a NULL SSL handle. */ #define BUS_NO_SSL ((SSL *)-2) typedef enum { Loading @@ -68,33 +68,33 @@ typedef enum { SHUTDOWN_STATE_HALTED, } shutdown_state_t; /* Message bus. */ /** Message bus. */ typedef struct bus { bus_sink_cb *sink_cb; bus_unpack_cb *unpack_cb; bus_unexpected_msg_cb *unexpected_msg_cb; bus_error_cb *error_cb; void *udata; bus_sink_cb *sink_cb; ///< IO sink callback bus_unpack_cb *unpack_cb; ///< Message unpacking callback bus_unexpected_msg_cb *unexpected_msg_cb; //< Unexpected message callback bus_error_cb *error_cb; ///< Error handling callback void *udata; ///< User data for callbacks int log_level; bus_log_cb *log_cb; int log_level; ///< Log level bus_log_cb *log_cb; ///< Logging callback uint8_t listener_count; struct listener **listeners; uint8_t listener_count; ///< Number of listeners struct listener **listeners; ///< Listener array bool *joined; pthread_t *threads; shutdown_state_t shutdown_state; bool *joined; ///< Which threads have joined pthread_t *threads; ///< Threads shutdown_state_t shutdown_state; ///< Current shutdown state struct threadpool *threadpool; SSL_CTX *ssl_ctx; struct threadpool *threadpool; ///< Thread pool SSL_CTX *ssl_ctx; ///< SSL context /* Locked hash table for fd -> connection_info */ /** Locked hash table for fd -> connection_info */ struct yacht *fd_set; pthread_mutex_t fd_set_lock; } bus; /* Special timeout value indicating UNBOUND. */ /** Special timeout value indicating UNBOUND. */ #define TIMEOUT_NOT_YET_SET ((time_t)(-1)) typedef enum { Loading @@ -107,17 +107,17 @@ typedef enum { RX_ERROR_TIMEOUT = -34, } rx_error_t; /* Per-socket connection context. (Owned by the listener.) */ /** Per-socket connection context. (Owned by the listener.) */ typedef struct { /* Shared */ const int fd; const bus_socket_t type; void *udata; /* user connection data */ void *udata; ///< user connection data /* Shared, cleaned up by client */ SSL *ssl; /* SSL handle. Must be valid or BUS_NO_SSL. */ SSL *ssl; ///< SSL handle. Must be valid or BUS_NO_SSL. /* Set by client thread */ /** Set by client thread. Monotonically increasing max sequence ID. */ int64_t largest_wr_seq_id_seen; /* Set by listener thread */ Loading @@ -125,10 +125,10 @@ typedef struct { size_t to_read_size; } connection_info; /* Arbitrary byte used to tag writes from the listener. */ /** Arbitrary byte used to tag writes from the listener. */ #define LISTENER_MSG_TAG 0x15 /* Starting size^2 for file descriptor hash table. */ /** Starting size^2 for file descriptor hash table. */ #define DEF_FD_SET_SIZE2 4 #endif
src/lib/bus/bus_inward.h +4 −4 Original line number Diff line number Diff line Loading @@ -22,18 +22,18 @@ #include "bus_types.h" /* Get the string key for a log event ID. */ /** Get the string key for a log event ID. */ const char *Bus_LogEventStr(log_event_t event); /* For a given file descriptor, get the listener ID to use. /** For a given file descriptor, get the listener ID to use. * This will level sockets between multiple threads. */ struct listener *Bus_GetListenerForSocket(struct bus *b, int fd); /* Deliver a boxed message to the thread pool to execute. */ /** Deliver a boxed message to the thread pool to execute. */ bool Bus_ProcessBoxedMessage(struct bus *b, struct boxed_msg *box, size_t *backpressure); /* Provide backpressure by sleeping for (backpressure >> shift) msec, if /** Provide backpressure by sleeping for (backpressure >> shift) msec, if * the value is greater than 0. */ void Bus_BackpressureDelay(struct bus *b, size_t backpressure, uint8_t shift); Loading
src/lib/bus/bus_poll.h +1 −1 Original line number Diff line number Diff line Loading @@ -23,7 +23,7 @@ #include <stdbool.h> #include "bus_types.h" /* Poll on fd until complete, return true on success or false on IO /** Poll on fd until complete, return true on success or false on IO * error. (This is mostly in a distinct module to add a testing seam.) */ bool BusPoll_OnCompletion(struct bus *b, int fd); Loading