Loading src/read_file_nonblocking.cc +17 −3 Original line number Diff line number Diff line Loading @@ -24,7 +24,8 @@ using std::string; class Callback : public GetCallbackInterface { public: Callback(char* buffer, unsigned int expected_length, int* remaining) : buffer_(buffer), expected_length_(expected_length), remaining_(remaining) {}; Callback(char* buffer, unsigned int expected_length, int* remaining) : buffer_(buffer), expected_length_(expected_length), remaining_(remaining) {}; void Success(const std::string &key, std::unique_ptr<KineticRecord> record) { if(expected_length_ != record->value()->size()) { printf("Received value chunk of wrong size\n"); Loading @@ -36,7 +37,7 @@ public: (*remaining_)--; } void Failure(StatusCode error) { printf("Error!\n"); printf("Error: %d\n", static_cast<int>(error)); exit(1); } private: Loading Loading @@ -122,7 +123,20 @@ int main(int argc, char* argv[]) { connection->nonblocking().Run(&read_fds, &write_fds, &num_fds); while (remaining > 0) { while(select(num_fds + 1, &read_fds, &write_fds, NULL, NULL) <= 0); struct timeval tv; tv.tv_sec = 10; tv.tv_usec = 0; int number_ready_fds = select(num_fds + 1, &read_fds, &write_fds, NULL, &tv); if (number_ready_fds < 0) { // select() returned an error printf("i/o error: %s\n", strerror(errno)); return 1; } else if (number_ready_fds == 0) { printf("connection timed out\n"); return 1; } connection->nonblocking().Run(&read_fds, &write_fds, &num_fds); } Loading Loading
src/read_file_nonblocking.cc +17 −3 Original line number Diff line number Diff line Loading @@ -24,7 +24,8 @@ using std::string; class Callback : public GetCallbackInterface { public: Callback(char* buffer, unsigned int expected_length, int* remaining) : buffer_(buffer), expected_length_(expected_length), remaining_(remaining) {}; Callback(char* buffer, unsigned int expected_length, int* remaining) : buffer_(buffer), expected_length_(expected_length), remaining_(remaining) {}; void Success(const std::string &key, std::unique_ptr<KineticRecord> record) { if(expected_length_ != record->value()->size()) { printf("Received value chunk of wrong size\n"); Loading @@ -36,7 +37,7 @@ public: (*remaining_)--; } void Failure(StatusCode error) { printf("Error!\n"); printf("Error: %d\n", static_cast<int>(error)); exit(1); } private: Loading Loading @@ -122,7 +123,20 @@ int main(int argc, char* argv[]) { connection->nonblocking().Run(&read_fds, &write_fds, &num_fds); while (remaining > 0) { while(select(num_fds + 1, &read_fds, &write_fds, NULL, NULL) <= 0); struct timeval tv; tv.tv_sec = 10; tv.tv_usec = 0; int number_ready_fds = select(num_fds + 1, &read_fds, &write_fds, NULL, &tv); if (number_ready_fds < 0) { // select() returned an error printf("i/o error: %s\n", strerror(errno)); return 1; } else if (number_ready_fds == 0) { printf("connection timed out\n"); return 1; } connection->nonblocking().Run(&read_fds, &write_fds, &num_fds); } Loading