Commit bcdf29e3 authored by Job Vranish's avatar Job Vranish
Browse files

integrated threadpool and message bus

parent f35af2fa
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -263,6 +263,7 @@ stop_simulator:
# System Tests
#===============================================================================


SYSTEST_SRC = ./test/system
SYSTEST_OUT = $(BIN_DIR)/systest
SYSTEST_LDFLAGS += -lm -L${OPENSSL_PATH}/lib -lssl -lcrypto $(KINETIC_LIB) -l pthread
@@ -277,6 +278,8 @@ systest_results = $(patsubst $(SYSTEST_OUT)/run_%,$(SYSTEST_OUT)/%.log,$(systest
systest_passfiles = $(patsubst $(SYSTEST_OUT)/run_%,$(SYSTEST_OUT)/%.testpass,$(systest_executables))
systest_names = $(patsubst $(SYSTEST_OUT)/run_%,%,$(systest_executables))

.SECONDARY: $(systest_executables)

list_system_tests:
	echo $(systest_names)

+3 −3
Original line number Diff line number Diff line
@@ -29,12 +29,12 @@
 * @param log_file (path to log file, 'stdout' to log to STDOUT, NULL to disable logging)
 * @param log_level Logging level (-1:none, 0:error, 1:info, 2:verbose, 3:full)
 */
void KineticClient_Init(const char* log_file, int log_level);
KineticClient * KineticClient_Init(const char* log_file, int log_level);

/**
 * @brief Performs shutdown/cleanup of the kinetic-c client lib
 */
void KineticClient_Shutdown(void);
void KineticClient_Shutdown(KineticClient * const client);

/**
 * @brief Initializes the Kinetic API, configures logging destination, establishes a
@@ -57,7 +57,7 @@ void KineticClient_Shutdown(void);
 *                  order to shutdown a connection and cleanup resources when
 *                  done using a KineticSession.
 */
KineticStatus KineticClient_CreateConnection(KineticSession * const session);
KineticStatus KineticClient_CreateConnection(KineticSession * const session, KineticClient * const client);

/**
 * @brief Closes the connection to a host.
+4 −0
Original line number Diff line number Diff line
@@ -87,6 +87,10 @@ typedef enum _KineticSynchronization {
} KineticSynchronization;


struct _KineticClient;
typedef struct _KineticClient KineticClient;


/**
 * @brief Kinetic Connection Instance
 */
+2 −2
Original line number Diff line number Diff line
@@ -195,7 +195,7 @@ typedef struct {
        struct {
            // user needs to free *msg
            int64_t seq_id;
            uint8_t *opaque_msg;
            void *opaque_msg;
        } response;
    } u;
} bus_msg_result_t;
+26 −235
Original line number Diff line number Diff line
@@ -27,27 +27,6 @@
//==============================================================================
// Generic List Support (INTERNAL)
//==============================================================================

#define KINETIC_LIST_LOCK(_list) { \
    /*LOG_LOCATION; LOGF3("Locking list! (list_addr=0x%llX)", (_list));*/ \
    pthread_mutex_lock(&((_list)->mutex)); \
    ((_list)->locked) = true; \
}

#define KINETIC_LIST_UNLOCK(_list) { \
    /*LOG_LOCATION; LOGF3("Unlocking list! (list_addr=0x%llX)", (_list));*/ \
    assert(((_list)->locked)); \
    pthread_mutex_unlock(&((_list)->mutex)); \
    ((_list)->locked) = false; \
}

void KineticAllocator_InitLists(KineticConnection* connection)
{
    assert(connection != NULL);
    connection->pdus = KINETIC_LIST_INITIALIZER;
    connection->operations = KINETIC_LIST_INITIALIZER;
}

KineticConnection* KineticAllocator_NewConnection(void)
{
    KineticConnection* connection = calloc(1, sizeof(KineticConnection));
@@ -61,138 +40,6 @@ void KineticAllocator_FreeConnection(KineticConnection* connection)
    free(connection);
}

static void* KineticAllocator_GetFirstListItem(KineticList* list)
{
    assert(list != NULL);
    if (list->start == NULL) {
        return NULL;
    }
    return list->start->data;
}

static void* KineticAllocator_GetNextListItem(KineticList* list, void* item_data)
{
    assert(list != NULL);
    void* nextData = NULL;
    KINETIC_LIST_LOCK(list);
    KineticListItem* current = list->start;
    while (current != NULL) {
        void* currData = current->data;
        if (currData == item_data) {
            if (current->next != NULL) {
                nextData = current->next->data;
            }
            break;
        }
        current = current->next;
    }
    KINETIC_LIST_UNLOCK(list);
    return nextData;
}

static void* KineticAllocator_NewItem(KineticList* const list, size_t size)
{
    KineticListItem* newItem = (KineticListItem*)malloc(sizeof(KineticListItem));
    if (newItem == NULL) {
        LOG0("  Failed allocating new list item!");
        return NULL;
    }
    memset(newItem, 0, sizeof(KineticListItem));
    newItem->data = malloc(size);
    if (newItem->data == NULL) {
        LOG0("  Failed allocating data for list item!");
        return NULL;
    }
    memset(newItem->data, 0, size);

    // Add the new item to the list
    KINETIC_LIST_LOCK(list);
    newItem->next = list->start;
    list->start = newItem;
    KINETIC_LIST_UNLOCK(list);

    LOGF3("  Allocated new list item (0x%0llX) w/data (0x%0llX)",
         (long long)newItem, (long long)newItem->data);

    return newItem->data;
}

static void KineticAllocator_FreeItem(KineticList* const list, void* item, bool lock)
{
    /* Make locking optional, since the lock may already be owned by the caller. */
    if (lock) {
        KINETIC_LIST_LOCK(list);
    }
    KineticListItem* cur = list->start;
    KineticListItem* prev = NULL;

    while (cur->data != item) {
        if (cur->next == NULL) {
            LOG1("  Reached end of list before finding item to free!");
            if (lock) {
                KINETIC_LIST_UNLOCK(list);
            }
            return;
        }
        else {
            prev = cur;
            cur = cur->next;
        }
    }
    LOG3("  Done searching for item list item");

    if ((cur != NULL) && (cur->data == item)) {
        LOG3("  item found! freeing it.");

        if (prev == NULL) {
            LOG3("  At start of list.");
            list->start = cur->next;
        } else {
            LOG3("  Not at list start, so relinking list to free item.");
            prev->next = cur->next;
        }

        LOGF3("  Freeing item (0x%0llX) w/data (0x%0llX)", cur, &cur->data);
        free(cur->data);
        cur->data = NULL;
        free(cur);
        cur = NULL;
    }
    if (lock) {
        KINETIC_LIST_UNLOCK(list);
    }
}

static void KineticAllocator_FreeList(KineticList* const list)
{
    if (list != NULL) {
        LOGF3("  Freeing list (0x%0llX) of all items...", list);
        KINETIC_LIST_LOCK(list);

        KineticListItem* next = NULL;
        for (KineticListItem* item = list->start; item; item = next) {
            next = item->next;
            
            LOGF3("  Freeing list item (0x%0llX) w/ data (0x%llX)",
                    (long long)item, (long long)&item->data);
                if (item->data != NULL) {
                    free(item->data);
                    item->data = NULL;
                }
                free(item);
        }

        // Make list empty, but leave mutex alone so the state is retained!
        list->start = NULL;
        /* list->last = NULL; */
        KINETIC_LIST_UNLOCK(list);
    }
    else {
        LOGF3("  Nothing to free from list (0x%0llX)", list);
    }
}


//==============================================================================
// PDU List Support
//==============================================================================
@@ -201,8 +48,7 @@ KineticPDU* KineticAllocator_NewPDU(KineticConnection* connection)
{
    assert(connection != NULL);
    LOGF3("Allocating new PDU on connection (0x%0llX)", connection);
    KineticPDU* newPDU = (KineticPDU*)KineticAllocator_NewItem(
                             &connection->pdus, sizeof(KineticPDU));
    KineticPDU* newPDU = (KineticPDU*)calloc(1, sizeof(KineticPDU));
    if (newPDU == NULL) {
        LOG0("Failed allocating new PDU!");
        return NULL;
@@ -213,62 +59,44 @@ KineticPDU* KineticAllocator_NewPDU(KineticConnection* connection)
    return newPDU;
}

void KineticAllocator_FreePDU(KineticConnection* connection, KineticPDU* pdu)
void KineticAllocator_FreePDU(KineticPDU* pdu)
{
    KineticConnection* connection = pdu->connection;
    LOGF3("Freeing PDU (0x%0llX) on connection (0x%0llX)", pdu, connection);
    KINETIC_LIST_LOCK(&connection->pdus);
    if (pdu && (pdu->proto != NULL) && pdu->protobufDynamicallyExtracted) {
        LOG3("Freeing dynamically allocated protobuf");
        KineticProto_Message__free_unpacked(pdu->proto, NULL);
        pdu->proto = NULL;
    };
    
    /* TODO: We can't unlock until the function below completes, but it
     *     normally also tries to lock, so pass in a flag indicating
     *     we already have it locked. The way that the mutexes are
     *     currently initialized makes adding an attribute of
     *     PTHREAD_MUTEX_RECURSIVE significantly more trouble. */
    KineticAllocator_FreeItem(&connection->pdus, (void*)pdu, false);
    KINETIC_LIST_UNLOCK(&connection->pdus);
    free(pdu);
    LOGF3("Freed PDU (0x%0llX) on connection (0x%0llX)", pdu, connection);
}

KineticPDU* KineticAllocator_GetFirstPDU(KineticConnection* connection)
{
    assert(connection != NULL);
    return (KineticPDU*)KineticAllocator_GetFirstListItem(&connection->pdus);
}

KineticPDU* KineticAllocator_GetNextPDU(KineticConnection* connection, KineticPDU* pdu)
KineticResponse * KineticAllocator_NewKineticResponse(size_t const valueLength)
{
    assert(connection != NULL);
    return (KineticPDU*)KineticAllocator_GetNextListItem(&connection->pdus, pdu);
    KineticResponse * response = calloc(1, sizeof(*response) + valueLength);
    if (response == NULL) {
        LOG0("Failed allocating new response!");
        return NULL;
    }
    return response;
}

void KineticAllocator_FreeAllPDUs(KineticConnection* connection)
void KineticAllocator_FreeKineticResponse(KineticResponse * response)
{
    assert(connection != NULL);
    if (connection->pdus.start != NULL) {
        LOG3("Freeing all PDUs...");
        KINETIC_LIST_LOCK(&connection->pdus);
        KineticListItem* current = connection->pdus.start;
        while (current != NULL) {
            KineticPDU* pdu = (KineticPDU*)current->data;
            if (pdu != NULL && pdu->proto != NULL
                && pdu->protobufDynamicallyExtracted) {
                KineticProto_Message__free_unpacked(pdu->proto, NULL);
            }
            current = current->next;
    if (response != NULL) {
        if (response->command != NULL) {
            protobuf_c_message_free_unpacked(&response->command->base, NULL);
        }
        KINETIC_LIST_UNLOCK(&connection->pdus);
        KineticAllocator_FreeList(&connection->pdus);
        if (response->proto != NULL) {
            protobuf_c_message_free_unpacked(&response->proto->base, NULL);
        }
    else {
        LOG1("  Nothing to free!");
        free(response);
    }
}


//==============================================================================
// Operation List Support
//==============================================================================
@@ -278,7 +106,7 @@ KineticOperation* KineticAllocator_NewOperation(KineticConnection* const connect
    assert(connection != NULL);
    LOGF3("Allocating new operation on connection (0x%0llX)", connection);
    KineticOperation* newOperation =
        (KineticOperation*)KineticAllocator_NewItem(&connection->operations, sizeof(KineticOperation));
        (KineticOperation*)calloc(1, sizeof(KineticOperation));
    if (newOperation == NULL) {
        LOGF0("Failed allocating new operation on connection (0x%0llX)!", connection);
        return NULL;
@@ -290,60 +118,23 @@ KineticOperation* KineticAllocator_NewOperation(KineticConnection* const connect
    return newOperation;
}

void KineticAllocator_FreeOperation(KineticConnection* const connection, KineticOperation* operation)
void KineticAllocator_FreeOperation(KineticOperation* operation)
{
    assert(connection != NULL);
    assert(operation != NULL);
    KineticConnection* const connection = operation->connection;
    LOGF3("Freeing operation (0x%0llX) on connection (0x%0llX)", operation, connection);
    if (operation->request != NULL) {
        LOGF3("Freeing request PDU (0x%0llX) from operation (0x%0llX) on connection (0x%0llX)",
            operation->request, operation, connection);
        KineticAllocator_FreePDU(connection, operation->request);
        KineticAllocator_FreePDU(operation->request);
        operation->request = NULL;
    }
    if (operation->response != NULL) {
        LOGF3("Freeing response PDU (0x%0llX) from operation (0x%0llX) on connection (0x%0llX)",
        LOGF3("Freeing response (0x%0llX) from operation (0x%0llX) on connection (0x%0llX)",
            operation->response, operation, connection);
        KineticAllocator_FreePDU(connection, operation->response);
        KineticAllocator_FreeKineticResponse(operation->response);
        operation->response = NULL;
    }
    pthread_mutex_destroy(&operation->timeoutTimeMutex);
    KineticAllocator_FreeItem(&connection->operations, (void*)operation, true);
    free(operation);
    LOGF3("Freed operation (0x%0llX) on connection (0x%0llX)", operation, connection);
}

KineticOperation* KineticAllocator_GetFirstOperation(KineticConnection* const connection)
{
    assert(connection != NULL);
    return (KineticOperation*)KineticAllocator_GetFirstListItem(&connection->operations);
}

KineticOperation* KineticAllocator_GetNextOperation(KineticConnection* const connection, KineticOperation* operation)
{
    assert(connection != NULL);
    return (KineticOperation*)KineticAllocator_GetNextListItem(&connection->operations, operation);
}

void KineticAllocator_FreeAllOperations(KineticConnection* const connection)
{
    KineticOperation* op = KineticAllocator_GetFirstOperation(connection);
    while (op) {
        KineticAllocator_FreeOperation(connection, op);
        op = KineticAllocator_GetFirstOperation(connection);
    }
}

bool KineticAllocator_ValidateAllMemoryFreed(KineticConnection* const connection)
{
    assert(connection != NULL);
    LOGF3("Checking to see if all memory has been freed from connection (0x%0llX)...",
        connection);
    bool empty = true;
    LOGF3("  Operations: 0x%0llX, empty=%s",connection->operations.start,
        BOOL_TO_STRING(connection->operations.start == NULL));
    if (connection->operations.start != NULL) {empty = false;}
    LOGF3("  PDUs: 0x%0llX, empty=%s", connection->pdus.start,
        BOOL_TO_STRING(connection->pdus.start == NULL));
    if (connection->pdus.start != NULL) {empty = false;}
    return empty;
}
Loading