Skip to content

Commit

Permalink
[Linux] Run functions on GLib event loop in a sync way (#25916)
Browse files Browse the repository at this point in the history
* Run functions on GLib event loop in a sync way

* Include invoke sync template when GLib is enabled

* Restore workaround for TSAN false positives

* Fix variable shadowing

* Do not use g_main_context_invoke_full for waiting for loop to start

The g_main_context_invoke_full() function checks whether the context, on
which it's supposed to run callback function, is acquired. Otherwise, it
runs given callback on the current thread. In our case this may lead to
incorrect GLib signals bindings, so we have to use g_source_attach()
when waiting for main loop.

* Release context object after joining glib thread

TSAN can report false-positive on context unref. By firstly joining
thread and later unreferencing context we will prevent such warning.
  • Loading branch information
arkq authored and pull[bot] committed Oct 2, 2023
1 parent b9109ea commit bd91551
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 116 deletions.
98 changes: 50 additions & 48 deletions src/platform/Linux/PlatformManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,28 @@ CHIP_ERROR PlatformManagerImpl::_InitChipStack()
mGLibMainLoopThread = g_thread_new("gmain-matter", GLibMainLoopThread, mGLibMainLoop);

{
// Wait for the GLib main loop to start. It is required that the context used
// by the main loop is acquired before any other GLib functions are called. Otherwise,
// the GLibMatterContextInvokeSync() might run functions on the wrong thread.

std::unique_lock<std::mutex> lock(mGLibMainLoopCallbackIndirectionMutex);
CallbackIndirection startedInd([](void *) { return G_SOURCE_REMOVE; }, nullptr);
g_idle_add(G_SOURCE_FUNC(&CallbackIndirection::Callback), &startedInd);
startedInd.Wait(lock);
GLibMatterContextInvokeData invokeData{};

auto * idleSource = g_idle_source_new();
g_source_set_callback(
idleSource,
[](void * userData_) {
auto * data = reinterpret_cast<GLibMatterContextInvokeData *>(userData_);
std::unique_lock<std::mutex> lock_(PlatformMgrImpl().mGLibMainLoopCallbackIndirectionMutex);
data->mDone = true;
data->mDoneCond.notify_one();
return G_SOURCE_REMOVE;
},
&invokeData, nullptr);
g_source_attach(idleSource, g_main_loop_get_context(mGLibMainLoop));
g_source_unref(idleSource);

invokeData.mDoneCond.wait(lock, [&invokeData]() { return invokeData.mDone; });
}

#endif
Expand Down Expand Up @@ -248,68 +266,52 @@ void PlatformManagerImpl::_Shutdown()

#if CHIP_DEVICE_CONFIG_WITH_GLIB_MAIN_LOOP
g_main_loop_quit(mGLibMainLoop);
g_main_loop_unref(mGLibMainLoop);
g_thread_join(mGLibMainLoopThread);
g_main_loop_unref(mGLibMainLoop);
#endif
}

#if CHIP_DEVICE_CONFIG_WITH_GLIB_MAIN_LOOP

void PlatformManagerImpl::CallbackIndirection::Wait(std::unique_lock<std::mutex> & lock)
{
mDoneCond.wait(lock, [this]() { return mDone; });
}

gboolean PlatformManagerImpl::CallbackIndirection::Callback(CallbackIndirection * self)
CHIP_ERROR PlatformManagerImpl::_GLibMatterContextInvokeSync(CHIP_ERROR (*func)(void *), void * userData)
{
// We can not access "self" before acquiring the lock, because TSAN will complain that
// there is a race condition between the thread that created the object and the thread
// that is executing the callback.
std::unique_lock<std::mutex> lock(PlatformMgrImpl().mGLibMainLoopCallbackIndirectionMutex);
// Because of TSAN false positives, we need to use a mutex to synchronize access to all members of
// the GLibMatterContextInvokeData object (including constructor and destructor). This is a temporary
// workaround until TSAN-enabled GLib will be used in our CI.
std::unique_lock<std::mutex> lock(mGLibMainLoopCallbackIndirectionMutex);

auto callback = self->mCallback;
auto userData = self->mUserData;
GLibMatterContextInvokeData invokeData{ func, userData };

lock.unlock();
auto result = callback(userData);
lock.lock();

self->mDone = true;
self->mDoneCond.notify_all();
g_main_context_invoke_full(
g_main_loop_get_context(mGLibMainLoop), G_PRIORITY_HIGH_IDLE,
[](void * userData_) {
auto * data = reinterpret_cast<GLibMatterContextInvokeData *>(userData_);

return result;
}
// XXX: Temporary workaround for TSAN false positives.
std::unique_lock<std::mutex> lock_(PlatformMgrImpl().mGLibMainLoopCallbackIndirectionMutex);

#if CHIP_DEVICE_CONFIG_ENABLE_CHIPOBLE
CHIP_ERROR PlatformManagerImpl::RunOnGLibMainLoopThread(GSourceFunc callback, void * userData, bool wait)
{
auto mFunc = data->mFunc;
auto mUserData = data->mFuncUserData;

GMainContext * context = g_main_loop_get_context(mGLibMainLoop);
VerifyOrReturnError(context != nullptr, CHIP_ERROR_INTERNAL,
ChipLogDetail(DeviceLayer, "Failed to get GLib main loop context"));
lock_.unlock();
auto result = mFunc(mUserData);
lock_.lock();

// If we've been called from the GLib main loop thread itself, there is no reason to wait
// for the callback, as it will be executed immediately by the g_main_context_invoke() call
// below. Using a callback indirection in this case would cause a deadlock.
if (g_main_context_is_owner(context))
{
wait = false;
}
data->mDone = true;
data->mFuncResult = result;
data->mDoneCond.notify_one();

if (wait)
{
std::unique_lock<std::mutex> lock(mGLibMainLoopCallbackIndirectionMutex);
CallbackIndirection indirection(callback, userData);
g_main_context_invoke(context, G_SOURCE_FUNC(&CallbackIndirection::Callback), &indirection);
indirection.Wait(lock);
return CHIP_NO_ERROR;
}
return G_SOURCE_REMOVE;
},
&invokeData, nullptr);

g_main_context_invoke(context, callback, userData);
return CHIP_NO_ERROR;
}
#endif // CHIP_DEVICE_CONFIG_ENABLE_CHIPOBLE
lock.lock();

invokeData.mDoneCond.wait(lock, [&invokeData]() { return invokeData.mDone; });

return invokeData.mFuncResult;
}
#endif // CHIP_DEVICE_CONFIG_WITH_GLIB_MAIN_LOOP

} // namespace DeviceLayer
Expand Down
48 changes: 24 additions & 24 deletions src/platform/Linux/PlatformManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,25 +54,22 @@ class PlatformManagerImpl final : public PlatformManager, public Internal::Gener
public:
// ===== Platform-specific members that may be accessed directly by the application.

#if CHIP_DEVICE_CONFIG_WITH_GLIB_MAIN_LOOP && CHIP_DEVICE_CONFIG_ENABLE_CHIPOBLE
#if CHIP_DEVICE_CONFIG_WITH_GLIB_MAIN_LOOP

/**
* @brief Executes a callback in the GLib main loop thread.
* @brief Invoke a function on the Matter GLib context.
*
* @param[in] callback The callback to execute.
* @param[in] userData User data to pass to the callback.
* @param[in] wait If true, the function will block until the callback has been executed.
* @returns CHIP_NO_ERROR if the callback was successfully executed.
*/
CHIP_ERROR RunOnGLibMainLoopThread(GSourceFunc callback, void * userData, bool wait = false);

/**
* @brief Convenience method to require less casts to void pointers.
* If execution of the function will have to be scheduled on other thread,
* this call will block the current thread until the function is executed.
*
* @param[in] function The function to call.
* @param[in] userData User data to pass to the function.
* @returns The result of the function.
*/
template <class T>
CHIP_ERROR ScheduleOnGLibMainLoopThread(gboolean (*callback)(T *), T * userData, bool wait = false)
template <typename T>
CHIP_ERROR GLibMatterContextInvokeSync(CHIP_ERROR (*func)(T *), T * userData)
{
return RunOnGLibMainLoopThread(G_SOURCE_FUNC(callback), userData, wait);
return _GLibMatterContextInvokeSync((CHIP_ERROR(*)(void *)) func, (void *) userData);
}

#endif
Expand All @@ -97,21 +94,24 @@ class PlatformManagerImpl final : public PlatformManager, public Internal::Gener

#if CHIP_DEVICE_CONFIG_WITH_GLIB_MAIN_LOOP

class CallbackIndirection
struct GLibMatterContextInvokeData
{
public:
CallbackIndirection(GSourceFunc callback, void * userData) : mCallback(callback), mUserData(userData) {}
void Wait(std::unique_lock<std::mutex> & lock);
static gboolean Callback(CallbackIndirection * self);

private:
GSourceFunc mCallback;
void * mUserData;
// Sync primitives to wait for the callback to be executed.
CHIP_ERROR (*mFunc)(void *);
void * mFuncUserData;
CHIP_ERROR mFuncResult;
// Sync primitives to wait for the function to be executed
std::condition_variable mDoneCond;
bool mDone = false;
};

/**
* @brief Invoke a function on the Matter GLib context.
*
* @note This function does not provide type safety for the user data. Please,
* use the GLibMatterContextInvokeSync() template function instead.
*/
CHIP_ERROR _GLibMatterContextInvokeSync(CHIP_ERROR (*func)(void *), void * userData);

// XXX: Mutex for guarding access to glib main event loop callback indirection
// synchronization primitives. This is a workaround to suppress TSAN warnings.
// TSAN does not know that from the thread synchronization perspective the
Expand Down
12 changes: 6 additions & 6 deletions src/platform/Linux/bluez/ChipDeviceScanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ CHIP_ERROR ChipDeviceScanner::StartScan(System::Clock::Timeout timeout)
ReturnErrorCodeIf(mIsScanning, CHIP_ERROR_INCORRECT_STATE);

mIsScanning = true; // optimistic, to allow all callbacks to check this
if (PlatformMgrImpl().ScheduleOnGLibMainLoopThread(MainLoopStartScan, this, true) != CHIP_NO_ERROR)
if (PlatformMgrImpl().GLibMatterContextInvokeSync(MainLoopStartScan, this) != CHIP_NO_ERROR)
{
ChipLogError(Ble, "Failed to schedule BLE scan start.");
mIsScanning = false;
Expand Down Expand Up @@ -174,7 +174,7 @@ CHIP_ERROR ChipDeviceScanner::StopScan()
mInterfaceChangedSignal = 0;
}

if (PlatformMgrImpl().ScheduleOnGLibMainLoopThread(MainLoopStopScan, this, true) != CHIP_NO_ERROR)
if (PlatformMgrImpl().GLibMatterContextInvokeSync(MainLoopStopScan, this) != CHIP_NO_ERROR)
{
ChipLogError(Ble, "Failed to schedule BLE scan stop.");
return CHIP_ERROR_INTERNAL;
Expand All @@ -183,7 +183,7 @@ CHIP_ERROR ChipDeviceScanner::StopScan()
return CHIP_NO_ERROR;
}

int ChipDeviceScanner::MainLoopStopScan(ChipDeviceScanner * self)
CHIP_ERROR ChipDeviceScanner::MainLoopStopScan(ChipDeviceScanner * self)
{
GError * error = nullptr;

Expand All @@ -199,7 +199,7 @@ int ChipDeviceScanner::MainLoopStopScan(ChipDeviceScanner * self)
// references to 'self' here)
delegate->OnScanComplete();

return 0;
return CHIP_NO_ERROR;
}

void ChipDeviceScanner::SignalObjectAdded(GDBusObjectManager * manager, GDBusObject * object, ChipDeviceScanner * self)
Expand Down Expand Up @@ -266,7 +266,7 @@ void ChipDeviceScanner::RemoveDevice(BluezDevice1 * device)
}
}

int ChipDeviceScanner::MainLoopStartScan(ChipDeviceScanner * self)
CHIP_ERROR ChipDeviceScanner::MainLoopStartScan(ChipDeviceScanner * self)
{
GError * error = nullptr;

Expand Down Expand Up @@ -308,7 +308,7 @@ int ChipDeviceScanner::MainLoopStartScan(ChipDeviceScanner * self)
self->mDelegate->OnScanComplete();
}

return 0;
return CHIP_NO_ERROR;
}

} // namespace Internal
Expand Down
4 changes: 2 additions & 2 deletions src/platform/Linux/bluez/ChipDeviceScanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ class ChipDeviceScanner

private:
static void TimerExpiredCallback(chip::System::Layer * layer, void * appState);
static int MainLoopStartScan(ChipDeviceScanner * self);
static int MainLoopStopScan(ChipDeviceScanner * self);
static CHIP_ERROR MainLoopStartScan(ChipDeviceScanner * self);
static CHIP_ERROR MainLoopStopScan(ChipDeviceScanner * self);
static void SignalObjectAdded(GDBusObjectManager * manager, GDBusObject * object, ChipDeviceScanner * self);
static void SignalInterfaceChanged(GDBusObjectManagerClient * manager, GDBusObjectProxy * object, GDBusProxy * aInterface,
GVariant * aChangedProperties, const gchar * const * aInvalidatedProps,
Expand Down
Loading

0 comments on commit bd91551

Please sign in to comment.