Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deferred connection evaluation #41

Closed
wants to merge 9 commits into from
1 change: 1 addition & 0 deletions src/kdbindings/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ set(HEADERS
property.h
property_updater.h
signal.h
connection_evaluator.h
utils.h
)

Expand Down
70 changes: 70 additions & 0 deletions src/kdbindings/connection_evaluator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#pragma once

#include <functional>
#include <list>
#include <mutex>

namespace KDBindings {

/**
* @brief Manages and evaluates deferred Signal connections.
*
* The ConnectionEvaluator class is responsible for managing and evaluating connections
* to Signals. It provides mechanisms to delay and control the evaluation of connections.
* It therefore allows controlling when and on which thread slots connected to a Signal are executed.
*
* @see Signal::connectDeferred()
*/
class ConnectionEvaluator
phyBrackets marked this conversation as resolved.
Show resolved Hide resolved
{

public:
ConnectionEvaluator() = default;

// ConnectionEvaluators are not copyable, as it is designed to manage connections,
phyBrackets marked this conversation as resolved.
Show resolved Hide resolved
// and copying it could lead to unexpected behavior, including duplication of connections and issues
// related to connection lifetimes. Therefore, it is intentionally made non-copyable.
ConnectionEvaluator(const ConnectionEvaluator &) noexcept = delete;

ConnectionEvaluator &operator=(const ConnectionEvaluator &) noexcept = delete;

// ConnectionEvaluators are not moveable, as they are captures by-reference
// by the Signal, so moving them would lead to a dangling reference.
ConnectionEvaluator(ConnectionEvaluator &&other) noexcept = delete;
phyBrackets marked this conversation as resolved.
Show resolved Hide resolved

ConnectionEvaluator &operator=(ConnectionEvaluator &&other) noexcept = delete;

/**
* @brief Evaluate and execute deferred connections.
phyBrackets marked this conversation as resolved.
Show resolved Hide resolved
*
* This function is responsible for evaluating and executing deferred connections.
* And this function ensures thread safety
*/
void evaluateDeferredConnections()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall I think the approach of simply copying (or moving out of) the connections is quite cool, as it allows to quickly release the mutex again.

However, this approach conflicts with deleting connections.
When the following happens asynchronously

  • emit a deferred signal
  • disconnect the connection
  • evaluateDeferredConnections

The connection should either be evaluated before the disconnect call returns, or it should not be evaluated, if the disconnect call returns before evaluateDeferredConnections is called. As it's highly likely that the connection includes dangling pointers after the call to disconnect the connection.

So if an evaluation is in progress the call to disconnect cannot return until the evaluation has completely finished.

If you still want to keep the approach of simply moving the connections over to release the mutex early, you could add a second mutex for "disconnect", which would be held longer, whilst the mutex for adding new connections could already be released.
However, this double-locking may be less performant in the end, depending on how many connections are queued, as we now need to lock two mutexes when evaluating and when disconnecting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really have a doubt about this, how i am thinking about is, so follow the order
-> emit a deferred signal
-> disconnect the connection
-> evaluateDeferredConnections

disconnecting the signal after the emit or before the evaluation wouldn't be have any issue, as evaluation depends on the list of connections in the evaluator which has already queued up through the call to emit. Call to disconnect removes the connection from the m_connections of the Signal class, which is fine.

I can only think about the issue when evaluation or disconnect either be used in mulithreading context. Or i might be thinking wrong?

I am adding the test case for this, to look up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like we discussed with @seanharmer and @lemirep , both approaches are feasible.
Removing queued invocations on disconnect is really a lot better to protect from dangling references.
But as Sean noted, it might just be something that needs to be expected with a new paradigm.

So if you're not sure what to do yet, feel free to explore your initial implementation a bit further (i.e. still evaluating queued invocations even after disconnect).
I'd just like to see the implications of this in use in an actual application.
Basically: How easy is it to foot-gun yourself with it?

{
std::list<std::function<void()>> movedConnections;
{
std::lock_guard<std::mutex> lock(connectionsMutex);
movedConnections = std::move(connections);
// Reinitialize the connections list
connections = std::list<std::function<void()>>();
}

for (auto &connection : movedConnections) {
connection();
}
}

private:
template<typename...>
friend class Signal;

void addConnection(std::function<void()> connection)
phyBrackets marked this conversation as resolved.
Show resolved Hide resolved
{
connections.push_back(connection);
}

std::list<std::function<void()>> connections;
std::mutex connectionsMutex;
phyBrackets marked this conversation as resolved.
Show resolved Hide resolved
};
} // namespace KDBindings
59 changes: 58 additions & 1 deletion src/kdbindings/signal.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
#include <stdexcept>
#include <type_traits>
#include <utility>
#include <mutex>

#include <kdbindings/connection_evaluator.h>
#include <kdbindings/genindex_array.h>
#include <kdbindings/utils.h>

Expand Down Expand Up @@ -204,6 +206,14 @@ class ConnectionHandle
*
* The Args type parameter pack describe which value types the Signal will emit.
*
* Deferred Connection:
*
* KDBindings::Signal supports deferred connections, enabling the decoupling of signal
* emission from the execution of connected slots. With deferred connections, you can
* connect slots to the Signal that are not immediately executed when the signal is emitted.
* Instead, you can evaluate these deferred connections at a later time, allowing for
* asynchronous or delayed execution of connected slots.
*
* Examples:
* - @ref 01-simple-connection/main.cpp
* - @ref 02-signal-member/main.cpp
Expand Down Expand Up @@ -242,6 +252,27 @@ class Signal
return m_connections.insert({ slot });
}

// Establish a deferred connection between signal and slot, where ConnectionEvaluator object
// used to queue all the connection to evaluate later. The returned
// value can be used to disconnect the function again.
phyBrackets marked this conversation as resolved.
Show resolved Hide resolved
Private::GenerationalIndex connectDeferred(const std::shared_ptr<ConnectionEvaluator> &evaluator, std::function<void(Args...)> const &slot)
{
auto weakEvaluator = std::weak_ptr<ConnectionEvaluator>(evaluator);

auto connection = [weakEvaluator = std::move(weakEvaluator), slot](Args... args) {
// Check if the ConnectionEvaluator is still alive
if (auto evaluatorPtr = weakEvaluator.lock()) {
auto lambda = [slot, args...]() {
slot(args...);
};
evaluatorPtr->addConnection(lambda);
} else {
throw std::runtime_error("ConnectionEvaluator is no longer alive");
}
};
return m_connections.insert({ connection, true });
}

// Disconnects a previously connected function
void disconnect(const Private::GenerationalIndex &id) override
LeonMatthesKDAB marked this conversation as resolved.
Show resolved Hide resolved
{
Expand Down Expand Up @@ -284,6 +315,7 @@ class Signal
// Calls all connected functions
void emit(Args... p) const
{

const auto numEntries = m_connections.entriesSize();

// This loop can tolerate signal handles being disconnected inside a slot,
Expand All @@ -294,15 +326,17 @@ class Signal
if (index) {
const auto con = m_connections.get(*index);

if (!con->blocked)
if (!con->blocked) {
con->slot(p...);
}
}
}
}

private:
struct Connection {
std::function<void(Args...)> slot;
bool isDeferred{ false };
LeonMatthesKDAB marked this conversation as resolved.
Show resolved Hide resolved
bool blocked{ false };
};
mutable Private::GenerationalIndexArray<Connection> m_connections;
Expand Down Expand Up @@ -349,6 +383,29 @@ class Signal
return ConnectionHandle{ m_impl, m_impl->connect(slot) };
}

/**
* @brief Establishes a deferred connection between the provided evaluator and slot.
*
* This function allows connecting an evaluator and a slot such that the slot's execution
* is deferred until the conditions evaluated by the `evaluator` are met.
*
* First argument to the function is reference to a shared pointer to the `ConnectionEvaluator` responsible for determining
* when the slot should be executed.
*
* @return An instance of ConnectionHandle, that can be used to disconnect
* or temporarily block the connection.
*
* @note
* The `KDBindings::Signal` class itself is not thread-safe. While the `ConnectionEvaluator` is inherently
* thread-safe, ensure that any concurrent access to this Signal is protected externally to maintain thread safety.
*/
ConnectionHandle connectDeferred(const std::shared_ptr<ConnectionEvaluator> &evaluator, std::function<void(Args...)> const &slot)
{
ensureImpl();

return ConnectionHandle(m_impl, m_impl->connectDeferred(evaluator, slot));
}

/**
* A template overload of Signal::connect that makes it easier to connect arbitrary functions to this
* Signal.
Expand Down
134 changes: 134 additions & 0 deletions tests/signal/tst_signal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@

#include "kdbindings/utils.h"
#include <kdbindings/signal.h>
#include <kdbindings/connection_evaluator.h>

#include <stdexcept>
#include <string>
#include <thread>

#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN
#include <doctest.h>
Expand Down Expand Up @@ -77,6 +79,138 @@ TEST_CASE("Signal connections")
REQUIRE(lambdaCalled == true);
}

SUBCASE("Disconnect Deferred Connection")
{
Signal<int> signal;
int val = 4;
auto evaluator = std::make_shared<ConnectionEvaluator>();

auto connection = signal.connectDeferred(evaluator, [&val](int value) {
val += value;
});

REQUIRE(connection.isActive());

signal.emit(4);
REQUIRE(val == 4); // val not changing immediately after emit

connection.disconnect();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I believe it is dangerous to have the previously emitted connection still be evaluated is:

Imagine that val was a int * allocated with int* val = new int;

I would expect this to then work:

connection.disconnect();
delete val;

Then calling evaluateDeferredConnections would still reference val, as it's part of the connected closure.
This access would then be a dangling pointer and most likely SegFault, or write to random memory.

That's why I'm concerned about this behavior, you simply cannot guarantee that if you emitted a signal that references some resources those resources aren't referenced anymore after disconnecting.
Though that's exactly what disconnect is meant for in a non-deferred scenario.
The current behavior needs additional synchronization to ensure that both disconnect and evaluateDeferredConnections are called before any resources are freed.
Especially as those two calls are likely to happen in completely different threads the synchronization might be very difficult.

On the other hand I also get that it may be unintuitive that a signal emission doesn't actually execute a slot if it's not evaluated before it's disconnected.
Maybe @seanharmer or @lemirep have a preference on this, as they're using KDBindings quite extensively afaik.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation! That does make sense. Now assuming there is no trick to extend the lifetime of closure object, seems like we have to safely synchronize it, or basically make sure that evaluation happen before the call to disconnect.

I understand your point here, but I am concerned about how we used to understand signal, slot emit and disconnect. My intution while looking at code that contains structure like connectedDeferred -> emit -> disconnect -> evaluateDeferred , I would like to see it as the classic implementation, means if signal is emitted before disconnection, it should be evaluated , no matter if the call to evaluation is after disconnecting the signal. But sadly that does have the problem with dangling reference. I would also love to see , views from @seanharmer and @lemirep

On the other hand I also get that it may be unintuitive that a signal emission doesn't actually execute a slot if it's not
evaluated before it's disconnected.

REQUIRE(!connection.isActive());

signal.emit(6); // It will not affect the result as the signal is disconnected
REQUIRE(val == 4);

evaluator->evaluateDeferredConnections();
REQUIRE(val == 8);
}

SUBCASE("Multiple Signals with Evaluator")
{
Signal<int> signal1;
Signal<int> signal2;
int val = 4;
auto evaluator = std::make_shared<ConnectionEvaluator>();

std::thread thread1([&] {
signal1.connectDeferred(evaluator, [&val](int value) {
val += value;
});
});

std::thread thread2([&] {
signal2.connectDeferred(evaluator, [&val](int value) {
val += value;
});
});

thread1.join();
thread2.join();

signal1.emit(2);
signal2.emit(3);
REQUIRE(val == 4); // val not changing immediately after emit

evaluator->evaluateDeferredConnections();

REQUIRE(val == 9);
}

SUBCASE("Emit Multiple Signals with Evaluator")
{
Signal<int> signal1;
Signal<int> signal2;
int val1 = 4;
int val2 = 4;
auto evaluator = std::make_shared<ConnectionEvaluator>();

signal1.connectDeferred(evaluator, [&val1](int value) {
val1 += value;
});

signal2.connectDeferred(evaluator, [&val2](int value) {
val2 += value;
});

std::thread thread1([&] {
signal1.emit(2);
});

std::thread thread2([&] {
signal2.emit(3);
});

thread1.join();
thread2.join();

REQUIRE(val1 == 4);
REQUIRE(val2 == 4);

phyBrackets marked this conversation as resolved.
Show resolved Hide resolved
evaluator->evaluateDeferredConnections();

REQUIRE(val1 == 6);
REQUIRE(val2 == 7);
}

SUBCASE("Deferred Connect, Emit, Delete, and Evaluate")
{
Signal<int> signal;
int val = 4;
auto evaluator = std::make_shared<ConnectionEvaluator>();

auto connection = signal.connectDeferred(evaluator, [&val](int value) {
val += value;
});

REQUIRE(connection.isActive());

signal.emit(2);
REQUIRE(val == 4);

connection.disconnect();
evaluator->evaluateDeferredConnections();

REQUIRE(val == 6);
}

SUBCASE("Double Evaluate Deferred Connections")
{
Signal<int> signal;
int val = 4;
auto evaluator = std::make_shared<ConnectionEvaluator>();

signal.connectDeferred(evaluator, [&val](int value) {
val += value;
});

signal.emit(2);
REQUIRE(val == 4);
phyBrackets marked this conversation as resolved.
Show resolved Hide resolved

evaluator->evaluateDeferredConnections();
evaluator->evaluateDeferredConnections();

REQUIRE(val == 6);
}

SUBCASE("A signal with arguments can be connected to a lambda and invoked with l-value args")
{
Signal<std::string, int> signal;
Expand Down