Skip to content

Commit

Permalink
Followups from #6561 Generalize socket-based event loop (#7799)
Browse files Browse the repository at this point in the history
* #### Problem

Followups from #6561 Generalize socket-based event loop

#### Change overview

* mDNS: Convert `platform/Linux/MdnsImpl`; remove Darwin stubs.
* Change `mCallbackData` type to `intptr_t` (review feedback).
* Comment for `System::WakeEvent` (this is minimal since it's
  likely to change soon for issue #7725).

Fixes #7758 _Convert MDNS to WatchableSocket._

#### Testing

Manual sanity check of mDNS using chip-device-ctrl on Linux.
Otherwise, no change to functionality is intended.

* review

* review
  • Loading branch information
kpschoedel authored Jun 23, 2021
1 parent 67ea9ed commit 9692e9a
Show file tree
Hide file tree
Showing 11 changed files with 161 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@

// Include the non-inline definitions for the GenericPlatformManagerImpl<> template,
// from which the GenericPlatformManagerImpl_POSIX<> template inherits.
#if CHIP_DEVICE_CONFIG_ENABLE_MDNS
#include "lib/mdns/platform/Mdns.h"
#endif
#include <platform/internal/GenericPlatformManagerImpl.cpp>

#include <system/SystemError.h>
Expand Down
4 changes: 2 additions & 2 deletions src/inet/RawEndPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ INET_ERROR RawEndPoint::Listen(IPEndPointBasis::OnMessageReceivedFunct onMessage

#if CHIP_SYSTEM_CONFIG_USE_SOCKETS
// Wait for ability to read on this endpoint.
mSocket.SetCallback(HandlePendingIO, this);
mSocket.SetCallback(HandlePendingIO, reinterpret_cast<intptr_t>(this));
mSocket.RequestCallbackOnPendingRead();
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS

Expand Down Expand Up @@ -1026,7 +1026,7 @@ INET_ERROR RawEndPoint::GetSocket(IPAddressType aAddressType)
// static
void RawEndPoint::HandlePendingIO(System::WatchableSocket & socket)
{
static_cast<RawEndPoint *>(socket.GetCallbackData())->HandlePendingIO();
reinterpret_cast<RawEndPoint *>(socket.GetCallbackData())->HandlePendingIO();
}

void RawEndPoint::HandlePendingIO()
Expand Down
8 changes: 4 additions & 4 deletions src/inet/TCPEndPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ INET_ERROR TCPEndPoint::Listen(uint16_t backlog)
res = chip::System::MapErrorPOSIX(errno);

// Wait for ability to read on this endpoint.
mSocket.SetCallback(HandlePendingIO, this);
mSocket.SetCallback(HandlePendingIO, reinterpret_cast<intptr_t>(this));
mSocket.RequestCallbackOnPendingRead();

#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS
Expand Down Expand Up @@ -507,7 +507,7 @@ INET_ERROR TCPEndPoint::Connect(const IPAddress & addr, uint16_t port, Interface
return res;
}

mSocket.SetCallback(HandlePendingIO, this);
mSocket.SetCallback(HandlePendingIO, reinterpret_cast<intptr_t>(this));

// Once Connecting or Connected, bump the reference count. The corresponding Release()
// [or on LwIP, DeferredRelease()] will happen in DoClose().
Expand Down Expand Up @@ -2440,7 +2440,7 @@ INET_ERROR TCPEndPoint::GetSocket(IPAddressType addrType)
// static
void TCPEndPoint::HandlePendingIO(System::WatchableSocket & socket)
{
static_cast<TCPEndPoint *>(socket.GetCallbackData())->HandlePendingIO();
reinterpret_cast<TCPEndPoint *>(socket.GetCallbackData())->HandlePendingIO();
}

void TCPEndPoint::HandlePendingIO()
Expand Down Expand Up @@ -2693,7 +2693,7 @@ void TCPEndPoint::HandleIncomingConnection()
conEP->Retain();

// Wait for ability to read on this endpoint.
conEP->mSocket.SetCallback(HandlePendingIO, conEP);
conEP->mSocket.SetCallback(HandlePendingIO, reinterpret_cast<intptr_t>(conEP));
conEP->mSocket.RequestCallbackOnPendingRead();

// Call the app's callback function.
Expand Down
4 changes: 2 additions & 2 deletions src/inet/UDPEndPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ INET_ERROR UDPEndPoint::Listen(OnMessageReceivedFunct onMessageReceived, OnRecei

#if CHIP_SYSTEM_CONFIG_USE_SOCKETS
// Wait for ability to read on this endpoint.
mSocket.SetCallback(HandlePendingIO, this);
mSocket.SetCallback(HandlePendingIO, reinterpret_cast<intptr_t>(this));
mSocket.RequestCallbackOnPendingRead();
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS

Expand Down Expand Up @@ -909,7 +909,7 @@ INET_ERROR UDPEndPoint::GetSocket(IPAddressType aAddressType)
// static
void UDPEndPoint::HandlePendingIO(System::WatchableSocket & socket)
{
static_cast<UDPEndPoint *>(socket.GetCallbackData())->HandlePendingIO();
reinterpret_cast<UDPEndPoint *>(socket.GetCallbackData())->HandlePendingIO();
}

void UDPEndPoint::HandlePendingIO()
Expand Down
4 changes: 2 additions & 2 deletions src/platform/Darwin/MdnsImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -544,9 +544,9 @@ CHIP_ERROR ChipMdnsResolve(MdnsService * service, chip::Inet::InterfaceId interf
return Resolve(context, callback, interfaceId, service->mAddressType, regtype.c_str(), service->mName);
}

void UpdateMdnsDataset(fd_set & readFdSet, fd_set & writeFdSet, fd_set & errorFdSet, int & maxFd, timeval & timeout) {}
void GetMdnsTimeout(timeval & timeout) {}

void ProcessMdns(fd_set & readFdSet, fd_set & writeFdSet, fd_set & errorFdSet) {}
void HandleMdnsTimeout() {}

} // namespace Mdns
} // namespace chip
116 changes: 51 additions & 65 deletions src/platform/Linux/MdnsImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@

#include <netinet/in.h>

#include <platform/internal/CHIPDeviceLayerInternal.h>
#include <support/CHIPMem.h>
#include <support/CHIPMemString.h>
#include <support/CodeUtils.h>

using chip::Mdns::kMdnsTypeMaxSize;
using chip::Mdns::MdnsServiceProtocol;
using chip::Mdns::TextEntry;
using chip::System::SocketEvents;
using std::chrono::duration_cast;
using std::chrono::microseconds;
using std::chrono::seconds;
Expand Down Expand Up @@ -79,6 +81,19 @@ chip::Inet::IPAddressType ToAddressType(AvahiProtocol protocol)
return type;
}

AvahiWatchEvent ToAvahiWatchEvent(SocketEvents events)
{
return static_cast<AvahiWatchEvent>((events.Has(chip::System::SocketEventFlags::kRead) ? AVAHI_WATCH_IN : 0) |
(events.Has(chip::System::SocketEventFlags::kWrite) ? AVAHI_WATCH_OUT : 0) |
(events.Has(chip::System::SocketEventFlags::kError) ? AVAHI_WATCH_ERR : 0));
}

void AvahiWatchCallbackTrampoline(chip::System::WatchableSocket & socket)
{
AvahiWatch * const watch = reinterpret_cast<AvahiWatch *>(socket.GetCallbackData());
watch->mCallback(watch, socket.GetFD(), ToAvahiWatchEvent(socket.GetPendingEvents()), watch->mContext);
}

CHIP_ERROR MakeAvahiStringListFromTextEntries(TextEntry * entries, size_t size, AvahiStringList ** strListOut)
{
*strListOut = avahi_string_list_new(nullptr, nullptr);
Expand Down Expand Up @@ -133,6 +148,8 @@ Poller::Poller()
mAvahiPoller.timeout_new = TimeoutNew;
mAvahiPoller.timeout_update = TimeoutUpdate;
mAvahiPoller.timeout_free = TimeoutFree;

mWatchableEvents = &DeviceLayer::SystemLayer.WatchableEvents();
}

AvahiWatch * Poller::WatchNew(const struct AvahiPoll * poller, int fd, AvahiWatchEvent event, AvahiWatchCallback callback,
Expand All @@ -145,19 +162,43 @@ AvahiWatch * Poller::WatchNew(int fd, AvahiWatchEvent event, AvahiWatchCallback
{
VerifyOrDie(callback != nullptr && fd >= 0);

mWatches.emplace_back(new AvahiWatch{ fd, event, 0, callback, context, this });
auto watch = std::make_unique<AvahiWatch>();
watch->mSocket.Init(*mWatchableEvents)
.Attach(fd)
.SetCallback(AvahiWatchCallbackTrampoline, reinterpret_cast<intptr_t>(watch.get()))
.RequestCallbackOnPendingRead(event & AVAHI_WATCH_IN)
.RequestCallbackOnPendingWrite(event & AVAHI_WATCH_OUT);
watch->mCallback = callback;
watch->mContext = context;
watch->mPoller = this;
mWatches.emplace_back(std::move(watch));

return mWatches.back().get();
}

void Poller::WatchUpdate(AvahiWatch * watch, AvahiWatchEvent event)
{
watch->mWatchEvents = event;
if (event & AVAHI_WATCH_IN)
{
watch->mSocket.RequestCallbackOnPendingRead();
}
else
{
watch->mSocket.ClearCallbackOnPendingRead();
}
if (event & AVAHI_WATCH_OUT)
{
watch->mSocket.RequestCallbackOnPendingWrite();
}
else
{
watch->mSocket.ClearCallbackOnPendingWrite();
}
}

AvahiWatchEvent Poller::WatchGetEvents(AvahiWatch * watch)
{
return static_cast<AvahiWatchEvent>(watch->mHappenedEvents);
return ToAvahiWatchEvent(watch->mSocket.GetPendingEvents());
}

void Poller::WatchFree(AvahiWatch * watch)
Expand All @@ -167,6 +208,7 @@ void Poller::WatchFree(AvahiWatch * watch)

void Poller::WatchFree(AvahiWatch & watch)
{
(void) watch.mSocket.ReleaseFD();
mWatches.erase(std::remove_if(mWatches.begin(), mWatches.end(),
[&watch](const std::unique_ptr<AvahiWatch> & aValue) { return aValue.get() == &watch; }),
mWatches.end());
Expand Down Expand Up @@ -226,38 +268,10 @@ void Poller::TimeoutFree(AvahiTimeout & timer)
mTimers.end());
}

void Poller::UpdateFdSet(fd_set & readFdSet, fd_set & writeFdSet, fd_set & errorFdSet, int & aMaxFd, timeval & timeout)
void Poller::GetTimeout(timeval & timeout)
{
microseconds timeoutVal = seconds(timeout.tv_sec) + microseconds(timeout.tv_usec);

for (auto && watch : mWatches)
{
int fd = watch->mFd;
AvahiWatchEvent events = watch->mWatchEvents;

if (AVAHI_WATCH_IN & events)
{
FD_SET(fd, &readFdSet);
}

if (AVAHI_WATCH_OUT & events)
{
FD_SET(fd, &writeFdSet);
}

if (AVAHI_WATCH_ERR & events)
{
FD_SET(fd, &errorFdSet);
}

if (aMaxFd < fd)
{
aMaxFd = fd;
}

watch->mHappenedEvents = 0;
}

for (auto && timer : mTimers)
{
steady_clock::time_point absTimeout = timer->mAbsTimeout;
Expand All @@ -282,38 +296,10 @@ void Poller::UpdateFdSet(fd_set & readFdSet, fd_set & writeFdSet, fd_set & error
timeout.tv_usec = static_cast<uint64_t>(timeoutVal.count()) % kUsPerSec;
}

void Poller::Process(const fd_set & readFdSet, const fd_set & writeFdSet, const fd_set & errorFdSet)
void Poller::HandleTimeout()
{
steady_clock::time_point now = steady_clock::now();

for (auto && watch : mWatches)
{
int fd = watch->mFd;
AvahiWatchEvent events = watch->mWatchEvents;

watch->mHappenedEvents = 0;

if ((AVAHI_WATCH_IN & events) && FD_ISSET(fd, &readFdSet))
{
watch->mHappenedEvents |= AVAHI_WATCH_IN;
}

if ((AVAHI_WATCH_OUT & events) && FD_ISSET(fd, &writeFdSet))
{
watch->mHappenedEvents |= AVAHI_WATCH_OUT;
}

if ((AVAHI_WATCH_ERR & events) && FD_ISSET(fd, &errorFdSet))
{
watch->mHappenedEvents |= AVAHI_WATCH_ERR;
}

if (watch->mHappenedEvents)
{
watch->mCallback(watch.get(), watch->mFd, static_cast<AvahiWatchEvent>(watch->mHappenedEvents), watch->mContext);
}
}

for (auto && timer : mTimers)
{
if (!timer->mEnabled)
Expand Down Expand Up @@ -753,14 +739,14 @@ MdnsAvahi::~MdnsAvahi()
}
}

void UpdateMdnsDataset(fd_set & readFdSet, fd_set & writeFdSet, fd_set & errorFdSet, int & maxFd, timeval & timeout)
void GetMdnsTimeout(timeval & timeout)
{
MdnsAvahi::GetInstance().GetPoller().UpdateFdSet(readFdSet, writeFdSet, errorFdSet, maxFd, timeout);
MdnsAvahi::GetInstance().GetPoller().GetTimeout(timeout);
}

void ProcessMdns(fd_set & readFdSet, fd_set & writeFdSet, fd_set & errorFdSet)
void HandleMdnsTimeout()
{
MdnsAvahi::GetInstance().GetPoller().Process(readFdSet, writeFdSet, errorFdSet);
MdnsAvahi::GetInstance().GetPoller().HandleTimeout();
}

CHIP_ERROR ChipMdnsInit(MdnsAsyncReturnCallback initCallback, MdnsAsyncReturnCallback errorCallback, void * context)
Expand Down
11 changes: 5 additions & 6 deletions src/platform/Linux/MdnsImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@
#include <avahi-common/watch.h>

#include "lib/mdns/platform/Mdns.h"
#include "system/SystemSockets.h"

struct AvahiWatch
{
int mFd; ///< The file descriptor to watch.
AvahiWatchEvent mWatchEvents; ///< The interested events.
int mHappenedEvents; ///< The events happened.
chip::System::WatchableSocket mSocket;
AvahiWatchCallback mCallback; ///< The function to be called when interested events happened on mFd.
void * mContext; ///< A pointer to application-specific context.
void * mPoller; ///< The poller created this watch.
Expand All @@ -62,9 +61,8 @@ class Poller
public:
Poller(void);

void UpdateFdSet(fd_set & readFdSet, fd_set & writeFdSet, fd_set & errorFdSet, int & maxFd, timeval & timeout);

void Process(const fd_set & readFdSet, const fd_set & writeFdSet, const fd_set & errorFdSet);
void GetTimeout(timeval & timeout);
void HandleTimeout();

const AvahiPoll * GetAvahiPoll(void) const { return &mAvahiPoller; }

Expand Down Expand Up @@ -92,6 +90,7 @@ class Poller
std::vector<std::unique_ptr<AvahiWatch>> mWatches;
std::vector<std::unique_ptr<AvahiTimeout>> mTimers;
AvahiPoll mAvahiPoller;
System::WatchableEventManager * mWatchableEvents;
};

class MdnsAvahi
Expand Down
4 changes: 2 additions & 2 deletions src/system/SystemSockets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Error WakeEvent::Open(WatchableEventManager & watchState)

mFD.Init(watchState);
mFD.Attach(fds[FD_READ]);
mFD.SetCallback(Confirm, this);
mFD.SetCallback(Confirm, reinterpret_cast<intptr_t>(this));
mFD.RequestCallbackOnPendingRead();

mWriteFD = fds[FD_WRITE];
Expand Down Expand Up @@ -135,7 +135,7 @@ Error WakeEvent::Open(WatchableEventManager & watchState)
}

mFD.Attach(fd);
mFD.SetCallback(Confirm, this);
mFD.SetCallback(Confirm, reinterpret_cast<intptr_t>(this));
mFD.RequestCallbackOnPendingRead();

return CHIP_SYSTEM_NO_ERROR;
Expand Down
Loading

0 comments on commit 9692e9a

Please sign in to comment.