Commit f611804b authored by Scott Vokes's avatar Scott Vokes
Browse files

Add GETNEXT / GETPREVIOUS and tighten up API for GET commands.

If GET commands are being called without either a key or (if not
using metadataOnly) a value buffer to store the result, then respond
with an explicit error code indicating such.
parent 9b21b557
Loading
Loading
Loading
Loading
+49 −10
Original line number Diff line number Diff line
@@ -48,7 +48,7 @@ void KineticClient_Shutdown(void);
 *  .hmacKey          Key to use for HMAC calculations (NULL-terminated string)
 * @param handle    Pointer to KineticSessionHandle (populated upon successful connection)
 *
 * @return          Returns the resulting KineticStatus
 * @return          Returns the resulting KineticStatus.
 */
KineticStatus KineticClient_Connect(const KineticSession* config,
                                    KineticSessionHandle* handle);
@@ -58,7 +58,7 @@ KineticStatus KineticClient_Connect(const KineticSession* config,
 *
 * @param handle    KineticSessionHandle for a connected session.
 *
 * @return          Returns the resulting KineticStatus
 * @return          Returns the resulting KineticStatus.
 */
KineticStatus KineticClient_Disconnect(KineticSessionHandle* const handle);

@@ -67,7 +67,7 @@ KineticStatus KineticClient_Disconnect(KineticSessionHandle* const handle);
 *
 * @param handle        KineticSessionHandle for a connected session.
 *
 * @return              Returns the resulting KineticStatus
 * @return              Returns the resulting KineticStatus.
 */
KineticStatus KineticClient_NoOp(KineticSessionHandle handle);

@@ -81,8 +81,7 @@ KineticStatus KineticClient_NoOp(KineticSessionHandle handle);
 *                      executed in asynchronous mode, and closure callback
 *                      will be called upon completion.
 * 
 *
 * @return              Returns the resulting KineticStatus
 * @return              Returns the resulting KineticStatus.
 */
KineticStatus KineticClient_Put(KineticSessionHandle handle,
                                KineticEntry* const entry,
@@ -96,13 +95,13 @@ KineticStatus KineticClient_Put(KineticSessionHandle handle,
 *                      executed in asynchronous mode, and closure callback
 *                      will be called upon completion.
 *                      
 * @return              Returns the resulting KineticStatus
 * @return              Returns the resulting KineticStatus.
 */
KineticStatus KineticClient_Flush(KineticSessionHandle handle,
                                  KineticCompletionClosure* closure);

/**
 * @brief Executes a GET command to retrieve and entry from the Kinetic Device.
 * @brief Executes a GET command to retrieve an entry from the Kinetic Device.
 *
 * @param handle        KineticSessionHandle for a connected session.
 * @param entry         Key/value entry for object to retrieve. 'value' will
@@ -111,12 +110,52 @@ KineticStatus KineticClient_Flush(KineticSessionHandle handle,
 *                      executed in asynchronous mode, and closure callback
 *                      will be called upon completion.
 *
 * @return              Returns the resulting KineticStatus
 * @return              Returns the resulting KineticStatus.
 */
KineticStatus KineticClient_Get(KineticSessionHandle handle,
                                KineticEntry* const entry,
                                KineticCompletionClosure* closure);

/**
 * @brief Executes a GETPREVIOUS command to retrieve the next entry from the Kinetic Device.
 *
 * @param handle        KineticSessionHandle for a connected session.
 * @param entry         Key/value entry for object to retrieve. 'value' will
 *                      be populated unless 'metadataOnly' is set to 'true'.
 *                      The key and value fields will be populated with the
 *                      previous key and its corresponding value, according to
 *                      lexicographical byte order.
 *                      
 * @param closure       Optional closure. If specified, operation will be
 *                      executed in asynchronous mode, and closure callback
 *                      will be called upon completion.
 *
 * @return              Returns the resulting KineticStatus.
 */
KineticStatus KineticClient_GetPrevious(KineticSessionHandle handle,
                                        KineticEntry* const entry,
                                        KineticCompletionClosure* closure);

/**
 * @brief Executes a GETNEXT command to retrieve the next entry from the Kinetic Device.
 *
 * @param handle        KineticSessionHandle for a connected session.
 * @param entry         Key/value entry for object to retrieve. 'value' will
 *                      be populated unless 'metadataOnly' is set to 'true'.
 *                      The key and value fields will be populated with the
 *                      next key and its corresponding value, according to
 *                      lexicographical byte order.
 *                      
 * @param closure       Optional closure. If specified, operation will be
 *                      executed in asynchronous mode, and closure callback
 *                      will be called upon completion.
 *
 * @return              Returns the resulting KineticStatus.
 */
KineticStatus KineticClient_GetNext(KineticSessionHandle handle,
                                    KineticEntry* const entry,
                                    KineticCompletionClosure* closure);

/**
 * @brief Executes a DELETE command to delete an entry from the Kinetic Device
 *
@@ -127,7 +166,7 @@ KineticStatus KineticClient_Get(KineticSessionHandle handle,
 *                      executed in asynchronous mode, and closure callback
 *                      will be called upon completion.
 *
 * @return              Returns the resulting KineticStatus
 * @return              Returns the resulting KineticStatus.
 */
KineticStatus KineticClient_Delete(KineticSessionHandle handle,
                                   KineticEntry* const entry,
@@ -178,7 +217,7 @@ KineticStatus KineticClient_GetLog(KineticSessionHandle handle,
 *
 * @param handle        KineticSessionHandle for a connected session.
 *
 * @return              Returns the resulting KineticStatus
 * @return              Returns the resulting KineticStatus.
 */
KineticStatus KineticClient_InstantSecureErase(KineticSessionHandle handle);

+2 −0
Original line number Diff line number Diff line
@@ -157,6 +157,8 @@ typedef enum {
    KINETIC_STATUS_MEMORY_ERROR,        // Failed allocating/deallocating memory
    KINETIC_STATUS_SOCKET_TIMEOUT,      // A timeout occurred while waiting for a socket operation
    KINETIC_STATUS_SOCKET_ERROR,        // An I/O error occurred during a socket operation
    KINETIC_STATUS_MISSING_KEY,         // An operation is missing a required key
    KINETIC_STATUS_MISSING_VALUE_BUFFER,// An operation is missing a required value buffer
    KINETIC_STATUS_COUNT                // Number of status codes in KineticStatusDescriptor
} KineticStatus;

+60 −5
Original line number Diff line number Diff line
@@ -152,24 +152,79 @@ KineticStatus KineticClient_Flush(KineticSessionHandle handle,
    return KineticController_ExecuteOperation(operation, closure);
}

KineticStatus KineticClient_Get(KineticSessionHandle handle,
static bool has_key(KineticEntry* const entry)
{
    return entry->key.array.data != NULL;
}

static bool has_value_buffer(KineticEntry* const entry)
{
    return entry->value.array.data != NULL;
}

typedef enum {
    CMD_GET,
    CMD_GET_NEXT,
    CMD_GET_PREVIOUS,
} GET_COMMAND;

static KineticStatus handle_get_command(GET_COMMAND cmd,
                                        KineticSessionHandle handle,
                                        KineticEntry* const entry,
                                        KineticCompletionClosure* closure)
{
    assert(handle != KINETIC_HANDLE_INVALID);
    assert(entry != NULL);
    if (!entry->metadataOnly) {assert(entry->value.array.data != NULL);}

    if (!has_key(entry)) { return KINETIC_STATUS_MISSING_KEY; }
    if (!has_value_buffer(entry) && !entry->metadataOnly) {
        return KINETIC_STATUS_MISSING_VALUE_BUFFER;
    }

    KineticOperation* operation = KineticController_CreateOperation(handle);
    if (operation == NULL) {return KINETIC_STATUS_MEMORY_ERROR;}

    // Initialize request
    switch (cmd)
    {
    case CMD_GET:
        KineticOperation_BuildGet(operation, entry);
        break;
    case CMD_GET_NEXT:
        KineticOperation_BuildGetNext(operation, entry);
        break;
    case CMD_GET_PREVIOUS:
        KineticOperation_BuildGetPrevious(operation, entry);
        break;
    default:
        assert(false);
    }

    // Execute the operation
    return KineticController_ExecuteOperation(operation, closure);
}

KineticStatus KineticClient_Get(KineticSessionHandle handle,
                                KineticEntry* const entry,
                                KineticCompletionClosure* closure)
{
    return handle_get_command(CMD_GET, handle, entry, closure);
}

KineticStatus KineticClient_GetPrevious(KineticSessionHandle handle,
                                        KineticEntry* const entry,
                                        KineticCompletionClosure* closure)
{
    return handle_get_command(CMD_GET_PREVIOUS, handle, entry, closure);
}

KineticStatus KineticClient_GetNext(KineticSessionHandle handle,
                                    KineticEntry* const entry,
                                    KineticCompletionClosure* closure)
{
    return handle_get_command(CMD_GET_NEXT, handle, entry, closure);
}

KineticStatus KineticClient_Delete(KineticSessionHandle handle,
                                   KineticEntry* const entry,
                                   KineticCompletionClosure* closure)
+45 −7
Original line number Diff line number Diff line
@@ -258,12 +258,12 @@ void KineticOperation_BuildPut(KineticOperation* const operation,
    operation->callback = &KineticOperation_PutCallback;
}

KineticStatus KineticOperation_GetCallback(KineticOperation* operation)
static KineticStatus get_cb(const char *cmd_name, KineticOperation* operation)
{
    assert(operation != NULL);
    assert(operation->connection != NULL);
    LOGF3("GET callback w/ operation (0x%0llX) on connection (0x%0llX)",
        operation, operation->connection);
    LOGF3("%s callback w/ operation (0x%0llX) on connection (0x%0llX)",
        cmd_name, operation, operation->connection);
    assert(operation->response != NULL);
    assert(operation->entry != NULL);

@@ -278,13 +278,15 @@ KineticStatus KineticOperation_GetCallback(KineticOperation* operation)
    return KINETIC_STATUS_SUCCESS;
}

void KineticOperation_BuildGet(KineticOperation* const operation,
                               KineticEntry* const entry)
static void build_get_command(KineticOperation* const operation,
                              KineticEntry* const entry,
                              KineticOperationCallback cb,
                              KineticProto_Command_MessageType command_id)
{
    KineticOperation_ValidateOperation(operation);
    KineticConnection_IncrementSequence(operation->connection);

    operation->request->protoData.message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_GET;
    operation->request->protoData.message.command.header->messageType = command_id;
    operation->request->protoData.message.command.header->has_messageType = true;
    operation->entry = entry;

@@ -296,7 +298,43 @@ void KineticOperation_BuildGet(KineticOperation* const operation,

    operation->valueEnabled = !entry->metadataOnly;
    operation->sendValue = false;
    operation->callback = &KineticOperation_GetCallback;
    operation->callback = cb;
}

static KineticStatus get_cmd_cb(KineticOperation* operation)
{
    return get_cb("GET", operation);
}

void KineticOperation_BuildGet(KineticOperation* const operation,
                               KineticEntry* const entry)
{
    return build_get_command(operation, entry, &get_cmd_cb,
        KINETIC_PROTO_COMMAND_MESSAGE_TYPE_GET);
}

static KineticStatus getprevious_cmd_cb(KineticOperation* operation)
{
    return get_cb("GETPREVIOUS", operation);
}

void KineticOperation_BuildGetPrevious(KineticOperation* const operation,
                                   KineticEntry* const entry)
{
    return build_get_command(operation, entry, &getprevious_cmd_cb,
        KINETIC_PROTO_COMMAND_MESSAGE_TYPE_GETPREVIOUS);
}

static KineticStatus getnext_cmd_cb(KineticOperation* operation)
{
    return get_cb("GETNEXT", operation);
}

void KineticOperation_BuildGetNext(KineticOperation* const operation,
                                   KineticEntry* const entry)
{
    return build_get_command(operation, entry, &getnext_cmd_cb,
        KINETIC_PROTO_COMMAND_MESSAGE_TYPE_GETNEXT);
}

KineticStatus KineticOperation_FlushCallback(KineticOperation* operation)
+5 −0
Original line number Diff line number Diff line
@@ -42,6 +42,11 @@ KineticStatus KineticOperation_GetCallback(KineticOperation* operation);
void KineticOperation_BuildGet(KineticOperation* const operation,
                               KineticEntry* const entry);

void KineticOperation_BuildGetNext(KineticOperation* const operation,
                                   KineticEntry* const entry);
void KineticOperation_BuildGetPrevious(KineticOperation* const operation,
                                       KineticEntry* const entry);

KineticStatus KineticOperation_FlushCallback(KineticOperation* operation);
void KineticOperation_BuildFlush(KineticOperation* const operation);

Loading