Commit 4dd1549d authored by Manuel Wudka-Robles's avatar Manuel Wudka-Robles
Browse files

copydrive example allows copying to multiple drives via recursive P2P

parent 7d1a17a7
Loading
Loading
Loading
Loading
+38 −16
Original line number Diff line number Diff line
@@ -11,8 +11,26 @@ using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
using std::pair;
using std::make_pair;

kinetic::P2PPushRequest prepare_request(const vector<kinetic::P2PPushOperation>& operations, const vector<pair<string, int>>& destinations, size_t currentDestination) {
    kinetic::P2PPushRequest request;
    if (currentDestination < destinations.size() - 1) {
        request.requests.push_back(prepare_request(operations, destinations, currentDestination + 1));
    }

    request.host = destinations[currentDestination].first;
    request.port = destinations[currentDestination].second;

    request.operations = operations;

    return request;
}

void dispatch_request(kinetic::BlockingKineticConnection &connection, const vector<kinetic::P2PPushOperation>& operations, const vector<pair<string, int>>& destinations) {
    kinetic::P2PPushRequest request = prepare_request(operations, destinations, 0);

void dispatch_request(kinetic::BlockingKineticConnection& connection, const kinetic::P2PPushRequest& request) {
    unique_ptr<vector<kinetic::KineticStatus>> statuses(new vector<kinetic::KineticStatus>());
    if (!connection.P2PPush(request, statuses).ok()) {
        printf("Error pushing\n");
@@ -23,24 +41,31 @@ void dispatch_request(kinetic::BlockingKineticConnection& connection, const kine
        if (it->ok()) {
            printf(".");
        } else {
            printf("X");
            printf("%s", it->message().c_str());
        }
    }
    fflush(stdout);
}

int main(int argc, char* argv[]) {
    if (argc != 5) {
        printf("%s: <source host> <source port> <destination host> <destination port>\n", argv[0]);
    if (argc < 5 || argc % 2 != 1) {
        printf("%s: <source host> <source port> <destination host> <destination port> [<additional host> <additional port> ... ]\n", argv[0]);
        return 1;
    }

    const char* source_host = argv[1];
    int source_port = atoi(argv[2]);
    const char* dest_host = argv[3];
    int dest_port = atoi(argv[4]);

    printf("Copying from %s:%d -> %s:%d\n", source_host, source_port, dest_host, dest_port);
    vector<pair<string, int>> destinations;

    printf("Copying from %s:%d", source_host, source_port);

    for (int i = 3; i < argc; i += 2) {
        auto destination = make_pair(argv[i], atoi(argv[i + 1]));
        destinations.push_back(destination);
        printf(" -> %s:%d", destination.first, destination.second);
    }
    printf("\n");


    kinetic::ConnectionOptions options;
@@ -57,10 +82,7 @@ int main(int argc, char* argv[]) {
        return 1;
    }

    kinetic::P2PPushRequest request;
    request.host = dest_host;
    request.port = dest_port;

    vector<kinetic::P2PPushOperation> operations;

    // Build a key consisting of "FFFFFF...". In almost all cases this will come after the last
    // key in the drive
@@ -75,15 +97,15 @@ int main(int argc, char* argv[]) {
        op.key = *it;
        op.force = true;
        op.newKey = *it;
        request.operations.push_back(op);
        operations.push_back(op);

        if (request.operations.size() > kP2PBatchSize) {
            dispatch_request(connection->blocking(), request);
            request.operations.clear();
        if (operations.size() > kP2PBatchSize) {
            dispatch_request(connection->blocking(), operations, destinations);
            operations.clear();
        }
    }

    dispatch_request(connection->blocking(), request);
    dispatch_request(connection->blocking(), operations, destinations);

    printf("\n");