Commit f7da6b52 authored by Scott Vokes's avatar Scott Vokes
Browse files

Refactor KineticOperation_SendRequestInner, extracting kinetic_request.c.

This is intended to break up KineticOperation_SendRequestInner, making
the dataflow for locking and error handling clearer, and to add better
seams for testing.
parent 8a6cfb2b
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -58,6 +58,7 @@ LIB_OBJS = \
	$(OUT_DIR)/kinetic_allocator.o \
	$(OUT_DIR)/kinetic_nbo.o \
	$(OUT_DIR)/kinetic_operation.o \
	$(OUT_DIR)/kinetic_request.o \
	$(OUT_DIR)/kinetic_response.o \
	$(OUT_DIR)/kinetic_bus.o \
	$(OUT_DIR)/kinetic_auth.o \
+44 −136
Original line number Diff line number Diff line
@@ -23,25 +23,27 @@
#include "kinetic_message.h"
#include "kinetic_bus.h"
#include "kinetic_response.h"
#include "kinetic_nbo.h"
#include "kinetic_auth.h"
#include "kinetic_socket.h"
#include "kinetic_device_info.h"
#include "kinetic_allocator.h"
#include "kinetic_logger.h"
#include <pthread.h>
#include "kinetic_request.h"

#include <stdlib.h>
#include <errno.h>
#include <sys/time.h>
#include <stdio.h>

#include "bus.h"
#include "acl.h"

#define ATOMIC_FETCH_AND_INCREMENT(P) __sync_fetch_and_add(P, 1)

#ifdef TEST
uint8_t * msg = NULL;
size_t msgSize = 0;
#endif

static void KineticOperation_ValidateOperation(KineticOperation* operation);
static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const operation);
static KineticStatus KineticOperation_SendRequestInLock(KineticOperation* const operation);

KineticStatus KineticOperation_SendRequest(KineticOperation* const operation)
{
@@ -49,26 +51,17 @@ KineticStatus KineticOperation_SendRequest(KineticOperation* const operation)
    KINETIC_ASSERT(operation->connection != NULL);
    KINETIC_ASSERT(operation->request != NULL);
    
    KineticStatus status = KineticOperation_SendRequestInner(operation);
    if (status != KINETIC_STATUS_SUCCESS)
    {
        //cleanup
        KineticRequest* request = operation->request;
        if (request != NULL) {
            if (request->message.message.commandBytes.data != NULL) {
                free(request->message.message.commandBytes.data);
                request->message.message.commandBytes.data = NULL;
    if (!KineticRequest_LockOperation(operation)) {
        return KINETIC_STATUS_CONNECTION_ERROR;
    }
            free(request);
            operation->request = NULL;
        }
    }

    KineticStatus status = KineticOperation_SendRequestInLock(operation);
    KineticRequest_UnlockOperation(operation);
    return status;
}

static void log_request_seq_id(int fd, int64_t seq_id, KineticMessageType mt)
{
    #ifndef TEST
    #if KINETIC_LOGGER_LOG_SEQUENCE_ID
    struct timeval tv;
    gettimeofday(&tv, NULL);
@@ -79,151 +72,66 @@ static void log_request_seq_id(int fd, int64_t seq_id, KineticMessageType mt)
    (void)seq_id;
    (void)mt;
    #endif
    #endif
}

// TODO: Assess refactoring this method by dissecting out Operation and relocate to kinetic_pdu
static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const operation)
/* Send request.
 * Note: This whole function operates with operation->connection->sendMutex locked. */
static KineticStatus KineticOperation_SendRequestInLock(KineticOperation* const operation)
{
    LOGF3("\nSending PDU via fd=%d", operation->connection->socket);

    KineticStatus status = KINETIC_STATUS_INVALID;
    uint8_t * msg = NULL;
    KineticRequest* request = operation->request;

    KINETIC_ASSERT(operation);
    KINETIC_ASSERT(operation->request);
    KINETIC_ASSERT(operation->connection);
    KINETIC_ASSERT(request);

    // Acquire lock
    pthread_mutex_t* sendMutex = &operation->connection->sendMutex;
    pthread_mutex_lock(sendMutex);
    KineticSession *session = operation->connection->pSession;
    KineticRequest* request = operation->request;

    // Populate sequence count and increment it for next operation
    int64_t seq_id = KineticSession_GetNextSequenceCount(operation->connection->pSession);
    KINETIC_ASSERT(request->message.header.sequence == KINETIC_SEQUENCE_NOT_YET_BOUND);
    int64_t seq_id = KineticSession_GetNextSequenceCount(session);
    request->message.header.sequence = seq_id;

    // Pack the command, if available
    size_t expectedLen = KineticProto_command__get_packed_size(&request->message.command);
    request->message.message.commandBytes.data = (uint8_t*)malloc(expectedLen);
    if(request->message.message.commandBytes.data == NULL)
    {
        LOG0("Failed to allocate command bytes!");
        status = KINETIC_STATUS_MEMORY_ERROR;
        goto cleanup;
    size_t expectedLen = KineticRequest_PackCommand(request);
    if (expectedLen == KINETIC_REQUEST_PACK_FAILURE) {
        return KINETIC_STATUS_MEMORY_ERROR;
    }
    size_t packedLen = KineticProto_command__pack(
        &request->message.command,
        request->message.message.commandBytes.data);
    KINETIC_ASSERT(packedLen == expectedLen);
    request->message.message.commandBytes.len = packedLen;
    request->message.message.has_commandBytes = true;
    KineticLogger_LogByteArray(3, "commandBytes", (ByteArray){
        .data = request->message.message.commandBytes.data,
        .len = request->message.message.commandBytes.len,
    });
    uint8_t * commandData = request->message.message.commandBytes.data;

    log_request_seq_id(operation->connection->socket, seq_id, request->message.header.messageType);

    // Populate the HMAC/PIN for protobuf authentication
    if (operation->pin != NULL) {
        status = KineticAuth_PopulatePin(&session->config, operation->request, *operation->pin);
    }
    else {
        status = KineticAuth_PopulateHmac(&session->config, operation->request);
    }
    KineticSession *session = operation->connection->pSession;
    KineticStatus status = KineticRequest_PopulateAuthentication(&session->config,
        operation->request, operation->pin);
    if (status != KINETIC_STATUS_SUCCESS) {
        LOG0("Failed populating authentication info for new request!");
        if (commandData) { free(commandData); }
        return status;        
    }

    // Configure PDU header
    KineticProto_Message* proto = &operation->request->message.message;
    KineticPDUHeader header = {
        .versionPrefix = 'F',
        .protobufLength = KineticProto_Message__get_packed_size(proto)
    };
    header.valueLength = operation->value.len;
    uint32_t nboProtoLength = KineticNBO_FromHostU32(header.protobufLength);
    uint32_t nboValueLength = KineticNBO_FromHostU32(header.valueLength);

    // Allocate and pack protobuf message
    size_t offset = 0;
    msg = malloc(PDU_HEADER_LEN + header.protobufLength + header.valueLength);
    if (msg == NULL) {
        LOG0("Failed to allocate outgoing message!");
        status = KINETIC_STATUS_MEMORY_ERROR;
        goto cleanup;
    #ifndef TEST
    uint8_t * msg = NULL;
    size_t msgSize = 0;
    #endif
    status = KineticRequest_PackMessage(operation, &msg, &msgSize);
    if (status != KINETIC_STATUS_SUCCESS) {
        if (commandData) { free(commandData); }
        return status;
    }

    // Pack header
    msg[offset] = header.versionPrefix;
    offset += sizeof(header.versionPrefix);
    memcpy(&msg[offset], &nboProtoLength, sizeof(nboProtoLength));
    offset += sizeof(nboProtoLength);
    memcpy(&msg[offset], &nboValueLength, sizeof(nboValueLength));
    offset += sizeof(nboValueLength);
    size_t len = KineticProto_Message__pack(&request->message.message, &msg[offset]);
    KINETIC_ASSERT(len == header.protobufLength);
    offset += header.protobufLength;

    // Log protobuf per configuration
    LOGF2("[PDU TX] pdu: %p, session: %p, bus: %p, "
        "fd: %6d, seq: %8lld, protoLen: %8u, valueLen: %8u, op: %p, msgType: %02x",
        (void*)operation->request,
        (void*)operation->connection->pSession, (void*)operation->connection->messageBus,
        operation->connection->socket, (long long)request->message.header.sequence,
        header.protobufLength, header.valueLength,
        (void*)operation, request->message.header.messageType);
    KineticLogger_LogHeader(3, &header);
    KineticLogger_LogProtobuf(3, proto);

    // Free command bytes now that we are done
    free(request->message.message.commandBytes.data);
    request->message.message.commandBytes.data = NULL;

    // Pack value payload, if supplied
    if (header.valueLength > 0) {
        memcpy(&msg[offset], operation->value.data, operation->value.len);
        offset += operation->value.len;
    }
    KINETIC_ASSERT((PDU_HEADER_LEN + header.protobufLength + header.valueLength) == offset);

    // Claim a send PDU for proper throttling of concurrent requests to avoid flooding the drive
    if (commandData) { free(commandData); }
    KineticCountingSemaphore * const sem = operation->connection->outstandingOperations;
    KineticCountingSemaphore_Take(sem);
    int fd = operation->connection->socket;
    if (!bus_send_request(operation->connection->messageBus, &(bus_user_msg) {
        .fd       = fd,
        .type     = BUS_SOCKET_PLAIN,
        .seq_id   = seq_id,
        .msg      = msg,
        .msg_size = offset,
        .cb       = KineticController_HandleResult,
        .udata    = operation,
        .timeout_sec = operation->timeoutSeconds,
        }))
    {
        LOGF0("Failed queuing request %p for transmit on fd=%d w/seq=%lld",
            (void*)request, fd, (long long)seq_id);
        // Since the request PDU failed getting queued into the bus,
        // release this request from the throttle to clean up properly
        KineticCountingSemaphore_Give(sem);
    KineticCountingSemaphore_Take(sem);  // limit total concurrent requests

    if (!KineticRequest_SendRequest(operation, msg, msgSize)) {
        LOGF0("Failed queuing request %p for transmit on fd=%d w/seq=%lld",
            (void*)request, operation->connection->socket, (long long)seq_id);
        /* A false result from bus_send_request means that the request was
         * rejected outright, so the usual asynchronous, callback-based
         * error handling for errors during the request or response will
         * not be used. */
        KineticCountingSemaphore_Give(sem);
        status = KINETIC_STATUS_REQUEST_REJECTED;
    }
    else {
    } else {
        status = KINETIC_STATUS_SUCCESS;
    }

cleanup:
    pthread_mutex_unlock(sendMutex);
    if (msg != NULL) { free(msg); }
    return status;
}
+158 −0
Original line number Diff line number Diff line
/*
* kinetic-c
* Copyright (C) 2014 Seagate Technology.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
*/
#include "kinetic_request.h"
#include <pthread.h>

#include "kinetic_logger.h"
#include "kinetic_session.h"
#include "kinetic_auth.h"
#include "kinetic_nbo.h"
#include "kinetic_controller.h"
#include "byte_array.h"
#include "bus.h"

#ifdef TEST
uint8_t *cmdBuf = NULL;
uint8_t *msg = NULL;
#endif

size_t KineticRequest_PackCommand(KineticRequest* request)
{
    size_t expectedLen = KineticProto_command__get_packed_size(&request->message.command);
    #ifndef TEST
    uint8_t *cmdBuf = (uint8_t*)malloc(expectedLen);
    #endif
    if (cmdBuf == NULL)
    {
        LOGF0("Failed to allocate command bytes: %zd", expectedLen);
        return KINETIC_REQUEST_PACK_FAILURE;
    }
    request->message.message.commandBytes.data = cmdBuf;

    size_t packedLen = KineticProto_command__pack(
        &request->message.command, cmdBuf);
    KINETIC_ASSERT(packedLen == expectedLen);
    request->message.message.commandBytes.len = packedLen;
    request->message.message.has_commandBytes = true;
    KineticLogger_LogByteArray(3, "commandBytes", (ByteArray){
        .data = request->message.message.commandBytes.data,
        .len = request->message.message.commandBytes.len,
    });

    return packedLen;
}

KineticStatus KineticRequest_PopulateAuthentication(KineticSessionConfig *config,
    KineticRequest *request, ByteArray *pin)
{
    if (pin != NULL) {
        return KineticAuth_PopulatePin(config, request, *pin);
    } else {
        return KineticAuth_PopulateHmac(config, request);
    }
}

KineticStatus KineticRequest_PackMessage(KineticOperation *operation,
    uint8_t **out_msg, size_t *msgSize)
{
    // Configure PDU header
    KineticProto_Message* proto = &operation->request->message.message;
    KineticPDUHeader header = {
        .versionPrefix = 'F',
        .protobufLength = KineticProto_Message__get_packed_size(proto)
    };
    header.valueLength = operation->value.len;
    uint32_t nboProtoLength = KineticNBO_FromHostU32(header.protobufLength);
    uint32_t nboValueLength = KineticNBO_FromHostU32(header.valueLength);

    // Allocate and pack protobuf message
    size_t offset = 0;
    #ifndef TEST
    uint8_t *msg = malloc(PDU_HEADER_LEN + header.protobufLength + header.valueLength);
    #endif
    if (msg == NULL) {
        LOG0("Failed to allocate outgoing message!");
        return KINETIC_STATUS_MEMORY_ERROR;
    }

    // Pack header
    KineticRequest* request = operation->request;
    msg[offset] = header.versionPrefix;
    offset += sizeof(header.versionPrefix);
    memcpy(&msg[offset], &nboProtoLength, sizeof(nboProtoLength));
    offset += sizeof(nboProtoLength);
    memcpy(&msg[offset], &nboValueLength, sizeof(nboValueLength));
    offset += sizeof(nboValueLength);
    size_t len = KineticProto_Message__pack(&request->message.message, &msg[offset]);
    KINETIC_ASSERT(len == header.protobufLength);
    offset += header.protobufLength;

    #ifndef TEST
    // Log protobuf per configuration
    LOGF2("[PDU TX] pdu: %p, session: %p, bus: %p, "
        "fd: %6d, seq: %8lld, protoLen: %8u, valueLen: %8u, op: %p, msgType: %02x",
        (void*)operation->request,
        (void*)operation->connection->pSession, (void*)operation->connection->messageBus,
        operation->connection->socket, (long long)request->message.header.sequence,
        header.protobufLength, header.valueLength,
        (void*)operation, request->message.header.messageType);
    KineticLogger_LogHeader(3, &header);
    KineticLogger_LogProtobuf(3, proto);
    #endif
    
    // Pack value payload, if supplied
    if (header.valueLength > 0) {
        memcpy(&msg[offset], operation->value.data, operation->value.len);
        offset += operation->value.len;
    }
    KINETIC_ASSERT((PDU_HEADER_LEN + header.protobufLength + header.valueLength) == offset);

    *out_msg = msg;
    *msgSize = offset;
    return KINETIC_STATUS_SUCCESS;
}

bool KineticRequest_SendRequest(KineticOperation *operation,
    uint8_t *msg, size_t msgSize)
{
    KINETIC_ASSERT(msg);
    KINETIC_ASSERT(msgSize > 0);
    bus_user_msg bus_msg = {
        .fd       = operation->connection->socket,
        .type     = BUS_SOCKET_PLAIN,  // FIXME: no SSL?
        .seq_id   = operation->request->message.header.sequence,
        .msg      = msg,
        .msg_size = msgSize,
        .cb       = KineticController_HandleResult,
        .udata    = operation,
        .timeout_sec = operation->timeoutSeconds,
    };
    return bus_send_request(operation->connection->messageBus, &bus_msg);
}

bool KineticRequest_LockOperation(KineticOperation *operation)
{
    return 0 == pthread_mutex_lock(&operation->connection->sendMutex);
}

bool KineticRequest_UnlockOperation(KineticOperation *operation)
{
    return 0 == pthread_mutex_unlock(&operation->connection->sendMutex);
}
+55 −0
Original line number Diff line number Diff line
/*
* kinetic-c
* Copyright (C) 2014 Seagate Technology.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*
*/
#ifndef KINETIC_REQUEST_H
#define KINETIC_REQUEST_H

#include "kinetic_types_internal.h"

/* Special 'failed to pack' sentinel. */
#define KINETIC_REQUEST_PACK_FAILURE ((size_t)-1)

/* Pack the command into request->message.message.commandBytes.data.
 * Returns the allocated size,
 * or KINETIC_REQUEST_PACK_FAILURE on malloc failure. */
size_t KineticRequest_PackCommand(KineticRequest* request);

/* Populate the request's authentication info. If PIN is non-NULL,
 * use PIN authentication, otherwise use HMAC. */
KineticStatus KineticRequest_PopulateAuthentication(KineticSessionConfig *config,
    KineticRequest *request, ByteArray *pin);

/* Pack the header, command, and value (if any), allocating a buffer and
 * returning the buffer and its size in *msg and *msgSize.
 * Returns KINETIC_STATUS_SUCCESS on success, or KINETIC_STATUS_MEMORY_ERROR
 * on allocation failure. */
KineticStatus KineticRequest_PackMessage(KineticOperation *operation,
    uint8_t **msg, size_t *msgSize);

/* Send the request. Returns whether the request was successfully queued
 * up for delivery, or whether it was rejected due to invalid arguments.
 * If this returns false, then the asynchronous result callback will
 * not be called. */
bool KineticRequest_SendRequest(KineticOperation *operation,
    uint8_t *msg, size_t msgSize);

bool KineticRequest_LockOperation(KineticOperation *operation);
bool KineticRequest_UnlockOperation(KineticOperation *operation);

#endif
+103 −9
Original line number Diff line number Diff line
@@ -23,7 +23,6 @@
#include "byte_array.h"
#include "kinetic_types.h"
#include "kinetic_operation.h"
#include "kinetic_nbo.h"
#include "kinetic_proto.h"
#include "kinetic_logger.h"
#include "kinetic_types_internal.h"
@@ -32,13 +31,10 @@
#include "mock_kinetic_session.h"
#include "mock_kinetic_message.h"
#include "mock_kinetic_response.h"
#include "mock_kinetic_socket.h"
#include "mock_kinetic_auth.h"
#include "mock_kinetic_hmac.h"
#include "mock_bus.h"
#include "mock_acl.h"
#include "mock_kinetic_controller.h"
#include "mock_kinetic_countingsemaphore.h"
#include "mock_kinetic_request.h"

static KineticSessionConfig SessionConfig;
static KineticSession Session;
@@ -47,6 +43,9 @@ static const int64_t ConnectionID = 12345;
static KineticRequest Request;
static KineticOperation Operation;

extern uint8_t * msg;
extern size_t msgSize;

void setUp(void)
{
    KineticLogger_Init("stdout", 1);
@@ -86,14 +85,111 @@ void test_KineticOperation_Init_should_configure_the_operation(void)


/*******************************************************************************
 * Client Operations
 * Requests
*******************************************************************************/

void test_KineticOperation_SendRequest_should_error_out_on_lock_failure(void)
{
    KineticRequest_LockOperation_ExpectAndReturn(&Operation, false);
    KineticStatus status = KineticOperation_SendRequest(&Operation);
    TEST_ASSERT_EQUAL(KINETIC_STATUS_CONNECTION_ERROR, status);
}


void test_KineticOperation_SendRequest_should_return_MEMORY_ERROR_on_command_pack_failure(void)
{
    KineticRequest_LockOperation_ExpectAndReturn(&Operation, true);
    KineticSession *session = Operation.connection->pSession;
    KineticSession_GetNextSequenceCount_ExpectAndReturn(session, 12345);

    KineticRequest_PackCommand_ExpectAndReturn(Operation.request, KINETIC_REQUEST_PACK_FAILURE);
    KineticRequest_UnlockOperation_ExpectAndReturn(&Operation, true);

    KineticStatus status = KineticOperation_SendRequest(&Operation);
    TEST_ASSERT_EQUAL(KINETIC_STATUS_MEMORY_ERROR, status);
}

void test_KineticOperation_SendRequest_should_return_error_status_on_authentication_failure(void)
{
    KineticRequest_LockOperation_ExpectAndReturn(&Operation, true);
    KineticSession *session = Operation.connection->pSession;
    KineticSession_GetNextSequenceCount_ExpectAndReturn(session, 12345);

    KineticRequest_PackCommand_ExpectAndReturn(Operation.request, 100);
    KineticRequest_PopulateAuthentication_ExpectAndReturn(&session->config,
        Operation.request, NULL, KINETIC_STATUS_HMAC_REQUIRED);
    KineticRequest_UnlockOperation_ExpectAndReturn(&Operation, true);

    KineticStatus status = KineticOperation_SendRequest(&Operation);
    TEST_ASSERT_EQUAL(KINETIC_STATUS_HMAC_REQUIRED, status);
}

void test_KineticOperation_SendRequest_should_return_error_status_on_PackMessage_failure(void)
{
    KineticRequest_LockOperation_ExpectAndReturn(&Operation, true);
    KineticSession *session = Operation.connection->pSession;
    KineticSession_GetNextSequenceCount_ExpectAndReturn(session, 12345);

    KineticRequest_PackCommand_ExpectAndReturn(Operation.request, 100);
    KineticRequest_PopulateAuthentication_ExpectAndReturn(&session->config,
        Operation.request, NULL, KINETIC_STATUS_SUCCESS);

    KineticRequest_PackMessage_ExpectAndReturn(&Operation, &msg, &msgSize,
        KINETIC_STATUS_MEMORY_ERROR);
    KineticRequest_UnlockOperation_ExpectAndReturn(&Operation, true);

    KineticStatus status = KineticOperation_SendRequest(&Operation);
    TEST_ASSERT_EQUAL(KINETIC_STATUS_MEMORY_ERROR, status);
}

void test_KineticOperation_SendRequest_should_return_REQUEST_REJECTED_if_SendRequest_fails(void)
{
    KineticRequest_LockOperation_ExpectAndReturn(&Operation, true);
    KineticSession *session = Operation.connection->pSession;
    KineticSession_GetNextSequenceCount_ExpectAndReturn(session, 12345);

    KineticRequest_PackCommand_ExpectAndReturn(Operation.request, 100);
    KineticRequest_PopulateAuthentication_ExpectAndReturn(&session->config,
        Operation.request, NULL, KINETIC_STATUS_SUCCESS);

    KineticRequest_PackMessage_ExpectAndReturn(&Operation, &msg, &msgSize, KINETIC_STATUS_SUCCESS);

    KineticCountingSemaphore_Take_Expect(Operation.connection->outstandingOperations);

    KineticRequest_SendRequest_ExpectAndReturn(&Operation, msg, msgSize, false);
    KineticCountingSemaphore_Give_Expect(Operation.connection->outstandingOperations);
    KineticRequest_UnlockOperation_ExpectAndReturn(&Operation, true);

    KineticStatus status = KineticOperation_SendRequest(&Operation);
    TEST_ASSERT_EQUAL(KINETIC_STATUS_REQUEST_REJECTED, status);
}

void test_KineticOperation_SendRequest_should_acquire_and_increment_sequence_count_and_send_PDU_to_bus(void)
{
    TEST_IGNORE_MESSAGE("Need to test KineticOperation_SendRequest");
    KineticRequest_LockOperation_ExpectAndReturn(&Operation, true);
    KineticSession *session = Operation.connection->pSession;
    KineticSession_GetNextSequenceCount_ExpectAndReturn(session, 12345);

    KineticRequest_PackCommand_ExpectAndReturn(Operation.request, 100);
    KineticRequest_PopulateAuthentication_ExpectAndReturn(&session->config,
        Operation.request, NULL, KINETIC_STATUS_SUCCESS);

    KineticRequest_PackMessage_ExpectAndReturn(&Operation, &msg, &msgSize, KINETIC_STATUS_SUCCESS);

    KineticCountingSemaphore_Take_Expect(Operation.connection->outstandingOperations);

    KineticRequest_SendRequest_ExpectAndReturn(&Operation, msg, msgSize, true);
    KineticRequest_UnlockOperation_ExpectAndReturn(&Operation, true);

    KineticStatus status = KineticOperation_SendRequest(&Operation);
    TEST_ASSERT_EQUAL(KINETIC_STATUS_SUCCESS, status);
}


/*******************************************************************************
 * Client Operations
*******************************************************************************/

void test_KineticOperation_BuildNoop_should_build_and_execute_a_NOOP_operation(void)
{
    KineticOperation_BuildNoop(&Operation);
@@ -105,8 +201,6 @@ void test_KineticOperation_BuildNoop_should_build_and_execute_a_NOOP_operation(v
    TEST_ASSERT_NULL(Operation.response);
}



void test_KineticOperation_BuildPut_should_return_BUFFER_OVERRUN_if_object_value_too_long(void)
{
    ByteArray value = ByteArray_CreateWithCString("Luke, I am your father");
Loading