Commit 4feeca68 authored by Scott Vokes's avatar Scott Vokes
Browse files

Merge remote-tracking branch 'origin/develop' into develop

parents 84ee84f9 db64d059
Loading
Loading
Loading
Loading
+23 −2
Original line number Diff line number Diff line
#ifndef _KINETIC_SEMAPHORE_H
#define _KINETIC_SEMAPHORE_H

#include <stdbool.h>

typedef struct _KineticSemaphore KineticSemaphore;

/**
@@ -8,7 +10,7 @@ typedef struct _KineticSemaphore KineticSemaphore;
 *        around a pthread condition variable and provides a a thread-safe
 *        way to block a thread and wait for notification from another thread.
 *
 * @return          Returns a pointer to a KineticSemaphore
 * @return          Returns a pointer to a KineticSemaphore.
 */
KineticSemaphore * KineticSemaphore_Create(void);

@@ -17,11 +19,30 @@ KineticSemaphore * KineticSemaphore_Create(void);
 *        thread that's blocked on the given semaphore using KineticSemaphore_WaitForSignalAndDestroy()
 *        You should never signal the same KineticSemaphore more than once.
 *
 * @param sem       A pointer to the semaphore to signal
 * @param sem       A pointer to the semaphore to signal.
 *
 */
void KineticSemaphore_Signal(KineticSemaphore * sem);

/**
 * @brief Reports whether the KineticSemaphore has been signaled.
 *
 * @param sem       A pointer to the semaphore to report signaled status from.
 *
 * @return          Returns true if signaled.
 */
bool KineticSemaphore_CheckSignaled(KineticSemaphore * sem);

/**
 * @brief Destorys the KineticSemaphore if it has been signaled.
 *
 * @param sem       A pointer to the semaphore to destroy.
 *
 * @return          Returns true signaled and detroyed.
 *                  Returns false if not yet signaled.
 */
bool KineticSemaphore_DestroyIfSignaled(KineticSemaphore * sem);

/**
 * @brief Blocks until the given semaphore is signaled. This will not block
 *        if the Semaphore has already been signaled. 
+0 −1
Original line number Diff line number Diff line
@@ -31,7 +31,6 @@ KineticConnection* KineticAllocator_NewConnection(void)
        LOG0("Failed allocating new Connection!");
        return NULL;
    }
    connection->socket = -1;  // start without a file descriptor
    return connection;
}

+17 −19
Original line number Diff line number Diff line
@@ -28,6 +28,7 @@
#include "kinetic_device_info.h"
#include "kinetic_allocator.h"
#include "kinetic_logger.h"
#include <pthread.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/time.h>
@@ -127,6 +128,20 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o
        return KINETIC_STATUS_BUFFER_OVERRUN;
    }

    size_t offset = 0;
    uint8_t * msg = malloc(PDU_HEADER_LEN + header.protobufLength + header.valueLength);
    if (msg == NULL)
    {
        LOG0("Failed to allocate outgoing message!");
        return KINETIC_STATUS_MEMORY_ERROR;
    }

    // Acquire lock
    pthread_mutex_lock(&operation->connection->sendMutex);

    // Populate sequence count and increment it for next operation
    request->message.header.sequence = operation->connection->sequence++;

    LOGF1("[PDU TX] pdu: 0x%0llX, op: 0x%llX, session: 0x%llX, bus: 0x%llX, seq: %5lld, protoLen: %4u, valueLen: %u",
        operation->request, operation, &operation->connection->session, operation->connection->messageBus,
        request->message.header.sequence, header.protobufLength, header.valueLength);
@@ -136,14 +151,6 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o
    uint32_t nboProtoLength = KineticNBO_FromHostU32(header.protobufLength);
    uint32_t nboValueLength = KineticNBO_FromHostU32(header.valueLength);
    
    size_t offset = 0;
    uint8_t * msg = malloc(PDU_HEADER_LEN + header.protobufLength + header.valueLength);
    if (msg == NULL)
    {
        LOG0("Failed to allocate outgoing message!");
        return KINETIC_STATUS_MEMORY_ERROR;
    }
    
    msg[offset] = header.versionPrefix;
    offset += sizeof(header.versionPrefix);

@@ -169,7 +176,6 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o
    bus_send_request(operation->connection->messageBus, &(bus_user_msg){
        .fd       = operation->connection->socket,
        .type     = BUS_SOCKET_PLAIN,
        // #TODO it would probably be good to clean up how we setup this sequence number
        .seq_id   = request->message.header.sequence,
        .msg      = msg,
        .msg_size = offset,
@@ -177,6 +183,8 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o
        .udata    = operation,
    });

    pthread_mutex_unlock(&operation->connection->sendMutex);

    free(msg);

    return KINETIC_STATUS_SUCCESS;
@@ -203,7 +211,6 @@ KineticStatus KineticOperation_NoopCallback(KineticOperation* const operation, K
void KineticOperation_BuildNoop(KineticOperation* const operation)
{
    KineticOperation_ValidateOperation(operation);
    KineticSession_IncrementSequence(&operation->connection->session);
    operation->request->message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_NOOP;
    operation->request->message.command.header->has_messageType = true;
    operation->valueEnabled = false;
@@ -248,7 +255,6 @@ void KineticOperation_BuildPut(KineticOperation* const operation,
                               KineticEntry* const entry)
{
    KineticOperation_ValidateOperation(operation);
    KineticSession_IncrementSequence(&operation->connection->session);

    operation->request->message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_PUT;
    operation->request->message.command.header->has_messageType = true;
@@ -299,7 +305,6 @@ static void build_get_command(KineticOperation* const operation,
                              KineticProto_Command_MessageType command_id)
{
    KineticOperation_ValidateOperation(operation);
    KineticSession_IncrementSequence(&operation->connection->session);

    operation->request->message.command.header->messageType = command_id;
    operation->request->message.command.header->has_messageType = true;
@@ -365,7 +370,6 @@ KineticStatus KineticOperation_FlushCallback(KineticOperation* const operation,
void KineticOperation_BuildFlush(KineticOperation* const operation)
{
    KineticOperation_ValidateOperation(operation);
    KineticSession_IncrementSequence(&operation->connection->session);
    operation->request->message.command.header->messageType =
      KINETIC_PROTO_COMMAND_MESSAGE_TYPE_FLUSHALLDATA;
    operation->request->message.command.header->has_messageType = true;
@@ -388,7 +392,6 @@ void KineticOperation_BuildDelete(KineticOperation* const operation,
                                  KineticEntry* const entry)
{
    KineticOperation_ValidateOperation(operation);
    KineticSession_IncrementSequence(&operation->connection->session);

    operation->request->message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_DELETE;
    operation->request->message.command.header->has_messageType = true;
@@ -434,7 +437,6 @@ void KineticOperation_BuildGetKeyRange(KineticOperation* const operation,
    KineticOperation_ValidateOperation(operation);
    assert(range != NULL);
    assert(buffers != NULL);
    KineticSession_IncrementSequence(&operation->connection->session);

    operation->request->command->header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_GETKEYRANGE;
    operation->request->command->header->has_messageType = true;
@@ -475,7 +477,6 @@ void KineticOperation_BuildGetLog(KineticOperation* const operation,
    KineticDeviceInfo** info)
{
    KineticOperation_ValidateOperation(operation);
    KineticSession_IncrementSequence(&operation->connection->session);
    KineticProto_Command_GetLog_Type protoType =
        KineticDeviceInfo_Type_to_KineticProto_Command_GetLog_Type(type);
        
@@ -644,7 +645,6 @@ KineticStatus KineticOperation_BuildP2POperation(KineticOperation* const operati
                                                 KineticP2P_Operation* const p2pOp)
{
    KineticOperation_ValidateOperation(operation);
    KineticSession_IncrementSequence(&operation->connection->session);
        
    operation->request->command->header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_PEER2PEERPUSH;
    operation->request->command->header->has_messageType = true;
@@ -673,7 +673,6 @@ KineticStatus KineticOperation_InstantSecureEraseCallback(KineticOperation* cons
void KineticOperation_BuildInstantSecureErase(KineticOperation* operation)
{
    KineticOperation_ValidateOperation(operation);
    KineticSession_IncrementSequence(&operation->connection->session);
    operation->request->message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_SETUP;
    operation->request->message.command.header->has_messageType = true;
    operation->request->command->body = &operation->request->message.body;
@@ -724,7 +723,6 @@ KineticStatus KineticOperation_SetClusterVersionCallback(KineticOperation* opera
void KineticOperation_BuildSetClusterVersion(KineticOperation* operation, int64_t newClusterVersion)
{
    KineticOperation_ValidateOperation(operation);
    KineticSession_IncrementSequence(&operation->connection->session);
    operation->request->message.command.header->messageType = KINETIC_PROTO_COMMAND_MESSAGE_TYPE_SETUP;
    operation->request->message.command.header->has_messageType = true;
    
+18 −1
Original line number Diff line number Diff line
#include "kinetic_semaphore.h"
#include <pthread.h>
#include <stdlib.h>
#include <stdbool.h>

struct _KineticSemaphore
{
@@ -30,6 +29,24 @@ void KineticSemaphore_Signal(KineticSemaphore * sem)
    pthread_mutex_unlock(&sem->mutex); 
}

bool KineticSemaphore_CheckSignaled(KineticSemaphore * sem)
{
    return sem->signaled;
}

bool KineticSemaphore_DestroyIfSignaled(KineticSemaphore * sem)
{
    if (sem->signaled) {
        pthread_mutex_destroy(&sem->mutex);
        pthread_cond_destroy(&sem->complete);
        free(sem);
        return true;
    }
    else {
        return false; // Semaphore has not yet been signaled
    }
}

void KineticSemaphore_WaitForSignalAndDestroy(KineticSemaphore * sem)
{
    pthread_mutex_lock(&sem->mutex);
+14 −19
Original line number Diff line number Diff line
@@ -48,14 +48,25 @@ KineticStatus KineticSession_Create(KineticSession * const session, KineticClien
        return KINETIC_STATUS_MEMORY_ERROR;
    }

    session->connection->session = *session; // TODO: KILL ME!!!
    session->connection->messageBus = client->bus;
    session->connection->socket = -1;  // start without a file descriptor
    
    // init connection send mutex
    if (pthread_mutex_init(&session->connection->sendMutex, NULL) != 0) {
        LOG0("Failed initializing connection send mutex!");
        KineticAllocator_FreeConnection(session->connection);
        return KINETIC_STATUS_MEMORY_ERROR;
    }

    session->connection->outstandingOperations =
        KineticCountingSemaphore_Create(KINETIC_MAX_OUTSTANDING_OPERATIONS_PER_SESSION);
    if (session->connection->outstandingOperations == NULL) {
        LOG0("Failed initializing session counting semaphore!");
        KineticAllocator_FreeConnection(session->connection);
        return KINETIC_STATUS_MEMORY_ERROR;
    }
    session->connection->session = *session; // TODO: KILL ME!!!
    session->connection->messageBus = client->bus;

    return KINETIC_STATUS_SUCCESS;
}

@@ -138,25 +149,9 @@ KineticStatus KineticSession_Disconnect(KineticSession const * const session)
    // Close the connection
    bus_release_socket(connection->messageBus, connection->socket);
    free(connection->si);

    connection->socket = KINETIC_HANDLE_INVALID;
    connection->connected = false;
    pthread_mutex_destroy(&connection->sendMutex);

    return KINETIC_STATUS_SUCCESS;
}

#define CAS(PTR, OLD, NEW) (__sync_bool_compare_and_swap(PTR, OLD, NEW))

void KineticSession_IncrementSequence(KineticSession const * const session)
{
    assert(session != NULL);
    assert(session->connection != NULL);

    for (;;) {
        int64_t cur_seq_id = session->connection->sequence;
        if (CAS(&session->connection->sequence,
                cur_seq_id, cur_seq_id + 1)) {
            break;
        }
    }
}
Loading