Commit 4d08b0c8 authored by Greg Williams's avatar Greg Williams
Browse files

Updated Java simulator startup to only wait until the simulator is listing on...

Updated Java simulator startup to only wait until the simulator is listing on the kinetic port rather than blind sleeps
parent 6ef0f60a
Loading
Loading
Loading
Loading
+2 −9
Original line number Diff line number Diff line
@@ -229,18 +229,10 @@ update_simulator:
	cp vendor/kinetic-java/kinetic-simulator/target/*.jar vendor/kinetic-java-simulator/

start_simulator:
	@echo STARTING SIMULATOR...
	sleep 1
	./vendor/kinetic-simulator/startSimulator.sh &
	sleep 3
	@echo STARTED SIMULATOR!!!
	./vendor/kinetic-simulator/startSimulator.sh

stop_simulator:
	@echo STOPPING SIMULATOR...
	sleep 1
	./vendor/kinetic-simulator/stopSimulator.sh
	sleep 1
	@echo STOPPED SIMULATOR!!!


#===============================================================================
@@ -322,6 +314,7 @@ test_example_%: $(BIN_DIR)/examples/%
	$<
	@echo ================================================================================
	@echo
	./vendor/kinetic-simulator/stopSimulator.sh
test_example_%: start_simulator

run_example_%: $(BIN_DIR)/examples/%
+16 −5
Original line number Diff line number Diff line
@@ -134,6 +134,10 @@ namespace :java_sim do
  JAVA_BIN = File.join(JAVA_HOME, 'bin/java')
  $java_sim = nil

  def kinetic_device_listening?
    `netstat -an` =~ /\*\.8123.+\s+LISTEN\s+/
  end

  def java_sim_start

    return if $java_sim
@@ -150,10 +154,17 @@ namespace :java_sim do
    ENV['CLASSPATH'] = '' unless ENV['CLASSPATH']
    jars += [File.join(JAVA_HOME, 'lib/tools.jar')]
    jars.each {|jar| ENV['CLASSPATH'] += ':' + jar }
    $java_sim = spawn("#{JAVA_BIN} -classpath #{ENV['CLASSPATH']} com.seagate.kinetic.simulator.internal.SimulatorRunner") # &> ./build/kinetic-simulator.log") # &> ./build/kinetic-simulator-test.log")
    sleep 3 # wait for simulator to start up and server ready to receive connections
    # TODO: use netstat or something to just wait until the server opens the port
    #       since it might take longer than the hardcoded sleep(x) above :-/
    $java_sim = spawn("#{JAVA_BIN} -classpath #{ENV['CLASSPATH']} com.seagate.kinetic.simulator.internal.SimulatorRunner")
    max_wait_secs = 10
    sleep_duration = 0.2
    timeout = false
    elapsed_secs = 0
    while !kinetic_device_listening? && !timeout do
      sleep(sleep_duration)
      elapsed_secs += sleep_duration
      timeout = (elapsed_secs > max_wait_secs)
    end
    raise "Kinetic Java simulator failed to start within #{max_wait_secs} seconds!" if timeout
  end

  def java_sim_shutdown
@@ -169,7 +180,7 @@ namespace :java_sim do

  def java_sim_erase_drive
    java_sim_start
    sh "\"#{JAVA_BIN}\" -classpath \"#{ENV['CLASSPATH']}\" com.seagate.kinetic.admin.cli.KineticAdminCLI -instanterase" # &> ./build/kinetic-simulator-setup.log"
    sh "\"#{JAVA_BIN}\" -classpath \"#{ENV['CLASSPATH']}\" com.seagate.kinetic.admin.cli.KineticAdminCLI -instanterase"
  end

  def java_sim_cleanup
+1 −1
Original line number Diff line number Diff line
@@ -108,7 +108,7 @@ int main(int argc, char** argv)
        .hmacKey = ByteArray_CreateWithCString(HmacKeyString),
    };
    write_args* writeArgs = calloc(1, sizeof(write_args));
    KineticClient_Init("stdout", 2);
    KineticClient_Init("stdout", 0);
    status = KineticClient_Connect(&sessionConfig, &writeArgs->sessionHandle);
    if (status != KINETIC_STATUS_SUCCESS) {
        fprintf(stderr, "Connection to host '%s' failed w/ status: %s",
+97 −50
Original line number Diff line number Diff line
@@ -27,8 +27,19 @@
#include <sys/param.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <pthread.h>
#include <errno.h>

#define MAX_ITERATIONS (2)
#define NUM_COPIES (3)
#define BUFSIZE  (128 * KINETIC_OBJ_SIZE)
#define KINETIC_MAX_THREADS (10)
#define MAX_OBJ_SIZE (KINETIC_OBJ_SIZE)

#define REPORT_ERRNO(en, msg) if(en != 0){errno = en; perror(msg);}

typedef struct {
    pthread_t threadID;
    char ip[16];
    KineticSessionHandle sessionHandle;
    char keyPrefix[KINETIC_DEFAULT_KEY_LEN];
@@ -41,41 +52,44 @@ typedef struct {
    KineticStatus status;
} write_args;

void store_data(write_args* args)
void* store_data(void* args)
{
    KineticEntry* entry = &(args->entry);
    write_args* thread_args = (write_args*)args;
    KineticEntry* entry = &(thread_args->entry);
    int32_t objIndex = 0;

    while (ByteBuffer_BytesRemaining(args->data) > 0) {
    while (ByteBuffer_BytesRemaining(thread_args->data) > 0) {

        // Configure entry meta-data
        ByteBuffer_Reset(&entry->key);
        ByteBuffer_AppendCString(&entry->key, args->keyPrefix);
        // Configure meta-data
        char keySuffix[8];
        snprintf(keySuffix, sizeof(keySuffix), "%02d", objIndex);
        entry->key.bytesUsed = strlen(thread_args->keyPrefix);
        ByteBuffer_AppendCString(&entry->key, keySuffix);

        // Prepare entry with the next object to store
        // Prepare the next chunk of data to store
        ByteBuffer_Reset(&entry->value);
        ByteBuffer_AppendArray(
            &entry->value,
            ByteBuffer_Consume(
                &args->data,
                MIN(ByteBuffer_BytesRemaining(args->data), KINETIC_OBJ_SIZE))
            ByteBuffer_Consume(&thread_args->data, MIN(ByteBuffer_BytesRemaining(thread_args->data), MAX_OBJ_SIZE))
        );

        // Store the object
        KineticStatus status = KineticClient_Put(args->sessionHandle, entry, NULL);
        // Set operation-specific attributes
        entry->synchronization = KINETIC_SYNCHRONIZATION_WRITEBACK;

        // Store the data slice
        KineticStatus status = KineticClient_Put(thread_args->sessionHandle, entry, NULL);
        if (status != KINETIC_STATUS_SUCCESS) {
            fprintf(stderr, "Kinetic PUT of object %d to host %s failed w/ status: %s\n",
                objIndex, args->ip, Kinetic_GetStatusDescription(status));
            exit(-1);
            fprintf(stderr, "Failed writing entry %d to disk w/status: %s",
                objIndex+1, Kinetic_GetStatusDescription(status));
            return (void*)NULL;
        }

        objIndex++;
    }

    printf("File stored on Kinetic Device across %d entries\n", objIndex);
    printf("File stored to successfully to Kinetic Device across %d entries!\n", objIndex);

    return (void*)NULL;
}

int main(int argc, char** argv)
@@ -84,6 +98,7 @@ int main(int argc, char** argv)
    (void)argv;

    // Read in file contents to store
    KineticStatus status;
    const char* dataFile = "test/support/data/test.data";
    struct stat st;
    stat(dataFile, &st);
@@ -92,12 +107,17 @@ int main(int argc, char** argv)
    long dataLen = read(fd, buf, st.st_size);
    close(fd);
    if (dataLen <= 0) {
        fprintf(stderr, "Failed reading data file to store: %s", dataFile);
        fprintf(stderr, "Failed reading data file to store: %s\n", dataFile);
        exit(-1);
    }

    // Establish connection
    KineticStatus status;
    // Allocate session/thread data
    write_args* writeArgs = calloc(NUM_COPIES, sizeof(write_args));
    if (writeArgs == NULL) {
        fprintf(stderr, "Failed allocating overlapped thread arguments!\n");
    }

    // Initialize kinetic-c and configure sessions
    const char HmacKeyString[] = "asdfasdf";
    const KineticSession sessionConfig = {
        .host = "localhost",
@@ -107,41 +127,68 @@ int main(int argc, char** argv)
        .nonBlocking = false,
        .hmacKey = ByteArray_CreateWithCString(HmacKeyString),
    };
    write_args* writeArgs = calloc(1, sizeof(write_args));
    KineticClient_Init("stdout", 2);
    status = KineticClient_Connect(&sessionConfig, &writeArgs->sessionHandle);
    KineticClient_Init("stdout", 0);

    // Establish all of the connection first, so their session can all get initialized first
    for (int i = 0; i < NUM_COPIES; i++) {

        // Establish connection
        status = KineticClient_Connect(&sessionConfig, &writeArgs[i].sessionHandle);
        if (status != KINETIC_STATUS_SUCCESS) {
        fprintf(stderr, "Connection to host '%s' failed w/ status: %s",
            sessionConfig.host, Kinetic_GetStatusDescription(status));
            fprintf(stderr, "Failed connecting to the Kinetic device w/status: %s\n",
                Kinetic_GetStatusDescription(status));
            return -1;
        }
        strcpy(writeArgs[i].ip, sessionConfig.host);

        // Create a ByteBuffer for consuming chunks of data out of for overlapped PUTs
    writeArgs->data = ByteBuffer_Create(buf, dataLen, 0);
        writeArgs[i].data = ByteBuffer_Create(buf, dataLen, 0);

    // Configure common meta-data for the entries
        // Configure the KineticEntry
        struct timeval now;
        gettimeofday(&now, NULL);
    snprintf(writeArgs->keyPrefix, sizeof(writeArgs->keyPrefix), "%010ld_", now.tv_sec);
    ByteBuffer verBuf = ByteBuffer_Create(writeArgs->version, sizeof(writeArgs->version), 0);
        snprintf(writeArgs[i].keyPrefix, sizeof(writeArgs[i].keyPrefix), "%010llu_%02d_",
            (unsigned long long)now.tv_sec, i);
        ByteBuffer keyBuf = ByteBuffer_Create(writeArgs[i].key, sizeof(writeArgs[i].key), 0);
        ByteBuffer_AppendCString(&keyBuf, writeArgs[i].keyPrefix);
        ByteBuffer verBuf = ByteBuffer_Create(writeArgs[i].version, sizeof(writeArgs[i].version), 0);
        ByteBuffer_AppendCString(&verBuf, "v1.0");
    ByteBuffer tagBuf = ByteBuffer_Create(writeArgs->tag, sizeof(writeArgs->tag), 0);
        ByteBuffer tagBuf = ByteBuffer_Create(writeArgs[i].tag, sizeof(writeArgs[i].tag), 0);
        ByteBuffer_AppendCString(&tagBuf, "some_value_tag...");
    writeArgs->entry = (KineticEntry) {
        .key = ByteBuffer_Create(writeArgs->key, sizeof(writeArgs->key), 0),
        ByteBuffer valBuf = ByteBuffer_Create(writeArgs[i].value, sizeof(writeArgs[i].value), 0);
        writeArgs[i].entry = (KineticEntry) {
            .key = keyBuf,
            // .newVersion = verBuf,
            .tag = tagBuf,
            .algorithm = KINETIC_ALGORITHM_SHA1,
        .value = ByteBuffer_Create(writeArgs->value, sizeof(writeArgs->value), 0),
        .synchronization = KINETIC_SYNCHRONIZATION_WRITEBACK,
            .value = valBuf,
        };
    strcpy(writeArgs->ip, sessionConfig.host);
    }
    sleep(2); // Give a generous chunk of time for session to be initialized by the target device

    // Write all of the copies simultaneously (overlapped)
    for (int i = 0; i < NUM_COPIES; i++) {
        printf("  *** Overlapped PUT operations (writing copy %d of %d)"
               " on IP: %s\n", i + 1, NUM_COPIES, sessionConfig.host);
        int threadCreateStatus = pthread_create(&writeArgs[i].threadID, NULL, store_data, &writeArgs[i]);
        REPORT_ERRNO(threadCreateStatus, "pthread_create");
        if (threadCreateStatus != 0) {
            fprintf(stderr, "pthread create failed!\n");
            exit(-2);
        }
    }

    // Store the data
    printf("\nWriting data file to the Kinetic device...\n");
    store_data(writeArgs);
    // Wait for each overlapped PUT operations to complete and cleanup
    printf("  *** Waiting for PUT threads to exit...\n");
    for (int i = 0; i < NUM_COPIES; i++) {
        int joinStatus = pthread_join(writeArgs[i].threadID, NULL);
        if (joinStatus != 0) {
            fprintf(stderr, "pthread join failed!\n");
        }
        KineticClient_Disconnect(&writeArgs[i].sessionHandle);
    }

    // Shutdown client connection and cleanup
    KineticClient_Disconnect(&writeArgs->sessionHandle);
    KineticClient_Shutdown();
    free(writeArgs);
    free(buf);
+1 −1
Original line number Diff line number Diff line
@@ -111,7 +111,7 @@ KineticStatus KineticConnection_Disconnect(KineticConnection* const connection)
    // Shutdown the worker thread
    KineticStatus status = KINETIC_STATUS_SUCCESS;
    connection->thread.abortRequested = true;
    LOG2("\nSent abort request to worker thread!\n");
    LOG3("\nSent abort request to worker thread!\n");
    int pthreadStatus = pthread_join(connection->threadID, NULL);
    if (pthreadStatus != 0) {
        char errMsg[256];
Loading