Commit aae9a581 authored by Greg Williams's avatar Greg Williams
Browse files

Added test_system_stress_single_session_threaded to cover Ceph use case of...

Added test_system_stress_single_session_threaded to cover Ceph use case of multiple threads using the same session simultaneously. Renamed old stress test to test_system_stress_session_per_thread for clarity.
Cleaned up error reporting and output in stress tests.
Added sequence count to TX/RX messages, though (FIXME:) sequence count is not yet populated at RX log point.
parent c69b1eeb
Loading
Loading
Loading
Loading
+4 −4
Original line number Diff line number Diff line
@@ -181,7 +181,7 @@ void KineticController_HandleUnexecpectedResponse(void *msg,
            response->command->header != NULL &&
            response->command->header->has_connectionID)
        {
            LOGF1("[PDU RX UNSOLICTED] pdu: 0x%0llX, session: 0x%llX, bus: 0x%llX, "
            LOGF1("[PDU RX UNSOLICITED] pdu: 0x%0llX, session: 0x%llX, bus: 0x%llX, "
                "protoLen: %u, valueLen: %u",
                response, &connection->session, connection->messageBus,
                response->header.protobufLength, response->header.valueLength);
@@ -192,7 +192,7 @@ void KineticController_HandleUnexecpectedResponse(void *msg,
                connection->connectionID);
        }
        else {
            LOG0("WARNING: Unsolicited PDU in invalid. Does not specify connection ID!");
            LOG0("WARNING: Unsolicited PDU is invalid. Does not specify connection ID!");
        }
        KineticAllocator_FreeKineticResponse(response);
    }
@@ -225,9 +225,9 @@ void KineticController_HandleExpectedResponse(bus_msg_result_t *res, void *udata
        }

        LOGF1("[PDU RX] pdu: 0x%0llX, op: 0x%llX, session: 0x%llX, bus: 0x%llX, "
            "protoLen: %u, valueLen: %u, status: %s",
            "seq: %5lld, protoLen: %4u, valueLen: %u, status: %s",
            response, op, &op->connection->session, op->connection->messageBus,
            response->header.protobufLength, response->header.valueLength,
            response->command->header->sequence, response->header.protobufLength, response->header.valueLength,
            Kinetic_GetStatusDescription(status));
    }
    else
+7 −6
Original line number Diff line number Diff line
@@ -57,9 +57,10 @@ KineticStatus KineticOperation_SendRequest(KineticOperation* const operation)
    return status;
}

// TODO: Asses refactoring this methog by disecting out Operation and relocate to kinetic_pdu
static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const operation)
{
    LOGF2("\nSending PDU via fd=%d", operation->connection->messageBus);
    LOGF3("\nSending PDU via fd=%d", operation->connection->messageBus);

    KineticPDU* request = operation->request;
    KineticProto_Message* proto = &operation->request->message.message;
@@ -78,7 +79,7 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o
    assert(packedLen == expectedLen);
    request->message.message.commandBytes.len = packedLen;
    request->message.message.has_commandBytes = true;
    KineticLogger_LogByteArray(2, "commandBytes", (ByteArray){
    KineticLogger_LogByteArray(3, "commandBytes", (ByteArray){
        .data = request->message.message.commandBytes.data,
        .len = request->message.message.commandBytes.len,
    });
@@ -125,11 +126,11 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o
        return KINETIC_STATUS_BUFFER_OVERRUN;
    }

    LOGF1("[PDU TX] pdu: 0x%0llX, op: 0x%llX, session: 0x%llX, bus: 0x%llX, protoLen: %u, valueLen: %u",
        operation->request, operation, &operation->connection->session,
        operation->connection->messageBus, header.protobufLength, header.valueLength);
    LOGF1("[PDU TX] pdu: 0x%0llX, op: 0x%llX, session: 0x%llX, bus: 0x%llX, seq: %5lld, protoLen: %4u, valueLen: %u",
        operation->request, operation, &operation->connection->session, operation->connection->messageBus,
        request->message.header.sequence, header.protobufLength, header.valueLength);

    KineticLogger_LogHeader(2, &header);
    KineticLogger_LogHeader(3, &header);

    uint32_t nboProtoLength = KineticNBO_FromHostU32(header.protobufLength);
    uint32_t nboValueLength = KineticNBO_FromHostU32(header.valueLength);
+0 −2
Original line number Diff line number Diff line
@@ -145,9 +145,7 @@ typedef enum {

// Kinetic PDU
struct _KineticPDU {
    // Message associated with this PDU instance
    KineticMessage message;

    KineticProto_Command* command;
};

+44 −45
Original line number Diff line number Diff line
@@ -42,9 +42,6 @@ void run_throghput_tests(KineticClient * client, size_t num_ops, size_t value_si
        "Count:      %zu entries",
        value_size, num_ops );

    ByteBuffer test_data = ByteBuffer_Malloc(value_size);
    ByteBuffer_AppendDummyData(&test_data, test_data.array.len);

    // Initialize kinetic-c and configure sessions
    const char HmacKeyString[] = "asdfasdf";
    KineticSession session = {
@@ -56,22 +53,23 @@ void run_throghput_tests(KineticClient * client, size_t num_ops, size_t value_si
            .hmacKey = ByteArray_CreateWithCString(HmacKeyString),
        },
    };

    // Establish connection
    KineticStatus status = KineticClient_CreateConnection(&session, client);
    if (status != KINETIC_STATUS_SUCCESS) {
        fprintf(stderr, "Failed connecting to the Kinetic device w/status: %s\n",
            Kinetic_GetStatusDescription(status));
        TEST_FAIL();
        char msg[128];
        sprintf(msg, "Failed connecting to the Kinetic device w/status: %s\n", Kinetic_GetStatusDescription(status));
        TEST_FAIL_MESSAGE(msg);
    }

    // Generate test entry data
    ByteBuffer test_data = ByteBuffer_Malloc(value_size);
    ByteBuffer_AppendDummyData(&test_data, test_data.array.len);
    uint8_t tag_data[] = {0x00, 0x01, 0x02, 0x03};
    ByteBuffer tag = ByteBuffer_Create(tag_data, sizeof(tag_data), sizeof(tag_data));

    uint64_t r = rand();

    uint64_t keys[num_ops];
    KineticEntry entries[num_ops];

    for (uint32_t put = 0; put < num_ops; put++) {
        keys[put] = put | (r << 16);
    }
@@ -114,25 +112,26 @@ void run_throghput_tests(KineticClient * client, size_t num_ops, size_t value_si
            );

            if (status != KINETIC_STATUS_SUCCESS) {
                fprintf(stderr, "PUT failed w/status: %s\n", Kinetic_GetStatusDescription(status));
                TEST_FAIL();
                char msg[128];
                sprintf(msg, "PUT failed w/status: %s", Kinetic_GetStatusDescription(status));
                TEST_FAIL_MESSAGE(msg);
            }
        }

        printf("Waiting for PUTs to finish\n");
        LOG0("Waiting for PUTs to finish...");

        for (size_t i = 0; i < num_ops; i++)
        {
            KineticSemaphore_WaitForSignalAndDestroy(put_statuses[i].sem);
            if (put_statuses[i].status != KINETIC_STATUS_SUCCESS) {
                fprintf(stderr, "PUT failed w/status: %s\n", Kinetic_GetStatusDescription(put_statuses[i].status));
                TEST_FAIL();
                char msg[128];
                sprintf(msg, "PUT failed w/status: %s\n", Kinetic_GetStatusDescription(put_statuses[i].status));
                TEST_FAIL_MESSAGE(msg);
            }
        }

        struct timeval stop_time;
        gettimeofday(&stop_time, NULL);

        size_t bytes_written = num_ops * test_data.array.len;
        int64_t elapsed_us = ((stop_time.tv_sec - start_time.tv_sec) * 1000000)
            + (stop_time.tv_usec - start_time.tv_usec);
@@ -148,8 +147,6 @@ void run_throghput_tests(KineticClient * client, size_t num_ops, size_t value_si
            bytes_written / 1024.0f,
            elapsed_ms / 1000.0f,
            bandwidth);


    }

    // Measure GET performance
@@ -190,30 +187,30 @@ void run_throghput_tests(KineticClient * client, size_t num_ops, size_t value_si
            );

            if (status != KINETIC_STATUS_SUCCESS) {
                fprintf(stderr, "GET failed w/status: %s\n", Kinetic_GetStatusDescription(status));
                TEST_FAIL();
                char msg[128];
                sprintf(msg, "GET failed w/status: %s", Kinetic_GetStatusDescription(status));
                TEST_FAIL_MESSAGE(msg);
            }
        }

        printf("Waiting for GETs to finish\n");

        LOG0("Waiting for GETs to finish...");
        size_t bytes_read = 0;
        for (size_t i = 0; i < num_ops; i++)
        {
            KineticSemaphore_WaitForSignalAndDestroy(get_statuses[i].sem);
            if (get_statuses[i].status != KINETIC_STATUS_SUCCESS) {

                fprintf(stderr, "GET failed w/status: %s\n", Kinetic_GetStatusDescription(get_statuses[i].status));
                TEST_FAIL();
                char msg[128];
                sprintf(msg, "GET failed w/status: %s", Kinetic_GetStatusDescription(get_statuses[i].status));
                TEST_FAIL_MESSAGE(msg);
            }
            else
            {
                bytes_read += entries[i].value.bytesUsed;
            }
        }

        struct timeval stop_time;
        gettimeofday(&stop_time, NULL);

        int64_t elapsed_us = ((stop_time.tv_sec - start_time.tv_sec) * 1000000)
            + (stop_time.tv_usec - start_time.tv_usec);
        float elapsed_ms = elapsed_us / 1000.0f;
@@ -229,8 +226,7 @@ void run_throghput_tests(KineticClient * client, size_t num_ops, size_t value_si
            elapsed_ms / 1000.0f,
            bandwidth);

        for (size_t i = 0; i < num_ops; i++)
        {
        for (size_t i = 0; i < num_ops; i++) {
            ByteBuffer_Free(test_get_datas[i]);
        }
    }
@@ -272,25 +268,25 @@ void run_throghput_tests(KineticClient * client, size_t num_ops, size_t value_si
            );

            if (status != KINETIC_STATUS_SUCCESS) {
                fprintf(stderr, "DELETE failed w/status: %s\n", Kinetic_GetStatusDescription(status));
                TEST_FAIL();
                char msg[128];
                sprintf(msg, "DELETE failed w/status: %s\n", Kinetic_GetStatusDescription(status));
                TEST_FAIL_MESSAGE(msg);
            }
        }

        printf("Waiting for DELETEs to finish\n");

        for (size_t i = 0; i < num_ops; i++)
        {
        for (size_t i = 0; i < num_ops; i++) {
            KineticSemaphore_WaitForSignalAndDestroy(delete_statuses[i].sem);
            if (delete_statuses[i].status != KINETIC_STATUS_SUCCESS) {

                fprintf(stderr, "DELETE failed w/status: %s\n", Kinetic_GetStatusDescription(delete_statuses[i].status));
                TEST_FAIL();
                char msg[128];
                sprintf(msg, "DELETE failed w/status: %s", Kinetic_GetStatusDescription(delete_statuses[i].status));
                TEST_FAIL_MESSAGE(msg);
            }
        }

        struct timeval stop_time;
        gettimeofday(&stop_time, NULL);

        int64_t elapsed_us = ((stop_time.tv_sec - start_time.tv_sec) * 1000000)
            + (stop_time.tv_usec - start_time.tv_usec);
        float elapsed_ms = elapsed_us / 1000.0f;
@@ -324,8 +320,7 @@ typedef struct {
static void* test_thread(void* test_params)
{
    TestParams * params = test_params;
    for (uint32_t i = 0; i < params->thread_iters; i++)
    {
    for (uint32_t i = 0; i < params->thread_iters; i++) {
        run_throghput_tests(params->client, params->num_ops, params->obj_size);
    }
    return NULL;
@@ -333,15 +328,20 @@ static void* test_thread(void* test_params)

void run_tests(KineticClient * client)
{
    TestParams params[] = { { .client = client, .num_ops = 100, .obj_size = KINETIC_OBJ_SIZE, .thread_iters = 2 }
                          , { .client = client, .num_ops = 1000, .obj_size = 120, .thread_iters = 2 }
                          , { .client = client, .num_ops = 500, .obj_size = 70000, .thread_iters = 2 } };
                          // , { .client = client, .num_ops = 1000, .obj_size = 120, .thread_iters = 2 }
                          // , { .client = client, .num_ops = 100, .obj_size = KINETIC_OBJ_SIZE, .thread_iters = 2 } };
    TestParams params[] = { 
        { .client = client, .num_ops = 100, .obj_size = KINETIC_OBJ_SIZE, .thread_iters = 2 },
        { .client = client, .num_ops = 1000, .obj_size = 120, .thread_iters = 2 },
        { .client = client, .num_ops = 500, .obj_size = 70000, .thread_iters = 2 },
        // { .client = client, .num_ops = 1000, .obj_size = 120, .thread_iters = 5 },
        // { .client = client, .num_ops = 1000, .obj_size = 120, .thread_iters = 5 },
        // { .client = client, .num_ops = 1000, .obj_size = 120, .thread_iters = 2 },
        // { .client = client, .num_ops = 100, .obj_size = KINETIC_OBJ_SIZE, .thread_iters = 2 },
        // { .client = client, .num_ops = 1000, .obj_size = 120, .thread_iters = 5 },
        // { .client = client, .num_ops = 100, .obj_size = KINETIC_OBJ_SIZE, .thread_iters = 2 },
    };
    pthread_t thread_id[NUM_ELEMENTS(params)];

    for (uint32_t i = 0; i < NUM_ELEMENTS(params); i ++)
    {
    for (uint32_t i = 0; i < NUM_ELEMENTS(params); i ++) {
        int pthreadStatus = pthread_create(&thread_id[i], NULL, test_thread, &params[i]);
        TEST_ASSERT_EQUAL_MESSAGE(0, pthreadStatus, "pthread create failed");
    }
@@ -357,7 +357,6 @@ void test_kinetic_client_throughput_for_small_sized_objects(void)
{
    srand(time(NULL));
    for (uint32_t i = 0; i < 2; i++) {

        KineticClientConfig config = {
            .logFile = "stdout",
            .logLevel = 0,
+388 −0

File added.

Preview size limit exceeded, changes collapsed.