Loading include/kinetic/nonblocking_kinetic_connection.h +6 −6 Original line number Diff line number Diff line Loading @@ -198,6 +198,12 @@ struct P2PPushOperation { /// Ignore version on the remote drive. The same as specifying force with a regular put. bool force; /// P2P operations to execute on the drive specified in the request. /// This lets you set up a pipeline of P2P push. For example, a client can push /// a set of keys to A, and in the same request instruct A to push /// keys to B, and so on. shared_ptr<P2PPushRequest> request; }; /// Represents a collection of P2P operations Loading @@ -210,12 +216,6 @@ struct P2PPushRequest { /// Operations to perform against the drive specified above vector<P2PPushOperation> operations; /// P2P operations to execute on the drive specified above. This lets /// you set up a pipeline of P2P push. For example, a client can push /// a set of keys to A, and in the same request instruct A to push /// keys to B, and so on. vector<P2PPushRequest> requests; }; class NonblockingKineticConnection { Loading src/main/nonblocking_kinetic_connection.cc +4 −5 Original line number Diff line number Diff line Loading @@ -578,12 +578,11 @@ void NonblockingKineticConnection::PopulateP2PMessage( op->set_newkey(it->newKey); } op->set_force(it->force); } for (auto it = push_request->requests.begin(); it != push_request->requests.end(); ++it) { auto req = mutable_p2pop->add_operation()->mutable_p2pop(); PopulateP2PMessage(req, make_shared<P2PPushRequest>(*it)); if (it->request != nullptr) { auto req = op->mutable_p2pop(); PopulateP2PMessage(req, it->request); } } } Loading src/test/nonblocking_kinetic_connection_test.cc +36 −2 Original line number Diff line number Diff line Loading @@ -493,12 +493,31 @@ TEST_F(NonblockingKineticConnectionTest, P2PPushBuildsCorrectMessage) { EXPECT_CALL(*packet_service_, Submit_(_, StringSharedPtrEq(""), _)).WillOnce( DoAll(SaveArg<0>(&message), Return(0))); auto double_nested_request = make_shared<P2PPushRequest>(); double_nested_request->host = "baz.tld"; double_nested_request->port = 1236; P2PPushOperation double_nested_op; double_nested_op.key = "double_nested_key1"; double_nested_request->operations.push_back(double_nested_op); auto nested_request = make_shared<P2PPushRequest>(); nested_request->host = "bar.tld"; nested_request->port = 1235; P2PPushOperation nested_op; nested_op.key = "nested_key1"; nested_op.request = double_nested_request; nested_request->operations.push_back(nested_op); P2PPushRequest request; request.host = "foo.tld"; request.port = 1234; P2PPushOperation op1; op1.key = "key1"; op1.request = nested_request; request.operations.push_back(op1); P2PPushOperation op2; Loading @@ -519,9 +538,24 @@ TEST_F(NonblockingKineticConnectionTest, P2PPushBuildsCorrectMessage) { EXPECT_EQ(1234, message.command().body().p2poperation().peer().port()); EXPECT_FALSE(message.command().body().p2poperation().peer().tls()); auto operation0 = message.command().body().p2poperation().operation(0); ASSERT_EQ(3, message.command().body().p2poperation().operation_size()); EXPECT_EQ("key1", message.command().body().p2poperation().operation(0).key()); EXPECT_FALSE(message.command().body().p2poperation().operation(0).has_newkey()); EXPECT_EQ("key1", operation0.key()); EXPECT_FALSE(operation0.has_newkey()); EXPECT_EQ("bar.tld", operation0.p2pop().peer().hostname()); EXPECT_EQ(1235, operation0.p2pop().peer().port()); ASSERT_EQ(1, operation0.p2pop().operation_size()); auto operation0_nested = operation0.p2pop().operation(0); EXPECT_EQ("nested_key1", operation0_nested.key()); EXPECT_EQ("baz.tld", operation0_nested.p2pop().peer().hostname()); EXPECT_EQ(1236, operation0_nested.p2pop().peer().port()); ASSERT_EQ(1, operation0_nested.p2pop().operation_size()); auto operation0_double_nested = operation0_nested.p2pop().operation(0); EXPECT_EQ("double_nested_key1", operation0_double_nested.key()); EXPECT_EQ("key2", message.command().body().p2poperation().operation(1).key()); EXPECT_FALSE(message.command().body().p2poperation().operation(1).has_newkey()); Loading Loading
include/kinetic/nonblocking_kinetic_connection.h +6 −6 Original line number Diff line number Diff line Loading @@ -198,6 +198,12 @@ struct P2PPushOperation { /// Ignore version on the remote drive. The same as specifying force with a regular put. bool force; /// P2P operations to execute on the drive specified in the request. /// This lets you set up a pipeline of P2P push. For example, a client can push /// a set of keys to A, and in the same request instruct A to push /// keys to B, and so on. shared_ptr<P2PPushRequest> request; }; /// Represents a collection of P2P operations Loading @@ -210,12 +216,6 @@ struct P2PPushRequest { /// Operations to perform against the drive specified above vector<P2PPushOperation> operations; /// P2P operations to execute on the drive specified above. This lets /// you set up a pipeline of P2P push. For example, a client can push /// a set of keys to A, and in the same request instruct A to push /// keys to B, and so on. vector<P2PPushRequest> requests; }; class NonblockingKineticConnection { Loading
src/main/nonblocking_kinetic_connection.cc +4 −5 Original line number Diff line number Diff line Loading @@ -578,12 +578,11 @@ void NonblockingKineticConnection::PopulateP2PMessage( op->set_newkey(it->newKey); } op->set_force(it->force); } for (auto it = push_request->requests.begin(); it != push_request->requests.end(); ++it) { auto req = mutable_p2pop->add_operation()->mutable_p2pop(); PopulateP2PMessage(req, make_shared<P2PPushRequest>(*it)); if (it->request != nullptr) { auto req = op->mutable_p2pop(); PopulateP2PMessage(req, it->request); } } } Loading
src/test/nonblocking_kinetic_connection_test.cc +36 −2 Original line number Diff line number Diff line Loading @@ -493,12 +493,31 @@ TEST_F(NonblockingKineticConnectionTest, P2PPushBuildsCorrectMessage) { EXPECT_CALL(*packet_service_, Submit_(_, StringSharedPtrEq(""), _)).WillOnce( DoAll(SaveArg<0>(&message), Return(0))); auto double_nested_request = make_shared<P2PPushRequest>(); double_nested_request->host = "baz.tld"; double_nested_request->port = 1236; P2PPushOperation double_nested_op; double_nested_op.key = "double_nested_key1"; double_nested_request->operations.push_back(double_nested_op); auto nested_request = make_shared<P2PPushRequest>(); nested_request->host = "bar.tld"; nested_request->port = 1235; P2PPushOperation nested_op; nested_op.key = "nested_key1"; nested_op.request = double_nested_request; nested_request->operations.push_back(nested_op); P2PPushRequest request; request.host = "foo.tld"; request.port = 1234; P2PPushOperation op1; op1.key = "key1"; op1.request = nested_request; request.operations.push_back(op1); P2PPushOperation op2; Loading @@ -519,9 +538,24 @@ TEST_F(NonblockingKineticConnectionTest, P2PPushBuildsCorrectMessage) { EXPECT_EQ(1234, message.command().body().p2poperation().peer().port()); EXPECT_FALSE(message.command().body().p2poperation().peer().tls()); auto operation0 = message.command().body().p2poperation().operation(0); ASSERT_EQ(3, message.command().body().p2poperation().operation_size()); EXPECT_EQ("key1", message.command().body().p2poperation().operation(0).key()); EXPECT_FALSE(message.command().body().p2poperation().operation(0).has_newkey()); EXPECT_EQ("key1", operation0.key()); EXPECT_FALSE(operation0.has_newkey()); EXPECT_EQ("bar.tld", operation0.p2pop().peer().hostname()); EXPECT_EQ(1235, operation0.p2pop().peer().port()); ASSERT_EQ(1, operation0.p2pop().operation_size()); auto operation0_nested = operation0.p2pop().operation(0); EXPECT_EQ("nested_key1", operation0_nested.key()); EXPECT_EQ("baz.tld", operation0_nested.p2pop().peer().hostname()); EXPECT_EQ(1236, operation0_nested.p2pop().peer().port()); ASSERT_EQ(1, operation0_nested.p2pop().operation_size()); auto operation0_double_nested = operation0_nested.p2pop().operation(0); EXPECT_EQ("double_nested_key1", operation0_double_nested.key()); EXPECT_EQ("key2", message.command().body().p2poperation().operation(1).key()); EXPECT_FALSE(message.command().body().p2poperation().operation(1).has_newkey()); Loading