Commit 328be836 authored by Greg Williams's avatar Greg Williams
Browse files

Completed implementation of PDU recieve worker thread and initial callback...

Completed implementation of PDU recieve worker thread and initial callback mechanism in order to support asynchronous/overlapped IO in a multi or single-threaded client context.
parent 14688876
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -157,6 +157,8 @@ typedef enum {

const char* Kinetic_GetStatusDescription(KineticStatus status);

typedef void (*KineticCompletionCallback)(KineticStatus status);

// KineticEntry - byte arrays need to be preallocated by the client
typedef struct _KineticEntry {
    ByteBuffer key;
+14 −9
Original line number Diff line number Diff line
@@ -29,6 +29,8 @@
#include "kinetic_logger.h"
#include <stdlib.h>

STATIC bool AsyncModeEnabled = false;

static KineticStatus KineticClient_CreateOperation(
    KineticOperation* const operation,
    KineticSessionHandle handle)
@@ -68,17 +70,24 @@ static KineticStatus KineticClient_ExecuteOperation(KineticOperation* operation)

    // Send the request
    status = KineticPDU_Send(operation->request);
    if (status == KINETIC_STATUS_SUCCESS) {
    if (status != KINETIC_STATUS_SUCCESS) {
        return status;
    }

    if (AsyncModeEnabled) {
    }
    else {
        // Associate response with same exchange as request
        operation->response->connection = operation->request->connection;

        // Receive the response
        status = KineticPDU_Receive(operation->response);
    }

    // Update with status from response, if execution suceeded
    if (status == KINETIC_STATUS_SUCCESS) {
        status = KineticOperation_GetStatus(operation);
    }
    }

    return status;
}

@@ -137,9 +146,6 @@ KineticStatus KineticClient_Connect(const KineticSession* config,
        return status;
    }

    // Retrieve initial connection status info
    status = KineticConnection_ReceiveDeviceStatusMessage(connection);

    return status;
}

@@ -149,18 +155,17 @@ KineticStatus KineticClient_Disconnect(KineticSessionHandle* const handle)
        LOG0("Invalid KineticSessionHandle specified!");
        return KINETIC_STATUS_SESSION_INVALID;
    }

    KineticConnection* connection = KineticConnection_FromHandle(*handle);
    if (connection == NULL) {
        LOG0("Failed getting valid connection from handle!");
        return KINETIC_STATUS_CONNECTION_ERROR;
    }

    // Disconnect
    KineticStatus status = KineticConnection_Disconnect(connection);
    if (status != KINETIC_STATUS_SUCCESS) {
        LOG0("Disconnection failed!");
    }

    KineticConnection_FreeConnection(handle);
    *handle = KINETIC_HANDLE_INVALID;

+160 −22
Original line number Diff line number Diff line
@@ -22,14 +22,107 @@
#include "kinetic_types_internal.h"
#include "kinetic_socket.h"
#include "kinetic_pdu.h"
#include "kinetic_operation.h"
#include "kinetic_allocator.h"
#include "kinetic_logger.h"
#include <string.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>
#include <sys/time.h>

STATIC KineticConnection ConnectionInstances[KINETIC_SESSIONS_MAX];
STATIC KineticConnection* Connections[KINETIC_SESSIONS_MAX];

static void* KineticConnection_Worker(void* thread_arg)
{
    KineticStatus status;
    KineticThread* thread = thread_arg;
    bool noDataArrived = false;
    bool someDataArrived = false;

    while (!thread->connection->threadCreated) {
        sleep(1);
    }

    while(!thread->abortRequested && !thread->fatalError) {

        // Wait for and receive a PDU
        int dataAvailable = KineticSocket_DataBytesAvailable(thread->connection->socket);
        if (dataAvailable < 0) {
            LOG0("ERROR: Socket error while waiting for PDU to arrive");
            thread->fatalError = true;
        }
        else if (dataAvailable == 0) {
            if (!noDataArrived) {
                LOG2("No data available to process yet...");
                noDataArrived = true;
                someDataArrived = false;
            }
            sleep(0);
        }
        else if (dataAvailable < (int)PDU_HEADER_LEN) {
            if (!someDataArrived) {
                LOG2("Not enough data available to process yet");
                someDataArrived = true;
                noDataArrived = false;
            }
            sleep(0);
        }
        else {
            KineticPDU* response = KineticAllocator_NewPDU(&thread->connection->pdus, thread->connection);
            status = KineticPDU_Receive(response);
            if (status != KINETIC_STATUS_SUCCESS) {
                LOGF0("ERROR: PDU receive reported an error: %s", Kinetic_GetStatusDescription(status));
            }
            else {
                status = KineticPDU_GetStatus(response);
            }

            if (response->proto != NULL && response->proto->has_authType) {

                // Handle unsolicited status PDUs
                if (response->proto->authType == KINETIC_PROTO_MESSAGE_AUTH_TYPE_UNSOLICITEDSTATUS) {
                    if (response->command != NULL &&
                        response->command->header != NULL &&
                        response->command->header->has_connectionID)
                    {
                        // Extract connectionID from unsolicited status message
                        response->connection->connectionID = response->command->header->connectionID;
                        LOGF2("Extracted connection ID from unsolicited status PDU (id=%lld)",
                            response->connection->connectionID);
                    }
                    else {
                        LOG0("WARNING: Unsolicited PDU is not recognized!");
                    }
                }

                // Associate solicited response PDUs with their requests
                else {
                    KineticPDU* request = NULL;
                    response->type = KINETIC_PDU_TYPE_RESPONSE;
                    request = KineticOperation_AssociateResponseWithRequest(response);
                    if (request == NULL) {
                        LOG0("Failed to find request matching received response PDU!");
                    }
                    else {
                        LOG2("Found associated request for response PDU.");
                        if (request->callback != NULL) {
                            request->callback(status);
                        }
                    }
                }
            }

            // Always free the reponse PDU
            KineticAllocator_FreePDU(&thread->connection->pdus, response);
        }
    }

    LOG1("Worker thread terminated!");
    return (void*)NULL;
}

KineticSessionHandle KineticConnection_NewConnection(
    const KineticSession* const config)
{
@@ -75,52 +168,97 @@ KineticStatus KineticConnection_Connect(KineticConnection* const connection)
        return KINETIC_STATUS_SESSION_EMPTY;
    }

    // Establish the connection
    connection->connected = false;
    connection->socket = KineticSocket_Connect(
                             connection->session.host,
                             connection->session.port,
                             connection->session.nonBlocking);
    connection->connected = (connection->socket >= 0);

    if (!connection->connected) {
        LOG0("Session connection failed!");
        connection->socket = KINETIC_SOCKET_DESCRIPTOR_INVALID;
        return KINETIC_STATUS_CONNECTION_ERROR;
    }

    // Kick off the worker thread
    connection->threadCreated = false;
    connection->thread.connection = connection;
    int pthreadStatus = pthread_create(&connection->threadID, NULL, KineticConnection_Worker, &connection->thread);
    if (pthreadStatus == 0) {
        connection->threadCreated = true;
    }
    else {
        char errMsg[256];
        Kinetic_GetErrnoDescription(pthreadStatus, errMsg, sizeof(errMsg));
        LOGF0("Failed creating worker thread w/error: %s", errMsg);
    }

    return KINETIC_STATUS_SUCCESS;
}

KineticStatus KineticConnection_Disconnect(KineticConnection* const connection)
KineticStatus KineticConnection_WaitForInitialDeviceStatus(KineticConnection* const connection)
{
    if (connection == NULL || connection->socket < 0) {
        return KINETIC_STATUS_SESSION_INVALID;
    assert(connection != NULL);

    KineticStatus status = KINETIC_STATUS_INVALID;
    bool statusReceived = false;
    bool timeout = false;
    struct timeval tv;
    time_t startTime;
    time_t currentTime;

    // Obtain start time
    gettimeofday(&tv, NULL);
    startTime = tv.tv_sec;

    while(!statusReceived && !timeout) {
        gettimeofday(&tv, NULL);
        currentTime = tv.tv_sec;
        if ((currentTime - startTime) >= KINETIC_CONNECTION_INITIAL_STATUS_TIMEOUT_SECS) {
            timeout = true;
        }
        else if (connection->connectionID > 0) {
            statusReceived = true;
        }
        else {
            sleep(0);
        }
    }

    close(connection->socket);
    connection->socket = KINETIC_HANDLE_INVALID;
    return KINETIC_STATUS_SUCCESS;
    if (statusReceived) {
        status = KINETIC_STATUS_SUCCESS;
    }
    else if (timeout) {
        status = KINETIC_STATUS_SOCKET_TIMEOUT;
    }

KineticStatus KineticConnection_ReceiveDeviceStatusMessage(
    KineticConnection* const connection)
    return status;
}

KineticStatus KineticConnection_Disconnect(KineticConnection* const connection)
{
    if (connection == NULL || connection->socket < 0) {
    if (connection == NULL || !connection->connected || connection->socket < 0) {
        return KINETIC_STATUS_SESSION_INVALID;
    }

    KineticPDU* statusPDU = KineticAllocator_NewPDU(&connection->pdus, connection);
    if (statusPDU == NULL) {
        LOG0("Failed allocating connection status PDU to receive session info!");
        return KINETIC_STATUS_MEMORY_ERROR;
    // Shutdown the worker thread
    KineticStatus status = KINETIC_STATUS_SUCCESS;
    connection->thread.abortRequested = true;
    LOG0("Sent abort request to worker thread!");
    int pthreadStatus = pthread_join(connection->threadID, NULL);
    if (pthreadStatus != 0) {
        char errMsg[256];
        Kinetic_GetErrnoDescription(pthreadStatus, errMsg, sizeof(errMsg));
        LOGF0("Failed terminating worker thread w/error: %s", errMsg);
        status = KINETIC_STATUS_CONNECTION_ERROR;
    }
    KineticStatus status = KineticPDU_Receive(statusPDU);
    if (status == KINETIC_STATUS_SUCCESS) {
        if (statusPDU->command != NULL && statusPDU->command->header != NULL) {
            connection->connectionID = statusPDU->command->header->connectionID;  
        }
    }
    KineticAllocator_FreePDU(&connection->pdus, statusPDU);

    // Close the connection
    close(connection->socket);
    connection->socket = KINETIC_HANDLE_INVALID;
    connection->connected = false;

    return status;
}

+1 −1
Original line number Diff line number Diff line
@@ -27,8 +27,8 @@ KineticSessionHandle KineticConnection_NewConnection(const KineticSession* const
void KineticConnection_FreeConnection(KineticSessionHandle* const handle);
KineticConnection* KineticConnection_FromHandle(KineticSessionHandle handle);
KineticStatus KineticConnection_Connect(KineticConnection* const connection);
KineticStatus KineticConnection_WaitForInitialDeviceStatus(KineticConnection* const connection);
KineticStatus KineticConnection_Disconnect(KineticConnection* const connection);
KineticStatus KineticConnection_ReceiveDeviceStatusMessage(KineticConnection* const connection);
void KineticConnection_IncrementSequence(KineticConnection* const connection);

#endif // _KINETIC_CONNECTION_H
+19 −6
Original line number Diff line number Diff line
@@ -22,8 +22,10 @@
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/time.h>
#include <stdlib.h>
#include <errno.h>
#include <pthread.h>

// #define USE_GENERIC_LOGGER 1 (not ready yet!)

@@ -35,6 +37,8 @@

STATIC int KineticLogLevel = -1;
STATIC FILE* KineticLoggerHandle = NULL;
STATIC bool KineticLogggerAbortRequested = false;
STATIC pthread_t KineticLoggerFlushThread;
STATIC pthread_mutex_t KineticLoggerBufferMutex = PTHREAD_MUTEX_INITIALIZER;
STATIC char KineticLoggerBuffer[KINETIC_LOGGER_BUFFER_SIZE][KINETIC_LOGGER_BUFFER_STR_MAX_LEN];
STATIC int KineticLoggerBufferSize = 0;
@@ -96,6 +100,15 @@ void KineticLogger_Init(const char* log_file, int log_level)
void KineticLogger_Close(void)
{
    if (KineticLogLevel >= 0 && KineticLoggerHandle != NULL) {
        KineticLogggerAbortRequested = true;
        #if KINETIC_LOGGER_FLUSH_THREAD_ENABLED
        int pthreadStatus = pthread_join(KineticLoggerFlushThread, NULL);
        if (pthreadStatus != 0) {
            char errMsg[256];
            Kinetic_GetErrnoDescription(pthreadStatus, errMsg, sizeof(errMsg));
            LOGF0("Failed terminating logger flush thread w/error: %s", errMsg);
        }
        #endif
        KineticLogger_FlushBuffer();
        if (KineticLoggerHandle != stdout) {
            fclose(KineticLoggerHandle);
@@ -141,7 +154,7 @@ void KineticLogger_LogLocation(const char* filename, int line, const char* messa
    }

    if (KineticLogLevel >= 0) {
        KineticLogger_LogPrintf(1, "[@%s:%d] %s", filename, line, message);
        KineticLogger_LogPrintf(1, "\n[@%s:%d] %s", filename, line, message);
    }
    else
    {
@@ -824,7 +837,7 @@ static void* KineticLogger_FlushThread(void* arg)

    lasttime = tv.tv_sec;

    for(;;) {
    while(!KineticLogggerAbortRequested) {
        sleep(KINETIC_LOGGER_SLEEP_TIME_SEC);
        gettimeofday(&tv, NULL);
        curtime = tv.tv_sec;
@@ -846,9 +859,9 @@ static void* KineticLogger_FlushThread(void* arg)

static void KineticLogger_InitFlushThread(void)
{
    pthread_t thr;
    pthread_create(&thr, NULL, KineticLogger_FlushThread, NULL);
    KineticLogger_Log(0, "Flush thread is created.\n");
    KineticLogger_Log(3, "Starting log flush thread");
    KineticLogger_FlushBuffer();
    pthread_create(&KineticLoggerFlushThread, NULL, KineticLogger_FlushThread, NULL);
}

#endif
Loading