Commit 6642f8ea authored by Scott Vokes's avatar Scott Vokes
Browse files

Update tests for Kinetic protocol message buffering / unpacking.

parent e0693cb0
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -57,6 +57,7 @@ LIB_OBJS = \
	$(OUT_DIR)/kinetic_nbo.o \
	$(OUT_DIR)/kinetic_operation.o \
	$(OUT_DIR)/kinetic_pdu.o \
	$(OUT_DIR)/kinetic_pdu_unpack.o \
	$(OUT_DIR)/kinetic_proto.o \
	$(OUT_DIR)/kinetic_socket.o \
	$(OUT_DIR)/kinetic_message.o \
+41 −31
Original line number Diff line number Diff line
@@ -25,17 +25,19 @@
#include "kinetic_hmac.h"
#include "kinetic_logger.h"
#include "kinetic_proto.h"




#include "kinetic_nbo.h"
#include "kinetic_allocator.h"
#include "kinetic_controller.h"
#include "bus.h"
#include "kinetic_pdu_unpack.h"

#ifdef TEST
#define STATIC
#else
#define STATIC static
#endif

static void log_cb(log_event_t event, int log_level, const char *msg, void *udata) {
STATIC void log_cb(log_event_t event, int log_level, const char *msg, void *udata) {
    (void)udata;
    const char *event_str = bus_log_event_str(event);
    fprintf(stderr, "%s[%d] -- %s\n",
@@ -54,8 +56,7 @@ static bus_sink_cb_res_t reset_transfer(socket_info *si) {
    return res;
}


static bool unpack_header(uint8_t const * const read_buf, size_t const read_size, KineticPDUHeader * const header)
STATIC bool unpack_header(uint8_t const * const read_buf, size_t const read_size, KineticPDUHeader * const header)
{
    if (read_size != sizeof(KineticPDUHeader)) {
        return false;
@@ -67,19 +68,21 @@ static bool unpack_header(uint8_t const * const read_buf, size_t const read_size
    uint32_t valueLength = KineticNBO_ToHostU32(buf_header->valueLength);
    uint8_t versionPrefix = buf_header->versionPrefix;

    if (protobufLength <= PDU_PROTO_MAX_LEN &&
        valueLength <= PDU_PROTO_MAX_LEN) {

        *header = (KineticPDUHeader){
            .versionPrefix = versionPrefix,
            .protobufLength = protobufLength,
            .valueLength = valueLength,
        };
    if (header->protobufLength <= PDU_PROTO_MAX_LEN &&
        header->valueLength <= PDU_PROTO_MAX_LEN) {
        return true;
    } else {
        return false;
    }
}
static bus_sink_cb_res_t sink_cb(uint8_t *read_buf,

STATIC bus_sink_cb_res_t sink_cb(uint8_t *read_buf,
        size_t read_size, void *socket_udata) {

    KineticConnection * connection = (KineticConnection *)socket_udata;
@@ -145,7 +148,7 @@ static bus_sink_cb_res_t sink_cb(uint8_t *read_buf,
    }
}

static bus_unpack_cb_res_t unpack_cb(void *msg, void *socket_udata) {
STATIC bus_unpack_cb_res_t unpack_cb(void *msg, void *socket_udata) {
    (void)socket_udata;
    /* just got .full_msg_buffer from sink_cb -- pass it along as-is */
    socket_info *si = (socket_info *)msg;
@@ -158,36 +161,43 @@ static bus_unpack_cb_res_t unpack_cb(void *msg, void *socket_udata) {
        };
    }

    KineticResponse * reponse = KineticAllocator_NewKineticResponse(si->header.valueLength);
    KineticResponse * response = KineticAllocator_NewKineticResponse(si->header.valueLength);

    if (reponse == NULL) {
    if (response == NULL) {
        bus_unpack_cb_res_t res = {
            .ok = false,
            .u.error.opaque_error_id = UNPACK_ERROR_PAYLOAD_MALLOC_FAIL,
        };
        return res;
    } else {
        reponse->header = si->header;
        reponse->proto = KineticProto_Message__unpack(NULL, si->header.protobufLength, si->buf);
        if (reponse->proto->has_commandBytes &&
            reponse->proto->commandBytes.data != NULL &&
            reponse->proto->commandBytes.len > 0)
        response->header = si->header;

        response->proto = KineticPDU_unpack_message(NULL, si->header.protobufLength, si->buf);
        if (response->proto->has_commandBytes &&
            response->proto->commandBytes.data != NULL &&
            response->proto->commandBytes.len > 0)
        {
            reponse->command = KineticProto_command__unpack(NULL, reponse->proto->commandBytes.len, reponse->proto->commandBytes.data);
            response->command = KineticPDU_unpack_command(NULL,
                response->proto->commandBytes.len, response->proto->commandBytes.data);
        } else {
            reponse->command = NULL;
            response->command = NULL;
        }

        if (reponse->header.valueLength > 0)
        if (response->header.valueLength > 0)
        {
            memcpy(reponse->value, &si->buf[si->header.protobufLength], si->header.valueLength);
            memcpy(response->value, &si->buf[si->header.protobufLength], si->header.valueLength);
        }

        int64_t seq_id = 0;
        if (response->command) {
            seq_id = response->command->header->ackSequence;
        }

        bus_unpack_cb_res_t res = {
            .ok = true,
            .u.success = {
                .seq_id = reponse->command->header->ackSequence,
                .msg = reponse,
                .seq_id = seq_id,
                .msg = response,
            },
        };
        return res;
@@ -232,8 +242,8 @@ KineticStatus KineticPDU_GetStatus(KineticResponse* response)
KineticProto_Command_KeyValue* KineticPDU_GetKeyValue(KineticResponse* response)
{
    KineticProto_Command_KeyValue* keyValue = NULL;

    if (response != NULL &&
        response->command != NULL &&
        response->command != NULL &&
        response->command->body != NULL)
    {
+31 −0
Original line number Diff line number Diff line
/*
* kinetic-c
* Copyright (C) 2014 Seagate Technology.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
*/

#include "kinetic_pdu_unpack.h"

KineticProto_Command *KineticPDU_unpack_command(ProtobufCAllocator* allocator,
        size_t len, const uint8_t* data) {
    return KineticProto_command__unpack(allocator, len, data);
}

KineticProto_Message* KineticPDU_unpack_message(ProtobufCAllocator* allocator,
        size_t len, const uint8_t* data) {
    return KineticProto_command__unpack(allocator, len, data);
}
+33 −0
Original line number Diff line number Diff line
/*
* kinetic-c
* Copyright (C) 2014 Seagate Technology.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
*/

#ifndef KINETIC_PDU_UNPACK_H
#define KINETIC_PDU_UNPACK_H

#include "kinetic_proto.h"

/* This wrapper only exists for mocking purposes. */
KineticProto_Command *KineticPDU_unpack_command(ProtobufCAllocator* allocator,
    size_t len, const uint8_t* data);

KineticProto_Message* KineticPDU_unpack_message(ProtobufCAllocator* allocator,
    size_t len, const uint8_t* data);

#endif
+258 −296

File changed.

Preview size limit exceeded, changes collapsed.

Loading