Commit 46ea462e authored by Scott Vokes's avatar Scott Vokes
Browse files

Merge remote-tracking branch 'origin/develop' into develop

parents c552afd9 343dc611
Loading
Loading
Loading
Loading
+4 −1
Original line number Diff line number Diff line
@@ -24,13 +24,16 @@
#include <stdlib.h>
#include <pthread.h>

KineticConnection* KineticAllocator_NewConnection(void)
KineticConnection* KineticAllocator_NewConnection(struct bus * b, KineticSession* const session)
{
    KineticConnection* connection = KineticCalloc(1, sizeof(KineticConnection));
    if (connection == NULL) {
        LOG0("Failed allocating new Connection!");
        return NULL;
    }
    connection->pSession = session;
    connection->messageBus = b;
    connection->socket = -1;  // start with an invalid file descriptor
    return connection;
}

+1 −1
Original line number Diff line number Diff line
@@ -25,7 +25,7 @@

void KineticAllocator_InitLists(KineticConnection* connection);

KineticConnection* KineticAllocator_NewConnection(void);
KineticConnection* KineticAllocator_NewConnection(struct bus * b, KineticSession* const session);
void KineticAllocator_FreeConnection(KineticConnection* connection);

KineticPDU* KineticAllocator_NewPDU(KineticConnection* connection);
+12 −11
Original line number Diff line number Diff line
@@ -80,7 +80,7 @@ KineticStatus KineticController_ExecuteOperation(KineticOperation* operation, Ki
{
    assert(operation != NULL);
    assert(operation->connection != NULL);
    assert(&operation->connection->session != NULL);
    assert(operation->connection->pSession != NULL);
    KineticStatus status = KINETIC_STATUS_INVALID;

    if (closure != NULL)
@@ -177,6 +177,7 @@ static const char *bus_error_string(bus_send_status_t t) {
        return "bad_response";
    }
}

void KineticController_HandleUnexecpectedResponse(void *msg,
                                                  int64_t seq_id,
                                                  void *bus_udata,
@@ -188,8 +189,10 @@ void KineticController_HandleUnexecpectedResponse(void *msg,
    (void)seq_id;
    (void)bus_udata;

    KineticLogger_LogProtobuf(2, response->proto);

    LOGF1("[PDU RX UNSOLICITED] pdu: 0x%0llX, session: 0x%llX, bus: 0x%llX, "
                "fd: %6d, protoLen: %u, valueLen: %u",
                response, connection->pSession, connection->messageBus, connection->socket,
                response->header.protobufLength, response->header.valueLength);

    // Handle unsolicited status PDUs
    if (response->proto->authType == KINETIC_PROTO_MESSAGE_AUTH_TYPE_UNSOLICITEDSTATUS) {
@@ -197,25 +200,23 @@ void KineticController_HandleUnexecpectedResponse(void *msg,
            response->command->header != NULL &&
            response->command->header->has_connectionID)
        {
            LOGF1("[PDU RX UNSOLICITED] pdu: 0x%0llX, session: 0x%llX, bus: 0x%llX, "
                "fd: %6d, protoLen: %u, valueLen: %u",
                response, &connection->session, connection->messageBus, connection->socket,
                response->header.protobufLength, response->header.valueLength);

            // Extract connectionID from unsolicited status message
            connection->connectionID = response->command->header->connectionID;
            LOGF2("Extracted connection ID from unsolicited status PDU (id=%lld)",
                connection->connectionID);
        }
        else {
            LOG0("WARNING: Unsolicited PDU is invalid. Does not specify connection ID!");
            LOG0("WARNING: Unsolicited PDU received after session initialized!");
        }
        KineticAllocator_FreeKineticResponse(response);
    }
    else
    {
        LOG0("WARNING: Received unexpected response that was not an unsolicited status.");
    }

    KineticLogger_LogProtobuf(2, response->proto);

    KineticAllocator_FreeKineticResponse(response);
}

void KineticController_HandleExpectedResponse(bus_msg_result_t *res, void *udata)
@@ -242,7 +243,7 @@ void KineticController_HandleExpectedResponse(bus_msg_result_t *res, void *udata

        LOGF1("[PDU RX] pdu: 0x%0llX, op: 0x%llX, session: 0x%llX, bus: 0x%llX, fd: %6d, "
            "seq: %5lld, protoLen: %4u, valueLen: %u, status: %s",
            response, op, &op->connection->session, op->connection->messageBus, op->connection->socket, 
            response, op, op->connection->pSession, op->connection->messageBus, op->connection->socket, 
            response->command->header->ackSequence, response->header.protobufLength, response->header.valueLength,
            Kinetic_GetStatusDescription(status));
    }
+2 −3
Original line number Diff line number Diff line
@@ -102,7 +102,7 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o

            // Populate the HMAC for the protobuf
            KineticHMAC_Init(&hmac, KINETIC_PROTO_COMMAND_SECURITY_ACL_HMACALGORITHM_HmacSHA1);
            KineticHMAC_Populate(&hmac, proto, operation->connection->session.config.hmacKey);
            KineticHMAC_Populate(&hmac, proto, operation->connection->pSession->config.hmacKey);
        } break;
    default:
        break;
@@ -134,7 +134,7 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o
    request->message.header.sequence = operation->connection->sequence++;

    LOGF1("[PDU TX] pdu: 0x%0llX, op: 0x%llX, session: 0x%llX, bus: 0x%llX, fd: %6d, seq: %5lld, protoLen: %4u, valueLen: %u",
        operation->request, operation, &operation->connection->session, operation->connection->messageBus,
        operation->request, operation, operation->connection->pSession, operation->connection->messageBus,
        operation->connection->socket, request->message.header.sequence, header.protobufLength, header.valueLength);

    KineticLogger_LogHeader(3, &header);
@@ -190,7 +190,6 @@ static KineticStatus KineticOperation_SendRequestInner(KineticOperation* const o
        status = KINETIC_STATUS_SUCCESS;
    }


cleanup:

    pthread_mutex_unlock(sendMutex);
+1 −6
Original line number Diff line number Diff line
@@ -43,15 +43,11 @@ KineticStatus KineticSession_Create(KineticSession * const session, KineticClien
        return KINETIC_STATUS_SESSION_EMPTY;
    }

    session->connection = KineticAllocator_NewConnection();
    session->connection = KineticAllocator_NewConnection(client->bus, session);
    if (session->connection == NULL) {
        return KINETIC_STATUS_MEMORY_ERROR;
    }
    
    session->connection->session = *session; // TODO: KILL ME!!!
    session->connection->messageBus = client->bus;
    session->connection->socket = -1;  // start without a file descriptor
    
    // init connection send mutex
    if (pthread_mutex_init(&session->connection->sendMutex, NULL) != 0) {
        LOG0("Failed initializing connection send mutex!");
@@ -117,7 +113,6 @@ KineticStatus KineticSession_Connect(KineticSession const * const session)
        return KINETIC_STATUS_SESSION_INVALID;
    }

    connection->session = *session;
    // #TODO what to do if we time out here? I think the bus should timeout by itself or something

    // Wait for initial unsolicited status to be received in order to obtain connectionID
Loading