Skip to content

Commit

Permalink
Run functions on GLib event loop in a sync way
Browse files Browse the repository at this point in the history
  • Loading branch information
arkq committed Mar 31, 2023
1 parent f70f8e5 commit a88f22e
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 140 deletions.
95 changes: 35 additions & 60 deletions src/platform/Linux/PlatformManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,24 @@ PlatformManagerImpl PlatformManagerImpl::sInstance;
namespace {

#if CHIP_DEVICE_CONFIG_WITH_GLIB_MAIN_LOOP

struct GLibMatterContextInvokeData
{
CHIP_ERROR (*mFunc)(void *);
void * mFuncUserData;
CHIP_ERROR mFuncResult;
// Sync primitives to wait for the function to be executed
std::mutex mDoneMutex;
std::condition_variable mDoneCond;
bool mDone = false;
};

void * GLibMainLoopThread(void * loop)
{
g_main_loop_run(static_cast<GMainLoop *>(loop));
return nullptr;
}

#endif

#if CHIP_DEVICE_CONFIG_ENABLE_WIFI
Expand Down Expand Up @@ -193,12 +206,8 @@ CHIP_ERROR PlatformManagerImpl::_InitChipStack()
mGLibMainLoop = g_main_loop_new(nullptr, FALSE);
mGLibMainLoopThread = g_thread_new("gmain-matter", GLibMainLoopThread, mGLibMainLoop);

{
std::unique_lock<std::mutex> lock(mGLibMainLoopCallbackIndirectionMutex);
CallbackIndirection startedInd([](void *) { return G_SOURCE_REMOVE; }, nullptr);
g_idle_add(G_SOURCE_FUNC(&CallbackIndirection::Callback), &startedInd);
startedInd.Wait(lock);
}
// Wait for the GLib main loop to start.
ReturnErrorOnFailure(GLibMatterContextInvokeSync<void>([](void *) { return CHIP_NO_ERROR; }, nullptr));

#endif

Expand Down Expand Up @@ -254,62 +263,28 @@ void PlatformManagerImpl::_Shutdown()
}

#if CHIP_DEVICE_CONFIG_WITH_GLIB_MAIN_LOOP

void PlatformManagerImpl::CallbackIndirection::Wait(std::unique_lock<std::mutex> & lock)
CHIP_ERROR PlatformManagerImpl::_GLibMatterContextInvokeSync(CHIP_ERROR (*func)(void *), void * userData)
{
mDoneCond.wait(lock, [this]() { return mDone; });
}

gboolean PlatformManagerImpl::CallbackIndirection::Callback(CallbackIndirection * self)
{
// We can not access "self" before acquiring the lock, because TSAN will complain that
// there is a race condition between the thread that created the object and the thread
// that is executing the callback.
std::unique_lock<std::mutex> lock(PlatformMgrImpl().mGLibMainLoopCallbackIndirectionMutex);

auto callback = self->mCallback;
auto userData = self->mUserData;

lock.unlock();
auto result = callback(userData);
lock.lock();

self->mDone = true;
self->mDoneCond.notify_all();

return result;
GLibMatterContextInvokeData invokeData{ func, userData };

g_main_context_invoke_full(
g_main_loop_get_context(mGLibMainLoop), G_PRIORITY_HIGH_IDLE,
[](void * userData_) {
auto * data = reinterpret_cast<GLibMatterContextInvokeData *>(userData_);
data->mFuncResult = data->mFunc(data->mFuncUserData);
data->mDoneMutex.lock();
data->mDone = true;
data->mDoneMutex.unlock();
data->mDoneCond.notify_one();
return G_SOURCE_REMOVE;
},
&invokeData, nullptr);

std::unique_lock<std::mutex> lock(invokeData.mDoneMutex);
invokeData.mDoneCond.wait(lock, [&invokeData]() { return invokeData.mDone; });

return invokeData.mFuncResult;
}

#if CHIP_DEVICE_CONFIG_ENABLE_CHIPOBLE
CHIP_ERROR PlatformManagerImpl::RunOnGLibMainLoopThread(GSourceFunc callback, void * userData, bool wait)
{

GMainContext * context = g_main_loop_get_context(mGLibMainLoop);
VerifyOrReturnError(context != nullptr, CHIP_ERROR_INTERNAL,
ChipLogDetail(DeviceLayer, "Failed to get GLib main loop context"));

// If we've been called from the GLib main loop thread itself, there is no reason to wait
// for the callback, as it will be executed immediately by the g_main_context_invoke() call
// below. Using a callback indirection in this case would cause a deadlock.
if (g_main_context_is_owner(context))
{
wait = false;
}

if (wait)
{
std::unique_lock<std::mutex> lock(mGLibMainLoopCallbackIndirectionMutex);
CallbackIndirection indirection(callback, userData);
g_main_context_invoke(context, G_SOURCE_FUNC(&CallbackIndirection::Callback), &indirection);
indirection.Wait(lock);
return CHIP_NO_ERROR;
}

g_main_context_invoke(context, callback, userData);
return CHIP_NO_ERROR;
}
#endif // CHIP_DEVICE_CONFIG_ENABLE_CHIPOBLE

#endif // CHIP_DEVICE_CONFIG_WITH_GLIB_MAIN_LOOP

} // namespace DeviceLayer
Expand Down
53 changes: 17 additions & 36 deletions src/platform/Linux/PlatformManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,19 @@ class PlatformManagerImpl final : public PlatformManager, public Internal::Gener
#if CHIP_DEVICE_CONFIG_WITH_GLIB_MAIN_LOOP && CHIP_DEVICE_CONFIG_ENABLE_CHIPOBLE

/**
* @brief Executes a callback in the GLib main loop thread.
* @brief Invoke a function on the Matter GLib context.
*
* @param[in] callback The callback to execute.
* @param[in] userData User data to pass to the callback.
* @param[in] wait If true, the function will block until the callback has been executed.
* @returns CHIP_NO_ERROR if the callback was successfully executed.
*/
CHIP_ERROR RunOnGLibMainLoopThread(GSourceFunc callback, void * userData, bool wait = false);

/**
* @brief Convenience method to require less casts to void pointers.
* If execution of the function will have to be scheduled on other thread,
* this call will block the current thread until the function is executed.
*
* @param[in] function The function to call.
* @param[in] userData User data to pass to the function.
* @returns The result of the function.
*/
template <class T>
CHIP_ERROR ScheduleOnGLibMainLoopThread(gboolean (*callback)(T *), T * userData, bool wait = false)
template <typename T>
CHIP_ERROR GLibMatterContextInvokeSync(CHIP_ERROR (*func)(T *), T * userData)
{
return RunOnGLibMainLoopThread(G_SOURCE_FUNC(callback), userData, wait);
return _GLibMatterContextInvokeSync((CHIP_ERROR(*)(void *)) func, (void *) userData);
}

#endif
Expand All @@ -97,29 +94,13 @@ class PlatformManagerImpl final : public PlatformManager, public Internal::Gener

#if CHIP_DEVICE_CONFIG_WITH_GLIB_MAIN_LOOP

class CallbackIndirection
{
public:
CallbackIndirection(GSourceFunc callback, void * userData) : mCallback(callback), mUserData(userData) {}
void Wait(std::unique_lock<std::mutex> & lock);
static gboolean Callback(CallbackIndirection * self);

private:
GSourceFunc mCallback;
void * mUserData;
// Sync primitives to wait for the callback to be executed.
std::condition_variable mDoneCond;
bool mDone = false;
};

// XXX: Mutex for guarding access to glib main event loop callback indirection
// synchronization primitives. This is a workaround to suppress TSAN warnings.
// TSAN does not know that from the thread synchronization perspective the
// g_source_attach() function should be treated as pthread_create(). Memory
// access to shared data before the call to g_source_attach() without mutex
// is not a race condition - the callback will not be executed on glib main
// event loop thread before the call to g_source_attach().
std::mutex mGLibMainLoopCallbackIndirectionMutex;
/**
* @brief Invoke a function on the Matter GLib context.
*
* @note This function does not provide type safety for the user data. Please,
* use the GLibMatterContextInvokeSync() template function instead.
*/
CHIP_ERROR _GLibMatterContextInvokeSync(CHIP_ERROR (*func)(void *), void * userData);

GMainLoop * mGLibMainLoop;
GThread * mGLibMainLoopThread;
Expand Down
12 changes: 6 additions & 6 deletions src/platform/Linux/bluez/ChipDeviceScanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ CHIP_ERROR ChipDeviceScanner::StartScan(System::Clock::Timeout timeout)
ReturnErrorCodeIf(mIsScanning, CHIP_ERROR_INCORRECT_STATE);

mIsScanning = true; // optimistic, to allow all callbacks to check this
if (PlatformMgrImpl().ScheduleOnGLibMainLoopThread(MainLoopStartScan, this, true) != CHIP_NO_ERROR)
if (PlatformMgrImpl().GLibMatterContextInvokeSync(MainLoopStartScan, this) != CHIP_NO_ERROR)
{
ChipLogError(Ble, "Failed to schedule BLE scan start.");
mIsScanning = false;
Expand Down Expand Up @@ -174,7 +174,7 @@ CHIP_ERROR ChipDeviceScanner::StopScan()
mInterfaceChangedSignal = 0;
}

if (PlatformMgrImpl().ScheduleOnGLibMainLoopThread(MainLoopStopScan, this, true) != CHIP_NO_ERROR)
if (PlatformMgrImpl().GLibMatterContextInvokeSync(MainLoopStopScan, this) != CHIP_NO_ERROR)
{
ChipLogError(Ble, "Failed to schedule BLE scan stop.");
return CHIP_ERROR_INTERNAL;
Expand All @@ -183,7 +183,7 @@ CHIP_ERROR ChipDeviceScanner::StopScan()
return CHIP_NO_ERROR;
}

int ChipDeviceScanner::MainLoopStopScan(ChipDeviceScanner * self)
CHIP_ERROR ChipDeviceScanner::MainLoopStopScan(ChipDeviceScanner * self)
{
GError * error = nullptr;

Expand All @@ -199,7 +199,7 @@ int ChipDeviceScanner::MainLoopStopScan(ChipDeviceScanner * self)
// references to 'self' here)
delegate->OnScanComplete();

return 0;
return CHIP_NO_ERROR;
}

void ChipDeviceScanner::SignalObjectAdded(GDBusObjectManager * manager, GDBusObject * object, ChipDeviceScanner * self)
Expand Down Expand Up @@ -266,7 +266,7 @@ void ChipDeviceScanner::RemoveDevice(BluezDevice1 * device)
}
}

int ChipDeviceScanner::MainLoopStartScan(ChipDeviceScanner * self)
CHIP_ERROR ChipDeviceScanner::MainLoopStartScan(ChipDeviceScanner * self)
{
GError * error = nullptr;

Expand Down Expand Up @@ -308,7 +308,7 @@ int ChipDeviceScanner::MainLoopStartScan(ChipDeviceScanner * self)
self->mDelegate->OnScanComplete();
}

return 0;
return CHIP_NO_ERROR;
}

} // namespace Internal
Expand Down
4 changes: 2 additions & 2 deletions src/platform/Linux/bluez/ChipDeviceScanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ class ChipDeviceScanner

private:
static void TimerExpiredCallback(chip::System::Layer * layer, void * appState);
static int MainLoopStartScan(ChipDeviceScanner * self);
static int MainLoopStopScan(ChipDeviceScanner * self);
static CHIP_ERROR MainLoopStartScan(ChipDeviceScanner * self);
static CHIP_ERROR MainLoopStopScan(ChipDeviceScanner * self);
static void SignalObjectAdded(GDBusObjectManager * manager, GDBusObject * object, ChipDeviceScanner * self);
static void SignalInterfaceChanged(GDBusObjectManagerClient * manager, GDBusObjectProxy * object, GDBusProxy * aInterface,
GVariant * aChangedProperties, const gchar * const * aInvalidatedProps,
Expand Down
Loading

0 comments on commit a88f22e

Please sign in to comment.