Loading src/lib/kinetic_controller.c +1 −1 Original line number Diff line number Diff line Loading @@ -243,7 +243,7 @@ void KineticController_HandleExpectedResponse(bus_msg_result_t *res, void *udata LOGF1("[PDU RX] pdu: 0x%0llX, op: 0x%llX, session: 0x%llX, bus: 0x%llX, " "seq: %5lld, protoLen: %4u, valueLen: %u, status: %s", response, op, &op->connection->session, op->connection->messageBus, response->command->header->sequence, response->header.protobufLength, response->header.valueLength, response->command->header->ackSequence, response->header.protobufLength, response->header.valueLength, Kinetic_GetStatusDescription(status)); } else Loading src/lib/kinetic_operation.c +29 −22 Original line number Diff line number Diff line Loading @@ -63,8 +63,14 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o { LOGF3("\nSending PDU via fd=%d", operation->connection->messageBus); KineticStatus status = KINETIC_STATUS_INVALID; uint8_t * msg = NULL; KineticPDU* request = operation->request; KineticProto_Message* proto = &operation->request->message.message; pthread_mutex_t* sendMutex = &operation->connection->sendMutex; // Acquire lock pthread_mutex_lock(sendMutex); // Pack the command, if available size_t expectedLen = KineticProto_command__get_packed_size(&request->message.command); Loading @@ -72,7 +78,8 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o if(request->message.message.commandBytes.data == NULL) { LOG0("Failed to allocate command bytes!"); return KINETIC_STATUS_MEMORY_ERROR; status = KINETIC_STATUS_MEMORY_ERROR; goto cleanup; } size_t packedLen = KineticProto_command__pack( &request->message.command, Loading Loading @@ -109,12 +116,6 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o header.protobufLength = KineticProto_Message__get_packed_size(proto); KineticLogger_LogProtobuf(3, proto); if (header.protobufLength > PDU_PROTO_MAX_LEN) { // Packed message exceeds max size. LOGF2("\nPacked protobuf exceeds maximum size. Packed size is: %d, Max size is: %d", header.protobufLength, PDU_PROTO_MAX_LEN); return KINETIC_STATUS_BUFFER_OVERRUN; } if (operation->entry != NULL && operation->sendValue) { header.valueLength = operation->entry->value.bytesUsed; } Loading @@ -125,20 +126,10 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o if (header.valueLength > PDU_PROTO_MAX_LEN) { // Packed value exceeds max size. LOGF2("\nPacked value exceeds maximum size. Packed size is: %d, Max size is: %d", header.valueLength, PDU_PROTO_MAX_LEN); return KINETIC_STATUS_BUFFER_OVERRUN; status = KINETIC_STATUS_BUFFER_OVERRUN; goto cleanup; } size_t offset = 0; uint8_t * msg = malloc(PDU_HEADER_LEN + header.protobufLength + header.valueLength); if (msg == NULL) { LOG0("Failed to allocate outgoing message!"); return KINETIC_STATUS_MEMORY_ERROR; } // Acquire lock pthread_mutex_lock(&operation->connection->sendMutex); // Populate sequence count and increment it for next operation request->message.header.sequence = operation->connection->sequence++; Loading @@ -151,6 +142,14 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o uint32_t nboProtoLength = KineticNBO_FromHostU32(header.protobufLength); uint32_t nboValueLength = KineticNBO_FromHostU32(header.valueLength); size_t offset = 0; msg = malloc(PDU_HEADER_LEN + header.protobufLength + header.valueLength); if (msg == NULL) { LOG0("Failed to allocate outgoing message!"); status = KINETIC_STATUS_MEMORY_ERROR; goto cleanup; } msg[offset] = header.versionPrefix; offset += sizeof(header.versionPrefix); Loading Loading @@ -183,11 +182,15 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o .udata = operation, }); pthread_mutex_unlock(&operation->connection->sendMutex); status = KINETIC_STATUS_SUCCESS; free(msg); cleanup: return KINETIC_STATUS_SUCCESS; pthread_mutex_unlock(sendMutex); if (msg != NULL) { free(msg); } return status; } KineticStatus KineticOperation_GetStatus(const KineticOperation* const operation) Loading Loading @@ -656,6 +659,10 @@ KineticStatus KineticOperation_BuildP2POperation(KineticOperation* const operati return KINETIC_STATUS_OPERATION_INVALID; } if (p2pOp->numOperations >= 100000) { return KINETIC_STATUS_BUFFER_OVERRUN; } operation->p2pOp = p2pOp; operation->callback = &KineticOperation_P2POperationCallback; return KINETIC_STATUS_SUCCESS; Loading test/system/test_system_stress_single_session_threaded.c +1 −3 Original line number Diff line number Diff line Loading @@ -214,7 +214,6 @@ void run_throughput_tests(KineticSession* session, size_t num_ops, size_t value_ } } #if 1 // Measure DELETE performance { OpStatus delete_statuses[num_ops]; Loading Loading @@ -289,7 +288,6 @@ void run_throughput_tests(KineticSession* session, size_t num_ops, size_t value_ elapsed_ms / 1000.0f, throughput); } #endif ByteBuffer_Free(test_data); } Loading Loading @@ -338,7 +336,7 @@ void run_tests(KineticClient * client) TestParams params[] = { { .client = client, .session = &session, .thread_iters = 1, .num_ops = 100, .obj_size = KINETIC_OBJ_SIZE }, { .client = client, .session = &session, .thread_iters = 1, .num_ops = 1000, .obj_size = 120, }, // { .client = client, .session = &session, .thread_iters = 1, .num_ops = 1000, .obj_size = 500, }, { .client = client, .session = &session, .thread_iters = 1, .num_ops = 1000, .obj_size = 500, }, // { .client = client, .session = &session, .thread_iters = 1, .num_ops = 500, .obj_size = 70000, }, // { .client = client, .session = &session, .thread_iters = 1, .num_ops = 1000, .obj_size = 120, }, // { .client = client, .session = &session, .thread_iters = 3, .num_ops = 1000, .obj_size = 120, }, Loading Loading
src/lib/kinetic_controller.c +1 −1 Original line number Diff line number Diff line Loading @@ -243,7 +243,7 @@ void KineticController_HandleExpectedResponse(bus_msg_result_t *res, void *udata LOGF1("[PDU RX] pdu: 0x%0llX, op: 0x%llX, session: 0x%llX, bus: 0x%llX, " "seq: %5lld, protoLen: %4u, valueLen: %u, status: %s", response, op, &op->connection->session, op->connection->messageBus, response->command->header->sequence, response->header.protobufLength, response->header.valueLength, response->command->header->ackSequence, response->header.protobufLength, response->header.valueLength, Kinetic_GetStatusDescription(status)); } else Loading
src/lib/kinetic_operation.c +29 −22 Original line number Diff line number Diff line Loading @@ -63,8 +63,14 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o { LOGF3("\nSending PDU via fd=%d", operation->connection->messageBus); KineticStatus status = KINETIC_STATUS_INVALID; uint8_t * msg = NULL; KineticPDU* request = operation->request; KineticProto_Message* proto = &operation->request->message.message; pthread_mutex_t* sendMutex = &operation->connection->sendMutex; // Acquire lock pthread_mutex_lock(sendMutex); // Pack the command, if available size_t expectedLen = KineticProto_command__get_packed_size(&request->message.command); Loading @@ -72,7 +78,8 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o if(request->message.message.commandBytes.data == NULL) { LOG0("Failed to allocate command bytes!"); return KINETIC_STATUS_MEMORY_ERROR; status = KINETIC_STATUS_MEMORY_ERROR; goto cleanup; } size_t packedLen = KineticProto_command__pack( &request->message.command, Loading Loading @@ -109,12 +116,6 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o header.protobufLength = KineticProto_Message__get_packed_size(proto); KineticLogger_LogProtobuf(3, proto); if (header.protobufLength > PDU_PROTO_MAX_LEN) { // Packed message exceeds max size. LOGF2("\nPacked protobuf exceeds maximum size. Packed size is: %d, Max size is: %d", header.protobufLength, PDU_PROTO_MAX_LEN); return KINETIC_STATUS_BUFFER_OVERRUN; } if (operation->entry != NULL && operation->sendValue) { header.valueLength = operation->entry->value.bytesUsed; } Loading @@ -125,20 +126,10 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o if (header.valueLength > PDU_PROTO_MAX_LEN) { // Packed value exceeds max size. LOGF2("\nPacked value exceeds maximum size. Packed size is: %d, Max size is: %d", header.valueLength, PDU_PROTO_MAX_LEN); return KINETIC_STATUS_BUFFER_OVERRUN; status = KINETIC_STATUS_BUFFER_OVERRUN; goto cleanup; } size_t offset = 0; uint8_t * msg = malloc(PDU_HEADER_LEN + header.protobufLength + header.valueLength); if (msg == NULL) { LOG0("Failed to allocate outgoing message!"); return KINETIC_STATUS_MEMORY_ERROR; } // Acquire lock pthread_mutex_lock(&operation->connection->sendMutex); // Populate sequence count and increment it for next operation request->message.header.sequence = operation->connection->sequence++; Loading @@ -151,6 +142,14 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o uint32_t nboProtoLength = KineticNBO_FromHostU32(header.protobufLength); uint32_t nboValueLength = KineticNBO_FromHostU32(header.valueLength); size_t offset = 0; msg = malloc(PDU_HEADER_LEN + header.protobufLength + header.valueLength); if (msg == NULL) { LOG0("Failed to allocate outgoing message!"); status = KINETIC_STATUS_MEMORY_ERROR; goto cleanup; } msg[offset] = header.versionPrefix; offset += sizeof(header.versionPrefix); Loading Loading @@ -183,11 +182,15 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o .udata = operation, }); pthread_mutex_unlock(&operation->connection->sendMutex); status = KINETIC_STATUS_SUCCESS; free(msg); cleanup: return KINETIC_STATUS_SUCCESS; pthread_mutex_unlock(sendMutex); if (msg != NULL) { free(msg); } return status; } KineticStatus KineticOperation_GetStatus(const KineticOperation* const operation) Loading Loading @@ -656,6 +659,10 @@ KineticStatus KineticOperation_BuildP2POperation(KineticOperation* const operati return KINETIC_STATUS_OPERATION_INVALID; } if (p2pOp->numOperations >= 100000) { return KINETIC_STATUS_BUFFER_OVERRUN; } operation->p2pOp = p2pOp; operation->callback = &KineticOperation_P2POperationCallback; return KINETIC_STATUS_SUCCESS; Loading
test/system/test_system_stress_single_session_threaded.c +1 −3 Original line number Diff line number Diff line Loading @@ -214,7 +214,6 @@ void run_throughput_tests(KineticSession* session, size_t num_ops, size_t value_ } } #if 1 // Measure DELETE performance { OpStatus delete_statuses[num_ops]; Loading Loading @@ -289,7 +288,6 @@ void run_throughput_tests(KineticSession* session, size_t num_ops, size_t value_ elapsed_ms / 1000.0f, throughput); } #endif ByteBuffer_Free(test_data); } Loading Loading @@ -338,7 +336,7 @@ void run_tests(KineticClient * client) TestParams params[] = { { .client = client, .session = &session, .thread_iters = 1, .num_ops = 100, .obj_size = KINETIC_OBJ_SIZE }, { .client = client, .session = &session, .thread_iters = 1, .num_ops = 1000, .obj_size = 120, }, // { .client = client, .session = &session, .thread_iters = 1, .num_ops = 1000, .obj_size = 500, }, { .client = client, .session = &session, .thread_iters = 1, .num_ops = 1000, .obj_size = 500, }, // { .client = client, .session = &session, .thread_iters = 1, .num_ops = 500, .obj_size = 70000, }, // { .client = client, .session = &session, .thread_iters = 1, .num_ops = 1000, .obj_size = 120, }, // { .client = client, .session = &session, .thread_iters = 3, .num_ops = 1000, .obj_size = 120, }, Loading