Commit fb2c0fb2 authored by Greg Williams's avatar Greg Williams
Browse files

Completed changeover to always operate on a single kinetic entry so that buffer state is retained

parent 6c96538c
Loading
Loading
Loading
Loading
+4 −4
Original line number Diff line number Diff line
@@ -66,12 +66,12 @@ static KineticStatus KineticClient_ExecuteOperation(KineticOperation* operation)
    KineticStatus status = KINETIC_STATUS_INVALID;

    LOGF1("Executing operation: 0x%llX", operation);
    if (operation->destEntry != NULL &&
        operation->destEntry->value.array.data != NULL &&
        operation->destEntry->value.bytesUsed > 0)
    if (operation->entry != NULL &&
        operation->entry->value.array.data != NULL &&
        operation->entry->value.bytesUsed > 0)
    {
        LOGF1("  Sending PDU (0x%0llX) w/value (%zu bytes)",
            operation->request, operation->destEntry->value.bytesUsed);
            operation->request, operation->entry->value.bytesUsed);
    }
    else {
        LOGF1("  Sending PDU (0x%0llX) w/o value", operation->request);
+2 −2
Original line number Diff line number Diff line
@@ -106,7 +106,7 @@ static void* KineticConnection_Worker(void* thread_arg)
                        size_t valueLength = KineticPDU_GetValueLength(response);
                        if (valueLength > 0) {
                            status = KineticPDU_ReceiveValue(op->connection->socket,
                                &op->destEntry->value, valueLength);
                                &op->entry->value, valueLength);
                        }

                        // Call operation-specific callback, if configured
@@ -222,7 +222,7 @@ KineticStatus KineticConnection_Disconnect(KineticConnection* const connection)
    // Shutdown the worker thread
    KineticStatus status = KINETIC_STATUS_SUCCESS;
    connection->thread.abortRequested = true;
    LOG0("\nSent abort request to worker thread!\n");
    LOG2("\nSent abort request to worker thread!\n");
    int pthreadStatus = pthread_join(connection->threadID, NULL);
    if (pthreadStatus != 0) {
        char errMsg[256];
+18 −18
Original line number Diff line number Diff line
@@ -66,8 +66,8 @@ KineticStatus KineticOperation_SendRequest(KineticOperation* const operation)
    // Configure PDU header length fields
    request->header.versionPrefix = 'F';
    request->header.protobufLength = KineticProto_Message__get_packed_size(request->proto);
    if (operation->destEntry != NULL && operation->sendValue) {
        request->header.valueLength = operation->destEntry->value.bytesUsed;
    if (operation->entry != NULL && operation->sendValue) {
        request->header.valueLength = operation->entry->value.bytesUsed;
    }
    else
    {
@@ -99,8 +99,8 @@ KineticStatus KineticOperation_SendRequest(KineticOperation* const operation)

    // Send the value/payload, if specified
    if (operation->valueEnabled && operation->sendValue) {
        LOGF1("Sending PDU Value Payload (%zu bytes)", operation->destEntry->value.bytesUsed);
        status = KineticSocket_Write(request->connection->socket, &operation->destEntry->value);
        LOGF1("Sending PDU Value Payload (%zu bytes)", operation->entry->value.bytesUsed);
        status = KineticSocket_Write(request->connection->socket, &operation->entry->value);
        if (status != KINETIC_STATUS_SUCCESS) {
            LOG0("Failed to send PDU value payload!");
            return status;
@@ -248,7 +248,7 @@ KineticStatus KineticOperation_PutCallback(KineticOperation* operation)
    assert(operation->entryEnabled);

    // Propagate newVersion to dbVersion in metadata, if newVersion specified
    KineticEntry* entry = operation->destEntry;
    KineticEntry* entry = operation->entry;
    if (entry->newVersion.array.data != NULL && entry->newVersion.array.len > 0) {
        // If both buffers supplied, copy newVersion into dbVersion, and clear newVersion
        if (entry->dbVersion.array.data != NULL && entry->dbVersion.array.len > 0) {
@@ -275,12 +275,12 @@ void KineticOperation_BuildPut(KineticOperation* const operation,

    operation->request->protoData.message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_PUT;
    operation->request->protoData.message.command.header->has_messageType = true;
    operation->destEntry = entry;
    operation->entry = entry;

    KineticMessage_ConfigureKeyValue(&operation->request->protoData.message, operation->destEntry);
    KineticMessage_ConfigureKeyValue(&operation->request->protoData.message, operation->entry);

    operation->entryEnabled = true;
    operation->valueEnabled = !operation->destEntry->metadataOnly;
    operation->valueEnabled = !operation->entry->metadataOnly;
    operation->sendValue = true;
    operation->callback = &KineticOperation_PutCallback;
}
@@ -296,12 +296,12 @@ KineticStatus KineticOperation_GetCallback(KineticOperation* operation)
    // Update the entry upon success
    KineticProto_Command_KeyValue* keyValue = KineticPDU_GetKeyValue(operation->response);
    if (keyValue != NULL) {
        if (!Copy_KineticProto_Command_KeyValue_to_KineticEntry(keyValue, operation->destEntry)) {
        if (!Copy_KineticProto_Command_KeyValue_to_KineticEntry(keyValue, operation->entry)) {
            return KINETIC_STATUS_BUFFER_OVERRUN;
        }
    }
    // if (operation->destEntry != NULL) {
        // operation->destEntry->value.bytesUsed = operation->destEntry->value.bytesUsed;
    // if (operation->entry != NULL) {
        // operation->entry->value.bytesUsed = operation->entry->value.bytesUsed;
    // }
    return KINETIC_STATUS_SUCCESS;
}
@@ -314,12 +314,12 @@ void KineticOperation_BuildGet(KineticOperation* const operation,

    operation->request->protoData.message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_GET;
    operation->request->protoData.message.command.header->has_messageType = true;
    operation->destEntry = entry;
    operation->entry = entry;

    KineticMessage_ConfigureKeyValue(&operation->request->protoData.message, entry);

    if (operation->destEntry->value.array.data != NULL) {
        ByteBuffer_Reset(&operation->destEntry->value);
    if (operation->entry->value.array.data != NULL) {
        ByteBuffer_Reset(&operation->entry->value);
    }

    operation->entryEnabled = true;
@@ -346,12 +346,12 @@ void KineticOperation_BuildDelete(KineticOperation* const operation,

    operation->request->protoData.message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_DELETE;
    operation->request->protoData.message.command.header->has_messageType = true;
    operation->destEntry = entry;
    operation->entry = entry;

    KineticMessage_ConfigureKeyValue(&operation->request->protoData.message, operation->destEntry);
    KineticMessage_ConfigureKeyValue(&operation->request->protoData.message, operation->entry);

    if (operation->destEntry->value.array.data != NULL) {
        ByteBuffer_Reset(&operation->destEntry->value);
    if (operation->entry->value.array.data != NULL) {
        ByteBuffer_Reset(&operation->entry->value);
    }

    operation->entryEnabled = true;
+1 −2
Original line number Diff line number Diff line
@@ -267,8 +267,7 @@ struct _KineticOperation {
    bool valueEnabled;
    bool sendValue;
    bool receiveComplete;
    // KineticEntry entry;
    KineticEntry* destEntry;
    KineticEntry* entry;
    ByteBufferArray* buffers;
    KineticOperationCallback callback;
    KineticCompletionClosure closure;
+12 −15
Original line number Diff line number Diff line
@@ -59,7 +59,7 @@ struct kinetic_thread_arg {

void setUp(void)
{
    KineticClient_Init(NULL, 0);
    KineticClient_Init("stdout", 0);
}

void tearDown(void)
@@ -197,13 +197,8 @@ void test_kinetic_client_should_be_able_to_store_an_arbitrarily_large_binary_obj
        kt_arg = malloc(sizeof(struct kinetic_thread_arg) * NUM_COPIES);
        TEST_ASSERT_NOT_NULL_MESSAGE(kt_arg, "kinetic_thread_arg malloc failed");

        // Establish all of the connection first, so their session can all get initialized first
        for (int i = 0; i < NUM_COPIES; i++) {

            printf("  *** Overlapped PUT operations (writing copy %d of %d)"
                   " on IP (iteration %d of %d):%s\n",
                   i + 1, NUM_COPIES, iteration + 1,
                   MAX_ITERATIONS, sessionConfig.host);

            // Establish connection
            TEST_ASSERT_EQUAL_KineticStatus(
                KINETIC_STATUS_SUCCESS,
@@ -232,6 +227,15 @@ void test_kinetic_client_should_be_able_to_store_an_arbitrarily_large_binary_obj
                .algorithm = KINETIC_ALGORITHM_SHA1,
                .value = valBuf,
            };
        }
        sleep(2); // Give a generous chunk of time for session initialized by the target device

        // Write all of the copies simultaneously (overlapped)
        for (int i = 0; i < NUM_COPIES; i++) {
            printf("  *** Overlapped PUT operations (writing copy %d of %d)"
                   " on IP (iteration %d of %d):%s\n",
                   i + 1, NUM_COPIES, iteration + 1,
                   MAX_ITERATIONS, sessionConfig.host);

            // Spawn the thread
            kt_arg[i].sessionHandle = kinetic_client[i];
@@ -279,14 +283,7 @@ void test_kinetic_client_should_be_able_to_store_an_arbitrarily_large_binary_obj
    printf("Min write bandwidth:      %.2f (MB/sec)\n", minBandwidth);
    printf("Max write bandwidth:      %.2f (MB/sec)\n", maxBandwidth);
    printf("Mean write bandwidth:     %.2f (MB/sec)\n", meanBandwidth);
    // printf("Mean aggregate bandwidth: %.2f (MB/sec)\n", meanAggregateBandwidth);
    printf("Mean aggregate bandwidth: %.2f (MB/sec)\n", meanAggregateBandwidth);
    printf("\n");
    fflush(stdout);

    TEST_IGNORE_MESSAGE("TODO: Need to fix aggregate PUT bandwidth calculation!");
}

void test_KineticClient_GetKeyRange_needs_to_work(void)
{
    TEST_IGNORE_MESSAGE("TODO: Need to fix crash in GETKEYRANGE!");
}
Loading