Commit b260d8d7 authored by Job Vranish's avatar Job Vranish
Browse files

cleaned up kinetic socket tests

parent 4cc0c275
Loading
Loading
Loading
Loading
+0 −179
Original line number Diff line number Diff line
@@ -150,185 +150,6 @@ void KineticSocket_Close(int socket)
    }
}

int KineticSocket_DataBytesAvailable(int socket)
{
    if (socket < 0) {return -1;}
    int count = -1;
    ioctl(socket, FIONREAD, &count);
    return count;
}

KineticStatus KineticSocket_Read(int socket, ByteBuffer* dest, size_t len)
{
    LOGF2("Reading %zd bytes into buffer @ 0x%zX from fd=%d",
         len, (size_t)dest->array.data, socket);

    KineticStatus status = KINETIC_STATUS_INVALID;

    // Read "up to" the allocated number of bytes into dest buffer
    size_t bytesToReadIntoBuffer = len;
    if (dest->array.len < len) {
        bytesToReadIntoBuffer = dest->array.len;
    }
    while (dest->bytesUsed < bytesToReadIntoBuffer) {
        int opStatus;
        fd_set readSet;
        struct timeval timeout;

        // Time out after 5 seconds
        timeout.tv_sec = 5;
        timeout.tv_usec = 0;

        FD_ZERO(&readSet);
        FD_SET(socket, &readSet);
        opStatus = select(socket + 1, &readSet, NULL, NULL, &timeout);

        if (opStatus < 0) { // Error occurred
            LOGF0("Failed waiting to read from socket!"
                 " status=%d, errno=%d, desc='%s'",
                 opStatus, errno, strerror(errno));
            return KINETIC_STATUS_SOCKET_ERROR;
        }
        else if (opStatus == 0) { // Timeout occurred
            LOG0("Timed out waiting for socket data to arrive!");
            return KINETIC_STATUS_SOCKET_TIMEOUT;
        }
        else if (opStatus > 0) { // Data available to read
            // The socket is ready for reading
            opStatus = read(socket,
                            &dest->array.data[dest->bytesUsed],
                            dest->array.len - dest->bytesUsed);
            // Retry if no data yet...
            if (opStatus == -1 &&
                ((errno == EINTR) ||
                 (errno == EAGAIN) ||
                 (errno == EWOULDBLOCK)
                )) {
                continue;
            }
            else if (opStatus <= 0) {
                LOGF0("Failed to read from socket!"
                     " status=%d, errno=%d, desc='%s'",
                     opStatus, errno, strerror(errno));
                return KINETIC_STATUS_SOCKET_ERROR;
            }
            else {
                dest->bytesUsed += opStatus;
                LOGF3("Received %d bytes (%zd of %zd)",
                     opStatus, dest->bytesUsed, len);
            }
        }
    }

    // Flush any remaining data, in case of a truncated read w/short dest buffer
    if (dest->bytesUsed < len) {
        bool abortFlush = false;

        uint8_t* discardedBytes = malloc(len - dest->bytesUsed);
        if (discardedBytes == NULL) {
            LOG0("Failed allocating a socket read discard buffer!");
            abortFlush = true;
            status = KINETIC_STATUS_MEMORY_ERROR;
        }

        while (!abortFlush && dest->bytesUsed < len) {
            int opStatus;
            fd_set readSet;
            struct timeval timeout;
            size_t remainingLen = len - dest->bytesUsed;

            // Time out after 5 seconds
            timeout.tv_sec = 5;
            timeout.tv_usec = 0;

            FD_ZERO(&readSet);
            FD_SET(socket, &readSet);
            opStatus = select(socket + 1, &readSet, NULL, NULL, &timeout);

            if (opStatus < 0) { // Error occurred
                LOGF0("Failure trying to flush read socket data!"
                     " status=%d, errno=%d, desc='%s'",
                     status, errno, strerror(errno));
                abortFlush = true;
                status = KINETIC_STATUS_SOCKET_ERROR;
                continue;
            }
            else if (opStatus == 0) { // Timeout occurred
                LOG0("Timed out waiting to flush socket data!");
                abortFlush = true;
                status = KINETIC_STATUS_SOCKET_TIMEOUT;
                continue;
            }
            else if (opStatus > 0) { // Data available to read or error
                // The socket is ready for reading
                opStatus = read(socket, discardedBytes, remainingLen);
                // Retry if no data yet...
                if (opStatus == -1 &&
                    ((errno == EINTR) ||
                     (errno == EAGAIN) ||
                     (errno == EWOULDBLOCK)
                    )) {
                    continue;
                }
                else if (opStatus <= 0) {
                    LOGF0("Failed to read from socket while flushing!"
                         " status=%d, errno=%d, desc='%s'",
                         opStatus, errno, strerror(errno));
                    abortFlush = true;
                    status = KINETIC_STATUS_SOCKET_ERROR;
                }
                else {
                    dest->bytesUsed += opStatus;
                    LOGF3("Flushed %d bytes from socket read pipe (%zd of %zd)",
                         opStatus, dest->bytesUsed, len);
                }
            }
        }

        // Free up dynamically allocated memory before returning
        if (discardedBytes != NULL) {
            free(discardedBytes);
        }

        // Report any error that occurred during socket flush
        if (abortFlush) {
            LOG0("Socket read pipe flush aborted!");
            assert(status == KINETIC_STATUS_SUCCESS);
            return status;
        }

        // Report truncation of data for any variable length byte arrays
        LOGF1("Socket read buffer was truncated due to buffer overrun!"
             " received=%zu, copied=%zu",
             len, dest->array.len);
        return KINETIC_STATUS_BUFFER_OVERRUN;
    }
    LOGF3("Received %zd of %zd bytes requested", dest->bytesUsed, len);
    return KINETIC_STATUS_SUCCESS;
}

KineticStatus KineticSocket_Write(int socket, ByteBuffer* src)
{
    LOGF3("Writing %zu bytes to socket...", src->bytesUsed);
    for (unsigned int bytesSent = 0; bytesSent < src->bytesUsed;) {
        int bytesRemaining = src->bytesUsed - bytesSent;
        int status = write(socket, &src->array.data[bytesSent], bytesRemaining);
        if (status == -1 &&
            ((errno == EINTR) || (errno == EAGAIN) || (errno == EWOULDBLOCK))) {
            LOG3("Write interrupted. retrying...");
            continue;
        }
        else if (status <= 0) {
            LOGF0("Failed to write to socket! status=%d, errno=%d\n", status, errno);
            return KINETIC_STATUS_SOCKET_ERROR;
        }
        else {
            bytesSent += status;
            LOGF2("Wrote %d bytes (%d of %zu sent)", status, bytesSent, src->bytesUsed);
        }
    }
    return KINETIC_STATUS_SUCCESS;
}

void KineticSocket_BeginPacket(int socket)
{
+0 −5
Original line number Diff line number Diff line
@@ -35,11 +35,6 @@ typedef enum
int KineticSocket_Connect(const char* host, int port);
void KineticSocket_Close(int socket);

int KineticSocket_DataBytesAvailable(int socket);
KineticWaitStatus KineticSocket_WaitUntilDataAvailable(int socket, int timeout);
KineticStatus KineticSocket_Read(int socket, ByteBuffer* dest, size_t len);

KineticStatus KineticSocket_Write(int socket, ByteBuffer* src);
void KineticSocket_BeginPacket(int socket);
void KineticSocket_FinishPacket(int socket);

+0 −260
Original line number Diff line number Diff line
@@ -48,57 +48,16 @@

static int FileDesc;
static int KineticTestPort = KINETIC_PORT;
static uint8_t TestData[128];
static ByteBuffer TestDataBuffer;
static bool SocketReadRequested;

void Socket_RequestBytes(size_t count)
{
    char request[10];
    sprintf(request, "read(%zu)", count);
    uint8_t requestData[64];
    ByteBuffer requestBuffer = ByteBuffer_Create(requestData, sizeof(requestData), 0);
    ByteBuffer_AppendCString(&requestBuffer, request);
    TEST_ASSERT_EQUAL_KineticStatus_MESSAGE(
        KINETIC_STATUS_SUCCESS, KineticSocket_Write(FileDesc, &requestBuffer),
        "Failed requesting dummy data from test socket server");
    sleep(1);
}

void Socket_RequestProtobuf(void)
{
    uint8_t requestData[64];
    ByteBuffer requestBuffer = ByteBuffer_Create(requestData, sizeof(requestData), 0);
    ByteBuffer_AppendCString(&requestBuffer, "readProto()");
    TEST_ASSERT_EQUAL_KineticStatus_MESSAGE(
        KINETIC_STATUS_SUCCESS, KineticSocket_Write(FileDesc, &requestBuffer),
        "Failed requesting dummy protobuf from test socket server");
}

void Socket_FlushReadPipe(void)
{
    LOG0("Flushing socket socket read pipe...");
    uint8_t bufferData[40];
    ByteBuffer recvBuffer = ByteBuffer_Create(bufferData, sizeof(bufferData), 0);
    KineticSocket_Read(FileDesc, &recvBuffer, sizeof(bufferData));
}


void setUp(void)
{
    KineticLogger_Init("stdout", 3);
    FileDesc = -1;
    SocketReadRequested = false;
    TestDataBuffer = ByteBuffer_Create(TestData, sizeof(TestData), 0);
    ByteBuffer_AppendCString(&TestDataBuffer, "Some like it hot!");
}

void tearDown(void)
{
    if (FileDesc >= 0) {
        if (SocketReadRequested) {
            Socket_FlushReadPipe();
        }
        LOG0("Shutting down socket...");
        KineticSocket_Close(FileDesc);
        FileDesc = 0;
@@ -123,222 +82,3 @@ void test_KineticSocket_Connect_should_create_a_socket_connection(void)


// Disabling socket read/write tests in not OSX, since Linux TravisCI builds
// fail, but system test passes. Most likely an issue with KineticRuby server
#if defined(__APPLE__)

static KineticPDU PDU;

void test_KineticSocket_Write_should_write_the_data_to_the_specified_socket(void)
{
    LOG_LOCATION;
    FileDesc = KineticSocket_Connect("localhost", KineticTestPort);
    TEST_ASSERT_TRUE_MESSAGE(FileDesc >= 0, "File descriptor invalid");

#if defined(__unix__) && !defined(__APPLE__)
    TEST_IGNORE_MESSAGE("Disabled on Linux until KineticRuby server client connection cleanup is fixed!");
#endif

    KineticStatus status = KineticSocket_Write(FileDesc, &TestDataBuffer);
    TEST_ASSERT_EQUAL_KineticStatus_MESSAGE(
        KINETIC_STATUS_SUCCESS, status, "Failed to write to socket!");
    Socket_FlushReadPipe();
}

void test_KineticSocket_WriteProtobuf_should_write_serialized_protobuf_to_the_specified_socket(void)
{
    LOG_LOCATION;
    KineticConnection connection;
    KINETIC_CONNECTION_INIT(&connection);
    KineticSession session = {
        .config = (KineticSessionConfig) {
            .clusterVersion = 12345678,
            .identity = -12345678,
        },
        .connection = &connection,
    };

    connection.session = session;
    KineticPDU_InitWithCommand(&PDU, &connection);
    /* KINETIC_PDU_INIT_WITH_COMMAND(&PDU, &connection); */
    KineticMessage_Init(&PDU.protoData.message);
    PDU.header.protobufLength = KineticProto_Message__get_packed_size(PDU.proto);

    FileDesc = KineticSocket_Connect("localhost", KineticTestPort);
    TEST_ASSERT_TRUE_MESSAGE(FileDesc >= 0, "File descriptor invalid");

#if defined(__unix__) && !defined(__APPLE__)
    TEST_IGNORE_MESSAGE("Disabled on Linux until KineticRuby server client connection cleanup is fixed!");
#endif

    LOG0("Writing a dummy protobuf...");
    KineticStatus status = KineticSocket_WriteProtobuf(FileDesc, &PDU);
    TEST_ASSERT_EQUAL_KineticStatus_MESSAGE(
        KINETIC_STATUS_SUCCESS, status, "Failed to write to socket!");
}


void test_KineticSocket_DataBytesAvailable_should_report_receive_pipe_status(void)
{
    LOG_LOCATION;
    TEST_ASSERT_EQUAL(-1, KineticSocket_DataBytesAvailable(123));

    FileDesc = KineticSocket_Connect("localhost", KineticTestPort);
    TEST_ASSERT_TRUE_MESSAGE(FileDesc >= 0, "File descriptor invalid");

    TEST_ASSERT_EQUAL(0, KineticSocket_DataBytesAvailable(FileDesc));

    const size_t expectedLen = 5;
    Socket_RequestBytes(expectedLen);
    int actualLen = KineticSocket_DataBytesAvailable(FileDesc);
    TEST_ASSERT_EQUAL(expectedLen, actualLen);
}



void test_KineticSocket_Read_should_read_data_from_the_specified_socket(void)
{
    LOG_LOCATION;
    FileDesc = KineticSocket_Connect("localhost", KineticTestPort);
    TEST_ASSERT_TRUE_MESSAGE(FileDesc >= 0, "File descriptor invalid");
    const size_t len = 5;
    uint8_t respData[len];
    ByteBuffer respBuffer = ByteBuffer_Create(respData, sizeof(respData), 0);

    Socket_RequestBytes(len);

    KineticStatus status = KineticSocket_Read(FileDesc, &respBuffer, len);

    TEST_ASSERT_EQUAL_KineticStatus_MESSAGE(
        KINETIC_STATUS_SUCCESS, status, "Failed to read from socket!");
    TEST_ASSERT_EQUAL_MESSAGE(
        len, respBuffer.bytesUsed, "Received incorrect number of bytes");
}

void test_KineticSocket_Read_should_timeout_if_requested_data_is_not_received_within_configured_timeout(void)
{
    LOG_LOCATION;
    FileDesc = KineticSocket_Connect("localhost", KineticTestPort);
    TEST_ASSERT_TRUE_MESSAGE(FileDesc >= 0, "File descriptor invalid");
    const size_t len = 5;
    uint8_t respData[len + 2];
    ByteBuffer respBuffer = ByteBuffer_Create(respData, sizeof(respData), 0);

    // Send request to test server to send us some data
    Socket_RequestBytes(len);

    // Try to read more than was requested, to cause a timeout
    KineticStatus status = KineticSocket_Read(
                               FileDesc, &respBuffer, sizeof(respData));
    TEST_ASSERT_EQUAL_KineticStatus_MESSAGE(
        KINETIC_STATUS_SOCKET_TIMEOUT, status,
        "Expected socket to timeout waiting on data!");
}

void test_KineticSocket_Read_should_read_up_to_the_array_length_into_the_buffer_discard_remainder_and_return_BUFFER_OVERRUN_with_bytesUsed_set_to_the_total_length_read(void)
{
    LOG_LOCATION;
    uint8_t respData[20];
    const size_t bufferLen = sizeof(respData);
    const size_t bytesToRead = bufferLen + 15; // Request more than the size of the allocated array
    ByteBuffer respBuffer = ByteBuffer_Create(respData, bufferLen, 0);

    FileDesc = KineticSocket_Connect("localhost", KineticTestPort);
    TEST_ASSERT_TRUE_MESSAGE(FileDesc >= 0, "File descriptor invalid");

    Socket_RequestBytes(bytesToRead);

    KineticStatus status = KineticSocket_Read(FileDesc, &respBuffer, bytesToRead);
    TEST_ASSERT_EQUAL_KineticStatus_MESSAGE(
        KINETIC_STATUS_BUFFER_OVERRUN, status,
        "Socket read should have failed due to too many bytes requested!");
    TEST_ASSERT_EQUAL_MESSAGE(
        bytesToRead, respBuffer.bytesUsed,
        "bytesUsed should reflect full length read upon overflow");
}

#if 0
void test_KineticSocket_ReadProtobuf_should_read_the_specified_length_of_an_encoded_protobuf_from_the_specified_socket(void)
{
    LOG_LOCATION;

    KineticConnection connection;
    KINETIC_CONNECTION_INIT(&connection);
    KineticSession session = {
        .config = (KineticSessionConfig) {
            .clusterVersion = 12345678,
            .identity = -12345678,
        },
        .connection = &connection,
    };
    connection.session = session;
    KINETIC_PDU_INIT_WITH_COMMAND(&PDU, &connection);
    KineticMessage_Init(&PDU.protoData.message);

    FileDesc = KineticSocket_Connect("localhost", KineticTestPort);
    TEST_ASSERT_TRUE_MESSAGE(FileDesc >= 0, "File descriptor invalid");

    // Send request to test server to send us a Kinetic protobuf
    Socket_RequestProtobuf();

    // Receive the response
    KineticPDU_Init(&PDU, &connection);
    PDU.header.protobufLength = 125;
    TEST_ASSERT_FALSE(PDU.protobufDynamicallyExtracted);
    TEST_ASSERT_NULL(PDU.proto);
    KineticStatus status = KineticSocket_ReadProtobuf(FileDesc, &PDU);
    TEST_ASSERT_EQUAL_KineticStatus_MESSAGE(KINETIC_STATUS_SUCCESS, status,
                                            "Failed receiving protobuf response");
    TEST_ASSERT_NOT_NULL_MESSAGE(
        PDU.proto,
        "Protobuf pointer was NULL, but expected dynamic memory allocation!");
    TEST_ASSERT_TRUE_MESSAGE(
        PDU.protobufDynamicallyExtracted,
        "Flag was not set per dynamically allocated/extracted protobuf");

    LOG0("Received Kinetic protobuf:");
    LOGF0("  command: (0x%zX)", (size_t)PDU.command);
    LOGF0("    header: (0x%zX)", (size_t)PDU.command->header);

    LOG0("Kinetic ProtoBuf read successfully!");
}

void test_KineticSocket_ReadProtobuf_should_return_false_if_KineticProto_of_specified_length_fails_to_be_read_within_timeout(void)
{
    LOG_LOCATION;
    KineticSession session = {
        .clusterVersion = 12345678,
        .identity = -12345678,
    };
    KineticConnection connection;
    KINETIC_CONNECTION_INIT(&connection);
    connection.session = session;

    FileDesc = KineticSocket_Connect("localhost", KineticTestPort);
    TEST_ASSERT_TRUE_MESSAGE(FileDesc >= 0, "File descriptor invalid");

    // Send request to test server to send us a Kinetic protobuf
    ByteArray requestArray = ByteArray_CreateWithCString("readProto()");
    ByteBuffer requestBuffer = ByteBuffer_CreateWithArray(requestArray);
    KineticStatus status = KineticSocket_Write(FileDesc, &requestBuffer);
    TEST_ASSERT_EQUAL_KineticStatus_MESSAGE(KINETIC_STATUS_SUCCESS, status,
                                            "Failed sending protobuf read request");

    // Receive the dummy protobuf response, but expect too much data
    // to force timeout
    KineticPDU_Init(&PDU, &connection);
    PDU.header.protobufLength = 1000;
    TEST_ASSERT_FALSE(PDU.protobufDynamicallyExtracted);
    TEST_ASSERT_NULL(PDU.proto);
    status = KineticSocket_ReadProtobuf(FileDesc, &PDU);
    TEST_ASSERT_EQUAL_KineticStatus_MESSAGE(
        KINETIC_STATUS_SOCKET_TIMEOUT, status,
        "Expected socket to timeout waiting on protobuf data!");
    TEST_ASSERT_FALSE_MESSAGE(PDU.protobufDynamicallyExtracted,
                              "Protobuf should not have been extracted because of timeout");
    TEST_ASSERT_NULL_MESSAGE(PDU.proto,
                             "Protobuf should not have been allocated because of timeout");
}
#endif

#endif // defined(__APPLE__)