Skip to content

Commit

Permalink
revert sync call
Browse files Browse the repository at this point in the history
Signed-off-by: iceseer <[email protected]>
  • Loading branch information
iceseer committed Mar 10, 2021
1 parent c1de100 commit 994a4d9
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 72 deletions.
31 changes: 7 additions & 24 deletions irohad/subscription/subscriber_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,6 @@ namespace iroha::subscription {

CallbackFnType on_notify_callback_;

template <typename F>
inline void callSubscribe(SubscriptionSetId id,
const typename Parent::EventType &key,
F &&f) {
std::lock_guard lock(subscriptions_cs_);
auto &&[it, inserted] = subscriptions_sets_[id].emplace(
key, typename SubscriptionEngineType::IteratorType{});

/// Here we check first local subscriptions because of strong connection
/// with SubscriptionEngine.
if (inserted)
it->second = std::forward<F>(f)();
}

public:
template <typename... SubscriberConstructorArgs>
SubscriberImpl(SubscriptionEnginePtr const &ptr,
Expand All @@ -106,18 +92,15 @@ namespace iroha::subscription {
template <typename Dispatcher::Tid kTid>
void subscribe(SubscriptionSetId id,
const typename Parent::EventType &key) {
callSubscribe(id, key, [&]() {
return engine_->template subscribe<kTid>(
id, key, Parent::weak_from_this());
});
}
std::lock_guard lock(subscriptions_cs_);
auto &&[it, inserted] = subscriptions_sets_[id].emplace(
key, typename SubscriptionEngineType::IteratorType{});

void subscribeSync(SubscriptionSetId id,
const typename Parent::EventType &key) {
callSubscribe(id, key, [&]() {
return engine_->template subscribeSync(
/// Here we check first local subscriptions because of strong connection
/// with SubscriptionEngine.
if (inserted)
it->second = engine_->template subscribe<kTid>(
id, key, Parent::weak_from_this());
});
}

/**
Expand Down
71 changes: 23 additions & 48 deletions irohad/subscription/subscription_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,9 @@ namespace iroha::subscription {
/// List is preferable here because this container iterators remain
/// alive after removal from the middle of the container
/// using custom allocator
using SubscribersContainer =
std::list<std::tuple<std::optional<typename Dispatcher::Tid>,
SubscriptionSetId,
SubscriberWeakPtr>>;
using SubscribersContainer = std::list<std::tuple<typename Dispatcher::Tid,
SubscriptionSetId,
SubscriberWeakPtr>>;
using IteratorType = typename SubscribersContainer::iterator;

public:
Expand All @@ -66,33 +65,19 @@ namespace iroha::subscription {
KeyValueContainer subscribers_map_;
DispatcherPtr dispatcher_;

inline IteratorType callSubscribe(
std::optional<typename Dispatcher::Tid> &&tid,
SubscriptionSetId set_id,
const EventKeyType &key,
SubscriberWeakPtr ptr) {
std::unique_lock lock(subscribers_map_cs_);
auto &subscribers_context = subscribers_map_[key];

std::lock_guard l(subscribers_context.subscribers_list_cs);
return subscribers_context.subscribers_list.emplace(
subscribers_context.subscribers_list.end(),
std::make_tuple(std::move(tid), set_id, std::move(ptr)));
}

public:
template <typename Dispatcher::Tid kTid>
IteratorType subscribe(SubscriptionSetId set_id,
const EventKeyType &key,
SubscriberWeakPtr ptr) {
Dispatcher::template checkTid<kTid>();
return callSubscribe(kTid, set_id, key, ptr);
}
std::unique_lock lock(subscribers_map_cs_);
auto &subscribers_context = subscribers_map_[key];

IteratorType subscribeSync(SubscriptionSetId set_id,
const EventKeyType &key,
SubscriberWeakPtr ptr) {
return callSubscribe(std::nullopt, set_id, key, ptr);
std::lock_guard l(subscribers_context.subscribers_list_cs);
return subscribers_context.subscribers_list.emplace(
subscribers_context.subscribers_list.end(),
std::make_tuple(kTid, set_id, std::move(ptr)));
}

void unsubscribe(const EventKeyType &key, const IteratorType &it_remove) {
Expand Down Expand Up @@ -148,32 +133,22 @@ namespace iroha::subscription {
it_sub != subscribers_container.subscribers_list.end();) {
auto wsub = std::get<2>(*it_sub);
auto id = std::get<1>(*it_sub);
auto const tid = std::get<0>(*it_sub);

if (auto sub = wsub.lock()) {
if (tid.has_value()) {
dispatcher_->addDelayed(
tid.value(),
timeout,
[wsub(std::move(wsub)),
id(id),
key(key),
args = std::make_tuple(args...)]() mutable {
if (auto sub = wsub.lock())
std::apply(
[&](auto &&... args) {
sub->on_notify(id, key, std::move(args)...);
},
std::move(args));
});
} else {
assert(timeout == std::chrono::microseconds(0ull));
std::apply(
[&](auto &&... args) {
sub->on_notify(id, key, std::move(args)...);
},
std::make_tuple(args...));
}
dispatcher_->addDelayed(std::get<0>(*it_sub),
timeout,
[wsub(std::move(wsub)),
id(id),
key(key),
args = std::make_tuple(args...)]() mutable {
if (auto sub = wsub.lock())
std::apply(
[&](auto &&... args) {
sub->on_notify(
id, key, std::move(args)...);
},
std::move(args));
});
++it_sub;
} else {
it_sub = subscribers_container.subscribers_list.erase(it_sub);
Expand Down

0 comments on commit 994a4d9

Please sign in to comment.