Commit 0c37d12b authored by Scott Vokes's avatar Scott Vokes
Browse files

Use atomic fetch and increment within locked region to assign sequence ID.

parent 46ea462e
Loading
Loading
Loading
Loading
+10 −4
Original line number Diff line number Diff line
@@ -34,6 +34,8 @@
#include <sys/time.h>
#include "bus.h"

#define ATOMIC_FETCH_AND_INCREMENT(P) __sync_fetch_and_add(P, 1)

static void KineticOperation_ValidateOperation(KineticOperation* operation);
static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const operation);

@@ -131,7 +133,9 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o
    }

    // Populate sequence count and increment it for next operation
    request->message.header.sequence = operation->connection->sequence++;
    assert(request->message.header.sequence == KINETIC_SEQUENCE_NOT_YET_BOUND);
    int seq_id = ATOMIC_FETCH_AND_INCREMENT(&operation->connection->sequence);
    request->message.header.sequence = seq_id;

    LOGF1("[PDU TX] pdu: 0x%0llX, op: 0x%llX, session: 0x%llX, bus: 0x%llX, fd: %6d, seq: %5lld, protoLen: %4u, valueLen: %u",
        operation->request, operation, operation->connection->pSession, operation->connection->messageBus,
@@ -172,10 +176,12 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o
    }
    assert((PDU_HEADER_LEN + header.protobufLength + header.valueLength) == offset);

    int fd = operation->connection->socket;

    if (!bus_send_request(operation->connection->messageBus, &(bus_user_msg){
        .fd       = operation->connection->socket,
        .fd       = fd,
        .type     = BUS_SOCKET_PLAIN,
        .seq_id   = request->message.header.sequence,
        .seq_id   = seq_id,
        .msg      = msg,
        .msg_size = offset,
        .cb       = KineticController_HandleExpectedResponse,
@@ -183,7 +189,7 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o
        }))
    {
        LOGF0("Failed queuing request %p for transmit on fd=%d w/seq=%lld",
            (void*)request, operation->connection->socket, (long long)request->message.header.sequence);
            (void*)request, fd, (long long)seq_id);
        status = KINETIC_STATUS_SOCKET_ERROR;
    }
    else {
+1 −1
Original line number Diff line number Diff line
@@ -439,7 +439,7 @@ void KineticMessage_HeaderInit(KineticProto_Command_Header* hdr,
        .has_connectionID = true,
        .connectionID = con->connectionID,
        .has_sequence = true,
        .sequence = con->sequence,
        .sequence = KINETIC_SEQUENCE_NOT_YET_BOUND,
    };
}

+2 −0
Original line number Diff line number Diff line
@@ -80,6 +80,8 @@ enum unpack_error {
    UNPACK_ERROR_PAYLOAD_MALLOC_FAIL,
};

#define KINETIC_SEQUENCE_NOT_YET_BOUND ((int64_t)-2)

typedef struct {
    enum socket_state state;
    KineticPDUHeader header;