Commit 05b294de authored by Greg Williams's avatar Greg Williams
Browse files

Refactored public API and public/private types.

Added new public KineticSession type and corresponding allocation and initialization of connection instances (sessions).
parent 7c2644f1
Loading
Loading
Loading
Loading
+20 −59
Original line number Diff line number Diff line
@@ -24,63 +24,37 @@
#include "kinetic_types.h"

/**
 * Initializes the Kinetic API andcsonfigures logging destination.
 *
 * @param logFile Path to log file. Specify NULL to log to STDOUT.
 */
void KineticClient_Init(const char* logFile);

/**
 * @brief Configures the session and establishes a socket connection to a Kinetic Device
 *
 * @param connection        KineticConnection instance to configure with connection info
 * @param host              Host name or IP address to connect to
 * @param port              Port to establish socket connection on
 * @param nonBlocking       Set to true for non-blocking or false for blocking I/O
 * @param clusterVersion    Cluster version to use for the session
 * @param identity          Identity to use for the session
 * @param hmacKey           Key to use for HMAC calculations
 *
 * @return                  Returns true if connection succeeded
 * Initializes the Kinetic API, configures logging destination, establishes a
 * connection to the specified Kinetic Device, and establishes a session.
 *
 * @session Session instance to configure with connection info
 *  .host           Host name or IP address to connect to
 *  .port           Port to establish socket connection on
 *  .nonBlocking    Set to true for non-blocking or false for blocking I/O
 *  .clusterVersion Cluster version to use for the session
 *  .identity       Identity to use for the session
 *  .hmacKey        Key to use for HMAC calculations (NULL-terminated string)
 *  .logFile        Path to log file. Defaults to STDOUT if unspecified
 *
 * @return          Returns 0 upon succes, -1 or Kinetic status upon failure
 */
bool KineticClient_Connect(KineticConnection* connection,
    const char* host,
    int port,
    bool nonBlocking,
    int64_t clusterVersion,
    int64_t identity,
    ByteArray hmacKey);
int KineticClient_Connect(KineticSession* session);

/**
 * @brief Closes the socket connection to a host.
 *
 * @param connection    KineticConnection instance
 */
void KineticClient_Disconnect(KineticConnection* connection);

/**
 * @brief Creates and initializes a Kinetic operation.
 *
 * @param connection    KineticConnection instance to associate with operation
 * @param request       KineticPDU instance to use for request
 * @param response      KineticPDU instance to use for reponse
 * @brief Closes the connection to a host.
 *
 * @return              Returns a configured operation instance
 * @param session   KineticSession instance to terminate
 */
KineticOperation KineticClient_CreateOperation(
    KineticConnection* connection,
    KineticPDU* request,
    KineticPDU* response);
void KineticClient_Disconnect(KineticSession* session);

/**
 * @brief Executes a NOOP command to test whether the Kinetic Device is operational
 *
 * @param operation     KineticOperation instance to use for the operation
 *
 * @return              Returns 0 upon succes, -1 or the Kinetic status code
 *                      upon failure
 * @return              Returns 0 upon succes, -1 or the Kinetic status code upon failure
 */
KineticStatus KineticClient_NoOp(KineticOperation* operation);
Kinetic_Status KineticClient_NoOp(KineticSession* session);

/**
 * @brief Executes a PUT command to store/update an entry on the Kinetic Device
@@ -92,21 +66,8 @@ KineticStatus KineticClient_NoOp(KineticOperation* operation);
 * @return              Returns 0 upon succes, -1 or the Kinetic status code
 *                      upon failure
 */
KineticStatus KineticClient_Put(KineticOperation* operation,
    const KineticKeyValue* metadata);

/**
 * @brief Executes a GET command to retrieve and entry from the Kinetic Device
 *
 * @param operation     KineticOperation instance to use for the operation
 * @param metadata      Key/value metadata for object to retrieve. 'value' will
 *                      be populated unless 'metadataOnly' is set to 'true'
 *
 * @return              Returns 0 upon succes, -1 or the Kinetic status code
 *                      upon failure
 */
KineticStatus KineticClient_Get(KineticOperation* operation,
    KineticKeyValue* metadata);
Kinetic_Status KineticClient_Put(KineticSession* session,
    const Kinetic_KeyValue* metadata);

/**
 * @brief Executes a DELETE command to delete an entry from the Kinetic Device
@@ -118,7 +79,7 @@ KineticStatus KineticClient_Get(KineticOperation* operation,
 * @return              Returns 0 upon succes, -1 or the Kinetic status code
 *                      upon failure
 */
KineticStatus KineticClient_Delete(KineticOperation* operation,
KineticStatus KineticClient_Delete(KineticSession* session,
    KineticKeyValue* metadata);

#endif // _KINETIC_CLIENT_H
+29 −188
Original line number Diff line number Diff line
@@ -32,10 +32,9 @@
#include <stdio.h>
#include <assert.h>
#include <limits.h>
// #include <netinet/in.h>
// #include <ifaddrs.h>

#include <netinet/in.h>
#include <ifaddrs.h>
#include <openssl/sha.h>

#define KINETIC_PORT            8123
#define KINETIC_TLS_PORT        8443
@@ -43,15 +42,6 @@
#define KINETIC_HMAC_MAX_LEN    (KINETIC_HMAC_SHA1_LEN)
#define KINETIC_MAX_KEY_LEN     128

// Ensure __func__ is defined (for debugging)
#if __STDC_VERSION__ < 199901L
    #if __GNUC__ >= 2
        #define __func__ __FUNCTION__
    #else
        #define __func__ "<unknown>"
    #endif
#endif

// Define max host name length
// Some Linux environments require this, although not all, but it's benign.
#ifndef _BSD_SOURCE
@@ -63,7 +53,6 @@
    #define HOST_NAME_MAX 256
#endif // HOST_NAME_MAX

#include "kinetic_proto.h"
#include <time.h>

typedef ProtobufCBinaryData ByteArray;
@@ -84,16 +73,6 @@ typedef ProtobufCBinaryData ByteArray;
#define BYTE_ARRAY_FILL_WITH_DUMMY_DATA(_array) \
    {size_t i=0; for(;i<(_array).len;++i){(_array).data[i] = (uint8_t)(i & 0xFFu);} }


// // Structure for defining a custom memory allocator.
// typedef struct 
// {
//     void        *(*alloc)(void *allocator_data, size_t size);
//     void        (*free)(void *allocator_data, void *pointer);
//     // Opaque pointer passed to `alloc` and `free` functions
//     void        *allocator_data;
// } ProtobufCAllocator;

typedef struct
{
    ByteArray   buffer;
@@ -104,103 +83,49 @@ typedef struct
    .maxLen = sizeof(_buf) }


// Kinetic Device Client Connection
typedef struct _KineticConnection
typedef enum _KineticAlgorithm {
    KINETIC_ALGORITHM_INVALID = -1,
    KINETIC_ALGORITHM_SHA1 = 1,
    KINETIC_ALGORITHM_SHA2 = 2,
    KINETIC_ALGORITHM_SHA3 = 3,
    KINETIC_ALGORITHM_CRC32 = 4,
    KINETIC_ALGORITHM_CRC64 = 5
} KineticAlgorithm;


typedef enum _KineticSynchronization {
  KINETIC_SYNCHRONIZATION_INVALID = -1,
  KINETIC_SYNCHRONIZATION_WRITETHROUGH = 1,
  KINETIC_SYNCHRONIZATION_WRITEBACK = 2,
  KINETIC_SYNCHRONIZATION_FLUSH = 3
} KineticSynchronization;


// Kinetic session
#define SESSION_HANDLE_INVALID (0)
typedef struct _KineticSession
{
    bool    connected;
    int     handle;
    bool    nonBlocking;
    int     port;
    int     socketDescriptor;
    int64_t connectionID;
    char    host[HOST_NAME_MAX];
    char    logFile[LOG_FILE_NAME_MAX],

    // Optional field - default value is 0
    // The version number of this cluster definition. If this is not equal to
    // the value on the device, the request is rejected and will return a
    // `VERSION_FAILURE` `statusCode` in the `Status` message.
    int64_t clusterVersion;

    // Required field
    // The identity associated with this request. See the ACL discussion above.
    // The Kinetic Device will use this identity value to lookup the
    // HMAC key (shared secret) to verify the HMAC.
    int64_t identity;

    // Required field
    // This is the identity's HMAC Key. This is a shared secret between the
    // client and the device, used to sign requests.
    uint8_t keyData[KINETIC_MAX_KEY_LEN];
    ByteArray key;

    // Required field
    // A monotonically increasing number for each request in a TCP connection.
    int64_t sequence;
} KineticConnection;
#define KINETIC_CONNECTION_INIT(_con, _id, _key) { \
    (*_con) = (KineticConnection) { \
        .socketDescriptor = -1, \
        .connectionID = time(NULL), \
        .identity = (_id), \
        .sequence = 0, \
    }; \
    (*_con).key = (ByteArray){.data = (*_con).keyData, .len = (_key).len}; \
    if ((_key).data != NULL && (_key).len > 0) { \
        memcpy((_con)->keyData, (_key).data, (_key).len); } \
}


// Kinetic Message HMAC
typedef struct _KineticHMAC
{
    KineticProto_Security_ACL_HMACAlgorithm algorithm;
    uint32_t len;
    uint8_t data[KINETIC_HMAC_MAX_LEN];
} KineticHMAC;


// Kinetic Device Message Request
typedef struct _KineticMessage
{
    // Kinetic Protocol Buffer Elements
    KineticProto                proto;
    KineticProto_Command        command;
    KineticProto_Header         header;
    KineticProto_Body           body;
    KineticProto_Status         status;
    KineticProto_Security       security;
    KineticProto_Security_ACL   acl;
    KineticProto_KeyValue       keyValue;
    uint8_t                     hmacData[KINETIC_HMAC_MAX_LEN];
} KineticMessage;
#define KINETIC_MESSAGE_HEADER_INIT(_hdr, _con) { \
    assert((void *)(_hdr) != NULL); \
    assert((void *)(_con) != NULL); \
    *(_hdr) = (KineticProto_Header) { \
        .base = PROTOBUF_C_MESSAGE_INIT(&KineticProto_header__descriptor), \
        .has_clusterVersion = true, \
        .clusterVersion = (_con)->clusterVersion, \
        .has_identity = true, \
        .identity = (_con)->identity, \
        .has_connectionID = true, \
        .connectionID = (_con)->connectionID, \
        .has_sequence = true, \
        .sequence = (_con)->sequence, \
    }; \
}
#define KINETIC_MESSAGE_INIT(msg) { \
    KineticProto__init(&(msg)->proto); \
    KineticProto_command__init(&(msg)->command); \
    KineticProto_header__init(&(msg)->header); \
    KineticProto_status__init(&(msg)->status); \
    KineticProto_body__init(&(msg)->body); \
    KineticProto_key_value__init(&(msg)->keyValue); \
    memset((msg)->hmacData, 0, SHA_DIGEST_LENGTH); \
    (msg)->proto.hmac.data = (msg)->hmacData; \
    (msg)->proto.hmac.len = KINETIC_HMAC_MAX_LEN; \
    (msg)->proto.has_hmac = true; \
    (msg)->command.header = &(msg)->header; \
    (msg)->proto.command = &(msg)->command; \
}
    ByteArray hmacKey;
} KineticSession;


// Kinetic Status Codes
@@ -227,95 +152,11 @@ typedef struct _KineticKeyValue
    ByteArray dbVersion;
    ByteArray tag;
    bool force;
    KineticProto_Algorithm algorithm;
    KineticAlgorithm algorithm;
    bool metadataOnly;
    KineticProto_Synchronization synchronization;
    KineticSynchronization synchronization;
    ByteArray value;
} KineticKeyValue;


// Kinetic PDU Header
#define PDU_HEADER_LEN              (1 + (2 * sizeof(int32_t)))
#define PDU_PROTO_MAX_LEN           (1024 * 1024)
#define PDU_PROTO_MAX_UNPACKED_LEN  (PDU_PROTO_MAX_LEN * 2)
#define PDU_VALUE_MAX_LEN           (1024 * 1024)
#define PDU_MAX_LEN                 (PDU_HEADER_LEN + \
                                    PDU_PROTO_MAX_LEN + PDU_VALUE_MAX_LEN)
typedef struct __attribute__ ((__packed__)) _KineticPDUHeader
{
    uint8_t     versionPrefix;
    uint32_t    protobufLength;
    uint32_t    valueLength;
} KineticPDUHeader;
#define KINETIC_PDU_HEADER_INIT \
    (KineticPDUHeader) {.versionPrefix = 'F'}


// Kinetic PDU
typedef struct _KineticPDU
{
    // Binary PDU header
    KineticPDUHeader header;    // Header struct in native byte order
    KineticPDUHeader headerNBO; // Header struct in network-byte-order

    // Message associated with this PDU instance
    union {
        KineticProto protoBase;

        // Pre-structured message w/command
        KineticMessage message;

        // Pad protobuf to max size for extraction of arbitrary packed proto
        uint8_t buffer[PDU_PROTO_MAX_UNPACKED_LEN];
    } protoData;        // Proto will always be first
    KineticProto* proto;
    // bool rawProtoEnabled;
    uint8_t protobufRaw[PDU_PROTO_MAX_LEN];

    // Object meta-data to be used/populated if provided and pertinent to the opearation
    KineticKeyValue* metadata;

    // Value data associated with PDU (if any)
    uint8_t valueBuffer[PDU_VALUE_MAX_LEN];
    ByteArray value;

    // Embedded HMAC instance
    KineticHMAC hmac;

    // Exchange associated with this PDU instance (info gets embedded in protobuf message)
    KineticConnection* connection;
} KineticPDU;

#define KINETIC_PDU_INIT(_pdu, _con) { \
    assert((void *)(_pdu) != NULL); \
    assert((void *)(_con) != NULL); \
    (_pdu)->connection = (_con); \
    (_pdu)->header = KINETIC_PDU_HEADER_INIT; \
    (_pdu)->headerNBO = KINETIC_PDU_HEADER_INIT; \
    (_pdu)->value = BYTE_ARRAY_NONE; \
    (_pdu)->proto = &(_pdu)->protoData.message.proto; \
    KINETIC_MESSAGE_HEADER_INIT(&((_pdu)->protoData.message.header), (_con)); \
}
#define KINETIC_PDU_INIT_WITH_MESSAGE(_pdu, _con) { \
    KINETIC_PDU_INIT((_pdu), (_con)) \
    KINETIC_MESSAGE_INIT(&((_pdu)->protoData.message)); \
    (_pdu)->proto->command->header = &(_pdu)->protoData.message.header; \
    KINETIC_MESSAGE_HEADER_INIT(&(_pdu)->protoData.message.header, (_con)); \
}

// Kinetic Operation
typedef struct _KineticOperation
{
    KineticConnection* connection;
    KineticPDU* request;
    KineticPDU* response;
} KineticOperation;
#define KINETIC_OPERATION_INIT(_op, _con, _req, _resp) \
*(_op) = (KineticOperation) { \
    .connection = (_con), \
    .request = (_req), \
    .response = (_resp), \
}


#endif // _KINETIC_TYPES_H
+29 −42
Original line number Diff line number Diff line
@@ -28,43 +28,50 @@
#include "kinetic_logger.h"
#include <stdio.h>

KineticStatus KineticClient_ExecuteOperation(KineticOperation* operation);
static KineticStatus KineticClient_ExecuteOperation(KineticOperation* operation)
{
    KineticStatus status = KINETIC_STATUS_INVALID;

void KineticClient_Init(const char* logFile)
    // Send the request
    if (KineticPDU_Send(operation->request))
    {
        // Associate response with same exchange as request
        operation->response->connection = operation->request->connection;

        // Receive the response
        if (KineticPDU_Receive(operation->response))
        {
    KineticLogger_Init(logFile);
            status = KineticOperation_GetStatus(operation);
        }
    }

bool KineticClient_Connect(KineticConnection* connection,
    const char* host,
    int port,
    bool nonBlocking,
    int64_t clusterVersion,
    int64_t identity,
    ByteArray hmacKey)
    return status;
}

int KineticClient_Connect(KineticSession* session)
{
    if (connection == NULL)
    if (session == NULL)
    {
        LOG("Specified KineticConnection is NULL!");
        return false;
        LOG("Specified KineticSession is NULL!");
        return -1;
    }

    if (host == NULL)
    if (strlen(session.host) == 0)
    {
        LOG("Specified host is NULL!");
        return false;
        LOG("Session host is empty!");
        return -1;
    }

    if (hmacKey.len < 1)
    if (session.hmacKey.len < 1)
    {
        LOG("Specified HMAC key is empty!");
        return false;
        return -1;
    }

    if (hmacKey.data == NULL)
    if (session.hmacKey.data == NULL)
    {
        LOG("Specified HMAC key is NULL!");
        return false;
        return -1;
    }

    if (!KineticConnection_Connect(connection, host, port, nonBlocking,
@@ -75,7 +82,7 @@ bool KineticClient_Connect(KineticConnection* connection,
        char message[64];
        sprintf(message, "Failed creating connection to %s:%d", host, port);
        LOG(message);
        return false;
        return -1;
    }

    connection->connected = true;
@@ -216,23 +223,3 @@ KineticStatus KineticClient_Delete(KineticOperation* operation,
    return status;
}
KineticStatus KineticClient_ExecuteOperation(KineticOperation* operation)
{
    KineticStatus status = KINETIC_STATUS_INVALID;

    // Send the request
    if (KineticPDU_Send(operation->request))
    {
        // Associate response with same exchange as request
        operation->response->connection = operation->request->connection;

        // Receive the response
        if (KineticPDU_Receive(operation->response))
        {
            status = KineticOperation_GetStatus(operation);
        }
    }

    return status;
}
+37 −3
Original line number Diff line number Diff line
@@ -22,9 +22,43 @@
#include "kinetic_socket.h"
#include <string.h>

bool KineticConnection_Connect(KineticConnection* const connection,
    const char* host, int port, bool nonBlocking,
    int64_t clusterVersion, int64_t identity, const ByteArray key)

static KineticConnection ConnectionInstances[KINETIC_SESSIONS_MAX];
static KineticConnection* Connections[KINETIC_SESSIONS_MAX];


KineticConnection* KineticConnection_NewConnection(KineticSession* session)
{
    assert(session);
    session->handle = KINETIC_SESSION_INVALID;
    for (int handle = 1; handle <= KINETIC_SESSIONS_MAX; handle++)
    {
        if (Connections[handle-1] == NULL)
        {
            Connections[handle-1] = &ConnectionInstances[handle-1];
            session->handle = handle;
            *Connections[handle-1] = (KineticConnection){.session = session};
            return Connections[handle-1];
        }
    }
    return NULL;
}

void KineticConnection_FreeConnection(KineticSession* session)
{
    assert(session);
    assert(session->handle > KINETIC_SESSION_INVALID);
    assert(session->handle <= KINETIC_SESSIONS_MAX);
    if (Connections[session->handle-1] != NULL)
    {
        *Connections[handle-1] = {.session = KINETIC_SESSION_INVALID};
        Connections[handle-1] = NULL;
    }
    session->handle = KINETIC_SESSION_INVALID;
}


bool KineticConnection_Connect(KineticConnection* const connection)
{
    connection->connected = false;
    connection->nonBlocking = nonBlocking;
+3 −9
Original line number Diff line number Diff line
@@ -23,16 +23,10 @@

#include "kinetic_types_internal.h"

bool KineticConnection_Connect(KineticConnection* connection,
    const char* host,
    int port,
    bool nonBlocking,
    int64_t clusterVersion,
    int64_t identity,
    const ByteArray key);

KineticConnection* KineticConnection_NewConnection(KineticSession* session);
void KineticConnection_FreeConnection(KineticSession* session);
bool KineticConnection_Connect(KineticConnection* const connection);
void KineticConnection_Disconnect(KineticConnection* const connection);

void KineticConnection_IncrementSequence(KineticConnection* const connection);

#endif // _KINETIC_CONNECTION_H
Loading