Loading src/read_file_nonblocking.cc +15 −5 Original line number Diff line number Diff line Loading @@ -30,9 +30,9 @@ using kinetic::Message; using kinetic::NonblockingKineticConnection; using kinetic::SocketWrapper; class TestCallback : public GetCallbackInterface { class Callback : public GetCallbackInterface { public: TestCallback(char* buffer, unsigned int expected_length, int* remaining) : buffer_(buffer), expected_length_(expected_length), remaining_(remaining) {}; Callback(char* buffer, unsigned int expected_length, int* remaining) : buffer_(buffer), expected_length_(expected_length), remaining_(remaining) {}; void Success(const std::string &key, std::unique_ptr<KineticRecord> record) { if(expected_length_ != record->value().size()) { printf("Received value chunk of wrong size\n"); Loading @@ -54,6 +54,7 @@ private: }; int main(int argc, char* argv[]) { google::InitGoogleLogging(argv[0]); if (argc != 4) { printf("%s: <host> <kinetic key> <output file name>\n", argv[0]); Loading Loading @@ -103,11 +104,17 @@ int main(int argc, char* argv[]) { printf("Unable to resize file\n"); return 1; } char* output_buffer = (char*)mmap(0, file_size, PROT_READ | PROT_WRITE, MAP_SHARED, file, 0); if (output_buffer == MAP_FAILED) { printf("Unable to mmap file errno=%d\n", errno); return 1; } char key_buffer[100]; int remaining = 0; fd_set read_fds, write_fds; int num_fds = 0; std::vector<std::unique_ptr<Callback> > callbacks; for (int64_t i = 0; i < file_size; i += 1024*1024) { unsigned int block_length = 1024*1024; if (i + block_length > file_size) { Loading @@ -116,10 +123,10 @@ int main(int argc, char* argv[]) { sprintf(key_buffer, "%s-%10" PRId64, kinetic_key, i); remaining++; TestCallback* callback = new TestCallback(output_buffer + i, block_length, &remaining); std::string key(key_buffer); connection->nonblocking().Get(key, callback); std::unique_ptr<Callback> callback(new Callback(output_buffer + i, block_length, &remaining)); connection->nonblocking().Get(std::string(key_buffer), callback.get()); connection->nonblocking().Run(&read_fds, &write_fds, &num_fds); callbacks.push_back(std::move(callback)); } connection->nonblocking().Run(&read_fds, &write_fds, &num_fds); Loading @@ -133,6 +140,9 @@ int main(int argc, char* argv[]) { printf("\nDone!\n"); delete connection; google::protobuf::ShutdownProtobufLibrary(); google::ShutdownGoogleLogging(); google::ShutDownCommandLineFlags(); return 0; } Loading
src/read_file_nonblocking.cc +15 −5 Original line number Diff line number Diff line Loading @@ -30,9 +30,9 @@ using kinetic::Message; using kinetic::NonblockingKineticConnection; using kinetic::SocketWrapper; class TestCallback : public GetCallbackInterface { class Callback : public GetCallbackInterface { public: TestCallback(char* buffer, unsigned int expected_length, int* remaining) : buffer_(buffer), expected_length_(expected_length), remaining_(remaining) {}; Callback(char* buffer, unsigned int expected_length, int* remaining) : buffer_(buffer), expected_length_(expected_length), remaining_(remaining) {}; void Success(const std::string &key, std::unique_ptr<KineticRecord> record) { if(expected_length_ != record->value().size()) { printf("Received value chunk of wrong size\n"); Loading @@ -54,6 +54,7 @@ private: }; int main(int argc, char* argv[]) { google::InitGoogleLogging(argv[0]); if (argc != 4) { printf("%s: <host> <kinetic key> <output file name>\n", argv[0]); Loading Loading @@ -103,11 +104,17 @@ int main(int argc, char* argv[]) { printf("Unable to resize file\n"); return 1; } char* output_buffer = (char*)mmap(0, file_size, PROT_READ | PROT_WRITE, MAP_SHARED, file, 0); if (output_buffer == MAP_FAILED) { printf("Unable to mmap file errno=%d\n", errno); return 1; } char key_buffer[100]; int remaining = 0; fd_set read_fds, write_fds; int num_fds = 0; std::vector<std::unique_ptr<Callback> > callbacks; for (int64_t i = 0; i < file_size; i += 1024*1024) { unsigned int block_length = 1024*1024; if (i + block_length > file_size) { Loading @@ -116,10 +123,10 @@ int main(int argc, char* argv[]) { sprintf(key_buffer, "%s-%10" PRId64, kinetic_key, i); remaining++; TestCallback* callback = new TestCallback(output_buffer + i, block_length, &remaining); std::string key(key_buffer); connection->nonblocking().Get(key, callback); std::unique_ptr<Callback> callback(new Callback(output_buffer + i, block_length, &remaining)); connection->nonblocking().Get(std::string(key_buffer), callback.get()); connection->nonblocking().Run(&read_fds, &write_fds, &num_fds); callbacks.push_back(std::move(callback)); } connection->nonblocking().Run(&read_fds, &write_fds, &num_fds); Loading @@ -133,6 +140,9 @@ int main(int argc, char* argv[]) { printf("\nDone!\n"); delete connection; google::protobuf::ShutdownProtobufLibrary(); google::ShutdownGoogleLogging(); google::ShutDownCommandLineFlags(); return 0; }