Commit fceafc29 authored by Greg Williams's avatar Greg Williams
Browse files

Added corking to packetization of PDUs. Still need to tune to maximize performance

parent c88a7ee6
Loading
Loading
Loading
Loading
+10 −0
Original line number Diff line number Diff line
@@ -36,11 +36,16 @@ static void KineticOperation_ValidateOperation(KineticOperation* operation);
static KineticStatus WritePDU(KineticOperation* const operation)
{
    KineticPDU* request = operation->request;

    // Cork the PDU to begin aggregation
    KineticSocket_CorkPacket(request->connection->socket);

    // Pack and send the PDU header
    ByteBuffer hdr = ByteBuffer_Create(&request->headerNBO, sizeof(KineticPDUHeader), sizeof(KineticPDUHeader));
    KineticStatus status = KineticSocket_Write(request->connection->socket, &hdr);
    if (status != KINETIC_STATUS_SUCCESS) {
        LOG0("Failed to send PDU header!");
        KineticSocket_UncorkPacket(request->connection->socket);
        return status;
    }

@@ -50,6 +55,7 @@ static KineticStatus WritePDU(KineticOperation* const operation)
    status = KineticSocket_WriteProtobuf(request->connection->socket, request);
    if (status != KINETIC_STATUS_SUCCESS) {
        LOG0("Failed to send PDU protobuf message!");
        KineticSocket_UncorkPacket(request->connection->socket);
        return status;
    }

@@ -59,10 +65,14 @@ static KineticStatus WritePDU(KineticOperation* const operation)
        status = KineticSocket_Write(request->connection->socket, &operation->entry->value);
        if (status != KINETIC_STATUS_SUCCESS) {
            LOG0("Failed to send PDU value payload!");
            KineticSocket_UncorkPacket(request->connection->socket);
            return status;
        }
    }

    // Uncork the PDU to begin transmission
    KineticSocket_UncorkPacket(request->connection->socket);

    LOG2("PDU sent successfully!");
    return KINETIC_STATUS_SUCCESS;
}
+21 −0
Original line number Diff line number Diff line
@@ -424,3 +424,24 @@ KineticStatus KineticSocket_WriteProtobuf(int socket, KineticPDU* pdu)
    free(packed);
    return status;
}

void KineticSocket_CorkPacket(int socket)
{
#if defined(__APPLE__)
    (void)socket;
#else
    int on = 1;
    setsockopt(socket, SOL_TCP, TCP_CORK, &on, sizeof(on));
#endif
}

void KineticSocket_UncorkPacket(int socket)
{
#if defined(__APPLE__)
    (void)socket;
#else
    int on = 1;
    int off = 0;
    setsockopt(socket, SOL_TCP, TCP_CORK, &on, sizeof(on));
#endif
}
+2 −0
Original line number Diff line number Diff line
@@ -42,5 +42,7 @@ KineticStatus KineticSocket_ReadProtobuf(int socket, KineticPDU* pdu);

KineticStatus KineticSocket_Write(int socket, ByteBuffer* src);
KineticStatus KineticSocket_WriteProtobuf(int socket, KineticPDU* pdu);
void KineticSocket_CorkPacket(int socket);
void KineticSocket_UncorkPacket(int socket);

#endif // _KINETIC_SOCKET_H
+12 −1
Original line number Diff line number Diff line
@@ -110,8 +110,10 @@ void test_KineticOperation_SendRequest_should_transmit_PDU_with_no_value_payload
    // Setup expectations for interaction
    KineticHMAC_Init_Expect(&Request.hmac, KINETIC_PROTO_COMMAND_SECURITY_ACL_HMACALGORITHM_HmacSHA1);
    KineticHMAC_Populate_Expect(&Request.hmac, &Request.protoData.message.message, Request.connection->session.hmacKey);
    KineticSocket_CorkPacket_Expect(Connection.socket);
    KineticSocket_Write_ExpectAndReturn(Connection.socket, &headerNBO, KINETIC_STATUS_SUCCESS);
    KineticSocket_WriteProtobuf_ExpectAndReturn(Connection.socket, &Request, KINETIC_STATUS_SUCCESS);
    KineticSocket_UncorkPacket_Expect(Connection.socket);

    KineticStatus status = KineticOperation_SendRequest(&Operation);

@@ -137,9 +139,11 @@ void test_KineticOperation_SendRequest_should_send_PDU_with_value_payload(void)
    KineticHMAC_Init_Expect(&Request.hmac, KINETIC_PROTO_COMMAND_SECURITY_ACL_HMACALGORITHM_HmacSHA1);
    KineticHMAC_Populate_Expect(&Request.hmac,
        &Request.protoData.message.message, Request.connection->session.hmacKey);
    KineticSocket_CorkPacket_Expect(Connection.socket);
    KineticSocket_Write_ExpectAndReturn(Connection.socket, &headerNBO, KINETIC_STATUS_SUCCESS);
    KineticSocket_WriteProtobuf_ExpectAndReturn(Connection.socket, &Request, KINETIC_STATUS_SUCCESS);
    KineticSocket_Write_ExpectAndReturn(Connection.socket, &entry.value, KINETIC_STATUS_SUCCESS);
    KineticSocket_UncorkPacket_Expect(Connection.socket);

    KineticStatus status = KineticOperation_SendRequest(&Operation);

@@ -163,8 +167,11 @@ void test_KineticOperation_SendRequest_should_send_the_specified_message_and_ret
    Operation.sendValue = true;

    KineticHMAC_Init_Expect(&Request.hmac, KINETIC_PROTO_COMMAND_SECURITY_ACL_HMACALGORITHM_HmacSHA1);
    KineticHMAC_Populate_Expect(&Request.hmac, &Request.protoData.message.message, Request.connection->session.hmacKey);
    KineticHMAC_Populate_Expect(&Request.hmac, &Request.protoData.message.message,
        Request.connection->session.hmacKey);
    KineticSocket_CorkPacket_Expect(Connection.socket);
    KineticSocket_Write_ExpectAndReturn(Connection.socket, &headerNBO, KINETIC_STATUS_SOCKET_ERROR);
    KineticSocket_UncorkPacket_Expect(Connection.socket);

    KineticStatus status = KineticOperation_SendRequest(&Operation);

@@ -190,8 +197,10 @@ void test_KineticOperation_SendRequest_should_send_the_specified_message_and_ret
    KineticHMAC_Init_Expect(&Request.hmac, KINETIC_PROTO_COMMAND_SECURITY_ACL_HMACALGORITHM_HmacSHA1);
    KineticHMAC_Populate_Expect(&Request.hmac,
        &Request.protoData.message.message, Request.connection->session.hmacKey);
    KineticSocket_CorkPacket_Expect(Connection.socket);
    KineticSocket_Write_ExpectAndReturn(Connection.socket, &headerNBO, KINETIC_STATUS_SUCCESS);
    KineticSocket_WriteProtobuf_ExpectAndReturn(Connection.socket, &Request, KINETIC_STATUS_SOCKET_TIMEOUT);
    KineticSocket_UncorkPacket_Expect(Connection.socket);

    KineticStatus status = KineticOperation_SendRequest(&Operation);

@@ -217,9 +226,11 @@ void test_KineticOperation_SendRequest_should_send_the_specified_message_and_ret

    KineticHMAC_Init_Expect(&Request.hmac, KINETIC_PROTO_COMMAND_SECURITY_ACL_HMACALGORITHM_HmacSHA1);
    KineticHMAC_Populate_Expect(&Request.hmac, &Request.protoData.message.message, Request.connection->session.hmacKey);
    KineticSocket_CorkPacket_Expect(Connection.socket);
    KineticSocket_Write_ExpectAndReturn(Connection.socket, &headerNBO, KINETIC_STATUS_SUCCESS);
    KineticSocket_WriteProtobuf_ExpectAndReturn(Connection.socket, &Request, KINETIC_STATUS_SUCCESS);
    KineticSocket_Write_ExpectAndReturn(Connection.socket, &entry.value, KINETIC_STATUS_SOCKET_TIMEOUT);
    KineticSocket_UncorkPacket_Expect(Connection.socket);

    KineticStatus status = KineticOperation_SendRequest(&Operation);

+1 −1

File changed.

Contains only whitespace changes.