Commit 1d4b868a authored by Greg Williams's avatar Greg Williams
Browse files

Added kinetic_controller module to house worker threads and high-level operation management support

parent 6de5c002
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -46,6 +46,7 @@ LIB_DEPS = \
	$(LIB_DIR)/kinetic_message.h \
	$(LIB_DIR)/kinetic_logger.h \
	$(LIB_DIR)/kinetic_hmac.h \
	$(LIB_DIR)/kinetic_controller.h \
	$(LIB_DIR)/kinetic_connection.h \
	$(LIB_DIR)/kinetic_types_internal.h \
	$(PUB_INC)/kinetic_types.h \
@@ -65,6 +66,7 @@ LIB_OBJS = \
	$(OUT_DIR)/kinetic_message.o \
	$(OUT_DIR)/kinetic_logger.o \
	$(OUT_DIR)/kinetic_hmac.o \
	$(OUT_DIR)/kinetic_controller.o \
	$(OUT_DIR)/kinetic_connection.o \
	$(OUT_DIR)/kinetic_types_internal.o \
	$(OUT_DIR)/kinetic_types.o \
@@ -107,6 +109,8 @@ $(OUT_DIR)/kinetic_logger.o: $(LIB_DIR)/kinetic_logger.c $(LIB_DEPS)
	$(CC) -c -o $@ $< $(CFLAGS) $(LIB_INCS)
$(OUT_DIR)/kinetic_hmac.o: $(LIB_DIR)/kinetic_hmac.c $(LIB_DEPS)
	$(CC) -c -o $@ $< $(CFLAGS) $(LIB_INCS)
$(OUT_DIR)/kinetic_controller.o: $(LIB_DIR)/kinetic_controller.c $(LIB_DEPS)
	$(CC) -c -o $@ $< $(CFLAGS) $(LIB_INCS)
$(OUT_DIR)/kinetic_connection.o: $(LIB_DIR)/kinetic_connection.c $(LIB_DEPS)
	$(CC) -c -o $@ $< $(CFLAGS) $(LIB_INCS)
$(OUT_DIR)/kinetic_types_internal.o: $(LIB_DIR)/kinetic_types_internal.c $(LIB_DEPS)

config/Doxyfile_libdev

0 → 100644
+2310 −0

File added.

Preview size limit exceeded, changes collapsed.

+10 −72
Original line number Diff line number Diff line
@@ -30,62 +30,6 @@
#include <stdlib.h>
#include <sys/time.h>

static KineticStatus KineticClient_CreateOperation(
    KineticOperation** operation,
    KineticSessionHandle handle)
{
    if (handle == KINETIC_HANDLE_INVALID) {
        LOG0("Specified session has invalid handle value");
        return KINETIC_STATUS_SESSION_EMPTY;
    }

    KineticConnection* connection = KineticConnection_FromHandle(handle);
    if (connection == NULL) {
        LOG0("Specified session is not associated with a connection");
        return KINETIC_STATUS_SESSION_INVALID;
    }

    LOGF1("\n"
         "--------------------------------------------------\n"
         "Building new operation on connection @ 0x%llX", connection);

    *operation = KineticAllocator_NewOperation(connection);
    if (*operation == NULL) {
        return KINETIC_STATUS_MEMORY_ERROR;
    }
    if ((*operation)->request == NULL) {
        return KINETIC_STATUS_NO_PDUS_AVAVILABLE;
    }

    return KINETIC_STATUS_SUCCESS;
}

static KineticStatus KineticClient_ExecuteOperation(KineticOperation* operation)
{
    assert(operation != NULL);
    KineticStatus status = KINETIC_STATUS_INVALID;

    LOGF1("Executing operation: 0x%llX", operation);
    if (operation->entry != NULL &&
        operation->entry->value.array.data != NULL &&
        operation->entry->value.bytesUsed > 0)
    {
        LOGF1("  Sending PDU (0x%0llX) w/value (%zu bytes)",
            operation->request, operation->entry->value.bytesUsed);
    }
    else {
        LOGF1("  Sending PDU (0x%0llX) w/o value", operation->request);
    }

    // Send the request
    status = KineticOperation_SendRequest(operation);
    if (status != KINETIC_STATUS_SUCCESS) {
        return status;
    }

    return KineticOperation_ReceiveAsync(operation);
}

void KineticClient_Init(const char* log_file, int log_level)
{
    KineticLogger_Init(log_file, log_level);
@@ -170,16 +114,15 @@ KineticStatus KineticClient_Disconnect(KineticSessionHandle* const handle)

KineticStatus KineticClient_NoOp(KineticSessionHandle handle)
{
    KineticStatus status;
    KineticOperation* operation;
    status = KineticClient_CreateOperation(&operation, handle);
    KineticStatus status = KineticController_CreateOperation(&operation, handle);
    if (status != KINETIC_STATUS_SUCCESS) {return status;}

    // Initialize request
    KineticOperation_BuildNoop(operation);

    // Execute the operation
    status = KineticClient_ExecuteOperation(operation);
    status = KineticController_ExecuteOperation(operation);

    return status;
}
@@ -190,9 +133,8 @@ KineticStatus KineticClient_Put(KineticSessionHandle handle,
{
    assert(entry != NULL);
    assert(entry->value.array.data != NULL);
    KineticStatus status;
    KineticOperation* operation;
    status = KineticClient_CreateOperation(&operation, handle);
    KineticStatus status = KineticController_CreateOperation(&operation, handle);
    if (status != KINETIC_STATUS_SUCCESS) {return status;}

    // Initialize request
@@ -200,7 +142,7 @@ KineticStatus KineticClient_Put(KineticSessionHandle handle,
    if (closure != NULL) {operation->closure = *closure;}

    // Execute the operation
    return KineticClient_ExecuteOperation(operation);
    return KineticController_ExecuteOperation(operation);
}

KineticStatus KineticClient_Get(KineticSessionHandle handle,
@@ -209,9 +151,8 @@ KineticStatus KineticClient_Get(KineticSessionHandle handle,
{
    assert(entry != NULL);
    if (!entry->metadataOnly) {assert(entry->value.array.data != NULL);}
    KineticStatus status;
    KineticOperation* operation;
    status = KineticClient_CreateOperation(&operation, handle);
    KineticStatus status = KineticController_CreateOperation(&operation, handle);
    if (status != KINETIC_STATUS_SUCCESS) {return status;}

    // Initialize request
@@ -219,7 +160,7 @@ KineticStatus KineticClient_Get(KineticSessionHandle handle,
    if (closure != NULL) {operation->closure = *closure;}

    // Execute the operation
    return KineticClient_ExecuteOperation(operation);
    return KineticController_ExecuteOperation(operation);
}

KineticStatus KineticClient_Delete(KineticSessionHandle handle,
@@ -227,9 +168,8 @@ KineticStatus KineticClient_Delete(KineticSessionHandle handle,
                                   KineticCompletionClosure* closure)
{
    assert(entry != NULL);
    KineticStatus status;
    KineticOperation* operation;
    status = KineticClient_CreateOperation(&operation, handle);
    KineticStatus status = KineticController_CreateOperation(&operation, handle);
    if (status != KINETIC_STATUS_SUCCESS) {return status;}

    // Initialize request
@@ -237,7 +177,7 @@ KineticStatus KineticClient_Delete(KineticSessionHandle handle,
    if (closure != NULL) {operation->closure = *closure;}

    // Execute the operation
    return KineticClient_ExecuteOperation(operation);
    return KineticController_ExecuteOperation(operation);
}

KineticStatus KineticClient_GetKeyRange(KineticSessionHandle handle,
@@ -251,10 +191,8 @@ KineticStatus KineticClient_GetKeyRange(KineticSessionHandle handle,
    assert(keys->buffers != NULL);
    assert(keys->count > 0);

    KineticStatus status;
    KineticOperation* operation;

    status = KineticClient_CreateOperation(&operation, handle);
    KineticStatus status = KineticController_CreateOperation(&operation, handle);
    if (status != KINETIC_STATUS_SUCCESS) {
        return status;
    }
@@ -264,5 +202,5 @@ KineticStatus KineticClient_GetKeyRange(KineticSessionHandle handle,
    if (closure != NULL) {operation->closure = *closure;}

    // Execute the operation
    return KineticClient_ExecuteOperation(operation);
    return KineticController_ExecuteOperation(operation);
}
+2 −119
Original line number Diff line number Diff line
@@ -23,6 +23,7 @@
#include "kinetic_socket.h"
#include "kinetic_pdu.h"
#include "kinetic_operation.h"
#include "kinetic_controller.h"
#include "kinetic_allocator.h"
#include "kinetic_logger.h"
#include <stdlib.h>
@@ -34,119 +35,6 @@
STATIC KineticConnection ConnectionInstances[KINETIC_SESSIONS_MAX];
STATIC KineticConnection* Connections[KINETIC_SESSIONS_MAX];


void KineticConnection_Pause(KineticConnection* const connection, bool pause)
{
    assert(connection != NULL);
    connection->thread.paused = pause;
}

static void* KineticConnection_Worker(void* thread_arg)
{
    KineticStatus status;
    KineticThread* thread = thread_arg;

    while(!thread->abortRequested && !thread->fatalError) {

        // Do not service PDUs if thread paused
        if (thread->paused) {
            sleep(0);
            continue;
        } 

        // Wait for and receive a PDU
        KineticWaitStatus wait_status = KineticSocket_WaitUntilDataAvailable(thread->connection->socket, 100);
        switch(wait_status)
        {
            case KINETIC_WAIT_STATUS_DATA_AVAILABLE:
            {
                KineticPDU* response = KineticAllocator_NewPDU(thread->connection);
                status = KineticPDU_ReceiveMain(response);
                if (status != KINETIC_STATUS_SUCCESS) {
                    LOGF0("ERROR: PDU receive reported an error: %s", Kinetic_GetStatusDescription(status));
                }
                else {
                    status = KineticPDU_GetStatus(response);
                }

                if (response->proto != NULL && response->proto->has_authType) {

                    // Handle unsolicited status PDUs
                    if (response->proto->authType == KINETIC_PROTO_MESSAGE_AUTH_TYPE_UNSOLICITEDSTATUS) {
                        response->type = KINETIC_PDU_TYPE_UNSOLICITED;
                        if (response->command != NULL &&
                            response->command->header != NULL &&
                            response->command->header->has_connectionID)
                        {
                            // Extract connectionID from unsolicited status message
                            response->connection->connectionID = response->command->header->connectionID;
                            LOGF2("Extracted connection ID from unsolicited status PDU (id=%lld)",
                                response->connection->connectionID);
                        }
                        else {
                            LOG0("WARNING: Unsolicited PDU is not recognized!");
                        }
                        KineticAllocator_FreePDU(thread->connection, response);
                    }

                    // Associate solicited response PDUs with their requests
                    else {
                        response->type = KINETIC_PDU_TYPE_RESPONSE;
                        KineticOperation* op = KineticOperation_AssociateResponseWithOperation(response);
                        if (op == NULL) {
                            LOG0("Failed to find request matching received response PDU!");
                            KineticAllocator_FreePDU(thread->connection, response);
                        }
                        else {
                            LOG2("Found associated operation/request for response PDU.");
                            size_t valueLength = KineticPDU_GetValueLength(response);
                            if (valueLength > 0) {
                                status = KineticPDU_ReceiveValue(op->connection->socket,
                                    &op->entry->value, valueLength);
                            }

                            // Call operation-specific callback, if configured
                            if (status == KINETIC_STATUS_SUCCESS && op->callback != NULL) {
                                status = op->callback(op);
                            }

                            // Call client-supplied closure callback, if supplied
                            if (op->closure.callback != NULL) {
                                KineticCompletionData completionData = {.status = status};
                                op->closure.callback(&completionData, op->closure.clientData);
                                KineticAllocator_FreeOperation(thread->connection, op);
                            }

                            // Otherwise, is a synchronous opearation, so just set a flag
                            else {
                                op->receiveComplete = true;
                            }
                        }
                    }
                }
                else {
                    // Free invalid PDU
                    KineticAllocator_FreePDU(thread->connection, response);
                }
            } break;
            case KINETIC_WAIT_STATUS_TIMED_OUT:
            case KINETIC_WAIT_STATUS_RETRYABLE_ERROR:
            {
                sleep(0);
            } break;
            default:
            case KINETIC_WAIT_STATUS_FATAL_ERROR:
            {
                LOG0("ERROR: Socket error while waiting for PDU to arrive");
                thread->fatalError = true;
            } break;
        }
    }

    LOG1("Worker thread terminated!");
    return (void*)NULL;
}

KineticSessionHandle KineticConnection_NewConnection(
    const KineticSession* const config)
{
@@ -207,12 +95,7 @@ KineticStatus KineticConnection_Connect(KineticConnection* const connection)

    // Kick off the worker thread
    connection->thread.connection = connection;
    int pthreadStatus = pthread_create(&connection->threadID, NULL, KineticConnection_Worker, &connection->thread);
    if (pthreadStatus != 0) {
        char errMsg[256];
        Kinetic_GetErrnoDescription(pthreadStatus, errMsg, sizeof(errMsg));
        LOGF0("Failed creating worker thread w/error: %s", errMsg);
    }
    KineticController_CreateWorkerThreads(connection);

    return KINETIC_STATUS_SUCCESS;
}
+0 −1
Original line number Diff line number Diff line
@@ -27,7 +27,6 @@ KineticSessionHandle KineticConnection_NewConnection(const KineticSession* const
void KineticConnection_FreeConnection(KineticSessionHandle* const handle);
KineticConnection* KineticConnection_FromHandle(KineticSessionHandle handle);
KineticStatus KineticConnection_Connect(KineticConnection* const connection);
void KineticConnection_Pause(KineticConnection* const connection, bool pause);
KineticStatus KineticConnection_Disconnect(KineticConnection* const connection);
void KineticConnection_IncrementSequence(KineticConnection* const connection);

Loading