Commit a50c2c74 authored by Greg Williams's avatar Greg Williams
Browse files

Updated test_system_multi_process.c to validate throughput against 2 kinetic...

Updated test_system_multi_process.c to validate throughput against 2 kinetic devices from 2 seperate processes/daemons.
NOTE: The drive apparently has an issue with processing very short keys (4 bytes) causing low throughput. Bump up to 8 bytes fixes performance issue in this test. Needs further investigation.
parent a67b9d17
Loading
Loading
Loading
Loading
+0 −6
Original line number Diff line number Diff line
@@ -121,8 +121,6 @@ static void run_throghput_tests(size_t num_ops, size_t value_size)
            }
        }

        printf("Waiting for PUTs to finish\n");

        size_t num_failures = 0;
        for (size_t i = 0; i < num_ops; i++) {
            KineticSemaphore_WaitForSignalAndDestroy(op_statuses[i].sem);
@@ -195,8 +193,6 @@ static void run_throghput_tests(size_t num_ops, size_t value_size)
            }
        }

        printf("Waiting for GETs to finish\n");

        size_t bytes_read = 0;
        for (size_t i = 0; i < num_ops; i++)
        {
@@ -275,8 +271,6 @@ static void run_throghput_tests(size_t num_ops, size_t value_size)
            }
        }

        printf("Waiting for DELETEs to finish\n");

        for (size_t i = 0; i < num_ops; i++)
        {
            KineticSemaphore_WaitForSignalAndDestroy(op_statuses[i].sem);
+71 −73
Original line number Diff line number Diff line
@@ -34,17 +34,28 @@ typedef struct {
    KineticStatus status;
} OpStatus;

struct key_struct {
    char data[32];
};

static const int MaxDaemons = 2;
static int NumDaemons = 0;
static char* CurrentHost;
static int CurrentPort;

static char* Hosts[] = {SYSTEM_TEST_HOST, SYSTEM_TEST_HOST};
static const int Ports[] = {KINETIC_PORT, KINETIC_PORT+1};

// static char* Hosts[] = {"10.138.123.153", "10.138.123.128", "10.138.123.112"};
// static const int Ports[] = {KINETIC_PORT, KINETIC_PORT};

static void op_finished(KineticCompletionData* kinetic_data, void* clientData);

static void child_task(void) {

    srand(time(NULL) + getpid()); // re-randomize this process

    const size_t num_ops = 100;
    const size_t num_ops = 500;
    const size_t obj_size = KINETIC_OBJ_SIZE;

    KineticClientConfig config = {
@@ -54,15 +65,16 @@ static void child_task(void) {
    KineticClient * client = KineticClient_Init(&config);

    KineticStatus status;
    ByteArray hmacArray = ByteArray_CreateWithCString("asdfasdf");
    KineticSession* session;
    KineticSessionConfig sessionConfig = {
        .host = SYSTEM_TEST_HOST,
        .port = CurrentPort,
        .clusterVersion = 0,
        .identity = 1,
        .hmacKey = hmacArray,
        .clusterVersion = SESSION_CLUSTER_VERSION,
        .identity = SESSION_IDENTITY,
    };
    KineticSession* session;
    const char* hmacKey = "asdfasdf";
    strncpy((char*)sessionConfig.keyData, hmacKey, strlen(hmacKey));
    sessionConfig.hmacKey = ByteArray_Create(sessionConfig.keyData, strlen(hmacKey));
    strncpy((char*)sessionConfig.host, CurrentHost, sizeof(sessionConfig.host)-1);
    status = KineticClient_CreateSession(&sessionConfig, client, &session);
    TEST_ASSERT_EQUAL_KineticStatus(KINETIC_STATUS_SUCCESS, status);

@@ -76,56 +88,51 @@ static void child_task(void) {
    ByteBuffer_AppendDummyData(&test_data, test_data.array.len);
    uint8_t tag_data[] = {0x00, 0x01, 0x02, 0x03};
    ByteBuffer tag = ByteBuffer_Create(tag_data, sizeof(tag_data), sizeof(tag_data));
    uint64_t keys[num_ops];
    KineticEntry entries[num_ops];
    uint64_t r = rand();
    for (uint32_t put = 0; put < num_ops; put++) {
        keys[put] = put | (r << 16);
    }

    // Measure PUT performance
    {
    KineticEntry* entries = calloc(num_ops, sizeof(KineticEntry));
    KineticCompletionClosure* closures = calloc(num_ops, sizeof(KineticCompletionClosure));
    struct key_struct* keyValues = calloc(num_ops, sizeof(struct key_struct));
    ByteBuffer *keys = calloc(num_ops, sizeof(ByteBuffer));
    OpStatus put_statuses[num_ops];
        for (size_t i = 0; i < num_ops; i++) {

    for (uint32_t i = 0; i < num_ops; i++) {
        keys[i] = ByteBuffer_CreateAndAppendFormattedCString(
            &keyValues[i].data, sizeof(struct key_struct), "%08u", (unsigned int)i);

        KineticSynchronization sync = (i == num_ops - 1)
            ? KINETIC_SYNCHRONIZATION_FLUSH
            : KINETIC_SYNCHRONIZATION_WRITEBACK;

        entries[i] = (KineticEntry) {
            .key = keys[i],
            .tag = tag,
            .algorithm = KINETIC_ALGORITHM_SHA3,
            .value = test_data,
            .synchronization = sync,
            .force = true,
        };
            
        put_statuses[i] = (OpStatus){
            .sem = KineticSemaphore_Create(),
            .status = KINETIC_STATUS_INVALID,
        };

        closures[i] = (KineticCompletionClosure) {
            .callback = op_finished,
            .clientData = &put_statuses[i],
        };
    }

    // Measure PUT performance
    {
        struct timeval start_time;
        gettimeofday(&start_time, NULL);

        size_t bytes_written = 0;

        for (uint32_t put = 0; put < num_ops; put++) {
            ByteBuffer key = ByteBuffer_Create(&keys[put], sizeof(keys[put]), sizeof(keys[put]));

            KineticSynchronization sync = KINETIC_SYNCHRONIZATION_WRITEBACK;
            if ((put == num_ops - 1) || (num_ops % 7 == 0)) {
                sync = KINETIC_SYNCHRONIZATION_FLUSH;
            }

            ByteBuffer my_data = test_data;
            // my_data.array.len = rand() % KINETIC_OBJ_SIZE;
            // my_data.bytesUsed = my_data.array.len;

            entries[put] = (KineticEntry) {
                .key = key,
                .tag = tag,
                .algorithm = KINETIC_ALGORITHM_SHA1,
                .value = my_data,
                .synchronization = sync,
            };

        for (uint32_t i = 0; i < num_ops; i++) {
            KineticStatus status = KineticClient_Put(
                session,
                &entries[put],
                &(KineticCompletionClosure) {
                    .callback = op_finished,
                    .clientData = &put_statuses[put],
                }
            );
                session, &entries[i], &closures[i]);

            if (status != KINETIC_STATUS_SUCCESS) {
                char msg[128];
@@ -134,7 +141,7 @@ static void child_task(void) {
                TEST_FAIL_MESSAGE(msg);
            }

            bytes_written += my_data.bytesUsed;
            bytes_written += test_data.bytesUsed;
        }

        for (size_t i = 0; i < num_ops; i++)
@@ -179,21 +186,19 @@ static void child_task(void) {
        struct timeval start_time;
        gettimeofday(&start_time, NULL);

        for (uint32_t get = 0; get < num_ops; get++) {
            ByteBuffer key = ByteBuffer_Create(&keys[get], sizeof(keys[get]), sizeof(keys[get]));

            entries[get] = (KineticEntry) {
                .key = key,
        for (uint32_t i = 0; i < num_ops; i++) {
            entries[i] = (KineticEntry) {
                .key = keys[i],
                .tag = tag,
                .value = test_get_datas[get],
                .value = test_get_datas[i],
            };

            KineticStatus status = KineticClient_Get(
                session,
                &entries[get],
                &entries[i],
                &(KineticCompletionClosure) {
                    .callback = op_finished,
                    .clientData = &get_statuses[get],
                    .clientData = &get_statuses[i],
                }
            );

@@ -262,26 +267,14 @@ static void child_task(void) {
        struct timeval start_time;
        gettimeofday(&start_time, NULL);

        for (uint32_t del = 0; del < num_ops; del++) {
            ByteBuffer key = ByteBuffer_Create(&keys[del], sizeof(keys[del]), sizeof(keys[del]));

            KineticSynchronization sync = (del == num_ops - 1)
                ? KINETIC_SYNCHRONIZATION_FLUSH
                : KINETIC_SYNCHRONIZATION_WRITEBACK;

            entries[del] = (KineticEntry) {
                .key = key,
                .tag = tag,
                .synchronization = sync,
                .force = true,
            };
        for (uint32_t i = 0; i < num_ops; i++) {

            KineticStatus status = KineticClient_Delete(
                session,
                &entries[del],
                &entries[i],
                &(KineticCompletionClosure) {
                    .callback = op_finished,
                    .clientData = &delete_statuses[del],
                    .clientData = &delete_statuses[i],
                }
            );

@@ -318,6 +311,10 @@ static void child_task(void) {

    // Shutdown client connection and cleanup
    ByteBuffer_Free(test_data);
    free(keys);
    free(keyValues);
    free(entries);
    free(closures);
    status = KineticClient_DestroySession(session);
    TEST_ASSERT_EQUAL_MESSAGE(KINETIC_STATUS_SUCCESS, status, "Error when disconnecting client!");
    KineticClient_Shutdown(client);
@@ -336,16 +333,17 @@ static void op_finished(KineticCompletionData* kinetic_data, void* clientData)
void test_kinetic_client_stress_multiple_processes(void)
{
    srand(time(NULL));
    CurrentPort = KINETIC_PORT;
    for (NumDaemons = 0; NumDaemons < MaxDaemons; NumDaemons++) {
        pid_t pid = fork();
        if (pid == 0) {
            CurrentHost = Hosts[NumDaemons];
            CurrentPort = Ports[NumDaemons];
            LOGF0("\nStarting kinetic daemon on port %d", CurrentPort);
            child_task();
        } else if (pid == -1) {
            err(1, "fork");
        } else {
            CurrentPort++;
            // CurrentPort++;
        }
    }