Skip to content

Commit

Permalink
minmdns - perform exponential backoff retry for discovering nodes (#9900)
Browse files Browse the repository at this point in the history
* V1: very hardcoded logic for retries

* Move the active resolve attempts class to a separate header/cpp file for better code organization

* Restyle fixes

* Remove unused include in minmdns resolver implementation

* Update code to use Optional for potentially missing values

* use a clock base for the active resolve attempts, to allow for testability

* Start adding unit tests for the resolve attempt queue

* More  unit tests

* Restyle fixes

* Fix signed/unsigned for optional, to make android compile

* Code review updates, add a more complex test for scheduling check
  • Loading branch information
andy31415 authored Sep 28, 2021
1 parent 3b15fd5 commit 083c9ad
Show file tree
Hide file tree
Showing 6 changed files with 653 additions and 39 deletions.
134 changes: 95 additions & 39 deletions src/lib/mdns/Resolver_ImplMinimalMdns.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <lib/mdns/MinimalMdnsServer.h>
#include <lib/mdns/ServiceNaming.h>
#include <lib/mdns/TxtFields.h>
#include <lib/mdns/minimal/ActiveResolveAttempts.h>
#include <lib/mdns/minimal/Parser.h>
#include <lib/mdns/minimal/QueryBuilder.h>
#include <lib/mdns/minimal/RecordData.h>
Expand Down Expand Up @@ -82,9 +83,10 @@ class PacketDataReporter : public ParserDelegate
void OnHeader(ConstHeaderRef & header) override;
void OnQuery(const QueryData & data) override;
void OnResource(ResourceType type, const ResourceData & data) override;

// Called after ParsePacket is complete to send final notifications to the delegate.
// Used to ensure all the available IP addresses are attached before completion.
void OnComplete();
void OnComplete(ActiveResolveAttempts & activeAttempts);

private:
ResolverDelegate * mDelegate = nullptr;
Expand Down Expand Up @@ -311,7 +313,7 @@ void PacketDataReporter::OnResource(ResourceType type, const ResourceData & data
}
}

void PacketDataReporter::OnComplete()
void PacketDataReporter::OnComplete(ActiveResolveAttempts & activeAttempts)
{
if ((mDiscoveryType == DiscoveryType::kCommissionableNode || mDiscoveryType == DiscoveryType::kCommissionerNode) &&
mDiscoveredNodeData.IsValid())
Expand All @@ -320,6 +322,8 @@ void PacketDataReporter::OnComplete()
}
else if (mDiscoveryType == DiscoveryType::kOperational && mHasIP && mHasNodePort)
{
activeAttempts.Complete(mNodeData.mPeerId);

mNodeData.LogNodeIdResolved();
mDelegate->OnNodeIdResolved(mNodeData);
}
Expand All @@ -328,7 +332,10 @@ void PacketDataReporter::OnComplete()
class MinMdnsResolver : public Resolver, public MdnsPacketDelegate
{
public:
MinMdnsResolver() { GlobalMinimalMdnsServer::Instance().SetResponseDelegate(this); }
MinMdnsResolver() : mActiveResolves(chip::System::Internal::gClockBase)
{
GlobalMinimalMdnsServer::Instance().SetResponseDelegate(this);
}

//// MdnsPacketDelegate implementation
void OnMdnsPacketData(const BytesRange & data, const chip::Inet::IPPacketInfo * info) override;
Expand All @@ -344,6 +351,13 @@ class MinMdnsResolver : public Resolver, public MdnsPacketDelegate
private:
ResolverDelegate * mDelegate = nullptr;
DiscoveryType mDiscoveryType = DiscoveryType::kUnknown;
System::Layer * mSystemLayer = nullptr;
ActiveResolveAttempts mActiveResolves;

CHIP_ERROR SendPendingResolveQueries();
CHIP_ERROR ScheduleResolveRetries();

static void ResolveRetryCallback(System::Layer *, void * self);

CHIP_ERROR SendQuery(mdns::Minimal::FullQName qname, mdns::Minimal::QType type);
CHIP_ERROR BrowseNodes(DiscoveryType type, DiscoveryFilter subtype);
Expand Down Expand Up @@ -379,7 +393,8 @@ void MinMdnsResolver::OnMdnsPacketData(const BytesRange & data, const chip::Inet
}
else
{
reporter.OnComplete();
reporter.OnComplete(mActiveResolves);
ScheduleResolveRetries();
}
}

Expand All @@ -392,6 +407,8 @@ CHIP_ERROR MinMdnsResolver::StartResolver(chip::Inet::InetLayer * inetLayer, uin
return CHIP_NO_ERROR;
}

mSystemLayer = inetLayer->SystemLayer();

return GlobalMinimalMdnsServer::Instance().StartServer(inetLayer, port);
}

Expand Down Expand Up @@ -491,47 +508,86 @@ CHIP_ERROR MinMdnsResolver::BrowseNodes(DiscoveryType type, DiscoveryFilter filt

CHIP_ERROR MinMdnsResolver::ResolveNodeId(const PeerId & peerId, Inet::IPAddressType type)
{
mDiscoveryType = DiscoveryType::kOperational;
System::PacketBufferHandle buffer = System::PacketBufferHandle::New(kMdnsMaxPacketSize);
ReturnErrorCodeIf(buffer.IsNull(), CHIP_ERROR_NO_MEMORY);
mDiscoveryType = DiscoveryType::kOperational;
mActiveResolves.MarkPending(peerId);

QueryBuilder builder(std::move(buffer));
builder.Header().SetMessageId(0);
return SendPendingResolveQueries();
}

CHIP_ERROR MinMdnsResolver::ScheduleResolveRetries()
{
ReturnErrorCodeIf(mSystemLayer == nullptr, CHIP_ERROR_INCORRECT_STATE);
mSystemLayer->CancelTimer(&ResolveRetryCallback, this);

Optional<uint32_t> delayMs = mActiveResolves.GetMsUntilNextExpectedResponse();

if (!delayMs.HasValue())
{
char nameBuffer[64] = "";

// Node and fabricid are encoded in server names.
ReturnErrorOnFailure(MakeInstanceName(nameBuffer, sizeof(nameBuffer), peerId));

const char * instanceQName[] = { nameBuffer, kOperationalServiceName, kOperationalProtocol, kLocalDomain };
Query query(instanceQName);

query
.SetClass(QClass::IN) //
.SetType(QType::ANY) //
.SetAnswerViaUnicast(false) //
;

// NOTE: type above is NOT A or AAAA because the name searched for is
// a SRV record. The layout is:
// SRV -> hostname
// Hostname -> A
// Hostname -> AAAA
//
// Query is sent for ANY and expectation is to receive A/AAAA records
// in the additional section of the reply.
//
// Sending a A/AAAA query will return no results
// Sending a SRV query will return the srv only and an additional query
// would be needed to resolve the host name to an IP address

builder.AddQuery(query);
return CHIP_NO_ERROR;
}

ReturnErrorCodeIf(!builder.Ok(), CHIP_ERROR_INTERNAL);
return mSystemLayer->StartTimer(delayMs.Value(), &ResolveRetryCallback, this);
}

return GlobalMinimalMdnsServer::Server().BroadcastSend(builder.ReleasePacket(), kMdnsPort);
void MinMdnsResolver::ResolveRetryCallback(System::Layer *, void * self)
{
reinterpret_cast<MinMdnsResolver *>(self)->SendPendingResolveQueries();
}

CHIP_ERROR MinMdnsResolver::SendPendingResolveQueries()
{
while (true)
{
Optional<PeerId> peerId = mActiveResolves.NextScheduledPeer();

if (!peerId.HasValue())
{
break;
}

System::PacketBufferHandle buffer = System::PacketBufferHandle::New(kMdnsMaxPacketSize);
ReturnErrorCodeIf(buffer.IsNull(), CHIP_ERROR_NO_MEMORY);

QueryBuilder builder(std::move(buffer));
builder.Header().SetMessageId(0);

{
char nameBuffer[kMaxOperationalServiceNameSize] = "";

// Node and fabricid are encoded in server names.
ReturnErrorOnFailure(MakeInstanceName(nameBuffer, sizeof(nameBuffer), peerId.Value()));

const char * instanceQName[] = { nameBuffer, kOperationalServiceName, kOperationalProtocol, kLocalDomain };
Query query(instanceQName);

query
.SetClass(QClass::IN) //
.SetType(QType::ANY) //
.SetAnswerViaUnicast(false) //
;

// NOTE: type above is NOT A or AAAA because the name searched for is
// a SRV record. The layout is:
// SRV -> hostname
// Hostname -> A
// Hostname -> AAAA
//
// Query is sent for ANY and expectation is to receive A/AAAA records
// in the additional section of the reply.
//
// Sending a A/AAAA query will return no results
// Sending a SRV query will return the srv only and an additional query
// would be needed to resolve the host name to an IP address

builder.AddQuery(query);
}

ReturnErrorCodeIf(!builder.Ok(), CHIP_ERROR_INTERNAL);

ReturnErrorOnFailure(GlobalMinimalMdnsServer::Server().BroadcastSend(builder.ReleasePacket(), kMdnsPort));
}

return ScheduleResolveRetries();
}

MinMdnsResolver gResolver;
Expand Down
187 changes: 187 additions & 0 deletions src/lib/mdns/minimal/ActiveResolveAttempts.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
*
* Copyright (c) 2021 Project CHIP Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "ActiveResolveAttempts.h"

#include <lib/support/logging/CHIPLogging.h>

#include <algorithm>

using namespace chip;

namespace mdns {
namespace Minimal {

// Clears the retry queue: every slot gets the undefined node id, which is the
// marker the rest of this class uses for "slot not in use". No other field of
// the entries is modified.
void ActiveResolveAttempts::Reset()
{
    for (auto & slot : mRetryQueue)
    {
        slot.peerId.SetNodeId(kUndefinedNodeId);
    }
}

// Marks the resolve attempt for `peerId` as finished.
//
// The first queue slot holding `peerId` is released (its node id is set to
// the undefined node id). If no slot holds that peer, only a progress message
// is logged.
void ActiveResolveAttempts::Complete(const PeerId & peerId)
{
    for (auto & slot : mRetryQueue)
    {
        if (!(slot.peerId == peerId))
        {
            continue; // some other peer (or an unused slot)
        }

        slot.peerId.SetNodeId(kUndefinedNodeId);
        return;
    }

    // Not finding a slot is expected at times, e.g. during boot time
    // advertisements: nodes come online and advertise their IP without
    // any explicit query having been issued for them.
    ChipLogProgress(Discovery, "Discovered node without a pending query");
}

void ActiveResolveAttempts::MarkPending(const PeerId & peerId)
{
// Strategy when picking the peer id to use:
// 1 if a matching peer id is already found, use that one
// 2 if an 'unused' entry is found, use that
// 3 otherwise expire the one with the largest nextRetryDelaySec
// or if equal nextRetryDelaySec, pick the one with the oldest
// queryDueTimeMs

RetryEntry * entryToUse = &mRetryQueue[0];

for (size_t i = 1; i < kRetryQueueSize; i++)
{
if (entryToUse->peerId == peerId)
{
break; // best match possible
}

RetryEntry * entry = mRetryQueue + i;

// Rule 1: peer id match always matches
if (entry->peerId == peerId)
{
entryToUse = entry;
continue;
}

// Rule 2: select unused entries
if ((entryToUse->peerId.GetNodeId() != kUndefinedNodeId) && (entry->peerId.GetNodeId() == kUndefinedNodeId))
{
entryToUse = entry;
continue;
}
else if (entryToUse->peerId.GetNodeId() == kUndefinedNodeId)
{
continue;
}

// Rule 3: both choices are used (have a defined node id):
// - try to find the one with the largest next delay (oldest request)
// - on same delay, use queryDueTime to determine the oldest request
// (the one with the smallest due time was issued the longest time
// ago)
if (entry->nextRetryDelaySec > entryToUse->nextRetryDelaySec)
{
entryToUse = entry;
}
else if ((entry->nextRetryDelaySec == entryToUse->nextRetryDelaySec) &&
(entry->queryDueTimeMs < entryToUse->queryDueTimeMs))
{
entryToUse = entry;
}
}

if ((entryToUse->peerId.GetNodeId() != kUndefinedNodeId) && (entryToUse->peerId != peerId))
{
// TODO: node was evicted here, if/when resolution failures are
// supported this could be a place for error callbacks
//
// Note however that this is NOT an actual 'timeout' it is showing
// a burst of lookups for which we cannot maintain state. A reply may
// still be received for this peer id (query was already sent on the
// network)
ChipLogError(Discovery, "Re-using pending resolve entry before reply was received.");
}

entryToUse->peerId = peerId;
entryToUse->queryDueTimeMs = mClock->GetMonotonicMilliseconds();
entryToUse->nextRetryDelaySec = 1;
}

// Computes how long to wait, in milliseconds, until the earliest pending
// entry in the retry queue becomes due.
//
// Returns:
//   - Missing if no queue entry is in use (all have the undefined node id)
//   - 0 if at least one in-use entry is already due (due time <= now)
//   - otherwise the smallest (queryDueTimeMs - now) over all in-use entries
Optional<uint32_t> ActiveResolveAttempts::GetMsUntilNextExpectedResponse() const
{
    Optional<uint32_t> minDelay = Optional<uint32_t>::Missing();

    chip::System::Clock::MonotonicMilliseconds nowMs = mClock->GetMonotonicMilliseconds();

    for (const auto & entry : mRetryQueue)
    {
        if (entry.peerId.GetNodeId() == kUndefinedNodeId)
        {
            continue; // unused slot
        }

        if (nowMs >= entry.queryDueTimeMs)
        {
            // found an entry that needs processing right now
            return Optional<uint32_t>::Value(0);
        }

        // The difference is strictly positive at this point. Cast directly to
        // the unsigned result type instead of going through a signed `int`,
        // which mixed signedness and relied on an implementation-defined
        // narrowing for large values.
        const uint32_t entryDelay = static_cast<uint32_t>(entry.queryDueTimeMs - nowMs);
        if (!minDelay.HasValue() || (minDelay.Value() > entryDelay))
        {
            minDelay.SetValue(entryDelay);
        }
    }

    return minDelay;
}

// Returns the next peer whose resolve query is due, or Missing if none is.
//
// Slots are scanned in queue order. For the first in-use slot that is due:
//   - if its next retry delay is within kMaxRetryDelaySec, the slot is
//     rescheduled (due time = now + delay, delay doubled) and its peer id is
//     returned
//   - if its next retry delay exceeds kMaxRetryDelaySec, the attempt is logged
//     as timed out, the slot is released and the scan continues
Optional<PeerId> ActiveResolveAttempts::NextScheduledPeer()
{
    const chip::System::Clock::MonotonicMilliseconds currentMs = mClock->GetMonotonicMilliseconds();

    for (auto & slot : mRetryQueue)
    {
        const bool inUse = (slot.peerId.GetNodeId() != kUndefinedNodeId);

        if (!inUse || (slot.queryDueTimeMs > currentMs))
        {
            continue; // nothing pending here, or not yet due
        }

        if (slot.nextRetryDelaySec <= kMaxRetryDelaySec)
        {
            // Schedule the following retry before handing out the peer:
            // exponential backoff, the delay doubles after every query.
            slot.queryDueTimeMs = currentMs + slot.nextRetryDelaySec * 1000L;
            slot.nextRetryDelaySec *= 2;

            return Optional<PeerId>::Value(slot.peerId);
        }

        // Backoff went past the maximum delay: give up on this peer.
        ChipLogError(Discovery, "Timeout waiting for mDNS resolution.");
        slot.peerId.SetNodeId(kUndefinedNodeId);
    }

    return Optional<PeerId>::Missing();
}

} // namespace Minimal
} // namespace mdns
Loading

0 comments on commit 083c9ad

Please sign in to comment.