Commit a2b2194e authored by Greg Williams's avatar Greg Williams
Browse files

Updated packetization to cork and uncork around packets when NOT on OSX, and...

Updated packetization to cork and uncork around packets when NOT on OSX, and to always set socket opt TCP_NODELAY on after packet stuffed into socket
parent 18963473
Loading
Loading
Loading
Loading
+5 −5
Original line number Diff line number Diff line
@@ -38,14 +38,14 @@ static KineticStatus WritePDU(KineticOperation* const operation)
    KineticPDU* request = operation->request;

    // Cork the PDU to begin aggregation
    KineticSocket_CorkPacket(request->connection->socket);
    KineticSocket_BeginPacket(request->connection->socket);

    // Pack and send the PDU header
    ByteBuffer hdr = ByteBuffer_Create(&request->headerNBO, sizeof(KineticPDUHeader), sizeof(KineticPDUHeader));
    KineticStatus status = KineticSocket_Write(request->connection->socket, &hdr);
    if (status != KINETIC_STATUS_SUCCESS) {
        LOG0("Failed to send PDU header!");
        KineticSocket_UncorkPacket(request->connection->socket);
        KineticSocket_FinishPacket(request->connection->socket);
        return status;
    }

@@ -55,7 +55,7 @@ static KineticStatus WritePDU(KineticOperation* const operation)
    status = KineticSocket_WriteProtobuf(request->connection->socket, request);
    if (status != KINETIC_STATUS_SUCCESS) {
        LOG0("Failed to send PDU protobuf message!");
        KineticSocket_UncorkPacket(request->connection->socket);
        KineticSocket_FinishPacket(request->connection->socket);
        return status;
    }

@@ -65,13 +65,13 @@ static KineticStatus WritePDU(KineticOperation* const operation)
        status = KineticSocket_Write(request->connection->socket, &operation->entry->value);
        if (status != KINETIC_STATUS_SUCCESS) {
            LOG0("Failed to send PDU value payload!");
            KineticSocket_UncorkPacket(request->connection->socket);
            KineticSocket_FinishPacket(request->connection->socket);
            return status;
        }
    }

    // Uncork the PDU to begin transmission
    KineticSocket_UncorkPacket(request->connection->socket);
    KineticSocket_FinishPacket(request->connection->socket);

    LOG2("PDU sent successfully!");
    return KINETIC_STATUS_SUCCESS;
+8 −8
Original line number Diff line number Diff line
@@ -426,22 +426,22 @@ KineticStatus KineticSocket_WriteProtobuf(int socket, KineticPDU* pdu)
    return status;
}

void KineticSocket_CorkPacket(int socket)
void KineticSocket_BeginPacket(int socket)
{
#if defined(__APPLE__)
    (void)socket;
#else
#if !defined(__APPLE__) /* TCP_CORK is NOT available on OSX */
    int on = 1;
    setsockopt(socket, IPPROTO_TCP, TCP_CORK, &on, sizeof(on));
#else
    (void)socket;
#endif
}

void KineticSocket_UncorkPacket(int socket)
void KineticSocket_FinishPacket(int socket)
{
#if defined(__APPLE__)
    (void)socket;
#else
#if !defined(__APPLE__) /* TCP_CORK is NOT available on OSX */
    int off = 0;
    setsockopt(socket, IPPROTO_TCP, TCP_CORK, &off, sizeof(off));
#endif
    int on = 1;
    setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on));
}
+2 −2
Original line number Diff line number Diff line
@@ -42,7 +42,7 @@ KineticStatus KineticSocket_ReadProtobuf(int socket, KineticPDU* pdu);

KineticStatus KineticSocket_Write(int socket, ByteBuffer* src);
KineticStatus KineticSocket_WriteProtobuf(int socket, KineticPDU* pdu);
void KineticSocket_CorkPacket(int socket);
void KineticSocket_UncorkPacket(int socket);
void KineticSocket_BeginPacket(int socket);
void KineticSocket_FinishPacket(int socket);

#endif // _KINETIC_SOCKET_H
+1 −2
Original line number Diff line number Diff line
@@ -101,9 +101,8 @@ void test_Flush_should_call_callback_after_completion(void)

    /* Wait up to 10 seconds for the callback to fire. */
    struct timeval tv;
    struct timespec ts;
    gettimeofday(&tv, NULL);
    ts.tv_sec = tv.tv_sec + 10;
    struct timespec ts = {ts.tv_sec = tv.tv_sec + 10};
    int res = pthread_cond_timedwait(&env.cond, &env.mutex, &ts);
    TEST_ASSERT_EQUAL(0, res);

+1 −0
Original line number Diff line number Diff line
@@ -81,6 +81,7 @@ static void* kinetic_put(void* kinetic_arg);

void setUp(void)
{
    sleep(1);
    KineticClient_Init("stdout", 0);
}

Loading