Commit c165a020 authored by Job Vranish's avatar Job Vranish
Browse files

implemented mutex around write socket?

parent f2f16109
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -173,6 +173,7 @@ void KineticConnection_FreeConnection(KineticSessionHandle* const handle)
    assert(*handle != KINETIC_HANDLE_INVALID);
    KineticConnection* connection = KineticConnection_FromHandle(*handle);
    assert(connection != NULL);
    pthread_mutex_destroy(&connection->writeMutex);
    *connection = (KineticConnection) {
        .connected = false
    };
+38 −30
Original line number Diff line number Diff line
@@ -32,6 +32,40 @@

static void KineticOperation_ValidateOperation(KineticOperation* operation);

static KineticStatus WritePDU(KineticOperation* const operation)
{
    KineticPDU* request = operation->request;
    // Pack and send the PDU header
    ByteBuffer hdr = ByteBuffer_Create(&request->headerNBO, sizeof(KineticPDUHeader), sizeof(KineticPDUHeader));
    KineticStatus status = KineticSocket_Write(request->connection->socket, &hdr);
    if (status != KINETIC_STATUS_SUCCESS) {
        LOG0("Failed to send PDU header!");
        return status;
    }

    // Send the protobuf message
    LOG1("Sending PDU Protobuf:");
    KineticLogger_LogProtobuf(2, request->proto);
    status = KineticSocket_WriteProtobuf(request->connection->socket, request);
    if (status != KINETIC_STATUS_SUCCESS) {
        LOG0("Failed to send PDU protobuf message!");
        return status;
    }

    // Send the value/payload, if specified
    if (operation->valueEnabled && operation->sendValue) {
        LOGF1("Sending PDU Value Payload (%zu bytes)", operation->entry->value.bytesUsed);
        status = KineticSocket_Write(request->connection->socket, &operation->entry->value);
        if (status != KINETIC_STATUS_SUCCESS) {
            LOG0("Failed to send PDU value payload!");
            return status;
        }
    }

    LOG2("PDU sent successfully!");
    return KINETIC_STATUS_SUCCESS;
}

KineticStatus KineticOperation_SendRequest(KineticOperation* const operation)
{
    assert(operation != NULL);
@@ -39,7 +73,6 @@ KineticStatus KineticOperation_SendRequest(KineticOperation* const operation)
    assert(operation->request != NULL);
    assert(operation->request->connection == operation->connection);
    LOGF1("\nSending PDU via fd=%d", operation->connection->socket);
    KineticStatus status = KINETIC_STATUS_INVALID;
    KineticPDU* request = operation->request;
    request->proto = &operation->request->protoData.message.message;

@@ -81,36 +114,11 @@ KineticStatus KineticOperation_SendRequest(KineticOperation* const operation)
    request->headerNBO.protobufLength = KineticNBO_FromHostU32(request->header.protobufLength);
    request->headerNBO.valueLength = KineticNBO_FromHostU32(request->header.valueLength);

    // Pack and send the PDU header
    ByteBuffer hdr = ByteBuffer_Create(&request->headerNBO, sizeof(KineticPDUHeader), sizeof(KineticPDUHeader));
    status = KineticSocket_Write(request->connection->socket, &hdr);
    if (status != KINETIC_STATUS_SUCCESS) {
        LOG0("Failed to send PDU header!");
        return status;
    }

    // Send the protobuf message
    LOG1("Sending PDU Protobuf:");
    KineticLogger_LogProtobuf(2, request->proto);
    status = KineticSocket_WriteProtobuf(request->connection->socket, request);
    if (status != KINETIC_STATUS_SUCCESS) {
        LOG0("Failed to send PDU protobuf message!");
        return status;
    }

    // Send the value/payload, if specified
    if (operation->valueEnabled && operation->sendValue) {
        LOGF1("Sending PDU Value Payload (%zu bytes)", operation->entry->value.bytesUsed);
        status = KineticSocket_Write(request->connection->socket, &operation->entry->value);
        if (status != KINETIC_STATUS_SUCCESS) {
            LOG0("Failed to send PDU value payload!");
    pthread_mutex_lock(&operation->connection->writeMutex);
    KineticStatus status = WritePDU(operation);
    pthread_mutex_unlock(&operation->connection->writeMutex);
    return status;
}
    }

    LOG2("PDU sent successfully!");
    return KINETIC_STATUS_SUCCESS;
}

KineticStatus KineticOperation_GetStatus(const KineticOperation* const operation)
{
+2 −0
Original line number Diff line number Diff line
@@ -85,6 +85,7 @@ typedef struct _KineticThread {
struct _KineticConnection {
    bool            connected;      // state of connection
    int             socket;         // socket file descriptor
    pthread_mutex_t writeMutex;     // socket write mutex
    int64_t         connectionID;   // initialized to seconds since epoch
    int64_t         sequence;       // increments for each request in a session
    KineticList     pdus;           // list of dynamically allocated PDUs
@@ -96,6 +97,7 @@ struct _KineticConnection {
#define KINETIC_CONNECTION_INIT(_con) { (*_con) = (KineticConnection) { \
        .connected = false, \
        .socket = -1, \
        .writeMutex = PTHREAD_MUTEX_INITIALIZER, \
        .operations = KINETIC_LIST_INITIALIZER, \
        .pdus = KINETIC_LIST_INITIALIZER, \
    }; \