Commit 8de218fe authored by Greg Williams's avatar Greg Williams
Browse files

Eliminated the rest of statically sized memory from Entries in favor of...

Eliminated the rest of statically sized memory from Entries in favor of dynamic allocation of exact size per PDU header.
Added stubbed out KineticClient_GetKeyRange() to public API.
parent 7b949623
Loading
Loading
Loading
Loading
+26 −1
Original line number Diff line number Diff line
@@ -23,6 +23,13 @@

#include "kinetic_types.h"

/**
 * Initializes the Kinetic API andcsonfigures logging destination.
 *
 * @param logFile Path to log file. Specify NULL to log to STDOUT.
 */
void KineticClient_Init(const char* logFile);

/**
 * @brief Initializes the Kinetic API, configures logging destination, establishes a
 * connection to the specified Kinetic Device, and establishes a session.
@@ -34,7 +41,6 @@
 *  .clusterVersion     Cluster version to use for the session
 *  .identity           Identity to use for the session
 *  .hmacKey            Key to use for HMAC calculations (NULL-terminated string)
 *  .logFile            Path to log file. Defaults to STDOUT if unspecified
 * @handle          Pointer to KineticSessionHandle (populated upon successful connection)
 *
 * @return          Returns the resulting KineticStatus
@@ -96,4 +102,23 @@ KineticStatus KineticClient_Get(KineticSessionHandle handle,
KineticStatus KineticClient_Delete(KineticSessionHandle handle,
                                   KineticEntry* const metadata);

/**
 * @brief Executes a GETKEYRANGE command to retrive a set of keys in the range
 * specified range from the Kinetic Device
 *
 * @param handle        KineticSessionHandle for a connected session.
 * @param range         KineticKeyRange specifying keys to return
 * @param keys          An pointer to an array of ByteBuffers with pre-allocated
 *                      arrays to store the retrieved keys
 * @param max_keys      The number maximum number of keys to request from the
 *                      device. There must be at least this many ByteBuffers in
 *                      the `keys` array for population.
 *
 *
 * @return              Returns 0 upon succes, -1 or the Kinetic status code
 *                      upon failure
 */
KineticStatus KineticClient_GetKeyRange(KineticSessionHandle handle,
                                        KineticKeyRange* range, ByteBuffer* keys[], int max_keys);

#endif // _KINETIC_CLIENT_H
+11 −6
Original line number Diff line number Diff line
@@ -115,15 +115,10 @@ typedef struct _KineticSession {
    // client and the device, used to sign requests.
    uint8_t keyData[KINETIC_MAX_KEY_LEN];
    ByteArray hmacKey;

    // Log file name (uses stdout if empty)
    char    logFile[LOG_FILE_NAME_MAX];
} KineticSession;

#define KINETIC_SESSION_INIT(_session, \
    _host, _clusterVersion, _identity, _hmacKey) { \
#define KINETIC_SESSION_INIT(_session, _host, _clusterVersion, _identity, _hmacKey) { \
    *(_session) = (KineticSession) { \
        .logFile = "", \
        .port = KINETIC_PORT, \
        .clusterVersion = (_clusterVersion), \
        .identity = (_identity), \
@@ -172,4 +167,14 @@ typedef struct _KineticEntry {
    ByteBuffer value;
} KineticEntry;

// Kinetic Key Range request structure
typedef struct _KineticKeyRange {
    ByteBuffer startKey;
    ByteBuffer endKey;
    bool startKeyInclusive;
    bool endKeyInclusive;
    int32_t maxReturned;
    bool reverse;
} KineticKeyRange;

#endif // _KINETIC_TYPES_H
+56 −0
Original line number Diff line number Diff line
@@ -81,6 +81,11 @@ static KineticStatus KineticClient_ExecuteOperation(KineticOperation* operation)
    return status;
}

void KineticClient_Init(const char* logFile)
{
    KineticLogger_Init(logFile);
}

KineticStatus KineticClient_Connect(const KineticSession* config,
                                    KineticSessionHandle* handle)
{
@@ -263,3 +268,54 @@ KineticStatus KineticClient_Delete(KineticSessionHandle handle,

    return status;
}

// command {
//   header {
//     // See above for descriptions of these fields
//     clusterVersion: ...
//     identity: ...
//     connectionID: ...
//     sequence: ...

//     // messageType should be GETKEYRANGE
//     messageType: GETKEYRANGE
//   }
//   body {
//     // The range message must be populated
//     range {
//       // Required bytes, the beginning of the requested range
//       startKey: "..."

//       // Optional bool, defaults to false
//       // True indicates that the start key should be included in the returned
//       // range
//       startKeyInclusive: ...

//       // Required bytes, the end of the requested range
//       endKey: "..."

//       // Optional bool, defaults to false
//       // True indicates that the end key should be included in the returned
//       // range
//       endKeyInclusive: ...

//       // Required int32, must be greater than 0
//       // The maximum number of keys returned, in sorted order
//       maxReturned: ...

//       // Optional bool, defaults to false
//       // If true, the key range will be returned in reverse order, starting at
//       // endKey and moving back to startKey.  For instance
//       // if the search is startKey="j", endKey="k", maxReturned=2,
//       // reverse=true and the keys "k0", "k1", "k2" exist
//       // the system will return "k2" and "k1" in that order.
//       reverse: ....
//     }
//   }
// }
KineticStatus KineticClient_GetKeyRange(KineticSessionHandle handle,
                                        KineticKeyRange* range, ByteBuffer* keys[], int max_keys)
{
    KineticStatus status = KINETIC_STATUS_SUCCESS;
    return status;
}
+27 −10
Original line number Diff line number Diff line
@@ -316,20 +316,28 @@ KineticStatus KineticSocket_ReadProtobuf(int socket, KineticPDU* pdu)
    #ifdef KINETIC_LOG_SOCKET_OPERATIONS
    LOGF("Reading %zd bytes of protobuf", bytesToRead);
    #endif
    ByteBuffer recvBuffer = ByteBuffer_Create(pdu->protobufRaw, bytesToRead);

    uint8_t* packed = (uint8_t*)malloc(bytesToRead);
    if (packed == NULL) {
        LOG("Failed allocating memory for protocol buffer");
        return KINETIC_STATUS_MEMORY_ERROR;
    }

    ByteBuffer recvBuffer = ByteBuffer_Create(packed, bytesToRead);
    KineticStatus status = KineticSocket_Read(socket, &recvBuffer, bytesToRead);

    if (status != KINETIC_STATUS_SUCCESS) {
        LOG("Protobuf read failed!");
        free(packed);
        return status;
    }
    else {
        pdu->proto = KineticProto__unpack(
            NULL, recvBuffer.bytesUsed, recvBuffer.array.data);
    }

    #ifdef KINETIC_LOG_SOCKET_OPERATIONS
    LOG("Read packed protobuf successfully!");
    #endif
    free(packed);

    pdu->proto =
        KineticProto__unpack(NULL, recvBuffer.bytesUsed, recvBuffer.array.data);
    if (pdu->proto == NULL) {
        pdu->protobufDynamicallyExtracted = false;
        LOG("Error unpacking incoming Kinetic protobuf message!");
@@ -384,12 +392,21 @@ KineticStatus KineticSocket_WriteProtobuf(int socket, KineticPDU* pdu)
    #ifdef KINETIC_LOG_SOCKET_OPERATIONS
    LOGF("Writing protobuf (%zd bytes)...", pdu->header.protobufLength);
    #endif
    size_t len = KineticProto__pack(&pdu->protoData.message.proto,
                                    pdu->protobufRaw);

    uint8_t* packed = (uint8_t*)malloc(pdu->header.protobufLength);

    if (packed == NULL) {
        LOG("Failed allocating memory for protocol buffer");
        return KINETIC_STATUS_MEMORY_ERROR;
    }
    size_t len = KineticProto__pack(&pdu->protoData.message.proto, packed);
    assert(len == pdu->header.protobufLength);

    ByteBuffer buffer = ByteBuffer_Create(pdu->protobufRaw, len);
    ByteBuffer buffer = ByteBuffer_Create(packed, len);
    buffer.bytesUsed = len;

    return KineticSocket_Write(socket, &buffer);
    KineticStatus status = KineticSocket_Write(socket, &buffer);

    free(packed);
    return status;
}
+0 −20
Original line number Diff line number Diff line
@@ -76,9 +76,6 @@ typedef struct _KineticConnection {
        .connectionID = time(NULL), \
        .sequence = 0, \
    }; \
    /*(*_con).key = (ByteArray){.data = (*_con).keyData, .len = (_key).len};*/ \
    /*if ((_key).data != NULL && (_key).len > 0) {*/ \
    /*    memcpy((_con)->keyData, (_key).data, (_key).len); }*/ \
}


@@ -158,17 +155,10 @@ struct _KineticPDU {
    // Message associated with this PDU instance
    union {
        KineticProto protoBase;

        // Pre-structured message w/command
        KineticMessage message;

        // Pad protobuf to max size for extraction of arbitrary packed proto
        uint8_t buffer[PDU_PROTO_MAX_UNPACKED_LEN];
    } protoData;        // Proto will always be first
    KineticProto* proto;
    bool protobufDynamicallyExtracted;
    // bool rawProtoEnabled;
    uint8_t protobufRaw[PDU_PROTO_MAX_LEN];

    // Object meta-data to be used/populated if provided and pertinent to the operation
    KineticEntry entry;
@@ -186,8 +176,6 @@ struct _KineticPDU {
    (_pdu)->connection = (_con); \
    (_pdu)->header = KINETIC_PDU_HEADER_INIT; \
    (_pdu)->headerNBO = KINETIC_PDU_HEADER_INIT; \
    /*(_pdu)->value = BYTE_ARRAY_NONE;*/ \
    /*(_pdu)->proto = &(_pdu)->protoData.message.proto;*/ \
    KINETIC_MESSAGE_HEADER_INIT(&((_pdu)->protoData.message.header), (_con)); \
}
#define KINETIC_PDU_INIT_WITH_MESSAGE(_pdu, _con) { \
@@ -213,14 +201,6 @@ typedef struct _KineticOperation {
        .connection = (_con), \
    }

// // Structure for defining a custom memory allocator.
// typedef struct
// {
//     void        *(*alloc)(void *allocator_data, size_t size);
//     void        (*free)(void *allocator_data, void *pointer);
//     // Opaque pointer passed to `alloc` and `free` functions
//     void        *allocator_data;
// } ProtobufCAllocator;

KineticProto_Algorithm KineticProto_Algorithm_from_KineticAlgorithm(
    KineticAlgorithm kinteicAlgorithm);
Loading