Commit 9f585463 authored by Scott Vokes's avatar Scott Vokes
Browse files

Initial version of messaging bus

parent 4ff6389e
Loading
Loading
Loading
Loading
+12 −0
Original line number Diff line number Diff line
@@ -255,4 +255,16 @@ KineticStatus KineticClient_P2POperation(KineticSession const * const session,
 */
KineticStatus KineticClient_InstantSecureErase(KineticSession const * const session);

/**
 * @brief Updates the cluster version.
 *
 * @param clusterVersion      Current cluster version.
 * @param newClusterVersion   New cluster version.
 *
 * @return              Returns the resulting KineticStatus.
 */
KineticStatus KineticClient_SetClusterVersion(KineticSessionHandle handle,
                                              int64_t clusterVersion,
                                              int64_t newClusterVersion);

#endif // _KINETIC_CLIENT_H
+15 −13
Original line number Diff line number Diff line
@@ -97,7 +97,7 @@ int main(int argc, char** argv)
    // Establish connection
    KineticStatus status = KineticClient_CreateConnection(&session);
    if (status != KINETIC_STATUS_SUCCESS) {
        fprintf(stderr, "Failed connecting to the Kinetic device w/status: %s\n",
        fprintf(stdout, "Failed connecting to the Kinetic device w/status: %s\n",
            Kinetic_GetStatusDescription(status));
        return -1;
    }
@@ -109,24 +109,26 @@ int main(int argc, char** argv)

    // Store the file(s) and wait for completion
    bool success = true;
    const int maxOverlappedChunks = 15;

    StoreFileOperation ops[] = {
        {
            .session = &session,
            .filename = "./test/support/data/file_a.png",
            .keyPrefix = prefix,
            .maxOverlappedChunks = 4,
            .maxOverlappedChunks = maxOverlappedChunks,
        },
        {
            .session = &session,
            .filename = "./test/support/data/file_b.png",
            .keyPrefix = prefix,
            .maxOverlappedChunks = 4,
            .maxOverlappedChunks = maxOverlappedChunks,
        },
        {
            .session = &session,
            .filename = "./test/support/data/file_c.png",
            .keyPrefix = prefix,
            .maxOverlappedChunks = 4,
            .maxOverlappedChunks = maxOverlappedChunks,
        },
    };
    const int numFiles = sizeof(ops) / sizeof(StoreFileOperation);
@@ -135,7 +137,7 @@ int main(int argc, char** argv)
        int pthreadStatus = pthread_create(&ops[i].thread, NULL, store_file_thread, &ops[i]);
        if (pthreadStatus != 0) {
            REPORT_ERRNO(pthreadStatus, "pthread_create");
            fprintf(stderr, "Failed creating store thread for '%s'!\n", ops[i].filename);
            fprintf(stdout, "Failed creating store thread for '%s'!\n", ops[i].filename);
            success = false;
        }
    }
@@ -146,7 +148,7 @@ int main(int argc, char** argv)
        }
        else {
            REPORT_ERRNO(pthreadStatus, "pthread_join");
            fprintf(stderr, "Failed storing '%s' to disk! status: %s\n",
            fprintf(stdout, "Failed storing '%s' to disk! status: %s\n",
                ops[i].filename, Kinetic_GetStatusDescription(ops[i].status));
            success = false;
        }
@@ -168,8 +170,9 @@ void* store_file_thread(void* storeArgs)
        start_file_transfer(op->session, op->filename, op->keyPrefix, op->maxOverlappedChunks);
    op->status = wait_for_transfer_complete(transfer);
    if (op->status != KINETIC_STATUS_SUCCESS) {
        fprintf(stderr, "Transfer failed w/status: %s\n", Kinetic_GetStatusDescription(op->status));
        fprintf(stdout, "Transfer failed w/status: %s\n", Kinetic_GetStatusDescription(op->status));
    }

    return (void*)storeArgs;
}

@@ -184,6 +187,7 @@ FileTransferProgress * start_file_transfer(KineticSession* session,
            sizeof(transferState->keyPrefixBuffer), &prefix, sizeof(prefix)),
        .fd = open(filename, O_RDONLY),
    };

    pthread_mutex_init(&transferState->transferMutex, NULL);
    pthread_mutex_init(&transferState->completeMutex, NULL); 
    pthread_cond_init(&transferState->completeCond, NULL);
@@ -243,7 +247,7 @@ int put_chunk_of_file(FileTransferProgress* transfer)
        if (status != KINETIC_STATUS_SUCCESS) {
            transfer->opsInProgress--;
            free(closureData);
            fprintf(stderr, "Failed writing chunk! PUT request reported status: %s\n",
            fprintf(stdout, "Failed writing chunk! PUT request reported status: %s\n",
                Kinetic_GetStatusDescription(status));
        }
    }
@@ -254,7 +258,7 @@ int put_chunk_of_file(FileTransferProgress* transfer)
    else {
        transfer->opsInProgress--;
        free(closureData);
        fprintf(stderr, "Failed reading data from file!\n");
        fprintf(stdout, "Failed reading data from file!\n");
        REPORT_ERRNO(bytesRead, "read");
    }

@@ -289,9 +293,7 @@ void put_chunk_of_file_finished(KineticCompletionData* kinetic_data, void* clien
        // if there is no more data to read (or error), and no outstanding operations,
        // then signal
        pthread_cond_signal(&transfer->completeCond);
        pthread_mutex_destroy(&transfer->transferMutex);
        pthread_mutex_destroy(&transfer->completeMutex);
        fprintf(stderr, "Failed writing chunk! PUT response reported status: %s\n",
        fprintf(stdout, "Failed writing chunk! PUT response reported status: %s\n",
            Kinetic_GetStatusDescription(kinetic_data->status));
    }
}

src/lib/bus/.gitignore

0 → 100644
+6 −0
Original line number Diff line number Diff line
TAGS
*.dSYM/
bus_example
echosrv
notes
test_casq

src/lib/bus/Makefile

0 → 100644
+47 −0
Original line number Diff line number Diff line
SOCKET99_PATH=	../../../vendor/socket99
THREADPOOL_PATH=	../threadpool

OPT=		-O0
LIB_INC =	-I${SOCKET99_PATH} -I${THREADPOOL_PATH}
CFLAGS +=	-std=c99 ${OPT} -g ${LIB_INC}
LDFLAGS +=	-L. -lcasq -L${SOCKET99_PATH} -lsocket99 -L${THREADPOOL_PATH} -lthreadpool

BUS_OBJS = \
	bus.o \
	listener.o \
	sender.o \
	util.o \

ECHOSRV_OBJS = \
	echosrv.o \
	util.o \

all: bus.png libcasq.a test_casq echosrv bus_example

test: test_casq
	./test_casq

%.png: %.dot
	dot -Tpng -o $@ $^

libcasq.a: casq.o
	ar -rcs $@ $^

test_casq: libcasq.a test_casq.o
	${CC} -o $@ ${LDFLAGS} $^

echosrv: libcasq.a ${ECHOSRV_OBJS}
	${CC} -o $@ ${LDFLAGS} $^

bus_example: libcasq.a ${BUS_OBJS} bus_example.o
	${CC} -o $@ ${LDFLAGS} $^

clean:
	rm -f *.a *.o test_casq echosrv bus_example

tags: TAGS

TAGS: *.[ch]
	etags *.[ch]

*.o: bus_types.h

src/lib/bus/atomic.h

0 → 100644
+18 −0
Original line number Diff line number Diff line
#ifndef ATOMIC_H
#define ATOMIC_H

#define ATOMIC_BOOL_COMPARE_AND_SWAP(PTR, OLD, NEW)     \
    (__sync_bool_compare_and_swap(PTR, OLD, NEW))

/* Spin attempting to atomically adjust F by ADJ until successful */
#define SPIN_ADJ(F, ADJ)                                                \
    do {                                                                \
        for (;;) {                                                      \
            size_t v = F;                                               \
            if (ATOMIC_BOOL_COMPARE_AND_SWAP(&F, v, v + ADJ)) {         \
                break;                                                  \
            }                                                           \
        }                                                               \
    } while (0)

#endif
Loading