Commit ce207f2c authored by Paul Lensing's avatar Paul Lensing
Browse files

Basic 3.0.0 kinetic-protocol support. Note that 3.0.0 proto file has to

be supplied manually for the build process to succeed. 
parent c3d8a644
Loading
Loading
Loading
Loading
+5 −5
Original line number Diff line number Diff line
@@ -27,7 +27,7 @@

namespace kinetic {

using com::seagate::kinetic::client::proto::Message_Algorithm;
using com::seagate::kinetic::client::proto::Command_Algorithm;
using std::shared_ptr;
using std::string;
using std::make_shared;
@@ -38,12 +38,12 @@ using std::make_shared;
class KineticRecord {
    public:
    KineticRecord(const shared_ptr<const string> value, const shared_ptr<const string> version,
            const shared_ptr<const string> tag, Message_Algorithm algorithm) :
            const shared_ptr<const string> tag, Command_Algorithm algorithm) :
            value_(value), version_(version), tag_(tag), algorithm_(
                    algorithm) {
    }
    KineticRecord(const string value, const string version, const string tag,
        Message_Algorithm algorithm) :
            Command_Algorithm algorithm) :
        value_(make_shared<string>(value)), version_(make_shared<string>(version)),
        tag_(make_shared<string>(tag)), algorithm_(algorithm) {
    }
@@ -68,7 +68,7 @@ class KineticRecord {
    }

    /// The algorithm used to generate the tag
    Message_Algorithm algorithm() const {
    Command_Algorithm algorithm() const {
        return algorithm_;
    }

@@ -76,7 +76,7 @@ class KineticRecord {
    const shared_ptr<const string> value_;
    const shared_ptr<const string> version_;
    const shared_ptr<const string> tag_;
    const Message_Algorithm algorithm_;
    const Command_Algorithm algorithm_;
    // disallow operator=
    void operator=(const KineticRecord&);
};
+24 −25
Original line number Diff line number Diff line
@@ -39,11 +39,10 @@

namespace kinetic {

using com::seagate::kinetic::client::proto::Message_MessageType;

using com::seagate::kinetic::client::proto::Message_P2POperation;

using com::seagate::kinetic::client::proto::Message_Synchronization;
using com::seagate::kinetic::client::proto::Command;
using com::seagate::kinetic::client::proto::Command_MessageType;
using com::seagate::kinetic::client::proto::Command_P2POperation;
using com::seagate::kinetic::client::proto::Command_Synchronization;

using std::shared_ptr;
using std::unique_ptr;
@@ -64,8 +63,8 @@ class SimpleCallbackInterface {
class SimpleHandler : public HandlerInterface {
    public:
    explicit SimpleHandler(const shared_ptr<SimpleCallbackInterface> callback);
    void Handle(const Message &response, unique_ptr<const string> value);
    void Error(KineticStatus error, Message const * const response);
    void Handle(const Command &response, unique_ptr<const string> value);
    void Error(KineticStatus error, Command const * const response);

    private:
    const shared_ptr<SimpleCallbackInterface> callback_;
@@ -82,8 +81,8 @@ class GetCallbackInterface {
class GetHandler : public HandlerInterface {
    public:
    explicit GetHandler(const shared_ptr<GetCallbackInterface> callback);
    void Handle(const Message &response, unique_ptr<const string> value);
    void Error(KineticStatus error, Message const * const response);
    void Handle(const Command &response, unique_ptr<const string> value);
    void Error(KineticStatus error, Command const * const response);

    private:
    const shared_ptr<GetCallbackInterface> callback_;
@@ -100,8 +99,8 @@ class GetVersionCallbackInterface {
class GetVersionHandler : public HandlerInterface {
    public:
    explicit GetVersionHandler(const shared_ptr<GetVersionCallbackInterface> callback);
    void Handle(const Message &response, unique_ptr<const string> value);
    void Error(KineticStatus error, Message const * const response);
    void Handle(const Command &response, unique_ptr<const string> value);
    void Error(KineticStatus error, Command const * const response);

    private:
    const shared_ptr<GetVersionCallbackInterface> callback_;
@@ -118,8 +117,8 @@ class GetKeyRangeCallbackInterface {
class GetKeyRangeHandler : public HandlerInterface {
    public:
    explicit GetKeyRangeHandler(const shared_ptr<GetKeyRangeCallbackInterface> callback);
    void Handle(const Message &response, unique_ptr<const string> value);
    void Error(KineticStatus error, Message const * const response);
    void Handle(const Command &response, unique_ptr<const string> value);
    void Error(KineticStatus error, Command const * const response);

    private:
    const shared_ptr<GetKeyRangeCallbackInterface>  callback_;
@@ -137,8 +136,8 @@ class PutCallbackInterface {
class PutHandler : public HandlerInterface {
    public:
    explicit PutHandler(const shared_ptr<PutCallbackInterface> callback);
    void Handle(const Message &response, unique_ptr<const string> value);
    void Error(KineticStatus error, Message const * const response);
    void Handle(const Command &response, unique_ptr<const string> value);
    void Error(KineticStatus error, Command const * const response);

    private:
    const shared_ptr<PutCallbackInterface> callback_;
@@ -155,8 +154,8 @@ class GetLogCallbackInterface {
class GetLogHandler : public HandlerInterface {
    public:
    explicit GetLogHandler(const shared_ptr<GetLogCallbackInterface> callback);
    void Handle(const Message& response, unique_ptr<const string> value);
    void Error(KineticStatus error, Message const * const response);
    void Handle(const Command& response, unique_ptr<const string> value);
    void Error(KineticStatus error, Command const * const response);

    private:
    const shared_ptr<GetLogCallbackInterface> callback_;
@@ -166,15 +165,15 @@ class GetLogHandler : public HandlerInterface {
class P2PPushCallbackInterface {
    public:
    virtual ~P2PPushCallbackInterface() {}
    virtual void Success(unique_ptr<vector<KineticStatus>> operation_statuses, const Message& response) = 0;
    virtual void Failure(KineticStatus error, Message const * const response) = 0;
    virtual void Success(unique_ptr<vector<KineticStatus>> operation_statuses, const Command& response) = 0;
    virtual void Failure(KineticStatus error, Command const * const response) = 0;
};

class P2PPushHandler : public HandlerInterface {
    public:
    explicit P2PPushHandler(const shared_ptr<P2PPushCallbackInterface> callback);
    void Handle(const Message& response, unique_ptr<const string> value);
    void Error(KineticStatus error, Message const * const response);
    void Handle(const Command& response, unique_ptr<const string> value);
    void Error(KineticStatus error, Command const * const response);

    private:
    const shared_ptr<P2PPushCallbackInterface> callback_;
@@ -308,11 +307,11 @@ class NonblockingKineticConnection {

    private:
    HandlerKey GenericGet(const shared_ptr<const string> key,
        const shared_ptr<GetCallbackInterface> callback, Message_MessageType message_type);
    void PopulateP2PMessage(Message_P2POperation *mutable_p2pop,
        const shared_ptr<GetCallbackInterface> callback, Command_MessageType message_type);
    void PopulateP2PMessage(Command_P2POperation *mutable_p2pop,
        const shared_ptr<const P2PPushRequest> push_request);
    unique_ptr<Message> NewMessage(Message_MessageType message_type);
    Message_Synchronization GetSynchronizationForPersistMode(PersistMode persistMode);
    unique_ptr<Command> NewCommand(Command_MessageType message_type);
    Command_Synchronization GetSynchronizationForPersistMode(PersistMode persistMode);

    NonblockingPacketServiceInterface *service_;
    const shared_ptr<const string> empty_str_;
+4 −3
Original line number Diff line number Diff line
@@ -30,6 +30,7 @@
namespace kinetic {

using com::seagate::kinetic::client::proto::Message;
using com::seagate::kinetic::client::proto::Command;

using std::shared_ptr;
using std::unique_ptr;
@@ -43,15 +44,15 @@ class HandlerInterface {
    virtual ~HandlerInterface() {}

    // response is re-used, so make sure to copy everything you need out of it
    virtual void Handle(const Message &response, unique_ptr<const string> value) = 0;
    virtual void Error(KineticStatus error, Message const * const response) = 0;
    virtual void Handle(const Command &response, unique_ptr<const string> value) = 0;
    virtual void Error(KineticStatus error, Command const * const response) = 0;
};

class NonblockingPacketServiceInterface {
    public:
    virtual ~NonblockingPacketServiceInterface() {}
    // message is modified in this call hierarchy
    virtual HandlerKey Submit(unique_ptr<Message> message, const shared_ptr<const string> value,
    virtual HandlerKey Submit(unique_ptr<Message> message, unique_ptr<Command> command, const shared_ptr<const string> value,
            unique_ptr<HandlerInterface> handler) = 0;
    virtual bool Run(fd_set *read_fds, fd_set *write_fds, int *nfds) = 0;
    virtual bool Remove(HandlerKey handler_key) = 0;
+3 −3
Original line number Diff line number Diff line
@@ -25,7 +25,7 @@

namespace kinetic {

using com::seagate::kinetic::client::proto::Message_Status_StatusCode;
using com::seagate::kinetic::client::proto::Command_Status_StatusCode;
using std::string;

enum class StatusCode {
@@ -54,8 +54,8 @@ enum class StatusCode {
    REMOTE_NESTED_OPERATION_ERRORS
};

StatusCode ConvertFromProtoStatus(Message_Status_StatusCode status);
Message_Status_StatusCode ConvertToProtoStatus(StatusCode status);
StatusCode ConvertFromProtoStatus(Command_Status_StatusCode status);
Command_Status_StatusCode ConvertToProtoStatus(StatusCode status);

} // namespace kinetic

+8 −8
Original line number Diff line number Diff line
@@ -29,24 +29,24 @@ using std::shared_ptr;
using std::make_shared;
using std::vector;

using com::seagate::kinetic::client::proto::Message_Algorithm_CRC32;
using com::seagate::kinetic::client::proto::Message_Algorithm_SHA1;
using com::seagate::kinetic::client::proto::Command_Algorithm_CRC32;
using com::seagate::kinetic::client::proto::Command_Algorithm_SHA1;

TEST_F(IntegrationTest, BlockingSmoketest) {
    ASSERT_TRUE(blocking_connection_->NoOp().ok());

    auto record1 = make_shared<KineticRecord>(make_shared<string>("value1"),
        make_shared<string>("v1"), make_shared<string>("t1"), Message_Algorithm_CRC32);
        make_shared<string>("v1"), make_shared<string>("t1"), Command_Algorithm_CRC32);
    KineticStatus kineticStatus = blocking_connection_->Put("key1", "", WriteMode::IGNORE_VERSION, *record1);
    ASSERT_TRUE(kineticStatus.ok());

    auto record2 = make_shared<KineticRecord>(make_shared<string>("value2"),
        make_shared<string>("v2"), make_shared<string>("t2"), Message_Algorithm_SHA1);
        make_shared<string>("v2"), make_shared<string>("t2"), Command_Algorithm_SHA1);
    ASSERT_TRUE(blocking_connection_->Put(make_shared<string>("key2"), make_shared<string>(""),
       WriteMode::IGNORE_VERSION, record2).ok());

    auto record3 = make_shared<KineticRecord>(make_shared<string>("value3"),
        make_shared<string>("v3"), make_shared<string>("t3"), Message_Algorithm_CRC32);
        make_shared<string>("v3"), make_shared<string>("t3"), Command_Algorithm_CRC32);
    ASSERT_TRUE(blocking_connection_->Put(make_shared<string>("key3"), make_shared<string>(""),
       WriteMode::IGNORE_VERSION, record3).ok());

@@ -63,7 +63,7 @@ TEST_F(IntegrationTest, BlockingSmoketest) {
    EXPECT_EQ("value2", *(result_record->value()));
    EXPECT_EQ("v2", *(result_record->version()));
    EXPECT_EQ("t2", *(result_record->tag()));
    EXPECT_EQ(Message_Algorithm_SHA1, result_record->algorithm());
    EXPECT_EQ(Command_Algorithm_SHA1, result_record->algorithm());

    unique_ptr<string> result_key(nullptr);
    ASSERT_TRUE(blocking_connection_->GetNext("key1", result_key, result_record).ok());
@@ -71,14 +71,14 @@ TEST_F(IntegrationTest, BlockingSmoketest) {
    EXPECT_EQ("value2", *(result_record->value()));
    EXPECT_EQ("v2", *(result_record->version()));
    EXPECT_EQ("t2", *(result_record->tag()));
    EXPECT_EQ(Message_Algorithm_SHA1, result_record->algorithm());
    EXPECT_EQ(Command_Algorithm_SHA1, result_record->algorithm());

    ASSERT_TRUE(blocking_connection_->GetPrevious("key3", result_key, result_record).ok());
    EXPECT_EQ("key2", *result_key);
    EXPECT_EQ("value2", *(result_record->value()));
    EXPECT_EQ("v2", *(result_record->version()));
    EXPECT_EQ("t2", *(result_record->tag()));
    EXPECT_EQ(Message_Algorithm_SHA1, result_record->algorithm());
    EXPECT_EQ(Command_Algorithm_SHA1, result_record->algorithm());

    unique_ptr<string> result_version(nullptr);
    ASSERT_TRUE(blocking_connection_->GetVersion("key3", result_version).ok());
Loading