Commit 575c5206 authored by Greg Williams's avatar Greg Williams
Browse files

More API updates

parent 91a0e39e
Loading
Loading
Loading
Loading
+4 −4
Original line number Diff line number Diff line
@@ -132,14 +132,14 @@ typedef struct _KineticSession {
} KineticSession;

#define KINETIC_SESSION_INIT(_session, _host, _clusterVersion, _identity, _hmacKey) { \
    _session.config = (KineticSessionConfig) { \
    (*_session).config = (KineticSessionConfig) { \
        .port = KINETIC_PORT, \
        .clusterVersion = (_clusterVersion), \
        .identity = (_identity), \
        .hmacKey = {.data = (_session)->keyData, .len = (_hmacKey).len}, \
        .hmacKey = {.data = (_session)->config.keyData, .len = (_hmacKey).len}, \
    }; \
    strcpy(*(_session).config->host, (_host)); \
    memcpy(*(_session).config->hmacKey.data, (_hmacKey).data, (_hmacKey).len); \
    strcpy((_session)->config.host, (_host)); \
    memcpy((_session)->config.hmacKey.data, (_hmacKey).data, (_hmacKey).len); \
}

// Kinetic Status Codes
+3 −3
Original line number Diff line number Diff line
@@ -44,12 +44,12 @@ KineticStatus KineticClient_CreateConnection(KineticSession* const session)
        return KINETIC_STATUS_SESSION_EMPTY;
    }

    if (strlen(session->host) == 0) {
    if (strlen(session->config.host) == 0) {
        LOG0("Host is empty!");
        return KINETIC_STATUS_HOST_EMPTY;
    }

    if (session->hmacKey.len < 1 || session->hmacKey.data == NULL) {
    if (session->config.hmacKey.len < 1 || session->config.hmacKey.data == NULL) {
        LOG0("HMAC key is NULL or empty!");
        return KINETIC_STATUS_HMAC_EMPTY;
    }
@@ -63,7 +63,7 @@ KineticStatus KineticClient_CreateConnection(KineticSession* const session)
    // Create the connection
    KineticStatus status = KineticSession_Connect(session);
    if (status != KINETIC_STATUS_SUCCESS) {
        LOGF0("Failed creating connection to %s:%d", session->host, session->port);
        LOGF0("Failed creating connection to %s:%d", session->config.host, session->config.port);
        KineticSession_Destroy(session);
        session->connection = NULL;
        return status;
+3 −2
Original line number Diff line number Diff line
@@ -72,9 +72,10 @@ KineticStatus KineticSession_Connect(KineticSession const * const session)
    // Establish the connection
    assert(session != NULL);
    assert(session->connection != NULL);
    assert(strlen(session->host) > 0);
    assert(strlen(session->config.host) > 0);
    connection->connected = false;
    connection->socket = KineticSocket_Connect(session->host, session->port);
    connection->socket = KineticSocket_Connect(
        session->config.host, session->config.port);
    connection->connected = (connection->socket >= 0);
    if (!connection->connected) {
        LOG0("Session connection failed!");
+74 −64
Original line number Diff line number Diff line
@@ -147,29 +147,13 @@ void KineticController_Pause(KineticSession const * const session, bool pause)
    session->connection->thread.paused = pause;
}

void* KineticController_ReceiveThread(void* thread_arg)
void KineticController_HandleIncomingPDU(KineticConnection* const connection)
{
    KineticStatus status;
    KineticThread* thread = thread_arg;

    while(!thread->abortRequested && !thread->fatalError) {

        // Do not service PDUs if thread paused
        if (thread->paused) {
            sleep(0);
            continue;
        } 

        // Wait for and receive a PDU
        KineticWaitStatus wait_status = KineticSocket_WaitUntilDataAvailable(thread->connection->socket, 100);
        switch(wait_status)
        {
            case KINETIC_WAIT_STATUS_DATA_AVAILABLE:
            {
                KineticPDU* response = KineticAllocator_NewPDU(thread->connection);
                status = KineticPDU_ReceiveMain(response);
    KineticPDU* response = KineticAllocator_NewPDU(connection);
    KineticStatus status = KineticPDU_ReceiveMain(response);
    if (status != KINETIC_STATUS_SUCCESS) {
                    LOGF0("ERROR: PDU receive reported an error: %s", Kinetic_GetStatusDescription(status));
        LOGF0("ERROR: PDU receive reported an error: %s",
            Kinetic_GetStatusDescription(status));
    }

    if (response->proto != NULL && response->proto->has_authType) {
@@ -189,7 +173,7 @@ void* KineticController_ReceiveThread(void* thread_arg)
            else {
                LOG0("WARNING: Unsolicited PDU is not recognized!");
            }
                        KineticAllocator_FreePDU(thread->connection, response);
            KineticAllocator_FreePDU(connection, response);
        }

        // Associate solicited response PDUs with their requests
@@ -198,7 +182,7 @@ void* KineticController_ReceiveThread(void* thread_arg)
            KineticOperation* op = KineticOperation_AssociateResponseWithOperation(response);
            if (op == NULL) {
                LOG0("Failed to find request matching received response PDU!");
                            KineticAllocator_FreePDU(thread->connection, response);
                KineticAllocator_FreePDU(connection, response);
            }
            else {
                LOG2("Found associated operation/request for response PDU.");
@@ -217,7 +201,8 @@ void* KineticController_ReceiveThread(void* thread_arg)
                    status = KineticPDU_GetStatus(response);
                }
                
                            LOGF2("Response PDU received w/status %s, %i", Kinetic_GetStatusDescription(status), status);
                LOGF2("Response PDU received w/status %s, %i",
                    Kinetic_GetStatusDescription(status), status);

                KineticOperation_Complete(op, status);
            }
@@ -225,8 +210,32 @@ void* KineticController_ReceiveThread(void* thread_arg)
    }
    else {
        // Free invalid PDU
                    KineticAllocator_FreePDU(thread->connection, response);
        KineticAllocator_FreePDU(connection, response);
    }
}

void* KineticController_ReceiveThread(void* thread_arg)
{
    KineticThread* thread = thread_arg;

    while(!thread->abortRequested && !thread->fatalError) {

        // Do not service PDUs if thread paused
        if (thread->paused) {
            sleep(0);
            continue;
        }

        // Wait for and receive a PDU
        KineticWaitStatus wait_status = KineticSocket_WaitUntilDataAvailable(
            thread->connection->socket, 100);

        // Handle wait completion events
        switch(wait_status)
        {
            case KINETIC_WAIT_STATUS_DATA_AVAILABLE:
            {
                KineticController_HandleIncomingPDU(thread->connection);
            } break;
            case KINETIC_WAIT_STATUS_TIMED_OUT:
            case KINETIC_WAIT_STATUS_RETRYABLE_ERROR:
@@ -241,6 +250,7 @@ void* KineticController_ReceiveThread(void* thread_arg)
            } break;
        }


        KineticOperation_TimeoutOperations(thread->connection);
    }

+1 −0
Original line number Diff line number Diff line
@@ -28,5 +28,6 @@ KineticOperation* KineticController_CreateOperation(KineticSession const * const
KineticStatus KineticController_ExecuteOperation(KineticOperation* operation, KineticCompletionClosure* closure);
void KineticController_Pause(KineticSession const * const session, bool pause);
void* KineticController_ReceiveThread(void* thread_arg);
void KineticController_HandleIncomingPDU(KineticConnection* const connection);

#endif // _KINETIC_CONTROLLER_H
Loading