Commit 4adfac9b authored by Greg Williams's avatar Greg Williams
Browse files

Updated PDU allocation to use seperate lists per kinetic session with...

Updated PDU allocation to use seperate lists per kinetic session with per-session mutexes to lock only the list being modified, which should have benefits on overlapped IO operations as well.
parent c523fa6e
Loading
Loading
Loading
Loading
+26 −20
Original line number Diff line number Diff line
@@ -25,20 +25,26 @@

// #define KINETIC_LOG_ALLOCATOR

pthread_mutex_t _global_pdu_lists_mutex = PTHREAD_MUTEX_INITIALIZER;
STATIC bool listsLocked = false;
static inline void KineticAllocator_LockList(KineticList* const list)
{
    assert(!list->locked);
    pthread_mutex_lock(&list->mutex);
    list->locked = true;
}

static inline void KineticAllocator_Lock(void)
static inline void KineticAllocator_UnlockList(KineticList* const list)
{
    pthread_mutex_lock(&_global_pdu_lists_mutex);
    assert(!listsLocked);
    listsLocked = true;
    // assert(list->locked);
    pthread_mutex_unlock(&list->mutex);
    list->locked = false;
}

static inline void KineticAllocator_Unlock(void)
void KineticAllocator_InitList(KineticList* const list)
{
    pthread_mutex_unlock(&_global_pdu_lists_mutex);
    listsLocked = false;
    *list = (KineticList) {
        .mutex = PTHREAD_MUTEX_INITIALIZER,
        .locked = false,
    };
}

void* KineticAllocator_NewItem(KineticList* const list, size_t size)
@@ -57,7 +63,7 @@ void* KineticAllocator_NewItem(KineticList* const list, size_t size)
    memset(newItem->data, 0, size);

    // Add the new item to the list
    KineticAllocator_Lock();
    KineticAllocator_LockList(list);
    if (list->start == NULL) {
        list->start = newItem;
    }
@@ -66,7 +72,7 @@ void* KineticAllocator_NewItem(KineticList* const list, size_t size)
        list->last->next = newItem;
    }
    list->last = newItem;
    KineticAllocator_Unlock();
    KineticAllocator_UnlockList(list);

    #ifdef KINETIC_LOG_ALLOCATOR
    LOGF("Allocated new list item @ 0x%0llX w/data @ 0x%0llX",
@@ -78,12 +84,12 @@ void* KineticAllocator_NewItem(KineticList* const list, size_t size)

void KineticAllocator_FreeItem(KineticList* const list, void* item)
{
    KineticAllocator_Lock();
    KineticAllocator_LockList(list);
    KineticListItem* cur = list->start;
    while (cur->data != item) {
        if (cur->next == NULL) {
            LOG("  Reached end of list before finding item to free!");
            KineticAllocator_Unlock();
            KineticAllocator_UnlockList(list);
            return;
        }
        else {
@@ -140,7 +146,7 @@ void KineticAllocator_FreeItem(KineticList* const list, void* item)
        free(cur);
        cur = NULL;
    }
    KineticAllocator_Unlock();
    KineticAllocator_UnlockList(list);
}

void KineticAllocator_FreeList(KineticList* const list)
@@ -150,7 +156,7 @@ void KineticAllocator_FreeList(KineticList* const list)
        LOG("Freeing list of all items");
        #endif

        KineticAllocator_Lock();
        KineticAllocator_LockList(list);

        KineticListItem* current = list->start;

@@ -188,7 +194,7 @@ void KineticAllocator_FreeList(KineticList* const list)
            .start = NULL, .last = NULL
        };

        KineticAllocator_Unlock();
        KineticAllocator_UnlockList(list);
    }
    else {
        #ifdef KINETIC_LOG_ALLOCATOR
@@ -217,14 +223,14 @@ KineticPDU* KineticAllocator_NewPDU(KineticList* const list, KineticConnection*

void KineticAllocator_FreePDU(KineticList* const list, KineticPDU* pdu)
{
    KineticAllocator_Lock();
    KineticAllocator_LockList(list);
    if ((pdu->proto != NULL) && pdu->protobufDynamicallyExtracted) {
        #ifdef KINETIC_LOG_ALLOCATOR
        LOG("Freeing dynamically allocated protobuf");
        #endif
        KineticProto_Message__free_unpacked(pdu->proto, NULL);
    };
    KineticAllocator_Unlock();
    KineticAllocator_UnlockList(list);
    KineticAllocator_FreeItem(list, (void*)pdu);
}

@@ -232,7 +238,7 @@ void KineticAllocator_FreeAllPDUs(KineticList* const list)
{
    if (list->start != NULL) {
        LOG("Freeing all PDUs...");
        KineticAllocator_Lock();
        KineticAllocator_LockList(list);
        KineticListItem* current = list->start;
        while (current != NULL) {
            KineticPDU* pdu = (KineticPDU*)current->data;
@@ -242,7 +248,7 @@ void KineticAllocator_FreeAllPDUs(KineticList* const list)
            }
            current = current->next;
        }
        KineticAllocator_Unlock();
        KineticAllocator_UnlockList(list);
        KineticAllocator_FreeList(list);
    }
    else {
+1 −0
Original line number Diff line number Diff line
@@ -23,6 +23,7 @@

#include "kinetic_types_internal.h"

void KineticAllocator_InitList(KineticList* const list);
KineticPDU* KineticAllocator_NewPDU(KineticList* const list, KineticConnection* connection);
void KineticAllocator_FreePDU(KineticList* const list, KineticPDU* pdu);
void KineticAllocator_FreeAllPDUs(KineticList* const list);
+2 −0
Original line number Diff line number Diff line
@@ -56,6 +56,8 @@ struct _KineticListItem {
typedef struct _KineticList {
    KineticListItem* start;
    KineticListItem* last;
    pthread_mutex_t mutex;
    bool locked;
} KineticList;

typedef struct _KineticPDU KineticPDU;
+1 −1
Original line number Diff line number Diff line
@@ -36,7 +36,7 @@ void SystemTestSetup(SystemTestFixture* fixture)
    if (!fixture->connected) {
        *fixture = (SystemTestFixture) {
            .config = (KineticSession) {
                .host = "localhost",
                .host = SYSTEM_TEST_HOST,
                .port = KINETIC_PORT,
                .clusterVersion = 0,
                .identity =  1,
+5 −0
Original line number Diff line number Diff line
@@ -24,6 +24,11 @@
#include "kinetic_types.h"
#include "kinetic_logger.h"

#ifndef SYSTEM_TEST_HOST
// #define SYSTEM_TEST_HOST "localhost"
#define SYSTEM_TEST_HOST "192.168.2.17"
#endif

typedef struct _SystemTestFixture {
    KineticSession config;
    KineticSessionHandle handle;
Loading