Commit 1696c261 authored by Greg Williams's avatar Greg Williams
Browse files

Added kinetic_allocator to manage dynamically allocated PDUs, and have nearly completed updates.

parent 7224c8a1
Loading
Loading
Loading
Loading
+37 −32
Original line number Diff line number Diff line
@@ -24,10 +24,10 @@
#include "kinetic_types.h"

/**
 * Initializes the Kinetic API, configures logging destination, establishes a
 * @brief Initializes the Kinetic API, configures logging destination, establishes a
 * connection to the specified Kinetic Device, and establishes a session.
 *
 * @session Session instance to configure with connection info
 * @config          Session configuration
 *  .host               Host name or IP address to connect to
 *  .port               Port to establish socket connection on
 *  .nonBlocking        Set to true for non-blocking or false for blocking I/O
@@ -35,60 +35,65 @@
 *  .identity           Identity to use for the session
 *  .hmacKey            Key to use for HMAC calculations (NULL-terminated string)
 *  .logFile            Path to log file. Defaults to STDOUT if unspecified
 * @handle          Pointer to KineticSessionHandle (populated upon successful connection)
 *
 * @return          Returns the resulting KineticStatus
 */
KineticStatus KineticClient_Connect(const KineticSession* session);
KineticStatus KineticClient_Connect(const KineticSession* config,
    KineticSessionHandle* const handle);

/**
 * @brief Closes the connection to a host.
 *
 * @param session   KineticSession instance
 * @param handle    KineticSessionHandle for a connected session.
 *
 * @return          Returns the resulting KineticStatus
 */
KineticStatus KineticClient_Disconnect(const KineticSession* session);
KineticStatus KineticClient_Disconnect(KineticSessionHandle* const handle);

/**
 * @brief Executes a NOOP command to test whether the Kinetic Device is operational
 * @brief Executes a NOOP command to test whether the Kinetic Device is operational.
 *
 * @param operation     KineticOperation to use for the operation.
 *                      If operation is already allocated, it will be reused
 *                      If operation is empty, one will be allocated, if available.
 * @param handle        KineticSessionHandle for a connected session.
 *
 * @return              Returns the resulting KineticStatus
 */
KineticStatus KineticClient_NoOp(const KineticSession* session,
    const KineticOperation* operation);
KineticStatus KineticClient_NoOp(KineticSessionHandle session);

/**
 * @brief Executes a PUT command to store/update an entry on the Kinetic Device
 * @brief Executes a PUT command to store/update an entry on the Kinetic Device.
 *
 * @param operation     KineticOperation to use for the operation.
 *                      If operation is already allocated, it will be reused
 *                      If operation is empty, one will be allocated, if available.
 * @param handle        KineticSessionHandle for a connected session.
 * @param metadata      Key/value metadata for object to store. 'value' must
 *                      specify the data to be stored.
 *
 * @return              Returns the resulting KineticStatus
 */
KineticStatus KineticClient_Put(KineticSession* session,
    const KineticOperation* operation,
    const KineticKeyValue* metadata);
KineticStatus KineticClient_Put(KineticSessionHandle session,
    KineticKeyValue* const metadata);

/**
 * @brief Executes a GET command to retrieve and entry from the Kinetic Device.
 *
 * @param handle        KineticSessionHandle for a connected session.
 * @param metadata      Key/value metadata for object to retrieve. 'value' will
 *                      be populated unless 'metadataOnly' is set to 'true'
 *
 * @return              Returns the resulting KineticStatus
 */
KineticStatus KineticClient_Get(KineticSessionHandle session,
    KineticKeyValue* const metadata);

/**
 * @brief Executes a DELETE command to delete an entry from the Kinetic Device
 *
 * @param operation     KineticOperation to use for the operation.
 *                      If operation is already allocated, it will be reused
 *                      If operation is empty, one will be allocated, if available.
 * @param handle        KineticSessionHandle for a connected session.
 * @param metadata      Key/value metadata for object to delete. 'value' is
 *                      not used for this operation.
 *
 * @return              Returns the resulting KineticStatus
 */
KineticStatus KineticClient_Delete(const KineticSession* session,
    const KineticOperation* operation,
    const KineticKeyValue* metadata);
KineticStatus KineticClient_Delete(KineticSessionHandle session,
    KineticKeyValue* const metadata);

#endif // _KINETIC_CLIENT_H
+52 −33
Original line number Diff line number Diff line
@@ -32,15 +32,18 @@
#include <stdio.h>
#include <assert.h>
#include <limits.h>

// #include <netinet/in.h>
// #include <ifaddrs.h>


#define KINETIC_PORT            8123
#define KINETIC_TLS_PORT        8443
#define KINETIC_HANDLE_INVALID  (0)
#define KINETIC_PORT            (8123)
#define KINETIC_TLS_PORT        (8443)
#define KINETIC_HMAC_SHA1_LEN   (SHA_DIGEST_LENGTH)
#define KINETIC_HMAC_MAX_LEN    (KINETIC_HMAC_SHA1_LEN)
#define KINETIC_MAX_KEY_LEN     128
#define KINETIC_MAX_KEY_LEN     (128)
#define PDU_VALUE_MAX_LEN       (1024 * 1024)

// Define max host name length
// Some Linux environments require this, although not all, but it's benign.
@@ -57,8 +60,6 @@
#define LOG_FILE_NAME_MAX (HOST_NAME_MAX)
#endif

#include <time.h>

/**
 * @brief Structure for handling generic arrays of bytes
 *
@@ -88,6 +89,13 @@ typedef struct _ByteArray {
#define BYTE_ARRAY_FILL_WITH_DUMMY_DATA(_array) \
    {size_t i=0; for(;i<(_array).len;++i){(_array).data[i] = (uint8_t)(i & 0xFFu);} }


/**
 * @brief Structure for an embedded ByteArray as a buffer
 *
 * The `bytesUsed` field is initialized to zero, and is to incremented as each
 * byte is consumed, but shall not exceed the `array` length
 */
typedef struct
{
    ByteArray   array;
@@ -100,17 +108,22 @@ typedef struct
    .bytesUsed = 0, \
}


/**
 * @brief Enumeration of encryption/checksum key algorithms
 */
typedef enum _KineticAlgorithm {
    KINETIC_ALGORITHM_INVALID = -1,
    KINETIC_ALGORITHM_SHA1 = 1,
    KINETIC_ALGORITHM_SHA2 = 2,
    KINETIC_ALGORITHM_SHA3 = 3,
    KINETIC_ALGORITHM_CRC32 = 4,
    KINETIC_ALGORITHM_CRC64 = 5
    KINETIC_ALGORITHM_SHA1 = 2,
    KINETIC_ALGORITHM_SHA2,
    KINETIC_ALGORITHM_SHA3,
    KINETIC_ALGORITHM_CRC32,
    KINETIC_ALGORITHM_CRC64
} KineticAlgorithm;


/**
 * @brief Enumeration of synchronization types for an operation.
 */
typedef enum _KineticSynchronization {
  KINETIC_SYNCHRONIZATION_INVALID = -1,
  KINETIC_SYNCHRONIZATION_WRITETHROUGH = 1,
@@ -119,21 +132,25 @@ typedef enum _KineticSynchronization {
} KineticSynchronization;


// Kinetic session
#define SESSION_HANDLE_INVALID (0)
/**
 * @brief Handle for a session instance
 */
typedef int KineticSessionHandle;


/**
 * @brief Structure used to specify the configuration of a session.
 */
typedef struct _KineticSession
{
    // Log file name (uses stdout if empty)
    char    logFile[LOG_FILE_NAME_MAX];

    // Set to true to enable non-blocking/asynchronous I/O
    bool    nonBlocking;
    // Host name/IP address of Kinetic Device
    char    host[HOST_NAME_MAX];

    // Port for Kinetic Device session
    int     port;

    // Host name/IP address of Kinetic Device
    char    host[HOST_NAME_MAX];
    // Set to true to enable non-blocking/asynchronous I/O
    bool    nonBlocking;

    // The version number of this cluster definition. If this is not equal to
    // the value on the Kinetic Device, the request is rejected and will return
@@ -150,9 +167,10 @@ typedef struct _KineticSession
    uint8_t keyData[KINETIC_MAX_KEY_LEN];
    ByteArray hmacKey;

    // Session instance handle (0 = none/invalid session)
    int     handle;
    // Log file name (uses stdout if empty)
    char    logFile[LOG_FILE_NAME_MAX];
} KineticSession;

#define KINETIC_SESSION_INIT(_session, \
    _host, _clusterVersion, _identity, _hmacKey) { \
    *(_session) = (KineticSession) { \
@@ -162,18 +180,12 @@ typedef struct _KineticSession
        .identity = (_identity), \
        .hmacKey = {.data = (_session)->keyData, .len = (_hmacKey).len}, \
    }; \
    strcpy((_session)->host, (_host)); \
    memcpy((_session)->hmacKey.data, (_hmacKey).data, (_hmacKey).len); \
}


// Kinetic Operation
typedef struct _KineticOperation
{
    KineticConnection* session; // Associated KineticSession
    int requestHandle;          // Handle to allocated request
    int responseHandle;         // Handle to allocated response
} KineticOperation;

// Operation handle
typedef int KineticOperationHandle;

// Kinetic Status Codes
typedef enum
@@ -184,7 +196,7 @@ typedef enum
    KINETIC_STATUS_SESSION_INVALID,     // Session configuration was invalid or NULL
    KINETIC_STATUS_HOST_EMPTY,          // Host was empty in request
    KINETIC_STATUS_HMAC_EMPTY,          // HMAC key is empty or NULL
    KINETIC_STATUS_NO_PDUS_AVAVILABLE   // All PDUs for the session have been allocated
    KINETIC_STATUS_NO_PDUS_AVAVILABLE,  // All PDUs for the session have been allocated
    KINETIC_STATUS_DEVICE_BUSY,         // Device busy (retry later)
    KINETIC_STATUS_CONNECTION_ERROR,    // No connection/disconnected
    KINETIC_STATUS_INVALID_REQUEST,     // Something about the request is invalid
@@ -192,8 +204,8 @@ typedef enum
    KINETIC_STATUS_OPERATION_FAILED,    // Device reported an operation error
    KINETIC_STATUS_VERSION_FAILURE,     // Basically a VERSION_MISMATCH error for a PUT
    KINETIC_STATUS_DATA_ERROR,          // Device reported data error, no space or HMAC failure
    KINETIC_STATUS_COUNT                // Number of status codes in KineticStatusDescriptor
} KineticStatus;
extern const int KineticStatusDescriptorCount;
extern const char* KineticStatusDescriptor[];


@@ -211,5 +223,12 @@ typedef struct _KineticKeyValue
    ByteArray value;
} KineticKeyValue;

// Expose normally private data for test builds to allow inspection
#ifdef TEST
#define STATIC
#else
#define STATIC static
#endif


#endif // _KINETIC_TYPES_H
+169 −0
Original line number Diff line number Diff line
/*
* kinetic-c
* Copyright (C) 2014 Seagate Technology.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
*
*/

#include "kinetic_allocator.h"
#include "kinetic_logger.h"
#include <stdlib.h>

STATIC KineticPDUListItem* PDUList = NULL;
STATIC KineticPDUListItem* PDUListLast = NULL;

KineticPDU* KineticAllocator_NewPDU(void)
{
    KineticPDUListItem* newItem = (KineticPDUListItem*)malloc(sizeof(KineticPDUListItem));
    newItem->next = NULL;
    newItem->previous = NULL;

    if (PDUList == NULL)
    {
        PDUList = newItem;
    }
    else
    {
        newItem->previous = PDUListLast;
        PDUListLast->next = newItem;
    }
    PDUListLast = newItem;

    LOGF("Allocated new PDU list item @ 0x%0llX w/PDU @ 0x%0llX",
        (long long)newItem, (long long)&newItem->pdu);

    return &newItem->pdu;
}

void KineticAllocator_FreePDU(KineticPDU** pdu)
{
    KineticPDUListItem* cur = PDUList;
    while (&cur->pdu != *pdu)
    {
        if (cur->next == NULL)
        {
            LOG("  Reached end of list before finding PDU to free!");
            return;
        }
        else
        {
            LOG("  next..");
            cur = cur->next;
        }
    }
    LOG("  Done searching for PDU list item");

    if ((cur != NULL) && (&cur->pdu == *pdu))
    {
        LOG("  PDU found! freeing it.");
        // Handle PDU list emptied
        if (cur->previous == NULL)
        {
            LOG("  At start of list.");
            if (cur->next == NULL)
            {
                LOG("  Making it empty, since all deallocated!");
                PDUList = NULL;
                PDUListLast = NULL;
            }
            else
            {
                LOG("  Moving current item to head, since head deallocated!");
                PDUList = cur->next;
                PDUList->previous = NULL;
            }
        }
        else
        {
            // Relink from previous to next, if avaliable
            LOG("  Not at list start, so relinking list to free PDU.");
            if (cur->previous->next != NULL)
            {
                LOG("  Relinking previous to next");
                if (cur->next != NULL)
                {
                    LOG("    next being reset!");
                    cur->previous->next = cur->next;
                }
                else
                {
                    PDUListLast = cur->previous;
                    PDUListLast->next = NULL;
                    LOGF("    next is NULL. End of list now @ 0x%0llX",
                        (long long)PDUListLast);
                }
            }
            else
            {
                LOG("  This shouldn't happen!");
                PDUListLast = cur->previous;
            }
        }

        LOGF("  Freeing item @ 0x%0llX, pdu @ 0x%0llX",
            (long long)cur, (long long)&cur->pdu);
        free(cur);
        cur = NULL;
    }

    *pdu = NULL;
}

void KineticAllocator_FreeAllPDUs(void)
{
    LOG_LOCATION;
    if (PDUList != NULL)
    {
        LOG("Freeing list of PDUs...");
        KineticPDUListItem* current = PDUList;

        while (current->next != NULL)
        {
            LOG("Advancing to next list item...");
            current = current->next;
        }

        while (current != NULL)
        {
            LOG("  Current item not freed!");
            LOGF("  DEALLOCATING item: 0x%0llX, pdu: 0x%llX, prev: 0x%0llX",
                (long long)current, (long long)&current->pdu, (long long)current->previous);
            KineticPDUListItem* curItem = current;
            KineticPDUListItem* prevItem = current->previous;
            if (curItem != NULL)
            {
                LOG("  Freeing list item");
                free(curItem); 
            }
            current = prevItem;
            LOGF("  on to prev=0x%llX", (long long)current);
        }
    }
    else
    {
        LOG("  Nothing to free!");
    }
    PDUList = NULL;
    PDUListLast = NULL;
}

bool KineticAllocator_ValidateAllMemoryFreed(void)
{
    bool empty = (PDUList == NULL);
    LOG_LOCATION; LOGF("  PDUList: 0x%0llX, empty=%s",
        (long long)PDUList, empty ? "true" : "false");
    return empty;
}
+31 −0
Original line number Diff line number Diff line
/*
* kinetic-c
* Copyright (C) 2014 Seagate Technology.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
*
*/

#ifndef _KINETIC_ALLOCATOR_H
#define _KINETIC_ALLOCATOR_H

#include "kinetic_types_internal.h"

KineticPDU* KineticAllocator_NewPDU(void);
void KineticAllocator_FreePDU(KineticPDU** pdu);
void KineticAllocator_FreeAllPDUs(void);
bool KineticAllocator_ValidateAllMemoryFreed(void);

#endif // _KINETIC_ALLOCATOR
+46 −72
Original line number Diff line number Diff line
@@ -26,43 +26,7 @@
#include "kinetic_message.h"
#include "kinetic_pdu.h"
#include "kinetic_logger.h"

static KineticOperation KineticClient_CreateOperation(
    KineticConnection* connection,
    KineticPDU* request,
    KineticPDU* response)
{
    KineticOperation op;

    if (connection == NULL)
    {
        LOG("Specified KineticConnection is NULL!");
        assert(connection != NULL);
    }

    if (request == NULL)
    {
        LOG("Specified KineticPDU request is NULL!");
        assert(request != NULL);
    }

    if (response == NULL)
    {
        LOG("Specified KineticPDU response is NULL!");
        assert(response != NULL);
    }

    KineticPDU_Init(request, connection);
    KINETIC_PDU_INIT_WITH_MESSAGE(request, connection);
    KineticPDU_Init(response, connection);

    op.connection = connection;
    op.request = request;
    op.request->proto = &op.request->protoData.message.proto;
    op.response = response;

    return op;
}
#include <stdlib.h>

static KineticStatus KineticClient_ExecuteOperation(KineticOperation* operation)
{
@@ -84,89 +48,98 @@ static KineticStatus KineticClient_ExecuteOperation(KineticOperation* operation)
    return status;
}

KineticStatus KineticClient_Connect(KineticSession* session)
KineticStatus KineticClient_Connect(const KineticSession* config,
    KineticSessionHandle* const handle)
{
    if (session == NULL)
    if (config == NULL)
    {
        LOG("Specified KineticSession is NULL!");
        LOG("KineticSession is NULL!");
        return KINETIC_STATUS_SESSION_EMPTY;
    }

    if (strlen(session->host) == 0)
    if (strlen(config->host) == 0)
    {
        LOG("Session host is empty!");
        LOG("Host is empty!");
        return KINETIC_STATUS_HOST_EMPTY;
    }

    if (session->hmacKey.len < 1 || session->hmacKey.data == NULL)
    if (config->hmacKey.len < 1 || config->hmacKey.data == NULL)
    {
        LOG("HMAC key is NULL or empty!");
        return KINETIC_STATUS_HMAC_EMPTY;
    }

    KineticConnection* connection = KineticConnection_NewConnection(session);
    if (connection == NULL)
    *handle = KineticConnection_NewConnection(config);
    if (handle == KINETIC_HANDLE_INVALID)
    {
        LOG("Failed connecting to device (connection is NULL)!");
        LOG("Failed connecting to device!");
        return KINETIC_STATUS_SESSION_INVALID;
    }

    KineticConnection* connection = KineticConnection_FromHandle(*handle);
    if (!KineticConnection_Connect(connection))
    {
        LOGF("Failed creating connection to %s:%d",
            session->host, session->port);
            config->host, config->port);
        KineticConnection_FreeConnection(handle);
        *handle = KINETIC_HANDLE_INVALID;
        return KINETIC_STATUS_CONNECTION_ERROR;
    }

    return KINETIC_STATUS_SUCCESS;
}

KineticStatus KineticClient_Disconnect(KineticSession* session)
KineticStatus KineticClient_Disconnect(KineticSessionHandle* const handle)
{
    if (session == NULL)
    if (*handle == KINETIC_HANDLE_INVALID)
    {
        LOG("Specified KineticSession is NULL!");
        return KINETIC_STATUS_SESSION_EMPTY;
        LOG("Invalid KineticSessionHandle specified!");
        return KINETIC_STATUS_SESSION_INVALID;
    }
    KineticConnection* connection = KineticConnection_GetFromSession(session);

    KineticConnection* connection = KineticConnection_FromHandle(*handle);
    if (connection == NULL)
    {
        return KINETIC_STATUS_SESSION_INVALID;
    }

    KineticConnection_Disconnect(connection);
    KineticConnection_FreeConnection(session);
    KineticConnection_FreeConnection(handle);
    
    return KINETIC_STATUS_SUCCESS;
}

KineticOperation KineticClient_NewOperation(KineticSession* session)
KineticStatus KineticClient_NoOp(KineticSessionHandle session)
{
    KineticOperation operation;
    if (session == NULL)
    if (session == KINETIC_HANDLE_INVALID)
    {
        operation = (KineticOperation) {.session = NULL};
        LOG("Specified session has invalid handle value");
        return KINETIC_STATUS_SESSION_EMPTY;
    }
    else

    KineticConnection* connection = KineticConnection_FromHandle(session);
    if (connection == NULL)
    {
        KINETIC_OPERATION_INIT(&operation, session);
        KineticOperation_Create(&operation, session);
    }
    return operation;
        LOG("Specified session is not associated with a connection");
        return KINETIC_STATUS_SESSION_INVALID;
    }

KineticStatus KineticClient_NoOp(const KineticOperation* operation)
    KineticOperation operation;
    KineticStatus status = KineticOperation_Create(&operation, connection);
    if (status != KINETIC_STATUS_SUCCESS)
    {
    assert(operation->connection != NULL);
    assert(operation->request != NULL);
    assert(operation->response != NULL);
        return status;
    }

    // Initialize request
    KineticOperation_BuildNoop(operation);
    KineticOperation_BuildNoop(&operation);

    // Execute the operation
    return KineticClient_ExecuteOperation(operation);
    return KineticClient_ExecuteOperation(&operation);
}

KineticStatus KineticClient_Put(const KineticOperation* operation,
#if 0
KineticStatus KineticClient_Put(KineticSessionHandle session,
    const KineticKeyValue* const metadata)
{
    assert(operation->connection != NULL);
@@ -183,7 +156,7 @@ KineticStatus KineticClient_Put(const KineticOperation* operation,
    return KineticClient_ExecuteOperation(operation);
}

KineticStatus KineticClient_Get(const KineticOperation* operation,
KineticStatus KineticClient_Get(KineticSessionHandle session,
    const KineticKeyValue* metadata)
{
    assert(operation->connection != NULL);
@@ -222,7 +195,7 @@ KineticStatus KineticClient_Get(const KineticOperation* operation,
    return status;
}

KineticStatus KineticClient_Delete(KineticOperation* operation,
KineticStatus KineticClient_Delete(KineticSessionHandle session,
    const KineticKeyValue* const metadata)
{
    assert(operation->connection != NULL);
@@ -244,3 +217,4 @@ KineticStatus KineticClient_Delete(KineticOperation* operation,

    return status;
}
#endif
Loading