Loading Makefile +11 −3 Original line number Diff line number Diff line Loading @@ -70,6 +70,7 @@ LIB_OBJS = \ $(OUT_DIR)/kinetic_types_internal.o \ $(OUT_DIR)/kinetic_types.o \ $(OUT_DIR)/kinetic_memory.o \ $(OUT_DIR)/kinetic_semaphore.o \ $(OUT_DIR)/byte_array.o \ $(OUT_DIR)/kinetic_client.o \ $(OUT_DIR)/threadpool.o \ Loading Loading @@ -225,6 +226,8 @@ install: $(KINETIC_LIB) $(KINETIC_SO_DEV) $(INSTALL) -d $(PREFIX)/include/ $(INSTALL) -c $(PUB_INC)/$(API_NAME).h $(PREFIX)/include/ $(INSTALL) -c $(PUB_INC)/kinetic_types.h $(PREFIX)/include/ $(INSTALL) -c $(PUB_INC)/kinetic_semaphore.h $(PREFIX)/include/ $(INSTALL) -c $(PUB_INC)/byte_array.h $(PREFIX)/include/ uninstall: @echo Loading @@ -236,6 +239,8 @@ uninstall: $(RM) -f $(PREFIX)${LIBDIR}/lib$(PROJECT)*.so $(RM) -f $(PREFIX)/include/${API_NAME}.h $(RM) -f $(PREFIX)/include/kinetic_types.h $(RM) -f $(PREFIX)/include/kinetic_semaphore.h $(RM) -f $(PREFIX)/include/byte_array.h $(RM) -f $(PREFIX)/include/kinetic_proto.h $(RM) -f $(PREFIX)/include/protobuf-c/protobuf-c.h $(RM) -f $(PREFIX)/include/protobuf-c.h Loading Loading @@ -268,7 +273,7 @@ stop_simulator: SYSTEST_SRC = ./test/system SYSTEST_OUT = $(BIN_DIR)/systest SYSTEST_LDFLAGS += -lm -L${OPENSSL_PATH}/lib -lssl -lcrypto $(KINETIC_LIB) -l pthread SYSTEST_LDFLAGS += -lm $(KINETIC_LIB) -L${OPENSSL_PATH}/lib -lssl -lcrypto -lpthread SYSTEST_WARN = -Wall -Wextra -Werror -Wstrict-prototypes -pedantic -Wno-missing-field-initializers -Werror=strict-prototypes SYSTEST_CFLAGS += -std=c99 -fPIC -g $(SYSTEST_WARN) $(CDEFS) $(OPTIMIZE) -DTEST UNITY_INC = ./vendor/unity/src Loading Loading @@ -354,7 +359,7 @@ UTILITY = kinetic-c-util UTIL_DIR = ./src/utility UTIL_EXEC = $(BIN_DIR)/$(UTILITY) UTIL_OBJ = $(OUT_DIR)/main.o UTIL_LDFLAGS += -lm -L${OPENSSL_PATH}/lib -lssl $(KINETIC_LIB) -lcrypto -lpthread UTIL_LDFLAGS += -lm $(KINETIC_LIB) -L${OPENSSL_PATH}/lib -lssl -lcrypto -lpthread $(UTIL_OBJ): $(UTIL_DIR)/main.c $(CC) -c -o $@ $< $(CFLAGS) -I$(PUB_INC) -I$(UTIL_DIR) Loading Loading @@ -399,6 +404,7 @@ run: $(UTIL_EXEC) start_simulator EXAMPLE_SRC = ./src/examples EXAMPLE_LDFLAGS += -lm -l ssl $(KINETIC_LIB) -l crypto -l pthread EXAMPLE_CFLAGS += -Wno-deprecated-declarations EXAMPLES = write_file_blocking VALGRIND = valgrind VALGRIND_ARGS = --track-origins=yes #--leak-check=full Loading @@ -411,7 +417,7 @@ $(BIN_DIR)/examples/%: $(EXAMPLE_SRC)/%.c $(KINETIC_LIB) @echo ================================================================================ @echo Building example: '$<' @echo -------------------------------------------------------------------------------- $(CC) -o $@ $< $(CFLAGS) -I$(PUB_INC) $(UTIL_LDFLAGS) $(KINETIC_LIB) $(CC) -o $@ $< $(CFLAGS) $(EXAMPLE_CFLAGS) -I$(PUB_INC) $(UTIL_LDFLAGS) $(KINETIC_LIB) @echo ================================================================================ @echo Loading Loading @@ -451,6 +457,7 @@ setup_examples: $(example_executables) \ examples: setup_examples \ start_simulator \ run_example_put_nonblocking \ run_example_write_file_blocking \ run_example_write_file_blocking_threads \ run_example_write_file_nonblocking \ Loading @@ -460,6 +467,7 @@ examples: setup_examples \ valgrind_examples: setup_examples \ start_simulator \ valgrind_put_nonblocking \ valgrind_example_write_file_blocking \ valgrind_example_write_file_blocking_threads \ valgrind_example_write_file_nonblocking \ Loading include/byte_array.h +3 −0 Original line number Diff line number Diff line Loading @@ -62,5 +62,8 @@ ByteBuffer* ByteBuffer_AppendCString(ByteBuffer* buffer, const char* data); ByteBuffer* ByteBuffer_AppendFormattedCString(ByteBuffer* buffer, const char * format, ...); ByteBuffer* ByteBuffer_AppendDummyData(ByteBuffer* buffer, size_t len); bool ByteBuffer_IsNull(ByteBuffer const buffer); ByteBuffer ByteBuffer_Malloc(size_t size); ByteBuffer ByteBuffer_MallocAndAppend(const void* data, size_t len); void ByteBuffer_Free(ByteBuffer buffer); #endif // _BYTE_ARRAY_H include/kinetic_semaphore.h 0 → 100644 +14 −0 Original line number Diff line number Diff line #ifndef _KINETIC_SEMAPHORE_H #define _KINETIC_SEMAPHORE_H struct _KineticSemaphore; typedef struct _KineticSemaphore KineticSemaphore; KineticSemaphore * KineticSemaphore_Create(void); void KineticSemaphore_Lock(KineticSemaphore * sem); void KineticSemaphore_Unlock(KineticSemaphore * sem); void KineticSemaphore_Signal(KineticSemaphore * sem); void KineticSemaphore_WaitForSignalAndDestroy(KineticSemaphore * sem); #endif // _KINETIC_SEMAPHORE_H src/examples/put_nonblocking.c 0 → 100644 +116 −0 Original line number Diff line number Diff line #include "kinetic_client.h" #include "kinetic_types.h" #include "kinetic_semaphore.h" #include <stdlib.h> #include <openssl/sha.h> #include <pthread.h> typedef struct { KineticSemaphore * sem; KineticStatus status; } PutStatus; static void put_finished(KineticCompletionData* kinetic_data, void* clientData); int main(int argc, char** argv) { (void)argc; (void)argv; // Initialize kinetic-c and establish session KineticClient * client = KineticClient_Init("stdout", 0); if (client == NULL) { return 1; } const char HmacKeyString[] = "asdfasdf"; KineticSession session = {.config = { .host = "localhost", .port = KINETIC_PORT, .clusterVersion = 0, .identity = 1, .hmacKey = ByteArray_CreateWithCString(HmacKeyString), }}; KineticStatus connect_status = KineticClient_CreateConnection(&session, client); if (connect_status != KINETIC_STATUS_SUCCESS) { fprintf(stderr, "Failed connecting to the Kinetic device w/status: %s\n", Kinetic_GetStatusDescription(connect_status)); return 1; } // Create structure to populate with PUT status in callback // a semaphore is used to notify the main thread that it's // safe to proceed. PutStatus put_status = { .sem = KineticSemaphore_Create(), .status = KINETIC_STATUS_INVALID, }; // some dummy data to PUT uint8_t value_data[] = { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F }; ByteBuffer value = ByteBuffer_MallocAndAppend(value_data, sizeof(value_data)); // a dummy key uint8_t key_data[] = {0x00, 0x01, 0x02, 0x03, 0x04}; ByteBuffer key = ByteBuffer_MallocAndAppend(key_data, sizeof(key_data)); // Populate tag with SHA1 ByteBuffer tag = ByteBuffer_Malloc(20); uint8_t sha1[20]; SHA1(value.array.data, value.bytesUsed, &sha1[0]); ByteBuffer_Append(&tag, sha1, sizeof(sha1)); // Because I'm passing a pointer to this entry to KineticClient_Put(), this entry must not // go out of scope until the PUT completes KineticEntry entry = { .key = key, .tag = tag, .algorithm = KINETIC_ALGORITHM_SHA1, .value = value, .synchronization = KINETIC_SYNCHRONIZATION_WRITETHROUGH, }; KineticStatus status = KineticClient_Put( &session, &entry, &(KineticCompletionClosure) { .callback = put_finished, .clientData = &put_status, } ); if (status != KINETIC_STATUS_SUCCESS) { fprintf(stderr, "Put failed w/status: %s\n", Kinetic_GetStatusDescription(status)); return 1; } // Wait for put to finish KineticSemaphore_WaitForSignalAndDestroy(put_status.sem); if (put_status.status != KINETIC_STATUS_SUCCESS) { fprintf(stderr, "Transfer failed w/status: %s\n", Kinetic_GetStatusDescription(put_status.status)); return 1; } printf("Transfer completed successfully!\n"); // Free malloc'd buffers ByteBuffer_Free(value); ByteBuffer_Free(key); ByteBuffer_Free(tag); // Shutdown client connection and cleanup KineticClient_DestroyConnection(&session); KineticClient_Shutdown(client); return 0; } static void put_finished(KineticCompletionData* kinetic_data, void* clientData) { PutStatus * put_status = clientData; KineticSemaphore_Lock(put_status->sem); // Save PUT result status put_status->status = kinetic_data->status; // Signal that we're done KineticSemaphore_Signal(put_status->sem); KineticSemaphore_Unlock(put_status->sem); } src/lib/bus/sender.c +10 −8 Original line number Diff line number Diff line Loading @@ -110,7 +110,7 @@ bool sender_register_socket(struct sender *s, int fd, SSL *ssl) { info->u.add_socket.fd = fd; info->u.add_socket.ssl = ssl; BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "registering socket %d with SSL %p", fd, ssl); "registering socket %d with SSL %p", fd, (void*)ssl); bool res = commit_event_and_block(s, info); release_tx_info(s, info); BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, Loading Loading @@ -139,7 +139,7 @@ bool sender_send_request(struct sender *s, boxed_msg *box) { info->u.enqueue.box = box; BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "sending request on %d: box %p", box->fd, box); "sending request on %d: box %p", box->fd, (void*)box); bool res = commit_event_and_block(s, info); BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "sending request: releasing tx_info, res %d", res); Loading @@ -148,6 +148,7 @@ bool sender_send_request(struct sender *s, boxed_msg *box) { } bool sender_shutdown(struct sender *s) { if (s->fd_hash_table == NULL) { return true; } struct bus *b = s->bus; tx_info_t *info = get_free_tx_info(s); if (info == NULL) { return false; } Loading Loading @@ -381,6 +382,8 @@ static void cleanup(sender *s) { struct bus *b = s->bus; BUS_LOG(b, 2, LOG_SHUTDOWN, "sender_cleanup", b->udata); if (s->fd_hash_table) { /* make idempotent */ struct yacht *y = s->fd_hash_table; s->fd_hash_table = NULL; int shutdown_id = -1; for (int i = 0; i < MAX_CONCURRENT_SENDS; i++) { Loading @@ -404,8 +407,7 @@ static void cleanup(sender *s) { close(s->pipes[shutdown_id][1]); } yacht_free(s->fd_hash_table, free_fd_info_cb, NULL); s->fd_hash_table = NULL; yacht_free(y, free_fd_info_cb, NULL); } } Loading Loading @@ -728,7 +730,7 @@ static ssize_t socket_write_plain(sender *s, tx_info_t *info) { BUS_LOG_SNPRINTF(b, 10, LOG_SENDER, b->udata, 64, "write %p to %d, %zd bytes (info %d)", &msg[sent_size], fd, rem, info->id); (void*)&msg[sent_size], fd, rem, info->id); ssize_t wrsz = write(fd, &msg[sent_size], rem); if (wrsz == -1) { if (util_is_resumable_io_error(errno)) { Loading Loading @@ -815,7 +817,7 @@ static void update_sent(struct bus *b, sender *s, tx_info_t *info, ssize_t sent) BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64, "wrote %zd, msg_size %zd (%p)", sent, msg_size, box->out_msg); sent, msg_size, (void*)box->out_msg); if (rem == 0) { /* completed! */ fd_info *fdi = info->u.write.fdi; Loading @@ -835,7 +837,7 @@ static void update_sent(struct bus *b, sender *s, tx_info_t *info, ssize_t sent) decrement_fd_refcount(s, fdi); BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64, "wrote all of %p, clearing", box->out_msg); "wrote all of %p, clearing", (void*)box->out_msg); attempt_to_enqueue_message_to_listener(s, info); } } Loading Loading @@ -1027,7 +1029,7 @@ static void notify_message_failure(sender *s, tx_info_t *info, bus_send_status_t if (bus_process_boxed_message(s->bus, box, &backpressure)) { BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64, "deleting box %p for info->id %d (msg failure)", info->u.error.box, info->id); (void*)info->u.error.box, info->id); info->u.error.box = NULL; info->u.error.backpressure = backpressure; notify_caller(s, info, false); Loading Loading
Makefile +11 −3 Original line number Diff line number Diff line Loading @@ -70,6 +70,7 @@ LIB_OBJS = \ $(OUT_DIR)/kinetic_types_internal.o \ $(OUT_DIR)/kinetic_types.o \ $(OUT_DIR)/kinetic_memory.o \ $(OUT_DIR)/kinetic_semaphore.o \ $(OUT_DIR)/byte_array.o \ $(OUT_DIR)/kinetic_client.o \ $(OUT_DIR)/threadpool.o \ Loading Loading @@ -225,6 +226,8 @@ install: $(KINETIC_LIB) $(KINETIC_SO_DEV) $(INSTALL) -d $(PREFIX)/include/ $(INSTALL) -c $(PUB_INC)/$(API_NAME).h $(PREFIX)/include/ $(INSTALL) -c $(PUB_INC)/kinetic_types.h $(PREFIX)/include/ $(INSTALL) -c $(PUB_INC)/kinetic_semaphore.h $(PREFIX)/include/ $(INSTALL) -c $(PUB_INC)/byte_array.h $(PREFIX)/include/ uninstall: @echo Loading @@ -236,6 +239,8 @@ uninstall: $(RM) -f $(PREFIX)${LIBDIR}/lib$(PROJECT)*.so $(RM) -f $(PREFIX)/include/${API_NAME}.h $(RM) -f $(PREFIX)/include/kinetic_types.h $(RM) -f $(PREFIX)/include/kinetic_semaphore.h $(RM) -f $(PREFIX)/include/byte_array.h $(RM) -f $(PREFIX)/include/kinetic_proto.h $(RM) -f $(PREFIX)/include/protobuf-c/protobuf-c.h $(RM) -f $(PREFIX)/include/protobuf-c.h Loading Loading @@ -268,7 +273,7 @@ stop_simulator: SYSTEST_SRC = ./test/system SYSTEST_OUT = $(BIN_DIR)/systest SYSTEST_LDFLAGS += -lm -L${OPENSSL_PATH}/lib -lssl -lcrypto $(KINETIC_LIB) -l pthread SYSTEST_LDFLAGS += -lm $(KINETIC_LIB) -L${OPENSSL_PATH}/lib -lssl -lcrypto -lpthread SYSTEST_WARN = -Wall -Wextra -Werror -Wstrict-prototypes -pedantic -Wno-missing-field-initializers -Werror=strict-prototypes SYSTEST_CFLAGS += -std=c99 -fPIC -g $(SYSTEST_WARN) $(CDEFS) $(OPTIMIZE) -DTEST UNITY_INC = ./vendor/unity/src Loading Loading @@ -354,7 +359,7 @@ UTILITY = kinetic-c-util UTIL_DIR = ./src/utility UTIL_EXEC = $(BIN_DIR)/$(UTILITY) UTIL_OBJ = $(OUT_DIR)/main.o UTIL_LDFLAGS += -lm -L${OPENSSL_PATH}/lib -lssl $(KINETIC_LIB) -lcrypto -lpthread UTIL_LDFLAGS += -lm $(KINETIC_LIB) -L${OPENSSL_PATH}/lib -lssl -lcrypto -lpthread $(UTIL_OBJ): $(UTIL_DIR)/main.c $(CC) -c -o $@ $< $(CFLAGS) -I$(PUB_INC) -I$(UTIL_DIR) Loading Loading @@ -399,6 +404,7 @@ run: $(UTIL_EXEC) start_simulator EXAMPLE_SRC = ./src/examples EXAMPLE_LDFLAGS += -lm -l ssl $(KINETIC_LIB) -l crypto -l pthread EXAMPLE_CFLAGS += -Wno-deprecated-declarations EXAMPLES = write_file_blocking VALGRIND = valgrind VALGRIND_ARGS = --track-origins=yes #--leak-check=full Loading @@ -411,7 +417,7 @@ $(BIN_DIR)/examples/%: $(EXAMPLE_SRC)/%.c $(KINETIC_LIB) @echo ================================================================================ @echo Building example: '$<' @echo -------------------------------------------------------------------------------- $(CC) -o $@ $< $(CFLAGS) -I$(PUB_INC) $(UTIL_LDFLAGS) $(KINETIC_LIB) $(CC) -o $@ $< $(CFLAGS) $(EXAMPLE_CFLAGS) -I$(PUB_INC) $(UTIL_LDFLAGS) $(KINETIC_LIB) @echo ================================================================================ @echo Loading Loading @@ -451,6 +457,7 @@ setup_examples: $(example_executables) \ examples: setup_examples \ start_simulator \ run_example_put_nonblocking \ run_example_write_file_blocking \ run_example_write_file_blocking_threads \ run_example_write_file_nonblocking \ Loading @@ -460,6 +467,7 @@ examples: setup_examples \ valgrind_examples: setup_examples \ start_simulator \ valgrind_put_nonblocking \ valgrind_example_write_file_blocking \ valgrind_example_write_file_blocking_threads \ valgrind_example_write_file_nonblocking \ Loading
include/byte_array.h +3 −0 Original line number Diff line number Diff line Loading @@ -62,5 +62,8 @@ ByteBuffer* ByteBuffer_AppendCString(ByteBuffer* buffer, const char* data); ByteBuffer* ByteBuffer_AppendFormattedCString(ByteBuffer* buffer, const char * format, ...); ByteBuffer* ByteBuffer_AppendDummyData(ByteBuffer* buffer, size_t len); bool ByteBuffer_IsNull(ByteBuffer const buffer); ByteBuffer ByteBuffer_Malloc(size_t size); ByteBuffer ByteBuffer_MallocAndAppend(const void* data, size_t len); void ByteBuffer_Free(ByteBuffer buffer); #endif // _BYTE_ARRAY_H
include/kinetic_semaphore.h 0 → 100644 +14 −0 Original line number Diff line number Diff line #ifndef _KINETIC_SEMAPHORE_H #define _KINETIC_SEMAPHORE_H struct _KineticSemaphore; typedef struct _KineticSemaphore KineticSemaphore; KineticSemaphore * KineticSemaphore_Create(void); void KineticSemaphore_Lock(KineticSemaphore * sem); void KineticSemaphore_Unlock(KineticSemaphore * sem); void KineticSemaphore_Signal(KineticSemaphore * sem); void KineticSemaphore_WaitForSignalAndDestroy(KineticSemaphore * sem); #endif // _KINETIC_SEMAPHORE_H
src/examples/put_nonblocking.c 0 → 100644 +116 −0 Original line number Diff line number Diff line #include "kinetic_client.h" #include "kinetic_types.h" #include "kinetic_semaphore.h" #include <stdlib.h> #include <openssl/sha.h> #include <pthread.h> typedef struct { KineticSemaphore * sem; KineticStatus status; } PutStatus; static void put_finished(KineticCompletionData* kinetic_data, void* clientData); int main(int argc, char** argv) { (void)argc; (void)argv; // Initialize kinetic-c and establish session KineticClient * client = KineticClient_Init("stdout", 0); if (client == NULL) { return 1; } const char HmacKeyString[] = "asdfasdf"; KineticSession session = {.config = { .host = "localhost", .port = KINETIC_PORT, .clusterVersion = 0, .identity = 1, .hmacKey = ByteArray_CreateWithCString(HmacKeyString), }}; KineticStatus connect_status = KineticClient_CreateConnection(&session, client); if (connect_status != KINETIC_STATUS_SUCCESS) { fprintf(stderr, "Failed connecting to the Kinetic device w/status: %s\n", Kinetic_GetStatusDescription(connect_status)); return 1; } // Create structure to populate with PUT status in callback // a semaphore is used to notify the main thread that it's // safe to proceed. PutStatus put_status = { .sem = KineticSemaphore_Create(), .status = KINETIC_STATUS_INVALID, }; // some dummy data to PUT uint8_t value_data[] = { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F }; ByteBuffer value = ByteBuffer_MallocAndAppend(value_data, sizeof(value_data)); // a dummy key uint8_t key_data[] = {0x00, 0x01, 0x02, 0x03, 0x04}; ByteBuffer key = ByteBuffer_MallocAndAppend(key_data, sizeof(key_data)); // Populate tag with SHA1 ByteBuffer tag = ByteBuffer_Malloc(20); uint8_t sha1[20]; SHA1(value.array.data, value.bytesUsed, &sha1[0]); ByteBuffer_Append(&tag, sha1, sizeof(sha1)); // Because I'm passing a pointer to this entry to KineticClient_Put(), this entry must not // go out of scope until the PUT completes KineticEntry entry = { .key = key, .tag = tag, .algorithm = KINETIC_ALGORITHM_SHA1, .value = value, .synchronization = KINETIC_SYNCHRONIZATION_WRITETHROUGH, }; KineticStatus status = KineticClient_Put( &session, &entry, &(KineticCompletionClosure) { .callback = put_finished, .clientData = &put_status, } ); if (status != KINETIC_STATUS_SUCCESS) { fprintf(stderr, "Put failed w/status: %s\n", Kinetic_GetStatusDescription(status)); return 1; } // Wait for put to finish KineticSemaphore_WaitForSignalAndDestroy(put_status.sem); if (put_status.status != KINETIC_STATUS_SUCCESS) { fprintf(stderr, "Transfer failed w/status: %s\n", Kinetic_GetStatusDescription(put_status.status)); return 1; } printf("Transfer completed successfully!\n"); // Free malloc'd buffers ByteBuffer_Free(value); ByteBuffer_Free(key); ByteBuffer_Free(tag); // Shutdown client connection and cleanup KineticClient_DestroyConnection(&session); KineticClient_Shutdown(client); return 0; } static void put_finished(KineticCompletionData* kinetic_data, void* clientData) { PutStatus * put_status = clientData; KineticSemaphore_Lock(put_status->sem); // Save PUT result status put_status->status = kinetic_data->status; // Signal that we're done KineticSemaphore_Signal(put_status->sem); KineticSemaphore_Unlock(put_status->sem); }
src/lib/bus/sender.c +10 −8 Original line number Diff line number Diff line Loading @@ -110,7 +110,7 @@ bool sender_register_socket(struct sender *s, int fd, SSL *ssl) { info->u.add_socket.fd = fd; info->u.add_socket.ssl = ssl; BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "registering socket %d with SSL %p", fd, ssl); "registering socket %d with SSL %p", fd, (void*)ssl); bool res = commit_event_and_block(s, info); release_tx_info(s, info); BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, Loading Loading @@ -139,7 +139,7 @@ bool sender_send_request(struct sender *s, boxed_msg *box) { info->u.enqueue.box = box; BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "sending request on %d: box %p", box->fd, box); "sending request on %d: box %p", box->fd, (void*)box); bool res = commit_event_and_block(s, info); BUS_LOG_SNPRINTF(b, 3, LOG_SENDER, b->udata, 64, "sending request: releasing tx_info, res %d", res); Loading @@ -148,6 +148,7 @@ bool sender_send_request(struct sender *s, boxed_msg *box) { } bool sender_shutdown(struct sender *s) { if (s->fd_hash_table == NULL) { return true; } struct bus *b = s->bus; tx_info_t *info = get_free_tx_info(s); if (info == NULL) { return false; } Loading Loading @@ -381,6 +382,8 @@ static void cleanup(sender *s) { struct bus *b = s->bus; BUS_LOG(b, 2, LOG_SHUTDOWN, "sender_cleanup", b->udata); if (s->fd_hash_table) { /* make idempotent */ struct yacht *y = s->fd_hash_table; s->fd_hash_table = NULL; int shutdown_id = -1; for (int i = 0; i < MAX_CONCURRENT_SENDS; i++) { Loading @@ -404,8 +407,7 @@ static void cleanup(sender *s) { close(s->pipes[shutdown_id][1]); } yacht_free(s->fd_hash_table, free_fd_info_cb, NULL); s->fd_hash_table = NULL; yacht_free(y, free_fd_info_cb, NULL); } } Loading Loading @@ -728,7 +730,7 @@ static ssize_t socket_write_plain(sender *s, tx_info_t *info) { BUS_LOG_SNPRINTF(b, 10, LOG_SENDER, b->udata, 64, "write %p to %d, %zd bytes (info %d)", &msg[sent_size], fd, rem, info->id); (void*)&msg[sent_size], fd, rem, info->id); ssize_t wrsz = write(fd, &msg[sent_size], rem); if (wrsz == -1) { if (util_is_resumable_io_error(errno)) { Loading Loading @@ -815,7 +817,7 @@ static void update_sent(struct bus *b, sender *s, tx_info_t *info, ssize_t sent) BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64, "wrote %zd, msg_size %zd (%p)", sent, msg_size, box->out_msg); sent, msg_size, (void*)box->out_msg); if (rem == 0) { /* completed! */ fd_info *fdi = info->u.write.fdi; Loading @@ -835,7 +837,7 @@ static void update_sent(struct bus *b, sender *s, tx_info_t *info, ssize_t sent) decrement_fd_refcount(s, fdi); BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64, "wrote all of %p, clearing", box->out_msg); "wrote all of %p, clearing", (void*)box->out_msg); attempt_to_enqueue_message_to_listener(s, info); } } Loading Loading @@ -1027,7 +1029,7 @@ static void notify_message_failure(sender *s, tx_info_t *info, bus_send_status_t if (bus_process_boxed_message(s->bus, box, &backpressure)) { BUS_LOG_SNPRINTF(b, 5, LOG_SENDER, b->udata, 64, "deleting box %p for info->id %d (msg failure)", info->u.error.box, info->id); (void*)info->u.error.box, info->id); info->u.error.box = NULL; info->u.error.backpressure = backpressure; notify_caller(s, info, false); Loading