Commit c36b4729 authored by Greg Williams's avatar Greg Williams
Browse files

Completed integration of dynamically allocated/deallocated PDUs per operation,...

Completed integration of dynamically allocated/deallocated PDUs per operation, and also preps for overlapped IO queued allocation, where the PDUs should end up being deallocated after the async callback completes.
Updated license template and instances to fit 80 char width.
parent 1696c261
Loading
Loading
Loading
Loading
+1 −2
Original line number Diff line number Diff line
@@ -17,4 +17,3 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
*/
+8 −8
Original line number Diff line number Diff line
@@ -40,7 +40,7 @@
 * @return          Returns the resulting KineticStatus
 */
KineticStatus KineticClient_Connect(const KineticSession* config,
    KineticSessionHandle* const handle);
    KineticSessionHandle* handle);

/**
 * @brief Closes the connection to a host.
@@ -58,7 +58,7 @@ KineticStatus KineticClient_Disconnect(KineticSessionHandle* const handle);
 *
 * @return              Returns the resulting KineticStatus
 */
KineticStatus KineticClient_NoOp(KineticSessionHandle session);
KineticStatus KineticClient_NoOp(KineticSessionHandle handle);

/**
 * @brief Executes a PUT command to store/update an entry on the Kinetic Device.
@@ -69,8 +69,8 @@ KineticStatus KineticClient_NoOp(KineticSessionHandle session);
 *
 * @return              Returns the resulting KineticStatus
 */
KineticStatus KineticClient_Put(KineticSessionHandle session,
    KineticKeyValue* const metadata);
KineticStatus KineticClient_Put(KineticSessionHandle handle,
    const KineticKeyValue* const metadata);

/**
 * @brief Executes a GET command to retrieve and entry from the Kinetic Device.
@@ -81,7 +81,7 @@ KineticStatus KineticClient_Put(KineticSessionHandle session,
 *
 * @return              Returns the resulting KineticStatus
 */
KineticStatus KineticClient_Get(KineticSessionHandle session,
KineticStatus KineticClient_Get(KineticSessionHandle handle,
    KineticKeyValue* const metadata);

/**
@@ -93,7 +93,7 @@ KineticStatus KineticClient_Get(KineticSessionHandle session,
 *
 * @return              Returns the resulting KineticStatus
 */
KineticStatus KineticClient_Delete(KineticSessionHandle session,
    KineticKeyValue* const metadata);
KineticStatus KineticClient_Delete(KineticSessionHandle handle,
    const KineticKeyValue* const metadata);

#endif // _KINETIC_CLIENT_H
+6 −4
Original line number Diff line number Diff line
@@ -72,10 +72,10 @@
  :plugins:
    - :ignore
    # - :ignore_args
    - :array
    - :cexception
    - :callback
    - :return_thru_ptr
    # - :array
    # - :cexception
    # - :callback
    # - :return_thru_ptr
  :unity_helper_path: test/support/unity_helper.h
  :includes_h_post_orig_header:
    - "unity.h"
@@ -87,6 +87,8 @@
    int8_t:     INT8
    bool_t:     BOOL
    size_t:     INT
    KineticSessionHandle: INT
    KineticOperationHandle: INT
  :callback_include_count: true

:tools:
+8 −1
Original line number Diff line number Diff line
@@ -113,6 +113,13 @@ void KineticAllocator_FreePDU(KineticPDU** pdu)
            }
        }

        if ((cur->pdu.proto != NULL) && cur->pdu.protobufDynamicallyExtracted)
        {
            KineticProto__free_unpacked(cur->pdu.proto, NULL);
            cur->pdu.proto = NULL;
            cur->pdu.protobufDynamicallyExtracted = false;
        };

        LOGF("  Freeing item @ 0x%0llX, pdu @ 0x%0llX",
            (long long)cur, (long long)&cur->pdu);
        free(cur);
+107 −75
Original line number Diff line number Diff line
@@ -28,6 +28,32 @@
#include "kinetic_logger.h"
#include <stdlib.h>

static KineticStatus KineticClient_CreateOperation(
    KineticOperation* const operation,
    KineticSessionHandle handle)
{
    if (handle == KINETIC_HANDLE_INVALID)
    {
        LOG("Specified session has invalid handle value");
        return KINETIC_STATUS_SESSION_EMPTY;
    }

    KineticConnection* connection = KineticConnection_FromHandle(handle);
    if (connection == NULL)
    {
        LOG("Specified session is not associated with a connection");
        return KINETIC_STATUS_SESSION_INVALID;
    }

    *operation = KineticOperation_Create(connection);
    if (operation->request == NULL || operation->response == NULL)
    {
        return KINETIC_STATUS_NO_PDUS_AVAVILABLE;
    }

    return KINETIC_STATUS_SUCCESS;
}

static KineticStatus KineticClient_ExecuteOperation(KineticOperation* operation)
{
    KineticStatus status = KINETIC_STATUS_INVALID;
@@ -45,12 +71,21 @@ static KineticStatus KineticClient_ExecuteOperation(KineticOperation* operation)
        }
    }

    KineticOperation_Free(operation);

    return status;
}

KineticStatus KineticClient_Connect(const KineticSession* config,
    KineticSessionHandle* const handle)
    KineticSessionHandle* handle)
{
    if (handle == NULL)
    {
        LOG("Session handle is NULL!");
        return KINETIC_STATUS_SESSION_EMPTY;
    }
    *handle = KINETIC_HANDLE_INVALID;

    if (config == NULL)
    {
        LOG("KineticSession is NULL!");
@@ -77,13 +112,19 @@ KineticStatus KineticClient_Connect(const KineticSession* config,
    }

    KineticConnection* connection = KineticConnection_FromHandle(*handle);
    if (!KineticConnection_Connect(connection))
    if (connection == NULL)
    {
        LOG("Failed getting valid connection from handle!");
        return KINETIC_STATUS_CONNECTION_ERROR;
    }

    KineticStatus status = KineticConnection_Connect(connection);
    if (status != KINETIC_STATUS_SUCCESS)
    {
        LOGF("Failed creating connection to %s:%d",
            config->host, config->port);
        LOGF("Failed creating connection to %s:%d", config->host, config->port);
        KineticConnection_FreeConnection(handle);
        *handle = KINETIC_HANDLE_INVALID;
        return KINETIC_STATUS_CONNECTION_ERROR;
        return status;
    }

    return KINETIC_STATUS_SUCCESS;
@@ -100,32 +141,28 @@ KineticStatus KineticClient_Disconnect(KineticSessionHandle* const handle)
    KineticConnection* connection = KineticConnection_FromHandle(*handle);
    if (connection == NULL)
    {
        return KINETIC_STATUS_SESSION_INVALID;
    }

    KineticConnection_Disconnect(connection);
    KineticConnection_FreeConnection(handle);
    
    return KINETIC_STATUS_SUCCESS;
        LOG("Failed getting valid connection from handle!");
        return KINETIC_STATUS_CONNECTION_ERROR;
    }

KineticStatus KineticClient_NoOp(KineticSessionHandle session)
{
    if (session == KINETIC_HANDLE_INVALID)
    KineticStatus status = KineticConnection_Disconnect(connection);
    if (status != KINETIC_STATUS_SUCCESS)
    {
        LOG("Specified session has invalid handle value");
        return KINETIC_STATUS_SESSION_EMPTY;
        LOG("Disconnection failed!");
    }

    KineticConnection* connection = KineticConnection_FromHandle(session);
    if (connection == NULL)
    {
        LOG("Specified session is not associated with a connection");
        return KINETIC_STATUS_SESSION_INVALID;
    KineticConnection_FreeConnection(handle);
    *handle = KINETIC_HANDLE_INVALID;
    
    return status;
}

KineticStatus KineticClient_NoOp(KineticSessionHandle handle)
{
    KineticStatus status;
    KineticOperation operation;
    KineticStatus status = KineticOperation_Create(&operation, connection);

    status = KineticClient_CreateOperation(&operation, handle);
    if (status != KINETIC_STATUS_SUCCESS)
    {
        return status;
@@ -138,83 +175,78 @@ KineticStatus KineticClient_NoOp(KineticSessionHandle session)
    return KineticClient_ExecuteOperation(&operation);
}

#if 0
KineticStatus KineticClient_Put(KineticSessionHandle session,
KineticStatus KineticClient_Put(KineticSessionHandle handle,
    const KineticKeyValue* const metadata)
{
    assert(operation->connection != NULL);
    assert(operation->request != NULL);
    assert(operation->response != NULL);
    assert(metadata != NULL);
    assert(metadata->value.data != NULL);
    assert(metadata->value.len <= PDU_VALUE_MAX_LEN);
    KineticStatus status;
    KineticOperation operation;

    status = KineticClient_CreateOperation(&operation, handle);
    if (status != KINETIC_STATUS_SUCCESS)
    {
        return status;
    }

    // Initialize request
    KineticOperation_BuildPut(operation, metadata);
    KineticOperation_BuildPut(&operation, metadata);

    // Execute the operation
    return KineticClient_ExecuteOperation(operation);
    return KineticClient_ExecuteOperation(&operation);
}

KineticStatus KineticClient_Get(KineticSessionHandle session,
    const KineticKeyValue* metadata)
KineticStatus KineticClient_Get(KineticSessionHandle handle,
    KineticKeyValue* const metadata)
{
    assert(operation->connection != NULL);
    assert(operation->request != NULL);
    assert(operation->response != NULL);
    assert(metadata != NULL);
    assert(metadata->key.data != NULL);
    assert(metadata->key.len <= KINETIC_MAX_KEY_LEN);
    KineticStatus status;
    KineticOperation operation;

    if (!metadata->metadataOnly)
    {
        if (metadata->value.data == NULL)
    status = KineticClient_CreateOperation(&operation, handle);
    if (status != KINETIC_STATUS_SUCCESS)
    {
             metadata->value = (ByteArray){
                .data = operation->response->valueBuffer,
                .len = PDU_VALUE_MAX_LEN};
        }
        return status;
    }

    // if (!metadata->metadataOnly)
    // {
    //     if (metadata->value.data == NULL)
    //     {
    //          metadata->value = (ByteArray){
    //             .data = operation->response->valueBuffer,
    //             .len = PDU_VALUE_MAX_LEN};
    //     }
    // }

    // Initialize request
    KineticOperation_BuildGet(operation, metadata);
    KineticOperation_BuildGet(&operation, metadata);

    // Execute the operation
    KineticStatus status = KineticClient_ExecuteOperation(operation);
    status = KineticClient_ExecuteOperation(&operation);

    // Update the metadata with the received value length upon success
    if (status == KINETIC_STATUS_SUCCESS)
    {
        metadata->value.len = operation->response->value.len;
    }
    else
    {
        metadata->value.len = 0;
    }
    // metadata->value.len = 0;
    // if (status == KINETIC_STATUS_SUCCESS)
    // {
    //     metadata->value.len = operation->response->value.len;
    // }

    return status;
}

KineticStatus KineticClient_Delete(KineticSessionHandle session,
KineticStatus KineticClient_Delete(KineticSessionHandle handle,
    const KineticKeyValue* const metadata)
{
    assert(operation->connection != NULL);
    assert(operation->request != NULL);
    assert(operation->response != NULL);
    assert(metadata != NULL);
    assert(metadata->key.data != NULL);
    assert(metadata->key.len > 0);
    KineticStatus status;
    KineticOperation operation;

    status = KineticClient_CreateOperation(&operation, handle);
    if (status != KINETIC_STATUS_SUCCESS)
    {
        return status;
    }

    // Initialize request
    KineticOperation_BuildDelete(operation, metadata);
    KineticOperation_BuildDelete(&operation, metadata);

    // Execute the operation
    KineticStatus status = KineticClient_ExecuteOperation(operation);

    // Zero out value length for all DELETE operations
    operation->response->value.len = 0;
    metadata->value.len = 0;

    return status;
    return KineticClient_ExecuteOperation(&operation);
}
#endif
Loading