Commit 8e5051e2 authored by Job Vranish's avatar Job Vranish
Browse files

added a throttle to limit the number of outstanding operations on a particular connection

parent 9c33878d
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -71,6 +71,7 @@ LIB_OBJS = \
	$(OUT_DIR)/kinetic_types.o \
	$(OUT_DIR)/kinetic_memory.o \
	$(OUT_DIR)/kinetic_semaphore.o \
	$(OUT_DIR)/kinetic_countingsemaphore.o \
	$(OUT_DIR)/byte_array.o \
	$(OUT_DIR)/kinetic_client.o \
	$(OUT_DIR)/threadpool.o \
+1 −2
Original line number Diff line number Diff line
#ifndef _KINETIC_SEMAPHORE_H
#define _KINETIC_SEMAPHORE_H

struct _KineticSemaphore;
typedef struct _KineticSemaphore KineticSemaphore;

/**
 * @brief Creates a KineticSemaphore. The KineticSemaphore is a simple wrapper 
 *        around a pthread condition variable and provides a a thread safe 
 *        around a pthread condition variable and provides a a thread-safe
 *        way to block a thread and wait for notification from another thread.
 *
 * @return          Returns a pointer to a KineticSemaphore
+5 −2
Original line number Diff line number Diff line
@@ -532,7 +532,10 @@ static void process_unpacked_message(listener *l,
                    BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128,
                        "successfully delivered box %p, marking info %p as DONE", (void*)box, (void*)info);
                    info->error = RX_ERROR_DONE;
                    assert(info->box == NULL);
                    BUS_LOG_SNPRINTF(b, 4, LOG_LISTENER, b->udata, 128,
                        "initial clean-up attempt for completed RX event at info %p", (void*)info);
                    clean_up_completed_info(l, info);
                    info = NULL; /* drop out of scope, likely to be stale */
                } else {
                    BUS_LOG_SNPRINTF(b, 3, LOG_MEMORY, b->udata, 128,
                        "returning box %p at line %d", (void*)box, __LINE__);
@@ -609,7 +612,7 @@ static void tick_handler(listener *l) {
                    "notifying of rx failure -- timeout (info %p)", (void*)info);
                notify_message_failure(l, info, BUS_SEND_RX_TIMEOUT);
            } else {
                BUS_LOG_SNPRINTF(b, 2, LOG_LISTENER, b->udata, 64,
                BUS_LOG_SNPRINTF(b, 3, LOG_LISTENER, b->udata, 64,
                    "decrementing countdown on info %p [%u]: %ld",
                    (void*)info, info->id, info->timeout_sec - 1);
                info->timeout_sec--;
+2 −0
Original line number Diff line number Diff line
@@ -61,8 +61,10 @@ typedef struct {
static void DefaultCallback(KineticCompletionData* kinetic_data, void* client_data)
{
    DefaultCallbackData * data = client_data;
    pthread_mutex_lock(&data->receiveCompleteMutex);
    data->status = kinetic_data->status;
    pthread_cond_signal(&data->receiveComplete);
    pthread_mutex_unlock(&data->receiveCompleteMutex);
}

static KineticCompletionClosure DefaultClosure(DefaultCallbackData * const data)
+42 −0
Original line number Diff line number Diff line
#include "kinetic_countingsemaphore.h"
#include "kinetic_countingsemaphore_types.h"
#include <stdlib.h>
#include <assert.h>

KineticCountingSemaphore * KineticCountingSemaphore_Create(uint32_t counts)
{
    KineticCountingSemaphore * sem = calloc(1, sizeof(KineticCountingSemaphore));
    if (sem == NULL) { return NULL; }
    pthread_mutex_init(&sem->mutex, NULL);
    pthread_cond_init(&sem->available, NULL);
    sem->count = counts;
    return sem;
}

void KineticCountingSemaphore_Take(KineticCountingSemaphore * const sem)
{
    assert(sem != NULL);
    pthread_mutex_lock(&sem->mutex);
    while (sem->count == 0)
    { pthread_cond_wait(&sem->available, &sem->mutex); }
    sem->count--;
    pthread_mutex_unlock(&sem->mutex);
}

void KineticCountingSemaphore_Give(KineticCountingSemaphore * const sem)
{
    assert(sem != NULL);
    pthread_mutex_lock(&sem->mutex);
    if (sem->count == 0)
    { pthread_cond_signal(&sem->available); }
    sem->count++;
    pthread_mutex_unlock(&sem->mutex);
}

void KineticCountingSemaphore_Destroy(KineticCountingSemaphore * const sem)
{
    assert(sem != NULL);
    pthread_mutex_destroy(&sem->mutex);
    pthread_cond_destroy(&sem->available);
    free(sem);
}
Loading