Loading include/kinetic/nonblocking_kinetic_connection.h +9 −0 Original line number Diff line number Diff line Loading @@ -43,6 +43,8 @@ using com::seagate::kinetic::client::proto::Message_MessageType; using com::seagate::kinetic::client::proto::Message_P2POperation; using com::seagate::kinetic::client::proto::Message_Synchronization; using std::shared_ptr; using std::unique_ptr; using std::string; Loading Loading @@ -271,6 +273,11 @@ class NonblockingKineticConnection { const shared_ptr<const KineticRecord> record, const shared_ptr<PutCallbackInterface> callback, PersistMode persistMode); virtual HandlerKey Delete(const shared_ptr<const string> key, const shared_ptr<const string> version, WriteMode mode, const shared_ptr<SimpleCallbackInterface> callback, PersistMode persistMode); virtual HandlerKey Delete(const string key, const string version, WriteMode mode, const shared_ptr<SimpleCallbackInterface> callback, PersistMode persistMode); virtual HandlerKey Delete(const shared_ptr<const string> key, const shared_ptr<const string> version, WriteMode mode, const shared_ptr<SimpleCallbackInterface> callback); Loading Loading @@ -305,9 +312,11 @@ class NonblockingKineticConnection { void PopulateP2PMessage(Message_P2POperation *mutable_p2pop, const shared_ptr<const P2PPushRequest> push_request); unique_ptr<Message> NewMessage(Message_MessageType message_type); Message_Synchronization GetSynchronizationForPersistMode(PersistMode persistMode); NonblockingPacketServiceInterface *service_; const shared_ptr<const string> empty_str_; int64_t cluster_version_; DISALLOW_COPY_AND_ASSIGN(NonblockingKineticConnection); Loading src/main/nonblocking_kinetic_connection.cc +37 −15 Original line number Diff line number Diff line Loading @@ -353,20 +353,8 @@ HandlerKey NonblockingKineticConnection::Put(const shared_ptr<const string> key, request->mutable_command()->mutable_body()->mutable_keyvalue()->set_algorithm( record->algorithm()); Message_Synchronization sync_option; switch (persistMode) { case PersistMode::WRITE_BACK: sync_option = Message_Synchronization_WRITEBACK; break; case PersistMode::WRITE_THROUGH: sync_option = Message_Synchronization_WRITETHROUGH; break; case PersistMode::FLUSH: sync_option = Message_Synchronization_FLUSH; break; } request->mutable_command()->mutable_body()->mutable_keyvalue()->set_synchronization(sync_option); request->mutable_command()->mutable_body()->mutable_keyvalue()->set_synchronization( this->GetSynchronizationForPersistMode(persistMode)); return service_->Submit(move(request), record->value(), move(handler)); } Loading Loading @@ -400,7 +388,8 @@ HandlerKey NonblockingKineticConnection::Put(const string key, HandlerKey NonblockingKineticConnection::Delete(const shared_ptr<const string> key, const shared_ptr<const string> version, WriteMode mode, const shared_ptr<SimpleCallbackInterface> callback) { const shared_ptr<SimpleCallbackInterface> callback, PersistMode persistMode) { unique_ptr<SimpleHandler> handler(new SimpleHandler(callback)); unique_ptr<Message> request = NewMessage(Message_MessageType_DELETE); Loading @@ -409,9 +398,27 @@ HandlerKey NonblockingKineticConnection::Delete(const shared_ptr<const string> k // TODO(marshall) handle null version request->mutable_command()->mutable_body()->mutable_keyvalue()->set_dbversion(*version); request->mutable_command()->mutable_body()->mutable_keyvalue()->set_force(force); request->mutable_command()->mutable_body()->mutable_keyvalue()->set_synchronization( this->GetSynchronizationForPersistMode(persistMode)); return service_->Submit(move(request), empty_str_, move(handler)); } HandlerKey NonblockingKineticConnection::Delete(const string key, const string version, WriteMode mode, const shared_ptr<SimpleCallbackInterface> callback, PersistMode persistMode) { return this->Delete(make_shared<string>(key), make_shared<string>(version), mode, callback, persistMode); } HandlerKey NonblockingKineticConnection::Delete(const shared_ptr<const string> key, const shared_ptr<const string> version, WriteMode mode, const shared_ptr<SimpleCallbackInterface> callback) { // Default to the WRITE_BACK case, which performs better but does // not guarantee immediate persistence return this->Delete(key, version, mode, callback, PersistMode::WRITE_BACK); } HandlerKey NonblockingKineticConnection::Delete(const string key, const string version, WriteMode mode, const shared_ptr<SimpleCallbackInterface> callback) { return this->Delete(make_shared<string>(key), make_shared<string>(version), mode, callback); Loading Loading @@ -597,5 +604,20 @@ bool NonblockingKineticConnection::RemoveHandler(HandlerKey handler_key) { return service_->Remove(handler_key); } Message_Synchronization NonblockingKineticConnection::GetSynchronizationForPersistMode(PersistMode persistMode) { Message_Synchronization sync_option; switch (persistMode) { case PersistMode::WRITE_BACK: sync_option = Message_Synchronization_WRITEBACK; break; case PersistMode::WRITE_THROUGH: sync_option = Message_Synchronization_WRITETHROUGH; break; case PersistMode::FLUSH: sync_option = Message_Synchronization_FLUSH; break; } return sync_option; } } // namespace kinetic src/test/nonblocking_kinetic_connection_test.cc +1 −0 Original line number Diff line number Diff line Loading @@ -168,6 +168,7 @@ TEST_F(NonblockingKineticConnectionTest, DeleteWorks) { ASSERT_EQ("key", message.command().body().keyvalue().key()); ASSERT_EQ("version", message.command().body().keyvalue().dbversion()); ASSERT_TRUE(message.command().body().keyvalue().force()); ASSERT_EQ(Message_Synchronization_WRITEBACK, message.command().body().keyvalue().synchronization()); } TEST_F(NonblockingKineticConnectionTest, PutWorks) { Loading Loading
include/kinetic/nonblocking_kinetic_connection.h +9 −0 Original line number Diff line number Diff line Loading @@ -43,6 +43,8 @@ using com::seagate::kinetic::client::proto::Message_MessageType; using com::seagate::kinetic::client::proto::Message_P2POperation; using com::seagate::kinetic::client::proto::Message_Synchronization; using std::shared_ptr; using std::unique_ptr; using std::string; Loading Loading @@ -271,6 +273,11 @@ class NonblockingKineticConnection { const shared_ptr<const KineticRecord> record, const shared_ptr<PutCallbackInterface> callback, PersistMode persistMode); virtual HandlerKey Delete(const shared_ptr<const string> key, const shared_ptr<const string> version, WriteMode mode, const shared_ptr<SimpleCallbackInterface> callback, PersistMode persistMode); virtual HandlerKey Delete(const string key, const string version, WriteMode mode, const shared_ptr<SimpleCallbackInterface> callback, PersistMode persistMode); virtual HandlerKey Delete(const shared_ptr<const string> key, const shared_ptr<const string> version, WriteMode mode, const shared_ptr<SimpleCallbackInterface> callback); Loading Loading @@ -305,9 +312,11 @@ class NonblockingKineticConnection { void PopulateP2PMessage(Message_P2POperation *mutable_p2pop, const shared_ptr<const P2PPushRequest> push_request); unique_ptr<Message> NewMessage(Message_MessageType message_type); Message_Synchronization GetSynchronizationForPersistMode(PersistMode persistMode); NonblockingPacketServiceInterface *service_; const shared_ptr<const string> empty_str_; int64_t cluster_version_; DISALLOW_COPY_AND_ASSIGN(NonblockingKineticConnection); Loading
src/main/nonblocking_kinetic_connection.cc +37 −15 Original line number Diff line number Diff line Loading @@ -353,20 +353,8 @@ HandlerKey NonblockingKineticConnection::Put(const shared_ptr<const string> key, request->mutable_command()->mutable_body()->mutable_keyvalue()->set_algorithm( record->algorithm()); Message_Synchronization sync_option; switch (persistMode) { case PersistMode::WRITE_BACK: sync_option = Message_Synchronization_WRITEBACK; break; case PersistMode::WRITE_THROUGH: sync_option = Message_Synchronization_WRITETHROUGH; break; case PersistMode::FLUSH: sync_option = Message_Synchronization_FLUSH; break; } request->mutable_command()->mutable_body()->mutable_keyvalue()->set_synchronization(sync_option); request->mutable_command()->mutable_body()->mutable_keyvalue()->set_synchronization( this->GetSynchronizationForPersistMode(persistMode)); return service_->Submit(move(request), record->value(), move(handler)); } Loading Loading @@ -400,7 +388,8 @@ HandlerKey NonblockingKineticConnection::Put(const string key, HandlerKey NonblockingKineticConnection::Delete(const shared_ptr<const string> key, const shared_ptr<const string> version, WriteMode mode, const shared_ptr<SimpleCallbackInterface> callback) { const shared_ptr<SimpleCallbackInterface> callback, PersistMode persistMode) { unique_ptr<SimpleHandler> handler(new SimpleHandler(callback)); unique_ptr<Message> request = NewMessage(Message_MessageType_DELETE); Loading @@ -409,9 +398,27 @@ HandlerKey NonblockingKineticConnection::Delete(const shared_ptr<const string> k // TODO(marshall) handle null version request->mutable_command()->mutable_body()->mutable_keyvalue()->set_dbversion(*version); request->mutable_command()->mutable_body()->mutable_keyvalue()->set_force(force); request->mutable_command()->mutable_body()->mutable_keyvalue()->set_synchronization( this->GetSynchronizationForPersistMode(persistMode)); return service_->Submit(move(request), empty_str_, move(handler)); } HandlerKey NonblockingKineticConnection::Delete(const string key, const string version, WriteMode mode, const shared_ptr<SimpleCallbackInterface> callback, PersistMode persistMode) { return this->Delete(make_shared<string>(key), make_shared<string>(version), mode, callback, persistMode); } HandlerKey NonblockingKineticConnection::Delete(const shared_ptr<const string> key, const shared_ptr<const string> version, WriteMode mode, const shared_ptr<SimpleCallbackInterface> callback) { // Default to the WRITE_BACK case, which performs better but does // not guarantee immediate persistence return this->Delete(key, version, mode, callback, PersistMode::WRITE_BACK); } HandlerKey NonblockingKineticConnection::Delete(const string key, const string version, WriteMode mode, const shared_ptr<SimpleCallbackInterface> callback) { return this->Delete(make_shared<string>(key), make_shared<string>(version), mode, callback); Loading Loading @@ -597,5 +604,20 @@ bool NonblockingKineticConnection::RemoveHandler(HandlerKey handler_key) { return service_->Remove(handler_key); } Message_Synchronization NonblockingKineticConnection::GetSynchronizationForPersistMode(PersistMode persistMode) { Message_Synchronization sync_option; switch (persistMode) { case PersistMode::WRITE_BACK: sync_option = Message_Synchronization_WRITEBACK; break; case PersistMode::WRITE_THROUGH: sync_option = Message_Synchronization_WRITETHROUGH; break; case PersistMode::FLUSH: sync_option = Message_Synchronization_FLUSH; break; } return sync_option; } } // namespace kinetic
src/test/nonblocking_kinetic_connection_test.cc +1 −0 Original line number Diff line number Diff line Loading @@ -168,6 +168,7 @@ TEST_F(NonblockingKineticConnectionTest, DeleteWorks) { ASSERT_EQ("key", message.command().body().keyvalue().key()); ASSERT_EQ("version", message.command().body().keyvalue().dbversion()); ASSERT_TRUE(message.command().body().keyvalue().force()); ASSERT_EQ(Message_Synchronization_WRITEBACK, message.command().body().keyvalue().synchronization()); } TEST_F(NonblockingKineticConnectionTest, PutWorks) { Loading