Commit f54a526a authored by Greg Williams's avatar Greg Williams
Browse files

Merge branch 'feature/async_io' into develop

parents 00265a94 fa06122f
Loading
Loading
Loading
Loading
+0 −1
Original line number Diff line number Diff line
@@ -30,7 +30,6 @@

#define KINETIC_LIST_LOCK(_list) { \
    /*LOG_LOCATION; LOGF3("Locking list! (list_addr=0x%llX)", (_list));*/ \
    assert(!((_list)->locked)); \
    pthread_mutex_lock(&((_list)->mutex)); \
    ((_list)->locked) = true; \
}
+75 −67
Original line number Diff line number Diff line
@@ -55,15 +55,11 @@ static void* KineticConnection_Worker(void* thread_arg)
        } 

        // Wait for and receive a PDU
        int dataAvailable = KineticSocket_DataBytesAvailable(thread->connection->socket);
        if (dataAvailable < 0) {
            LOG0("ERROR: Socket error while waiting for PDU to arrive");
            thread->fatalError = true;
        }
        else if (dataAvailable < (int)PDU_HEADER_LEN) {
            sleep(0);
        }
        else {
        KineticWaitStatus wait_status = KineticSocket_WaitUntilDataAvailable(thread->connection->socket, 100);
        switch(wait_status)
        {
            case KINETIC_WAIT_STATUS_DATA_AVAILABLE:
            {
                KineticPDU* response = KineticAllocator_NewPDU(thread->connection);
                status = KineticPDU_ReceiveMain(response);
                if (status != KINETIC_STATUS_SUCCESS) {
@@ -132,6 +128,18 @@ static void* KineticConnection_Worker(void* thread_arg)
                    // Free invalid PDU
                    KineticAllocator_FreePDU(thread->connection, response);
                }
            } break;
            case KINETIC_WAIT_STATUS_TIMED_OUT:
            case KINETIC_WAIT_STATUS_RETRYABLE_ERROR:
            {
                sleep(0);
            } break;
            default:
            case KINETIC_WAIT_STATUS_FATAL_ERROR:
            {
                LOG0("ERROR: Socket error while waiting for PDU to arrive");
                thread->fatalError = true;
            } break;
        }
    }

+37 −0
Original line number Diff line number Diff line
@@ -40,6 +40,7 @@
#include <netdb.h>
#include <signal.h>
#include <unistd.h>
#include <poll.h>
#include "socket99/socket99.h"

int KineticSocket_Connect(const char* host, int port, bool nonBlocking)
@@ -148,6 +149,42 @@ void KineticSocket_Close(int socket)
    }
}

KineticWaitStatus KineticSocket_WaitUntilDataAvailable(int socket, int timeout)
{
    if (socket < 0) {return -1;}
    struct pollfd fd = {
        .fd = socket,
        .events = POLLIN,
        .revents = 0,
    };

    int res = poll(&fd, 1, timeout);

    if (res > 0) {
        //if (fd.revents & POLLHUP) // hung up
        if (fd.revents & POLLIN)
        {
            return KINETIC_WAIT_STATUS_DATA_AVAILABLE;
        }
        else
        {
            return KINETIC_WAIT_STATUS_FATAL_ERROR;
        }
    }
    else if (res == 0)
    {
        return KINETIC_WAIT_STATUS_TIMED_OUT;
    }
    else if ((errno & (EAGAIN | EINTR)) != 0)
    {
        return KINETIC_WAIT_STATUS_RETRYABLE_ERROR;
    }
    else
    {
        return KINETIC_WAIT_STATUS_FATAL_ERROR;
    }
}

int KineticSocket_DataBytesAvailable(int socket)
{
    if (socket < 0) {return -1;}
+9 −0
Original line number Diff line number Diff line
@@ -24,10 +24,19 @@
#include "kinetic_types_internal.h"
#include "kinetic_message.h"

typedef enum
{
    KINETIC_WAIT_STATUS_DATA_AVAILABLE,
    KINETIC_WAIT_STATUS_TIMED_OUT,
    KINETIC_WAIT_STATUS_RETRYABLE_ERROR,
    KINETIC_WAIT_STATUS_FATAL_ERROR,
} KineticWaitStatus;

int KineticSocket_Connect(const char* host, int port, bool nonBlocking);
void KineticSocket_Close(int socket);

int KineticSocket_DataBytesAvailable(int socket);
KineticWaitStatus KineticSocket_WaitUntilDataAvailable(int socket, int timeout);
KineticStatus KineticSocket_Read(int socket, ByteBuffer* dest, size_t len);
KineticStatus KineticSocket_ReadProtobuf(int socket, KineticPDU* pdu);

+11 −65
Original line number Diff line number Diff line
@@ -156,7 +156,7 @@ void test_KineticConnection_Connect_should_connect_to_specified_host(void)
                                          expected.session.nonBlocking, expected.socket);

    // Setup mock expectations for worker thread
    KineticSocket_DataBytesAvailable_IgnoreAndReturn(0);
    KineticSocket_WaitUntilDataAvailable_IgnoreAndReturn(KINETIC_WAIT_STATUS_TIMED_OUT);

    // Establish connection
    KineticStatus status = KineticConnection_Connect(Connection);
@@ -209,7 +209,7 @@ void test_KineticConnection_Worker_should_run_fine_while_no_data_arrives(void)
                                          expected.session.nonBlocking, expected.socket);

    // Setup mock expectations for worker thread so it can run in IDLE mode
    KineticSocket_DataBytesAvailable_IgnoreAndReturn(0);
    KineticSocket_WaitUntilDataAvailable_IgnoreAndReturn(KINETIC_WAIT_STATUS_TIMED_OUT);

    // Establish connection
    KineticStatus status = KineticConnection_Connect(Connection);
@@ -220,60 +220,6 @@ void test_KineticConnection_Worker_should_run_fine_while_no_data_arrives(void)
    LOG0("Done allowing worker thread to execute for a bit!");
}

void test_KineticConnection_Worker_should_run_fine_if_not_enough_data_has_arrived(void)
{
    LOG_LOCATION;
    const uint8_t hmacKey[] = {1, 6, 3, 5, 4, 8, 19};
    const int socket = 24;

    KineticConnection expected = (KineticConnection) {
        .connected = true,
        .socket = socket,
        .session = (KineticSession) {
            .host = "valid-host.com",
            .port = 1234,
            .clusterVersion = 17,
            .identity = 12,
            .hmacKey = {.data = expected.session.keyData, .len = sizeof(hmacKey)},
        },
    };
    memcpy(expected.session.hmacKey.data, hmacKey, expected.session.hmacKey.len);

    *Connection = (KineticConnection) {
        .connected = false,
        .socket = -1,
        .session = (KineticSession) {
            .host = "valid-host.com",
            .port = expected.session.port,
            .nonBlocking = false,
            .clusterVersion = expected.session.clusterVersion,
            .identity = expected.session.identity,
            .hmacKey = {.data = Connection->session.keyData, .len = sizeof(hmacKey)},
        },
    };
    memcpy(Connection->session.hmacKey.data, hmacKey, expected.session.hmacKey.len);

    KineticSocket_Connect_ExpectAndReturn(expected.session.host, expected.session.port,
                                          expected.session.nonBlocking, expected.socket);

    // Setup mock expectations for worker thread
    KineticSocket_DataBytesAvailable_IgnoreAndReturn(0);

    // Establish connection
    KineticStatus status = KineticConnection_Connect(Connection);
    TEST_ASSERT_EQUAL(KINETIC_STATUS_SUCCESS, status);

    // Ensure no PDUs are attempted to be read if some, but not enough, data has been received
    sleep(0);
    KineticSocket_DataBytesAvailable_ExpectAndReturn(socket, 1);
    KineticSocket_DataBytesAvailable_IgnoreAndReturn(0);
    sleep(0);
    KineticSocket_DataBytesAvailable_ExpectAndReturn(socket, PDU_HEADER_LEN - 1);
    KineticSocket_DataBytesAvailable_IgnoreAndReturn(0);
    sleep(0);
}


void test_KineticConnection_Worker_should_process_unsolicited_response_PDUs(void)
{
    LOG_LOCATION;
@@ -312,7 +258,7 @@ void test_KineticConnection_Worker_should_process_unsolicited_response_PDUs(void
                                          expected.session.nonBlocking, expected.socket);

    // Setup mock expectations for worker thread
    KineticSocket_DataBytesAvailable_IgnoreAndReturn(0);
    KineticSocket_WaitUntilDataAvailable_IgnoreAndReturn(KINETIC_WAIT_STATUS_TIMED_OUT);
    Response.type = KINETIC_PDU_TYPE_UNSOLICITED;
    Response.command->header->connectionID = connectionID;
    Response.command->header->has_connectionID = true;
@@ -339,10 +285,10 @@ void test_KineticConnection_Worker_should_process_unsolicited_response_PDUs(void
    KineticAllocator_FreePDU_Expect(Connection, &Response);

    // Must trigger data ready last, in order for mocked simulation to work as desired
    KineticSocket_DataBytesAvailable_ExpectAndReturn(socket, PDU_HEADER_LEN);
    KineticSocket_WaitUntilDataAvailable_ExpectAndReturn(socket, 100, KINETIC_WAIT_STATUS_DATA_AVAILABLE);

    // Make sure to return read thread to IDLE state
    KineticSocket_DataBytesAvailable_IgnoreAndReturn(0);
    KineticSocket_WaitUntilDataAvailable_IgnoreAndReturn(KINETIC_WAIT_STATUS_TIMED_OUT);
    KineticConnection_Pause(Connection, false);

    // Wait for unsolicited status PDU to be received and processed...
@@ -418,7 +364,7 @@ void test_KineticConnection_Worker_should_process_solicited_response_PDUs(void)
    };

    // Setup mock expectations for worker thread
    KineticSocket_DataBytesAvailable_IgnoreAndReturn(0);
    KineticSocket_WaitUntilDataAvailable_IgnoreAndReturn(KINETIC_WAIT_STATUS_TIMED_OUT);
    Response.type = KINETIC_PDU_TYPE_RESPONSE;
    Response.proto->authType = KINETIC_PROTO_MESSAGE_AUTH_TYPE_HMACAUTH;
    Response.proto->has_authType = true;
@@ -440,10 +386,10 @@ void test_KineticConnection_Worker_should_process_solicited_response_PDUs(void)
    KineticAllocator_FreeOperation_Expect(Connection, &op);

    // Signal data has arrived so status PDU can be consumed
    KineticSocket_DataBytesAvailable_ExpectAndReturn(socket, PDU_HEADER_LEN);
    KineticSocket_WaitUntilDataAvailable_ExpectAndReturn(socket, 100, KINETIC_WAIT_STATUS_DATA_AVAILABLE);

    // Make sure to return read thread to IDLE state
    KineticSocket_DataBytesAvailable_IgnoreAndReturn(0);
    KineticSocket_WaitUntilDataAvailable_IgnoreAndReturn(KINETIC_WAIT_STATUS_TIMED_OUT);
    KineticConnection_Pause(Connection, false);

    // Wait for solicited status PDU to be received and processed...
@@ -506,7 +452,7 @@ void test_KineticConnection_Worker_should_process_solicited_response_PDUs_with_V
    op.entry = &entry;

    // Setup mock expectations for worker thread
    KineticSocket_DataBytesAvailable_IgnoreAndReturn(0);
    KineticSocket_WaitUntilDataAvailable_IgnoreAndReturn(KINETIC_WAIT_STATUS_TIMED_OUT);
    Response.type = KINETIC_PDU_TYPE_RESPONSE;
    Response.proto->authType = KINETIC_PROTO_MESSAGE_AUTH_TYPE_HMACAUTH;
    Response.proto->has_authType = true;
@@ -529,10 +475,10 @@ void test_KineticConnection_Worker_should_process_solicited_response_PDUs_with_V
    KineticAllocator_FreeOperation_Expect(Connection, &op);

    // Signal data has arrived so status PDU can be consumed
    KineticSocket_DataBytesAvailable_ExpectAndReturn(socket, PDU_HEADER_LEN);
    KineticSocket_WaitUntilDataAvailable_ExpectAndReturn(socket, 100, KINETIC_WAIT_STATUS_DATA_AVAILABLE);

    // Make sure to return read thread to IDLE state
    KineticSocket_DataBytesAvailable_IgnoreAndReturn(0);
    KineticSocket_WaitUntilDataAvailable_IgnoreAndReturn(KINETIC_WAIT_STATUS_TIMED_OUT);
    KineticConnection_Pause(Connection, false);

    // Wait for solicited status PDU to be received and processed...