Commit 382b3fcd authored by Scott Vokes's avatar Scott Vokes
Browse files

Set sequence ID immediately after locking.

parent 0e227fd7
Loading
Loading
Loading
Loading
+12 −9
Original line number Diff line number Diff line
@@ -45,7 +45,9 @@ KineticStatus KineticOperation_SendRequest(KineticOperation* const operation)
    assert(operation->connection != NULL);
    assert(operation->request != NULL);

    KineticCountingSemaphore_Take(operation->connection->outstandingOperations);
    KineticCountingSemaphore * const sem = operation->connection->outstandingOperations;

    KineticCountingSemaphore_Take(sem);
    KineticStatus status = KineticOperation_SendRequestInner(operation);
    if (status != KINETIC_STATUS_SUCCESS)
    {
@@ -55,12 +57,12 @@ KineticStatus KineticOperation_SendRequest(KineticOperation* const operation)
            free(request->message.message.commandBytes.data);
            request->message.message.commandBytes.data = NULL;
        }
        KineticCountingSemaphore_Give(operation->connection->outstandingOperations);
        KineticCountingSemaphore_Give(sem);
    }
    return status;
}

// TODO: Asses refactoring this methog by disecting out Operation and relocate to kinetic_pdu
// TODO: Assess refactoring this method by dissecting out Operation and relocate to kinetic_pdu
static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const operation)
{
    LOGF3("\nSending PDU via fd=%d", operation->connection->socket);
@@ -73,6 +75,12 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o

    // Acquire lock
    pthread_mutex_lock(sendMutex);
    KineticSession *session = operation->connection->pSession;

    // Populate sequence count and increment it for next operation
    assert(request->message.header.sequence == KINETIC_SEQUENCE_NOT_YET_BOUND);
    int seq_id = ATOMIC_FETCH_AND_INCREMENT(&operation->connection->sequence);
    request->message.header.sequence = seq_id;

    // Pack the command, if available
    size_t expectedLen = KineticProto_command__get_packed_size(&request->message.command);
@@ -105,7 +113,7 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o

            // Populate the HMAC for the protobuf
            KineticHMAC_Init(&hmac, KINETIC_PROTO_COMMAND_SECURITY_ACL_HMACALGORITHM_HmacSHA1);
            KineticHMAC_Populate(&hmac, proto, operation->connection->pSession->config.hmacKey);
            KineticHMAC_Populate(&hmac, proto, session->config.hmacKey);
        } break;
    default:
        break;
@@ -133,11 +141,6 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o
        goto cleanup;
    }

    // Populate sequence count and increment it for next operation
    assert(request->message.header.sequence == KINETIC_SEQUENCE_NOT_YET_BOUND);
    int seq_id = ATOMIC_FETCH_AND_INCREMENT(&operation->connection->sequence);
    request->message.header.sequence = seq_id;

    LOGF1("[PDU TX] pdu: 0x%0llX, op: 0x%llX, session: 0x%llX, bus: 0x%llX, fd: %6d, seq: %5lld, protoLen: %4u, valueLen: %u",
        operation->request, operation, operation->connection->pSession, operation->connection->messageBus,
        operation->connection->socket, request->message.header.sequence, header.protobufLength, header.valueLength);