Loading include/kinetic_types.h +4 −0 Original line number Diff line number Diff line Loading @@ -144,6 +144,10 @@ typedef struct _KineticSession { /// Session configuration structure which must be configured KineticSessionConfig config; /* An overly coarse lock, applied as part of debugging a race condition. * TODO: Remove this when the root cause is determined. */ pthread_mutex_t coarse_lock; /// Connection instance which is dynamically allocated upon call to `KineticClient_CreateConnection`. /// Client must call `KineticClient_DestroyConnection` when finished with a session to shutdown /// a session cleanly and free the `connection`. Loading src/lib/kinetic_client.c +22 −2 Original line number Diff line number Diff line Loading @@ -144,16 +144,36 @@ KineticStatus KineticClient_Put(KineticSession const * const session, assert(session->connection != NULL); assert(entry != NULL); assert(entry->value.array.data != NULL); assert(&session->connection->session == session); assert(session->connection->pSession == session); assert(session->connection == session->connection->pSession->connection); #define PUT_LOCK 1 #if PUT_LOCK KineticSession_Lock(session); #endif KineticOperation* operation = KineticController_CreateOperation(session); if (operation == NULL) {return KINETIC_STATUS_MEMORY_ERROR;} #if PUT_LOCK if (0) { // Debugging: change to 1 to force race condition KineticSession_Unlock(session); KineticSession_Lock(session); } #endif // Initialize request KineticOperation_BuildPut(operation, entry); // Execute the operation return KineticController_ExecuteOperation(operation, closure); assert(operation->connection == session->connection); KineticStatus res = KineticController_ExecuteOperation(operation, closure); #if PUT_LOCK KineticSession_Unlock(session); #endif return res; } KineticStatus KineticClient_Flush(KineticSession const * const session, Loading src/lib/kinetic_session.c +13 −0 Original line number Diff line number Diff line Loading @@ -43,11 +43,14 @@ KineticStatus KineticSession_Create(KineticSession * const session, KineticClien return KINETIC_STATUS_SESSION_EMPTY; } assert(session->connection == NULL); session->connection = KineticAllocator_NewConnection(client->bus, session); if (session->connection == NULL) { return KINETIC_STATUS_MEMORY_ERROR; } pthread_mutex_init(&session->coarse_lock, NULL); // init connection send mutex if (pthread_mutex_init(&session->connection->sendMutex, NULL) != 0) { LOG0("Failed initializing connection send mutex!"); Loading Loading @@ -150,3 +153,13 @@ KineticStatus KineticSession_Disconnect(KineticSession const * const session) return KINETIC_STATUS_SUCCESS; } void KineticSession_Lock(KineticSession const *const ses) { KineticSession *s = (KineticSession *)ses; pthread_mutex_lock(&s->coarse_lock); } void KineticSession_Unlock(KineticSession const * const ses) { KineticSession *s = (KineticSession *)ses; pthread_mutex_unlock(&s->coarse_lock); } src/lib/kinetic_session.h +3 −0 Original line number Diff line number Diff line Loading @@ -29,4 +29,7 @@ KineticStatus KineticSession_Connect(KineticSession const * const session); KineticStatus KineticSession_Disconnect(KineticSession const * const session); void KineticSession_IncrementSequence(KineticSession const * const session); void KineticSession_Lock(KineticSession const * const ses); void KineticSession_Unlock(KineticSession const * const ses); #endif // _KINETIC_SESSION_H Loading
include/kinetic_types.h +4 −0 Original line number Diff line number Diff line Loading @@ -144,6 +144,10 @@ typedef struct _KineticSession { /// Session configuration structure which must be configured KineticSessionConfig config; /* An overly coarse lock, applied as part of debugging a race condition. * TODO: Remove this when the root cause is determined. */ pthread_mutex_t coarse_lock; /// Connection instance which is dynamically allocated upon call to `KineticClient_CreateConnection`. /// Client must call `KineticClient_DestroyConnection` when finished with a session to shutdown /// a session cleanly and free the `connection`. Loading
src/lib/kinetic_client.c +22 −2 Original line number Diff line number Diff line Loading @@ -144,16 +144,36 @@ KineticStatus KineticClient_Put(KineticSession const * const session, assert(session->connection != NULL); assert(entry != NULL); assert(entry->value.array.data != NULL); assert(&session->connection->session == session); assert(session->connection->pSession == session); assert(session->connection == session->connection->pSession->connection); #define PUT_LOCK 1 #if PUT_LOCK KineticSession_Lock(session); #endif KineticOperation* operation = KineticController_CreateOperation(session); if (operation == NULL) {return KINETIC_STATUS_MEMORY_ERROR;} #if PUT_LOCK if (0) { // Debugging: change to 1 to force race condition KineticSession_Unlock(session); KineticSession_Lock(session); } #endif // Initialize request KineticOperation_BuildPut(operation, entry); // Execute the operation return KineticController_ExecuteOperation(operation, closure); assert(operation->connection == session->connection); KineticStatus res = KineticController_ExecuteOperation(operation, closure); #if PUT_LOCK KineticSession_Unlock(session); #endif return res; } KineticStatus KineticClient_Flush(KineticSession const * const session, Loading
src/lib/kinetic_session.c +13 −0 Original line number Diff line number Diff line Loading @@ -43,11 +43,14 @@ KineticStatus KineticSession_Create(KineticSession * const session, KineticClien return KINETIC_STATUS_SESSION_EMPTY; } assert(session->connection == NULL); session->connection = KineticAllocator_NewConnection(client->bus, session); if (session->connection == NULL) { return KINETIC_STATUS_MEMORY_ERROR; } pthread_mutex_init(&session->coarse_lock, NULL); // init connection send mutex if (pthread_mutex_init(&session->connection->sendMutex, NULL) != 0) { LOG0("Failed initializing connection send mutex!"); Loading Loading @@ -150,3 +153,13 @@ KineticStatus KineticSession_Disconnect(KineticSession const * const session) return KINETIC_STATUS_SUCCESS; } void KineticSession_Lock(KineticSession const *const ses) { KineticSession *s = (KineticSession *)ses; pthread_mutex_lock(&s->coarse_lock); } void KineticSession_Unlock(KineticSession const * const ses) { KineticSession *s = (KineticSession *)ses; pthread_mutex_unlock(&s->coarse_lock); }
src/lib/kinetic_session.h +3 −0 Original line number Diff line number Diff line Loading @@ -29,4 +29,7 @@ KineticStatus KineticSession_Connect(KineticSession const * const session); KineticStatus KineticSession_Disconnect(KineticSession const * const session); void KineticSession_IncrementSequence(KineticSession const * const session); void KineticSession_Lock(KineticSession const * const ses); void KineticSession_Unlock(KineticSession const * const ses); #endif // _KINETIC_SESSION_H