Commit 732d776b authored by Greg Williams's avatar Greg Williams
Browse files

Converted session within connection to a pointer to eliminate duplication and sync issues.

Ensured all unsolicited responses are freed to prevent memory leaks.
Moved init of connection into allocation for encapsulation.
parent a38f9c91
Loading
Loading
Loading
Loading
+4 −1
Original line number Diff line number Diff line
@@ -24,13 +24,16 @@
#include <stdlib.h>
#include <pthread.h>

KineticConnection* KineticAllocator_NewConnection(void)
KineticConnection* KineticAllocator_NewConnection(struct bus * b, KineticSession* const session)
{
    KineticConnection* connection = KineticCalloc(1, sizeof(KineticConnection));
    if (connection == NULL) {
        LOG0("Failed allocating new Connection!");
        return NULL;
    }
    connection->pSession = session;
    connection->messageBus = b;
    connection->socket = -1;  // start with an invalid file descriptor
    return connection;
}

+1 −1
Original line number Diff line number Diff line
@@ -25,7 +25,7 @@

void KineticAllocator_InitLists(KineticConnection* connection);

KineticConnection* KineticAllocator_NewConnection(void);
KineticConnection* KineticAllocator_NewConnection(struct bus * b, KineticSession* const session);
void KineticAllocator_FreeConnection(KineticConnection* connection);

KineticPDU* KineticAllocator_NewPDU(KineticConnection* connection);
+12 −11
Original line number Diff line number Diff line
@@ -80,7 +80,7 @@ KineticStatus KineticController_ExecuteOperation(KineticOperation* operation, Ki
{
    assert(operation != NULL);
    assert(operation->connection != NULL);
    assert(&operation->connection->session != NULL);
    assert(operation->connection->pSession != NULL);
    KineticStatus status = KINETIC_STATUS_INVALID;

    if (closure != NULL)
@@ -177,6 +177,7 @@ static const char *bus_error_string(bus_send_status_t t) {
        return "bad_response";
    }
}

void KineticController_HandleUnexecpectedResponse(void *msg,
                                                  int64_t seq_id,
                                                  void *bus_udata,
@@ -188,8 +189,10 @@ void KineticController_HandleUnexecpectedResponse(void *msg,
    (void)seq_id;
    (void)bus_udata;

    KineticLogger_LogProtobuf(2, response->proto);

    LOGF1("[PDU RX UNSOLICITED] pdu: 0x%0llX, session: 0x%llX, bus: 0x%llX, "
                "fd: %6d, protoLen: %u, valueLen: %u",
                response, connection->pSession, connection->messageBus, connection->socket,
                response->header.protobufLength, response->header.valueLength);

    // Handle unsolicited status PDUs
    if (response->proto->authType == KINETIC_PROTO_MESSAGE_AUTH_TYPE_UNSOLICITEDSTATUS) {
@@ -197,25 +200,23 @@ void KineticController_HandleUnexecpectedResponse(void *msg,
            response->command->header != NULL &&
            response->command->header->has_connectionID)
        {
            LOGF1("[PDU RX UNSOLICITED] pdu: 0x%0llX, session: 0x%llX, bus: 0x%llX, "
                "fd: %6d, protoLen: %u, valueLen: %u",
                response, &connection->session, connection->messageBus, connection->socket,
                response->header.protobufLength, response->header.valueLength);

            // Extract connectionID from unsolicited status message
            connection->connectionID = response->command->header->connectionID;
            LOGF2("Extracted connection ID from unsolicited status PDU (id=%lld)",
                connection->connectionID);
        }
        else {
            LOG0("WARNING: Unsolicited PDU is invalid. Does not specify connection ID!");
            LOG0("WARNING: Unsolicited PDU received after session initialized!");
        }
        KineticAllocator_FreeKineticResponse(response);
    }
    else
    {
        LOG0("WARNING: Received unexpected response that was not an unsolicited status.");
    }

    KineticLogger_LogProtobuf(2, response->proto);

    KineticAllocator_FreeKineticResponse(response);
}

void KineticController_HandleExpectedResponse(bus_msg_result_t *res, void *udata)
@@ -242,7 +243,7 @@ void KineticController_HandleExpectedResponse(bus_msg_result_t *res, void *udata

        LOGF1("[PDU RX] pdu: 0x%0llX, op: 0x%llX, session: 0x%llX, bus: 0x%llX, fd: %6d, "
            "seq: %5lld, protoLen: %4u, valueLen: %u, status: %s",
            response, op, &op->connection->session, op->connection->messageBus, op->connection->socket, 
            response, op, op->connection->pSession, op->connection->messageBus, op->connection->socket, 
            response->command->header->ackSequence, response->header.protobufLength, response->header.valueLength,
            Kinetic_GetStatusDescription(status));
    }
+2 −3
Original line number Diff line number Diff line
@@ -102,7 +102,7 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o

            // Populate the HMAC for the protobuf
            KineticHMAC_Init(&hmac, KINETIC_PROTO_COMMAND_SECURITY_ACL_HMACALGORITHM_HmacSHA1);
            KineticHMAC_Populate(&hmac, proto, operation->connection->session.config.hmacKey);
            KineticHMAC_Populate(&hmac, proto, operation->connection->pSession->config.hmacKey);
        } break;
    default:
        break;
@@ -134,7 +134,7 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o
    request->message.header.sequence = operation->connection->sequence++;

    LOGF1("[PDU TX] pdu: 0x%0llX, op: 0x%llX, session: 0x%llX, bus: 0x%llX, fd: %6d, seq: %5lld, protoLen: %4u, valueLen: %u",
        operation->request, operation, &operation->connection->session, operation->connection->messageBus,
        operation->request, operation, operation->connection->pSession, operation->connection->messageBus,
        operation->connection->socket, request->message.header.sequence, header.protobufLength, header.valueLength);

    KineticLogger_LogHeader(3, &header);
@@ -190,7 +190,6 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o
        status = KINETIC_STATUS_SUCCESS;
    }


cleanup:

    pthread_mutex_unlock(sendMutex);
+1 −6
Original line number Diff line number Diff line
@@ -43,15 +43,11 @@ KineticStatus KineticSession_Create(KineticSession * const session, KineticClien
        return KINETIC_STATUS_SESSION_EMPTY;
    }

    session->connection = KineticAllocator_NewConnection();
    session->connection = KineticAllocator_NewConnection(client->bus, session);
    if (session->connection == NULL) {
        return KINETIC_STATUS_MEMORY_ERROR;
    }
    
    session->connection->session = *session; // TODO: KILL ME!!!
    session->connection->messageBus = client->bus;
    session->connection->socket = -1;  // start without a file descriptor
    
    // init connection send mutex
    if (pthread_mutex_init(&session->connection->sendMutex, NULL) != 0) {
        LOG0("Failed initializing connection send mutex!");
@@ -117,7 +113,6 @@ KineticStatus KineticSession_Connect(KineticSession const * const session)
        return KINETIC_STATUS_SESSION_INVALID;
    }

    connection->session = *session;
    // #TODO what to do if we time out here? I think the bus should timeout by itself or something

    // Wait for initial unsolicited status to be received in order to obtain connectionID
Loading