Commit c55b7454 authored by Greg Williams's avatar Greg Williams
Browse files

Fixed regression causing entry values to not get updated properly for PUT/GET operations

parent 08a0a289
Loading
Loading
Loading
Loading
+3 −3
Original line number Diff line number Diff line
@@ -65,11 +65,11 @@ static KineticStatus KineticClient_ExecuteOperation(KineticOperation* operation)
    KineticStatus status = KINETIC_STATUS_INVALID;

    LOGF1("Executing operation: 0x%llX", operation);
    if (operation->entry.value.array.data != NULL
        && operation->entry.value.bytesUsed > 0)
    if (operation->destEntry->value.array.data != NULL
        && operation->destEntry->value.bytesUsed > 0)
    {
        LOGF1("  Sending PDU (0x%0llX) w/value (%zu bytes)",
            operation->request, operation->entry.value.bytesUsed);
            operation->request, operation->destEntry->value.bytesUsed);
    }
    else {
        LOGF1("  Sending PDU (0x%0llX) w/o value", operation->request);
+1 −1
Original line number Diff line number Diff line
@@ -106,7 +106,7 @@ static void* KineticConnection_Worker(void* thread_arg)
                        size_t valueLength = KineticPDU_GetValueLength(response);
                        if (valueLength > 0) {
                            status = KineticPDU_ReceiveValue(op->connection->socket,
                                &op->entry.value, valueLength);
                                &op->destEntry->value, valueLength);
                        }

                        // Call operation-specific callback, if configured
+2 −1
Original line number Diff line number Diff line
@@ -32,7 +32,8 @@ void KineticMessage_Init(KineticMessage* const message)
    if ((_entry)->_name.array.data != NULL \
        && (_entry)->_name.array.len > 0 \
        && (_entry)->_name.bytesUsed > 0 \
        && (_entry)->_name.bytesUsed <= (_entry)->_name.array.len) { \
        && (_entry)->_name.bytesUsed <= (_entry)->_name.array.len) \
    { \
        (_field)._name.data = (_entry)->_name.array.data; \
        (_field)._name.len = (_entry)->_name.bytesUsed; \
        (_field).has_ ## _name = true; \
+12 −13
Original line number Diff line number Diff line
@@ -66,7 +66,7 @@ KineticStatus KineticOperation_SendRequest(KineticOperation* const operation)
    // Configure PDU header length fields
    request->header.versionPrefix = 'F';
    request->header.protobufLength = KineticProto_Message__get_packed_size(request->proto);
    request->header.valueLength = (operation->sendValue) ? operation->entry.value.bytesUsed : 0;
    request->header.valueLength = (operation->sendValue) ? operation->destEntry->value.bytesUsed : 0;
    KineticLogger_LogHeader(1, &request->header);

    // Create NBO copy of header for sending
@@ -93,8 +93,8 @@ KineticStatus KineticOperation_SendRequest(KineticOperation* const operation)

    // Send the value/payload, if specified
    if (operation->valueEnabled && operation->sendValue) {
        LOGF1("Sending PDU Value Payload (%zu bytes)", operation->entry.value.bytesUsed);
        status = KineticSocket_Write(request->connection->socket, &operation->entry.value);
        LOGF1("Sending PDU Value Payload (%zu bytes)", operation->destEntry->value.bytesUsed);
        status = KineticSocket_Write(request->connection->socket, &operation->destEntry->value);
        if (status != KINETIC_STATUS_SUCCESS) {
            LOG0("Failed to send PDU value payload!");
            return status;
@@ -226,7 +226,6 @@ void KineticOperation_BuildNoop(KineticOperation* const operation)
    KineticConnection_IncrementSequence(operation->connection);
    operation->request->protoData.message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_NOOP;
    operation->request->protoData.message.command.header->has_messageType = true;
    operation->entry.value = BYTE_BUFFER_NONE;

    operation->entryEnabled = false;
    operation->valueEnabled = false;
@@ -243,7 +242,7 @@ KineticStatus KineticOperation_PutCallback(KineticOperation* operation)
    assert(operation->entryEnabled);

    // Propagate newVersion to dbVersion in metadata, if newVersion specified
    KineticEntry* entry = &operation->entry;
    KineticEntry* entry = operation->destEntry;
    if (entry->newVersion.array.data != NULL && entry->newVersion.array.len > 0) {
        // If both buffers supplied, copy newVersion into dbVersion, and clear newVersion
        if (entry->dbVersion.array.data != NULL && entry->dbVersion.array.len > 0) {
@@ -272,10 +271,10 @@ void KineticOperation_BuildPut(KineticOperation* const operation,
    operation->request->protoData.message.command.header->has_messageType = true;
    operation->destEntry = entry;

    KineticMessage_ConfigureKeyValue(&operation->request->protoData.message, entry);
    KineticMessage_ConfigureKeyValue(&operation->request->protoData.message, operation->destEntry);

    operation->entryEnabled = true;
    operation->valueEnabled = !entry->metadataOnly;
    operation->valueEnabled = !operation->destEntry->metadataOnly;
    operation->sendValue = true;
    operation->callback = &KineticOperation_PutCallback;
}
@@ -291,12 +290,12 @@ KineticStatus KineticOperation_GetCallback(KineticOperation* operation)
    // Update the entry upon success
    KineticProto_Command_KeyValue* keyValue = KineticPDU_GetKeyValue(operation->response);
    if (keyValue != NULL) {
        if (!Copy_KineticProto_Command_KeyValue_to_KineticEntry(keyValue, &operation->entry)) {
        if (!Copy_KineticProto_Command_KeyValue_to_KineticEntry(keyValue, operation->destEntry)) {
            return KINETIC_STATUS_BUFFER_OVERRUN;
        }
    }
    // if (operation->destEntry != NULL) {
        // operation->destEntry->value.bytesUsed = operation->entry.value.bytesUsed;
        // operation->destEntry->value.bytesUsed = operation->destEntry->value.bytesUsed;
    // }
    return KINETIC_STATUS_SUCCESS;
}
@@ -313,8 +312,8 @@ void KineticOperation_BuildGet(KineticOperation* const operation,

    KineticMessage_ConfigureKeyValue(&operation->request->protoData.message, entry);

    if (operation->entry.value.array.data != NULL) {
        ByteBuffer_Reset(&operation->entry.value);
    if (operation->destEntry->value.array.data != NULL) {
        ByteBuffer_Reset(&operation->destEntry->value);
    }

    operation->entryEnabled = true;
@@ -343,9 +342,9 @@ void KineticOperation_BuildDelete(KineticOperation* const operation,
    operation->request->protoData.message.command.header->has_messageType = true;
    operation->destEntry = entry;

    KineticMessage_ConfigureKeyValue(&operation->request->protoData.message, entry);
    KineticMessage_ConfigureKeyValue(&operation->request->protoData.message, operation->destEntry);

    if (operation->entry.value.array.data != NULL) {
    if (operation->destEntry->value.array.data != NULL) {
        ByteBuffer_Reset(&operation->destEntry->value);
    }

+1 −1
Original line number Diff line number Diff line
@@ -262,8 +262,8 @@ bool Copy_KineticProto_Command_KeyValue_to_KineticEntry(KineticProto_Command_Key

        ByteBuffer_Reset(&entry->tag);
        if (keyValue->has_tag) {
            entry->tag.bytesUsed = keyValue->tag.len;
            if (entry->tag.array.data == NULL || entry->tag.array.len < keyValue->tag.len) {
                entry->tag.bytesUsed = keyValue->tag.len;
                LOG1(" BUFFER_OVERRUN: tag");
                bufferOverflow = true;
            }
Loading