Loading include/kinetic/nonblocking_kinetic_connection.h +7 −7 Original line number Diff line number Diff line Loading @@ -65,7 +65,7 @@ class SimpleHandler : public HandlerInterface { public: explicit SimpleHandler(const shared_ptr<SimpleCallbackInterface> callback); void Handle(const Message &response, unique_ptr<const string> value); void Error(KineticStatus error, Message const * const response = nullptr); void Error(KineticStatus error, Message const * const response); private: const shared_ptr<SimpleCallbackInterface> callback_; Loading @@ -83,7 +83,7 @@ class GetHandler : public HandlerInterface { public: explicit GetHandler(const shared_ptr<GetCallbackInterface> callback); void Handle(const Message &response, unique_ptr<const string> value); void Error(KineticStatus error, Message const * const response = nullptr); void Error(KineticStatus error, Message const * const response); private: const shared_ptr<GetCallbackInterface> callback_; Loading @@ -101,7 +101,7 @@ class GetVersionHandler : public HandlerInterface { public: explicit GetVersionHandler(const shared_ptr<GetVersionCallbackInterface> callback); void Handle(const Message &response, unique_ptr<const string> value); void Error(KineticStatus error, Message const * const response = nullptr); void Error(KineticStatus error, Message const * const response); private: const shared_ptr<GetVersionCallbackInterface> callback_; Loading @@ -119,7 +119,7 @@ class GetKeyRangeHandler : public HandlerInterface { public: explicit GetKeyRangeHandler(const shared_ptr<GetKeyRangeCallbackInterface> callback); void Handle(const Message &response, unique_ptr<const string> value); void Error(KineticStatus error, Message const * const response = nullptr); void Error(KineticStatus error, Message const * const response); private: const shared_ptr<GetKeyRangeCallbackInterface> callback_; Loading @@ -138,7 +138,7 @@ class PutHandler : public HandlerInterface { public: explicit PutHandler(const shared_ptr<PutCallbackInterface> callback); void Handle(const Message &response, unique_ptr<const string> value); void Error(KineticStatus error, Message const * const response = nullptr); void Error(KineticStatus error, Message const * const response); private: const shared_ptr<PutCallbackInterface> callback_; Loading @@ -156,7 +156,7 @@ class GetLogHandler : public HandlerInterface { public: explicit GetLogHandler(const shared_ptr<GetLogCallbackInterface> callback); void Handle(const Message& response, unique_ptr<const string> value); void Error(KineticStatus error, Message const * const response = nullptr); void Error(KineticStatus error, Message const * const response); private: const shared_ptr<GetLogCallbackInterface> callback_; Loading @@ -174,7 +174,7 @@ class P2PPushHandler : public HandlerInterface { public: explicit P2PPushHandler(const shared_ptr<P2PPushCallbackInterface> callback); void Handle(const Message& response, unique_ptr<const string> value); void Error(KineticStatus error, Message const * const response = nullptr); void Error(KineticStatus error, Message const * const response); private: const shared_ptr<P2PPushCallbackInterface> callback_; Loading include/kinetic/nonblocking_packet_service_interface.h +1 −1 Original line number Diff line number Diff line Loading @@ -44,7 +44,7 @@ class HandlerInterface { // response is re-used, so make sure to copy everything you need out of it virtual void Handle(const Message &response, unique_ptr<const string> value) = 0; virtual void Error(KineticStatus error, Message const * const response = nullptr) = 0; virtual void Error(KineticStatus error, Message const * const response) = 0; }; class NonblockingPacketServiceInterface { Loading src/main/nonblocking_packet_receiver.cc +2 −2 Original line number Diff line number Diff line Loading @@ -184,7 +184,7 @@ int64_t NonblockingReceiver::connection_id() { void NonblockingReceiver::CallAllErrorHandlers(KineticStatus error) { if (handler_) { handler_->Error(error); handler_->Error(error, nullptr); handler_.reset(); } Loading @@ -197,7 +197,7 @@ void NonblockingReceiver::CallAllErrorHandlers(KineticStatus error) { CHECK_EQ((size_t) 1, handler_to_message_seq_map_.erase(handler_key)) << "Couldn't delete handler to sequence entry for handler_key " << handler_key; handler->Error(error); handler->Error(error, nullptr); handler.reset(); iter++; } Loading src/main/nonblocking_packet_sender.cc +6 −4 Original line number Diff line number Diff line Loading @@ -61,7 +61,8 @@ NonblockingSender::~NonblockingSender() { unique_ptr<Request> request = move(request_queue_.front()); request_queue_.pop_front(); request->handler->Error( KineticStatus(StatusCode::CLIENT_SHUTDOWN, "Sender shutdown")); KineticStatus(StatusCode::CLIENT_SHUTDOWN, "Sender shutdown"), nullptr); } } Loading Loading @@ -90,14 +91,15 @@ NonblockingPacketServiceStatus NonblockingSender::Send() { CHECK_EQ(kFailed, status); handler_->Error(KineticStatus(StatusCode::CLIENT_IO_ERROR, "I/O write error")); handler_->Error( KineticStatus(StatusCode::CLIENT_IO_ERROR, "I/O write error"), nullptr); handler_.reset(); while (!request_queue_.empty()) { unique_ptr<Request> request = move(request_queue_.front()); request_queue_.pop_front(); request->handler->Error(KineticStatus(StatusCode::CLIENT_IO_ERROR, "I/O write error")); "I/O write error"), nullptr); } return kError; } Loading @@ -109,7 +111,7 @@ NonblockingPacketServiceStatus NonblockingSender::Send() { LOG(WARNING) << "Could not enqueue handler; already had a handler for sequence " << message_sequence_ << " and handler key " << handler_key_; handler_->Error(KineticStatus(StatusCode::CLIENT_INTERNAL_ERROR, "Could not enqueue handler")); "Could not enqueue handler"), nullptr); } handler_.reset(); } Loading src/main/nonblocking_packet_service.cc +2 −1 Original line number Diff line number Diff line Loading @@ -44,7 +44,8 @@ HandlerKey NonblockingPacketService::Submit(unique_ptr<Message> message, HandlerKey key = next_key_++; if (failed_) { handler->Error(KineticStatus(StatusCode::CLIENT_SHUTDOWN, "Client already shut down")); handler->Error( KineticStatus(StatusCode::CLIENT_SHUTDOWN, "Client already shut down"), nullptr); } else { sender_->Enqueue(move(message), value, move(handler), key); } Loading Loading
include/kinetic/nonblocking_kinetic_connection.h +7 −7 Original line number Diff line number Diff line Loading @@ -65,7 +65,7 @@ class SimpleHandler : public HandlerInterface { public: explicit SimpleHandler(const shared_ptr<SimpleCallbackInterface> callback); void Handle(const Message &response, unique_ptr<const string> value); void Error(KineticStatus error, Message const * const response = nullptr); void Error(KineticStatus error, Message const * const response); private: const shared_ptr<SimpleCallbackInterface> callback_; Loading @@ -83,7 +83,7 @@ class GetHandler : public HandlerInterface { public: explicit GetHandler(const shared_ptr<GetCallbackInterface> callback); void Handle(const Message &response, unique_ptr<const string> value); void Error(KineticStatus error, Message const * const response = nullptr); void Error(KineticStatus error, Message const * const response); private: const shared_ptr<GetCallbackInterface> callback_; Loading @@ -101,7 +101,7 @@ class GetVersionHandler : public HandlerInterface { public: explicit GetVersionHandler(const shared_ptr<GetVersionCallbackInterface> callback); void Handle(const Message &response, unique_ptr<const string> value); void Error(KineticStatus error, Message const * const response = nullptr); void Error(KineticStatus error, Message const * const response); private: const shared_ptr<GetVersionCallbackInterface> callback_; Loading @@ -119,7 +119,7 @@ class GetKeyRangeHandler : public HandlerInterface { public: explicit GetKeyRangeHandler(const shared_ptr<GetKeyRangeCallbackInterface> callback); void Handle(const Message &response, unique_ptr<const string> value); void Error(KineticStatus error, Message const * const response = nullptr); void Error(KineticStatus error, Message const * const response); private: const shared_ptr<GetKeyRangeCallbackInterface> callback_; Loading @@ -138,7 +138,7 @@ class PutHandler : public HandlerInterface { public: explicit PutHandler(const shared_ptr<PutCallbackInterface> callback); void Handle(const Message &response, unique_ptr<const string> value); void Error(KineticStatus error, Message const * const response = nullptr); void Error(KineticStatus error, Message const * const response); private: const shared_ptr<PutCallbackInterface> callback_; Loading @@ -156,7 +156,7 @@ class GetLogHandler : public HandlerInterface { public: explicit GetLogHandler(const shared_ptr<GetLogCallbackInterface> callback); void Handle(const Message& response, unique_ptr<const string> value); void Error(KineticStatus error, Message const * const response = nullptr); void Error(KineticStatus error, Message const * const response); private: const shared_ptr<GetLogCallbackInterface> callback_; Loading @@ -174,7 +174,7 @@ class P2PPushHandler : public HandlerInterface { public: explicit P2PPushHandler(const shared_ptr<P2PPushCallbackInterface> callback); void Handle(const Message& response, unique_ptr<const string> value); void Error(KineticStatus error, Message const * const response = nullptr); void Error(KineticStatus error, Message const * const response); private: const shared_ptr<P2PPushCallbackInterface> callback_; Loading
include/kinetic/nonblocking_packet_service_interface.h +1 −1 Original line number Diff line number Diff line Loading @@ -44,7 +44,7 @@ class HandlerInterface { // response is re-used, so make sure to copy everything you need out of it virtual void Handle(const Message &response, unique_ptr<const string> value) = 0; virtual void Error(KineticStatus error, Message const * const response = nullptr) = 0; virtual void Error(KineticStatus error, Message const * const response) = 0; }; class NonblockingPacketServiceInterface { Loading
src/main/nonblocking_packet_receiver.cc +2 −2 Original line number Diff line number Diff line Loading @@ -184,7 +184,7 @@ int64_t NonblockingReceiver::connection_id() { void NonblockingReceiver::CallAllErrorHandlers(KineticStatus error) { if (handler_) { handler_->Error(error); handler_->Error(error, nullptr); handler_.reset(); } Loading @@ -197,7 +197,7 @@ void NonblockingReceiver::CallAllErrorHandlers(KineticStatus error) { CHECK_EQ((size_t) 1, handler_to_message_seq_map_.erase(handler_key)) << "Couldn't delete handler to sequence entry for handler_key " << handler_key; handler->Error(error); handler->Error(error, nullptr); handler.reset(); iter++; } Loading
src/main/nonblocking_packet_sender.cc +6 −4 Original line number Diff line number Diff line Loading @@ -61,7 +61,8 @@ NonblockingSender::~NonblockingSender() { unique_ptr<Request> request = move(request_queue_.front()); request_queue_.pop_front(); request->handler->Error( KineticStatus(StatusCode::CLIENT_SHUTDOWN, "Sender shutdown")); KineticStatus(StatusCode::CLIENT_SHUTDOWN, "Sender shutdown"), nullptr); } } Loading Loading @@ -90,14 +91,15 @@ NonblockingPacketServiceStatus NonblockingSender::Send() { CHECK_EQ(kFailed, status); handler_->Error(KineticStatus(StatusCode::CLIENT_IO_ERROR, "I/O write error")); handler_->Error( KineticStatus(StatusCode::CLIENT_IO_ERROR, "I/O write error"), nullptr); handler_.reset(); while (!request_queue_.empty()) { unique_ptr<Request> request = move(request_queue_.front()); request_queue_.pop_front(); request->handler->Error(KineticStatus(StatusCode::CLIENT_IO_ERROR, "I/O write error")); "I/O write error"), nullptr); } return kError; } Loading @@ -109,7 +111,7 @@ NonblockingPacketServiceStatus NonblockingSender::Send() { LOG(WARNING) << "Could not enqueue handler; already had a handler for sequence " << message_sequence_ << " and handler key " << handler_key_; handler_->Error(KineticStatus(StatusCode::CLIENT_INTERNAL_ERROR, "Could not enqueue handler")); "Could not enqueue handler"), nullptr); } handler_.reset(); } Loading
src/main/nonblocking_packet_service.cc +2 −1 Original line number Diff line number Diff line Loading @@ -44,7 +44,8 @@ HandlerKey NonblockingPacketService::Submit(unique_ptr<Message> message, HandlerKey key = next_key_++; if (failed_) { handler->Error(KineticStatus(StatusCode::CLIENT_SHUTDOWN, "Client already shut down")); handler->Error( KineticStatus(StatusCode::CLIENT_SHUTDOWN, "Client already shut down"), nullptr); } else { sender_->Enqueue(move(message), value, move(handler), key); } Loading