Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Darwin: PlatformManagerImpl improvements #32904

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 73 additions & 13 deletions src/lib/core/Global.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,65 @@

#include <lib/core/CHIPConfig.h>

#include <mutex>
#include <new>

#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
#include <dispatch/dispatch.h>
#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH

namespace chip {
namespace detail {

#if CHIP_CONFIG_GLOBALS_LAZY_INIT

struct NonAtomicOnce
{
bool mInitialized = false;
void call(void (*func)(void *), void * context)
{
if (!mInitialized)
{
mInitialized = true;
func(context);
}
}
};

struct AtomicOnce
{
// dispatch_once (if available) is more efficient than std::call_once because
// it takes advantage of the additional assumption that the dispatch_once_t
// is allocated within a static / global.
#if CHIP_SYSTEM_CONFIG_USE_DISPATCH
dispatch_once_t mOnce = 0;
void call(void (*func)(void *), void * context) { dispatch_once_f(&mOnce, context, func); }
#else // CHIP_SYSTEM_CONFIG_USE_DISPATCH
std::once_flag mOnce;
void call(void (*func)(void *), void * context) { std::call_once(mOnce, func, context); }
#endif
};

#endif // CHIP_CONFIG_GLOBALS_LAZY_INIT

} // namespace detail

/**
* A wrapper for global object that enables initialization and destruction to
* be configured by the platform via `CHIP_CONFIG_GLOBALS_*` options.
*
* The contained object of type T is default constructed, possibly lazily.
*
* This class is generally NOT thread-safe; external synchronization is required.
* Values of this type MUST be globals or static class members.
*
* This class is not thread-safe; external synchronization is required.
* @see AtomicGlobal<T> for a thread-safe variant.
*/
#if CHIP_CONFIG_GLOBALS_LAZY_INIT
template <class T, class OnceStrategy = detail::NonAtomicOnce>
#else // CHIP_CONFIG_GLOBALS_LAZY_INIT
template <class T>
#endif // CHIP_CONFIG_GLOBALS_LAZY_INIT
class Global
{
public:
Expand All @@ -40,6 +86,11 @@ class Global
T & get() { return _get(); }
T * operator->() { return &_get(); }

// Globals are not copyable or movable
Global(const Global &) = delete;
Global(const Global &&) = delete;
Global & operator=(const Global &) = delete;

#if CHIP_CONFIG_GLOBALS_LAZY_INIT
public:
constexpr Global() = default;
Expand All @@ -49,24 +100,22 @@ class Global
// Zero-initialize everything. We should technically leave mStorage uninitialized,
// but that can sometimes cause clang to be unable to constant-initialize the object.
alignas(T) unsigned char mStorage[sizeof(T)] = {};
bool mInitialized = false;

T & _value() { return *reinterpret_cast<T *>(mStorage); }
OnceStrategy mOnce;

T & _get()
{
if (!mInitialized)
{
new (mStorage) T();
mInitialized = true;
T * value = reinterpret_cast<T *>(mStorage);
mOnce.call(&create, value);
return *value;
}
static void create(void * value)
{
new (value) T();
#if !CHIP_CONFIG_GLOBALS_NO_DESTRUCT
CHIP_CXA_ATEXIT(&destroy, this);
CHIP_CXA_ATEXIT(&destroy, value);
#endif // CHIP_CONFIG_GLOBALS_NO_DESTRUCT
}
return _value();
}

static void destroy(void * context) { static_cast<Global<T> *>(context)->_value().~T(); }
static void destroy(void * value) { static_cast<T *>(value)->~T(); }

#else // CHIP_CONFIG_GLOBALS_LAZY_INIT
public:
Expand Down Expand Up @@ -100,4 +149,15 @@ class Global
#endif // CHIP_CONFIG_GLOBALS_LAZY_INIT
};

/**
* A variant of Global<T> that is thread-safe.
*/
template <class T>
using AtomicGlobal =
#if CHIP_CONFIG_GLOBALS_LAZY_INIT
Global<T, detail::AtomicOnce>;
#else // CHIP_CONFIG_GLOBALS_LAZY_INIT
Global<T>; // eager globals are already thread-safe
#endif // CHIP_CONFIG_GLOBALS_LAZY_INIT

} // namespace chip
2 changes: 1 addition & 1 deletion src/lib/dnssd/Discovery_ImplPlatform.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class DiscoveryImplPlatform : public ServiceAdvertiser, public Resolver
uint8_t mCommissionableInstanceName[sizeof(uint64_t)];
OperationalResolveDelegate * mOperationalDelegate = nullptr;

friend class Global<DiscoveryImplPlatform>;
friend Global<DiscoveryImplPlatform>;
static Global<DiscoveryImplPlatform> sManager;
};

Expand Down
2 changes: 1 addition & 1 deletion src/platform/Darwin/DnssdImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class MdnsContexts

private:
MdnsContexts() = default;
friend class Global<MdnsContexts>;
friend Global<MdnsContexts>;
static Global<MdnsContexts> sInstance;

std::vector<GenericContext *> mContexts;
Expand Down
108 changes: 37 additions & 71 deletions src/platform/Darwin/PlatformManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#endif

#include <platform/Darwin/DiagnosticDataProviderImpl.h>
#include <platform/Darwin/PlatformMetricKeys.h>
#include <platform/PlatformManager.h>

// Include the non-inline definitions for the GenericPlatformManagerImpl<> template,
Expand All @@ -39,22 +40,28 @@
#include <CoreFoundation/CoreFoundation.h>
#include <tracing/metric_event.h>

#import "PlatformMetricKeys.h"
using namespace chip::Tracing::DarwinPlatform;

namespace chip {
namespace DeviceLayer {

Global<PlatformManagerImpl> PlatformManagerImpl::sInstance;
AtomicGlobal<PlatformManagerImpl> PlatformManagerImpl::sInstance;

CHIP_ERROR PlatformManagerImpl::_InitChipStack()
PlatformManagerImpl::PlatformManagerImpl() :
mWorkQueue(dispatch_queue_create("org.csa-iot.matter.workqueue",
dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL_WITH_AUTORELEASE_POOL,
QOS_CLASS_USER_INITIATED, QOS_MIN_RELATIVE_PRIORITY)))
{
CHIP_ERROR err;
// Tag our queue for IsWorkQueueCurrentQueue()
dispatch_queue_set_specific(mWorkQueue, this, this, nullptr);
dispatch_suspend(mWorkQueue);
}

CHIP_ERROR PlatformManagerImpl::_InitChipStack()
{
// Initialize the configuration system.
#if !CHIP_DISABLE_PLATFORM_KVS
err = Internal::PosixConfig::Init();
SuccessOrExit(err);
ReturnErrorOnFailure(Internal::PosixConfig::Init());
#endif // CHIP_DISABLE_PLATFORM_KVS

#if !CHIP_SYSTEM_CONFIG_USE_LIBEV
Expand All @@ -64,8 +71,7 @@ CHIP_ERROR PlatformManagerImpl::_InitChipStack()

// Call _InitChipStack() on the generic implementation base class
// to finish the initialization process.
err = Internal::GenericPlatformManagerImpl<PlatformManagerImpl>::_InitChipStack();
SuccessOrExit(err);
ReturnErrorOnFailure(Internal::GenericPlatformManagerImpl<PlatformManagerImpl>::_InitChipStack());

#if !CHIP_DISABLE_PLATFORM_KVS
// Now set up our device instance info provider. We couldn't do that
Expand All @@ -74,53 +80,36 @@ CHIP_ERROR PlatformManagerImpl::_InitChipStack()
#endif // CHIP_DISABLE_PLATFORM_KVS

mStartTime = System::SystemClock().GetMonotonicTimestamp();

exit:
return err;
return CHIP_NO_ERROR;
}

CHIP_ERROR PlatformManagerImpl::_StartEventLoopTask()
{
if (mIsWorkQueueSuspended)
{
mIsWorkQueueSuspended = false;
dispatch_resume(mWorkQueue);
}

auto expected = WorkQueueState::kSuspended;
VerifyOrReturnError(mWorkQueueState.compare_exchange_strong(expected, WorkQueueState::kRunning), CHIP_ERROR_INCORRECT_STATE);
dispatch_resume(mWorkQueue);
return CHIP_NO_ERROR;
};

CHIP_ERROR PlatformManagerImpl::_StopEventLoopTask()
{
if (!mIsWorkQueueSuspended && !mIsWorkQueueSuspensionPending)
{
mIsWorkQueueSuspensionPending = true;
if (!IsWorkQueueCurrentQueue())
{
// dispatch_sync is used in order to guarantee serialization of the caller with
// respect to any tasks that might already be on the queue, or running.
dispatch_sync(mWorkQueue, ^{
dispatch_suspend(mWorkQueue);
});

mIsWorkQueueSuspended = true;
mIsWorkQueueSuspensionPending = false;
}
else
auto expected = WorkQueueState::kRunning;
VerifyOrReturnError(mWorkQueueState.compare_exchange_strong(expected, WorkQueueState::kSuspensionPending),
CHIP_ERROR_INCORRECT_STATE);

// We need to dispatch to the work queue to ensure any currently queued jobs
// finish executing. When called from outside the work queue we also need to
// wait for them to complete before returning to the caller, so we use
// dispatch_sync in that case.
(IsWorkQueueCurrentQueue() ? dispatch_async : dispatch_sync)(mWorkQueue, ^{
dispatch_suspend(mWorkQueue);
mWorkQueueState.store(WorkQueueState::kSuspended);
auto * semaphore = mRunLoopSem;
if (semaphore != nullptr)
{
// We are called from a task running on our work queue. Dispatch async,
// so we don't deadlock ourselves. Note that we do have to dispatch to
// guarantee that we don't signal the semaphore until we have ensured
// that no more tasks will run on the queue.
dispatch_async(mWorkQueue, ^{
dispatch_suspend(mWorkQueue);
mIsWorkQueueSuspended = true;
mIsWorkQueueSuspensionPending = false;
dispatch_semaphore_signal(mRunLoopSem);
});
dispatch_semaphore_signal(semaphore);
}
}

});
return CHIP_NO_ERROR;
}

Expand All @@ -147,47 +136,24 @@ void PlatformManagerImpl::_Shutdown()

CHIP_ERROR PlatformManagerImpl::_PostEvent(const ChipDeviceEvent * event)
{
if (mWorkQueue == nullptr)
{
return CHIP_ERROR_INCORRECT_STATE;
}

const ChipDeviceEvent eventCopy = *event;
dispatch_async(mWorkQueue, ^{
Impl()->DispatchEvent(&eventCopy);
DispatchEvent(&eventCopy);
});
return CHIP_NO_ERROR;
}

#if CHIP_STACK_LOCK_TRACKING_ENABLED
bool PlatformManagerImpl::_IsChipStackLockedByCurrentThread() const
{
// If we have no work queue, or it's suspended, then we assume our caller
// knows what they are doing in terms of their own concurrency.
return !mWorkQueue || mIsWorkQueueSuspended || IsWorkQueueCurrentQueue();
// Assume our caller knows what they are doing in terms of concurrency if the work queue is suspended.
return IsWorkQueueCurrentQueue() || mWorkQueueState.load() == WorkQueueState::kSuspended;
};
#endif

static int sPlatformManagerKey; // We use pointer to this as key.

dispatch_queue_t PlatformManagerImpl::GetWorkQueue()
{
if (mWorkQueue == nullptr)
{
mWorkQueue =
dispatch_queue_create(CHIP_CONTROLLER_QUEUE,
dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL_WITH_AUTORELEASE_POOL,
QOS_CLASS_USER_INITIATED, QOS_MIN_RELATIVE_PRIORITY));
dispatch_suspend(mWorkQueue);
dispatch_queue_set_specific(mWorkQueue, &sPlatformManagerKey, this, nullptr);
mIsWorkQueueSuspended = true;
}
return mWorkQueue;
}

bool PlatformManagerImpl::IsWorkQueueCurrentQueue() const
{
return dispatch_get_specific(&sPlatformManagerKey) == this;
return dispatch_get_specific(this) == this;
}

CHIP_ERROR PlatformManagerImpl::StartBleScan(BleScannerDelegate * delegate)
Expand Down
32 changes: 18 additions & 14 deletions src/platform/Darwin/PlatformManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@
#include <lib/core/Global.h>
#include <platform/internal/GenericPlatformManagerImpl.h>

#include <atomic>
#include <dispatch/dispatch.h>

static constexpr const char * const CHIP_CONTROLLER_QUEUE = "org.csa-iot.matter.framework.controller.workqueue";

namespace chip {
namespace DeviceLayer {

Expand All @@ -47,7 +46,7 @@ class PlatformManagerImpl final : public PlatformManager, public Internal::Gener
public:
// ===== Platform-specific members that may be accessed directly by the application.

dispatch_queue_t GetWorkQueue();
dispatch_queue_t GetWorkQueue() { return mWorkQueue; }
bool IsWorkQueueCurrentQueue() const;

CHIP_ERROR StartBleScan(BleScannerDelegate * delegate = nullptr);
Expand Down Expand Up @@ -80,21 +79,26 @@ class PlatformManagerImpl final : public PlatformManager, public Internal::Gener
friend PlatformManager & PlatformMgr(void);
friend PlatformManagerImpl & PlatformMgrImpl(void);

static Global<PlatformManagerImpl> sInstance;
friend AtomicGlobal<PlatformManagerImpl>;
static AtomicGlobal<PlatformManagerImpl> sInstance;

PlatformManagerImpl();

System::Clock::Timestamp mStartTime = System::Clock::kZero;

dispatch_queue_t mWorkQueue = nullptr;
// Semaphore used to implement blocking behavior in _RunEventLoop.
dispatch_semaphore_t mRunLoopSem;
dispatch_queue_t mWorkQueue;

bool mIsWorkQueueSuspended = false;
// TODO: mIsWorkQueueSuspensionPending might need to be an atomic and use
// atomic ops, if we're worried about calls to StopEventLoopTask() from
// multiple threads racing somehow...
bool mIsWorkQueueSuspensionPending = false;
enum class WorkQueueState
{
kSuspended,
kRunning,
kSuspensionPending,
};

inline ImplClass * Impl() { return static_cast<PlatformManagerImpl *>(this); }
std::atomic<WorkQueueState> mWorkQueueState = WorkQueueState::kSuspended;

// Semaphore used to implement blocking behavior in _RunEventLoop.
dispatch_semaphore_t mRunLoopSem;
};

/**
Expand All @@ -112,7 +116,7 @@ inline PlatformManager & PlatformMgr(void)
* Returns the platform-specific implementation of the PlatformManager singleton object.
*
* chip applications can use this to gain access to features of the PlatformManager
* that are specific to the ESP32 platform.
* that are specific to the platform.
*/
inline PlatformManagerImpl & PlatformMgrImpl(void)
{
Expand Down
Loading
Loading