Commit 11b7db9c authored by Scott Vokes's avatar Scott Vokes
Browse files

Add FLUSHALLDATA command.

Note: Flush appears to be a no-op in the simulator, so one of the system tests is
currently failing.
parent 4842e7de
Loading
Loading
Loading
Loading
+13 −0
Original line number Diff line number Diff line
@@ -88,6 +88,19 @@ KineticStatus KineticClient_Put(KineticSessionHandle handle,
                                KineticEntry* const entry,
                                KineticCompletionClosure* closure);

/**
 * @brief Executes a FLUSHALLDATA command to flush pending PUTs or DELETEs.
 *
 * @param handle        KineticSessionHandle for a connected session.
 * @param closure       Optional closure. If specified, operation will be
 *                      executed in asynchronous mode, and closure callback
 *                      will be called upon completion.
 *                      
 * @return              Returns the resulting KineticStatus
 */
KineticStatus KineticClient_Flush(KineticSessionHandle handle,
                                  KineticCompletionClosure* closure);

/**
 * @brief Executes a GET command to retrieve and entry from the Kinetic Device.
 *
+14 −0
Original line number Diff line number Diff line
@@ -138,6 +138,20 @@ KineticStatus KineticClient_Put(KineticSessionHandle handle,
    return KineticController_ExecuteOperation(operation, closure);
}

KineticStatus KineticClient_Flush(KineticSessionHandle handle,
                                  KineticCompletionClosure* closure)
{
    assert(handle != KINETIC_HANDLE_INVALID);
    KineticOperation* operation = KineticController_CreateOperation(handle);
    if (operation == NULL) { return KINETIC_STATUS_MEMORY_ERROR; }

    // Initialize request
    KineticOperation_BuildFlush(operation);

    // Execute the operation
    return KineticController_ExecuteOperation(operation, closure);
}

KineticStatus KineticClient_Get(KineticSessionHandle handle,
                                KineticEntry* const entry,
                                KineticCompletionClosure* closure)
+24 −0
Original line number Diff line number Diff line
@@ -299,6 +299,30 @@ void KineticOperation_BuildGet(KineticOperation* const operation,
    operation->callback = &KineticOperation_GetCallback;
}

KineticStatus KineticOperation_FlushCallback(KineticOperation* operation)
{
    assert(operation != NULL);
    assert(operation->connection != NULL);
    LOGF3("FLUSHALLDATA callback w/ operation (0x%0llX) on connection (0x%0llX)",
        operation, operation->connection);
    return KINETIC_STATUS_SUCCESS;
}

void KineticOperation_BuildFlush(KineticOperation* const operation)
{
    KineticOperation_ValidateOperation(operation);
    KineticConnection_IncrementSequence(operation->connection);
    operation->request->protoData.message.command.header->messageType =
      KINETIC_PROTO_COMMAND_MESSAGE_TYPE_FLUSHALLDATA;
    operation->request->protoData.message.command.header->has_messageType = true;
    operation->valueEnabled = false;
    operation->sendValue = false;
    if (operation->callback == NULL)
    {
        operation->callback = &KineticOperation_FlushCallback;
    }
}

KineticStatus KineticOperation_DeleteCallback(KineticOperation* operation)
{
    assert(operation != NULL);
+3 −0
Original line number Diff line number Diff line
@@ -42,6 +42,9 @@ KineticStatus KineticOperation_GetCallback(KineticOperation* operation);
void KineticOperation_BuildGet(KineticOperation* const operation,
                               KineticEntry* const entry);

KineticStatus KineticOperation_FlushCallback(KineticOperation* operation);
void KineticOperation_BuildFlush(KineticOperation* const operation);

KineticStatus KineticOperation_DeleteCallback(KineticOperation* operation);
void KineticOperation_BuildDelete(KineticOperation* const operation,
                                  KineticEntry* const entry);
+214 −0
Original line number Diff line number Diff line
/*
* kinetic-c
* Copyright (C) 2014 Seagate Technology.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
*/
#include "byte_array.h"
#include "unity.h"
#include "unity_helper.h"
#include "system_test_fixture.h"
#include "protobuf-c/protobuf-c.h"
#include "socket99.h"
#include <string.h>
#include <stdlib.h>

#include "kinetic_client.h"
#include "kinetic_types.h"
#include "kinetic_types_internal.h"
#include "kinetic_controller.h"
#include "kinetic_device_info.h"
#include "kinetic_serial_allocator.h"
#include "kinetic_proto.h"
#include "kinetic_allocator.h"
#include "kinetic_message.h"
#include "kinetic_pdu.h"
#include "kinetic_logger.h"
#include "kinetic_operation.h"
#include "kinetic_hmac.h"
#include "kinetic_connection.h"
#include "kinetic_socket.h"
#include "kinetic_nbo.h"

static SystemTestFixture Fixture;

void setUp(void)
{
    SystemTestSetup(&Fixture);
}

void tearDown(void)
{
    SystemTestTearDown(&Fixture);
}

void test_Flush_should_succeed(void)
{
    KineticStatus status = KineticClient_Flush(Fixture.handle, NULL);
    TEST_ASSERT_EQUAL_KineticStatus(KINETIC_STATUS_SUCCESS, status);
}

void test_Flush_should_be_idempotent(void)
{
    KineticStatus status = KineticClient_Flush(Fixture.handle, NULL);
    TEST_ASSERT_EQUAL_KineticStatus(KINETIC_STATUS_SUCCESS, status);
    status = KineticClient_Flush(Fixture.handle, NULL);
    TEST_ASSERT_EQUAL_KineticStatus(KINETIC_STATUS_SUCCESS, status);
}

typedef struct {
    bool flag;
    pthread_cond_t cond;
    pthread_mutex_t mutex;
} completion_test_env;

static void completion_cb(KineticCompletionData* kinetic_data, void* client_data)
{
    TEST_ASSERT_NOT_NULL(client_data);
    completion_test_env *env = (completion_test_env *)client_data;
    env->flag = true;
    TEST_ASSERT_EQUAL(0, pthread_cond_signal(&env->cond));
    (void)kinetic_data;
}

void test_Flush_should_call_callback_after_completion(void)
{
    completion_test_env env;
    memset(&env, 0, sizeof(env));

    TEST_ASSERT_EQUAL(0, pthread_cond_init(&env.cond, NULL));
    TEST_ASSERT_EQUAL(0, pthread_mutex_init(&env.mutex, NULL));

    KineticCompletionClosure closure = {
        .callback = completion_cb,
        .clientData = (void *)&env
    };

    KineticStatus status = KineticClient_Flush(Fixture.handle, &closure);

    /* Wait up to 10 seconds for the callback to fire. */
    struct timeval tv;
    struct timespec ts;
    gettimeofday(&tv, NULL);
    ts.tv_sec = tv.tv_sec + 10;
    int res = pthread_cond_timedwait(&env.cond, &env.mutex, &ts);
    TEST_ASSERT_EQUAL(0, res);

    TEST_ASSERT_TRUE(env.flag);
    TEST_ASSERT_EQUAL_KineticStatus(KINETIC_STATUS_SUCCESS, status);
    TEST_ASSERT_EQUAL(0, pthread_cond_destroy(&env.cond));
}

static void no_op_callback(KineticCompletionData* kinetic_data, void* client_data)
{
    (void)kinetic_data;
    (void)client_data;
}

void test_Flush_should_flush_pending_async_PUTs_and_DELETEs(void)
{
    // Arguments shared between entries
    uint8_t VersionData[1024];
    ByteBuffer VersionBuffer = ByteBuffer_Create(VersionData, sizeof(VersionData), 0);
    uint8_t TagData[1024];
    ByteBuffer TagBuffer = ByteBuffer_Create(TagData, sizeof(TagData), 0);

    uint8_t key1[] = "key1";
    uint8_t value1[] = "value1";
    ByteBuffer KeyBuffer1 = ByteBuffer_Create(key1, sizeof(key1), 0);
    ByteBuffer ValueBuffer1 = ByteBuffer_Create(value1, sizeof(value1), 0);

    uint8_t key2[] = "key2";
    uint8_t value2[] = "value2";
    ByteBuffer KeyBuffer2 = ByteBuffer_Create(key2, sizeof(key2), 0);
    ByteBuffer ValueBuffer2 = ByteBuffer_Create(value2, sizeof(value2), 0);

    // Do a blocking PUT ("key1" => "value1") so we can delete it later
    KineticEntry Entry = (KineticEntry) {
        .key = KeyBuffer1,
        .newVersion = VersionBuffer,
        .tag = TagBuffer,
        .algorithm = KINETIC_ALGORITHM_SHA1,
        .value = ValueBuffer1,
        .force = true,
    };
    KineticStatus status = KineticClient_Put(Fixture.handle, &Entry, NULL);
    TEST_ASSERT_EQUAL_KineticStatus(KINETIC_STATUS_SUCCESS, status);

    // Do an async PUT ("key2" => "value2") so we can flush to complete it
    Entry = (KineticEntry) {
        .key = KeyBuffer2,
        .newVersion = VersionBuffer,
        .tag = TagBuffer,
        .algorithm = KINETIC_ALGORITHM_SHA1,
        .value = ValueBuffer2,
        .synchronization = KINETIC_SYNCHRONIZATION_WRITEBACK,
        .force = true,
    };
    KineticCompletionClosure no_op_closure = {
        .callback = &no_op_callback,
    };

    // Include a closure to signal that the PUT should be non-blocking.
    status = KineticClient_Put(Fixture.handle, &Entry, &no_op_closure);
    TEST_ASSERT_EQUAL_KineticStatus(KINETIC_STATUS_SUCCESS, status);

    // Do an async DELETE so we can flush to complete it
    KineticEntry deleteEntry = {
        .key = KeyBuffer1,
    };
    status = KineticClient_Delete(Fixture.handle, &deleteEntry, &no_op_closure);

    /* Now do a blocking flush and confirm that (key1,value1) has been
     * DELETEd and (key2,value2) have been PUT. */
    status = KineticClient_Flush(Fixture.handle, NULL);
    TEST_ASSERT_EQUAL_KineticStatus(KINETIC_STATUS_SUCCESS, status);

    // Reset buffers for GET requests
    ByteBuffer_Reset(&ValueBuffer1);
    ByteBuffer_Reset(&ValueBuffer2);

    // GET key1 --> expect NOT FOUND
    KineticEntry getEntry1 = {
        .key = KeyBuffer1,
        .dbVersion = VersionBuffer,
        .tag = TagBuffer,
        .algorithm = KINETIC_ALGORITHM_SHA1,
        .value = ValueBuffer1,
        .force = true,
    };
    status = KineticClient_Get(Fixture.handle, &getEntry1, NULL);
    TEST_ASSERT_EQUAL_KineticStatus(KINETIC_STATUS_NOT_FOUND, status);

    // GET key2 --> present
    KineticEntry getEntry2 = {
        .key = KeyBuffer2,
        .dbVersion = VersionBuffer,
        .tag = TagBuffer,
        .algorithm = KINETIC_ALGORITHM_SHA1,
        .value = ValueBuffer2,
        .force = true,
    };
    status = KineticClient_Get(Fixture.handle, &getEntry2, NULL);
    TEST_ASSERT_EQUAL_KineticStatus(KINETIC_STATUS_SUCCESS, status);
}



/*******************************************************************************
* ENSURE THIS IS AFTER ALL TESTS IN THE TEST SUITE
*******************************************************************************/
SYSTEM_TEST_SUITE_TEARDOWN(&Fixture);
Loading