Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Followups from #6561 Generalize socket-based event loop #7799

Merged
merged 3 commits into from
Jun 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@

// Include the non-inline definitions for the GenericPlatformManagerImpl<> template,
// from which the GenericPlatformManagerImpl_POSIX<> template inherits.
#if CHIP_DEVICE_CONFIG_ENABLE_MDNS
#include "lib/mdns/platform/Mdns.h"
#endif
#include <platform/internal/GenericPlatformManagerImpl.cpp>

#include <system/SystemError.h>
Expand Down
4 changes: 2 additions & 2 deletions src/inet/RawEndPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ INET_ERROR RawEndPoint::Listen(IPEndPointBasis::OnMessageReceivedFunct onMessage

#if CHIP_SYSTEM_CONFIG_USE_SOCKETS
// Wait for ability to read on this endpoint.
mSocket.SetCallback(HandlePendingIO, this);
mSocket.SetCallback(HandlePendingIO, reinterpret_cast<intptr_t>(this));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This requires a more casts and stronger casts, I wonder if it is worth it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a request in the previous review, so you and Boris can fight it out :-)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was a suggestion; Boris said he was OK either way. Above was just a question. Ultimately I think it is up to you.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, given what this diff looks like and that we have no non-pointer use cases for this closure, we should probably leave it as void* for now. If we ever end up with non-pointer use cases, we can think about how to be best address those then...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added to my TODO list.

mSocket.RequestCallbackOnPendingRead();
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS

Expand Down Expand Up @@ -1026,7 +1026,7 @@ INET_ERROR RawEndPoint::GetSocket(IPAddressType aAddressType)
// static
void RawEndPoint::HandlePendingIO(System::WatchableSocket & socket)
{
static_cast<RawEndPoint *>(socket.GetCallbackData())->HandlePendingIO();
reinterpret_cast<RawEndPoint *>(socket.GetCallbackData())->HandlePendingIO();
}

void RawEndPoint::HandlePendingIO()
Expand Down
8 changes: 4 additions & 4 deletions src/inet/TCPEndPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ INET_ERROR TCPEndPoint::Listen(uint16_t backlog)
res = chip::System::MapErrorPOSIX(errno);

// Wait for ability to read on this endpoint.
mSocket.SetCallback(HandlePendingIO, this);
mSocket.SetCallback(HandlePendingIO, reinterpret_cast<intptr_t>(this));
mSocket.RequestCallbackOnPendingRead();

#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS
Expand Down Expand Up @@ -507,7 +507,7 @@ INET_ERROR TCPEndPoint::Connect(const IPAddress & addr, uint16_t port, Interface
return res;
}

mSocket.SetCallback(HandlePendingIO, this);
mSocket.SetCallback(HandlePendingIO, reinterpret_cast<intptr_t>(this));

// Once Connecting or Connected, bump the reference count. The corresponding Release()
// [or on LwIP, DeferredRelease()] will happen in DoClose().
Expand Down Expand Up @@ -2440,7 +2440,7 @@ INET_ERROR TCPEndPoint::GetSocket(IPAddressType addrType)
// static
void TCPEndPoint::HandlePendingIO(System::WatchableSocket & socket)
{
static_cast<TCPEndPoint *>(socket.GetCallbackData())->HandlePendingIO();
reinterpret_cast<TCPEndPoint *>(socket.GetCallbackData())->HandlePendingIO();
}

void TCPEndPoint::HandlePendingIO()
Expand Down Expand Up @@ -2693,7 +2693,7 @@ void TCPEndPoint::HandleIncomingConnection()
conEP->Retain();

// Wait for ability to read on this endpoint.
conEP->mSocket.SetCallback(HandlePendingIO, conEP);
conEP->mSocket.SetCallback(HandlePendingIO, reinterpret_cast<intptr_t>(conEP));
conEP->mSocket.RequestCallbackOnPendingRead();

// Call the app's callback function.
Expand Down
4 changes: 2 additions & 2 deletions src/inet/UDPEndPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ INET_ERROR UDPEndPoint::Listen(OnMessageReceivedFunct onMessageReceived, OnRecei

#if CHIP_SYSTEM_CONFIG_USE_SOCKETS
// Wait for ability to read on this endpoint.
mSocket.SetCallback(HandlePendingIO, this);
mSocket.SetCallback(HandlePendingIO, reinterpret_cast<intptr_t>(this));
mSocket.RequestCallbackOnPendingRead();
#endif // CHIP_SYSTEM_CONFIG_USE_SOCKETS

Expand Down Expand Up @@ -909,7 +909,7 @@ INET_ERROR UDPEndPoint::GetSocket(IPAddressType aAddressType)
// static
void UDPEndPoint::HandlePendingIO(System::WatchableSocket & socket)
{
static_cast<UDPEndPoint *>(socket.GetCallbackData())->HandlePendingIO();
reinterpret_cast<UDPEndPoint *>(socket.GetCallbackData())->HandlePendingIO();
}

void UDPEndPoint::HandlePendingIO()
Expand Down
4 changes: 2 additions & 2 deletions src/platform/Darwin/MdnsImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -544,9 +544,9 @@ CHIP_ERROR ChipMdnsResolve(MdnsService * service, chip::Inet::InterfaceId interf
return Resolve(context, callback, interfaceId, service->mAddressType, regtype.c_str(), service->mName);
}

void UpdateMdnsDataset(fd_set & readFdSet, fd_set & writeFdSet, fd_set & errorFdSet, int & maxFd, timeval & timeout) {}
void GetMdnsTimeout(timeval & timeout) {}

void ProcessMdns(fd_set & readFdSet, fd_set & writeFdSet, fd_set & errorFdSet) {}
void HandleMdnsTimeout() {}

} // namespace Mdns
} // namespace chip
116 changes: 51 additions & 65 deletions src/platform/Linux/MdnsImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@

#include <netinet/in.h>

#include <platform/internal/CHIPDeviceLayerInternal.h>
#include <support/CHIPMem.h>
#include <support/CHIPMemString.h>
#include <support/CodeUtils.h>

using chip::Mdns::kMdnsTypeMaxSize;
using chip::Mdns::MdnsServiceProtocol;
using chip::Mdns::TextEntry;
using chip::System::SocketEvents;
using std::chrono::duration_cast;
using std::chrono::microseconds;
using std::chrono::seconds;
Expand Down Expand Up @@ -79,6 +81,19 @@ chip::Inet::IPAddressType ToAddressType(AvahiProtocol protocol)
return type;
}

AvahiWatchEvent ToAvahiWatchEvent(SocketEvents events)
{
return static_cast<AvahiWatchEvent>((events.Has(chip::System::SocketEventFlags::kRead) ? AVAHI_WATCH_IN : 0) |
(events.Has(chip::System::SocketEventFlags::kWrite) ? AVAHI_WATCH_OUT : 0) |
(events.Has(chip::System::SocketEventFlags::kError) ? AVAHI_WATCH_ERR : 0));
}

void AvahiWatchCallbackTrampoline(chip::System::WatchableSocket & socket)
{
AvahiWatch * const watch = reinterpret_cast<AvahiWatch *>(socket.GetCallbackData());
watch->mCallback(watch, socket.GetFD(), ToAvahiWatchEvent(socket.GetPendingEvents()), watch->mContext);
}

CHIP_ERROR MakeAvahiStringListFromTextEntries(TextEntry * entries, size_t size, AvahiStringList ** strListOut)
{
*strListOut = avahi_string_list_new(nullptr, nullptr);
Expand Down Expand Up @@ -133,6 +148,8 @@ Poller::Poller()
mAvahiPoller.timeout_new = TimeoutNew;
mAvahiPoller.timeout_update = TimeoutUpdate;
mAvahiPoller.timeout_free = TimeoutFree;

mWatchableEvents = &DeviceLayer::SystemLayer.WatchableEvents();
}

AvahiWatch * Poller::WatchNew(const struct AvahiPoll * poller, int fd, AvahiWatchEvent event, AvahiWatchCallback callback,
Expand All @@ -145,19 +162,43 @@ AvahiWatch * Poller::WatchNew(int fd, AvahiWatchEvent event, AvahiWatchCallback
{
VerifyOrDie(callback != nullptr && fd >= 0);

mWatches.emplace_back(new AvahiWatch{ fd, event, 0, callback, context, this });
auto watch = std::make_unique<AvahiWatch>();
watch->mSocket.Init(*mWatchableEvents)
.Attach(fd)
.SetCallback(AvahiWatchCallbackTrampoline, reinterpret_cast<intptr_t>(watch.get()))
.RequestCallbackOnPendingRead(event & AVAHI_WATCH_IN)
.RequestCallbackOnPendingWrite(event & AVAHI_WATCH_OUT);
watch->mCallback = callback;
watch->mContext = context;
watch->mPoller = this;
mWatches.emplace_back(std::move(watch));

return mWatches.back().get();
}

void Poller::WatchUpdate(AvahiWatch * watch, AvahiWatchEvent event)
{
watch->mWatchEvents = event;
if (event & AVAHI_WATCH_IN)
{
watch->mSocket.RequestCallbackOnPendingRead();
}
else
{
watch->mSocket.ClearCallbackOnPendingRead();
}
if (event & AVAHI_WATCH_OUT)
{
watch->mSocket.RequestCallbackOnPendingWrite();
}
else
{
watch->mSocket.ClearCallbackOnPendingWrite();
}
}

AvahiWatchEvent Poller::WatchGetEvents(AvahiWatch * watch)
{
return static_cast<AvahiWatchEvent>(watch->mHappenedEvents);
return ToAvahiWatchEvent(watch->mSocket.GetPendingEvents());
}

void Poller::WatchFree(AvahiWatch * watch)
Expand All @@ -167,6 +208,7 @@ void Poller::WatchFree(AvahiWatch * watch)

void Poller::WatchFree(AvahiWatch & watch)
{
(void) watch.mSocket.ReleaseFD();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a confused by this. WatchableSocket is normally expected to own the fd but not in the case of avahi?

If WatchableSocket is owning, why doesn't it close in its destructor?

If WatchableSocket is non-owning, why does it close the file descriptor in Close()?

If WatchableSocket non-owning but Close() is just a helper for release-and-close, should "release" call WatchableSocket::OnClose which does the actual detach-from-WatchableEventManager?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a confused by this. WatchableSocket is normally expected to own the fd but not in the case of avahi?

Right. In the other cases (Inet and System::WakeEvent) the WatchableSocket effectively owns the fd, and replaces the fd from the point of view of the holder. In the Avahi case, Avahi itself owns the fd and lends it via AvahiPoll.watch_new and watch_free.

If WatchableSocket is owning, why doesn't it close in its destructor?

For the normal (Inet) case, it's contained inside an IPEndPointBasis which is not constructible or destructible. It's contained in order to avoid requiring separate allocation. It may be worth revisiting that if/when the System/Inet virtualization and pool allocation improvements happen.

If WatchableSocket is non-owning, why does it close the file descriptor in Close()?

Convenience for the common case.

If WatchableSocket non-owning but Close() is just a helper for release-and-close, should "release" call WatchableSocket::OnClose which does the actual detach-from-WatchableEventManager?

Yes, OnClose() should be renamed OnRelease().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a confused by this. WatchableSocket is normally expected to own the fd but not in the case of avahi?

Right. In the other cases (Inet and System::WakeEvent) the WatchableSocket effectively owns the fd, and replaces the fd from the point of view of the holder. In the Avahi case, Avahi itself owns the fd and lends it via AvahiPoll.watch_new and watch_free.

Having a non-owning watcher separate from fd-owner makes the most sense to me and would fit well with avahi but understood there are other constraints here.

If WatchableSocket is owning, why doesn't it close in its destructor?

For the normal (Inet) case, it's contained inside an IPEndPointBasis which is not constructible or destructible. It's contained in order to avoid requiring separate allocation. It may be worth revisiting that if/when the System/Inet virtualization and pool allocation improvements happen.

Ahh, the pool should really run destructors on slots released back to it :(

If WatchableSocket is non-owning, why does it close the file descriptor in Close()?

Convenience for the common case.

If WatchableSocket non-owning but Close() is just a helper for release-and-close, should "release" call WatchableSocket::OnClose which does the actual detach-from-WatchableEventManager?

Yes, OnClose() should be renamed OnRelease().

Ah, I see that it does I was just confused by the name. Rename sounds good.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having a non-owning watcher separate from fd-owner makes the most sense to me and would fit well with avahi but understood there are other constraints here.

I'll take a look at this when I do the remaining test and timer TODOs, which likely will be together with #7725. I don't know that it will make much difference; since we don't have weak fds, a watcher must be informed before a file is closed.

Yes, OnClose() should be renamed OnRelease().

Ah, I see that it does I was just confused by the name. Rename sounds good.

Ack, added to my TODO list.

mWatches.erase(std::remove_if(mWatches.begin(), mWatches.end(),
[&watch](const std::unique_ptr<AvahiWatch> & aValue) { return aValue.get() == &watch; }),
mWatches.end());
Expand Down Expand Up @@ -226,38 +268,10 @@ void Poller::TimeoutFree(AvahiTimeout & timer)
mTimers.end());
}

void Poller::UpdateFdSet(fd_set & readFdSet, fd_set & writeFdSet, fd_set & errorFdSet, int & aMaxFd, timeval & timeout)
void Poller::GetTimeout(timeval & timeout)
{
microseconds timeoutVal = seconds(timeout.tv_sec) + microseconds(timeout.tv_usec);

for (auto && watch : mWatches)
{
int fd = watch->mFd;
AvahiWatchEvent events = watch->mWatchEvents;

if (AVAHI_WATCH_IN & events)
{
FD_SET(fd, &readFdSet);
}

if (AVAHI_WATCH_OUT & events)
{
FD_SET(fd, &writeFdSet);
}

if (AVAHI_WATCH_ERR & events)
{
FD_SET(fd, &errorFdSet);
}

if (aMaxFd < fd)
{
aMaxFd = fd;
}

watch->mHappenedEvents = 0;
}

for (auto && timer : mTimers)
{
steady_clock::time_point absTimeout = timer->mAbsTimeout;
Expand All @@ -282,38 +296,10 @@ void Poller::UpdateFdSet(fd_set & readFdSet, fd_set & writeFdSet, fd_set & error
timeout.tv_usec = static_cast<uint64_t>(timeoutVal.count()) % kUsPerSec;
}

void Poller::Process(const fd_set & readFdSet, const fd_set & writeFdSet, const fd_set & errorFdSet)
void Poller::HandleTimeout()
{
steady_clock::time_point now = steady_clock::now();

for (auto && watch : mWatches)
{
int fd = watch->mFd;
AvahiWatchEvent events = watch->mWatchEvents;

watch->mHappenedEvents = 0;

if ((AVAHI_WATCH_IN & events) && FD_ISSET(fd, &readFdSet))
{
watch->mHappenedEvents |= AVAHI_WATCH_IN;
}

if ((AVAHI_WATCH_OUT & events) && FD_ISSET(fd, &writeFdSet))
{
watch->mHappenedEvents |= AVAHI_WATCH_OUT;
}

if ((AVAHI_WATCH_ERR & events) && FD_ISSET(fd, &errorFdSet))
{
watch->mHappenedEvents |= AVAHI_WATCH_ERR;
}

if (watch->mHappenedEvents)
{
watch->mCallback(watch.get(), watch->mFd, static_cast<AvahiWatchEvent>(watch->mHappenedEvents), watch->mContext);
}
}

for (auto && timer : mTimers)
{
if (!timer->mEnabled)
Expand Down Expand Up @@ -753,14 +739,14 @@ MdnsAvahi::~MdnsAvahi()
}
}

void UpdateMdnsDataset(fd_set & readFdSet, fd_set & writeFdSet, fd_set & errorFdSet, int & maxFd, timeval & timeout)
void GetMdnsTimeout(timeval & timeout)
{
MdnsAvahi::GetInstance().GetPoller().UpdateFdSet(readFdSet, writeFdSet, errorFdSet, maxFd, timeout);
MdnsAvahi::GetInstance().GetPoller().GetTimeout(timeout);
}

void ProcessMdns(fd_set & readFdSet, fd_set & writeFdSet, fd_set & errorFdSet)
void HandleMdnsTimeout()
{
MdnsAvahi::GetInstance().GetPoller().Process(readFdSet, writeFdSet, errorFdSet);
MdnsAvahi::GetInstance().GetPoller().HandleTimeout();
}

CHIP_ERROR ChipMdnsInit(MdnsAsyncReturnCallback initCallback, MdnsAsyncReturnCallback errorCallback, void * context)
Expand Down
11 changes: 5 additions & 6 deletions src/platform/Linux/MdnsImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@
#include <avahi-common/watch.h>

#include "lib/mdns/platform/Mdns.h"
#include "system/SystemSockets.h"

struct AvahiWatch
{
int mFd; ///< The file descriptor to watch.
AvahiWatchEvent mWatchEvents; ///< The interested events.
int mHappenedEvents; ///< The events happened.
chip::System::WatchableSocket mSocket;
AvahiWatchCallback mCallback; ///< The function to be called when interested events happened on mFd.
void * mContext; ///< A pointer to application-specific context.
void * mPoller; ///< The poller created this watch.
Expand All @@ -62,9 +61,8 @@ class Poller
public:
Poller(void);

void UpdateFdSet(fd_set & readFdSet, fd_set & writeFdSet, fd_set & errorFdSet, int & maxFd, timeval & timeout);

void Process(const fd_set & readFdSet, const fd_set & writeFdSet, const fd_set & errorFdSet);
void GetTimeout(timeval & timeout);
void HandleTimeout();

const AvahiPoll * GetAvahiPoll(void) const { return &mAvahiPoller; }

Expand Down Expand Up @@ -92,6 +90,7 @@ class Poller
std::vector<std::unique_ptr<AvahiWatch>> mWatches;
std::vector<std::unique_ptr<AvahiTimeout>> mTimers;
AvahiPoll mAvahiPoller;
System::WatchableEventManager * mWatchableEvents;
};

class MdnsAvahi
Expand Down
4 changes: 2 additions & 2 deletions src/system/SystemSockets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Error WakeEvent::Open(WatchableEventManager & watchState)

mFD.Init(watchState);
mFD.Attach(fds[FD_READ]);
mFD.SetCallback(Confirm, this);
mFD.SetCallback(Confirm, reinterpret_cast<intptr_t>(this));
mFD.RequestCallbackOnPendingRead();

mWriteFD = fds[FD_WRITE];
Expand Down Expand Up @@ -135,7 +135,7 @@ Error WakeEvent::Open(WatchableEventManager & watchState)
}

mFD.Attach(fd);
mFD.SetCallback(Confirm, this);
mFD.SetCallback(Confirm, reinterpret_cast<intptr_t>(this));
mFD.RequestCallbackOnPendingRead();

return CHIP_SYSTEM_NO_ERROR;
Expand Down
Loading