Commit ee9cfd04 authored by Greg Williams's avatar Greg Williams
Browse files

Added proper cleanup of allocated memory and mutex unlocking to PDU send upon failure.

Fixed issue with response PDU logging of sequence count.
parent db64d059
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -243,7 +243,7 @@ void KineticController_HandleExpectedResponse(bus_msg_result_t *res, void *udata
        LOGF1("[PDU RX] pdu: 0x%0llX, op: 0x%llX, session: 0x%llX, bus: 0x%llX, "
            "seq: %5lld, protoLen: %4u, valueLen: %u, status: %s",
            response, op, &op->connection->session, op->connection->messageBus,
            response->command->header->sequence, response->header.protobufLength, response->header.valueLength,
            response->command->header->ackSequence, response->header.protobufLength, response->header.valueLength,
            Kinetic_GetStatusDescription(status));
    }
    else
+25 −22
Original line number Diff line number Diff line
@@ -63,8 +63,14 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o
{
    LOGF3("\nSending PDU via fd=%d", operation->connection->messageBus);

    KineticStatus status = KINETIC_STATUS_INVALID;
    uint8_t * msg = NULL;
    KineticPDU* request = operation->request;
    KineticProto_Message* proto = &operation->request->message.message;
    pthread_mutex_t* sendMutex = &operation->connection->sendMutex;

    // Acquire lock
    pthread_mutex_lock(sendMutex);

    // Pack the command, if available
    size_t expectedLen = KineticProto_command__get_packed_size(&request->message.command);
@@ -72,7 +78,8 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o
    if(request->message.message.commandBytes.data == NULL)
    {
        LOG0("Failed to allocate command bytes!");
        return KINETIC_STATUS_MEMORY_ERROR;
        status = KINETIC_STATUS_MEMORY_ERROR;
        goto cleanup;
    }
    size_t packedLen = KineticProto_command__pack(
        &request->message.command,
@@ -109,12 +116,6 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o
    header.protobufLength = KineticProto_Message__get_packed_size(proto);
    KineticLogger_LogProtobuf(3, proto);

    if (header.protobufLength > PDU_PROTO_MAX_LEN) {
        // Packed message exceeds max size.
        LOGF2("\nPacked protobuf exceeds maximum size. Packed size is: %d, Max size is: %d", header.protobufLength, PDU_PROTO_MAX_LEN);
        return KINETIC_STATUS_BUFFER_OVERRUN;
    }

    if (operation->entry != NULL && operation->sendValue) {
        header.valueLength = operation->entry->value.bytesUsed;
    }
@@ -125,20 +126,10 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o
    if (header.valueLength > PDU_PROTO_MAX_LEN) {
        // Packed value exceeds max size.
        LOGF2("\nPacked value exceeds maximum size. Packed size is: %d, Max size is: %d", header.valueLength, PDU_PROTO_MAX_LEN);
        return KINETIC_STATUS_BUFFER_OVERRUN;
    }

    size_t offset = 0;
    uint8_t * msg = malloc(PDU_HEADER_LEN + header.protobufLength + header.valueLength);
    if (msg == NULL)
    {
        LOG0("Failed to allocate outgoing message!");
        return KINETIC_STATUS_MEMORY_ERROR;
        status = KINETIC_STATUS_BUFFER_OVERRUN;
        goto cleanup;
    }

    // Acquire lock
    pthread_mutex_lock(&operation->connection->sendMutex);

    // Populate sequence count and increment it for next operation
    request->message.header.sequence = operation->connection->sequence++;

@@ -151,6 +142,14 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o
    uint32_t nboProtoLength = KineticNBO_FromHostU32(header.protobufLength);
    uint32_t nboValueLength = KineticNBO_FromHostU32(header.valueLength);

    size_t offset = 0;
    msg = malloc(PDU_HEADER_LEN + header.protobufLength + header.valueLength);
    if (msg == NULL) {
        LOG0("Failed to allocate outgoing message!");
        status = KINETIC_STATUS_MEMORY_ERROR;
        goto cleanup;
    }
    
    msg[offset] = header.versionPrefix;
    offset += sizeof(header.versionPrefix);

@@ -183,11 +182,15 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o
        .udata    = operation,
    });

    pthread_mutex_unlock(&operation->connection->sendMutex);
    status = KINETIC_STATUS_SUCCESS;

    free(msg);
cleanup:

    return KINETIC_STATUS_SUCCESS;
    pthread_mutex_unlock(sendMutex);

    if (msg != NULL) { free(msg); }

    return status;
}

KineticStatus KineticOperation_GetStatus(const KineticOperation* const operation)
+1 −3
Original line number Diff line number Diff line
@@ -214,7 +214,6 @@ void run_throughput_tests(KineticSession* session, size_t num_ops, size_t value_
        }
    }

    #if 1
    // Measure DELETE performance
    {
        OpStatus delete_statuses[num_ops];
@@ -289,7 +288,6 @@ void run_throughput_tests(KineticSession* session, size_t num_ops, size_t value_
            elapsed_ms / 1000.0f,
            throughput);
    }
    #endif

    ByteBuffer_Free(test_data);
}
@@ -338,7 +336,7 @@ void run_tests(KineticClient * client)
    TestParams params[] = { 
        { .client = client, .session = &session, .thread_iters = 1, .num_ops = 100,  .obj_size = KINETIC_OBJ_SIZE },
        { .client = client, .session = &session, .thread_iters = 1, .num_ops = 1000, .obj_size = 120,             },
        // { .client = client, .session = &session, .thread_iters = 1, .num_ops = 1000, .obj_size = 500,             },
        { .client = client, .session = &session, .thread_iters = 1, .num_ops = 1000, .obj_size = 500,             },
        // { .client = client, .session = &session, .thread_iters = 1, .num_ops = 500,  .obj_size = 70000,           },
        // { .client = client, .session = &session, .thread_iters = 1, .num_ops = 1000, .obj_size = 120,             },
        // { .client = client, .session = &session, .thread_iters = 3, .num_ops = 1000, .obj_size = 120,             },