Skip to content

Commit

Permalink
Add Application Callback for Subscription Handling (project-chip#16517)
Browse files Browse the repository at this point in the history
* Add Application Callback for Subscription Handling

Adds a new ReadHandler::ApplicationCallback that permits notifying the
application of notable happenings on subscriptions as well as giving
them a chance to alter the min/max intervals setup as part of
subscriptions.

Tests:
- Add some unit tests to TestRead to validate the positive/negative behaviors added.

* Apply suggestions from code review

Co-authored-by: Boris Zbarsky <[email protected]>

* Review feedback.

* Removing a bunch of accidentally added files...

Co-authored-by: Boris Zbarsky <[email protected]>
  • Loading branch information
2 people authored and andrei-menzopol committed Apr 14, 2022
1 parent fe7d1c6 commit 53ec3f5
Show file tree
Hide file tree
Showing 16 changed files with 383 additions and 45 deletions.
8 changes: 4 additions & 4 deletions src/app/EventLoggingTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,10 @@ struct EventLoadOutContext
EventNumber mStartingEventNumber = 0;
Timestamp mPreviousTime;
Timestamp mCurrentTime;
EventNumber mCurrentEventNumber = 0;
size_t mEventCount = 0;
ObjectList<EventPathParams> * mpInterestedEventPaths = nullptr;
bool mFirst = true;
EventNumber mCurrentEventNumber = 0;
size_t mEventCount = 0;
const ObjectList<EventPathParams> * mpInterestedEventPaths = nullptr;
bool mFirst = true;
Access::SubjectDescriptor mSubjectDescriptor;
};
} // namespace app
Expand Down
2 changes: 1 addition & 1 deletion src/app/EventManagement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ CHIP_ERROR EventManagement::CopyEventsSince(const TLVReader & aReader, size_t aD
return err;
}

CHIP_ERROR EventManagement::FetchEventsSince(TLVWriter & aWriter, ObjectList<EventPathParams> * apEventPathList,
CHIP_ERROR EventManagement::FetchEventsSince(TLVWriter & aWriter, const ObjectList<EventPathParams> * apEventPathList,
EventNumber & aEventMin, size_t & aEventCount,
const Access::SubjectDescriptor & aSubjectDescriptor)
{
Expand Down
2 changes: 1 addition & 1 deletion src/app/EventManagement.h
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ class EventManagement
* available.
*
*/
CHIP_ERROR FetchEventsSince(chip::TLV::TLVWriter & aWriter, ObjectList<EventPathParams> * apEventPathList,
CHIP_ERROR FetchEventsSince(chip::TLV::TLVWriter & aWriter, const ObjectList<EventPathParams> * apEventPathList,
EventNumber & aEventMin, size_t & aEventCount,
const Access::SubjectDescriptor & aSubjectDescriptor);

Expand Down
17 changes: 16 additions & 1 deletion src/app/InteractionModelEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ namespace app {
* handlers
*
*/
class InteractionModelEngine : public Messaging::ExchangeDelegate, public CommandHandler::Callback, public ReadHandler::Callback
class InteractionModelEngine : public Messaging::ExchangeDelegate,
public CommandHandler::Callback,
public ReadHandler::ManagementCallback
{
public:
/**
Expand Down Expand Up @@ -157,6 +159,15 @@ class InteractionModelEngine : public Messaging::ExchangeDelegate, public Comman
CommandHandlerInterface * FindCommandHandler(EndpointId endpointId, ClusterId clusterId);
void UnregisterCommandHandlers(EndpointId endpointId);

/*
* Register an application callback to be notified of notable events when handling reads/subscribes.
*/
void RegisterReadHandlerAppCallback(ReadHandler::ApplicationCallback * mpApplicationCallback)
{
mpReadHandlerApplicationCallback = mpApplicationCallback;
}
void UnregisterReadHandlerAppCallback() { mpReadHandlerApplicationCallback = nullptr; }

/**
* Called when a timed interaction has failed (i.e. the exchange it was
* happening on has closed while the exchange delegate was the timed
Expand Down Expand Up @@ -252,6 +263,8 @@ class InteractionModelEngine : public Messaging::ExchangeDelegate, public Comman
void OnDone(CommandHandler & apCommandObj) override;
void OnDone(ReadHandler & apReadObj) override;

ReadHandler::ApplicationCallback * GetAppCallback() override { return mpReadHandlerApplicationCallback; }

/**
* Called when Interaction Model receives a Command Request message. Errors processing
* the Command Request are handled entirely within this function. The caller pre-sets status to failure and the callee is
Expand Down Expand Up @@ -325,6 +338,8 @@ class InteractionModelEngine : public Messaging::ExchangeDelegate, public Comman
ObjectPool<ObjectList<DataVersionFilter>, CHIP_IM_SERVER_MAX_NUM_PATH_GROUPS> mDataVersionFilterPool;
ReadClient * mpActiveReadClientList = nullptr;

ReadHandler::ApplicationCallback * mpReadHandlerApplicationCallback = nullptr;

#if CONFIG_IM_BUILD_FOR_UNIT_TEST
int mReadHandlerCapacityOverride = -1;
#endif
Expand Down
3 changes: 2 additions & 1 deletion src/app/ReadClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -760,10 +760,11 @@ CHIP_ERROR ReadClient::ProcessSubscribeResponse(System::PacketBufferHandle && aP
ChipLogValueX64(mPeerNodeId));

ReturnErrorOnFailure(subscribeResponse.ExitContainer());
mpCallback.OnSubscriptionEstablished(subscriptionId);

MoveToState(ClientState::SubscriptionActive);

mpCallback.OnSubscriptionEstablished(subscriptionId);

if (mReadPrepareParams.mResubscribePolicy != nullptr)
{
mNumRetries = 0;
Expand Down
15 changes: 15 additions & 0 deletions src/app/ReadClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,21 @@ class ReadClient : public Messaging::ExchangeDelegate
bool IsReadType() { return mInteractionType == InteractionType::Read; }
bool IsSubscriptionType() const { return mInteractionType == InteractionType::Subscribe; };

/*
* Retrieve the reporting intervals associated with an active subscription. This should only be called if we're of subscription
* interaction type and after a subscription has been established.
*/
CHIP_ERROR GetReportingIntervals(uint16_t & aMinIntervalFloorSeconds, uint16_t & aMaxIntervalCeilingSeconds) const
{
VerifyOrReturnError(IsSubscriptionType(), CHIP_ERROR_INCORRECT_STATE);
VerifyOrReturnError(IsSubscriptionIdle(), CHIP_ERROR_INCORRECT_STATE);

aMinIntervalFloorSeconds = mMinIntervalFloorSeconds;
aMaxIntervalCeilingSeconds = mMaxIntervalCeilingSeconds;

return CHIP_NO_ERROR;
}

ReadClient * GetNextClient() { return mpNext; }
void SetNextClient(ReadClient * apClient) { mpNext = apClient; }

Expand Down
40 changes: 36 additions & 4 deletions src/app/ReadHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@
namespace chip {
namespace app {

ReadHandler::ReadHandler(Callback & apCallback, Messaging::ExchangeContext * apExchangeContext, InteractionType aInteractionType) :
mCallback(apCallback)
ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeContext * apExchangeContext,
InteractionType aInteractionType) :
mManagementCallback(apCallback)
{
mpExchangeMgr = apExchangeContext->GetExchangeMgr();
mpExchangeCtx = apExchangeContext;
Expand Down Expand Up @@ -80,6 +81,12 @@ void ReadHandler::Abort(bool aCalledFromDestructor)

ReadHandler::~ReadHandler()
{
auto * appCallback = mManagementCallback.GetAppCallback();
if (mActiveSubscription && appCallback)
{
appCallback->OnSubscriptionTerminated(*this);
}

Abort(true);

if (IsType(InteractionType::Subscribe))
Expand Down Expand Up @@ -109,7 +116,7 @@ void ReadHandler::Close()
}

MoveToState(HandlerState::AwaitingDestruction);
mCallback.OnDone(*this);
mManagementCallback.OnDone(*this);
}

CHIP_ERROR ReadHandler::OnInitialRequest(System::PacketBufferHandle && aPayload)
Expand Down Expand Up @@ -159,10 +166,18 @@ CHIP_ERROR ReadHandler::OnStatusResponse(Messaging::ExchangeContext * apExchange
{
if (IsPriming())
{
err = SendSubscribeResponse();
err = SendSubscribeResponse();

mpExchangeCtx = nullptr;
SuccessOrExit(err);

mActiveSubscription = true;

auto * appCallback = mManagementCallback.GetAppCallback();
if (appCallback)
{
appCallback->OnSubscriptionEstablished(*this);
}
}
else
{
Expand Down Expand Up @@ -697,6 +712,23 @@ CHIP_ERROR ReadHandler::ProcessSubscribeRequest(System::PacketBufferHandle && aP
ReturnErrorOnFailure(subscribeRequestParser.GetMinIntervalFloorSeconds(&mMinIntervalFloorSeconds));
ReturnErrorOnFailure(subscribeRequestParser.GetMaxIntervalCeilingSeconds(&mMaxIntervalCeilingSeconds));
VerifyOrReturnError(mMinIntervalFloorSeconds <= mMaxIntervalCeilingSeconds, CHIP_ERROR_INVALID_ARGUMENT);

//
// Notify the application (if requested) of the impending subscription and check whether we should still proceed to set it up.
// This also provides the application an opportunity to modify the negotiated min/max intervals set above.
//
auto * appCallback = mManagementCallback.GetAppCallback();
if (appCallback)
{
if (appCallback->OnSubscriptionRequested(*this, *mpExchangeCtx->GetSessionHandle()->AsSecureSession()) != CHIP_NO_ERROR)
{
return CHIP_ERROR_TRANSACTION_CANCELED;
}
}

ChipLogProgress(DataManagement, "Final negotiated min/max parameters: Min = %ds, Max = %ds", mMinIntervalFloorSeconds,
mMaxIntervalCeilingSeconds);

ReturnErrorOnFailure(subscribeRequestParser.GetIsFabricFiltered(&mIsFabricFiltered));
ReturnErrorOnFailure(Crypto::DRBG_get_bytes(reinterpret_cast<uint8_t *>(&mSubscriptionId), sizeof(mSubscriptionId)));
ReturnErrorOnFailure(subscribeRequestParser.ExitContainer());
Expand Down
114 changes: 98 additions & 16 deletions src/app/ReadHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ namespace app {
//
namespace reporting {
class Engine;
}
class TestReportingEngine;
} // namespace reporting

class InteractionModelEngine;

/**
* @class ReadHandler
Expand All @@ -74,34 +77,116 @@ class ReadHandler : public Messaging::ExchangeDelegate
Subscribe,
};

class Callback
/*
* A callback used to interact with the application.
*/
class ApplicationCallback
{
public:
virtual ~Callback() = default;
virtual ~ApplicationCallback() = default;

/*
* Called right after a SubscribeRequest has been parsed and processed. This notifies an interested application
* of a subscription that is about to be established. It also provides an avenue for altering the parameters of the
* subscription (specifically, the min/max negotiated intervals) or even outright rejecting the subscription for
* application-specific reasons.
*
* TODO: Need a new IM status code to convey application-rejected subscribes. Currently, a Failure IM status code is sent
* back to the subscriber, which isn't sufficient.
*
* To reject the subscription, a CHIP_ERROR code that is not equivalent to CHIP_NO_ERROR should be returned.
*
* More information about the set of paths associated with this subscription can be retrieved by calling the appropriate
* Get* methods below.
*
* aReadHandler: Reference to the ReadHandler associated with the subscription.
* aSecureSession: A reference to the underlying secure session associated with the subscription.
*
*/
virtual CHIP_ERROR OnSubscriptionRequested(ReadHandler & aReadHandler, Transport::SecureSession & aSecureSession)
{
return CHIP_NO_ERROR;
}

/*
* Called after a subscription has been fully established.
*/
virtual void OnSubscriptionEstablished(ReadHandler & aReadHandler){};

/*
* Called right before a subscription is about to get terminated. This is only called on subscriptions that were terminated
* after they had been fully established (and therefore had called OnSubscriptionEstablished).
* OnSubscriptionEstablishment().
*/
virtual void OnSubscriptionTerminated(ReadHandler & aReadHandler){};
};

/*
* A callback used to manage the lifetime of the ReadHandler object.
*/
class ManagementCallback
{
public:
virtual ~ManagementCallback() = default;

/*
* Method that signals to a registered callback that this object
* has completed doing useful work and is now safe for release/destruction.
*/
virtual void OnDone(ReadHandler & apReadHandlerObj) = 0;

/*
* Retrieve the ApplicationCallback (if a valid one exists) from our management entity. This avoids
* storing multiple references to the application provided callback and having to subsequently manage lifetime
* issues w.r.t the ReadHandler itself.
*/
virtual ApplicationCallback * GetAppCallback() = 0;
};

/*
* Destructor - as part of destruction, it will abort the exchange context
* if a valid one still exists.
*
* See Abort() for details on when that might occur.
*/
~ReadHandler() override;

/**
*
* Constructor.
*
* The callback passed in has to outlive this handler object.
*
*/
ReadHandler(Callback & apCallback, Messaging::ExchangeContext * apExchangeContext, InteractionType aInteractionType);
ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeContext * apExchangeContext, InteractionType aInteractionType);

const ObjectList<AttributePathParams> * GetAttributePathList() const { return mpAttributePathList; }
const ObjectList<EventPathParams> * GetEventPathList() const { return mpEventPathList; }
const ObjectList<DataVersionFilter> * GetDataVersionFilterList() const { return mpDataVersionFilterList; }

void GetReportingIntervals(uint16_t & aMinInterval, uint16_t & aMaxInterval) const
{
aMinInterval = mMinIntervalFloorSeconds;
aMaxInterval = mMaxIntervalCeilingSeconds;
}

/*
* Destructor - as part of destruction, it will abort the exchange context
* if a valid one still exists.
*
* See Abort() for details on when that might occur.
* Set the reporting intervals for the subscription. This SHALL only be called
* from the OnSubscriptionRequested callback above.
*/
~ReadHandler() override;
CHIP_ERROR SetReportingIntervals(uint16_t aMinInterval, uint16_t aMaxInterval)
{
VerifyOrReturnError(IsIdle(), CHIP_ERROR_INCORRECT_STATE);
VerifyOrReturnError(aMinInterval <= aMaxInterval, CHIP_ERROR_INVALID_ARGUMENT);

mMinIntervalFloorSeconds = aMinInterval;
mMaxIntervalCeilingSeconds = aMaxInterval;
return CHIP_NO_ERROR;
}

private:
PriorityLevel GetCurrentPriority() const { return mCurrentPriority; }
EventNumber & GetEventMin() { return mEventMin; }

/**
* Process a read/subscribe request. Parts of the processing may end up being asynchronous, but the ReadHandler
Expand Down Expand Up @@ -131,6 +216,7 @@ class ReadHandler : public Messaging::ExchangeDelegate
*/
bool IsFromSubscriber(Messaging::ExchangeContext & apExchangeContext) const;

bool IsIdle() const { return mState == HandlerState::Idle; }
bool IsReportable() const { return mState == HandlerState::GeneratingReports && !mHoldReport && (IsDirty() || !mHoldSync); }
bool IsGeneratingReports() const { return mState == HandlerState::GeneratingReports; }
bool IsAwaitingReportResponse() const { return mState == HandlerState::AwaitingReportResponse; }
Expand All @@ -139,11 +225,6 @@ class ReadHandler : public Messaging::ExchangeDelegate
void ResetPathIterator();

CHIP_ERROR ProcessDataVersionFilterList(DataVersionFilterIBs::Parser & aDataVersionFilterListParser);
ObjectList<AttributePathParams> * GetAttributePathList() { return mpAttributePathList; }
ObjectList<EventPathParams> * GetEventPathList() { return mpEventPathList; }
ObjectList<DataVersionFilter> * GetDataVersionFilterList() const { return mpDataVersionFilterList; }
EventNumber & GetEventMin() { return mEventMin; }
PriorityLevel GetCurrentPriority() { return mCurrentPriority; }

// if current priority is in the middle, it has valid snapshoted last event number, it check cleaness via comparing
// with snapshotted last event number. if current priority is in the end, no valid
Expand Down Expand Up @@ -185,15 +266,16 @@ class ReadHandler : public Messaging::ExchangeDelegate
uint32_t GetLastWrittenEventsBytes() const { return mLastWrittenEventsBytes; }
CHIP_ERROR SendStatusReport(Protocols::InteractionModel::Status aStatus);

private:
friend class TestReadInteraction;
friend class chip::app::reporting::TestReportingEngine;

//
// The engine needs to be able to Abort/Close a ReadHandler instance upon completion of work for a given read/subscribe
// interaction. We do not want to make these methods public just to give an adjacent class in the IM access, since public
// should really be taking application usage considerations as well. Hence, make it a friend.
//
friend class chip::app::reporting::Engine;
friend class chip::app::InteractionModelEngine;

enum class HandlerState
{
Expand Down Expand Up @@ -259,7 +341,7 @@ class ReadHandler : public Messaging::ExchangeDelegate
// The last schedule event number snapshoted in the beginning when preparing to fill new events to reports
EventNumber mLastScheduledEventNumber = 0;
Messaging::ExchangeManager * mpExchangeMgr = nullptr;
Callback & mCallback;
ManagementCallback & mManagementCallback;

// Tracks whether we're in the initial phase of receiving priming
// reports, which is always true for reads and true for subscriptions
Expand Down
Loading

0 comments on commit 53ec3f5

Please sign in to comment.