Skip to content

Commit

Permalink
Feature/subscription engine (#813)
Browse files Browse the repository at this point in the history
* subscription: utils

Signed-off-by: iceseer <[email protected]>

* threaded handler

Signed-off-by: iceseer <[email protected]>

* threaded handler queue

Signed-off-by: iceseer <[email protected]>

* threaded handler queue

Signed-off-by: iceseer <[email protected]>

* threaded handler queue

Signed-off-by: iceseer <[email protected]>

* refactoring

Signed-off-by: iceseer <[email protected]>

* dispatcher

Signed-off-by: iceseer <[email protected]>

* dispatcher

Signed-off-by: iceseer <[email protected]>

* subscription engine

Signed-off-by: iceseer <[email protected]>

* format

Signed-off-by: iceseer <[email protected]>

* compile-time murmur2

Signed-off-by: iceseer <[email protected]>

* engine refactoring

Signed-off-by: iceseer <[email protected]>

* dispatcher in engine

Signed-off-by: iceseer <[email protected]>

* TID chained

Signed-off-by: iceseer <[email protected]>

* dispatcher checks

Signed-off-by: iceseer <[email protected]>

* engine multithreaded

Signed-off-by: iceseer <[email protected]>

* format

Signed-off-by: iceseer <[email protected]>

* format

Signed-off-by: iceseer <[email protected]>

* manager

Signed-off-by: iceseer <[email protected]>

* compiled

Signed-off-by: iceseer <[email protected]>

* compiled

Signed-off-by: iceseer <[email protected]>

* work

Signed-off-by: iceseer <[email protected]>

* refactoring

Signed-off-by: iceseer <[email protected]>

* format

Signed-off-by: iceseer <[email protected]>

* comments

Signed-off-by: iceseer <[email protected]>

* test

Signed-off-by: iceseer <[email protected]>

* rollback test

Signed-off-by: iceseer <[email protected]>

# Conflicts:
#	irohad/main/application.cpp

* move args

Signed-off-by: iceseer <[email protected]>

* thread safe

Signed-off-by: iceseer <[email protected]>

* fixup!

Signed-off-by: iceseer <[email protected]>

* refactoring

Signed-off-by: iceseer <[email protected]>

* format

Signed-off-by: iceseer <[email protected]>

* rw object holder

Signed-off-by: iceseer <[email protected]>

* rename

Signed-off-by: iceseer <[email protected]>

* SE sync call

Signed-off-by: iceseer <[email protected]>

* issue fixes

Signed-off-by: iceseer <[email protected]>

* revert sync call

Signed-off-by: iceseer <[email protected]>
Signed-off-by: Alexander Lednev <[email protected]>
  • Loading branch information
iceseer authored Mar 10, 2021
1 parent b4a9cc9 commit 9701b25
Show file tree
Hide file tree
Showing 8 changed files with 852 additions and 0 deletions.
87 changes: 87 additions & 0 deletions irohad/subscription/common.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#ifndef IROHA_SUBSCRIPTION_COMMON_HPP
#define IROHA_SUBSCRIPTION_COMMON_HPP

#include <chrono>
#include <mutex>
#include <shared_mutex>

namespace iroha::utils {

struct NoCopy {
NoCopy(NoCopy const &) = delete;
NoCopy &operator=(NoCopy const &) = delete;
NoCopy() = default;
};

struct NoMove {
NoMove(NoMove &&) = delete;
NoMove &operator=(NoMove &&) = delete;
NoMove() = default;
};

/**
* Protected object wrapper. Allow read-write access.
* @tparam T object type
* Example:
* @code
* ReadWriteObject<std::string> obj("1");
* bool const is_one_att1 = obj.sharedAccess([](auto const &str) { return str == "1"; });
* obj.exclusiveAccess([](auto &str) { str = "2"; });
* bool const is_one_att2 = obj.sharedAccess([](auto const &str) { return str == "1"; });
* std::cout <<
* "Attempt 1: " << is_one_att1 << std::endl <<
* "Attempt 2: " << is_one_att2;
* @endcode
*/
template <typename T>
struct ReadWriteObject {
template <typename... Args>
ReadWriteObject(Args &&... args) : t_(std::forward<Args>(args)...) {}

template <typename F>
inline auto exclusiveAccess(F &&f) {
std::unique_lock lock(cs_);
return std::forward<F>(f)(t_);
}

template <typename F>
inline auto sharedAccess(F &&f) const {
std::shared_lock lock(cs_);
return std::forward<F>(f)(t_);
}

private:
T t_;
mutable std::shared_mutex cs_;
};

class WaitForSingleObject final : NoMove, NoCopy {
std::condition_variable wait_cv_;
std::mutex wait_m_;
std::atomic_flag flag_;

public:
WaitForSingleObject() {
flag_.test_and_set();
}

bool wait(std::chrono::microseconds wait_timeout) {
std::unique_lock<std::mutex> _lock(wait_m_);
return wait_cv_.wait_for(_lock,
wait_timeout,
[&]() { return !flag_.test_and_set(); });
}

void set() {
flag_.clear();
wait_cv_.notify_one();
}
};
} // namespace iroha::utils

#endif // IROHA_SUBSCRIPTION_COMMON_HPP
47 changes: 47 additions & 0 deletions irohad/subscription/dispatcher.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#ifndef IROHA_SUBSCRIPTION_DISPATCHER_HPP
#define IROHA_SUBSCRIPTION_DISPATCHER_HPP

#include "subscription/common.hpp"
#include "subscription/thread_handler.hpp"

namespace iroha::subscription {

template <uint32_t kCount>
class Dispatcher final : utils::NoCopy, utils::NoMove {
public:
static constexpr uint32_t kHandlersCount = kCount;
using Task = ThreadHandler::Task;
using Tid = uint32_t;

private:
ThreadHandler handlers_[kHandlersCount];

public:
Dispatcher() = default;

template <Tid kId>
static constexpr void checkTid() {
static_assert(kId < kHandlersCount, "Unexpected TID handler.");
}

template <typename F>
void add(Tid tid, F &&f) {
assert(tid < kHandlersCount);
handlers_[tid].add(std::forward<F>(f));
}

template <typename F>
void addDelayed(Tid tid, std::chrono::microseconds timeout, F &&f) {
assert(tid < kHandlersCount);
handlers_[tid].addDelayed(timeout, std::forward<F>(f));
}
};

} // namespace iroha::subscription

#endif // IROHA_SUBSCRIPTION_DISPATCHER_HPP
52 changes: 52 additions & 0 deletions irohad/subscription/subscriber.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#ifndef IROHA_SUBSCRIPTION_SUBSCRIBER_HPP
#define IROHA_SUBSCRIPTION_SUBSCRIBER_HPP

#include <atomic>
#include <functional>
#include <memory>
#include <mutex>

#include "subscription/common.hpp"

namespace iroha::subscription {

using SubscriptionSetId = uint32_t;

/**
* Base class that determines the subscriber.
* @tparam EventKey type of listening event
* @tparam Dispatcher thread dispatcher to execute tasks
* @tparam Arguments list of event arguments
*/
template <typename EventKey, typename Dispatcher, typename... Arguments>
class Subscriber : public std::enable_shared_from_this<
Subscriber<EventKey, Dispatcher, Arguments...>>,
utils::NoMove,
utils::NoCopy {
protected:
Subscriber() = default;

public:
using EventType = EventKey;

virtual ~Subscriber() {}

/**
* Notification callback function
* @param set_id the id of the subscription set
* @param key notified event
* @param args event data
*/
virtual void on_notify(SubscriptionSetId set_id,
const EventType &key,
Arguments &&... args) = 0;
};

} // namespace iroha::subscription

#endif // IROHA_SUBSCRIPTION_SUBSCRIBER_HPP
166 changes: 166 additions & 0 deletions irohad/subscription/subscriber_impl.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#ifndef IROHA_SUBSCRIPTION_SUBSCRIBER_IMPL_HPP
#define IROHA_SUBSCRIPTION_SUBSCRIBER_IMPL_HPP

#include <atomic>
#include <functional>
#include <memory>
#include <mutex>

#include "subscription/common.hpp"
#include "subscription/subscriber.hpp"
#include "subscription/subscription_engine.hpp"

namespace iroha::subscription {

/**
* Is a wrapper class, which provides subscription to events from
* SubscriptionEngine
* @tparam EventKey is a type of a particular subscription event (might be a
* key from an observed storage or a specific event type from an enumeration).
* @tparam Dispatcher thread dispatcher
* @tparam Receiver is a type of an object which is a part of Subscriber's
* internal state and can be accessed on every event notification.
* @tparam Arguments is a set of types of objects that are passed on every
* event notification.
*/
template <typename EventKey,
typename Dispatcher,
typename Receiver,
typename... Arguments>
class SubscriberImpl final
: public Subscriber<EventKey, Dispatcher, Arguments...> {
public:
using ReceiverType = Receiver;
using Hash = size_t;
using Parent = Subscriber<EventKey, Dispatcher, Arguments...>;

using SubscriptionEngineType = SubscriptionEngine<
typename Parent::EventType,
Dispatcher,
Subscriber<typename Parent::EventType, Dispatcher, Arguments...>>;
using SubscriptionEnginePtr = std::shared_ptr<SubscriptionEngineType>;

using CallbackFnType =
std::function<void(SubscriptionSetId,
ReceiverType &,
const typename Parent::EventType &,
const Arguments &...)>;

private:
using SubscriptionsContainer =
std::unordered_map<typename Parent::EventType,
typename SubscriptionEngineType::IteratorType>;
using SubscriptionsSets =
std::unordered_map<SubscriptionSetId, SubscriptionsContainer>;

std::atomic<SubscriptionSetId> next_id_;
SubscriptionEnginePtr engine_;
ReceiverType object_;

std::mutex subscriptions_cs_;
SubscriptionsSets subscriptions_sets_;

CallbackFnType on_notify_callback_;

public:
template <typename... SubscriberConstructorArgs>
SubscriberImpl(SubscriptionEnginePtr const &ptr,
SubscriberConstructorArgs &&... args)
: next_id_(0ull),
engine_(ptr),
object_(std::forward<SubscriberConstructorArgs>(args)...) {}

~SubscriberImpl() {
// Unsubscribe all
for (auto &[_, subscriptions] : subscriptions_sets_)
for (auto &[key, it] : subscriptions) engine_->unsubscribe(key, it);
}

void setCallback(CallbackFnType &&f) {
on_notify_callback_ = std::move(f);
}

SubscriptionSetId generateSubscriptionSetId() {
return ++next_id_;
}

template <typename Dispatcher::Tid kTid>
void subscribe(SubscriptionSetId id,
const typename Parent::EventType &key) {
std::lock_guard lock(subscriptions_cs_);
auto &&[it, inserted] = subscriptions_sets_[id].emplace(
key, typename SubscriptionEngineType::IteratorType{});

/// Here we check first local subscriptions because of strong connection
/// with SubscriptionEngine.
if (inserted)
it->second = engine_->template subscribe<kTid>(
id, key, Parent::weak_from_this());
}

/**
* @param id -- subscription set id that unsubscribes from \arg key
* @param key -- event key to unsubscribe from
* @return true if was subscribed to \arg key, false otherwise
*/
bool unsubscribe(SubscriptionSetId id,
const typename Parent::EventType &key) {
std::lock_guard<std::mutex> lock(subscriptions_cs_);
if (auto set_it = subscriptions_sets_.find(id);
set_it != subscriptions_sets_.end()) {
auto &subscriptions = set_it->second;
auto it = subscriptions.find(key);
if (subscriptions.end() != it) {
engine_->unsubscribe(key, it->second);
subscriptions.erase(it);
return true;
}
}
return false;
}

/**
* @param id -- subscription set id to unsubscribe from
* @return true if was subscribed to \arg id, false otherwise
*/
bool unsubscribe(SubscriptionSetId id) {
std::lock_guard<std::mutex> lock(subscriptions_cs_);
if (auto set_it = subscriptions_sets_.find(id);
set_it != subscriptions_sets_.end()) {
auto &subscriptions = set_it->second;
for (auto &[key, it] : subscriptions) engine_->unsubscribe(key, it);

subscriptions_sets_.erase(set_it);
return true;
}
return false;
}

void unsubscribe() {
std::lock_guard<std::mutex> lock(subscriptions_cs_);
for (auto &[_, subscriptions] : subscriptions_sets_)
for (auto &[key, it] : subscriptions) engine_->unsubscribe(key, it);

subscriptions_sets_.clear();
}

void on_notify(SubscriptionSetId set_id,
const typename Parent::EventType &key,
Arguments &&... args) override {
if (nullptr != on_notify_callback_)
on_notify_callback_(set_id, object_, key, std::move(args)...);
}

ReceiverType &get() {
return object_;
}
};

} // namespace iroha::subscription

#endif // IROHA_SUBSCRIPTION_SUBSCRIBER_IMPL_HPP
Loading

0 comments on commit 9701b25

Please sign in to comment.