Commit a5e2c99a authored by Andrew Mitchell's avatar Andrew Mitchell
Browse files

Updated nonblocking kinetic connection to support PersistMode

parent c2292daf
Loading
Loading
Loading
Loading
+10 −0
Original line number Diff line number Diff line
@@ -33,6 +33,16 @@ enum class WriteMode {
    REQUIRE_SAME_VERSION
};

enum class PersistMode {
    /// This request is made persistent before returning
    WRITE_THROUGH,
    /// This request can be made persistent when the device chooses, or when a subsequent
    /// request with PersistMode FLUSH is issued on this connection
    WRITE_BACK,
    /// All pending information that has not been persisted will be before returning
    FLUSH
};

} // namespace kinetic

#endif  // KINETIC_CPP_CLIENT_KINETIC_CONNECTION_H_
+10 −0
Original line number Diff line number Diff line
@@ -261,6 +261,16 @@ class NonblockingKineticConnection {
        const string current_version, WriteMode mode,
        const shared_ptr<const KineticRecord> record,
        const shared_ptr<PutCallbackInterface> callback);
    virtual HandlerKey Put(const shared_ptr<const string> key,
        const shared_ptr<const string> current_version, WriteMode mode,
        const shared_ptr<const KineticRecord> record,
        const shared_ptr<PutCallbackInterface> callback,
        PersistMode persistMode);
    virtual HandlerKey Put(const string key,
        const string current_version, WriteMode mode,
        const shared_ptr<const KineticRecord> record,
        const shared_ptr<PutCallbackInterface> callback,
        PersistMode persistMode);
    virtual HandlerKey Delete(const shared_ptr<const string> key,
            const shared_ptr<const string> version, WriteMode mode,
            const shared_ptr<SimpleCallbackInterface> callback);
+41 −1
Original line number Diff line number Diff line
@@ -53,6 +53,10 @@ using com::seagate::kinetic::client::proto::Message_Security_ACL_Scope;
using com::seagate::kinetic::client::proto::Message_Security_ACL_HMACAlgorithm_HmacSHA1;
using com::seagate::kinetic::client::proto::Message_Status;
using com::seagate::kinetic::client::proto::Message_P2POperation;
using com::seagate::kinetic::client::proto::Message_Synchronization;
using Message_Synchronization::Message_Synchronization_FLUSH;
using Message_Synchronization::Message_Synchronization_WRITEBACK;
using Message_Synchronization::Message_Synchronization_WRITETHROUGH;

using std::shared_ptr;
using std::string;
@@ -329,7 +333,8 @@ HandlerKey NonblockingKineticConnection::GetKeyRange(const string start_key,
HandlerKey NonblockingKineticConnection::Put(const shared_ptr<const string> key,
    const shared_ptr<const string> current_version, WriteMode mode,
    const shared_ptr<const KineticRecord> record,
    const shared_ptr<PutCallbackInterface> callback) {
    const shared_ptr<PutCallbackInterface> callback,
    PersistMode persistMode) {
    unique_ptr<PutHandler> handler(new PutHandler(callback));
    unique_ptr<Message> request = NewMessage(Message_MessageType_PUT);

@@ -347,9 +352,44 @@ HandlerKey NonblockingKineticConnection::Put(const shared_ptr<const string> key,
    request->mutable_command()->mutable_body()->mutable_keyvalue()->set_tag(*(record->tag()));
    request->mutable_command()->mutable_body()->mutable_keyvalue()->set_algorithm(
            record->algorithm());


    Message_Synchronization sync_option;
    switch (persistMode) {
        case PersistMode::WRITE_BACK:
            sync_option = Message_Synchronization_WRITEBACK;
            break;
        case PersistMode::WRITE_THROUGH:
            sync_option = Message_Synchronization_WRITETHROUGH;
            break;
        case PersistMode::FLUSH:
            sync_option = Message_Synchronization_FLUSH;
            break;
    }
    request->mutable_command()->mutable_body()->mutable_keyvalue()->set_synchronization(sync_option);

    return service_->Submit(move(request), record->value(), move(handler));
}

HandlerKey NonblockingKineticConnection::Put(const string key,
    const string current_version, WriteMode mode,
    const shared_ptr<const KineticRecord> record,
    const shared_ptr<PutCallbackInterface> callback,
    PersistMode persistMode) {
    return this->Put(make_shared<string>(key), make_shared<string>(current_version), mode, record,
        callback);
}


HandlerKey NonblockingKineticConnection::Put(const shared_ptr<const string> key,
        const shared_ptr<const string> current_version, WriteMode mode,
        const shared_ptr<const KineticRecord> record,
        const shared_ptr<PutCallbackInterface> callback) {
    // Default to the WRITE_BACK case, which performs better but does
    // not guarantee immediate persistence
    return this->Put(key, current_version, mode, record, callback, PersistMode::WRITE_BACK);
}

HandlerKey NonblockingKineticConnection::Put(const string key,
    const string current_version, WriteMode mode,
    const shared_ptr<const KineticRecord> record,
+5 −0
Original line number Diff line number Diff line
@@ -63,6 +63,10 @@ using com::seagate::kinetic::client::proto::Message_Security_ACL_Permission_SECU
using com::seagate::kinetic::client::proto::Message_Security_ACL_HMACAlgorithm_HmacSHA1;
using com::seagate::kinetic::client::proto::Message_Status;
using com::seagate::kinetic::client::proto::Message_P2POperation;
using com::seagate::kinetic::client::proto::Message_Synchronization;
using Message_Synchronization::Message_Synchronization_FLUSH;
using Message_Synchronization::Message_Synchronization_WRITEBACK;
using Message_Synchronization::Message_Synchronization_WRITETHROUGH;

using ::testing::_;
using ::testing::DoAll;
@@ -182,6 +186,7 @@ TEST_F(NonblockingKineticConnectionTest, PutWorks) {
    ASSERT_EQ("new_version", message.command().body().keyvalue().newversion());
    ASSERT_EQ("tag", message.command().body().keyvalue().tag());
    ASSERT_EQ(Message_Algorithm_SHA1, message.command().body().keyvalue().algorithm());
    ASSERT_EQ(Message_Synchronization_WRITEBACK, message.command().body().keyvalue().synchronization());
}

TEST_F(NonblockingKineticConnectionTest, GetKeyRangeWorks) {