Loading Makefile +2 −3 Original line number Diff line number Diff line Loading @@ -11,7 +11,7 @@ PUB_INC = ./include # SSL/TLS Library Options #------------------------------------------------------------------------------- # FIXME: Currently OSX/homebrew specific, rework before integration. # This may need to be set if OpenSSL is in a nonstandard place. OPENSSL_PATH ?= . #=============================================================================== Loading Loading @@ -71,6 +71,7 @@ LIB_OBJS = \ $(OUT_DIR)/kinetic_types.o \ $(OUT_DIR)/kinetic_memory.o \ $(OUT_DIR)/kinetic_semaphore.o \ $(OUT_DIR)/kinetic_countingsemaphore.o \ $(OUT_DIR)/byte_array.o \ $(OUT_DIR)/kinetic_client.o \ $(OUT_DIR)/threadpool.o \ Loading Loading @@ -463,7 +464,6 @@ examples: setup_examples \ run_example_write_file_blocking \ run_example_write_file_blocking_threads \ run_example_write_file_nonblocking \ run_example_write_file_nonblocking_threads \ run_example_get_key_range \ stop_simulator Loading @@ -474,6 +474,5 @@ valgrind_examples: setup_examples \ valgrind_example_write_file_blocking \ valgrind_example_write_file_blocking_threads \ valgrind_example_write_file_nonblocking \ valgrind_example_write_file_nonblocking_threads \ valgrind_example_get_key_range \ stop_simulator README.md +6 −0 Original line number Diff line number Diff line Loading @@ -21,6 +21,12 @@ Prerequisites * OSX (using [Homebrew](http://brew.sh/)) * `> brew install openssl` A release of OpenSSL that provides TLS 1.1 or newer is required. If the OpenSSL installation is not found, the `OPENSSL_PATH` environment variable may need to be set to its base path, e.g. `/usr/local/openssl/1.0.1k/`. Getting Started --------------- Loading include/kinetic_semaphore.h +1 −2 Original line number Diff line number Diff line #ifndef _KINETIC_SEMAPHORE_H #define _KINETIC_SEMAPHORE_H struct _KineticSemaphore; typedef struct _KineticSemaphore KineticSemaphore; /** * @brief Creates a KineticSemaphore. The KineticSemaphore is a simple wrapper * around a pthread condition variable and provides a a thread safe * around a pthread condition variable and provides a a thread-safe * way to block a thread and wait for notification from another thread. * * @return Returns a pointer to a KineticSemaphore Loading src/examples/write_file_nonblocking_threads.cdeleted 100644 → 0 +0 −300 Original line number Diff line number Diff line /* * kinetic-c * Copyright (C) 2014 Seagate Technology. * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License * as published by the Free Software Foundation; either version 2 * of the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * */ #include "kinetic_client.h" #include "kinetic_types.h" #include "byte_array.h" #include <stdlib.h> #include <getopt.h> #include <stdio.h> #include <sys/param.h> #include <sys/stat.h> #include <sys/file.h> #include <pthread.h> #include <errno.h> #define REPORT_ERRNO(en, msg) if(en != 0){errno = en; perror(msg);} struct kinetic_thread_arg { char ip[16]; struct kinetic_put_arg* opArgs; int opCount; }; typedef struct { size_t opsInProgress; size_t currentChunk; size_t maxOverlappedChunks; int fd; ByteBuffer keyPrefix; uint8_t keyPrefixBuffer[KINETIC_DEFAULT_KEY_LEN]; pthread_mutex_t transferMutex; pthread_mutex_t completeMutex; pthread_cond_t completeCond; KineticStatus status; KineticSession* session; } FileTransferProgress; typedef struct { KineticEntry entry; uint8_t key[KINETIC_DEFAULT_KEY_LEN]; uint8_t value[KINETIC_OBJ_SIZE]; uint8_t tag[KINETIC_DEFAULT_KEY_LEN]; FileTransferProgress* currentTransfer; } AsyncWriteClosureData; typedef struct { KineticSession* session; const char* filename; const size_t maxOverlappedChunks; uint64_t keyPrefix; pthread_t thread; KineticStatus status; } StoreFileOperation; void* store_file_thread(void* storeArgs); FileTransferProgress* start_file_transfer(KineticSession* session, char const * const filename, uint64_t prefix, uint32_t maxOverlappedChunks); KineticStatus wait_for_transfer_complete(FileTransferProgress* const transfer); static int put_chunk_of_file(FileTransferProgress* transfer); static void put_chunk_of_file_finished(KineticCompletionData* kinetic_data, void* client_data); int main(int argc, char** argv) { (void)argc; (void)argv; // Initialize kinetic-c and configure session const char HmacKeyString[] = "asdfasdf"; KineticSession session = { .config = (KineticSessionConfig) { .host = "localhost", .port = KINETIC_PORT, .clusterVersion = 0, .identity = 1, .hmacKey = ByteArray_CreateWithCString(HmacKeyString), } }; KineticClient * client = KineticClient_Init("stdout", 0); if (client == NULL) { return 1; } // Establish connection KineticStatus status = KineticClient_CreateConnection(&session, client); if (status != KINETIC_STATUS_SUCCESS) { fprintf(stdout, "Failed connecting to the Kinetic device w/status: %s\n", Kinetic_GetStatusDescription(status)); return -1; } // Create a unique/common key prefix struct timeval now; gettimeofday(&now, NULL); uint64_t prefix = (uint64_t)now.tv_sec << sizeof(8); // Store the file(s) and wait for completion bool success = true; const int maxOverlappedChunks = 15; StoreFileOperation ops[] = { { .session = &session, .filename = "./test/support/data/file_a.png", .keyPrefix = prefix, .maxOverlappedChunks = maxOverlappedChunks, }, { .session = &session, .filename = "./test/support/data/file_b.png", .keyPrefix = prefix, .maxOverlappedChunks = maxOverlappedChunks, }, { .session = &session, .filename = "./test/support/data/file_c.png", .keyPrefix = prefix, .maxOverlappedChunks = maxOverlappedChunks, }, }; const int numFiles = sizeof(ops) / sizeof(StoreFileOperation); for (int i = 0; i < numFiles; i++) { printf("Storing '%s' to disk...\n", ops[i].filename); int pthreadStatus = pthread_create(&ops[i].thread, NULL, store_file_thread, &ops[i]); if (pthreadStatus != 0) { REPORT_ERRNO(pthreadStatus, "pthread_create"); fprintf(stdout, "Failed creating store thread for '%s'!\n", ops[i].filename); success = false; } } for (int i = 0; i < numFiles; i++) { int pthreadStatus = pthread_join(ops[i].thread, NULL); if (pthreadStatus == 0) { printf("File '%s' stored successfully!\n", ops[i].filename); } else { REPORT_ERRNO(pthreadStatus, "pthread_join"); fprintf(stdout, "Failed storing '%s' to disk! status: %s\n", ops[i].filename, Kinetic_GetStatusDescription(ops[i].status)); success = false; } } printf("Complete!\n"); // Shutdown client connection and cleanup KineticClient_DestroyConnection(&session); KineticClient_Shutdown(client); return success ? 0 : -1; } void* store_file_thread(void* storeArgs) { // Kick off the chained write/PUT operations and wait for completion StoreFileOperation* op = storeArgs; FileTransferProgress* transfer = start_file_transfer(op->session, op->filename, op->keyPrefix, op->maxOverlappedChunks); op->status = wait_for_transfer_complete(transfer); if (op->status != KINETIC_STATUS_SUCCESS) { fprintf(stdout, "Transfer failed w/status: %s\n", Kinetic_GetStatusDescription(op->status)); } return (void*)storeArgs; } FileTransferProgress * start_file_transfer(KineticSession* session, char const * const filename, uint64_t prefix, uint32_t maxOverlappedChunks) { FileTransferProgress * transferState = malloc(sizeof(FileTransferProgress)); *transferState = (FileTransferProgress) { .session = session, .maxOverlappedChunks = maxOverlappedChunks, .keyPrefix = ByteBuffer_CreateAndAppend(transferState->keyPrefixBuffer, sizeof(transferState->keyPrefixBuffer), &prefix, sizeof(prefix)), .fd = open(filename, O_RDONLY), }; pthread_mutex_init(&transferState->transferMutex, NULL); pthread_mutex_init(&transferState->completeMutex, NULL); pthread_cond_init(&transferState->completeCond, NULL); // Start max overlapped PUT operations for (size_t i = 0; i < transferState->maxOverlappedChunks; i++) { put_chunk_of_file(transferState); } return transferState; } KineticStatus wait_for_transfer_complete(FileTransferProgress* const transfer) { pthread_mutex_lock(&transfer->completeMutex); pthread_cond_wait(&transfer->completeCond, &transfer->completeMutex); pthread_mutex_unlock(&transfer->completeMutex); KineticStatus status = transfer->status; pthread_mutex_destroy(&transfer->completeMutex); pthread_cond_destroy(&transfer->completeCond); close(transfer->fd); pthread_mutex_destroy(&transfer->transferMutex); free(transfer); return status; } int put_chunk_of_file(FileTransferProgress* transfer) { AsyncWriteClosureData* closureData = calloc(1, sizeof(AsyncWriteClosureData)); pthread_mutex_lock(&transfer->transferMutex); transfer->opsInProgress++; closureData->currentTransfer = transfer; int bytesRead = read(transfer->fd, closureData->value, sizeof(closureData->value)); if (bytesRead > 0) { transfer->currentChunk++; closureData->entry = (KineticEntry){ .key = ByteBuffer_CreateAndAppend(closureData->key, sizeof(closureData->key), transfer->keyPrefix.array.data, transfer->keyPrefix.bytesUsed), .tag = ByteBuffer_CreateAndAppendFormattedCString(closureData->tag, sizeof(closureData->tag), "some_value_tag..._%04d", transfer->currentChunk), .algorithm = KINETIC_ALGORITHM_SHA1, .value = ByteBuffer_Create(closureData->value, sizeof(closureData->value), (size_t)bytesRead), .synchronization = KINETIC_SYNCHRONIZATION_WRITETHROUGH, }; KineticStatus status = KineticClient_Put(transfer->session, &closureData->entry, &(KineticCompletionClosure) { .callback = put_chunk_of_file_finished, .clientData = closureData, }); if (status != KINETIC_STATUS_SUCCESS) { transfer->opsInProgress--; free(closureData); fprintf(stdout, "Failed writing chunk! PUT request reported status: %s\n", Kinetic_GetStatusDescription(status)); } } else if (bytesRead == 0) { // EOF reached transfer->opsInProgress--; free(closureData); } else { transfer->opsInProgress--; free(closureData); fprintf(stdout, "Failed reading data from file!\n"); REPORT_ERRNO(bytesRead, "read"); } pthread_mutex_unlock(&transfer->transferMutex); return bytesRead; } void put_chunk_of_file_finished(KineticCompletionData* kinetic_data, void* clientData) { AsyncWriteClosureData* closureData = clientData; FileTransferProgress* transfer = closureData->currentTransfer; free(closureData); pthread_mutex_lock(&transfer->transferMutex); transfer->opsInProgress--; pthread_mutex_unlock(&transfer->transferMutex); if (kinetic_data->status == KINETIC_STATUS_SUCCESS) { int bytesPut = put_chunk_of_file(transfer); if (bytesPut <= 0 && transfer->opsInProgress == 0) { if (transfer->status == KINETIC_STATUS_NOT_ATTEMPTED) { transfer->status = KINETIC_STATUS_SUCCESS; } pthread_cond_signal(&transfer->completeCond); } } else { transfer->status = kinetic_data->status; // only signal when finished // keep track of outstanding operations // if there is no more data to read (or error), and no outstanding operations, // then signal pthread_cond_signal(&transfer->completeCond); fprintf(stdout, "Failed writing chunk! PUT response reported status: %s\n", Kinetic_GetStatusDescription(kinetic_data->status)); } } src/lib/bus/bus.c +50 −2 Original line number Diff line number Diff line Loading @@ -26,6 +26,7 @@ #include <errno.h> #include <assert.h> #include <limits.h> #include <sys/resource.h> #include "bus.h" #include "sender.h" Loading @@ -46,6 +47,7 @@ static int listener_id_of_socket(struct bus *b, int fd); static void noop_log_cb(log_event_t event, int log_level, const char *msg, void *udata); static void noop_error_cb(bus_unpack_cb_res_t result, void *socket_udata); static bool attempt_to_increase_resource_limits(struct bus *b); static void set_defaults(bus_config *cfg) { if (cfg->sender_count == 0) { cfg->sender_count = 1; } Loading Loading @@ -110,6 +112,8 @@ bool bus_init(bus_config *config, struct bus_result *res) { log_lock_init = true; attempt_to_increase_resource_limits(b); BUS_LOG_SNPRINTF(b, 3, LOG_INITIALIZATION, b->udata, 64, "Initialized bus at %p", (void*)b); Loading Loading @@ -223,6 +227,31 @@ cleanup: return false; } static bool attempt_to_increase_resource_limits(struct bus *b) { struct rlimit info; if (-1 == getrlimit(RLIMIT_NOFILE, &info)) { fprintf(stderr, "getrlimit: %s", strerror(errno)); errno = 0; return false; } const unsigned int nval = 1024; BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 256, "Current FD resource limits, [%lu, %lu], changing to %u", (unsigned long)info.rlim_cur, (unsigned long)info.rlim_max, nval); if (info.rlim_cur < nval && info.rlim_max > nval) { info.rlim_cur = nval; if (-1 == setrlimit(RLIMIT_NOFILE, &info)) { fprintf(stderr, "getrlimit: %s", strerror(errno)); errno = 0; return false; } } return true; } /* Pack message to deliver on behalf of the user into an envelope * that can track status / routing along the way. * Loading Loading @@ -507,13 +536,17 @@ bool bus_shutdown(bus *b) { for (int i = 0; i < b->sender_count; i++) { int off = 0; if (!b->joined[i + off]) { BUS_LOG(b, 2, LOG_SHUTDOWN, "sender_shutdown...", b->udata); BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "sender_shutdown -- %d", i); while (!sender_shutdown(b->senders[i])) { BUS_LOG(b, 2, LOG_SHUTDOWN, "sender_shutdown... (retry)", b->udata); BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "sender_shutdown -- retry %d", i); sleep(1); } void *unused = NULL; int res = pthread_join(b->threads[i + off], &unused); BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "sender_shutdown -- joined %d", i); assert(res == 0); b->joined[i + off] = true; } Loading @@ -523,11 +556,17 @@ bool bus_shutdown(bus *b) { for (int i = 0; i < b->listener_count; i++) { int off = b->sender_count; if (!b->joined[i + off]) { BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "listener_shutdown -- %d", i); while (!listener_shutdown(b->listeners[i])) { sleep(1); } BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "listener_shutdown -- joining %d", i); void *unused = NULL; int res = pthread_join(b->threads[i + off], &unused); BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "listener_shutdown -- joined %d", i); assert(res == 0); b->joined[i + off] = true; } Loading Loading @@ -587,24 +626,33 @@ void bus_free(bus *b) { bus_shutdown(b); for (int i = 0; i < b->sender_count; i++) { BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "sender_free -- %d", i); sender_free(b->senders[i]); } free(b->senders); for (int i = 0; i < b->listener_count; i++) { BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "listener_free -- %d", i); listener_free(b->listeners[i]); } free(b->listeners); int limit = (1000 * THREAD_SHUTDOWN_SECONDS)/10; for (int i = 0; i < limit; i++) { BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "threadpool_shutdown -- %d", i); if (threadpool_shutdown(b->threadpool, false)) { break; } (void)poll(NULL, 0, 10); if (i == limit - 1) { BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "threadpool_shutdown -- %d (forced)", i); threadpool_shutdown(b->threadpool, true); } } BUS_LOG(b, 3, LOG_SHUTDOWN, "threadpool_free", b->udata); threadpool_free(b->threadpool); free(b->joined); Loading Loading
Makefile +2 −3 Original line number Diff line number Diff line Loading @@ -11,7 +11,7 @@ PUB_INC = ./include # SSL/TLS Library Options #------------------------------------------------------------------------------- # FIXME: Currently OSX/homebrew specific, rework before integration. # This may need to be set if OpenSSL is in a nonstandard place. OPENSSL_PATH ?= . #=============================================================================== Loading Loading @@ -71,6 +71,7 @@ LIB_OBJS = \ $(OUT_DIR)/kinetic_types.o \ $(OUT_DIR)/kinetic_memory.o \ $(OUT_DIR)/kinetic_semaphore.o \ $(OUT_DIR)/kinetic_countingsemaphore.o \ $(OUT_DIR)/byte_array.o \ $(OUT_DIR)/kinetic_client.o \ $(OUT_DIR)/threadpool.o \ Loading Loading @@ -463,7 +464,6 @@ examples: setup_examples \ run_example_write_file_blocking \ run_example_write_file_blocking_threads \ run_example_write_file_nonblocking \ run_example_write_file_nonblocking_threads \ run_example_get_key_range \ stop_simulator Loading @@ -474,6 +474,5 @@ valgrind_examples: setup_examples \ valgrind_example_write_file_blocking \ valgrind_example_write_file_blocking_threads \ valgrind_example_write_file_nonblocking \ valgrind_example_write_file_nonblocking_threads \ valgrind_example_get_key_range \ stop_simulator
README.md +6 −0 Original line number Diff line number Diff line Loading @@ -21,6 +21,12 @@ Prerequisites * OSX (using [Homebrew](http://brew.sh/)) * `> brew install openssl` A release of OpenSSL that provides TLS 1.1 or newer is required. If the OpenSSL installation is not found, the `OPENSSL_PATH` environment variable may need to be set to its base path, e.g. `/usr/local/openssl/1.0.1k/`. Getting Started --------------- Loading
include/kinetic_semaphore.h +1 −2 Original line number Diff line number Diff line #ifndef _KINETIC_SEMAPHORE_H #define _KINETIC_SEMAPHORE_H struct _KineticSemaphore; typedef struct _KineticSemaphore KineticSemaphore; /** * @brief Creates a KineticSemaphore. The KineticSemaphore is a simple wrapper * around a pthread condition variable and provides a a thread safe * around a pthread condition variable and provides a a thread-safe * way to block a thread and wait for notification from another thread. * * @return Returns a pointer to a KineticSemaphore Loading
src/examples/write_file_nonblocking_threads.cdeleted 100644 → 0 +0 −300 Original line number Diff line number Diff line /* * kinetic-c * Copyright (C) 2014 Seagate Technology. * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License * as published by the Free Software Foundation; either version 2 * of the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * */ #include "kinetic_client.h" #include "kinetic_types.h" #include "byte_array.h" #include <stdlib.h> #include <getopt.h> #include <stdio.h> #include <sys/param.h> #include <sys/stat.h> #include <sys/file.h> #include <pthread.h> #include <errno.h> #define REPORT_ERRNO(en, msg) if(en != 0){errno = en; perror(msg);} struct kinetic_thread_arg { char ip[16]; struct kinetic_put_arg* opArgs; int opCount; }; typedef struct { size_t opsInProgress; size_t currentChunk; size_t maxOverlappedChunks; int fd; ByteBuffer keyPrefix; uint8_t keyPrefixBuffer[KINETIC_DEFAULT_KEY_LEN]; pthread_mutex_t transferMutex; pthread_mutex_t completeMutex; pthread_cond_t completeCond; KineticStatus status; KineticSession* session; } FileTransferProgress; typedef struct { KineticEntry entry; uint8_t key[KINETIC_DEFAULT_KEY_LEN]; uint8_t value[KINETIC_OBJ_SIZE]; uint8_t tag[KINETIC_DEFAULT_KEY_LEN]; FileTransferProgress* currentTransfer; } AsyncWriteClosureData; typedef struct { KineticSession* session; const char* filename; const size_t maxOverlappedChunks; uint64_t keyPrefix; pthread_t thread; KineticStatus status; } StoreFileOperation; void* store_file_thread(void* storeArgs); FileTransferProgress* start_file_transfer(KineticSession* session, char const * const filename, uint64_t prefix, uint32_t maxOverlappedChunks); KineticStatus wait_for_transfer_complete(FileTransferProgress* const transfer); static int put_chunk_of_file(FileTransferProgress* transfer); static void put_chunk_of_file_finished(KineticCompletionData* kinetic_data, void* client_data); int main(int argc, char** argv) { (void)argc; (void)argv; // Initialize kinetic-c and configure session const char HmacKeyString[] = "asdfasdf"; KineticSession session = { .config = (KineticSessionConfig) { .host = "localhost", .port = KINETIC_PORT, .clusterVersion = 0, .identity = 1, .hmacKey = ByteArray_CreateWithCString(HmacKeyString), } }; KineticClient * client = KineticClient_Init("stdout", 0); if (client == NULL) { return 1; } // Establish connection KineticStatus status = KineticClient_CreateConnection(&session, client); if (status != KINETIC_STATUS_SUCCESS) { fprintf(stdout, "Failed connecting to the Kinetic device w/status: %s\n", Kinetic_GetStatusDescription(status)); return -1; } // Create a unique/common key prefix struct timeval now; gettimeofday(&now, NULL); uint64_t prefix = (uint64_t)now.tv_sec << sizeof(8); // Store the file(s) and wait for completion bool success = true; const int maxOverlappedChunks = 15; StoreFileOperation ops[] = { { .session = &session, .filename = "./test/support/data/file_a.png", .keyPrefix = prefix, .maxOverlappedChunks = maxOverlappedChunks, }, { .session = &session, .filename = "./test/support/data/file_b.png", .keyPrefix = prefix, .maxOverlappedChunks = maxOverlappedChunks, }, { .session = &session, .filename = "./test/support/data/file_c.png", .keyPrefix = prefix, .maxOverlappedChunks = maxOverlappedChunks, }, }; const int numFiles = sizeof(ops) / sizeof(StoreFileOperation); for (int i = 0; i < numFiles; i++) { printf("Storing '%s' to disk...\n", ops[i].filename); int pthreadStatus = pthread_create(&ops[i].thread, NULL, store_file_thread, &ops[i]); if (pthreadStatus != 0) { REPORT_ERRNO(pthreadStatus, "pthread_create"); fprintf(stdout, "Failed creating store thread for '%s'!\n", ops[i].filename); success = false; } } for (int i = 0; i < numFiles; i++) { int pthreadStatus = pthread_join(ops[i].thread, NULL); if (pthreadStatus == 0) { printf("File '%s' stored successfully!\n", ops[i].filename); } else { REPORT_ERRNO(pthreadStatus, "pthread_join"); fprintf(stdout, "Failed storing '%s' to disk! status: %s\n", ops[i].filename, Kinetic_GetStatusDescription(ops[i].status)); success = false; } } printf("Complete!\n"); // Shutdown client connection and cleanup KineticClient_DestroyConnection(&session); KineticClient_Shutdown(client); return success ? 0 : -1; } void* store_file_thread(void* storeArgs) { // Kick off the chained write/PUT operations and wait for completion StoreFileOperation* op = storeArgs; FileTransferProgress* transfer = start_file_transfer(op->session, op->filename, op->keyPrefix, op->maxOverlappedChunks); op->status = wait_for_transfer_complete(transfer); if (op->status != KINETIC_STATUS_SUCCESS) { fprintf(stdout, "Transfer failed w/status: %s\n", Kinetic_GetStatusDescription(op->status)); } return (void*)storeArgs; } FileTransferProgress * start_file_transfer(KineticSession* session, char const * const filename, uint64_t prefix, uint32_t maxOverlappedChunks) { FileTransferProgress * transferState = malloc(sizeof(FileTransferProgress)); *transferState = (FileTransferProgress) { .session = session, .maxOverlappedChunks = maxOverlappedChunks, .keyPrefix = ByteBuffer_CreateAndAppend(transferState->keyPrefixBuffer, sizeof(transferState->keyPrefixBuffer), &prefix, sizeof(prefix)), .fd = open(filename, O_RDONLY), }; pthread_mutex_init(&transferState->transferMutex, NULL); pthread_mutex_init(&transferState->completeMutex, NULL); pthread_cond_init(&transferState->completeCond, NULL); // Start max overlapped PUT operations for (size_t i = 0; i < transferState->maxOverlappedChunks; i++) { put_chunk_of_file(transferState); } return transferState; } KineticStatus wait_for_transfer_complete(FileTransferProgress* const transfer) { pthread_mutex_lock(&transfer->completeMutex); pthread_cond_wait(&transfer->completeCond, &transfer->completeMutex); pthread_mutex_unlock(&transfer->completeMutex); KineticStatus status = transfer->status; pthread_mutex_destroy(&transfer->completeMutex); pthread_cond_destroy(&transfer->completeCond); close(transfer->fd); pthread_mutex_destroy(&transfer->transferMutex); free(transfer); return status; } int put_chunk_of_file(FileTransferProgress* transfer) { AsyncWriteClosureData* closureData = calloc(1, sizeof(AsyncWriteClosureData)); pthread_mutex_lock(&transfer->transferMutex); transfer->opsInProgress++; closureData->currentTransfer = transfer; int bytesRead = read(transfer->fd, closureData->value, sizeof(closureData->value)); if (bytesRead > 0) { transfer->currentChunk++; closureData->entry = (KineticEntry){ .key = ByteBuffer_CreateAndAppend(closureData->key, sizeof(closureData->key), transfer->keyPrefix.array.data, transfer->keyPrefix.bytesUsed), .tag = ByteBuffer_CreateAndAppendFormattedCString(closureData->tag, sizeof(closureData->tag), "some_value_tag..._%04d", transfer->currentChunk), .algorithm = KINETIC_ALGORITHM_SHA1, .value = ByteBuffer_Create(closureData->value, sizeof(closureData->value), (size_t)bytesRead), .synchronization = KINETIC_SYNCHRONIZATION_WRITETHROUGH, }; KineticStatus status = KineticClient_Put(transfer->session, &closureData->entry, &(KineticCompletionClosure) { .callback = put_chunk_of_file_finished, .clientData = closureData, }); if (status != KINETIC_STATUS_SUCCESS) { transfer->opsInProgress--; free(closureData); fprintf(stdout, "Failed writing chunk! PUT request reported status: %s\n", Kinetic_GetStatusDescription(status)); } } else if (bytesRead == 0) { // EOF reached transfer->opsInProgress--; free(closureData); } else { transfer->opsInProgress--; free(closureData); fprintf(stdout, "Failed reading data from file!\n"); REPORT_ERRNO(bytesRead, "read"); } pthread_mutex_unlock(&transfer->transferMutex); return bytesRead; } void put_chunk_of_file_finished(KineticCompletionData* kinetic_data, void* clientData) { AsyncWriteClosureData* closureData = clientData; FileTransferProgress* transfer = closureData->currentTransfer; free(closureData); pthread_mutex_lock(&transfer->transferMutex); transfer->opsInProgress--; pthread_mutex_unlock(&transfer->transferMutex); if (kinetic_data->status == KINETIC_STATUS_SUCCESS) { int bytesPut = put_chunk_of_file(transfer); if (bytesPut <= 0 && transfer->opsInProgress == 0) { if (transfer->status == KINETIC_STATUS_NOT_ATTEMPTED) { transfer->status = KINETIC_STATUS_SUCCESS; } pthread_cond_signal(&transfer->completeCond); } } else { transfer->status = kinetic_data->status; // only signal when finished // keep track of outstanding operations // if there is no more data to read (or error), and no outstanding operations, // then signal pthread_cond_signal(&transfer->completeCond); fprintf(stdout, "Failed writing chunk! PUT response reported status: %s\n", Kinetic_GetStatusDescription(kinetic_data->status)); } }
src/lib/bus/bus.c +50 −2 Original line number Diff line number Diff line Loading @@ -26,6 +26,7 @@ #include <errno.h> #include <assert.h> #include <limits.h> #include <sys/resource.h> #include "bus.h" #include "sender.h" Loading @@ -46,6 +47,7 @@ static int listener_id_of_socket(struct bus *b, int fd); static void noop_log_cb(log_event_t event, int log_level, const char *msg, void *udata); static void noop_error_cb(bus_unpack_cb_res_t result, void *socket_udata); static bool attempt_to_increase_resource_limits(struct bus *b); static void set_defaults(bus_config *cfg) { if (cfg->sender_count == 0) { cfg->sender_count = 1; } Loading Loading @@ -110,6 +112,8 @@ bool bus_init(bus_config *config, struct bus_result *res) { log_lock_init = true; attempt_to_increase_resource_limits(b); BUS_LOG_SNPRINTF(b, 3, LOG_INITIALIZATION, b->udata, 64, "Initialized bus at %p", (void*)b); Loading Loading @@ -223,6 +227,31 @@ cleanup: return false; } static bool attempt_to_increase_resource_limits(struct bus *b) { struct rlimit info; if (-1 == getrlimit(RLIMIT_NOFILE, &info)) { fprintf(stderr, "getrlimit: %s", strerror(errno)); errno = 0; return false; } const unsigned int nval = 1024; BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 256, "Current FD resource limits, [%lu, %lu], changing to %u", (unsigned long)info.rlim_cur, (unsigned long)info.rlim_max, nval); if (info.rlim_cur < nval && info.rlim_max > nval) { info.rlim_cur = nval; if (-1 == setrlimit(RLIMIT_NOFILE, &info)) { fprintf(stderr, "getrlimit: %s", strerror(errno)); errno = 0; return false; } } return true; } /* Pack message to deliver on behalf of the user into an envelope * that can track status / routing along the way. * Loading Loading @@ -507,13 +536,17 @@ bool bus_shutdown(bus *b) { for (int i = 0; i < b->sender_count; i++) { int off = 0; if (!b->joined[i + off]) { BUS_LOG(b, 2, LOG_SHUTDOWN, "sender_shutdown...", b->udata); BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "sender_shutdown -- %d", i); while (!sender_shutdown(b->senders[i])) { BUS_LOG(b, 2, LOG_SHUTDOWN, "sender_shutdown... (retry)", b->udata); BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "sender_shutdown -- retry %d", i); sleep(1); } void *unused = NULL; int res = pthread_join(b->threads[i + off], &unused); BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "sender_shutdown -- joined %d", i); assert(res == 0); b->joined[i + off] = true; } Loading @@ -523,11 +556,17 @@ bool bus_shutdown(bus *b) { for (int i = 0; i < b->listener_count; i++) { int off = b->sender_count; if (!b->joined[i + off]) { BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "listener_shutdown -- %d", i); while (!listener_shutdown(b->listeners[i])) { sleep(1); } BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "listener_shutdown -- joining %d", i); void *unused = NULL; int res = pthread_join(b->threads[i + off], &unused); BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "listener_shutdown -- joined %d", i); assert(res == 0); b->joined[i + off] = true; } Loading Loading @@ -587,24 +626,33 @@ void bus_free(bus *b) { bus_shutdown(b); for (int i = 0; i < b->sender_count; i++) { BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "sender_free -- %d", i); sender_free(b->senders[i]); } free(b->senders); for (int i = 0; i < b->listener_count; i++) { BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "listener_free -- %d", i); listener_free(b->listeners[i]); } free(b->listeners); int limit = (1000 * THREAD_SHUTDOWN_SECONDS)/10; for (int i = 0; i < limit; i++) { BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "threadpool_shutdown -- %d", i); if (threadpool_shutdown(b->threadpool, false)) { break; } (void)poll(NULL, 0, 10); if (i == limit - 1) { BUS_LOG_SNPRINTF(b, 3, LOG_SHUTDOWN, b->udata, 128, "threadpool_shutdown -- %d (forced)", i); threadpool_shutdown(b->threadpool, true); } } BUS_LOG(b, 3, LOG_SHUTDOWN, "threadpool_free", b->udata); threadpool_free(b->threadpool); free(b->joined); Loading