Loading src/main/kinetic_connection_factory.cc +6 −2 Original line number Diff line number Diff line Loading @@ -121,8 +121,12 @@ Status KineticConnectionFactory::doNewConnection( return Status::makeInternalError("Connection error"); } auto receiver = shared_ptr<NonblockingReceiverInterface>(new NonblockingReceiver(socket_wrapper, hmac_provider_, options)); shared_ptr<NonblockingReceiverInterface> receiver; try{ receiver = shared_ptr<NonblockingReceiverInterface>(new NonblockingReceiver(socket_wrapper, hmac_provider_, options)); }catch(std::exception& e){ return Status::makeInternalError("Connection error:"+std::string(e.what())); } auto writer_factory = unique_ptr<NonblockingPacketWriterFactoryInterface>(new NonblockingPacketWriterFactory()); auto sender = unique_ptr<NonblockingSenderInterface>(new NonblockingSender(socket_wrapper, Loading src/main/nonblocking_packet_receiver.cc +40 −6 Original line number Diff line number Diff line Loading @@ -22,6 +22,7 @@ namespace kinetic { using com::seagate::kinetic::client::proto::Message_AuthType_UNSOLICITEDSTATUS; using com::seagate::kinetic::client::proto::Command_Status_StatusCode_SUCCESS; using std::string; using std::shared_ptr; Loading @@ -30,6 +31,7 @@ using std::move; using std::make_pair; KineticStatus GetKineticStatus(StatusCode code, int64_t expected_cluster_version) { switch (code) { case StatusCode::CLIENT_IO_ERROR: Loading Loading @@ -75,12 +77,43 @@ KineticStatus GetKineticStatus(StatusCode code, int64_t expected_cluster_version } } class HandshakeHandler : public HandlerInterface{ public: bool done; bool success; HandshakeHandler():done(false),success(false){} void Handle(const Command &response, unique_ptr<const string> value){ done = success = true; } void Error(KineticStatus error, Command const * const response){ done = true; } }; NonblockingReceiver::NonblockingReceiver(shared_ptr<SocketWrapperInterface> socket_wrapper, HmacProvider hmac_provider, const ConnectionOptions &connection_options) : socket_wrapper_(socket_wrapper), hmac_provider_(hmac_provider), connection_options_(connection_options), nonblocking_response_(NULL), connection_id_(time(NULL)), handler_(NULL) { CHECK_NE(-1, connection_id_); connection_id_(0), handler_(NULL) { shared_ptr<HandshakeHandler> hh = std::make_shared<HandshakeHandler>(); map_.insert(make_pair(-1,make_pair(hh,-1))); handler_to_message_seq_map_.insert(make_pair(-1, -1)); auto start = std::chrono::steady_clock::now(); while(true){ if(Receive() == kError) break; if(hh->done) break; auto now = std::chrono::steady_clock::now(); if(std::chrono::duration_cast<std::chrono::seconds>(now-start).count() > 30) break; } if(!hh->success) throw std::runtime_error("Could not complete handshake."); } NonblockingReceiver::~NonblockingReceiver() { Loading Loading @@ -138,22 +171,24 @@ NonblockingPacketServiceStatus NonblockingReceiver::Receive() { delete nonblocking_response_; nonblocking_response_ = NULL; if(message_.has_hmacauth()) if (!hmac_provider_.ValidateHmac(message_, connection_options_.hmac_key)) { LOG(INFO) << "Response HMAC mismatch"; CallAllErrorHandlers(KineticStatus(StatusCode::CLIENT_RESPONSE_HMAC_VERIFICATION_ERROR, "Response HMAC mismatch")); return kIdle; } if(!command_.ParseFromString(message_.commandbytes())){ CallAllErrorHandlers(KineticStatus(StatusCode::CLIENT_IO_ERROR, "I/O read error parsing proto::Command")); return kError; } if (command_.header().has_connectionid()) { connection_id_ = command_.header().connectionid(); } if(message_.authtype() == Message_AuthType_UNSOLICITEDSTATUS) command_.mutable_header()->set_acksequence(-1); if (!command_.header().has_acksequence()) { LOG(INFO) << "Got response without an acksequence"; CallAllErrorHandlers(KineticStatus(StatusCode::PROTOCOL_ERROR_RESPONSE_NO_ACKSEQUENCE, Loading @@ -165,7 +200,6 @@ NonblockingPacketServiceStatus NonblockingReceiver::Receive() { if (find_result == map_.end()) { LOG(WARNING) << "Couldn't find a handler for acksequence " << command_.header().acksequence(); continue; } auto handler_pair = find_result->second; Loading Loading
src/main/kinetic_connection_factory.cc +6 −2 Original line number Diff line number Diff line Loading @@ -121,8 +121,12 @@ Status KineticConnectionFactory::doNewConnection( return Status::makeInternalError("Connection error"); } auto receiver = shared_ptr<NonblockingReceiverInterface>(new NonblockingReceiver(socket_wrapper, hmac_provider_, options)); shared_ptr<NonblockingReceiverInterface> receiver; try{ receiver = shared_ptr<NonblockingReceiverInterface>(new NonblockingReceiver(socket_wrapper, hmac_provider_, options)); }catch(std::exception& e){ return Status::makeInternalError("Connection error:"+std::string(e.what())); } auto writer_factory = unique_ptr<NonblockingPacketWriterFactoryInterface>(new NonblockingPacketWriterFactory()); auto sender = unique_ptr<NonblockingSenderInterface>(new NonblockingSender(socket_wrapper, Loading
src/main/nonblocking_packet_receiver.cc +40 −6 Original line number Diff line number Diff line Loading @@ -22,6 +22,7 @@ namespace kinetic { using com::seagate::kinetic::client::proto::Message_AuthType_UNSOLICITEDSTATUS; using com::seagate::kinetic::client::proto::Command_Status_StatusCode_SUCCESS; using std::string; using std::shared_ptr; Loading @@ -30,6 +31,7 @@ using std::move; using std::make_pair; KineticStatus GetKineticStatus(StatusCode code, int64_t expected_cluster_version) { switch (code) { case StatusCode::CLIENT_IO_ERROR: Loading Loading @@ -75,12 +77,43 @@ KineticStatus GetKineticStatus(StatusCode code, int64_t expected_cluster_version } } class HandshakeHandler : public HandlerInterface{ public: bool done; bool success; HandshakeHandler():done(false),success(false){} void Handle(const Command &response, unique_ptr<const string> value){ done = success = true; } void Error(KineticStatus error, Command const * const response){ done = true; } }; NonblockingReceiver::NonblockingReceiver(shared_ptr<SocketWrapperInterface> socket_wrapper, HmacProvider hmac_provider, const ConnectionOptions &connection_options) : socket_wrapper_(socket_wrapper), hmac_provider_(hmac_provider), connection_options_(connection_options), nonblocking_response_(NULL), connection_id_(time(NULL)), handler_(NULL) { CHECK_NE(-1, connection_id_); connection_id_(0), handler_(NULL) { shared_ptr<HandshakeHandler> hh = std::make_shared<HandshakeHandler>(); map_.insert(make_pair(-1,make_pair(hh,-1))); handler_to_message_seq_map_.insert(make_pair(-1, -1)); auto start = std::chrono::steady_clock::now(); while(true){ if(Receive() == kError) break; if(hh->done) break; auto now = std::chrono::steady_clock::now(); if(std::chrono::duration_cast<std::chrono::seconds>(now-start).count() > 30) break; } if(!hh->success) throw std::runtime_error("Could not complete handshake."); } NonblockingReceiver::~NonblockingReceiver() { Loading Loading @@ -138,22 +171,24 @@ NonblockingPacketServiceStatus NonblockingReceiver::Receive() { delete nonblocking_response_; nonblocking_response_ = NULL; if(message_.has_hmacauth()) if (!hmac_provider_.ValidateHmac(message_, connection_options_.hmac_key)) { LOG(INFO) << "Response HMAC mismatch"; CallAllErrorHandlers(KineticStatus(StatusCode::CLIENT_RESPONSE_HMAC_VERIFICATION_ERROR, "Response HMAC mismatch")); return kIdle; } if(!command_.ParseFromString(message_.commandbytes())){ CallAllErrorHandlers(KineticStatus(StatusCode::CLIENT_IO_ERROR, "I/O read error parsing proto::Command")); return kError; } if (command_.header().has_connectionid()) { connection_id_ = command_.header().connectionid(); } if(message_.authtype() == Message_AuthType_UNSOLICITEDSTATUS) command_.mutable_header()->set_acksequence(-1); if (!command_.header().has_acksequence()) { LOG(INFO) << "Got response without an acksequence"; CallAllErrorHandlers(KineticStatus(StatusCode::PROTOCOL_ERROR_RESPONSE_NO_ACKSEQUENCE, Loading @@ -165,7 +200,6 @@ NonblockingPacketServiceStatus NonblockingReceiver::Receive() { if (find_result == map_.end()) { LOG(WARNING) << "Couldn't find a handler for acksequence " << command_.header().acksequence(); continue; } auto handler_pair = find_result->second; Loading