Skip to content

Commit

Permalink
remove read client/handler and write handler when corresponding fabri…
Browse files Browse the repository at this point in the history
…c is removed (#21204)

* remove read client/handler and write handler when corresponding fabric is removed

* address comments

* address comments

* address comments
  • Loading branch information
yunhanw-google authored and pull[bot] committed Sep 8, 2022
1 parent ddfa46b commit 1987793
Show file tree
Hide file tree
Showing 12 changed files with 193 additions and 57 deletions.
63 changes: 41 additions & 22 deletions src/app/InteractionModelEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ CHIP_ERROR InteractionModelEngine::Init(Messaging::ExchangeManager * apExchangeM
mpFabricTable = apFabricTable;
mpCASESessionMgr = apCASESessionMgr;

ReturnErrorOnFailure(mpFabricTable->AddFabricDelegate(this));
ReturnErrorOnFailure(mpExchangeMgr->RegisterUnsolicitedMessageHandlerForProtocol(Protocols::InteractionModel::Id, this));

mReportingEngine.Init();
Expand All @@ -76,9 +77,9 @@ void InteractionModelEngine::Shutdown()
//
while (handlerIter)
{
CommandHandlerInterface * next = handlerIter->GetNext();
CommandHandlerInterface * nextHandler = handlerIter->GetNext();
handlerIter->SetNext(nullptr);
handlerIter = next;
handlerIter = nextHandler;
}

mCommandHandlerList = nullptr;
Expand Down Expand Up @@ -235,24 +236,6 @@ uint32_t InteractionModelEngine::GetNumActiveWriteHandlers() const
return numActive;
}

void InteractionModelEngine::CloseTransactionsFromFabricIndex(FabricIndex aFabricIndex)
{
//
// Walk through all existing subscriptions and shut down those whose subscriber matches
// that which just came in.
//
mReadHandlers.ForEachActiveObject([this, aFabricIndex](ReadHandler * handler) {
if (handler->GetAccessingFabricIndex() == aFabricIndex)
{
ChipLogProgress(InteractionModel, "Deleting expired ReadHandler for NodeId: " ChipLogFormatX64 ", FabricIndex: %u",
ChipLogValueX64(handler->GetInitiatorNodeId()), aFabricIndex);
mReadHandlers.ReleaseObject(handler);
}

return Loop::Continue;
});
}

CHIP_ERROR InteractionModelEngine::ShutdownSubscription(SubscriptionId aSubscriptionId)
{
for (auto * readClient = mpActiveReadClientList; readClient != nullptr; readClient = readClient->GetNextClient())
Expand Down Expand Up @@ -1213,9 +1196,9 @@ void InteractionModelEngine::ReleasePool(ObjectList<T> *& aObjectList, ObjectPoo
ObjectList<T> * current = aObjectList;
while (current != nullptr)
{
ObjectList<T> * next = current->mpNext;
ObjectList<T> * nextObject = current->mpNext;
aObjectPool.ReleaseObject(current);
current = next;
current = nextObject;
}

aObjectList = nullptr;
Expand Down Expand Up @@ -1429,5 +1412,41 @@ size_t InteractionModelEngine::GetNumDirtySubscriptions() const
return numDirtySubscriptions;
}

void InteractionModelEngine::OnFabricRemoved(const FabricTable & fabricTable, FabricIndex fabricIndex)
{
mReadHandlers.ForEachActiveObject([fabricIndex](ReadHandler * handler) {
if (handler->GetAccessingFabricIndex() == fabricIndex)
{
ChipLogProgress(InteractionModel, "Deleting expired ReadHandler for NodeId: " ChipLogFormatX64 ", FabricIndex: %u",
ChipLogValueX64(handler->GetInitiatorNodeId()), fabricIndex);
handler->Close();
}

return Loop::Continue;
});

for (auto * readClient = mpActiveReadClientList; readClient != nullptr; readClient = readClient->GetNextClient())
{
if (readClient->GetFabricIndex() == fabricIndex)
{
ChipLogProgress(InteractionModel, "Fabric removed, deleting obsolete read client with FabricIndex: %u", fabricIndex);
readClient->Close(CHIP_ERROR_IM_FABRIC_DELETED, false);
}
}

for (auto & handler : mWriteHandlers)
{
if (!(handler.IsFree()) && handler.GetAccessingFabricIndex() == fabricIndex)
{
ChipLogProgress(InteractionModel, "Fabric removed, deleting obsolete write handler with FabricIndex: %u", fabricIndex);
handler.Close();
}
}

// Applications may hold references to CommandHandler instances for async command processing.
// Therefore we can't forcible destroy CommandHandlers here. Their exchanges will get closed by
// the fabric removal, though, so they will fail when they try to actually send their command response
// and will close at that point.
}
} // namespace app
} // namespace chip
12 changes: 5 additions & 7 deletions src/app/InteractionModelEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ namespace app {
class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler,
public Messaging::ExchangeDelegate,
public CommandHandler::Callback,
public ReadHandler::ManagementCallback
public ReadHandler::ManagementCallback,
public FabricTable::Delegate
{
public:
/**
Expand Down Expand Up @@ -143,12 +144,6 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler,
*/
void ShutdownAllSubscriptions();

/**
* Expire active transactions and release related objects for the given fabric index.
* This is used for releasing transactions that won't be closed when a fabric is removed.
*/
void CloseTransactionsFromFabricIndex(FabricIndex aFabricIndex);

uint32_t GetNumActiveReadHandlers() const;
uint32_t GetNumActiveReadHandlers(ReadHandler::InteractionType type) const;

Expand Down Expand Up @@ -289,6 +284,9 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler,
*/
uint16_t GetMinGuaranteedSubscriptionsPerFabric() const;

// virtual method from FabricTable::Delegate
void OnFabricRemoved(const FabricTable & fabricTable, FabricIndex fabricIndex) override;

#if CONFIG_BUILD_FOR_HOST_UNIT_TEST
//
// Get direct access to the underlying read handler pool
Expand Down
8 changes: 0 additions & 8 deletions src/app/ReadClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,6 @@ class ReadClient : public Messaging::ExchangeDelegate
*/
~ReadClient() override;

/*
* This forcibly closes the exchange context if a valid one is pointed to. Such a situation does
* not arise during normal message processing flows that all normally call Close() above. This can only
* arise due to application-initiated destruction of the object when this object is handling receiving/sending
* message payloads. Abort() should be called first before the object is destroyed.
*/
void Abort();

/**
* Send a request. There can be one request outstanding on a given ReadClient.
* If SendRequest returns success, no more SendRequest calls can happen on this ReadClient
Expand Down
9 changes: 5 additions & 4 deletions src/app/WriteHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ class WriteHandler : public Messaging::ExchangeDelegate
*/
void Abort();

/**
* Clean up state when we are done sending the write response.
*/
void Close();

bool IsFree() const { return mState == State::Uninitialized; }

~WriteHandler() override = default;
Expand Down Expand Up @@ -133,10 +138,6 @@ class WriteHandler : public Messaging::ExchangeDelegate
void MoveToState(const State aTargetState);
void ClearState();
const char * GetStateStr() const;
/**
* Clean up state when we are done sending the write response.
*/
void Close();

void DeliverListWriteBegin(const ConcreteAttributePath & aPath);
void DeliverListWriteEnd(const ConcreteAttributePath & aPath, bool writeWasSuccessful);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ CHIP_ERROR DeleteFabricFromTable(FabricIndex fabricIndex)

void CleanupSessionsForFabric(SessionManager & sessionMgr, FabricIndex fabricIndex)
{
InteractionModelEngine::GetInstance()->CloseTransactionsFromFabricIndex(fabricIndex);
sessionMgr.ExpireAllSessionsForFabric(fabricIndex);
}

Expand Down Expand Up @@ -379,6 +378,7 @@ class OpCredsFabricTableDelegate : public chip::FabricTable::Delegate
ChipLogProgress(Zcl, "OpCreds: Fabric index 0x%x was removed", static_cast<unsigned>(fabricIndex));

EventManagement::GetInstance().FabricRemoved(fabricIndex);

NotifyFabricTableChanged();
}

Expand Down
63 changes: 63 additions & 0 deletions src/app/tests/TestReadInteraction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ class TestReadInteraction
static void TestSubscribeSendUnknownMessage(nlTestSuite * apSuite, void * apContext);
static void TestSubscribeSendInvalidStatusReport(nlTestSuite * apSuite, void * apContext);
static void TestReadHandlerInvalidSubscribeRequest(nlTestSuite * apSuite, void * apContext);
static void TestSubscribeInvalidateFabric(nlTestSuite * apSuite, void * apContext);

private:
static void GenerateReportData(nlTestSuite * apSuite, void * apContext, System::PacketBufferHandle & aPayload,
Expand Down Expand Up @@ -3668,6 +3669,67 @@ void TestReadInteraction::TestReadHandlerInvalidSubscribeRequest(nlTestSuite * a
NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0);
}

// Create the subscription, then remove the corresponding fabric in client and handler, the corresponding
// client and handler would be released as well.
void TestReadInteraction::TestSubscribeInvalidateFabric(nlTestSuite * apSuite, void * apContext)
{
TestContext & ctx = *static_cast<TestContext *>(apContext);
CHIP_ERROR err = CHIP_NO_ERROR;

Messaging::ReliableMessageMgr * rm = ctx.GetExchangeManager().GetReliableMessageMgr();
// Shouldn't have anything in the retransmit table when starting the test.
NL_TEST_ASSERT(apSuite, rm->TestGetCountRetransTable() == 0);

GenerateEvents(apSuite, apContext);

MockInteractionModelApp delegate;
auto * engine = chip::app::InteractionModelEngine::GetInstance();
err = engine->Init(&ctx.GetExchangeManager(), &ctx.GetFabricTable());
NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR);

ReadPrepareParams readPrepareParams(ctx.GetSessionBobToAlice());
readPrepareParams.mpAttributePathParamsList = new chip::app::AttributePathParams[1];
readPrepareParams.mAttributePathParamsListSize = 1;

readPrepareParams.mpAttributePathParamsList[0].mEndpointId = Test::kMockEndpoint3;
readPrepareParams.mpAttributePathParamsList[0].mClusterId = Test::MockClusterId(2);
readPrepareParams.mpAttributePathParamsList[0].mAttributeId = Test::MockAttributeId(4);

readPrepareParams.mMinIntervalFloorSeconds = 0;
readPrepareParams.mMaxIntervalCeilingSeconds = 0;

{
app::ReadClient readClient(chip::app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), delegate,
chip::app::ReadClient::InteractionType::Subscribe);

delegate.mGotReport = false;

err = readClient.SendAutoResubscribeRequest(std::move(readPrepareParams));
NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR);

ctx.DrainAndServiceIO();

NL_TEST_ASSERT(apSuite, delegate.mGotReport);
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Subscribe) == 1);
NL_TEST_ASSERT(apSuite, engine->ActiveHandlerAt(0) != nullptr);
delegate.mpReadHandler = engine->ActiveHandlerAt(0);

ctx.GetFabricTable().Delete(ctx.GetAliceFabricIndex());
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Subscribe) == 0);
ctx.GetFabricTable().Delete(ctx.GetBobFabricIndex());
NL_TEST_ASSERT(apSuite, delegate.mError == CHIP_ERROR_IM_FABRIC_DELETED);
ctx.ExpireSessionAliceToBob();
ctx.ExpireSessionBobToAlice();
ctx.CreateAliceFabric();
ctx.CreateBobFabric();
ctx.CreateSessionAliceToBob();
ctx.CreateSessionBobToAlice();
}
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadClients() == 0);
engine->Shutdown();
NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0);
}

} // namespace app
} // namespace chip

Expand Down Expand Up @@ -3712,6 +3774,7 @@ const nlTest sTests[] =
NL_TEST_DEF("TestSubscribeSendUnknownMessage", chip::app::TestReadInteraction::TestSubscribeSendUnknownMessage),
NL_TEST_DEF("TestSubscribeSendInvalidStatusReport", chip::app::TestReadInteraction::TestSubscribeSendInvalidStatusReport),
NL_TEST_DEF("TestReadHandlerInvalidSubscribeRequest", chip::app::TestReadInteraction::TestReadHandlerInvalidSubscribeRequest),
NL_TEST_DEF("TestSubscribeInvalidateFabric", chip::app::TestReadInteraction::TestSubscribeInvalidateFabric),
NL_TEST_DEF("TestSubscribeUrgentWildcardEvent", chip::app::TestReadInteraction::TestSubscribeUrgentWildcardEvent),
NL_TEST_DEF("TestSubscribeWildcard", chip::app::TestReadInteraction::TestSubscribeWildcard),
NL_TEST_DEF("TestSubscribePartialOverlap", chip::app::TestReadInteraction::TestSubscribePartialOverlap),
Expand Down
52 changes: 52 additions & 0 deletions src/app/tests/TestWriteInteraction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class TestWriteInteraction
static void TestWriteRoundtripWithClusterObjectsVersionMismatch(nlTestSuite * apSuite, void * apContext);
#if CONFIG_BUILD_FOR_HOST_UNIT_TEST
static void TestWriteHandlerReceiveInvalidMessage(nlTestSuite * apSuite, void * apContext);
static void TestWriteHandlerInvalidateFabric(nlTestSuite * apSuite, void * apContext);
#endif
private:
static void AddAttributeDataIB(nlTestSuite * apSuite, void * apContext, WriteClient & aWriteClient);
Expand Down Expand Up @@ -626,6 +627,56 @@ void TestWriteInteraction::TestWriteHandlerReceiveInvalidMessage(nlTestSuite * a
ctx.CreateSessionAliceToBob();
ctx.CreateSessionBobToAlice();
}

// This test is to create Chunked write requests, we drop the message since the 3rd message, then remove fabrics for client and
// handler, the corresponding client and handler would be released as well.
void TestWriteInteraction::TestWriteHandlerInvalidateFabric(nlTestSuite * apSuite, void * apContext)
{
TestContext & ctx = *static_cast<TestContext *>(apContext);
auto sessionHandle = ctx.GetSessionBobToAlice();

app::AttributePathParams attributePath(2, 3, 4);

CHIP_ERROR err = CHIP_NO_ERROR;
Messaging::ReliableMessageMgr * rm = ctx.GetExchangeManager().GetReliableMessageMgr();
// Shouldn't have anything in the retransmit table when starting the test.
NL_TEST_ASSERT(apSuite, rm->TestGetCountRetransTable() == 0);

TestWriteClientCallback writeCallback;
auto * engine = chip::app::InteractionModelEngine::GetInstance();
err = engine->Init(&ctx.GetExchangeManager(), &ctx.GetFabricTable());
NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR);

app::WriteClient writeClient(&ctx.GetExchangeManager(), &writeCallback, Optional<uint16_t>::Missing(),
static_cast<uint16_t>(900) /* reserved buffer size */);

ByteSpan list[5];

err = writeClient.EncodeAttribute(attributePath, app::DataModel::List<ByteSpan>(list, 5));
NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR);

ctx.GetLoopback().mDroppedMessageCount = 0;
ctx.GetLoopback().mSentMessageCount = 0;
ctx.GetLoopback().mNumMessagesToDrop = 1;
ctx.GetLoopback().mNumMessagesToAllowBeforeDropping = 2;
err = writeClient.SendWriteRequest(sessionHandle);
NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR);
ctx.DrainAndServiceIO();

NL_TEST_ASSERT(apSuite, InteractionModelEngine::GetInstance()->GetNumActiveWriteHandlers() == 1);
NL_TEST_ASSERT(apSuite, ctx.GetLoopback().mSentMessageCount == 3);
NL_TEST_ASSERT(apSuite, ctx.GetLoopback().mDroppedMessageCount == 1);

ctx.GetFabricTable().Delete(ctx.GetAliceFabricIndex());
NL_TEST_ASSERT(apSuite, InteractionModelEngine::GetInstance()->GetNumActiveWriteHandlers() == 0);
engine->Shutdown();
ctx.ExpireSessionAliceToBob();
ctx.ExpireSessionBobToAlice();
ctx.CreateAliceFabric();
ctx.CreateSessionAliceToBob();
ctx.CreateSessionBobToAlice();
}

#endif

// Write Client sends a write request, receives an unexpected message type, sends a status response to that.
Expand Down Expand Up @@ -917,6 +968,7 @@ const nlTest sTests[] =
NL_TEST_DEF("TestWriteRoundtripWithClusterObjectsVersionMismatch", chip::app::TestWriteInteraction::TestWriteRoundtripWithClusterObjectsVersionMismatch),
#if CONFIG_BUILD_FOR_HOST_UNIT_TEST
NL_TEST_DEF("TestWriteHandlerReceiveInvalidMessage", chip::app::TestWriteInteraction::TestWriteHandlerReceiveInvalidMessage),
NL_TEST_DEF("TestWriteHandlerInvalidateFabric", chip::app::TestWriteInteraction::TestWriteHandlerInvalidateFabric),
#endif
NL_TEST_DEF("TestWriteInvalidMessage1", chip::app::TestWriteInteraction::TestWriteInvalidMessage1),
NL_TEST_DEF("TestWriteInvalidMessage2", chip::app::TestWriteInteraction::TestWriteInvalidMessage2),
Expand Down
4 changes: 2 additions & 2 deletions src/lib/core/CHIPError.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -545,8 +545,8 @@ bool FormatCHIPError(char * buf, uint16_t bufSize, CHIP_ERROR err)
case CHIP_ERROR_PERSISTED_STORAGE_VALUE_NOT_FOUND.AsInteger():
desc = "Value not found in the persisted storage";
break;
case CHIP_ERROR_PROFILE_STRING_CONTEXT_ALREADY_REGISTERED.AsInteger():
desc = "String context already registered";
case CHIP_ERROR_IM_FABRIC_DELETED.AsInteger():
desc = "The fabric is deleted, and the corresponding IM resources are released";
break;
case CHIP_ERROR_PROFILE_STRING_CONTEXT_NOT_REGISTERED.AsInteger():
desc = "String context not registered";
Expand Down
9 changes: 4 additions & 5 deletions src/lib/core/CHIPError.h
Original file line number Diff line number Diff line change
Expand Up @@ -1879,13 +1879,12 @@ using CHIP_ERROR = ::chip::ChipError;
#define CHIP_ERROR_PERSISTED_STORAGE_VALUE_NOT_FOUND CHIP_CORE_ERROR(0xa0)

/**
* @def CHIP_ERROR_PROFILE_STRING_CONTEXT_ALREADY_REGISTERED
*
* @brief
* The specified profile string support context is already registered.
* @def CHIP_ERROR_IM_FABRIC_DELETED
*
* @brief
* The fabric is deleted, and the corresponding IM resources are released
*/
#define CHIP_ERROR_PROFILE_STRING_CONTEXT_ALREADY_REGISTERED CHIP_CORE_ERROR(0xa1)
#define CHIP_ERROR_IM_FABRIC_DELETED CHIP_CORE_ERROR(0xa1)

/**
* @def CHIP_ERROR_PROFILE_STRING_CONTEXT_NOT_REGISTERED
Expand Down
2 changes: 1 addition & 1 deletion src/lib/core/tests/TestCHIPErrorStr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ static const CHIP_ERROR kTestElements[] =
CHIP_ERROR_DEFAULT_EVENT_HANDLER_NOT_CALLED,
CHIP_ERROR_PERSISTED_STORAGE_FAILED,
CHIP_ERROR_PERSISTED_STORAGE_VALUE_NOT_FOUND,
CHIP_ERROR_PROFILE_STRING_CONTEXT_ALREADY_REGISTERED,
CHIP_ERROR_IM_FABRIC_DELETED,
CHIP_ERROR_PROFILE_STRING_CONTEXT_NOT_REGISTERED,
CHIP_ERROR_INCOMPATIBLE_SCHEMA_VERSION,
CHIP_ERROR_ACCESS_DENIED,
Expand Down
Loading

0 comments on commit 1987793

Please sign in to comment.