Loading Makefile +10 −9 Original line number Diff line number Diff line Loading @@ -92,7 +92,7 @@ LIB_OBJS = \ KINETIC_LIB_OTHER_DEPS = Makefile Rakefile $(VERSION_FILE) default: makedirs $(KINETIC_LIB) default: makedirs json $(KINETIC_LIB) makedirs: @echo; mkdir -p ./bin/examples &> /dev/null; mkdir -p ./bin/unit &> /dev/null; mkdir -p ./bin/systest &> /dev/null; mkdir -p ./out &> /dev/null Loading Loading @@ -162,10 +162,14 @@ ci: uninstall stop_simulator start_simulator all stop_simulator install uninstal #------------------------------------------------------------------------------- json: ${OUT_DIR}/libjson-c.a json_install: json_install: json cd ${JSONC} && \ make install json_uninstall: if [ -f ${JSONC}/Makefile ]; then cd ${JSONC} && make uninstall; fi; ${JSONC}/Makefile: cd ${JSONC} && \ sh autogen.sh && \ Loading Loading @@ -277,9 +281,9 @@ uninstall: $(RM) -f $(PREFIX)/include/kinetic_proto.h $(RM) -f $(PREFIX)/include/protobuf-c/protobuf-c.h $(RM) -f $(PREFIX)/include/protobuf-c.h if [ -f ${JSONC}/Makefile ]; then cd ${JSONC} && make uninstall; fi; .PHONY: install uninstall .PHONY: install uninstall json_install json_uninstall #=============================================================================== # Java Simulator Support Loading Loading @@ -346,7 +350,6 @@ unit_tests: start_simulator $(unit_passfiles) # System Tests #=============================================================================== SYSTEST_SRC = ./test/system SYSTEST_OUT = $(BIN_DIR)/systest SYSTEST_LDFLAGS += -lm $(KINETIC_LIB) -L${OPENSSL_PATH}/lib -lssl -lcrypto -lpthread Loading Loading @@ -432,8 +435,6 @@ discovery_utility: $(DISCOVERY_UTIL_EXEC) build: discovery_utility #------------------------------------------------------------------------------- # Support for Simulator and Exection of Test Utility #------------------------------------------------------------------------------- Loading Loading @@ -509,7 +510,7 @@ examples: setup_examples \ run_example_write_file_blocking \ run_example_write_file_blocking_threads \ run_example_write_file_nonblocking \ run_example_get_key_range \ run_example_get_key_range valgrind_examples: setup_examples \ valgrind_put_nonblocking \ Loading @@ -517,4 +518,4 @@ valgrind_examples: setup_examples \ valgrind_example_write_file_blocking \ valgrind_example_write_file_blocking_threads \ valgrind_example_write_file_nonblocking \ valgrind_example_get_key_range \ valgrind_example_get_key_range src/lib/kinetic_logger.c +58 −59 Original line number Diff line number Diff line Loading @@ -34,18 +34,18 @@ #define KINETIC_LOGGER_SLEEP_TIME_SEC 10 #define KINETIC_LOGGER_BUFFER_FLUSH_SIZE (0.8 * KINETIC_LOGGER_BUFFER_SIZE) STATIC int KineticLogLevel = -1; STATIC FILE* KineticLoggerHandle = NULL; STATIC pthread_mutex_t KineticLoggerBufferMutex = PTHREAD_MUTEX_INITIALIZER; STATIC char KineticLoggerBuffer[KINETIC_LOGGER_BUFFER_SIZE][KINETIC_LOGGER_BUFFER_STR_MAX_LEN]; STATIC int KineticLoggerBufferSize = 0; static int KineticLogLevel = -1; static FILE* KineticLoggerHandle = NULL; static pthread_mutex_t KineticLoggerBufferMutex = PTHREAD_MUTEX_INITIALIZER; static char KineticLoggerBuffer[KINETIC_LOGGER_BUFFER_SIZE][KINETIC_LOGGER_BUFFER_STR_MAX_LEN]; static int KineticLoggerBufferSize = 0; #if KINETIC_LOGGER_FLUSH_THREAD_ENABLED STATIC pthread_t KineticLoggerFlushThread; STATIC bool KineticLoggerForceFlush = false; STATIC bool KineticLogggerAbortRequested = false; static pthread_t KineticLoggerFlushThread; static bool KineticLoggerForceFlush = false; static bool KineticLogggerAbortRequested = false; #else STATIC bool KineticLoggerForceFlush = true; static bool KineticLoggerForceFlush = true; #endif Loading @@ -62,7 +62,7 @@ static inline void KineticLogger_FinishBuffer(void); static void* KineticLogger_FlushThread(void* arg); static void KineticLogger_InitFlushThread(void); #endif static void KineticLogger_LogProtobufMessage(int log_level, const ProtobufCMessage *msg, char* _indent); static void KineticLogger_LogProtobufMessage(int log_level, const ProtobufCMessage *msg, char* indent); //------------------------------------------------------------------------------ Loading Loading @@ -176,24 +176,33 @@ void KineticLogger_LogHeader(int log_level, const KineticPDUHeader* header) KineticLogger_LogPrintf(log_level, " valueLength: %d", header->valueLength); } #define LOG_PROTO_INIT() char _indent[32] = " "; #define LOG_INDENT " " static char indent[64] = LOG_INDENT; static const size_t max_indent = sizeof(indent)-3; static int indent_overflow = 0; #define LOG_PROTO_INIT() \ indent_overflow = 0; #define LOG_PROTO_LEVEL_START(__name) \ KineticLogger_LogPrintf(2, "%s%s {", (_indent), (__name)); \ (strlen(_indent) < (sizeof(_indent) - 3 )) ? strcat(_indent, " ") : 0; #define LOG_PROTO_LEVEL_START_NO_INDENT() \ KineticLogger_LogPrintf(2, "{"); \ (strlen(_indent) < (sizeof(_indent) - 3)) ? strcat(_indent, " ") : 0; KineticLogger_LogPrintf(2, "%s%s {", indent, __name); \ if (strlen(indent) < max_indent) { strcat(indent, LOG_INDENT); } \ else { indent_overflow++; } #define LOG_PROTO_LEVEL_END() \ _indent[strlen(_indent) - 2] = '\0'; \ KineticLogger_LogPrintf(2, "%s}", _indent); if (indent_overflow == 0) { indent[strlen(indent) - 2] = '\0'; } \ else { indent_overflow--; } \ KineticLogger_LogPrintf(2, "%s}", indent); #define LOG_PROTO_LEVEL_START_ARRAY(__name, __quantity) \ KineticLogger_LogPrintf(2, "%s%s: (%u elements) [", (_indent), (__name), (__quantity)); \ (strlen(_indent) < (sizeof(_indent) - 3)) ? strcat(_indent, " ") : 0; KineticLogger_LogPrintf(2, "%s%s: (%u elements)", (indent), (__name), (__quantity)); \ KineticLogger_LogPrintf(2, "%s[", (indent)); \ if (strlen(indent) < max_indent) { strcat(indent, LOG_INDENT); } \ else { indent_overflow++; } #define LOG_PROTO_LEVEL_END_ARRAY() \ _indent[strlen(_indent) - 2] = '\0'; \ KineticLogger_LogPrintf(2, "%s]", _indent); if (indent_overflow == 0) { indent[strlen(indent) - 2] = '\0'; } \ else { indent_overflow--; } \ KineticLogger_LogPrintf(2, "%s]", (indent)); static int KineticLogger_u8toa(char* p_buf, uint8_t val) { Loading Loading @@ -231,7 +240,7 @@ static void LogUnboxed(int log_level, void const * const fieldData, ProtobufCFieldDescriptor const * const fieldDesc, size_t const i, char* _indent) char* indent) { switch (fieldDesc->type) { case PROTOBUF_C_TYPE_INT32: Loading @@ -239,7 +248,7 @@ static void LogUnboxed(int log_level, case PROTOBUF_C_TYPE_SFIXED32: { int32_t const * value = (int32_t const *)fieldData; KineticLogger_LogPrintf(log_level, "%ld", value[i]); KineticLogger_LogPrintf(log_level, "%s%s: %ld", indent, fieldDesc->name, value[i]); } break; Loading @@ -248,7 +257,7 @@ static void LogUnboxed(int log_level, case PROTOBUF_C_TYPE_SFIXED64: { int64_t* value = (int64_t*)fieldData; KineticLogger_LogPrintf(log_level, "%lld", value[i]); KineticLogger_LogPrintf(log_level, "%s%s: %lld", indent, fieldDesc->name, value[i]); } break; Loading @@ -256,7 +265,7 @@ static void LogUnboxed(int log_level, case PROTOBUF_C_TYPE_FIXED32: { uint32_t* value = (uint32_t*)fieldData; KineticLogger_LogPrintf(log_level, "%lu", value[i]); KineticLogger_LogPrintf(log_level, "%s%s: %lu", indent, fieldDesc->name, value[i]); } break; Loading @@ -264,43 +273,43 @@ static void LogUnboxed(int log_level, case PROTOBUF_C_TYPE_FIXED64: { uint64_t* value = (uint64_t*)fieldData; KineticLogger_LogPrintf(log_level, "%llu", value[i]); KineticLogger_LogPrintf(log_level, "%s%s: %llu", indent, fieldDesc->name, value[i]); } break; case PROTOBUF_C_TYPE_FLOAT: { float* value = (float*)fieldData; KineticLogger_LogPrintf(log_level, "%f", value[i]); KineticLogger_LogPrintf(log_level, "%s%s: %f", indent, fieldDesc->name, value[i]); } break; case PROTOBUF_C_TYPE_DOUBLE: { double* value = (double*)fieldData; KineticLogger_LogPrintf(log_level, "%f", value[i]); KineticLogger_LogPrintf(log_level, "%s%s: %f", indent, fieldDesc->name, value[i]); } break; case PROTOBUF_C_TYPE_BOOL: { protobuf_c_boolean* value = (protobuf_c_boolean*)fieldData; KineticLogger_LogPrintf(log_level, "%s", BOOL_TO_STRING(value[i])); KineticLogger_LogPrintf(log_level, "%s%s: %s", indent, fieldDesc->name, BOOL_TO_STRING(value[i])); } break; case PROTOBUF_C_TYPE_STRING: { char** strings = (char**)fieldData; KineticLogger_LogPrintf(log_level, "%s", strings[i]); KineticLogger_LogPrintf(log_level, "%s%s: %s", indent, fieldDesc->name, strings[i]); } break; case PROTOBUF_C_TYPE_BYTES: { ProtobufCBinaryData* value = (ProtobufCBinaryData*)fieldData; LOG_PROTO_LEVEL_START_NO_INDENT(); KineticLogger_LogByteArray(log_level, _indent, LOG_PROTO_LEVEL_START(fieldDesc->name); KineticLogger_LogByteArray(log_level, indent, (ByteArray){.data = value[i].data, .len = value[i].len}); LOG_PROTO_LEVEL_END(); Loading @@ -312,18 +321,17 @@ static void LogUnboxed(int log_level, int * value = (int*)fieldData; ProtobufCEnumDescriptor const * enumDesc = fieldDesc->descriptor; ProtobufCEnumValue const * enumVal = protobuf_c_enum_descriptor_get_value(enumDesc, value[i]); KineticLogger_LogPrintf(log_level, "%s", enumVal->name); KineticLogger_LogPrintf(log_level, "%s%s: %s", indent, fieldDesc->name, enumVal->name); } break; case PROTOBUF_C_TYPE_MESSAGE: // nested message { ProtobufCMessage** msg = (ProtobufCMessage**)fieldData; if (msg[i] != NULL) { LOG_PROTO_LEVEL_START_NO_INDENT(); KineticLogger_LogProtobufMessage(log_level, msg[i], _indent); LOG_PROTO_LEVEL_START(fieldDesc->name); KineticLogger_LogProtobufMessage(log_level, msg[i], indent); LOG_PROTO_LEVEL_END(); } } break; Loading @@ -335,7 +343,7 @@ static void LogUnboxed(int log_level, }; } static void KineticLogger_LogProtobufMessage(int log_level, ProtobufCMessage const * msg, char* _indent) static void KineticLogger_LogProtobufMessage(int log_level, ProtobufCMessage const * msg, char* indent) { if (msg == NULL || msg->descriptor == NULL || !KineticLogger_IsLevelEnabled(log_level)) { return; Loading @@ -355,9 +363,7 @@ static void KineticLogger_LogProtobufMessage(int log_level, ProtobufCMessage con { case PROTOBUF_C_LABEL_REQUIRED: { printf("%s%s: ", _indent, fieldDesc->name); LogUnboxed(log_level, &pMsg[fieldDesc->offset], fieldDesc, 0, _indent); LogUnboxed(log_level, &pMsg[fieldDesc->offset], fieldDesc, 0, indent); } break; case PROTOBUF_C_LABEL_OPTIONAL: { Loading @@ -366,39 +372,32 @@ static void KineticLogger_LogProtobufMessage(int log_level, ProtobufCMessage con // and a special case: if this is a message, don't show it if the message is NULL (PROTOBUF_C_TYPE_MESSAGE != fieldDesc->type || ((ProtobufCMessage**)(void*)&pMsg[fieldDesc->offset])[0] != NULL)) { printf("%s%s: ", _indent, fieldDesc->name); /* special case for nested command packed into commandBytes field */ // special case for nested command packed into commandBytes field if ((protobuf_c_message_descriptor_get_field_by_name(desc, "commandBytes") == fieldDesc ) && (PROTOBUF_C_TYPE_BYTES == fieldDesc->type)) { (PROTOBUF_C_TYPE_BYTES == fieldDesc->type)) { ProtobufCBinaryData* value = (ProtobufCBinaryData*)(void*)&pMsg[fieldDesc->offset]; if ((value->data != NULL) && (value->len > 0)) { LOG_PROTO_LEVEL_START(fieldDesc->name); KineticProto_Command * cmd = KineticProto_command__unpack(NULL, value->len, value->data); LOG_PROTO_LEVEL_START_NO_INDENT(); KineticLogger_LogProtobufMessage(log_level, &cmd->base, _indent); KineticLogger_LogProtobufMessage(log_level, &cmd->base, indent); LOG_PROTO_LEVEL_END(); free(cmd); } } else { LogUnboxed(log_level, &pMsg[fieldDesc->offset], fieldDesc, 0, _indent); else { LogUnboxed(log_level, &pMsg[fieldDesc->offset], fieldDesc, 0, indent); } } } break; case PROTOBUF_C_LABEL_REPEATED: { unsigned const * quantifier = (unsigned const *)(void*)&pMsg[fieldDesc->quantifier_offset]; if (*quantifier > 0) { if (*quantifier > 0) { LOG_PROTO_LEVEL_START_ARRAY(fieldDesc->name, *quantifier); for (uint32_t i = 0; i < *quantifier; i++) { for (uint32_t i = 0; i < *quantifier; i++) { void const ** box = (void const **)(void*)&pMsg[fieldDesc->offset]; printf("%s", _indent); LogUnboxed(log_level, *box, fieldDesc, i, _indent); LogUnboxed(log_level, *box, fieldDesc, i, indent); } LOG_PROTO_LEVEL_END_ARRAY(); } Loading @@ -416,7 +415,7 @@ void KineticLogger_LogProtobuf(int log_level, const KineticProto_Message* msg) KineticLogger_Log(log_level, "Kinetic Protobuf:"); KineticLogger_LogProtobufMessage(log_level, &msg->base, _indent); KineticLogger_LogProtobufMessage(log_level, &msg->base, indent); } void KineticLogger_LogStatus(int log_level, KineticProto_Command_Status* status) Loading src/lib/kinetic_operation.c +9 −5 Original line number Diff line number Diff line Loading @@ -76,7 +76,7 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o // Populate sequence count and increment it for next operation assert(request->message.header.sequence == KINETIC_SEQUENCE_NOT_YET_BOUND); int64_t seq_id = operation->connection->sequence++; int64_t seq_id = KineticSession_GetNextSequenceCount(session); request->message.header.sequence = seq_id; // Pack the command, if available Loading Loading @@ -826,11 +826,14 @@ KineticStatus KineticOperation_SetClusterVersionCallback(KineticOperation* const assert(operation->connection != NULL); LOGF3("SetClusterVersion callback w/ operation (0x%0llX) on connection (0x%0llX)", operation, operation->connection); (void)status; return KINETIC_STATUS_SUCCESS; if (status == KINETIC_STATUS_SUCCESS) { KineticSession_SetClusterVersion(operation->connection->pSession, operation->pendingClusterVersion); operation->pendingClusterVersion = -1; // Invalidate } return status; } void KineticOperation_BuildSetClusterVersion(KineticOperation* operation, int64_t newClusterVersion) void KineticOperation_BuildSetClusterVersion(KineticOperation* operation, int64_t new_cluster_version) { KineticOperation_ValidateOperation(operation); Loading @@ -839,7 +842,7 @@ void KineticOperation_BuildSetClusterVersion(KineticOperation* operation, int64_ operation->request->command->body = &operation->request->message.body; operation->request->command->body->setup = &operation->request->message.setup; operation->request->command->body->setup->newClusterVersion = newClusterVersion; operation->request->command->body->setup->newClusterVersion = new_cluster_version; operation->request->command->body->setup->has_newClusterVersion = true; operation->request->command->body = &operation->request->message.body; Loading @@ -847,4 +850,5 @@ void KineticOperation_BuildSetClusterVersion(KineticOperation* operation, int64_ operation->valueEnabled = false; operation->sendValue = false; operation->callback = &KineticOperation_SetClusterVersionCallback; operation->pendingClusterVersion = new_cluster_version; } src/lib/kinetic_operation.h +1 −1 Original line number Diff line number Diff line Loading @@ -88,6 +88,6 @@ void KineticOperation_BuildSetAcl(KineticOperation* const operation, KineticDeviceInfo** info); KineticStatus KineticOperation_SetClusterVersionCallback(KineticOperation* const operation, KineticStatus const status); void KineticOperation_BuildSetClusterVersion(KineticOperation* const operation, int64_t newClusterVersion); void KineticOperation_BuildSetClusterVersion(KineticOperation* const operation, int64_t new_cluster_version); #endif // _KINETIC_OPERATION_H src/lib/kinetic_session.c +21 −0 Original line number Diff line number Diff line Loading @@ -170,3 +170,24 @@ KineticStatus KineticSession_Disconnect(KineticSession const * const session) return KINETIC_STATUS_SUCCESS; } #define ATOMIC_FETCH_AND_INCREMENT(P) __sync_fetch_and_add(P, 1) int64_t KineticSession_GetNextSequenceCount(KineticSession const * const session) { assert(session); int64_t seq_cnt = ATOMIC_FETCH_AND_INCREMENT(&session->connection->sequence); return seq_cnt; } int64_t KineticSession_GetClusterVersion(KineticSession const * const session) { assert(session); return session->config.clusterVersion; } void KineticSession_SetClusterVersion(KineticSession * const session, int64_t cluster_version) { assert(session); session->config.clusterVersion = cluster_version; } Loading
Makefile +10 −9 Original line number Diff line number Diff line Loading @@ -92,7 +92,7 @@ LIB_OBJS = \ KINETIC_LIB_OTHER_DEPS = Makefile Rakefile $(VERSION_FILE) default: makedirs $(KINETIC_LIB) default: makedirs json $(KINETIC_LIB) makedirs: @echo; mkdir -p ./bin/examples &> /dev/null; mkdir -p ./bin/unit &> /dev/null; mkdir -p ./bin/systest &> /dev/null; mkdir -p ./out &> /dev/null Loading Loading @@ -162,10 +162,14 @@ ci: uninstall stop_simulator start_simulator all stop_simulator install uninstal #------------------------------------------------------------------------------- json: ${OUT_DIR}/libjson-c.a json_install: json_install: json cd ${JSONC} && \ make install json_uninstall: if [ -f ${JSONC}/Makefile ]; then cd ${JSONC} && make uninstall; fi; ${JSONC}/Makefile: cd ${JSONC} && \ sh autogen.sh && \ Loading Loading @@ -277,9 +281,9 @@ uninstall: $(RM) -f $(PREFIX)/include/kinetic_proto.h $(RM) -f $(PREFIX)/include/protobuf-c/protobuf-c.h $(RM) -f $(PREFIX)/include/protobuf-c.h if [ -f ${JSONC}/Makefile ]; then cd ${JSONC} && make uninstall; fi; .PHONY: install uninstall .PHONY: install uninstall json_install json_uninstall #=============================================================================== # Java Simulator Support Loading Loading @@ -346,7 +350,6 @@ unit_tests: start_simulator $(unit_passfiles) # System Tests #=============================================================================== SYSTEST_SRC = ./test/system SYSTEST_OUT = $(BIN_DIR)/systest SYSTEST_LDFLAGS += -lm $(KINETIC_LIB) -L${OPENSSL_PATH}/lib -lssl -lcrypto -lpthread Loading Loading @@ -432,8 +435,6 @@ discovery_utility: $(DISCOVERY_UTIL_EXEC) build: discovery_utility #------------------------------------------------------------------------------- # Support for Simulator and Exection of Test Utility #------------------------------------------------------------------------------- Loading Loading @@ -509,7 +510,7 @@ examples: setup_examples \ run_example_write_file_blocking \ run_example_write_file_blocking_threads \ run_example_write_file_nonblocking \ run_example_get_key_range \ run_example_get_key_range valgrind_examples: setup_examples \ valgrind_put_nonblocking \ Loading @@ -517,4 +518,4 @@ valgrind_examples: setup_examples \ valgrind_example_write_file_blocking \ valgrind_example_write_file_blocking_threads \ valgrind_example_write_file_nonblocking \ valgrind_example_get_key_range \ valgrind_example_get_key_range
src/lib/kinetic_logger.c +58 −59 Original line number Diff line number Diff line Loading @@ -34,18 +34,18 @@ #define KINETIC_LOGGER_SLEEP_TIME_SEC 10 #define KINETIC_LOGGER_BUFFER_FLUSH_SIZE (0.8 * KINETIC_LOGGER_BUFFER_SIZE) STATIC int KineticLogLevel = -1; STATIC FILE* KineticLoggerHandle = NULL; STATIC pthread_mutex_t KineticLoggerBufferMutex = PTHREAD_MUTEX_INITIALIZER; STATIC char KineticLoggerBuffer[KINETIC_LOGGER_BUFFER_SIZE][KINETIC_LOGGER_BUFFER_STR_MAX_LEN]; STATIC int KineticLoggerBufferSize = 0; static int KineticLogLevel = -1; static FILE* KineticLoggerHandle = NULL; static pthread_mutex_t KineticLoggerBufferMutex = PTHREAD_MUTEX_INITIALIZER; static char KineticLoggerBuffer[KINETIC_LOGGER_BUFFER_SIZE][KINETIC_LOGGER_BUFFER_STR_MAX_LEN]; static int KineticLoggerBufferSize = 0; #if KINETIC_LOGGER_FLUSH_THREAD_ENABLED STATIC pthread_t KineticLoggerFlushThread; STATIC bool KineticLoggerForceFlush = false; STATIC bool KineticLogggerAbortRequested = false; static pthread_t KineticLoggerFlushThread; static bool KineticLoggerForceFlush = false; static bool KineticLogggerAbortRequested = false; #else STATIC bool KineticLoggerForceFlush = true; static bool KineticLoggerForceFlush = true; #endif Loading @@ -62,7 +62,7 @@ static inline void KineticLogger_FinishBuffer(void); static void* KineticLogger_FlushThread(void* arg); static void KineticLogger_InitFlushThread(void); #endif static void KineticLogger_LogProtobufMessage(int log_level, const ProtobufCMessage *msg, char* _indent); static void KineticLogger_LogProtobufMessage(int log_level, const ProtobufCMessage *msg, char* indent); //------------------------------------------------------------------------------ Loading Loading @@ -176,24 +176,33 @@ void KineticLogger_LogHeader(int log_level, const KineticPDUHeader* header) KineticLogger_LogPrintf(log_level, " valueLength: %d", header->valueLength); } #define LOG_PROTO_INIT() char _indent[32] = " "; #define LOG_INDENT " " static char indent[64] = LOG_INDENT; static const size_t max_indent = sizeof(indent)-3; static int indent_overflow = 0; #define LOG_PROTO_INIT() \ indent_overflow = 0; #define LOG_PROTO_LEVEL_START(__name) \ KineticLogger_LogPrintf(2, "%s%s {", (_indent), (__name)); \ (strlen(_indent) < (sizeof(_indent) - 3 )) ? strcat(_indent, " ") : 0; #define LOG_PROTO_LEVEL_START_NO_INDENT() \ KineticLogger_LogPrintf(2, "{"); \ (strlen(_indent) < (sizeof(_indent) - 3)) ? strcat(_indent, " ") : 0; KineticLogger_LogPrintf(2, "%s%s {", indent, __name); \ if (strlen(indent) < max_indent) { strcat(indent, LOG_INDENT); } \ else { indent_overflow++; } #define LOG_PROTO_LEVEL_END() \ _indent[strlen(_indent) - 2] = '\0'; \ KineticLogger_LogPrintf(2, "%s}", _indent); if (indent_overflow == 0) { indent[strlen(indent) - 2] = '\0'; } \ else { indent_overflow--; } \ KineticLogger_LogPrintf(2, "%s}", indent); #define LOG_PROTO_LEVEL_START_ARRAY(__name, __quantity) \ KineticLogger_LogPrintf(2, "%s%s: (%u elements) [", (_indent), (__name), (__quantity)); \ (strlen(_indent) < (sizeof(_indent) - 3)) ? strcat(_indent, " ") : 0; KineticLogger_LogPrintf(2, "%s%s: (%u elements)", (indent), (__name), (__quantity)); \ KineticLogger_LogPrintf(2, "%s[", (indent)); \ if (strlen(indent) < max_indent) { strcat(indent, LOG_INDENT); } \ else { indent_overflow++; } #define LOG_PROTO_LEVEL_END_ARRAY() \ _indent[strlen(_indent) - 2] = '\0'; \ KineticLogger_LogPrintf(2, "%s]", _indent); if (indent_overflow == 0) { indent[strlen(indent) - 2] = '\0'; } \ else { indent_overflow--; } \ KineticLogger_LogPrintf(2, "%s]", (indent)); static int KineticLogger_u8toa(char* p_buf, uint8_t val) { Loading Loading @@ -231,7 +240,7 @@ static void LogUnboxed(int log_level, void const * const fieldData, ProtobufCFieldDescriptor const * const fieldDesc, size_t const i, char* _indent) char* indent) { switch (fieldDesc->type) { case PROTOBUF_C_TYPE_INT32: Loading @@ -239,7 +248,7 @@ static void LogUnboxed(int log_level, case PROTOBUF_C_TYPE_SFIXED32: { int32_t const * value = (int32_t const *)fieldData; KineticLogger_LogPrintf(log_level, "%ld", value[i]); KineticLogger_LogPrintf(log_level, "%s%s: %ld", indent, fieldDesc->name, value[i]); } break; Loading @@ -248,7 +257,7 @@ static void LogUnboxed(int log_level, case PROTOBUF_C_TYPE_SFIXED64: { int64_t* value = (int64_t*)fieldData; KineticLogger_LogPrintf(log_level, "%lld", value[i]); KineticLogger_LogPrintf(log_level, "%s%s: %lld", indent, fieldDesc->name, value[i]); } break; Loading @@ -256,7 +265,7 @@ static void LogUnboxed(int log_level, case PROTOBUF_C_TYPE_FIXED32: { uint32_t* value = (uint32_t*)fieldData; KineticLogger_LogPrintf(log_level, "%lu", value[i]); KineticLogger_LogPrintf(log_level, "%s%s: %lu", indent, fieldDesc->name, value[i]); } break; Loading @@ -264,43 +273,43 @@ static void LogUnboxed(int log_level, case PROTOBUF_C_TYPE_FIXED64: { uint64_t* value = (uint64_t*)fieldData; KineticLogger_LogPrintf(log_level, "%llu", value[i]); KineticLogger_LogPrintf(log_level, "%s%s: %llu", indent, fieldDesc->name, value[i]); } break; case PROTOBUF_C_TYPE_FLOAT: { float* value = (float*)fieldData; KineticLogger_LogPrintf(log_level, "%f", value[i]); KineticLogger_LogPrintf(log_level, "%s%s: %f", indent, fieldDesc->name, value[i]); } break; case PROTOBUF_C_TYPE_DOUBLE: { double* value = (double*)fieldData; KineticLogger_LogPrintf(log_level, "%f", value[i]); KineticLogger_LogPrintf(log_level, "%s%s: %f", indent, fieldDesc->name, value[i]); } break; case PROTOBUF_C_TYPE_BOOL: { protobuf_c_boolean* value = (protobuf_c_boolean*)fieldData; KineticLogger_LogPrintf(log_level, "%s", BOOL_TO_STRING(value[i])); KineticLogger_LogPrintf(log_level, "%s%s: %s", indent, fieldDesc->name, BOOL_TO_STRING(value[i])); } break; case PROTOBUF_C_TYPE_STRING: { char** strings = (char**)fieldData; KineticLogger_LogPrintf(log_level, "%s", strings[i]); KineticLogger_LogPrintf(log_level, "%s%s: %s", indent, fieldDesc->name, strings[i]); } break; case PROTOBUF_C_TYPE_BYTES: { ProtobufCBinaryData* value = (ProtobufCBinaryData*)fieldData; LOG_PROTO_LEVEL_START_NO_INDENT(); KineticLogger_LogByteArray(log_level, _indent, LOG_PROTO_LEVEL_START(fieldDesc->name); KineticLogger_LogByteArray(log_level, indent, (ByteArray){.data = value[i].data, .len = value[i].len}); LOG_PROTO_LEVEL_END(); Loading @@ -312,18 +321,17 @@ static void LogUnboxed(int log_level, int * value = (int*)fieldData; ProtobufCEnumDescriptor const * enumDesc = fieldDesc->descriptor; ProtobufCEnumValue const * enumVal = protobuf_c_enum_descriptor_get_value(enumDesc, value[i]); KineticLogger_LogPrintf(log_level, "%s", enumVal->name); KineticLogger_LogPrintf(log_level, "%s%s: %s", indent, fieldDesc->name, enumVal->name); } break; case PROTOBUF_C_TYPE_MESSAGE: // nested message { ProtobufCMessage** msg = (ProtobufCMessage**)fieldData; if (msg[i] != NULL) { LOG_PROTO_LEVEL_START_NO_INDENT(); KineticLogger_LogProtobufMessage(log_level, msg[i], _indent); LOG_PROTO_LEVEL_START(fieldDesc->name); KineticLogger_LogProtobufMessage(log_level, msg[i], indent); LOG_PROTO_LEVEL_END(); } } break; Loading @@ -335,7 +343,7 @@ static void LogUnboxed(int log_level, }; } static void KineticLogger_LogProtobufMessage(int log_level, ProtobufCMessage const * msg, char* _indent) static void KineticLogger_LogProtobufMessage(int log_level, ProtobufCMessage const * msg, char* indent) { if (msg == NULL || msg->descriptor == NULL || !KineticLogger_IsLevelEnabled(log_level)) { return; Loading @@ -355,9 +363,7 @@ static void KineticLogger_LogProtobufMessage(int log_level, ProtobufCMessage con { case PROTOBUF_C_LABEL_REQUIRED: { printf("%s%s: ", _indent, fieldDesc->name); LogUnboxed(log_level, &pMsg[fieldDesc->offset], fieldDesc, 0, _indent); LogUnboxed(log_level, &pMsg[fieldDesc->offset], fieldDesc, 0, indent); } break; case PROTOBUF_C_LABEL_OPTIONAL: { Loading @@ -366,39 +372,32 @@ static void KineticLogger_LogProtobufMessage(int log_level, ProtobufCMessage con // and a special case: if this is a message, don't show it if the message is NULL (PROTOBUF_C_TYPE_MESSAGE != fieldDesc->type || ((ProtobufCMessage**)(void*)&pMsg[fieldDesc->offset])[0] != NULL)) { printf("%s%s: ", _indent, fieldDesc->name); /* special case for nested command packed into commandBytes field */ // special case for nested command packed into commandBytes field if ((protobuf_c_message_descriptor_get_field_by_name(desc, "commandBytes") == fieldDesc ) && (PROTOBUF_C_TYPE_BYTES == fieldDesc->type)) { (PROTOBUF_C_TYPE_BYTES == fieldDesc->type)) { ProtobufCBinaryData* value = (ProtobufCBinaryData*)(void*)&pMsg[fieldDesc->offset]; if ((value->data != NULL) && (value->len > 0)) { LOG_PROTO_LEVEL_START(fieldDesc->name); KineticProto_Command * cmd = KineticProto_command__unpack(NULL, value->len, value->data); LOG_PROTO_LEVEL_START_NO_INDENT(); KineticLogger_LogProtobufMessage(log_level, &cmd->base, _indent); KineticLogger_LogProtobufMessage(log_level, &cmd->base, indent); LOG_PROTO_LEVEL_END(); free(cmd); } } else { LogUnboxed(log_level, &pMsg[fieldDesc->offset], fieldDesc, 0, _indent); else { LogUnboxed(log_level, &pMsg[fieldDesc->offset], fieldDesc, 0, indent); } } } break; case PROTOBUF_C_LABEL_REPEATED: { unsigned const * quantifier = (unsigned const *)(void*)&pMsg[fieldDesc->quantifier_offset]; if (*quantifier > 0) { if (*quantifier > 0) { LOG_PROTO_LEVEL_START_ARRAY(fieldDesc->name, *quantifier); for (uint32_t i = 0; i < *quantifier; i++) { for (uint32_t i = 0; i < *quantifier; i++) { void const ** box = (void const **)(void*)&pMsg[fieldDesc->offset]; printf("%s", _indent); LogUnboxed(log_level, *box, fieldDesc, i, _indent); LogUnboxed(log_level, *box, fieldDesc, i, indent); } LOG_PROTO_LEVEL_END_ARRAY(); } Loading @@ -416,7 +415,7 @@ void KineticLogger_LogProtobuf(int log_level, const KineticProto_Message* msg) KineticLogger_Log(log_level, "Kinetic Protobuf:"); KineticLogger_LogProtobufMessage(log_level, &msg->base, _indent); KineticLogger_LogProtobufMessage(log_level, &msg->base, indent); } void KineticLogger_LogStatus(int log_level, KineticProto_Command_Status* status) Loading
src/lib/kinetic_operation.c +9 −5 Original line number Diff line number Diff line Loading @@ -76,7 +76,7 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o // Populate sequence count and increment it for next operation assert(request->message.header.sequence == KINETIC_SEQUENCE_NOT_YET_BOUND); int64_t seq_id = operation->connection->sequence++; int64_t seq_id = KineticSession_GetNextSequenceCount(session); request->message.header.sequence = seq_id; // Pack the command, if available Loading Loading @@ -826,11 +826,14 @@ KineticStatus KineticOperation_SetClusterVersionCallback(KineticOperation* const assert(operation->connection != NULL); LOGF3("SetClusterVersion callback w/ operation (0x%0llX) on connection (0x%0llX)", operation, operation->connection); (void)status; return KINETIC_STATUS_SUCCESS; if (status == KINETIC_STATUS_SUCCESS) { KineticSession_SetClusterVersion(operation->connection->pSession, operation->pendingClusterVersion); operation->pendingClusterVersion = -1; // Invalidate } return status; } void KineticOperation_BuildSetClusterVersion(KineticOperation* operation, int64_t newClusterVersion) void KineticOperation_BuildSetClusterVersion(KineticOperation* operation, int64_t new_cluster_version) { KineticOperation_ValidateOperation(operation); Loading @@ -839,7 +842,7 @@ void KineticOperation_BuildSetClusterVersion(KineticOperation* operation, int64_ operation->request->command->body = &operation->request->message.body; operation->request->command->body->setup = &operation->request->message.setup; operation->request->command->body->setup->newClusterVersion = newClusterVersion; operation->request->command->body->setup->newClusterVersion = new_cluster_version; operation->request->command->body->setup->has_newClusterVersion = true; operation->request->command->body = &operation->request->message.body; Loading @@ -847,4 +850,5 @@ void KineticOperation_BuildSetClusterVersion(KineticOperation* operation, int64_ operation->valueEnabled = false; operation->sendValue = false; operation->callback = &KineticOperation_SetClusterVersionCallback; operation->pendingClusterVersion = new_cluster_version; }
src/lib/kinetic_operation.h +1 −1 Original line number Diff line number Diff line Loading @@ -88,6 +88,6 @@ void KineticOperation_BuildSetAcl(KineticOperation* const operation, KineticDeviceInfo** info); KineticStatus KineticOperation_SetClusterVersionCallback(KineticOperation* const operation, KineticStatus const status); void KineticOperation_BuildSetClusterVersion(KineticOperation* const operation, int64_t newClusterVersion); void KineticOperation_BuildSetClusterVersion(KineticOperation* const operation, int64_t new_cluster_version); #endif // _KINETIC_OPERATION_H
src/lib/kinetic_session.c +21 −0 Original line number Diff line number Diff line Loading @@ -170,3 +170,24 @@ KineticStatus KineticSession_Disconnect(KineticSession const * const session) return KINETIC_STATUS_SUCCESS; } #define ATOMIC_FETCH_AND_INCREMENT(P) __sync_fetch_and_add(P, 1) int64_t KineticSession_GetNextSequenceCount(KineticSession const * const session) { assert(session); int64_t seq_cnt = ATOMIC_FETCH_AND_INCREMENT(&session->connection->sequence); return seq_cnt; } int64_t KineticSession_GetClusterVersion(KineticSession const * const session) { assert(session); return session->config.clusterVersion; } void KineticSession_SetClusterVersion(KineticSession * const session, int64_t cluster_version) { assert(session); session->config.clusterVersion = cluster_version; }