Commit da61e031 authored by Andrew Mitchell's avatar Andrew Mitchell
Browse files

Merge branch 'master' into features/protocol-2-0-4

parents 7a623077 47575b19
Loading
Loading
Loading
Loading
+15 −15
Original line number Diff line number Diff line
@@ -65,7 +65,7 @@ class SimpleHandler : public HandlerInterface {
    public:
    explicit SimpleHandler(const shared_ptr<SimpleCallbackInterface> callback);
    void Handle(const Message &response, unique_ptr<const string> value);
    void Error(KineticStatus error);
    void Error(KineticStatus error, Message const * const response);

    private:
    const shared_ptr<SimpleCallbackInterface> callback_;
@@ -83,7 +83,7 @@ class GetHandler : public HandlerInterface {
    public:
    explicit GetHandler(const shared_ptr<GetCallbackInterface> callback);
    void Handle(const Message &response, unique_ptr<const string> value);
    void Error(KineticStatus error);
    void Error(KineticStatus error, Message const * const response);

    private:
    const shared_ptr<GetCallbackInterface> callback_;
@@ -101,7 +101,7 @@ class GetVersionHandler : public HandlerInterface {
    public:
    explicit GetVersionHandler(const shared_ptr<GetVersionCallbackInterface> callback);
    void Handle(const Message &response, unique_ptr<const string> value);
    void Error(KineticStatus error);
    void Error(KineticStatus error, Message const * const response);

    private:
    const shared_ptr<GetVersionCallbackInterface> callback_;
@@ -119,7 +119,7 @@ class GetKeyRangeHandler : public HandlerInterface {
    public:
    explicit GetKeyRangeHandler(const shared_ptr<GetKeyRangeCallbackInterface> callback);
    void Handle(const Message &response, unique_ptr<const string> value);
    void Error(KineticStatus error);
    void Error(KineticStatus error, Message const * const response);

    private:
    const shared_ptr<GetKeyRangeCallbackInterface>  callback_;
@@ -138,7 +138,7 @@ class PutHandler : public HandlerInterface {
    public:
    explicit PutHandler(const shared_ptr<PutCallbackInterface> callback);
    void Handle(const Message &response, unique_ptr<const string> value);
    void Error(KineticStatus error);
    void Error(KineticStatus error, Message const * const response);

    private:
    const shared_ptr<PutCallbackInterface> callback_;
@@ -156,7 +156,7 @@ class GetLogHandler : public HandlerInterface {
    public:
    explicit GetLogHandler(const shared_ptr<GetLogCallbackInterface> callback);
    void Handle(const Message& response, unique_ptr<const string> value);
    void Error(KineticStatus error);
    void Error(KineticStatus error, Message const * const response);

    private:
    const shared_ptr<GetLogCallbackInterface> callback_;
@@ -166,15 +166,15 @@ class GetLogHandler : public HandlerInterface {
class P2PPushCallbackInterface {
    public:
    virtual ~P2PPushCallbackInterface() {}
    virtual void Success(unique_ptr<vector<KineticStatus>> operation_statuses) = 0;
    virtual void Failure(KineticStatus error) = 0;
    virtual void Success(unique_ptr<vector<KineticStatus>> operation_statuses, const Message& response) = 0;
    virtual void Failure(KineticStatus error, Message const * const response) = 0;
};

class P2PPushHandler : public HandlerInterface {
    public:
    explicit P2PPushHandler(const shared_ptr<P2PPushCallbackInterface> callback);
    void Handle(const Message& response, unique_ptr<const string> value);
    void Error(KineticStatus error);
    void Error(KineticStatus error, Message const * const response);

    private:
    const shared_ptr<P2PPushCallbackInterface> callback_;
@@ -198,6 +198,12 @@ struct P2PPushOperation {

    /// Ignore version on the remote drive. The same as specifying force with a regular put.
    bool force;

    /// P2P operations to execute on the drive specified in the request.
    /// This lets you set up a pipeline of P2P push. For example, a client can push
    /// a set of keys to A, and in the same request instruct A to push
    /// keys to B, and so on.
    shared_ptr<P2PPushRequest> request;
};

/// Represents a collection of P2P operations
@@ -210,12 +216,6 @@ struct P2PPushRequest {

    /// Operations to perform against the drive specified above
    vector<P2PPushOperation> operations;

    /// P2P operations to execute on the drive specified above. This lets
    /// you set up a pipeline of P2P push. For example, a client can push
    /// a set of keys to A, and in the same request instruct A to push
    /// keys to B, and so on.
    vector<P2PPushRequest> requests;
};

class NonblockingKineticConnection {
+1 −1
Original line number Diff line number Diff line
@@ -44,7 +44,7 @@ class HandlerInterface {

    // response is re-used, so make sure to copy everything you need out of it
    virtual void Handle(const Message &response, unique_ptr<const string> value) = 0;
    virtual void Error(KineticStatus error) = 0;
    virtual void Error(KineticStatus error, Message const * const response) = 0;
};

class NonblockingPacketServiceInterface {
+2 −2
Original line number Diff line number Diff line
@@ -413,11 +413,11 @@ class BlockingP2PPushCallback : public P2PPushCallbackInterface, public Blocking
    public:
    explicit BlockingP2PPushCallback(unique_ptr<vector<KineticStatus>>& statuses)
    : statuses_(statuses) {}
    virtual void Success(unique_ptr<vector<KineticStatus>> statuses) {
    virtual void Success(unique_ptr<vector<KineticStatus>> statuses, const Message& response) {
        OnSuccess();
        statuses_ = std::move(statuses);
    }
    virtual void Failure(KineticStatus error) {
    virtual void Failure(KineticStatus error, Message const * const response) {
        OnError(error);
    }

+13 −14
Original line number Diff line number Diff line
@@ -76,7 +76,7 @@ void GetHandler::Handle(const Message &response, unique_ptr<const string> value)
    callback_->Success(response.command().body().keyvalue().key(), move(record));
}

void GetHandler::Error(KineticStatus error) {
void GetHandler::Error(KineticStatus error, Message const * const response) {
    callback_->Failure(error);
}

@@ -87,7 +87,7 @@ void GetVersionHandler::Handle(const Message &response, unique_ptr<const string>
    callback_->Success(response.command().body().keyvalue().dbversion());
}

void GetVersionHandler::Error(KineticStatus error) {
void GetVersionHandler::Error(KineticStatus error, Message const * const response) {
    callback_->Failure(error);
}

@@ -109,7 +109,7 @@ void GetKeyRangeHandler::Handle(const Message &response, unique_ptr<const string
    callback_->Success(move(keys));
}

void GetKeyRangeHandler::Error(KineticStatus error) {
void GetKeyRangeHandler::Error(KineticStatus error, Message const * const response) {
    callback_->Failure(error);
}

@@ -120,7 +120,7 @@ void PutHandler::Handle(const Message &response, unique_ptr<const string> value)
    callback_->Success();
}

void PutHandler::Error(KineticStatus error) {
void PutHandler::Error(KineticStatus error, Message const * const response) {
    callback_->Failure(error);
}

@@ -131,7 +131,7 @@ void SimpleHandler::Handle(const Message &response, unique_ptr<const string> val
    callback_->Success();
}

void SimpleHandler::Error(KineticStatus error) {
void SimpleHandler::Error(KineticStatus error, Message const * const response) {
    callback_->Failure(error);
}

@@ -197,7 +197,7 @@ void GetLogHandler::Handle(const Message& response, unique_ptr<const string> val
    callback_->Success(move(drive_log));
}

void GetLogHandler::Error(KineticStatus error) {
void GetLogHandler::Error(KineticStatus error, Message const * const response) {
    callback_->Failure(error);
}

@@ -217,11 +217,11 @@ void P2PPushHandler::Handle(const Message& response, unique_ptr<const string> va
                KineticStatus(ConvertFromProtoStatus(status.code()), status.statusmessage()));
    }

    callback_->Success(move(statuses));
    callback_->Success(move(statuses), response);
}

void P2PPushHandler::Error(KineticStatus error) {
    callback_->Failure(error);
void P2PPushHandler::Error(KineticStatus error, Message const * const response) {
    callback_->Failure(error, response);
}

NonblockingKineticConnection::NonblockingKineticConnection(
@@ -578,12 +578,11 @@ void NonblockingKineticConnection::PopulateP2PMessage(
            op->set_newkey(it->newKey);
        }
        op->set_force(it->force);
    }

    for (auto it = push_request->requests.begin(); it != push_request->requests.end(); ++it) {
        auto req = mutable_p2pop->add_operation()->mutable_p2pop();

        PopulateP2PMessage(req, make_shared<P2PPushRequest>(*it));
        if (it->request != nullptr) {
            auto req = op->mutable_p2pop();
            PopulateP2PMessage(req, it->request);
        }
    }
}

+4 −3
Original line number Diff line number Diff line
@@ -170,7 +170,8 @@ NonblockingPacketServiceStatus NonblockingReceiver::Receive() {
            handler_->Handle(response_, move(value_));
        } else {
            handler_->Error(GetKineticStatus(ConvertFromProtoStatus(
                    response_.command().status().code()), response_.command().header().clusterversion()));
                    response_.command().status().code()), response_.command().header().clusterversion()),
                    &response_);
        }

        handler_.reset();
@@ -183,7 +184,7 @@ int64_t NonblockingReceiver::connection_id() {

void NonblockingReceiver::CallAllErrorHandlers(KineticStatus error) {
    if (handler_) {
        handler_->Error(error);
        handler_->Error(error, nullptr);
        handler_.reset();
    }

@@ -196,7 +197,7 @@ void NonblockingReceiver::CallAllErrorHandlers(KineticStatus error) {
        CHECK_EQ((size_t) 1, handler_to_message_seq_map_.erase(handler_key))
                << "Couldn't delete handler to sequence entry for handler_key " << handler_key;

        handler->Error(error);
        handler->Error(error, nullptr);
        handler.reset();
        iter++;
    }
Loading