Loading src/lib/bus/bus_types.h +1 −0 Original line number Diff line number Diff line Loading @@ -184,6 +184,7 @@ typedef enum { BUS_SEND_RX_FAILURE = -54, BUS_SEND_BAD_RESPONSE = -55, BUS_SEND_UNREGISTERED_SOCKET = -56, BUS_SEND_RX_TIMEOUT_EXPECT = -57, } bus_send_status_t; /* Result from attempting to configure a message bus. */ Loading src/lib/bus/listener.c +40 −6 Original line number Diff line number Diff line Loading @@ -881,6 +881,15 @@ static rx_info_t *get_free_rx_info(struct listener *l) { } } static connection_info *get_connection_info(struct listener *l, int fd) { for (int i = 0; i < l->tracked_fds; i++) { connection_info *ci = l->fd_info[i]; assert(ci); if (ci->fd == fd) { return ci; } } return NULL; } static void release_rx_info(struct listener *l, rx_info_t *info) { assert(info); struct bus *b = l->bus; Loading @@ -896,9 +905,22 @@ static void release_rx_info(struct listener *l, rx_info_t *info) { * free it, but don't know how. We should never get here, * because it means the sender finished sending the message, * but the listener never got the handler callback. */ if (info->u.hold.result.ok) { void *msg = info->u.hold.result.u.success.msg; int64_t seq_id = info->u.hold.result.u.success.seq_id; connection_info *ci = get_connection_info(l, info->u.hold.fd); if (ci && b->unexpected_msg_cb) { BUS_LOG_SNPRINTF(b, 1, LOG_LISTENER, b->udata, 128, "CALLING UNEXPECTED_MSG_CB ON RESULT %p", (void *)&info->u.hold.result); b->unexpected_msg_cb(msg, seq_id, b->udata, ci->udata); } else { BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 128, "LEAKING RESULT %p", (void *)&info->u.hold.result); //assert(false); } } } break; case RIS_EXPECT: Loading Loading @@ -1266,12 +1288,23 @@ static void expect_response(listener *l, struct boxed_msg *box) { info->timeout_sec = box->timeout_sec; } } else { /* use free info */ /* If we get here, the listener thinks the HOLD message timed out, * but the sender doesn't think things timed out badly enough to * itself expose an error. We also don't know if we're going to * get a response or not. */ BUS_LOG_SNPRINTF(b, 3-3, LOG_MEMORY, b->udata, 128, "get_hold_rx_info FAILED: fd %d, seq_id %lld", box->fd, (long long)box->out_seq_id); /* This should be treated like a send timeout. */ info = get_free_rx_info(l); assert(info); assert(info->state == RIS_INACTIVE); BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "Setting info %p (+%d)'s box to %p", (void*)info, info->id, (void*)box); BUS_LOG_SNPRINTF(b, 3-3, LOG_MEMORY, b->udata, 256, "Setting info %p (+%d)'s box to %p, which will be expired immediately (timeout %lld)", (void*)info, info->id, (void*)box, (long long)box->timeout_sec); info->state = RIS_EXPECT; assert(info->u.expect.box == NULL); Loading @@ -1280,6 +1313,7 @@ static void expect_response(listener *l, struct boxed_msg *box) { info->u.expect.has_result = false; /* Switch over to sender's transferred timeout */ info->timeout_sec = box->timeout_sec; notify_message_failure(l, info, BUS_SEND_RX_TIMEOUT_EXPECT); } } Loading src/lib/bus/sender.c +9 −18 Original line number Diff line number Diff line Loading @@ -593,9 +593,15 @@ static void enqueue_write(struct sender *s, tx_info_t *info) { * because (in rare cases) the response may arrive between finishing * the write and the listener processing the notification. In that * case, it should hold onto the unrecognized response until the * sender notifies it (and passes it the callback). */ * sender notifies it (and passes it the callback). * * This timeout is several extra seconds so that we don't have * a window where the HOLD message has timed out, but the * EXPECT hasn't, leading to ambiguity about what to do with * the response (which may or may not have arrived). * */ attempt_to_enqueue_sending_request_message_to_listener(s, fd, out_seq_id, info->u.enqueue.timeout_sec + 1); fd, out_seq_id, info->u.enqueue.timeout_sec + 5); /* Increment the refcount. This will cause poll to watch for the * socket being writable, if it isn't already being watched. */ Loading Loading @@ -898,7 +904,6 @@ static void update_sent(struct bus *b, sender *s, tx_info_t *info, ssize_t sent) .fd = info->u.write.fd, .timeout_sec = info->u.write.timeout_sec, .box = info->u.write.box, .retries = info->u.write.retries, }; /* Message has been sent, so release - caller may free it. */ Loading Loading @@ -1095,26 +1100,12 @@ static void attempt_to_enqueue_request_sent_message_to_listener(sender *s, tx_in info->state = TIS_REQUEST_RELEASE; info->u.release.backpressure = backpressure; notify_caller(s, info, true); notify_caller(s, info, true); // unblock sender } else { BUS_LOG(b, 2, LOG_SENDER, "failed delivery", b->udata); /* Put it back, since we need to keep managing it */ info->u.notify.box = box; info->u.notify.retries++; if (info->u.notify.retries == SENDER_MAX_DELIVERY_RETRIES) { info->state = TIS_ERROR; struct u_error ue = { .error = TX_ERROR_NOTIFY_LISTENER_FAILURE, .box = box, }; info->u.error = ue; notify_message_failure(s, info, BUS_SEND_RX_FAILURE); BUS_LOG(b, 2, LOG_SENDER, "failed delivery, several retries", b->udata); } } } Loading src/lib/bus/sender_internal.h +2 −2 Original line number Diff line number Diff line Loading @@ -25,7 +25,9 @@ #define MAX_TIMEOUT 100 #define TX_TIMEOUT 9 #if 0 #define SENDER_MAX_DELIVERY_RETRIES 5 #endif /* Max number of in-flight messagse. */ #define MAX_CONCURRENT_SENDS 32 Loading Loading @@ -103,7 +105,6 @@ typedef struct { time_t timeout_sec; boxed_msg *box; size_t sent_size; uint8_t retries; fd_info *fdi; } write; Loading @@ -111,7 +112,6 @@ typedef struct { int fd; time_t timeout_sec; boxed_msg *box; uint8_t retries; } notify; struct { Loading src/lib/kinetic_controller.c +16 −8 Original line number Diff line number Diff line Loading @@ -145,6 +145,9 @@ KineticStatus bus_to_kinetic_status(bus_send_status_t const status) case BUS_SEND_UNREGISTERED_SOCKET: res = KINETIC_STATUS_SOCKET_ERROR; break; case BUS_SEND_RX_TIMEOUT_EXPECT: res = KINETIC_STATUS_OPERATION_TIMEDOUT; break; case BUS_SEND_UNDEFINED: default: { Loading Loading @@ -176,10 +179,14 @@ static const char *bus_error_string(bus_send_status_t t) { return "rx_failure"; case BUS_SEND_BAD_RESPONSE: return "bad_response"; case BUS_SEND_UNREGISTERED_SOCKET: return "unregistered socket"; case BUS_SEND_RX_TIMEOUT_EXPECT: return "internal timeout"; } } void KineticController_HandleUnexecpectedResponse(void *msg, void KineticController_HandleUnexpectedResponse(void *msg, int64_t seq_id, void *bus_udata, void *socket_udata) Loading @@ -192,9 +199,10 @@ void KineticController_HandleUnexecpectedResponse(void *msg, (void)bus_udata; LOGF2("[PDU RX UNSOLICITED] pdu: 0x%0llX, session: 0x%llX, bus: 0x%llX, " "fd: %6d, protoLen: %u, valueLen: %u", "fd: %6d, protoLen: %u, valueLen: %u, seq_id: %08llx", response, connection->pSession, connection->messageBus, connection->socket, response->header.protobufLength, response->header.valueLength); response->header.protobufLength, response->header.valueLength, (long long)seq_id); // Handle unsolicited status PDUs if (response->proto->authType == KINETIC_PROTO_MESSAGE_AUTH_TYPE_UNSOLICITEDSTATUS) { Loading Loading @@ -226,7 +234,7 @@ void KineticController_HandleUnexecpectedResponse(void *msg, } } void KineticController_HandleExpectedResponse(bus_msg_result_t *res, void *udata) void KineticController_HandleResult(bus_msg_result_t *res, void *udata) { KineticOperation* op = udata; Loading Loading @@ -257,7 +265,7 @@ void KineticController_HandleExpectedResponse(bus_msg_result_t *res, void *udata else { // pull out bus error? LOGF1("Error receiving response, got message bus error: %s", bus_error_string(res->status)); LOGF0("Error receiving response, got message bus error: %s", bus_error_string(res->status)); } // Call operation-specific callback, if configured Loading Loading
src/lib/bus/bus_types.h +1 −0 Original line number Diff line number Diff line Loading @@ -184,6 +184,7 @@ typedef enum { BUS_SEND_RX_FAILURE = -54, BUS_SEND_BAD_RESPONSE = -55, BUS_SEND_UNREGISTERED_SOCKET = -56, BUS_SEND_RX_TIMEOUT_EXPECT = -57, } bus_send_status_t; /* Result from attempting to configure a message bus. */ Loading
src/lib/bus/listener.c +40 −6 Original line number Diff line number Diff line Loading @@ -881,6 +881,15 @@ static rx_info_t *get_free_rx_info(struct listener *l) { } } static connection_info *get_connection_info(struct listener *l, int fd) { for (int i = 0; i < l->tracked_fds; i++) { connection_info *ci = l->fd_info[i]; assert(ci); if (ci->fd == fd) { return ci; } } return NULL; } static void release_rx_info(struct listener *l, rx_info_t *info) { assert(info); struct bus *b = l->bus; Loading @@ -896,9 +905,22 @@ static void release_rx_info(struct listener *l, rx_info_t *info) { * free it, but don't know how. We should never get here, * because it means the sender finished sending the message, * but the listener never got the handler callback. */ if (info->u.hold.result.ok) { void *msg = info->u.hold.result.u.success.msg; int64_t seq_id = info->u.hold.result.u.success.seq_id; connection_info *ci = get_connection_info(l, info->u.hold.fd); if (ci && b->unexpected_msg_cb) { BUS_LOG_SNPRINTF(b, 1, LOG_LISTENER, b->udata, 128, "CALLING UNEXPECTED_MSG_CB ON RESULT %p", (void *)&info->u.hold.result); b->unexpected_msg_cb(msg, seq_id, b->udata, ci->udata); } else { BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 128, "LEAKING RESULT %p", (void *)&info->u.hold.result); //assert(false); } } } break; case RIS_EXPECT: Loading Loading @@ -1266,12 +1288,23 @@ static void expect_response(listener *l, struct boxed_msg *box) { info->timeout_sec = box->timeout_sec; } } else { /* use free info */ /* If we get here, the listener thinks the HOLD message timed out, * but the sender doesn't think things timed out badly enough to * itself expose an error. We also don't know if we're going to * get a response or not. */ BUS_LOG_SNPRINTF(b, 3-3, LOG_MEMORY, b->udata, 128, "get_hold_rx_info FAILED: fd %d, seq_id %lld", box->fd, (long long)box->out_seq_id); /* This should be treated like a send timeout. */ info = get_free_rx_info(l); assert(info); assert(info->state == RIS_INACTIVE); BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "Setting info %p (+%d)'s box to %p", (void*)info, info->id, (void*)box); BUS_LOG_SNPRINTF(b, 3-3, LOG_MEMORY, b->udata, 256, "Setting info %p (+%d)'s box to %p, which will be expired immediately (timeout %lld)", (void*)info, info->id, (void*)box, (long long)box->timeout_sec); info->state = RIS_EXPECT; assert(info->u.expect.box == NULL); Loading @@ -1280,6 +1313,7 @@ static void expect_response(listener *l, struct boxed_msg *box) { info->u.expect.has_result = false; /* Switch over to sender's transferred timeout */ info->timeout_sec = box->timeout_sec; notify_message_failure(l, info, BUS_SEND_RX_TIMEOUT_EXPECT); } } Loading
src/lib/bus/sender.c +9 −18 Original line number Diff line number Diff line Loading @@ -593,9 +593,15 @@ static void enqueue_write(struct sender *s, tx_info_t *info) { * because (in rare cases) the response may arrive between finishing * the write and the listener processing the notification. In that * case, it should hold onto the unrecognized response until the * sender notifies it (and passes it the callback). */ * sender notifies it (and passes it the callback). * * This timeout is several extra seconds so that we don't have * a window where the HOLD message has timed out, but the * EXPECT hasn't, leading to ambiguity about what to do with * the response (which may or may not have arrived). * */ attempt_to_enqueue_sending_request_message_to_listener(s, fd, out_seq_id, info->u.enqueue.timeout_sec + 1); fd, out_seq_id, info->u.enqueue.timeout_sec + 5); /* Increment the refcount. This will cause poll to watch for the * socket being writable, if it isn't already being watched. */ Loading Loading @@ -898,7 +904,6 @@ static void update_sent(struct bus *b, sender *s, tx_info_t *info, ssize_t sent) .fd = info->u.write.fd, .timeout_sec = info->u.write.timeout_sec, .box = info->u.write.box, .retries = info->u.write.retries, }; /* Message has been sent, so release - caller may free it. */ Loading Loading @@ -1095,26 +1100,12 @@ static void attempt_to_enqueue_request_sent_message_to_listener(sender *s, tx_in info->state = TIS_REQUEST_RELEASE; info->u.release.backpressure = backpressure; notify_caller(s, info, true); notify_caller(s, info, true); // unblock sender } else { BUS_LOG(b, 2, LOG_SENDER, "failed delivery", b->udata); /* Put it back, since we need to keep managing it */ info->u.notify.box = box; info->u.notify.retries++; if (info->u.notify.retries == SENDER_MAX_DELIVERY_RETRIES) { info->state = TIS_ERROR; struct u_error ue = { .error = TX_ERROR_NOTIFY_LISTENER_FAILURE, .box = box, }; info->u.error = ue; notify_message_failure(s, info, BUS_SEND_RX_FAILURE); BUS_LOG(b, 2, LOG_SENDER, "failed delivery, several retries", b->udata); } } } Loading
src/lib/bus/sender_internal.h +2 −2 Original line number Diff line number Diff line Loading @@ -25,7 +25,9 @@ #define MAX_TIMEOUT 100 #define TX_TIMEOUT 9 #if 0 #define SENDER_MAX_DELIVERY_RETRIES 5 #endif /* Max number of in-flight messagse. */ #define MAX_CONCURRENT_SENDS 32 Loading Loading @@ -103,7 +105,6 @@ typedef struct { time_t timeout_sec; boxed_msg *box; size_t sent_size; uint8_t retries; fd_info *fdi; } write; Loading @@ -111,7 +112,6 @@ typedef struct { int fd; time_t timeout_sec; boxed_msg *box; uint8_t retries; } notify; struct { Loading
src/lib/kinetic_controller.c +16 −8 Original line number Diff line number Diff line Loading @@ -145,6 +145,9 @@ KineticStatus bus_to_kinetic_status(bus_send_status_t const status) case BUS_SEND_UNREGISTERED_SOCKET: res = KINETIC_STATUS_SOCKET_ERROR; break; case BUS_SEND_RX_TIMEOUT_EXPECT: res = KINETIC_STATUS_OPERATION_TIMEDOUT; break; case BUS_SEND_UNDEFINED: default: { Loading Loading @@ -176,10 +179,14 @@ static const char *bus_error_string(bus_send_status_t t) { return "rx_failure"; case BUS_SEND_BAD_RESPONSE: return "bad_response"; case BUS_SEND_UNREGISTERED_SOCKET: return "unregistered socket"; case BUS_SEND_RX_TIMEOUT_EXPECT: return "internal timeout"; } } void KineticController_HandleUnexecpectedResponse(void *msg, void KineticController_HandleUnexpectedResponse(void *msg, int64_t seq_id, void *bus_udata, void *socket_udata) Loading @@ -192,9 +199,10 @@ void KineticController_HandleUnexecpectedResponse(void *msg, (void)bus_udata; LOGF2("[PDU RX UNSOLICITED] pdu: 0x%0llX, session: 0x%llX, bus: 0x%llX, " "fd: %6d, protoLen: %u, valueLen: %u", "fd: %6d, protoLen: %u, valueLen: %u, seq_id: %08llx", response, connection->pSession, connection->messageBus, connection->socket, response->header.protobufLength, response->header.valueLength); response->header.protobufLength, response->header.valueLength, (long long)seq_id); // Handle unsolicited status PDUs if (response->proto->authType == KINETIC_PROTO_MESSAGE_AUTH_TYPE_UNSOLICITEDSTATUS) { Loading Loading @@ -226,7 +234,7 @@ void KineticController_HandleUnexecpectedResponse(void *msg, } } void KineticController_HandleExpectedResponse(bus_msg_result_t *res, void *udata) void KineticController_HandleResult(bus_msg_result_t *res, void *udata) { KineticOperation* op = udata; Loading Loading @@ -257,7 +265,7 @@ void KineticController_HandleExpectedResponse(bus_msg_result_t *res, void *udata else { // pull out bus error? LOGF1("Error receiving response, got message bus error: %s", bus_error_string(res->status)); LOGF0("Error receiving response, got message bus error: %s", bus_error_string(res->status)); } // Call operation-specific callback, if configured Loading