Commit 425669fb authored by Job Vranish's avatar Job Vranish Committed by Scott Vokes
Browse files

added basic recieve test that uses the bus API to unpack and handle incoming protobuf messages

parent dd77fcd0
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -319,7 +319,7 @@ static void open_sockets(example_state *s) {
        socket99_result res;

        if (!socket99_open(&cfg, &res)) {
            socket99_fprintf(stderr, &res);
            // socket99_fprintf(stderr, &res);
            exit(1);
        }

+303 −0
Original line number Diff line number Diff line
/*
* kinetic-c
* Copyright (C) 2014 Seagate Technology.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
*/
#include "byte_array.h"
#include "unity.h"
#include "unity_helper.h"
#include "system_test_fixture.h"
#include "protobuf-c/protobuf-c.h"
#include "socket99.h"
#include <string.h>
#include <stdlib.h>

#include "kinetic_client.h"
#include "kinetic_types.h"
#include "kinetic_types_internal.h"
#include "kinetic_controller.h"
#include "kinetic_device_info.h"
#include "kinetic_serial_allocator.h"
#include "kinetic_proto.h"
#include "kinetic_allocator.h"
#include "kinetic_message.h"
#include "kinetic_pdu.h"
#include "kinetic_logger.h"
#include "kinetic_operation.h"
#include "kinetic_hmac.h"
#include "kinetic_connection.h"
#include "kinetic_socket.h"
#include "kinetic_nbo.h"

#include "bus.h"

void setUp(void)
{ LOG_LOCATION;
}

void tearDown(void)
{ LOG_LOCATION;

}

enum socket_state {
    STATE_UNINIT = 0,
    STATE_AWAITING_HEADER,
    STATE_AWAITING_BODY,
};

enum unpack_error {
    UNPACK_ERROR_UNDEFINED,
    UNPACK_ERROR_SUCCESS,
    UNPACK_ERROR_INVALID_HEADER,
    UNPACK_ERROR_PAYLOAD_MALLOC_FAIL,
};

typedef struct {
    enum socket_state state;
    KineticPDUHeader header;
    enum unpack_error unpack_status;
    size_t accumulated;
    uint8_t buf[];
} socket_info;

static void log_cb(log_event_t event, int log_level, const char *msg, void *udata) {
    (void)udata;
    const char *event_str = bus_log_event_str(event);
    fprintf(stderr, "%s[%d] -- %s\n",
        event_str, log_level, msg);
}

static bus_sink_cb_res_t reset_transfer(socket_info *si) {
    bus_sink_cb_res_t res = { /* prime pump with header size */
        .next_read = sizeof(KineticPDUHeader),
    };
    
    si->state = STATE_AWAITING_HEADER;
    si->accumulated = 0;
    si->unpack_status = UNPACK_ERROR_UNDEFINED;
    memset(&si->header, 0x00, sizeof(si->header));
    return res;
}


static bool unpack_header(uint8_t const * const read_buf, size_t const read_size, KineticPDUHeader * const header)
{
    if (read_size != sizeof(KineticPDUHeader)) {
        return false;
        // TODO this will fail if we don't get all of the header bytes in one read
        // we should fix this
    } 
    KineticPDUHeader const * const buf_header = (KineticPDUHeader const * const)read_buf;
    uint32_t protobufLength = KineticNBO_ToHostU32(buf_header->protobufLength);
    uint32_t valueLength = KineticNBO_ToHostU32(buf_header->valueLength);
    uint8_t versionPrefix = buf_header->versionPrefix;

    *header = (KineticPDUHeader){
        .versionPrefix = versionPrefix,
        .protobufLength = protobufLength,
        .valueLength = valueLength,
    };
    if (header->protobufLength <= PDU_PROTO_MAX_LEN &&
        header->valueLength <= PDU_PROTO_MAX_LEN) {
        return true;
    } else {
        return false;
    }
}
static bus_sink_cb_res_t sink_cb(uint8_t *read_buf,
        size_t read_size, void *socket_udata) {

    socket_info *si = (socket_info *)socket_udata;
    assert(si);

    switch (si->state) {
    case STATE_UNINIT:
    {
        assert(read_size == 0);
        return reset_transfer(si);
    }
    case STATE_AWAITING_HEADER:
    {
        if (unpack_header(read_buf, read_size, &si->header))
        {
            si->accumulated = 0;
            si->unpack_status = UNPACK_ERROR_SUCCESS;
            si->state = STATE_AWAITING_BODY;
            bus_sink_cb_res_t res = {
                .next_read = si->header.protobufLength + si->header.valueLength,
            };
            return res;
        } else {
            si->accumulated = 0;
            si->unpack_status = UNPACK_ERROR_INVALID_HEADER;
            si->state = STATE_AWAITING_HEADER;
            bus_sink_cb_res_t res = {
                .next_read = sizeof(KineticPDUHeader),
                .full_msg_buffer = si,
            };
            return res;
        }
        break;
    } 
    case STATE_AWAITING_BODY:
    {
        memcpy(&si->buf[si->accumulated], read_buf, read_size);
        si->accumulated += read_size;

        uint32_t remaining = si->header.protobufLength + si->header.valueLength - si->accumulated;

        if (remaining == 0) {
            si->state = STATE_AWAITING_HEADER;
            bus_sink_cb_res_t res = {
                .next_read = sizeof(KineticPDUHeader),
                // returning the whole si, because we need access to the pdu header as well 
                //  as the protobuf and value bytes
                .full_msg_buffer = si,
            };
            return res;
        } else {
            bus_sink_cb_res_t res = {
                .next_read = remaining,
            };
            return res;
        }
        break;
    }
    default:
        assert(false);
    }
}

struct kinetic_response_payload
{
    KineticPDUHeader header;
    KineticProto_Message* protoBuf;
    KineticProto_Command* command;
    uint8_t value[];
};

static void free_payload(struct kinetic_response_payload * payload)
{
    if (payload != NULL) {
        if (payload->command) {
            protobuf_c_message_free_unpacked(&payload->command->base, NULL);
        }
        if (payload->protoBuf) {
            protobuf_c_message_free_unpacked(&payload->protoBuf->base, NULL);
        }
        free(payload);
    }
}

static bus_unpack_cb_res_t unpack_cb(void *msg, void *socket_udata) {
    /* just got .full_msg_buffer from sink_cb -- pass it along as-is */
    socket_info *si = (socket_info *)msg;
    assert(socket_udata == si); //These don't have to be equal, but in this case they will be.

    if (si->unpack_status != UNPACK_ERROR_SUCCESS)
    {
        return (bus_unpack_cb_res_t) {
            .ok = false,
            .u.error.opaque_error_id = si->unpack_status,
        };
    }

    struct kinetic_response_payload * payload = malloc(sizeof(*payload) + si->header.valueLength);

    if (payload == NULL) {
        bus_unpack_cb_res_t res = {
            .ok = false,
            .u.error.opaque_error_id = UNPACK_ERROR_PAYLOAD_MALLOC_FAIL,
        };
        return res;
    } else {
        payload->header = si->header;
        payload->protoBuf = KineticProto_Message__unpack(NULL, si->header.protobufLength, si->buf);
        if (payload->protoBuf->has_commandBytes &&
            payload->protoBuf->commandBytes.data != NULL &&
            payload->protoBuf->commandBytes.len > 0)
        {
            payload->command = KineticProto_command__unpack(NULL, payload->protoBuf->commandBytes.len, payload->protoBuf->commandBytes.data);
        } else {
            payload->command = NULL;
        }

        if (payload->header.valueLength > 0)
        {
            memcpy(payload->value, &si->buf[si->header.protobufLength], si->header.valueLength);
        }

        bus_unpack_cb_res_t res = {
            .ok = true,
            .u.success = {
                .seq_id = payload->command->header->ackSequence,
                .msg = payload,
            },
        };
        return res;
    }
}

static void unexpected_msg_cb(void *msg,
        int64_t seq_id, void *bus_udata, void *socket_udata) {
    struct kinetic_response_payload * payload = (struct kinetic_response_payload *)msg;

    KineticLogger_LogProtobuf(3, payload->protoBuf);

    printf("\n\n\nUNEXPECTED MESSAGE: %p, seq_id %lld, bus_udata %p, socket_udata %p\n\n\n\n",
        msg, seq_id, bus_udata, socket_udata);

    free_payload(payload);
}


void test_that_we_can_register_sockets(void)
{ LOG_LOCATION;

    KineticLogger_Init("stdout", 3);
    bus_config cfg = {
        .log_cb = log_cb,
        .log_level = 2,
        .sink_cb = sink_cb,
        .unpack_cb = unpack_cb,
        .unexpected_msg_cb = unexpected_msg_cb,
        .bus_udata = NULL,
    };
    bus_result res = {0};
    if (!bus_init(&cfg, &res)) {
        LOGF1(0, "failed to init bus: %d\n", res.status);
        return;
    }
    int fd = KineticSocket_Connect("localhost", 8123);
    assert(fd != KINETIC_SOCKET_DESCRIPTOR_INVALID);

    socket_info * si = calloc(1, sizeof(socket_info) + 2 * PDU_PROTO_MAX_LEN);
    assert(si != NULL);
    bool result = bus_register_socket(res.bus, fd, si);
    assert(result);

    sleep(5);

    bus_shutdown(res.bus);

    free(si);

    KineticSocket_Close(fd);

    KineticLogger_Close();
}
+1 −1
Original line number Diff line number Diff line
@@ -46,7 +46,7 @@ static KineticOperation Operation;

void setUp(void)
{
    KineticLogger_Init("stdout", 3);
    KineticLogger_Init("stdout", 1);
    KINETIC_CONNECTION_INIT(&Connection);
    Connection.connectionID = ConnectionID;
    Session.connection = &Connection;