Commit 11493002 authored by Scott Vokes's avatar Scott Vokes
Browse files

Add error handler callback for unpacked, invalid user messages.

parent 5eb46750
Loading
Loading
Loading
Loading
+25 −0
Original line number Diff line number Diff line
@@ -25,6 +25,7 @@
#include <poll.h>
#include <errno.h>
#include <assert.h>
#include <limits.h>

#include "bus.h"
#include "sender.h"
@@ -39,6 +40,9 @@ void *sender_mainloop(void *arg);
static bool poll_on_completion(struct bus *b, int fd);
static int sender_id_of_socket(struct bus *b, int fd);
static int listener_id_of_socket(struct bus *b, int fd);
static void noop_log_cb(log_event_t event,
        int log_level, const char *msg, void *udata);
static void noop_error_cb(bus_unpack_cb_res_t result, void *socket_udata);

static void set_defaults(bus_config *cfg) {
    if (cfg->sender_count == 0) { cfg->sender_count = 1; }
@@ -60,6 +64,13 @@ bool bus_init(bus_config *config, struct bus_result *res) {
        res->status = BUS_INIT_ERROR_MISSING_UNPACK_CB;
        return false;
    }
    if (config->log_cb == NULL) {
        config->log_cb = noop_log_cb;
        config->log_level = INT_MIN;
    }
    if (config->error_cb == NULL) {
        config->error_cb = noop_error_cb;
    }

    res->status = BUS_INIT_ERROR_ALLOC_FAIL;

@@ -76,6 +87,7 @@ bool bus_init(bus_config *config, struct bus_result *res) {
    b->sink_cb = config->sink_cb;
    b->unpack_cb = config->unpack_cb;
    b->unexpected_msg_cb = config->unexpected_msg_cb;
    b->error_cb = config->error_cb;
    b->log_cb = config->log_cb;
    b->log_level = config->log_level;
    b->udata = config->bus_udata;
@@ -484,3 +496,16 @@ void bus_free(bus *b) {

    free(b);
}

static void noop_log_cb(log_event_t event,
        int log_level, const char *msg, void *udata) {
    (void)event;
    (void)log_level;
    (void)msg;
    (void)udata;
}

static void noop_error_cb(bus_unpack_cb_res_t result, void *socket_udata) {
    (void)result;
    (void)socket_udata;
}
+1 −0
Original line number Diff line number Diff line
@@ -51,6 +51,7 @@ typedef struct bus {
    bus_sink_cb *sink_cb;
    bus_unpack_cb *unpack_cb;
    bus_unexpected_msg_cb *unexpected_msg_cb;
    bus_error_cb *error_cb;
    void *udata;

    int log_level;
+6 −2
Original line number Diff line number Diff line
@@ -39,7 +39,7 @@ struct boxed_msg;
        log_event_t event_key = EVENT_KEY;                             \
        char *msg = MSG;                                               \
        void *udata = UDATA;                                           \
        if (_b->log_level >= level) {                                  \
        if (_b->log_level >= level && _b->log_cb != NULL) {            \
            bus_lock_log(_b);                                          \
            _b->log_cb(event_key, level, msg, udata);                  \
            bus_unlock_log(_b);                                        \
@@ -54,7 +54,7 @@ struct boxed_msg;
        int level = LEVEL;                                             \
        log_event_t event_key = EVENT_KEY;                             \
        void *udata = UDATA;                                           \
        if (_b->log_level >= level) {                                  \
        if (_b->log_level >= level && _b->log_cb != NULL) {            \
            bus_lock_log(_b);                                          \
            char log_buf[MAX_SZ];                                      \
            if (MAX_SZ < snprintf(log_buf, MAX_SZ,                     \
@@ -128,6 +128,9 @@ typedef struct {
 * Note that the udata pointer is socket-specific, NOT client-specific. */
typedef bus_unpack_cb_res_t (bus_unpack_cb)(void *msg, void *socket_udata);

/* Handle a result from bus_unpack_cb that is marked as an error. */
typedef void (bus_error_cb)(bus_unpack_cb_res_t result, void *socket_udata);

/* Process a message that was successfully unpacked, but does not have
 * an expected sequence ID. This is likely a keepalive message, status
 * message, etc. */
@@ -145,6 +148,7 @@ typedef struct bus_config {
    bus_sink_cb *sink_cb;       /* required */
    bus_unpack_cb *unpack_cb;   /* required */
    bus_unexpected_msg_cb *unexpected_msg_cb;
    bus_error_cb *error_cb;

    int log_level;
    bus_log_cb *log_cb;         /* optional */
+5 −2
Original line number Diff line number Diff line
@@ -394,7 +394,7 @@ static rx_info_t *find_info_by_sequence_id(listener *l,
static void process_unpacked_message(listener *l,
        connection_info *ci, bus_unpack_cb_res_t result) {
    struct bus *b = l->bus;
    size_t backpressure;
    size_t backpressure = 0;

    /* NOTE: message may be an unsolicited status message */

@@ -451,7 +451,10 @@ static void process_unpacked_message(listener *l,
            "Got opaque_error_id of %lu (0x%08lx)",
            e_id, e_id);

        /* Timeouts will clean up after it, and user code already knows there was an error. */
        /* Timeouts will clean up after it; give user code a chance to
         * clean up after it here, though technically speaking they
         * could in the unpack_cb above. */
        b->error_cb(result, ci->udata);
    }
};