Commit 45515f11 authored by Scott Vokes's avatar Scott Vokes
Browse files

Merge remote-tracking branch 'origin/develop' into bus

Conflicts:
	src/lib/kinetic_types_internal.h
parents 5dd9b736 b260d8d7
Loading
Loading
Loading
Loading
+21 −53
Original line number Diff line number Diff line
@@ -23,14 +23,13 @@
#include <stdlib.h>
#include <pthread.h>


//==============================================================================
// Generic List Support (INTERNAL)
//==============================================================================
KineticConnection* KineticAllocator_NewConnection(void)
{
    KineticConnection* connection = calloc(1, sizeof(KineticConnection));
    KINETIC_CONNECTION_INIT(connection);
    if (connection == NULL) {
        LOG0("Failed allocating new Connection!");
        return NULL;
    }
    return connection;
}

@@ -40,40 +39,6 @@ void KineticAllocator_FreeConnection(KineticConnection* connection)
    free(connection);
}

//==============================================================================
// PDU List Support
//==============================================================================

KineticPDU* KineticAllocator_NewPDU(KineticConnection* connection)
{
    assert(connection != NULL);
    LOGF3("Allocating new PDU on connection (0x%0llX)", connection);
    KineticPDU* newPDU = (KineticPDU*)calloc(1, sizeof(KineticPDU));
    if (newPDU == NULL) {
        LOG0("Failed allocating new PDU!");
        return NULL;
    }
    assert(newPDU->proto == NULL);
    KineticPDU_Init(newPDU, connection);
    LOGF3("Allocated new PDU (0x%0llX) on connection", newPDU, connection);
    return newPDU;
}

void KineticAllocator_FreePDU(KineticPDU* pdu)
{
    KineticConnection* connection = pdu->connection;
    LOGF3("Freeing PDU (0x%0llX) on connection (0x%0llX)", pdu, connection);
    if (pdu && (pdu->proto != NULL) && pdu->protobufDynamicallyExtracted) {
        LOG3("Freeing dynamically allocated protobuf");
        KineticProto_Message__free_unpacked(pdu->proto, NULL);
        pdu->proto = NULL;
    };
    
    free(pdu);
    LOGF3("Freed PDU (0x%0llX) on connection (0x%0llX)", pdu, connection);
}


KineticResponse * KineticAllocator_NewKineticResponse(size_t const valueLength)
{
    KineticResponse * response = calloc(1, sizeof(*response) + valueLength);
@@ -86,7 +51,8 @@ KineticResponse * KineticAllocator_NewKineticResponse(size_t const valueLength)

void KineticAllocator_FreeKineticResponse(KineticResponse * response)
{
    if (response != NULL) {
    assert(response != NULL);

    if (response->command != NULL) {
        protobuf_c_message_free_unpacked(&response->command->base, NULL);
    }
@@ -95,11 +61,6 @@ void KineticAllocator_FreeKineticResponse(KineticResponse * response)
    }
    free(response);
}
}

//==============================================================================
// Operation List Support
//==============================================================================

KineticOperation* KineticAllocator_NewOperation(KineticConnection* const connection)
{
@@ -112,7 +73,13 @@ KineticOperation* KineticAllocator_NewOperation(KineticConnection* const connect
        return NULL;
    }
    KineticOperation_Init(newOperation, connection);
    newOperation->request = KineticAllocator_NewPDU(connection);
    LOGF3("Allocating new PDU on connection (0x%0llX)", connection);
    newOperation->request = (KineticPDU*)calloc(1, sizeof(KineticPDU));
    if (newOperation->request == NULL) {
        LOG0("Failed allocating new PDU!");
        free(newOperation);
        return NULL;
    }
    KineticPDU_InitWithCommand(newOperation->request, connection);
    LOGF3("Allocated new operation (0x%0llX) on connection (0x%0llX)", newOperation, connection);
    return newOperation;
@@ -126,7 +93,8 @@ void KineticAllocator_FreeOperation(KineticOperation* operation)
    if (operation->request != NULL) {
        LOGF3("Freeing request PDU (0x%0llX) from operation (0x%0llX) on connection (0x%0llX)",
            operation->request, operation, connection);
        KineticAllocator_FreePDU(operation->request);
        free(operation->request);
        LOGF3("Freed PDU (0x%0llX)", operation->request);
        operation->request = NULL;
    }
    if (operation->response != NULL) {
+78 −86
Original line number Diff line number Diff line
@@ -40,118 +40,113 @@ KineticStatus KineticOperation_SendRequest(KineticOperation* const operation)
    assert(operation != NULL);
    assert(operation->connection != NULL);
    assert(operation->request != NULL);
    assert(operation->request->connection == operation->connection);
    LOGF2("\nSending PDU via fd=%d", operation->connection->messageBus);
    KineticPDU* request = operation->request;
    request->proto = &operation->request->protoData.message.message;
    KineticProto_Message* proto = &operation->request->message.message;

    // Pack the command, if available
    if (request->protoData.message.has_command) {
        size_t expectedLen = KineticProto_command__get_packed_size(&request->protoData.message.command);
        request->protoData.message.message.commandBytes.data = (uint8_t*)malloc(expectedLen);
        assert(request->protoData.message.message.commandBytes.data != NULL);
    size_t expectedLen = KineticProto_command__get_packed_size(&request->message.command);
    request->message.message.commandBytes.data = (uint8_t*)malloc(expectedLen);
    if(request->message.message.commandBytes.data == NULL)
    {
        LOG0("Failed to allocate command bytes!");
        return KINETIC_STATUS_MEMORY_ERROR;
    }
    size_t packedLen = KineticProto_command__pack(
            &request->protoData.message.command,
            request->protoData.message.message.commandBytes.data);
        &request->message.command,
        request->message.message.commandBytes.data);
    assert(packedLen == expectedLen);
        request->protoData.message.message.commandBytes.len = packedLen;
        request->protoData.message.message.has_commandBytes = true;
    request->message.message.commandBytes.len = packedLen;
    request->message.message.has_commandBytes = true;
    KineticLogger_LogByteArray(2, "commandBytes", (ByteArray){
            .data = request->protoData.message.message.commandBytes.data,
            .len = request->protoData.message.message.commandBytes.len,
        .data = request->message.message.commandBytes.data,
        .len = request->message.message.commandBytes.len,
    });
    }

    switch (operation->request->proto->authType) {
    switch (proto->authType) {
    case KINETIC_PROTO_MESSAGE_AUTH_TYPE_PINAUTH:
        /* TODO: If operation uses PIN AUTH, then init that */
        break;
    case KINETIC_PROTO_MESSAGE_AUTH_TYPE_HMACAUTH:
        {
            KineticHMAC hmac;

            // Populate the HMAC for the protobuf
        KineticHMAC_Init(&request->hmac, KINETIC_PROTO_COMMAND_SECURITY_ACL_HMACALGORITHM_HmacSHA1);
        KineticHMAC_Populate(&request->hmac, request->proto, request->connection->session.config.hmacKey);
        break;
            KineticHMAC_Init(&hmac, KINETIC_PROTO_COMMAND_SECURITY_ACL_HMACALGORITHM_HmacSHA1);
            KineticHMAC_Populate(&hmac, proto, operation->connection->session.config.hmacKey);
        } break;
    default:
        break;
    }

    KineticPDUHeader header;

    // Configure PDU header length fields
    request->header.versionPrefix = 'F';
    request->header.protobufLength = KineticProto_Message__get_packed_size(request->proto);
    KineticLogger_LogProtobuf(3, request->proto);
    header.versionPrefix = 'F';
    header.protobufLength = KineticProto_Message__get_packed_size(proto);
    KineticLogger_LogProtobuf(3, proto);

    if (request->header.protobufLength > PDU_PROTO_MAX_LEN) {
    if (header.protobufLength > PDU_PROTO_MAX_LEN) {
        // Packed message exceeds max size.
        LOGF2("\nPacked protobuf exceeds maximum size. Packed size is: %d, Max size is: %d", request->header.protobufLength, PDU_PROTO_MAX_LEN);
        LOGF2("\nPacked protobuf exceeds maximum size. Packed size is: %d, Max size is: %d", header.protobufLength, PDU_PROTO_MAX_LEN);
        return KINETIC_STATUS_BUFFER_OVERRUN;
    }

    if (operation->entry != NULL && operation->sendValue) {
        request->header.valueLength = operation->entry->value.bytesUsed;
        if (request->header.valueLength > PDU_PROTO_MAX_LEN) {
        header.valueLength = operation->entry->value.bytesUsed;
        if (header.valueLength > PDU_PROTO_MAX_LEN) {
            // Packed value exceeds max size.
            LOGF2("\nPacked value exceeds maximum size. Packed size is: %d, Max size is: %d", request->header.valueLength, PDU_PROTO_MAX_LEN);
            LOGF2("\nPacked value exceeds maximum size. Packed size is: %d, Max size is: %d", header.valueLength, PDU_PROTO_MAX_LEN);
            return KINETIC_STATUS_BUFFER_OVERRUN;
        }
    }
    else {
        request->header.valueLength = 0;
        header.valueLength = 0;
    }
    KineticLogger_LogHeader(1, &request->header);

    // Create NBO copy of header for sending
    request->headerNBO.versionPrefix = 'F';
    request->headerNBO.protobufLength = KineticNBO_FromHostU32(request->header.protobufLength);
    request->headerNBO.valueLength = KineticNBO_FromHostU32(request->header.valueLength);
    KineticLogger_LogHeader(1, &header);

    uint32_t nboProtoLength = KineticNBO_FromHostU32(request->header.protobufLength);
    uint32_t nboValueLength = KineticNBO_FromHostU32(request->header.valueLength);
    uint32_t nboProtoLength = KineticNBO_FromHostU32(header.protobufLength);
    uint32_t nboValueLength = KineticNBO_FromHostU32(header.valueLength);

    size_t offset = 0;
    uint8_t * msg = malloc(PDU_HEADER_LEN + request->header.protobufLength + request->header.valueLength);
    uint8_t * msg = malloc(PDU_HEADER_LEN + header.protobufLength + header.valueLength);
    if (msg == NULL)
    {
        if (request->protoData.message.has_command)
        {
            free(request->protoData.message.message.commandBytes.data);
            request->protoData.message.message.commandBytes.data = NULL;
        }
        free(request->message.message.commandBytes.data);
        request->message.message.commandBytes.data = NULL;

        LOG0("Failed to allocate outgoing message!");
        return KINETIC_STATUS_MEMORY_ERROR;
    }
    
    msg[offset] = request->header.versionPrefix;
    offset += sizeof(request->header.versionPrefix);
    msg[offset] = header.versionPrefix;
    offset += sizeof(header.versionPrefix);

    memcpy(&msg[offset], &nboProtoLength, sizeof(nboProtoLength));
    offset += sizeof(nboProtoLength);
    memcpy(&msg[offset], &nboValueLength, sizeof(nboValueLength));
    offset += sizeof(nboValueLength);

    size_t len = KineticProto_Message__pack(&request->protoData.message.message, &msg[offset]);
    assert(len == request->header.protobufLength);
    offset += request->header.protobufLength;
    size_t len = KineticProto_Message__pack(&request->message.message, &msg[offset]);
    assert(len == header.protobufLength);
    offset += header.protobufLength;

    if (request->protoData.message.has_command)
    {
        free(request->protoData.message.message.commandBytes.data);
        request->protoData.message.message.commandBytes.data = NULL;
    }
    free(request->message.message.commandBytes.data);
    request->message.message.commandBytes.data = NULL;

    // Send the value/payload, if specified
    if (request->header.valueLength > 0) {
    if (header.valueLength > 0) {
        LOGF1("Sending PDU Value Payload (%zu bytes)", operation->entry->value.bytesUsed);
        memcpy(&msg[offset], operation->entry->value.array.data, operation->entry->value.bytesUsed);
        offset += operation->entry->value.bytesUsed;
    }
    assert((PDU_HEADER_LEN + request->header.protobufLength + request->header.valueLength) == offset);
    assert((PDU_HEADER_LEN + header.protobufLength + header.valueLength) == offset);

    bus_send_request(operation->connection->messageBus, &(bus_user_msg){
        .fd       = operation->connection->socket,
        .type     = BUS_SOCKET_PLAIN,
        // #TODO it would probably be good to clean up how we setup this sequence number
        .seq_id   = request->protoData.message.header.sequence,
        .seq_id   = request->message.header.sequence,
        .msg      = msg,
        .msg_size = offset,
        .cb       = KineticController_HandleExpectedResponse,
@@ -185,8 +180,8 @@ void KineticOperation_BuildNoop(KineticOperation* const operation)
{
    KineticOperation_ValidateOperation(operation);
    KineticSession_IncrementSequence(&operation->connection->session);
    operation->request->protoData.message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_NOOP;
    operation->request->protoData.message.command.header->has_messageType = true;
    operation->request->message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_NOOP;
    operation->request->message.command.header->has_messageType = true;
    operation->valueEnabled = false;
    // ######## TODO should be able to remove sendvalue
    operation->sendValue = false;
@@ -231,11 +226,11 @@ void KineticOperation_BuildPut(KineticOperation* const operation,
    KineticOperation_ValidateOperation(operation);
    KineticSession_IncrementSequence(&operation->connection->session);

    operation->request->protoData.message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_PUT;
    operation->request->protoData.message.command.header->has_messageType = true;
    operation->request->message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_PUT;
    operation->request->message.command.header->has_messageType = true;
    operation->entry = entry;

    KineticMessage_ConfigureKeyValue(&operation->request->protoData.message, operation->entry);
    KineticMessage_ConfigureKeyValue(&operation->request->message, operation->entry);

    operation->valueEnabled = !operation->entry->metadataOnly;
    operation->sendValue = true;
@@ -277,11 +272,11 @@ static void build_get_command(KineticOperation* const operation,
    KineticOperation_ValidateOperation(operation);
    KineticSession_IncrementSequence(&operation->connection->session);

    operation->request->protoData.message.command.header->messageType = command_id;
    operation->request->protoData.message.command.header->has_messageType = true;
    operation->request->message.command.header->messageType = command_id;
    operation->request->message.command.header->has_messageType = true;
    operation->entry = entry;

    KineticMessage_ConfigureKeyValue(&operation->request->protoData.message, entry);
    KineticMessage_ConfigureKeyValue(&operation->request->message, entry);

    if (operation->entry->value.array.data != NULL) {
        ByteBuffer_Reset(&operation->entry->value);
@@ -342,9 +337,9 @@ void KineticOperation_BuildFlush(KineticOperation* const operation)
{
    KineticOperation_ValidateOperation(operation);
    KineticSession_IncrementSequence(&operation->connection->session);
    operation->request->protoData.message.command.header->messageType =
    operation->request->message.command.header->messageType =
      KINETIC_PROTO_COMMAND_MESSAGE_TYPE_FLUSHALLDATA;
    operation->request->protoData.message.command.header->has_messageType = true;
    operation->request->message.command.header->has_messageType = true;
    operation->valueEnabled = false;
    operation->sendValue = false;
    operation->callback = &KineticOperation_FlushCallback;
@@ -367,11 +362,11 @@ void KineticOperation_BuildDelete(KineticOperation* const operation,
    KineticOperation_ValidateOperation(operation);
    KineticSession_IncrementSequence(&operation->connection->session);

    operation->request->protoData.message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_DELETE;
    operation->request->protoData.message.command.header->has_messageType = true;
    operation->request->message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_DELETE;
    operation->request->message.command.header->has_messageType = true;
    operation->entry = entry;

    KineticMessage_ConfigureKeyValue(&operation->request->protoData.message, operation->entry);
    KineticMessage_ConfigureKeyValue(&operation->request->message, operation->entry);

    if (operation->entry->value.array.data != NULL) {
        ByteBuffer_Reset(&operation->entry->value);
@@ -416,7 +411,7 @@ void KineticOperation_BuildGetKeyRange(KineticOperation* const operation,
    operation->request->command->header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_GETKEYRANGE;
    operation->request->command->header->has_messageType = true;

    KineticMessage_ConfigureKeyRange(&operation->request->protoData.message, range);
    KineticMessage_ConfigureKeyRange(&operation->request->message, range);

    operation->valueEnabled = false;
    operation->sendValue = false;
@@ -459,9 +454,9 @@ void KineticOperation_BuildGetLog(KineticOperation* const operation,
        
    operation->request->command->header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_GETLOG;
    operation->request->command->header->has_messageType = true;
    operation->request->command->body = &operation->request->protoData.message.body;
    operation->request->command->body->getLog = &operation->request->protoData.message.getLog;
    operation->request->command->body->getLog->types = &operation->request->protoData.message.getLogType;
    operation->request->command->body = &operation->request->message.body;
    operation->request->command->body->getLog = &operation->request->message.getLog;
    operation->request->command->body->getLog->types = &operation->request->message.getLogType;
    operation->request->command->body->getLog->types[0] = protoType;
    operation->request->command->body->getLog->n_types = 1;
    operation->deviceInfo = info;
@@ -626,7 +621,7 @@ KineticStatus KineticOperation_BuildP2POperation(KineticOperation* const operati
        
    operation->request->command->header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_PEER2PEERPUSH;
    operation->request->command->header->has_messageType = true;
    operation->request->command->body = &operation->request->protoData.message.body;
    operation->request->command->body = &operation->request->message.body;

    operation->request->command->body->p2pOperation = build_p2pOp(0, p2pOp);
    
@@ -652,10 +647,10 @@ void KineticOperation_BuildInstantSecureErase(KineticOperation* operation)
{
    KineticOperation_ValidateOperation(operation);
    KineticSession_IncrementSequence(&operation->connection->session);
    operation->request->protoData.message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_SETUP;
    operation->request->protoData.message.command.header->has_messageType = true;
    operation->request->command->body = &operation->request->protoData.message.body;
    operation->request->command->body->pinOp = &operation->request->protoData.message.pinOp;
    operation->request->message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_SETUP;
    operation->request->message.command.header->has_messageType = true;
    operation->request->command->body = &operation->request->message.body;
    operation->request->command->body->pinOp = &operation->request->message.pinOp;
 
#if 0
    /* Replace HMAC auth with pin auth */
@@ -674,7 +669,7 @@ void KineticOperation_BuildInstantSecureErase(KineticOperation* operation)
    };
#endif

    pdu->pinAuth = &operation->request->protoData.message.pinAuth;
    pdu->pinAuth = &operation->request->message.pinAuth;
    pdu->pinAuth->pin = pin_bd;
    pdu->pinAuth->has_pin = true;
#endif
@@ -703,13 +698,13 @@ void KineticOperation_BuildSetClusterVersion(KineticOperation* operation, int64_
{
    KineticOperation_ValidateOperation(operation);
    KineticSession_IncrementSequence(&operation->connection->session);
    operation->request->protoData.message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_SETUP;
    operation->request->protoData.message.command.header->has_messageType = true;
    operation->request->message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_SETUP;
    operation->request->message.command.header->has_messageType = true;
    
    operation->request->command->body->setup->newClusterVersion = newClusterVersion;
    operation->request->command->body->setup->has_newClusterVersion = true;

    operation->request->command->body = &operation->request->protoData.message.body;
    operation->request->command->body = &operation->request->message.body;

    operation->valueEnabled = false;
    operation->sendValue = false;
@@ -721,9 +716,6 @@ static void KineticOperation_ValidateOperation(KineticOperation* operation)
    assert(operation != NULL);
    assert(operation->connection != NULL);
    assert(operation->request != NULL);
    assert(operation->request->connection != NULL);
    assert(operation->request->proto != NULL);
    assert(operation->request->protoData.message.has_command);
    assert(operation->request->command != NULL);
    assert(operation->request->command->header != NULL);
    assert(operation->request->command->header->has_sequence);
+0 −1
Original line number Diff line number Diff line
@@ -48,7 +48,6 @@ KineticStatus KineticSession_Create(KineticSession * const session, KineticClien
        return KINETIC_STATUS_MEMORY_ERROR;
    }

    KINETIC_CONNECTION_INIT(session->connection);
    session->connection->session = *session; // TODO: KILL ME!!!
    session->connection->messageBus = client->bus;
    return KINETIC_STATUS_SUCCESS;
+0 −239

File changed.

Preview size limit exceeded, changes collapsed.

+0 −7
Original line number Diff line number Diff line
@@ -35,13 +35,6 @@ typedef enum
int KineticSocket_Connect(const char* host, int port);
void KineticSocket_Close(int socket);

int KineticSocket_DataBytesAvailable(int socket);
KineticWaitStatus KineticSocket_WaitUntilDataAvailable(int socket, int timeout);
KineticStatus KineticSocket_Read(int socket, ByteBuffer* dest, size_t len);
KineticStatus KineticSocket_ReadProtobuf(int socket, KineticPDU* pdu);

KineticStatus KineticSocket_Write(int socket, ByteBuffer* src);
KineticStatus KineticSocket_WriteProtobuf(int socket, KineticPDU* pdu);
void KineticSocket_BeginPacket(int socket);
void KineticSocket_FinishPacket(int socket);

Loading