Loading include/kinetic/kinetic_connection_factory.h +44 −2 Original line number Diff line number Diff line Loading @@ -25,8 +25,10 @@ #include "kinetic/connection_options.h" #include "kinetic/hmac_provider.h" #include "kinetic/blocking_kinetic_connection.h" #include "kinetic/nonblocking_kinetic_connection.h" #include "kinetic/threadsafe_nonblocking_connection.h" #include "kinetic/threadsafe_blocking_kinetic_connection.h" #include "kinetic/status.h" namespace kinetic { Loading @@ -40,7 +42,7 @@ class KineticConnectionFactory { public: explicit KineticConnectionFactory(HmacProvider hmac_provider); /// Creates and opens a new connection using the given options. If the returned /// Creates and opens a new nonblocking connection using the given options. If the returned /// Status indicates success then the connection is ready to perform /// actions and the caller should delete it when done using it. If the /// Status indicates failure, then no connection will be created and Loading @@ -55,11 +57,51 @@ class KineticConnectionFactory { const ConnectionOptions& options, unique_ptr <NonblockingKineticConnection>& connection); /// Like NewConnection, except the connection is safe for use by multiple threads. virtual Status NewNonblockingConnection( const ConnectionOptions& options, shared_ptr <NonblockingKineticConnection>& connection); /// Like NewNonblockingConnection, except the connection is safe for use by multiple threads. virtual Status NewThreadsafeNonblockingConnection( const ConnectionOptions& options, unique_ptr <NonblockingKineticConnection>& connection); virtual Status NewThreadsafeNonblockingConnection( const ConnectionOptions& options, shared_ptr <NonblockingKineticConnection>& connection); /// Creates and opens a new blocking connection using the given options. If the returned /// Status indicates success then the connection is ready to perform /// actions and the caller should delete it when done using it. If the /// Status indicates failure, then no connection will be created and /// the caller must not attempt to use or delete it. /// /// @param[in] options Specifies host, port, user id, etc /// @param[in] network_timeout_seconds If an operation goes more than this many seconds without /// data the operation fails /// @param[out] connection Populated with a BlockingKineticConnection if the request /// succeeds virtual Status NewBlockingConnection( const ConnectionOptions& options, unique_ptr <BlockingKineticConnection>& connection, unsigned int network_timeout_seconds); virtual Status NewBlockingConnection( const ConnectionOptions& options, shared_ptr <BlockingKineticConnection>& connection, unsigned int network_timeout_seconds); /// Like NewBlockingConnection, except the connection is safe for use by multiple threads virtual Status NewThreadsafeBlockingConnection( const ConnectionOptions& options, unique_ptr <BlockingKineticConnection>& connection, unsigned int network_timeout_seconds); virtual Status NewThreadsafeBlockingConnection( const ConnectionOptions& options, shared_ptr <BlockingKineticConnection>& connection, unsigned int network_timeout_seconds); private: HmacProvider hmac_provider_; Status doNewConnection( Loading src/integration_test/integration_test.h +1 −3 Original line number Diff line number Diff line Loading @@ -89,10 +89,8 @@ class IntegrationTest : public ::testing::Test { for (size_t i = 0; i < kMaxRetries; ++i) { ASSERT_EQ(0, nanosleep(&sleep_time, NULL)); unique_ptr<NonblockingKineticConnection> nonblocking_connection(nullptr); if (connection_factory.NewNonblockingConnection(options, nonblocking_connection).ok()) { if (connection_factory.NewNonblockingConnection(options, nonblocking_connection_).ok()) { connected = true; nonblocking_connection_ = shared_ptr<NonblockingKineticConnection>(nonblocking_connection.release()); blocking_connection_.reset(new BlockingKineticConnection(nonblocking_connection_, 10)); break; } Loading src/main/kinetic_connection_factory.cc +66 −0 Original line number Diff line number Diff line Loading @@ -42,12 +42,78 @@ Status KineticConnectionFactory::NewNonblockingConnection( return doNewConnection(options, connection, false); } Status KineticConnectionFactory::NewNonblockingConnection( const ConnectionOptions& options, shared_ptr<NonblockingKineticConnection>& connection) { unique_ptr<NonblockingKineticConnection> nbc(nullptr); Status status = doNewConnection(options, nbc, false); if (status.ok()) { connection.reset(nbc.release()); } return status; } Status KineticConnectionFactory::NewThreadsafeNonblockingConnection( const ConnectionOptions& options, unique_ptr<NonblockingKineticConnection>& connection) { return doNewConnection(options, connection, true); } Status KineticConnectionFactory::NewThreadsafeNonblockingConnection( const ConnectionOptions& options, shared_ptr<NonblockingKineticConnection>& connection) { unique_ptr<NonblockingKineticConnection> nbc(nullptr); Status status = doNewConnection(options, nbc, true); if (status.ok()) { connection.reset(nbc.release()); } return status; } Status KineticConnectionFactory::NewBlockingConnection( const ConnectionOptions& options, unique_ptr<BlockingKineticConnection>& connection, unsigned int network_timeout_seconds) { unique_ptr<NonblockingKineticConnection> nbc(nullptr); Status status = doNewConnection(options, nbc, false); connection.reset(new BlockingKineticConnection(move(nbc), network_timeout_seconds)); return status; } Status KineticConnectionFactory::NewBlockingConnection( const ConnectionOptions& options, shared_ptr<BlockingKineticConnection>& connection, unsigned int network_timeout_seconds) { unique_ptr<BlockingKineticConnection> bc(nullptr); Status status = NewBlockingConnection(options, bc, network_timeout_seconds); if (status.ok()) { connection.reset(bc.release()); } return status; } Status KineticConnectionFactory::NewThreadsafeBlockingConnection( const ConnectionOptions& options, unique_ptr<BlockingKineticConnection>& connection, unsigned int network_timeout_seconds) { unique_ptr<NonblockingKineticConnection> nbc(nullptr); Status status = doNewConnection(options, nbc, true); connection.reset(new BlockingKineticConnection(move(nbc), network_timeout_seconds)); return status; } Status KineticConnectionFactory::NewThreadsafeBlockingConnection( const ConnectionOptions& options, shared_ptr<BlockingKineticConnection>& connection, unsigned int network_timeout_seconds) { unique_ptr<BlockingKineticConnection> bc(nullptr); Status status = NewThreadsafeBlockingConnection(options, bc, network_timeout_seconds); if (status.ok()) { connection.reset(bc.release()); } return status; } Status KineticConnectionFactory::doNewConnection( ConnectionOptions const& options, unique_ptr <NonblockingKineticConnection>& connection, bool threadsafe) { Loading Loading
include/kinetic/kinetic_connection_factory.h +44 −2 Original line number Diff line number Diff line Loading @@ -25,8 +25,10 @@ #include "kinetic/connection_options.h" #include "kinetic/hmac_provider.h" #include "kinetic/blocking_kinetic_connection.h" #include "kinetic/nonblocking_kinetic_connection.h" #include "kinetic/threadsafe_nonblocking_connection.h" #include "kinetic/threadsafe_blocking_kinetic_connection.h" #include "kinetic/status.h" namespace kinetic { Loading @@ -40,7 +42,7 @@ class KineticConnectionFactory { public: explicit KineticConnectionFactory(HmacProvider hmac_provider); /// Creates and opens a new connection using the given options. If the returned /// Creates and opens a new nonblocking connection using the given options. If the returned /// Status indicates success then the connection is ready to perform /// actions and the caller should delete it when done using it. If the /// Status indicates failure, then no connection will be created and Loading @@ -55,11 +57,51 @@ class KineticConnectionFactory { const ConnectionOptions& options, unique_ptr <NonblockingKineticConnection>& connection); /// Like NewConnection, except the connection is safe for use by multiple threads. virtual Status NewNonblockingConnection( const ConnectionOptions& options, shared_ptr <NonblockingKineticConnection>& connection); /// Like NewNonblockingConnection, except the connection is safe for use by multiple threads. virtual Status NewThreadsafeNonblockingConnection( const ConnectionOptions& options, unique_ptr <NonblockingKineticConnection>& connection); virtual Status NewThreadsafeNonblockingConnection( const ConnectionOptions& options, shared_ptr <NonblockingKineticConnection>& connection); /// Creates and opens a new blocking connection using the given options. If the returned /// Status indicates success then the connection is ready to perform /// actions and the caller should delete it when done using it. If the /// Status indicates failure, then no connection will be created and /// the caller must not attempt to use or delete it. /// /// @param[in] options Specifies host, port, user id, etc /// @param[in] network_timeout_seconds If an operation goes more than this many seconds without /// data the operation fails /// @param[out] connection Populated with a BlockingKineticConnection if the request /// succeeds virtual Status NewBlockingConnection( const ConnectionOptions& options, unique_ptr <BlockingKineticConnection>& connection, unsigned int network_timeout_seconds); virtual Status NewBlockingConnection( const ConnectionOptions& options, shared_ptr <BlockingKineticConnection>& connection, unsigned int network_timeout_seconds); /// Like NewBlockingConnection, except the connection is safe for use by multiple threads virtual Status NewThreadsafeBlockingConnection( const ConnectionOptions& options, unique_ptr <BlockingKineticConnection>& connection, unsigned int network_timeout_seconds); virtual Status NewThreadsafeBlockingConnection( const ConnectionOptions& options, shared_ptr <BlockingKineticConnection>& connection, unsigned int network_timeout_seconds); private: HmacProvider hmac_provider_; Status doNewConnection( Loading
src/integration_test/integration_test.h +1 −3 Original line number Diff line number Diff line Loading @@ -89,10 +89,8 @@ class IntegrationTest : public ::testing::Test { for (size_t i = 0; i < kMaxRetries; ++i) { ASSERT_EQ(0, nanosleep(&sleep_time, NULL)); unique_ptr<NonblockingKineticConnection> nonblocking_connection(nullptr); if (connection_factory.NewNonblockingConnection(options, nonblocking_connection).ok()) { if (connection_factory.NewNonblockingConnection(options, nonblocking_connection_).ok()) { connected = true; nonblocking_connection_ = shared_ptr<NonblockingKineticConnection>(nonblocking_connection.release()); blocking_connection_.reset(new BlockingKineticConnection(nonblocking_connection_, 10)); break; } Loading
src/main/kinetic_connection_factory.cc +66 −0 Original line number Diff line number Diff line Loading @@ -42,12 +42,78 @@ Status KineticConnectionFactory::NewNonblockingConnection( return doNewConnection(options, connection, false); } Status KineticConnectionFactory::NewNonblockingConnection( const ConnectionOptions& options, shared_ptr<NonblockingKineticConnection>& connection) { unique_ptr<NonblockingKineticConnection> nbc(nullptr); Status status = doNewConnection(options, nbc, false); if (status.ok()) { connection.reset(nbc.release()); } return status; } Status KineticConnectionFactory::NewThreadsafeNonblockingConnection( const ConnectionOptions& options, unique_ptr<NonblockingKineticConnection>& connection) { return doNewConnection(options, connection, true); } Status KineticConnectionFactory::NewThreadsafeNonblockingConnection( const ConnectionOptions& options, shared_ptr<NonblockingKineticConnection>& connection) { unique_ptr<NonblockingKineticConnection> nbc(nullptr); Status status = doNewConnection(options, nbc, true); if (status.ok()) { connection.reset(nbc.release()); } return status; } Status KineticConnectionFactory::NewBlockingConnection( const ConnectionOptions& options, unique_ptr<BlockingKineticConnection>& connection, unsigned int network_timeout_seconds) { unique_ptr<NonblockingKineticConnection> nbc(nullptr); Status status = doNewConnection(options, nbc, false); connection.reset(new BlockingKineticConnection(move(nbc), network_timeout_seconds)); return status; } Status KineticConnectionFactory::NewBlockingConnection( const ConnectionOptions& options, shared_ptr<BlockingKineticConnection>& connection, unsigned int network_timeout_seconds) { unique_ptr<BlockingKineticConnection> bc(nullptr); Status status = NewBlockingConnection(options, bc, network_timeout_seconds); if (status.ok()) { connection.reset(bc.release()); } return status; } Status KineticConnectionFactory::NewThreadsafeBlockingConnection( const ConnectionOptions& options, unique_ptr<BlockingKineticConnection>& connection, unsigned int network_timeout_seconds) { unique_ptr<NonblockingKineticConnection> nbc(nullptr); Status status = doNewConnection(options, nbc, true); connection.reset(new BlockingKineticConnection(move(nbc), network_timeout_seconds)); return status; } Status KineticConnectionFactory::NewThreadsafeBlockingConnection( const ConnectionOptions& options, shared_ptr<BlockingKineticConnection>& connection, unsigned int network_timeout_seconds) { unique_ptr<BlockingKineticConnection> bc(nullptr); Status status = NewThreadsafeBlockingConnection(options, bc, network_timeout_seconds); if (status.ok()) { connection.reset(bc.release()); } return status; } Status KineticConnectionFactory::doNewConnection( ConnectionOptions const& options, unique_ptr <NonblockingKineticConnection>& connection, bool threadsafe) { Loading