Loading src/lib/bus/bus_types.h +12 −0 Original line number Diff line number Diff line Loading @@ -76,6 +76,17 @@ struct boxed_msg; } \ } while (0) \ #define BUS_ASSERT(B, UDATA, COND) \ do { \ if(!(COND)) \ { \ BUS_LOG_SNPRINTF(B, 0, LOG_ASSERT, UDATA, 128, \ "BUS FAILURE at %s:%d in %s: assert(" #COND ")", \ __FILE__, (int)__LINE__, __FUNCTION__); \ assert(COND); \ } \ } while(0) /* Event tag for a log message. */ typedef enum { LOG_INITIALIZATION, Loading @@ -86,6 +97,7 @@ typedef enum { LOG_SENDER, LOG_LISTENER, LOG_MEMORY, LOG_ASSERT, /* ... */ LOG_EVENT_TYPE_COUNT, } log_event_t; Loading src/lib/bus/listener.c +58 −56 Original line number Diff line number Diff line Loading @@ -264,7 +264,7 @@ void listener_free(struct listener *l) { default: BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64, "match fail %d on line %d", info->state, __LINE__); assert(false); BUS_ASSERT(b, b->udata, false); } } Loading Loading @@ -332,7 +332,7 @@ void *listener_mainloop(void *arg) { } else { /* unrecoverable poll error -- FD count is bad * or FDS is a bad pointer. */ assert(false); BUS_ASSERT(b, b->udata, false); } } else if (res > 0) { check_and_flush_incoming_msg_pipe(self, &res); Loading Loading @@ -417,7 +417,7 @@ static void set_error_for_socket(listener *l, int id, int fd, rx_error_t err) { { BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64, "match fail %d on line %d", info->state, __LINE__); assert(false); BUS_ASSERT(b, b->udata, false); } } } Loading Loading @@ -449,7 +449,7 @@ static void attempt_recv(listener *l, int available) { if (read_from == available) { break; } struct pollfd *fd = &l->fds[i + INCOMING_MSG_PIPE]; connection_info *ci = l->fd_info[i]; assert(ci->fd == fd->fd); BUS_ASSERT(b, b->udata, ci->fd == fd->fd); if (fd->revents & (POLLERR | POLLNVAL)) { read_from++; Loading @@ -463,7 +463,7 @@ static void attempt_recv(listener *l, int available) { BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64, "reading %zd bytes from socket (buf is %zd)", ci->to_read_size, l->read_buf_size); assert(l->read_buf_size >= ci->to_read_size); BUS_ASSERT(b, b->udata, l->read_buf_size >= ci->to_read_size); read_from++; switch (ci->type) { Loading @@ -474,7 +474,7 @@ static void attempt_recv(listener *l, int available) { socket_read_ssl(b, l, i, ci); break; default: assert(false); BUS_ASSERT(b, b->udata, false); } } } Loading Loading @@ -504,7 +504,7 @@ static bool socket_read_plain(struct bus *b, listener *l, int pfd_i, connection_ } static bool socket_read_ssl(struct bus *b, listener *l, int pfd_i, connection_info *ci) { assert(ci->ssl); BUS_ASSERT(b, b->udata, ci->ssl); for (;;) { ssize_t pending = SSL_pending(ci->ssl); ssize_t size = (ssize_t)SSL_read(ci->ssl, l->read_buf, ci->to_read_size); Loading @@ -519,13 +519,13 @@ static bool socket_read_ssl(struct bus *b, listener *l, int pfd_i, connection_in return true; case SSL_ERROR_WANT_WRITE: assert(false); BUS_ASSERT(b, b->udata, false); case SSL_ERROR_SYSCALL: { if (errno == 0) { print_SSL_error(b, ci, 1, "SSL_ERROR_SYSCALL errno 0"); assert(false); BUS_ASSERT(b, b->udata, false); } else if (util_is_resumable_io_error(errno)) { errno = 0; } else { Loading @@ -547,7 +547,7 @@ static bool socket_read_ssl(struct bus *b, listener *l, int pfd_i, connection_in default: print_SSL_error(b, ci, 1, "SSL_ERROR UNKNOWN"); set_error_for_socket(l, pfd_i, ci->fd, RX_ERROR_READ_FAILURE); assert(false); BUS_ASSERT(b, b->udata, false); } } else if (size > 0) { sink_socket_read(b, l, ci, size); Loading Loading @@ -595,7 +595,7 @@ static bool sink_socket_read(struct bus *b, BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "Read buffer realloc failure for %p (%zd to %zd)", l->read_buf, l->read_buf_size, ci->to_read_size); assert(false); BUS_ASSERT(b, b->udata, false); } } return true; Loading Loading @@ -632,7 +632,7 @@ static rx_info_t *find_info_by_sequence_id(listener *l, default: BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64, "match fail %d on line %d", info->state, __LINE__); assert(false); BUS_ASSERT(b, b->udata, false); } } Loading Loading @@ -668,7 +668,7 @@ static void process_unpacked_message(listener *l, switch (info->state) { case RIS_HOLD: /* Just save result, to match up later. */ assert(!info->u.hold.has_result); BUS_ASSERT(b, b->udata, !info->u.hold.has_result); info->u.hold.has_result = true; info->u.hold.result = result; break; Loading @@ -678,7 +678,7 @@ static void process_unpacked_message(listener *l, "marking info %d, seq_id:%lld ready for delivery", info->id, (long long)result.u.success.seq_id); info->u.expect.error = RX_ERROR_READY_FOR_DELIVERY; assert(!info->u.hold.has_result); BUS_ASSERT(b, b->udata, !info->u.hold.has_result); info->u.expect.has_result = true; info->u.expect.result = result; attempt_delivery(l, info); Loading @@ -686,7 +686,7 @@ static void process_unpacked_message(listener *l, } case RIS_INACTIVE: default: assert(false); BUS_ASSERT(b, b->udata, false); } } else { /* We received a response that we weren't expecting. */ Loading Loading @@ -777,7 +777,7 @@ static void tick_handler(listener *l) { default: BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64, "match fail %d on line %d", info->state, __LINE__); assert(false); BUS_ASSERT(b, b->udata, false); } } } Loading Loading @@ -809,16 +809,16 @@ static void dump_rx_info_table(listener *l) { } static void retry_delivery(listener *l, rx_info_t *info) { assert(info->state == RIS_EXPECT); assert(info->u.expect.error == RX_ERROR_READY_FOR_DELIVERY); assert(info->u.expect.box); struct bus *b = l->bus; BUS_ASSERT(b, b->udata, info->state == RIS_EXPECT); BUS_ASSERT(b, b->udata, info->u.expect.error == RX_ERROR_READY_FOR_DELIVERY); BUS_ASSERT(b, b->udata, info->u.expect.box); struct boxed_msg *box = info->u.expect.box; info->u.expect.box = NULL; /* release */ BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "releasing box %p at line %d", (void*)box, __LINE__); assert(box->result.status == BUS_SEND_SUCCESS); BUS_ASSERT(b, b->udata, box->result.status == BUS_SEND_SUCCESS); size_t backpressure = 0; if (bus_process_boxed_message(l->bus, box, &backpressure)) { Loading @@ -837,9 +837,9 @@ static void retry_delivery(listener *l, rx_info_t *info) { } static void clean_up_completed_info(listener *l, rx_info_t *info) { assert(info->state == RIS_EXPECT); assert(info->u.expect.error == RX_ERROR_DONE); struct bus *b = l->bus; BUS_ASSERT(b, b->udata, info->state == RIS_EXPECT); BUS_ASSERT(b, b->udata, info->u.expect.error == RX_ERROR_DONE); BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "info %p, box is %p at line %d", (void*)info, (void*)info->u.expect.box, __LINE__); Loading @@ -857,7 +857,7 @@ static void clean_up_completed_info(listener *l, rx_info_t *info) { printf(" info->box->out_msg %p\n", (void*)box->out_msg); } assert(box->result.status == BUS_SEND_SUCCESS); BUS_ASSERT(b, b->udata, box->result.status == BUS_SEND_SUCCESS); BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "releasing box %p at line %d", (void*)box, __LINE__); info->u.expect.box = NULL; /* release */ Loading @@ -877,11 +877,12 @@ static void clean_up_completed_info(listener *l, rx_info_t *info) { static void notify_message_failure(listener *l, rx_info_t *info, bus_send_status_t status) { assert(info->state == RIS_EXPECT); assert(info->u.expect.box); info->u.expect.box->result.status = status; size_t backpressure = 0; struct bus *b = l->bus; BUS_ASSERT(b, b->udata, info->state == RIS_EXPECT); BUS_ASSERT(b, b->udata, info->u.expect.box); info->u.expect.box->result.status = status; boxed_msg *box = info->u.expect.box; info->u.expect.box = NULL; Loading Loading @@ -913,37 +914,38 @@ static rx_info_t *get_free_rx_info(struct listener *l) { head->next = NULL; l->rx_info_in_use++; BUS_LOG(l->bus, 4, LOG_LISTENER, "reserving RX info", l->bus->udata); assert(head->state == RIS_INACTIVE); BUS_ASSERT(b, b->udata, head->state == RIS_INACTIVE); if (l->rx_info_max_used < head->id) { BUS_LOG_SNPRINTF(b, 5, LOG_LISTENER, b->udata, 128, "rx_info_max_used <- %d", head->id); l->rx_info_max_used = head->id; assert(l->rx_info_max_used < MAX_PENDING_MESSAGES); BUS_ASSERT(b, b->udata, l->rx_info_max_used < MAX_PENDING_MESSAGES); } BUS_LOG_SNPRINTF(b, 5, LOG_LISTENER, b->udata, 128, "got free rx_info_t %d (%p)", head->id, (void *)head); assert(head == &l->rx_info[head->id]); BUS_ASSERT(b, b->udata, head == &l->rx_info[head->id]); return head; } } static connection_info *get_connection_info(struct listener *l, int fd) { struct bus *b = l->bus; for (int i = 0; i < l->tracked_fds; i++) { connection_info *ci = l->fd_info[i]; assert(ci); BUS_ASSERT(b, b->udata, ci); if (ci->fd == fd) { return ci; } } return NULL; } static void release_rx_info(struct listener *l, rx_info_t *info) { assert(info); struct bus *b = l->bus; BUS_ASSERT(b, b->udata, info); BUS_LOG_SNPRINTF(b, 5, LOG_LISTENER, b->udata, 128, "releasing RX info %d (%p)", info->id, (void *)info); assert(info->id < MAX_PENDING_MESSAGES); assert(info == &l->rx_info[info->id]); BUS_ASSERT(b, b->udata, info->id < MAX_PENDING_MESSAGES); BUS_ASSERT(b, b->udata, info == &l->rx_info[info->id]); switch (info->state) { case RIS_HOLD: Loading Loading @@ -971,12 +973,12 @@ static void release_rx_info(struct listener *l, rx_info_t *info) { } break; case RIS_EXPECT: assert(info->u.expect.error == RX_ERROR_DONE); assert(info->u.expect.box == NULL); BUS_ASSERT(b, b->udata, info->u.expect.error == RX_ERROR_DONE); BUS_ASSERT(b, b->udata, info->u.expect.box == NULL); break; default: case RIS_INACTIVE: assert(false); BUS_ASSERT(b, b->udata, false); } /* Set to no longer active and push on the freelist. */ Loading @@ -984,7 +986,7 @@ static void release_rx_info(struct listener *l, rx_info_t *info) { "releasing rx_info_t %d (%p), was %d", info->id, (void *)info, info->state); assert(info->state != RIS_INACTIVE); BUS_ASSERT(b, b->udata, info->state != RIS_INACTIVE); info->state = RIS_INACTIVE; memset(&info->u, 0, sizeof(info->u)); info->next = l->rx_info_freelist; Loading @@ -998,7 +1000,7 @@ static void release_rx_info(struct listener *l, rx_info_t *info) { l->rx_info_max_used--; if (l->rx_info_max_used == 0) { break; } } assert(l->rx_info_max_used < MAX_PENDING_MESSAGES); BUS_ASSERT(b, b->udata, l->rx_info_max_used < MAX_PENDING_MESSAGES); } l->rx_info_in_use--; Loading Loading @@ -1032,7 +1034,7 @@ static listener_msg *get_free_msg(listener *l) { }; nanosleep(&ts, NULL); } assert(head->type == MSG_NONE); BUS_ASSERT(b, b->udata, head->type == MSG_NONE); return head; } } Loading @@ -1042,7 +1044,7 @@ static listener_msg *get_free_msg(listener *l) { static void release_msg(struct listener *l, listener_msg *msg) { struct bus *b = l->bus; assert(msg->id < MAX_QUEUE_MESSAGES); BUS_ASSERT(b, b->udata, msg->id < MAX_QUEUE_MESSAGES); msg->type = MSG_NONE; for (;;) { Loading @@ -1052,7 +1054,7 @@ static void release_msg(struct listener *l, listener_msg *msg) { for (;;) { int16_t miu = l->msgs_in_use; if (ATOMIC_BOOL_COMPARE_AND_SWAP(&l->msgs_in_use, miu, miu - 1)) { assert(miu >= 0); BUS_ASSERT(b, b->udata, miu >= 0); BUS_LOG(b, 3, LOG_LISTENER, "Releasing msg", b->udata); return; } Loading @@ -1062,8 +1064,8 @@ static void release_msg(struct listener *l, listener_msg *msg) { } static bool push_message(struct listener *l, listener_msg *msg) { assert(msg); struct bus *b = l->bus; BUS_ASSERT(b, b->udata, msg); if (casq_push(l->q, msg)) { retry: Loading @@ -1073,7 +1075,7 @@ retry: if (wr == 1) { return true; } else { assert(wr == -1); BUS_ASSERT(b, b->udata, wr == -1); if (errno == EINTR) { errno = 0; goto retry; Loading @@ -1087,7 +1089,7 @@ retry: } else { BUS_LOG_SNPRINTF(b, 3 - 3, LOG_LISTENER, b->udata, 128, "Failed to pushed message -- %p", (void*)msg); assert(false); BUS_ASSERT(b, b->udata, false); release_msg(l, msg); return false; } Loading Loading @@ -1120,7 +1122,7 @@ static void msg_handler(listener *l, listener_msg *pmsg) { case MSG_NONE: default: assert(false); BUS_ASSERT(b, b->udata, false); break; } release_msg(l, pmsg); Loading Loading @@ -1185,7 +1187,7 @@ static void add_socket(listener *l, connection_info *ci, int notify_fd) { /* Prime the pump by sinking 0 bytes and getting a size to expect. */ bus_sink_cb_res_t sink_res = b->sink_cb(l->read_buf, 0, ci->udata); assert(sink_res.full_msg_buffer == NULL); // should have nothing to handle yet BUS_ASSERT(b, b->udata, sink_res.full_msg_buffer == NULL); // should have nothing to handle yet ci->to_read_size = sink_res.next_read; if (!grow_read_buf(l, ci->to_read_size)) { Loading Loading @@ -1235,8 +1237,8 @@ static void hold_response(listener *l, int fd, int64_t seq_id, int16_t timeout_s struct bus *b = l->bus; rx_info_t *info = get_free_rx_info(l); assert(info); assert(info->state == RIS_INACTIVE); BUS_ASSERT(b, b->udata, info); BUS_ASSERT(b, b->udata, info->state == RIS_INACTIVE); BUS_LOG_SNPRINTF(b, 5, LOG_LISTENER, b->udata, 128, "setting info %p(+%d) to hold response <fd:%d, seq_id:%lld>", (void *)info, info->id, fd, (long long)seq_id); Loading Loading @@ -1277,16 +1279,16 @@ static void attempt_delivery(listener *l, struct rx_info_t *info) { bus_unpack_cb_res_t unpacked_result; switch (info->state) { case RIS_EXPECT: assert(info->u.expect.has_result); BUS_ASSERT(b, b->udata, info->u.expect.has_result); unpacked_result = info->u.expect.result; break; default: case RIS_HOLD: case RIS_INACTIVE: assert(false); BUS_ASSERT(b, b->udata, false); } assert(unpacked_result.ok); BUS_ASSERT(b, b->udata, unpacked_result.ok); int64_t seq_id = unpacked_result.u.success.seq_id; void *opaque_msg = unpacked_result.u.success.msg; result->u.response.seq_id = seq_id; Loading @@ -1312,8 +1314,8 @@ static void attempt_delivery(listener *l, struct rx_info_t *info) { } static void expect_response(listener *l, struct boxed_msg *box) { assert(box); struct bus *b = l->bus; BUS_ASSERT(b, b->udata, box); BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 128, "notifying to expect response <box:%p, fd:%d, seq_id:%lld>", (void *)box, box->fd, (long long)box->out_seq_id); Loading @@ -1321,7 +1323,7 @@ static void expect_response(listener *l, struct boxed_msg *box) { /* If there's a pending HOLD message, convert it. */ rx_info_t *info = get_hold_rx_info(l, box->fd, box->out_seq_id); if (info) { assert(info->state == RIS_HOLD); BUS_ASSERT(b, b->udata, info->state == RIS_HOLD); if (info->u.hold.has_result) { bus_unpack_cb_res_t result = info->u.hold.result; Loading Loading @@ -1360,15 +1362,15 @@ static void expect_response(listener *l, struct boxed_msg *box) { /* This should be treated like a send timeout. */ info = get_free_rx_info(l); assert(info); assert(info->state == RIS_INACTIVE); BUS_ASSERT(b, b->udata, info); BUS_ASSERT(b, b->udata, info->state == RIS_INACTIVE); BUS_LOG_SNPRINTF(b, 3-3, LOG_MEMORY, b->udata, 256, "Setting info %p (+%d)'s box to %p, which will be expired immediately (timeout %lld)", (void*)info, info->id, (void*)box, (long long)box->timeout_sec); info->state = RIS_EXPECT; assert(info->u.expect.box == NULL); BUS_ASSERT(b, b->udata, info->u.expect.box == NULL); info->u.expect.box = box; info->u.expect.error = RX_ERROR_NONE; info->u.expect.has_result = false; Loading src/lib/kinetic_allocator.c +4 −4 Original line number Diff line number Diff line Loading @@ -42,7 +42,7 @@ KineticConnection* KineticAllocator_NewConnection(struct bus * b, KineticSession void KineticAllocator_FreeConnection(KineticConnection* connection) { assert(connection != NULL); KINETIC_ASSERT(connection != NULL); KineticResourceWaiter_Destroy(&connection->connectionReady); KineticFree(connection); } Loading @@ -59,7 +59,7 @@ KineticResponse * KineticAllocator_NewKineticResponse(size_t const valueLength) void KineticAllocator_FreeKineticResponse(KineticResponse * response) { assert(response != NULL); KINETIC_ASSERT(response != NULL); if (response->command != NULL) { protobuf_c_message_free_unpacked(&response->command->base, NULL); Loading @@ -72,7 +72,7 @@ void KineticAllocator_FreeKineticResponse(KineticResponse * response) KineticOperation* KineticAllocator_NewOperation(KineticConnection* const connection) { assert(connection != NULL); KINETIC_ASSERT(connection != NULL); LOGF3("Allocating new operation on connection (0x%0llX)", connection); KineticOperation* newOperation = (KineticOperation*)KineticCalloc(1, sizeof(KineticOperation)); Loading @@ -95,7 +95,7 @@ KineticOperation* KineticAllocator_NewOperation(KineticConnection* const connect void KineticAllocator_FreeOperation(KineticOperation* operation) { assert(operation != NULL); KINETIC_ASSERT(operation != NULL); KineticConnection* const connection = operation->connection; LOGF3("Freeing operation (0x%0llX) on connection (0x%0llX)", operation, connection); if (operation->request != NULL) { Loading src/lib/kinetic_client.c +33 −33 Original line number Diff line number Diff line Loading @@ -126,8 +126,8 @@ KineticStatus KineticClient_DestroyConnection(KineticSession* const session) KineticStatus KineticClient_NoOp(KineticSession const * const session) { assert(session != NULL); assert(session->connection != NULL); KINETIC_ASSERT(session != NULL); KINETIC_ASSERT(session->connection != NULL); KineticOperation* operation = KineticController_CreateOperation(session); if (operation == NULL) {return KINETIC_STATUS_MEMORY_ERROR;} Loading @@ -140,13 +140,13 @@ KineticStatus KineticClient_Put(KineticSession const * const session, KineticEntry* const entry, KineticCompletionClosure* closure) { assert(session != NULL); assert(session->connection != NULL); assert(entry != NULL); assert(entry->value.array.data != NULL); KINETIC_ASSERT(session != NULL); KINETIC_ASSERT(session->connection != NULL); KINETIC_ASSERT(entry != NULL); KINETIC_ASSERT(entry->value.array.data != NULL); assert(session->connection->pSession == session); assert(session->connection == session->connection->pSession->connection); KINETIC_ASSERT(session->connection->pSession == session); KINETIC_ASSERT(session->connection == session->connection->pSession->connection); KineticOperation* operation = KineticController_CreateOperation(session); if (operation == NULL) {return KINETIC_STATUS_MEMORY_ERROR;} Loading @@ -155,7 +155,7 @@ KineticStatus KineticClient_Put(KineticSession const * const session, KineticOperation_BuildPut(operation, entry); // Execute the operation assert(operation->connection == session->connection); KINETIC_ASSERT(operation->connection == session->connection); KineticStatus res = KineticController_ExecuteOperation(operation, closure); return res; } Loading @@ -163,8 +163,8 @@ KineticStatus KineticClient_Put(KineticSession const * const session, KineticStatus KineticClient_Flush(KineticSession const * const session, KineticCompletionClosure* closure) { assert(session != NULL); assert(session->connection != NULL); KINETIC_ASSERT(session != NULL); KINETIC_ASSERT(session->connection != NULL); KineticOperation* operation = KineticController_CreateOperation(session); if (operation == NULL) { return KINETIC_STATUS_MEMORY_ERROR; } Loading Loading @@ -197,9 +197,9 @@ static KineticStatus handle_get_command(GET_COMMAND cmd, KineticEntry* const entry, KineticCompletionClosure* closure) { assert(session != NULL); assert(session->connection != NULL); assert(entry != NULL); KINETIC_ASSERT(session != NULL); KINETIC_ASSERT(session->connection != NULL); KINETIC_ASSERT(entry != NULL); if (!has_key(entry)) {return KINETIC_STATUS_MISSING_KEY;} if (!has_value_buffer(entry) && !entry->metadataOnly) { Loading @@ -224,7 +224,7 @@ static KineticStatus handle_get_command(GET_COMMAND cmd, KineticOperation_BuildGetPrevious(operation, entry); break; default: assert(false); KINETIC_ASSERT(false); } // Execute the operation Loading Loading @@ -256,9 +256,9 @@ KineticStatus KineticClient_Delete(KineticSession const * const session, KineticEntry* const entry, KineticCompletionClosure* closure) { assert(session != NULL); assert(session->connection != NULL); assert(entry != NULL); KINETIC_ASSERT(session != NULL); KINETIC_ASSERT(session->connection != NULL); KINETIC_ASSERT(entry != NULL); KineticOperation* operation = KineticController_CreateOperation(session); if (operation == NULL) {return KINETIC_STATUS_MEMORY_ERROR;} Loading @@ -275,12 +275,12 @@ KineticStatus KineticClient_GetKeyRange(KineticSession const * const session, ByteBufferArray* keys, KineticCompletionClosure* closure) { assert(session != NULL); assert(session->connection != NULL); assert(range != NULL); assert(keys != NULL); assert(keys->buffers != NULL); assert(keys->count > 0); KINETIC_ASSERT(session != NULL); KINETIC_ASSERT(session->connection != NULL); KINETIC_ASSERT(range != NULL); KINETIC_ASSERT(keys != NULL); KINETIC_ASSERT(keys->buffers != NULL); KINETIC_ASSERT(keys->count > 0); KineticOperation* operation = KineticController_CreateOperation(session); if (operation == NULL) {return KINETIC_STATUS_MEMORY_ERROR;} Loading @@ -297,9 +297,9 @@ KineticStatus KineticClient_GetLog(KineticSession const * const session, KineticDeviceInfo** info, KineticCompletionClosure* closure) { assert(session != NULL); assert(session->connection != NULL); assert(info != NULL); KINETIC_ASSERT(session != NULL); KINETIC_ASSERT(session->connection != NULL); KINETIC_ASSERT(info != NULL); KineticOperation* operation = KineticController_CreateOperation(session); if (operation == NULL) {return KINETIC_STATUS_MEMORY_ERROR;} Loading @@ -314,7 +314,7 @@ KineticStatus KineticClient_GetLog(KineticSession const * const session, void KineticClient_FreeDeviceInfo(KineticSession const * const session, KineticDeviceInfo* info) { assert(session != NULL); KINETIC_ASSERT(session != NULL); if (info) { KineticDeviceInfo_Free(info); } /* The session is not currently used, but part of the API to allow Loading @@ -327,9 +327,9 @@ KineticStatus KineticClient_P2POperation(KineticSession const * const session, KineticP2P_Operation* const p2pOp, KineticCompletionClosure* closure) { assert(session != NULL); assert(session->connection != NULL); assert(p2pOp != NULL); KINETIC_ASSERT(session != NULL); KINETIC_ASSERT(session->connection != NULL); KINETIC_ASSERT(p2pOp != NULL); KineticOperation* operation = KineticController_CreateOperation(session); if (operation == NULL) {return KINETIC_STATUS_MEMORY_ERROR;} Loading @@ -351,8 +351,8 @@ KineticStatus KineticClient_P2POperation(KineticSession const * const session, KineticStatus KineticClient_InstantSecureErase(KineticSession const * const session) { assert(session != NULL); assert(session->connection != NULL); KINETIC_ASSERT(session != NULL); KINETIC_ASSERT(session->connection != NULL); KineticOperation* operation = KineticController_CreateOperation(session); if (operation == NULL) {return KINETIC_STATUS_MEMORY_ERROR;} Loading src/lib/kinetic_controller.c +4 −4 Original line number Diff line number Diff line Loading @@ -79,9 +79,9 @@ static KineticCompletionClosure DefaultClosure(DefaultCallbackData * const data) KineticStatus KineticController_ExecuteOperation(KineticOperation* operation, KineticCompletionClosure* const closure) { assert(operation != NULL); assert(operation->connection != NULL); assert(operation->connection->pSession != NULL); KINETIC_ASSERT(operation != NULL); KINETIC_ASSERT(operation->connection != NULL); KINETIC_ASSERT(operation->connection->pSession != NULL); KineticStatus status = KINETIC_STATUS_INVALID; if (closure != NULL) Loading Loading @@ -152,7 +152,7 @@ KineticStatus bus_to_kinetic_status(bus_send_status_t const status) default: { LOGF0("bus_to_kinetic_status: UNMATCHED %d\n", status); assert(false); KINETIC_ASSERT(false); return KINETIC_STATUS_INVALID; } } Loading Loading
src/lib/bus/bus_types.h +12 −0 Original line number Diff line number Diff line Loading @@ -76,6 +76,17 @@ struct boxed_msg; } \ } while (0) \ #define BUS_ASSERT(B, UDATA, COND) \ do { \ if(!(COND)) \ { \ BUS_LOG_SNPRINTF(B, 0, LOG_ASSERT, UDATA, 128, \ "BUS FAILURE at %s:%d in %s: assert(" #COND ")", \ __FILE__, (int)__LINE__, __FUNCTION__); \ assert(COND); \ } \ } while(0) /* Event tag for a log message. */ typedef enum { LOG_INITIALIZATION, Loading @@ -86,6 +97,7 @@ typedef enum { LOG_SENDER, LOG_LISTENER, LOG_MEMORY, LOG_ASSERT, /* ... */ LOG_EVENT_TYPE_COUNT, } log_event_t; Loading
src/lib/bus/listener.c +58 −56 Original line number Diff line number Diff line Loading @@ -264,7 +264,7 @@ void listener_free(struct listener *l) { default: BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64, "match fail %d on line %d", info->state, __LINE__); assert(false); BUS_ASSERT(b, b->udata, false); } } Loading Loading @@ -332,7 +332,7 @@ void *listener_mainloop(void *arg) { } else { /* unrecoverable poll error -- FD count is bad * or FDS is a bad pointer. */ assert(false); BUS_ASSERT(b, b->udata, false); } } else if (res > 0) { check_and_flush_incoming_msg_pipe(self, &res); Loading Loading @@ -417,7 +417,7 @@ static void set_error_for_socket(listener *l, int id, int fd, rx_error_t err) { { BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64, "match fail %d on line %d", info->state, __LINE__); assert(false); BUS_ASSERT(b, b->udata, false); } } } Loading Loading @@ -449,7 +449,7 @@ static void attempt_recv(listener *l, int available) { if (read_from == available) { break; } struct pollfd *fd = &l->fds[i + INCOMING_MSG_PIPE]; connection_info *ci = l->fd_info[i]; assert(ci->fd == fd->fd); BUS_ASSERT(b, b->udata, ci->fd == fd->fd); if (fd->revents & (POLLERR | POLLNVAL)) { read_from++; Loading @@ -463,7 +463,7 @@ static void attempt_recv(listener *l, int available) { BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64, "reading %zd bytes from socket (buf is %zd)", ci->to_read_size, l->read_buf_size); assert(l->read_buf_size >= ci->to_read_size); BUS_ASSERT(b, b->udata, l->read_buf_size >= ci->to_read_size); read_from++; switch (ci->type) { Loading @@ -474,7 +474,7 @@ static void attempt_recv(listener *l, int available) { socket_read_ssl(b, l, i, ci); break; default: assert(false); BUS_ASSERT(b, b->udata, false); } } } Loading Loading @@ -504,7 +504,7 @@ static bool socket_read_plain(struct bus *b, listener *l, int pfd_i, connection_ } static bool socket_read_ssl(struct bus *b, listener *l, int pfd_i, connection_info *ci) { assert(ci->ssl); BUS_ASSERT(b, b->udata, ci->ssl); for (;;) { ssize_t pending = SSL_pending(ci->ssl); ssize_t size = (ssize_t)SSL_read(ci->ssl, l->read_buf, ci->to_read_size); Loading @@ -519,13 +519,13 @@ static bool socket_read_ssl(struct bus *b, listener *l, int pfd_i, connection_in return true; case SSL_ERROR_WANT_WRITE: assert(false); BUS_ASSERT(b, b->udata, false); case SSL_ERROR_SYSCALL: { if (errno == 0) { print_SSL_error(b, ci, 1, "SSL_ERROR_SYSCALL errno 0"); assert(false); BUS_ASSERT(b, b->udata, false); } else if (util_is_resumable_io_error(errno)) { errno = 0; } else { Loading @@ -547,7 +547,7 @@ static bool socket_read_ssl(struct bus *b, listener *l, int pfd_i, connection_in default: print_SSL_error(b, ci, 1, "SSL_ERROR UNKNOWN"); set_error_for_socket(l, pfd_i, ci->fd, RX_ERROR_READ_FAILURE); assert(false); BUS_ASSERT(b, b->udata, false); } } else if (size > 0) { sink_socket_read(b, l, ci, size); Loading Loading @@ -595,7 +595,7 @@ static bool sink_socket_read(struct bus *b, BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "Read buffer realloc failure for %p (%zd to %zd)", l->read_buf, l->read_buf_size, ci->to_read_size); assert(false); BUS_ASSERT(b, b->udata, false); } } return true; Loading Loading @@ -632,7 +632,7 @@ static rx_info_t *find_info_by_sequence_id(listener *l, default: BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64, "match fail %d on line %d", info->state, __LINE__); assert(false); BUS_ASSERT(b, b->udata, false); } } Loading Loading @@ -668,7 +668,7 @@ static void process_unpacked_message(listener *l, switch (info->state) { case RIS_HOLD: /* Just save result, to match up later. */ assert(!info->u.hold.has_result); BUS_ASSERT(b, b->udata, !info->u.hold.has_result); info->u.hold.has_result = true; info->u.hold.result = result; break; Loading @@ -678,7 +678,7 @@ static void process_unpacked_message(listener *l, "marking info %d, seq_id:%lld ready for delivery", info->id, (long long)result.u.success.seq_id); info->u.expect.error = RX_ERROR_READY_FOR_DELIVERY; assert(!info->u.hold.has_result); BUS_ASSERT(b, b->udata, !info->u.hold.has_result); info->u.expect.has_result = true; info->u.expect.result = result; attempt_delivery(l, info); Loading @@ -686,7 +686,7 @@ static void process_unpacked_message(listener *l, } case RIS_INACTIVE: default: assert(false); BUS_ASSERT(b, b->udata, false); } } else { /* We received a response that we weren't expecting. */ Loading Loading @@ -777,7 +777,7 @@ static void tick_handler(listener *l) { default: BUS_LOG_SNPRINTF(b, 0, LOG_LISTENER, b->udata, 64, "match fail %d on line %d", info->state, __LINE__); assert(false); BUS_ASSERT(b, b->udata, false); } } } Loading Loading @@ -809,16 +809,16 @@ static void dump_rx_info_table(listener *l) { } static void retry_delivery(listener *l, rx_info_t *info) { assert(info->state == RIS_EXPECT); assert(info->u.expect.error == RX_ERROR_READY_FOR_DELIVERY); assert(info->u.expect.box); struct bus *b = l->bus; BUS_ASSERT(b, b->udata, info->state == RIS_EXPECT); BUS_ASSERT(b, b->udata, info->u.expect.error == RX_ERROR_READY_FOR_DELIVERY); BUS_ASSERT(b, b->udata, info->u.expect.box); struct boxed_msg *box = info->u.expect.box; info->u.expect.box = NULL; /* release */ BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "releasing box %p at line %d", (void*)box, __LINE__); assert(box->result.status == BUS_SEND_SUCCESS); BUS_ASSERT(b, b->udata, box->result.status == BUS_SEND_SUCCESS); size_t backpressure = 0; if (bus_process_boxed_message(l->bus, box, &backpressure)) { Loading @@ -837,9 +837,9 @@ static void retry_delivery(listener *l, rx_info_t *info) { } static void clean_up_completed_info(listener *l, rx_info_t *info) { assert(info->state == RIS_EXPECT); assert(info->u.expect.error == RX_ERROR_DONE); struct bus *b = l->bus; BUS_ASSERT(b, b->udata, info->state == RIS_EXPECT); BUS_ASSERT(b, b->udata, info->u.expect.error == RX_ERROR_DONE); BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "info %p, box is %p at line %d", (void*)info, (void*)info->u.expect.box, __LINE__); Loading @@ -857,7 +857,7 @@ static void clean_up_completed_info(listener *l, rx_info_t *info) { printf(" info->box->out_msg %p\n", (void*)box->out_msg); } assert(box->result.status == BUS_SEND_SUCCESS); BUS_ASSERT(b, b->udata, box->result.status == BUS_SEND_SUCCESS); BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128, "releasing box %p at line %d", (void*)box, __LINE__); info->u.expect.box = NULL; /* release */ Loading @@ -877,11 +877,12 @@ static void clean_up_completed_info(listener *l, rx_info_t *info) { static void notify_message_failure(listener *l, rx_info_t *info, bus_send_status_t status) { assert(info->state == RIS_EXPECT); assert(info->u.expect.box); info->u.expect.box->result.status = status; size_t backpressure = 0; struct bus *b = l->bus; BUS_ASSERT(b, b->udata, info->state == RIS_EXPECT); BUS_ASSERT(b, b->udata, info->u.expect.box); info->u.expect.box->result.status = status; boxed_msg *box = info->u.expect.box; info->u.expect.box = NULL; Loading Loading @@ -913,37 +914,38 @@ static rx_info_t *get_free_rx_info(struct listener *l) { head->next = NULL; l->rx_info_in_use++; BUS_LOG(l->bus, 4, LOG_LISTENER, "reserving RX info", l->bus->udata); assert(head->state == RIS_INACTIVE); BUS_ASSERT(b, b->udata, head->state == RIS_INACTIVE); if (l->rx_info_max_used < head->id) { BUS_LOG_SNPRINTF(b, 5, LOG_LISTENER, b->udata, 128, "rx_info_max_used <- %d", head->id); l->rx_info_max_used = head->id; assert(l->rx_info_max_used < MAX_PENDING_MESSAGES); BUS_ASSERT(b, b->udata, l->rx_info_max_used < MAX_PENDING_MESSAGES); } BUS_LOG_SNPRINTF(b, 5, LOG_LISTENER, b->udata, 128, "got free rx_info_t %d (%p)", head->id, (void *)head); assert(head == &l->rx_info[head->id]); BUS_ASSERT(b, b->udata, head == &l->rx_info[head->id]); return head; } } static connection_info *get_connection_info(struct listener *l, int fd) { struct bus *b = l->bus; for (int i = 0; i < l->tracked_fds; i++) { connection_info *ci = l->fd_info[i]; assert(ci); BUS_ASSERT(b, b->udata, ci); if (ci->fd == fd) { return ci; } } return NULL; } static void release_rx_info(struct listener *l, rx_info_t *info) { assert(info); struct bus *b = l->bus; BUS_ASSERT(b, b->udata, info); BUS_LOG_SNPRINTF(b, 5, LOG_LISTENER, b->udata, 128, "releasing RX info %d (%p)", info->id, (void *)info); assert(info->id < MAX_PENDING_MESSAGES); assert(info == &l->rx_info[info->id]); BUS_ASSERT(b, b->udata, info->id < MAX_PENDING_MESSAGES); BUS_ASSERT(b, b->udata, info == &l->rx_info[info->id]); switch (info->state) { case RIS_HOLD: Loading Loading @@ -971,12 +973,12 @@ static void release_rx_info(struct listener *l, rx_info_t *info) { } break; case RIS_EXPECT: assert(info->u.expect.error == RX_ERROR_DONE); assert(info->u.expect.box == NULL); BUS_ASSERT(b, b->udata, info->u.expect.error == RX_ERROR_DONE); BUS_ASSERT(b, b->udata, info->u.expect.box == NULL); break; default: case RIS_INACTIVE: assert(false); BUS_ASSERT(b, b->udata, false); } /* Set to no longer active and push on the freelist. */ Loading @@ -984,7 +986,7 @@ static void release_rx_info(struct listener *l, rx_info_t *info) { "releasing rx_info_t %d (%p), was %d", info->id, (void *)info, info->state); assert(info->state != RIS_INACTIVE); BUS_ASSERT(b, b->udata, info->state != RIS_INACTIVE); info->state = RIS_INACTIVE; memset(&info->u, 0, sizeof(info->u)); info->next = l->rx_info_freelist; Loading @@ -998,7 +1000,7 @@ static void release_rx_info(struct listener *l, rx_info_t *info) { l->rx_info_max_used--; if (l->rx_info_max_used == 0) { break; } } assert(l->rx_info_max_used < MAX_PENDING_MESSAGES); BUS_ASSERT(b, b->udata, l->rx_info_max_used < MAX_PENDING_MESSAGES); } l->rx_info_in_use--; Loading Loading @@ -1032,7 +1034,7 @@ static listener_msg *get_free_msg(listener *l) { }; nanosleep(&ts, NULL); } assert(head->type == MSG_NONE); BUS_ASSERT(b, b->udata, head->type == MSG_NONE); return head; } } Loading @@ -1042,7 +1044,7 @@ static listener_msg *get_free_msg(listener *l) { static void release_msg(struct listener *l, listener_msg *msg) { struct bus *b = l->bus; assert(msg->id < MAX_QUEUE_MESSAGES); BUS_ASSERT(b, b->udata, msg->id < MAX_QUEUE_MESSAGES); msg->type = MSG_NONE; for (;;) { Loading @@ -1052,7 +1054,7 @@ static void release_msg(struct listener *l, listener_msg *msg) { for (;;) { int16_t miu = l->msgs_in_use; if (ATOMIC_BOOL_COMPARE_AND_SWAP(&l->msgs_in_use, miu, miu - 1)) { assert(miu >= 0); BUS_ASSERT(b, b->udata, miu >= 0); BUS_LOG(b, 3, LOG_LISTENER, "Releasing msg", b->udata); return; } Loading @@ -1062,8 +1064,8 @@ static void release_msg(struct listener *l, listener_msg *msg) { } static bool push_message(struct listener *l, listener_msg *msg) { assert(msg); struct bus *b = l->bus; BUS_ASSERT(b, b->udata, msg); if (casq_push(l->q, msg)) { retry: Loading @@ -1073,7 +1075,7 @@ retry: if (wr == 1) { return true; } else { assert(wr == -1); BUS_ASSERT(b, b->udata, wr == -1); if (errno == EINTR) { errno = 0; goto retry; Loading @@ -1087,7 +1089,7 @@ retry: } else { BUS_LOG_SNPRINTF(b, 3 - 3, LOG_LISTENER, b->udata, 128, "Failed to pushed message -- %p", (void*)msg); assert(false); BUS_ASSERT(b, b->udata, false); release_msg(l, msg); return false; } Loading Loading @@ -1120,7 +1122,7 @@ static void msg_handler(listener *l, listener_msg *pmsg) { case MSG_NONE: default: assert(false); BUS_ASSERT(b, b->udata, false); break; } release_msg(l, pmsg); Loading Loading @@ -1185,7 +1187,7 @@ static void add_socket(listener *l, connection_info *ci, int notify_fd) { /* Prime the pump by sinking 0 bytes and getting a size to expect. */ bus_sink_cb_res_t sink_res = b->sink_cb(l->read_buf, 0, ci->udata); assert(sink_res.full_msg_buffer == NULL); // should have nothing to handle yet BUS_ASSERT(b, b->udata, sink_res.full_msg_buffer == NULL); // should have nothing to handle yet ci->to_read_size = sink_res.next_read; if (!grow_read_buf(l, ci->to_read_size)) { Loading Loading @@ -1235,8 +1237,8 @@ static void hold_response(listener *l, int fd, int64_t seq_id, int16_t timeout_s struct bus *b = l->bus; rx_info_t *info = get_free_rx_info(l); assert(info); assert(info->state == RIS_INACTIVE); BUS_ASSERT(b, b->udata, info); BUS_ASSERT(b, b->udata, info->state == RIS_INACTIVE); BUS_LOG_SNPRINTF(b, 5, LOG_LISTENER, b->udata, 128, "setting info %p(+%d) to hold response <fd:%d, seq_id:%lld>", (void *)info, info->id, fd, (long long)seq_id); Loading Loading @@ -1277,16 +1279,16 @@ static void attempt_delivery(listener *l, struct rx_info_t *info) { bus_unpack_cb_res_t unpacked_result; switch (info->state) { case RIS_EXPECT: assert(info->u.expect.has_result); BUS_ASSERT(b, b->udata, info->u.expect.has_result); unpacked_result = info->u.expect.result; break; default: case RIS_HOLD: case RIS_INACTIVE: assert(false); BUS_ASSERT(b, b->udata, false); } assert(unpacked_result.ok); BUS_ASSERT(b, b->udata, unpacked_result.ok); int64_t seq_id = unpacked_result.u.success.seq_id; void *opaque_msg = unpacked_result.u.success.msg; result->u.response.seq_id = seq_id; Loading @@ -1312,8 +1314,8 @@ static void attempt_delivery(listener *l, struct rx_info_t *info) { } static void expect_response(listener *l, struct boxed_msg *box) { assert(box); struct bus *b = l->bus; BUS_ASSERT(b, b->udata, box); BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 128, "notifying to expect response <box:%p, fd:%d, seq_id:%lld>", (void *)box, box->fd, (long long)box->out_seq_id); Loading @@ -1321,7 +1323,7 @@ static void expect_response(listener *l, struct boxed_msg *box) { /* If there's a pending HOLD message, convert it. */ rx_info_t *info = get_hold_rx_info(l, box->fd, box->out_seq_id); if (info) { assert(info->state == RIS_HOLD); BUS_ASSERT(b, b->udata, info->state == RIS_HOLD); if (info->u.hold.has_result) { bus_unpack_cb_res_t result = info->u.hold.result; Loading Loading @@ -1360,15 +1362,15 @@ static void expect_response(listener *l, struct boxed_msg *box) { /* This should be treated like a send timeout. */ info = get_free_rx_info(l); assert(info); assert(info->state == RIS_INACTIVE); BUS_ASSERT(b, b->udata, info); BUS_ASSERT(b, b->udata, info->state == RIS_INACTIVE); BUS_LOG_SNPRINTF(b, 3-3, LOG_MEMORY, b->udata, 256, "Setting info %p (+%d)'s box to %p, which will be expired immediately (timeout %lld)", (void*)info, info->id, (void*)box, (long long)box->timeout_sec); info->state = RIS_EXPECT; assert(info->u.expect.box == NULL); BUS_ASSERT(b, b->udata, info->u.expect.box == NULL); info->u.expect.box = box; info->u.expect.error = RX_ERROR_NONE; info->u.expect.has_result = false; Loading
src/lib/kinetic_allocator.c +4 −4 Original line number Diff line number Diff line Loading @@ -42,7 +42,7 @@ KineticConnection* KineticAllocator_NewConnection(struct bus * b, KineticSession void KineticAllocator_FreeConnection(KineticConnection* connection) { assert(connection != NULL); KINETIC_ASSERT(connection != NULL); KineticResourceWaiter_Destroy(&connection->connectionReady); KineticFree(connection); } Loading @@ -59,7 +59,7 @@ KineticResponse * KineticAllocator_NewKineticResponse(size_t const valueLength) void KineticAllocator_FreeKineticResponse(KineticResponse * response) { assert(response != NULL); KINETIC_ASSERT(response != NULL); if (response->command != NULL) { protobuf_c_message_free_unpacked(&response->command->base, NULL); Loading @@ -72,7 +72,7 @@ void KineticAllocator_FreeKineticResponse(KineticResponse * response) KineticOperation* KineticAllocator_NewOperation(KineticConnection* const connection) { assert(connection != NULL); KINETIC_ASSERT(connection != NULL); LOGF3("Allocating new operation on connection (0x%0llX)", connection); KineticOperation* newOperation = (KineticOperation*)KineticCalloc(1, sizeof(KineticOperation)); Loading @@ -95,7 +95,7 @@ KineticOperation* KineticAllocator_NewOperation(KineticConnection* const connect void KineticAllocator_FreeOperation(KineticOperation* operation) { assert(operation != NULL); KINETIC_ASSERT(operation != NULL); KineticConnection* const connection = operation->connection; LOGF3("Freeing operation (0x%0llX) on connection (0x%0llX)", operation, connection); if (operation->request != NULL) { Loading
src/lib/kinetic_client.c +33 −33 Original line number Diff line number Diff line Loading @@ -126,8 +126,8 @@ KineticStatus KineticClient_DestroyConnection(KineticSession* const session) KineticStatus KineticClient_NoOp(KineticSession const * const session) { assert(session != NULL); assert(session->connection != NULL); KINETIC_ASSERT(session != NULL); KINETIC_ASSERT(session->connection != NULL); KineticOperation* operation = KineticController_CreateOperation(session); if (operation == NULL) {return KINETIC_STATUS_MEMORY_ERROR;} Loading @@ -140,13 +140,13 @@ KineticStatus KineticClient_Put(KineticSession const * const session, KineticEntry* const entry, KineticCompletionClosure* closure) { assert(session != NULL); assert(session->connection != NULL); assert(entry != NULL); assert(entry->value.array.data != NULL); KINETIC_ASSERT(session != NULL); KINETIC_ASSERT(session->connection != NULL); KINETIC_ASSERT(entry != NULL); KINETIC_ASSERT(entry->value.array.data != NULL); assert(session->connection->pSession == session); assert(session->connection == session->connection->pSession->connection); KINETIC_ASSERT(session->connection->pSession == session); KINETIC_ASSERT(session->connection == session->connection->pSession->connection); KineticOperation* operation = KineticController_CreateOperation(session); if (operation == NULL) {return KINETIC_STATUS_MEMORY_ERROR;} Loading @@ -155,7 +155,7 @@ KineticStatus KineticClient_Put(KineticSession const * const session, KineticOperation_BuildPut(operation, entry); // Execute the operation assert(operation->connection == session->connection); KINETIC_ASSERT(operation->connection == session->connection); KineticStatus res = KineticController_ExecuteOperation(operation, closure); return res; } Loading @@ -163,8 +163,8 @@ KineticStatus KineticClient_Put(KineticSession const * const session, KineticStatus KineticClient_Flush(KineticSession const * const session, KineticCompletionClosure* closure) { assert(session != NULL); assert(session->connection != NULL); KINETIC_ASSERT(session != NULL); KINETIC_ASSERT(session->connection != NULL); KineticOperation* operation = KineticController_CreateOperation(session); if (operation == NULL) { return KINETIC_STATUS_MEMORY_ERROR; } Loading Loading @@ -197,9 +197,9 @@ static KineticStatus handle_get_command(GET_COMMAND cmd, KineticEntry* const entry, KineticCompletionClosure* closure) { assert(session != NULL); assert(session->connection != NULL); assert(entry != NULL); KINETIC_ASSERT(session != NULL); KINETIC_ASSERT(session->connection != NULL); KINETIC_ASSERT(entry != NULL); if (!has_key(entry)) {return KINETIC_STATUS_MISSING_KEY;} if (!has_value_buffer(entry) && !entry->metadataOnly) { Loading @@ -224,7 +224,7 @@ static KineticStatus handle_get_command(GET_COMMAND cmd, KineticOperation_BuildGetPrevious(operation, entry); break; default: assert(false); KINETIC_ASSERT(false); } // Execute the operation Loading Loading @@ -256,9 +256,9 @@ KineticStatus KineticClient_Delete(KineticSession const * const session, KineticEntry* const entry, KineticCompletionClosure* closure) { assert(session != NULL); assert(session->connection != NULL); assert(entry != NULL); KINETIC_ASSERT(session != NULL); KINETIC_ASSERT(session->connection != NULL); KINETIC_ASSERT(entry != NULL); KineticOperation* operation = KineticController_CreateOperation(session); if (operation == NULL) {return KINETIC_STATUS_MEMORY_ERROR;} Loading @@ -275,12 +275,12 @@ KineticStatus KineticClient_GetKeyRange(KineticSession const * const session, ByteBufferArray* keys, KineticCompletionClosure* closure) { assert(session != NULL); assert(session->connection != NULL); assert(range != NULL); assert(keys != NULL); assert(keys->buffers != NULL); assert(keys->count > 0); KINETIC_ASSERT(session != NULL); KINETIC_ASSERT(session->connection != NULL); KINETIC_ASSERT(range != NULL); KINETIC_ASSERT(keys != NULL); KINETIC_ASSERT(keys->buffers != NULL); KINETIC_ASSERT(keys->count > 0); KineticOperation* operation = KineticController_CreateOperation(session); if (operation == NULL) {return KINETIC_STATUS_MEMORY_ERROR;} Loading @@ -297,9 +297,9 @@ KineticStatus KineticClient_GetLog(KineticSession const * const session, KineticDeviceInfo** info, KineticCompletionClosure* closure) { assert(session != NULL); assert(session->connection != NULL); assert(info != NULL); KINETIC_ASSERT(session != NULL); KINETIC_ASSERT(session->connection != NULL); KINETIC_ASSERT(info != NULL); KineticOperation* operation = KineticController_CreateOperation(session); if (operation == NULL) {return KINETIC_STATUS_MEMORY_ERROR;} Loading @@ -314,7 +314,7 @@ KineticStatus KineticClient_GetLog(KineticSession const * const session, void KineticClient_FreeDeviceInfo(KineticSession const * const session, KineticDeviceInfo* info) { assert(session != NULL); KINETIC_ASSERT(session != NULL); if (info) { KineticDeviceInfo_Free(info); } /* The session is not currently used, but part of the API to allow Loading @@ -327,9 +327,9 @@ KineticStatus KineticClient_P2POperation(KineticSession const * const session, KineticP2P_Operation* const p2pOp, KineticCompletionClosure* closure) { assert(session != NULL); assert(session->connection != NULL); assert(p2pOp != NULL); KINETIC_ASSERT(session != NULL); KINETIC_ASSERT(session->connection != NULL); KINETIC_ASSERT(p2pOp != NULL); KineticOperation* operation = KineticController_CreateOperation(session); if (operation == NULL) {return KINETIC_STATUS_MEMORY_ERROR;} Loading @@ -351,8 +351,8 @@ KineticStatus KineticClient_P2POperation(KineticSession const * const session, KineticStatus KineticClient_InstantSecureErase(KineticSession const * const session) { assert(session != NULL); assert(session->connection != NULL); KINETIC_ASSERT(session != NULL); KINETIC_ASSERT(session->connection != NULL); KineticOperation* operation = KineticController_CreateOperation(session); if (operation == NULL) {return KINETIC_STATUS_MEMORY_ERROR;} Loading
src/lib/kinetic_controller.c +4 −4 Original line number Diff line number Diff line Loading @@ -79,9 +79,9 @@ static KineticCompletionClosure DefaultClosure(DefaultCallbackData * const data) KineticStatus KineticController_ExecuteOperation(KineticOperation* operation, KineticCompletionClosure* const closure) { assert(operation != NULL); assert(operation->connection != NULL); assert(operation->connection->pSession != NULL); KINETIC_ASSERT(operation != NULL); KINETIC_ASSERT(operation->connection != NULL); KINETIC_ASSERT(operation->connection->pSession != NULL); KineticStatus status = KINETIC_STATUS_INVALID; if (closure != NULL) Loading Loading @@ -152,7 +152,7 @@ KineticStatus bus_to_kinetic_status(bus_send_status_t const status) default: { LOGF0("bus_to_kinetic_status: UNMATCHED %d\n", status); assert(false); KINETIC_ASSERT(false); return KINETIC_STATUS_INVALID; } } Loading