Commit 8d6728cf authored by Manuel Wudka-Robles's avatar Manuel Wudka-Robles
Browse files

Added nonblocking writes example

parent 4730e4e0
Loading
Loading
Loading
Loading
+8 −7
Original line number Diff line number Diff line
@@ -91,8 +91,8 @@ int main(int argc, char* argv[]) {
        return 1;
    }

    unsigned int file_size = std::stoi(value);
    printf("Reading file of size %d\n", file_size);
    long long file_size = std::stoll(value);
    printf("Reading file of size %llu\n", file_size);


    delete kinetic_connection;
@@ -116,24 +116,25 @@ int main(int argc, char* argv[]) {
    char* output_buffer = (char*)mmap(0, file_size, PROT_READ | PROT_WRITE, MAP_SHARED, file, 0);
    char key_buffer[100];
    int remaining = 0;
    for (unsigned int i = 0; i < file_size; i += 1024*1024) {
    fd_set read_fds, write_fds;
    int num_fds = 0;
    for (off_t i = 0; i < file_size; i += 1024*1024) {
        unsigned int block_length = 1024*1024;
        if (i + block_length > file_size) {
            block_length = file_size - i + 1;
        }

        sprintf(key_buffer, "%s-%10d", kinetic_key, i);
        sprintf(key_buffer, "%s-%10llu", kinetic_key, i);
        remaining++;
        TestCallback* callback = new TestCallback(output_buffer + i, block_length, &remaining);
        std::string key(key_buffer);
        connection->Get(key, callback);
        connection->Run(&read_fds, &write_fds, &num_fds);
    }


    fd_set read_fds, write_fds;
    int num_fds = 0;
    connection->Run(&read_fds, &write_fds, &num_fds);
    while (remaining > 0) {
        while(select(num_fds + 1, &read_fds, &write_fds, NULL, NULL) <= 0);
        connection->Run(&read_fds, &write_fds, &num_fds);
    }

+5 −5
Original line number Diff line number Diff line
@@ -76,13 +76,15 @@ int main(int argc, char* argv[]) {
    char* inputfile_data = (char*)mmap(0, inputfile_stat.st_size, PROT_READ, MAP_SHARED, file, 0);
    char key_buffer[100];
    int remaining = 0;
    for (int i = 0; i < inputfile_stat.st_size; i += 1024*1024) {
    fd_set read_fds, write_fds;
    int num_fds = 0;
    for (off_t i = 0; i < inputfile_stat.st_size; i += 1024*1024) {
        int value_size = 1024*1024;
        if (i + value_size > inputfile_stat.st_size) {
            value_size = inputfile_stat.st_size - i + 1;
        }

        sprintf(key_buffer, "%s-%10d", kinetic_key, i);
        sprintf(key_buffer, "%s-%10llu", kinetic_key, i);

        std::string key(key_buffer);
        std::string value(inputfile_data + i, value_size);
@@ -90,16 +92,14 @@ int main(int argc, char* argv[]) {
        KineticRecord record(value, "", "", Message_Algorithm_SHA1);
        remaining++;
        connection->Put(key, "", true, record, new PutCallback(&remaining));
        connection->Run(&read_fds, &write_fds, &num_fds);

    }
    printf("\n");

    KineticRecord record(std::to_string(inputfile_stat.st_size), "", "", Message_Algorithm_SHA1);
    remaining++;
    connection->Put(kinetic_key, "", true, record, new PutCallback(&remaining));

    fd_set read_fds, write_fds;
    int num_fds = 0;
    connection->Run(&read_fds, &write_fds, &num_fds);
    while (remaining > 0) {
        while (select(num_fds + 1, &read_fds, &write_fds, NULL, NULL) <= 0);