Commit d2d574cb authored by Job Vranish's avatar Job Vranish
Browse files

removed several unneeded fields from kineticPDU and started re-writing the allocator tests

parent f4f504d3
Loading
Loading
Loading
Loading
+5 −11
Original line number Diff line number Diff line
@@ -30,7 +30,10 @@
KineticConnection* KineticAllocator_NewConnection(void)
{
    KineticConnection* connection = calloc(1, sizeof(KineticConnection));
    KINETIC_CONNECTION_INIT(connection);
    if (connection == NULL) {
        LOG0("Failed allocating new Connection!");
        return NULL;
    }
    return connection;
}

@@ -53,7 +56,6 @@ KineticPDU* KineticAllocator_NewPDU(KineticConnection* connection)
        LOG0("Failed allocating new PDU!");
        return NULL;
    }
    assert(newPDU->proto == NULL);
    KineticPDU_Init(newPDU, connection);
    LOGF3("Allocated new PDU (0x%0llX) on connection", newPDU, connection);
    return newPDU;
@@ -61,16 +63,8 @@ KineticPDU* KineticAllocator_NewPDU(KineticConnection* connection)

void KineticAllocator_FreePDU(KineticPDU* pdu)
{
    KineticConnection* connection = pdu->connection;
    LOGF3("Freeing PDU (0x%0llX) on connection (0x%0llX)", pdu, connection);
    if (pdu && (pdu->proto != NULL) && pdu->protobufDynamicallyExtracted) {
        LOG3("Freeing dynamically allocated protobuf");
        KineticProto_Message__free_unpacked(pdu->proto, NULL);
        pdu->proto = NULL;
    };
    
    free(pdu);
    LOGF3("Freed PDU (0x%0llX) on connection (0x%0llX)", pdu, connection);
    LOGF3("Freed PDU (0x%0llX)", pdu);
}


+75 −78
Original line number Diff line number Diff line
@@ -40,118 +40,117 @@ KineticStatus KineticOperation_SendRequest(KineticOperation* const operation)
    assert(operation != NULL);
    assert(operation->connection != NULL);
    assert(operation->request != NULL);
    assert(operation->request->connection == operation->connection);
    LOGF2("\nSending PDU via fd=%d", operation->connection->messageBus);
    KineticPDU* request = operation->request;
    request->proto = &operation->request->protoData.message.message;
    KineticProto_Message* proto = &operation->request->message.message;

    // Pack the command, if available
    if (request->protoData.message.has_command) {
        size_t expectedLen = KineticProto_command__get_packed_size(&request->protoData.message.command);
        request->protoData.message.message.commandBytes.data = (uint8_t*)malloc(expectedLen);
        assert(request->protoData.message.message.commandBytes.data != NULL);
    if (request->message.has_command) {
        size_t expectedLen = KineticProto_command__get_packed_size(&request->message.command);
        request->message.message.commandBytes.data = (uint8_t*)malloc(expectedLen);
        assert(request->message.message.commandBytes.data != NULL);
        size_t packedLen = KineticProto_command__pack(
            &request->protoData.message.command,
            request->protoData.message.message.commandBytes.data);
            &request->message.command,
            request->message.message.commandBytes.data);
        assert(packedLen == expectedLen);
        request->protoData.message.message.commandBytes.len = packedLen;
        request->protoData.message.message.has_commandBytes = true;
        request->message.message.commandBytes.len = packedLen;
        request->message.message.has_commandBytes = true;
        KineticLogger_LogByteArray(2, "commandBytes", (ByteArray){
            .data = request->protoData.message.message.commandBytes.data,
            .len = request->protoData.message.message.commandBytes.len,
            .data = request->message.message.commandBytes.data,
            .len = request->message.message.commandBytes.len,
        });
    }

    switch (operation->request->proto->authType) {
    switch (proto->authType) {
    case KINETIC_PROTO_MESSAGE_AUTH_TYPE_PINAUTH:
        /* TODO: If operation uses PIN AUTH, then init that */
        break;
    case KINETIC_PROTO_MESSAGE_AUTH_TYPE_HMACAUTH:
        {
            KineticHMAC hmac;

            // Populate the HMAC for the protobuf
        KineticHMAC_Init(&request->hmac, KINETIC_PROTO_COMMAND_SECURITY_ACL_HMACALGORITHM_HmacSHA1);
        KineticHMAC_Populate(&request->hmac, request->proto, request->connection->session.config.hmacKey);
        break;
            KineticHMAC_Init(&hmac, KINETIC_PROTO_COMMAND_SECURITY_ACL_HMACALGORITHM_HmacSHA1);
            KineticHMAC_Populate(&hmac, proto, operation->connection->session.config.hmacKey);
        } break;
    default:
        break;
    }

    KineticPDUHeader header;

    // Configure PDU header length fields
    request->header.versionPrefix = 'F';
    request->header.protobufLength = KineticProto_Message__get_packed_size(request->proto);
    KineticLogger_LogProtobuf(3, request->proto);
    header.versionPrefix = 'F';
    header.protobufLength = KineticProto_Message__get_packed_size(proto);
    KineticLogger_LogProtobuf(3, proto);

    if (request->header.protobufLength > PDU_PROTO_MAX_LEN) {
    if (header.protobufLength > PDU_PROTO_MAX_LEN) {
        // Packed message exceeds max size.
        LOGF2("\nPacked protobuf exceeds maximum size. Packed size is: %d, Max size is: %d", request->header.protobufLength, PDU_PROTO_MAX_LEN);
        LOGF2("\nPacked protobuf exceeds maximum size. Packed size is: %d, Max size is: %d", header.protobufLength, PDU_PROTO_MAX_LEN);
        return KINETIC_STATUS_BUFFER_OVERRUN;
    }

    if (operation->entry != NULL && operation->sendValue) {
        request->header.valueLength = operation->entry->value.bytesUsed;
        if (request->header.valueLength > PDU_PROTO_MAX_LEN) {
        header.valueLength = operation->entry->value.bytesUsed;
        if (header.valueLength > PDU_PROTO_MAX_LEN) {
            // Packed value exceeds max size.
            LOGF2("\nPacked value exceeds maximum size. Packed size is: %d, Max size is: %d", request->header.valueLength, PDU_PROTO_MAX_LEN);
            LOGF2("\nPacked value exceeds maximum size. Packed size is: %d, Max size is: %d", header.valueLength, PDU_PROTO_MAX_LEN);
            return KINETIC_STATUS_BUFFER_OVERRUN;
        }
    }
    else {
        request->header.valueLength = 0;
        header.valueLength = 0;
    }
    KineticLogger_LogHeader(1, &request->header);

    // Create NBO copy of header for sending
    request->headerNBO.versionPrefix = 'F';
    request->headerNBO.protobufLength = KineticNBO_FromHostU32(request->header.protobufLength);
    request->headerNBO.valueLength = KineticNBO_FromHostU32(request->header.valueLength);
    KineticLogger_LogHeader(1, &header);

    uint32_t nboProtoLength = KineticNBO_FromHostU32(request->header.protobufLength);
    uint32_t nboValueLength = KineticNBO_FromHostU32(request->header.valueLength);
    uint32_t nboProtoLength = KineticNBO_FromHostU32(header.protobufLength);
    uint32_t nboValueLength = KineticNBO_FromHostU32(header.valueLength);

    size_t offset = 0;
    uint8_t * msg = malloc(PDU_HEADER_LEN + request->header.protobufLength + request->header.valueLength);
    uint8_t * msg = malloc(PDU_HEADER_LEN + header.protobufLength + header.valueLength);
    if (msg == NULL)
    {
        if (request->protoData.message.has_command)
        if (request->message.has_command)
        {
            free(request->protoData.message.message.commandBytes.data);
            request->protoData.message.message.commandBytes.data = NULL;
            free(request->message.message.commandBytes.data);
            request->message.message.commandBytes.data = NULL;
        }

        LOG0("Failed to allocate outgoing message!");
        return KINETIC_STATUS_MEMORY_ERROR;
    }
    
    msg[offset] = request->header.versionPrefix;
    offset += sizeof(request->header.versionPrefix);
    msg[offset] = header.versionPrefix;
    offset += sizeof(header.versionPrefix);

    memcpy(&msg[offset], &nboProtoLength, sizeof(nboProtoLength));
    offset += sizeof(nboProtoLength);
    memcpy(&msg[offset], &nboValueLength, sizeof(nboValueLength));
    offset += sizeof(nboValueLength);

    size_t len = KineticProto_Message__pack(&request->protoData.message.message, &msg[offset]);
    assert(len == request->header.protobufLength);
    offset += request->header.protobufLength;
    size_t len = KineticProto_Message__pack(&request->message.message, &msg[offset]);
    assert(len == header.protobufLength);
    offset += header.protobufLength;

    if (request->protoData.message.has_command)
    if (request->message.has_command)
    {
        free(request->protoData.message.message.commandBytes.data);
        request->protoData.message.message.commandBytes.data = NULL;
        free(request->message.message.commandBytes.data);
        request->message.message.commandBytes.data = NULL;
    }

    // Send the value/payload, if specified
    if (request->header.valueLength > 0) {
    if (header.valueLength > 0) {
        LOGF1("Sending PDU Value Payload (%zu bytes)", operation->entry->value.bytesUsed);
        memcpy(&msg[offset], operation->entry->value.array.data, operation->entry->value.bytesUsed);
        offset += operation->entry->value.bytesUsed;
    }
    assert((PDU_HEADER_LEN + request->header.protobufLength + request->header.valueLength) == offset);
    assert((PDU_HEADER_LEN + header.protobufLength + header.valueLength) == offset);

    bus_send_request(operation->connection->messageBus, &(bus_user_msg){
        .fd       = operation->connection->socket,
        .type     = BUS_SOCKET_PLAIN,
        // #TODO it would probably be good to clean up how we setup this sequence number
        .seq_id   = request->protoData.message.header.sequence,
        .seq_id   = request->message.header.sequence,
        .msg      = msg,
        .msg_size = offset,
        .cb       = KineticController_HandleExpectedResponse,
@@ -185,8 +184,8 @@ void KineticOperation_BuildNoop(KineticOperation* const operation)
{
    KineticOperation_ValidateOperation(operation);
    KineticSession_IncrementSequence(&operation->connection->session);
    operation->request->protoData.message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_NOOP;
    operation->request->protoData.message.command.header->has_messageType = true;
    operation->request->message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_NOOP;
    operation->request->message.command.header->has_messageType = true;
    operation->valueEnabled = false;
    // ######## TODO should be able to remove sendvalue
    operation->sendValue = false;
@@ -231,11 +230,11 @@ void KineticOperation_BuildPut(KineticOperation* const operation,
    KineticOperation_ValidateOperation(operation);
    KineticSession_IncrementSequence(&operation->connection->session);

    operation->request->protoData.message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_PUT;
    operation->request->protoData.message.command.header->has_messageType = true;
    operation->request->message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_PUT;
    operation->request->message.command.header->has_messageType = true;
    operation->entry = entry;

    KineticMessage_ConfigureKeyValue(&operation->request->protoData.message, operation->entry);
    KineticMessage_ConfigureKeyValue(&operation->request->message, operation->entry);

    operation->valueEnabled = !operation->entry->metadataOnly;
    operation->sendValue = true;
@@ -277,11 +276,11 @@ static void build_get_command(KineticOperation* const operation,
    KineticOperation_ValidateOperation(operation);
    KineticSession_IncrementSequence(&operation->connection->session);

    operation->request->protoData.message.command.header->messageType = command_id;
    operation->request->protoData.message.command.header->has_messageType = true;
    operation->request->message.command.header->messageType = command_id;
    operation->request->message.command.header->has_messageType = true;
    operation->entry = entry;

    KineticMessage_ConfigureKeyValue(&operation->request->protoData.message, entry);
    KineticMessage_ConfigureKeyValue(&operation->request->message, entry);

    if (operation->entry->value.array.data != NULL) {
        ByteBuffer_Reset(&operation->entry->value);
@@ -342,9 +341,9 @@ void KineticOperation_BuildFlush(KineticOperation* const operation)
{
    KineticOperation_ValidateOperation(operation);
    KineticSession_IncrementSequence(&operation->connection->session);
    operation->request->protoData.message.command.header->messageType =
    operation->request->message.command.header->messageType =
      KINETIC_PROTO_COMMAND_MESSAGE_TYPE_FLUSHALLDATA;
    operation->request->protoData.message.command.header->has_messageType = true;
    operation->request->message.command.header->has_messageType = true;
    operation->valueEnabled = false;
    operation->sendValue = false;
    operation->callback = &KineticOperation_FlushCallback;
@@ -367,11 +366,11 @@ void KineticOperation_BuildDelete(KineticOperation* const operation,
    KineticOperation_ValidateOperation(operation);
    KineticSession_IncrementSequence(&operation->connection->session);

    operation->request->protoData.message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_DELETE;
    operation->request->protoData.message.command.header->has_messageType = true;
    operation->request->message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_DELETE;
    operation->request->message.command.header->has_messageType = true;
    operation->entry = entry;

    KineticMessage_ConfigureKeyValue(&operation->request->protoData.message, operation->entry);
    KineticMessage_ConfigureKeyValue(&operation->request->message, operation->entry);

    if (operation->entry->value.array.data != NULL) {
        ByteBuffer_Reset(&operation->entry->value);
@@ -416,7 +415,7 @@ void KineticOperation_BuildGetKeyRange(KineticOperation* const operation,
    operation->request->command->header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_GETKEYRANGE;
    operation->request->command->header->has_messageType = true;

    KineticMessage_ConfigureKeyRange(&operation->request->protoData.message, range);
    KineticMessage_ConfigureKeyRange(&operation->request->message, range);

    operation->valueEnabled = false;
    operation->sendValue = false;
@@ -459,9 +458,9 @@ void KineticOperation_BuildGetLog(KineticOperation* const operation,
        
    operation->request->command->header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_GETLOG;
    operation->request->command->header->has_messageType = true;
    operation->request->command->body = &operation->request->protoData.message.body;
    operation->request->command->body->getLog = &operation->request->protoData.message.getLog;
    operation->request->command->body->getLog->types = &operation->request->protoData.message.getLogType;
    operation->request->command->body = &operation->request->message.body;
    operation->request->command->body->getLog = &operation->request->message.getLog;
    operation->request->command->body->getLog->types = &operation->request->message.getLogType;
    operation->request->command->body->getLog->types[0] = protoType;
    operation->request->command->body->getLog->n_types = 1;
    operation->deviceInfo = info;
@@ -626,7 +625,7 @@ KineticStatus KineticOperation_BuildP2POperation(KineticOperation* const operati
        
    operation->request->command->header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_PEER2PEERPUSH;
    operation->request->command->header->has_messageType = true;
    operation->request->command->body = &operation->request->protoData.message.body;
    operation->request->command->body = &operation->request->message.body;

    operation->request->command->body->p2pOperation = build_p2pOp(0, p2pOp);
    
@@ -652,10 +651,10 @@ void KineticOperation_BuildInstantSecureErase(KineticOperation* operation)
{
    KineticOperation_ValidateOperation(operation);
    KineticSession_IncrementSequence(&operation->connection->session);
    operation->request->protoData.message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_SETUP;
    operation->request->protoData.message.command.header->has_messageType = true;
    operation->request->command->body = &operation->request->protoData.message.body;
    operation->request->command->body->pinOp = &operation->request->protoData.message.pinOp;
    operation->request->message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_SETUP;
    operation->request->message.command.header->has_messageType = true;
    operation->request->command->body = &operation->request->message.body;
    operation->request->command->body->pinOp = &operation->request->message.pinOp;
 
#if 0
    /* Replace HMAC auth with pin auth */
@@ -674,7 +673,7 @@ void KineticOperation_BuildInstantSecureErase(KineticOperation* operation)
    };
#endif

    pdu->pinAuth = &operation->request->protoData.message.pinAuth;
    pdu->pinAuth = &operation->request->message.pinAuth;
    pdu->pinAuth->pin = pin_bd;
    pdu->pinAuth->has_pin = true;
#endif
@@ -703,13 +702,13 @@ void KineticOperation_BuildSetClusterVersion(KineticOperation* operation, int64_
{
    KineticOperation_ValidateOperation(operation);
    KineticSession_IncrementSequence(&operation->connection->session);
    operation->request->protoData.message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_SETUP;
    operation->request->protoData.message.command.header->has_messageType = true;
    operation->request->message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_SETUP;
    operation->request->message.command.header->has_messageType = true;
    
    operation->request->command->body->setup->newClusterVersion = newClusterVersion;
    operation->request->command->body->setup->has_newClusterVersion = true;

    operation->request->command->body = &operation->request->protoData.message.body;
    operation->request->command->body = &operation->request->message.body;

    operation->valueEnabled = false;
    operation->sendValue = false;
@@ -721,9 +720,7 @@ static void KineticOperation_ValidateOperation(KineticOperation* operation)
    assert(operation != NULL);
    assert(operation->connection != NULL);
    assert(operation->request != NULL);
    assert(operation->request->connection != NULL);
    assert(operation->request->proto != NULL);
    assert(operation->request->protoData.message.has_command);
    assert(operation->request->message.has_command);
    assert(operation->request->command != NULL);
    assert(operation->request->command->header != NULL);
    assert(operation->request->command->header->has_sequence);
+0 −1
Original line number Diff line number Diff line
@@ -44,7 +44,6 @@ KineticStatus KineticSession_Create(KineticSession * const session, KineticClien
        return KINETIC_STATUS_MEMORY_ERROR;
    }

    KINETIC_CONNECTION_INIT(session->connection);
    session->connection->session = *session; // TODO: KILL ME!!!
    session->connection->messageBus = client->bus;
    return KINETIC_STATUS_SUCCESS;
+0 −60
Original line number Diff line number Diff line
@@ -307,44 +307,6 @@ KineticStatus KineticSocket_Read(int socket, ByteBuffer* dest, size_t len)
    return KINETIC_STATUS_SUCCESS;
}

KineticStatus KineticSocket_ReadProtobuf(int socket, KineticPDU* pdu)
{
    size_t bytesToRead = pdu->header.protobufLength;
    LOGF3("Reading %zd bytes of protobuf", bytesToRead);

    uint8_t* packed = (uint8_t*)malloc(bytesToRead);
    if (packed == NULL) {
        LOG0("Failed allocating memory for protocol buffer");
        return KINETIC_STATUS_MEMORY_ERROR;
    }

    ByteBuffer recvBuffer = ByteBuffer_Create(packed, bytesToRead, 0);
    KineticStatus status = KineticSocket_Read(socket, &recvBuffer, bytesToRead);

    if (status != KINETIC_STATUS_SUCCESS) {
        LOG0("Protobuf read failed!");
        free(packed);
        return status;
    }
    else {
        pdu->proto = KineticProto_Message__unpack(
                         NULL, recvBuffer.bytesUsed, recvBuffer.array.data);
    }

    free(packed);

    if (pdu->proto == NULL) {
        pdu->protobufDynamicallyExtracted = false;
        LOG0("Error unpacking incoming Kinetic protobuf message!");
        return KINETIC_STATUS_DATA_ERROR;
    }
    else {
        pdu->protobufDynamicallyExtracted = true;
        LOG3("Protobuf unpacked successfully!");
        return KINETIC_STATUS_SUCCESS;
    }
}

KineticStatus KineticSocket_Write(int socket, ByteBuffer* src)
{
    LOGF3("Writing %zu bytes to socket...", src->bytesUsed);
@@ -368,28 +330,6 @@ KineticStatus KineticSocket_Write(int socket, ByteBuffer* src)
    return KINETIC_STATUS_SUCCESS;
}

KineticStatus KineticSocket_WriteProtobuf(int socket, KineticPDU* pdu)
{
    assert(pdu != NULL);
    LOGF3("Writing protobuf (%zd bytes)...", pdu->header.protobufLength);

    uint8_t* packed = (uint8_t*)malloc(pdu->header.protobufLength);

    if (packed == NULL) {
        LOG0("Failed allocating memory for protocol buffer");
        return KINETIC_STATUS_MEMORY_ERROR;
    }
    size_t len = KineticProto_Message__pack(&pdu->protoData.message.message, packed);
    assert(len == pdu->header.protobufLength);

    ByteBuffer buffer = ByteBuffer_Create(packed, len, len);

    KineticStatus status = KineticSocket_Write(socket, &buffer);

    free(packed);
    return status;
}

void KineticSocket_BeginPacket(int socket)
{
#if !defined(__APPLE__) /* TCP_CORK is NOT available on OSX */
+0 −2
Original line number Diff line number Diff line
@@ -38,10 +38,8 @@ void KineticSocket_Close(int socket);
int KineticSocket_DataBytesAvailable(int socket);
KineticWaitStatus KineticSocket_WaitUntilDataAvailable(int socket, int timeout);
KineticStatus KineticSocket_Read(int socket, ByteBuffer* dest, size_t len);
KineticStatus KineticSocket_ReadProtobuf(int socket, KineticPDU* pdu);

KineticStatus KineticSocket_Write(int socket, ByteBuffer* src);
KineticStatus KineticSocket_WriteProtobuf(int socket, KineticPDU* pdu);
void KineticSocket_BeginPacket(int socket);
void KineticSocket_FinishPacket(int socket);

Loading