Commit cbd0bacd authored by Andrew Mitchell's avatar Andrew Mitchell
Browse files

Added threadsafe blocking client as a course way to prevent threading issue

parent 973678b5
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -133,6 +133,7 @@ add_library(kinetic_client
    src/main/nonblocking_string.cc
    src/main/socket_wrapper.cc
    src/main/blocking_kinetic_connection.cc
    src/main/threadsafe_blocking_kinetic_connection.cc
    src/main/connection_handle.cc
    src/main/key_range_iterator.cc
)
+122 −0
Original line number Diff line number Diff line
/*
 * kinetic-cpp-client
 * Copyright (C) 2014 Seagate Technology.
 * 
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 *
 */

#ifndef KINETIC_CPP_CLIENT_THREADSAFE_BLOCKING_KINETIC_CONNECTION_H_
#define KINETIC_CPP_CLIENT_THREADSAFE_BLOCKING_KINETIC_CONNECTION_H_

#include <memory>
#include <mutex>

#include "kinetic/blocking_kinetic_connection.h"
#include "kinetic/threadsafe_nonblocking_connection.h"
#include "kinetic/status.h"
#include "kinetic/kinetic_connection.h"
#include "kinetic/key_range_iterator.h"
#include "protobufutil/common.h"


namespace kinetic {

using std::shared_ptr;
using std::unique_ptr;
using std::string;
using std::vector;

class KeyRangeIterator;
class BlockingCallbackState;

class ThreadsafeBlockingKineticConnection : public BlockingKineticConnection {
    public:
    /// Takes ownership of the given NonblockingKineticConnection
    /// @param[in] nonblocking_connection   The underlying connection that will be used
    /// @param[in] network_timeout_seconds  If an operation goes more than network_timeout_seconds
    ///                                     seconds without receiving data the operation will fail
    explicit ThreadsafeBlockingKineticConnection(
        NonblockingKineticConnection* nonblocking_connection,
        unsigned int network_timeout_seconds);
    ~ThreadsafeBlockingKineticConnection();

    /// If the drive has a non-zero cluster version, requests will fail unless the developer
    /// tells the client the correct cluster version using this method.
    void SetClientClusterVersion(int64_t cluster_version);

    KineticStatus NoOp();

    KineticStatus Get(
            const shared_ptr<const string> key,
            unique_ptr<KineticRecord>& record);

    KineticStatus GetNext(
            const shared_ptr<const string> key,
            unique_ptr<string>& actual_key,
            unique_ptr<KineticRecord>& record);

    KineticStatus GetPrevious(const shared_ptr<const string> key,
            unique_ptr<string>& actual_key,
            unique_ptr<KineticRecord>& record);

    KineticStatus GetVersion(const shared_ptr<const string> key,
            unique_ptr<string>& version);


    KineticStatus GetKeyRange(const shared_ptr<const string> start_key,
            bool start_key_inclusive,
            const shared_ptr<const string> end_key,
            bool end_key_inclusive,
            bool reverse_results,
            int32_t max_results,
            unique_ptr<vector<string>>& keys);

    KeyRangeIterator IterateKeyRange(const shared_ptr<const string> start_key,
            bool start_key_inclusive,
            const shared_ptr<const string> end_key,
            bool end_key_inclusive,
            unsigned int frame_size);

    KineticStatus Put(const shared_ptr<const string> key,
            const shared_ptr<const string> current_version, WriteMode mode,
            const shared_ptr<const KineticRecord> record);

    KineticStatus Delete(const shared_ptr<const string> key,
            const shared_ptr<const string> version, WriteMode mode);

    KineticStatus InstantSecureErase(const shared_ptr<string> pin);

    KineticStatus SetClusterVersion(int64_t cluster_version);

    KineticStatus GetLog(unique_ptr<DriveLog>& drive_log);

    KineticStatus UpdateFirmware(const shared_ptr<const string> new_firmware);

    KineticStatus SetACLs(const shared_ptr<const list<ACL>> acls);

    KineticStatus SetPin(const shared_ptr<const string> new_pin,
            const shared_ptr<const string> current_pin = make_shared<string>());

    KineticStatus P2PPush(const shared_ptr<const P2PPushRequest> push_request,
            unique_ptr<vector<KineticStatus>>& operation_statuses);

    private:
    std::mutex mutex_;
};

} // namespace kinetic

#endif  // KINETIC_CPP_CLIENT_THREADSAFE_BLOCKING_KINETIC_CONNECTION_H_
+11 −6
Original line number Diff line number Diff line
@@ -18,6 +18,7 @@
 *
 */

#include "kinetic/threadsafe_blocking_kinetic_connection.h"
#include "kinetic/kinetic_connection_factory.h"
#include "kinetic/threadsafe_nonblocking_connection.h"
#include "socket_wrapper.h"
@@ -69,17 +70,21 @@ Status KineticConnectionFactory::doNewConnection(ConnectionOptions const &option

    NonblockingPacketService *service =
        new NonblockingPacketService(socket_wrapper, move(sender), receiver);
    NonblockingKineticConnection *nonblocking_connection;

    if (threadsafe) {
        nonblocking_connection = new ThreadsafeNonblockingKineticConnection(service);
        NonblockingKineticConnection* nonblocking_connection =
                new ThreadsafeNonblockingKineticConnection(service);
        ThreadsafeBlockingKineticConnection* blocking_connection =
                new ThreadsafeBlockingKineticConnection(nonblocking_connection, network_timeout_seconds);
        connection.reset(new ConnectionHandle(blocking_connection, nonblocking_connection));
    } else {
        nonblocking_connection = new NonblockingKineticConnection(service);
    }

        NonblockingKineticConnection* nonblocking_connection =
                new NonblockingKineticConnection(service);
        BlockingKineticConnection* blocking_connection =
                new BlockingKineticConnection(nonblocking_connection, network_timeout_seconds);
        connection.reset(new ConnectionHandle(blocking_connection, nonblocking_connection));
    }


    return Status::makeOk();
}
+158 −0
Original line number Diff line number Diff line
/*
 * kinetic-cpp-client
 * Copyright (C) 2014 Seagate Technology.
 * 
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 *
 */

#include <memory>
#include <stdexcept>

#include "kinetic/threadsafe_blocking_kinetic_connection.h"


namespace kinetic {

using std::shared_ptr;
using std::unique_ptr;
using std::string;
using std::make_shared;
using std::move;

ThreadsafeBlockingKineticConnection::ThreadsafeBlockingKineticConnection(
        NonblockingKineticConnection* nonblocking_connection, unsigned int network_timeout_seconds)
    : BlockingKineticConnection(nonblocking_connection,network_timeout_seconds) {}

ThreadsafeBlockingKineticConnection::~ThreadsafeBlockingKineticConnection() {}

KineticStatus ThreadsafeBlockingKineticConnection::NoOp() {
    std::lock_guard<std::mutex> guard(mutex_);
    return BlockingKineticConnection::NoOp();
}

void ThreadsafeBlockingKineticConnection::SetClientClusterVersion(int64_t cluster_version) {
    std::lock_guard<std::mutex> guard(mutex_);
    return BlockingKineticConnection::SetClientClusterVersion(cluster_version);
}

KineticStatus ThreadsafeBlockingKineticConnection::Get(const shared_ptr<const string> key,
    unique_ptr<KineticRecord>& record) {
    std::lock_guard<std::mutex> guard(mutex_);
    return BlockingKineticConnection::Get(key, record);
}

KineticStatus ThreadsafeBlockingKineticConnection::Put(const shared_ptr<const string> key,
        const shared_ptr<const string> current_version, WriteMode mode,
        const shared_ptr<const KineticRecord> record) {
    std::lock_guard<std::mutex> guard(mutex_);
    return BlockingKineticConnection::Put(key, current_version, mode, record);
}

KineticStatus ThreadsafeBlockingKineticConnection::Delete(const shared_ptr<const string> key,
        const shared_ptr<const string> version, WriteMode mode) {
        std::lock_guard<std::mutex> guard(mutex_);
        return BlockingKineticConnection::Delete(key, version, mode);
}

KineticStatus ThreadsafeBlockingKineticConnection::InstantSecureErase(const shared_ptr<string> pin) {
    std::lock_guard<std::mutex> guard(mutex_);
    return BlockingKineticConnection::InstantSecureErase(pin);
}

KineticStatus ThreadsafeBlockingKineticConnection::SetClusterVersion(int64_t new_cluster_version) {
    std::lock_guard<std::mutex> guard(mutex_);
    return BlockingKineticConnection::SetClusterVersion(new_cluster_version);
}

KineticStatus ThreadsafeBlockingKineticConnection::GetLog(unique_ptr<DriveLog>& drive_log) {
    std::lock_guard<std::mutex> guard(mutex_);
    return BlockingKineticConnection::GetLog(drive_log);
}

KineticStatus ThreadsafeBlockingKineticConnection::UpdateFirmware(const shared_ptr<const string>
        new_firmware) {
    std::lock_guard<std::mutex> guard(mutex_);
    return BlockingKineticConnection::UpdateFirmware(new_firmware);
}

KineticStatus ThreadsafeBlockingKineticConnection::SetACLs(const shared_ptr<const list<ACL>> acls) {
    std::lock_guard<std::mutex> guard(mutex_);
    return BlockingKineticConnection::SetACLs(acls);
}

KineticStatus ThreadsafeBlockingKineticConnection::SetPin(const shared_ptr<const string> new_pin,
    const shared_ptr<const string> current_pin) {
    std::lock_guard<std::mutex> guard(mutex_);
    return BlockingKineticConnection::SetPin(new_pin, current_pin);
}

KineticStatus ThreadsafeBlockingKineticConnection::GetNext(const shared_ptr<const string> key,
        unique_ptr<string>& actual_key, unique_ptr<KineticRecord>& record) {
    std::lock_guard<std::mutex> guard(mutex_);
    return BlockingKineticConnection::GetNext(key, actual_key, record);
}

KineticStatus ThreadsafeBlockingKineticConnection::GetPrevious(const shared_ptr<const string> key,
        unique_ptr<string>& actual_key, unique_ptr<KineticRecord>& record) {
    std::lock_guard<std::mutex> guard(mutex_);
    return BlockingKineticConnection::GetNext(key, actual_key, record);
}

KineticStatus ThreadsafeBlockingKineticConnection::GetVersion(const shared_ptr<const string> key,
        unique_ptr<string>& version) {
    std::lock_guard<std::mutex> guard(mutex_);
    return BlockingKineticConnection::GetVersion(key, version);
}

KineticStatus ThreadsafeBlockingKineticConnection::GetKeyRange(const shared_ptr<const string> start_key,
        bool start_key_inclusive,
        const shared_ptr<const string> end_key,
        bool end_key_inclusive,
        bool reverse_results,
        int32_t max_results,
        unique_ptr<vector<string>>& keys) {
    std::lock_guard<std::mutex> guard(mutex_);
    return BlockingKineticConnection::GetKeyRange(start_key,
            start_key_inclusive,
            end_key,
            end_key_inclusive,
            reverse_results,
            max_results,
            keys);
}

KeyRangeIterator ThreadsafeBlockingKineticConnection::IterateKeyRange(
        const shared_ptr<const string> start_key,
        bool start_key_inclusive,
        const shared_ptr<const string> end_key,
        bool end_key_inclusive,
        unsigned int frame_size) {
    std::lock_guard<std::mutex> guard(mutex_);
    return BlockingKineticConnection::IterateKeyRange(start_key,
            start_key_inclusive,
            end_key,
            end_key_inclusive,
            frame_size);
}

KineticStatus ThreadsafeBlockingKineticConnection::P2PPush(
        const shared_ptr<const P2PPushRequest> push_request,
        unique_ptr<vector<KineticStatus>>& operation_statuses) {
    std::lock_guard<std::mutex> guard(mutex_);
    return BlockingKineticConnection::P2PPush(push_request, operation_statuses);
}

} // namespace kinetic