Commit 810f17f7 authored by Greg Williams's avatar Greg Williams
Browse files

Almost done internalizing PDUs. Build still broken.

parent 05b294de
Loading
Loading
Loading
Loading
+37 −10
Original line number Diff line number Diff line
@@ -53,9 +53,24 @@
    #define HOST_NAME_MAX 256
#endif // HOST_NAME_MAX

#ifndef LOG_FILE_NAME_MAX
#define LOG_FILE_NAME_MAX (HOST_NAME_MAX)
#endif

#include <time.h>

typedef ProtobufCBinaryData ByteArray;
/**
 * @brief Structure for handling generic arrays of bytes
 *
 * The data contained in a `ByteArray` is an arbitrary sequence of
 * bytes. It may contain embedded `NULL` characters and is not required to be
 * `NULL`-terminated.
 */
typedef struct _ByteArray {
    size_t  len;    /**< Number of bytes in the `data` field. */
    uint8_t *data;  /**< Pointer to an allocated array of data bytes. */
} ByteArray;

#define BYTE_ARRAY_NONE \
    (ByteArray){.len = 0, .data = NULL}
#define BYTE_ARRAY_INIT(_data) (ByteArray) \
@@ -75,12 +90,15 @@ typedef ProtobufCBinaryData ByteArray;

typedef struct
{
    ByteArray   buffer;
    size_t      maxLen;
    ByteArray   array;
    size_t      bytesUsed;
} ByteBuffer;
#define BYTE_BUFFER_INIT(_buf, _max) (ByteBuffer) { \
    .buffer = {.data = (uint8_t*)(_buf), .len = 0}, \
    .maxLen = sizeof(_buf) }
#define BYTE_BUFFER_INIT(_array) (ByteBuffer) { \
    .array = (ByteArray) { \
        .data = (_array).data, \
        .len = (_array).len }, \
    .bytesUsed = 0, \
}


typedef enum _KineticAlgorithm {
@@ -105,15 +123,21 @@ typedef enum _KineticSynchronization {
#define SESSION_HANDLE_INVALID (0)
typedef struct _KineticSession
{
    int     handle;
    // Log file name (uses stdout if empty)
    char    logFile[LOG_FILE_NAME_MAX];

    // Set to true to enable non-blocking/asynchronous I/O
    bool    nonBlocking;

    // Port for Kinetic Device session
    int     port;

    // Host name/IP address of Kinetic Device
    char    host[HOST_NAME_MAX];
    char    logFile[LOG_FILE_NAME_MAX],

    // The version number of this cluster definition. If this is not equal to
    // the value on the device, the request is rejected and will return a
    // `VERSION_FAILURE` `statusCode` in the `Status` message.
    // the value on the Kinetic Device, the request is rejected and will return
    // `KINETIC_STATUS_VERSION_FAILURE`
    int64_t clusterVersion;

    // The identity associated with this request. See the ACL discussion above.
@@ -125,6 +149,9 @@ typedef struct _KineticSession
    // client and the device, used to sign requests.
    uint8_t keyData[KINETIC_MAX_KEY_LEN];
    ByteArray hmacKey;

    // Session instance handle (0 = none/invalid session)
    int     handle;
} KineticSession;


+22 −9
Original line number Diff line number Diff line
@@ -28,6 +28,8 @@
#include "kinetic_logger.h"
#include <stdio.h>



static KineticStatus KineticClient_ExecuteOperation(KineticOperation* operation)
{
    KineticStatus status = KINETIC_STATUS_INVALID;
@@ -74,27 +76,38 @@ int KineticClient_Connect(KineticSession* session)
        return -1;
    }

    if (!KineticConnection_Connect(connection, host, port, nonBlocking,
        clusterVersion, identity, hmacKey))
    KineticConnection connection = KineticConnection_NewConnection(session);
    if (connection == NULL)
    {
        connection->connected = false;
        connection->socketDescriptor = -1;
        char message[64];
        sprintf(message, "Failed creating connection to %s:%d", host, port);
        LOG(message);
        LOG("Failed connecting to device (connection is NULL)!");
        return -1;
    }

    connection->connected = true;
    if (!KineticConnection_Connect(connection))
    {
        LOGF("Failed creating connection to %s:%d",
            session->host, session->port);
        return -1;
    }

    return true;
    return 0;
}

void KineticClient_Disconnect(KineticConnection* connection)
{
   KineticConnection_Disconnect(connection);
   KineticConnection_FreeConnection(connection->session);
}

/**
 * @brief Creates and initializes a Kinetic operation.
 *
 * @param connection    KineticConnection instance to associate with operation
 * @param request       KineticPDU instance to use for request
 * @param response      KineticPDU instance to use for reponse
 *
 * @return              Returns a configured operation instance
 */
KineticOperation KineticClient_CreateOperation(KineticConnection* connection,
    KineticPDU* request,
    KineticPDU* response)
+42 −29
Original line number Diff line number Diff line
@@ -21,24 +21,32 @@
#include "kinetic_connection.h"
#include "kinetic_socket.h"
#include <string.h>

#include <stdlib.h>

static KineticConnection ConnectionInstances[KINETIC_SESSIONS_MAX];
static KineticConnection* Connections[KINETIC_SESSIONS_MAX];

static int PDUsPerSession = KINETIC_PDUS_PER_SESSION_DEFAULT;

KineticConnection* KineticConnection_NewConnection(KineticSession* session)
{
    assert(session);
    session->handle = KINETIC_SESSION_INVALID;
    if (session == NULL)
    {
        return NULL;
    }
    session->handle = SESSION_HANDLE_INVALID;
    for (int handle = 1; handle <= KINETIC_SESSIONS_MAX; handle++)
    {
        if (Connections[handle-1] == NULL)
        int idx = session->handle - 1;
        if (Connections[idx] == NULL)
        {
            Connections[handle-1] = &ConnectionInstances[handle-1];
            Connections[idx] = &ConnectionInstances[idx];
            session->handle = handle;
            *Connections[handle-1] = (KineticConnection){.session = session};
            return Connections[handle-1];
            *Connections[idx] = (KineticConnection){.session = session};
            for (int i = 0; i < PDUsPerSession; i++)
            {
                Connections[idx]->pdus[i] = malloc(sizeof(KineticPDU));
            }
            return Connections[idx];
        }
    }
    return NULL;
@@ -47,44 +55,49 @@ KineticConnection* KineticConnection_NewConnection(KineticSession* session)
void KineticConnection_FreeConnection(KineticSession* session)
{
    assert(session);
    assert(session->handle > KINETIC_SESSION_INVALID);
    assert(session->handle > SESSION_HANDLE_INVALID);
    assert(session->handle <= KINETIC_SESSIONS_MAX);
    if (Connections[session->handle-1] != NULL)
    int idx = session->handle - 1;
    if (Connections[idx] != NULL)
    {
        *Connections[idx] =
            (KineticConnection) {.session = SESSION_HANDLE_INVALID};
        for (int i = 0; i < KINETIC_PDUS_PER_SESSION_MAX; i++)
        {
            if (Connections[idx]->pdus[i] != NULL)
            {
        *Connections[handle-1] = {.session = KINETIC_SESSION_INVALID};
        Connections[handle-1] = NULL;
                free(Connections[idx]->pdus[i]);
                Connections[idx]->pdus[i] = NULL;
            }
    session->handle = KINETIC_SESSION_INVALID;
        }
        Connections[idx] = NULL;
    }
    session->handle = SESSION_HANDLE_INVALID;
}


bool KineticConnection_Connect(KineticConnection* const connection)
{
    assert(connection != NULL);
    assert(connection->session != NULL);
    connection->connected = false;
    connection->nonBlocking = nonBlocking;
    connection->port = port;
    connection->socketDescriptor = -1;
    connection->clusterVersion = clusterVersion;
    connection->identity = identity;

    strcpy(connection->host, host);
    connection->key.data = connection->keyData;
    memcpy(connection->key.data, key.data, key.len);
    connection->key.len = key.len;
    connection->socket = -1;

    connection->socketDescriptor = KineticSocket_Connect(
        connection->host, connection->port, nonBlocking);
    connection->connected = (connection->socketDescriptor >= 0);
    connection->socket = KineticSocket_Connect(
        connection->session->host,
        connection->session->port,
        connection->session->nonBlocking);
    connection->connected = (connection->socket >= 0);

    return connection->connected;
}

void KineticConnection_Disconnect(KineticConnection* connection)
{
    if (connection != NULL || connection->socketDescriptor >= 0)
    if (connection != NULL || connection->socket >= 0)
    {
        close(connection->socketDescriptor);
        connection->socketDescriptor = -1;
        close(connection->socket);
        connection->socket = -1;
    }
}

+4 −4
Original line number Diff line number Diff line
@@ -155,8 +155,8 @@ int KineticLogger_ByteArraySliceToCString(char* p_buf,
}

#define BYTES_TO_CSTRING(_buf_start, _array, _array_start, _count) { \
    char* p_buf = (_buf_start); \
    KineticLogger_ByteArraySliceToCString(p_buf, (_array), 0, _array.len); \
    ByteArray key = {.data = _array.data, .len = _array.len}; \
    KineticLogger_ByteArraySliceToCString((char*)(_buf_start), key, 0, key.len); \
}

void KineticLogger_LogProtobuf(const KineticProto* proto)
@@ -240,8 +240,8 @@ void KineticLogger_LogProtobuf(const KineticProto* proto)
                        if (proto->command->body->keyValue->has_key)
                        {
                            BYTES_TO_CSTRING(tmpBuf,
                                proto->command->body->keyValue->key,
                                0, proto->command->body->keyValue->key.len);
                                proto->command->body->keyValue->key, 0,
                                proto->command->body->keyValue->key.len);
                            LOGF("%skey: '%s'", _indent, tmpBuf);
                        }
                        if (proto->command->body->keyValue->has_newVersion)
+11 −5
Original line number Diff line number Diff line
@@ -29,7 +29,8 @@ void KineticMessage_Init(KineticMessage* const message)
// e.g. CONFIG_FIELD_BYTE_ARRAY(key, message->keyValue, metadata)
#define CONFIG_FIELD_BYTE_ARRAY(_name, _field, _config) { \
    if (_config->_name.data != NULL && _config->_name.len > 0) { \
        _field._name = _config->_name; \
        _field._name.data = _config->_name.data; \
        _field._name.len = _config->_name.len; \
        _field.has_ ## _name = true; \
    } \
}
@@ -58,9 +59,14 @@ void KineticMessage_ConfigureKeyValue(KineticMessage* const message,
    CONFIG_FIELD_BYTE_ARRAY(dbVersion, message->keyValue, metadata);
    CONFIG_FIELD_BYTE_ARRAY(tag, message->keyValue, metadata);
    message->keyValue.has_algorithm = (bool)((int)metadata->algorithm > 0);
    if (message->keyValue.has_algorithm) {
        message->keyValue.algorithm = metadata->algorithm; }
    if (message->keyValue.has_algorithm)
    {
        message->keyValue.algorithm = (KineticProto_Algorithm)
            metadata->algorithm;
    }
    message->keyValue.has_metadataOnly = metadata->metadataOnly;
    if (message->keyValue.has_metadataOnly) {
        message->keyValue.metadataOnly = metadata->metadataOnly; }
    if (message->keyValue.has_metadataOnly)
    {
        message->keyValue.metadataOnly = metadata->metadataOnly;
    }
}
Loading