Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Application Callback for Subscription Handling #16517

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/app/EventLoggingTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ struct EventLoadOutContext
Timestamp mCurrentTime;
EventNumber mCurrentEventNumber = 0;
size_t mEventCount = 0;
ObjectList<EventPathParams> * mpInterestedEventPaths = nullptr;
const ObjectList<EventPathParams> * mpInterestedEventPaths = nullptr;
bool mFirst = true;
Access::SubjectDescriptor mSubjectDescriptor;
};
Expand Down
2 changes: 1 addition & 1 deletion src/app/EventManagement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ CHIP_ERROR EventManagement::CopyEventsSince(const TLVReader & aReader, size_t aD
return err;
}

CHIP_ERROR EventManagement::FetchEventsSince(TLVWriter & aWriter, ObjectList<EventPathParams> * apEventPathList,
CHIP_ERROR EventManagement::FetchEventsSince(TLVWriter & aWriter, const ObjectList<EventPathParams> * apEventPathList,
EventNumber & aEventMin, size_t & aEventCount,
const Access::SubjectDescriptor & aSubjectDescriptor)
{
Expand Down
2 changes: 1 addition & 1 deletion src/app/EventManagement.h
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ class EventManagement
* available.
*
*/
CHIP_ERROR FetchEventsSince(chip::TLV::TLVWriter & aWriter, ObjectList<EventPathParams> * apEventPathList,
CHIP_ERROR FetchEventsSince(chip::TLV::TLVWriter & aWriter, const ObjectList<EventPathParams> * apEventPathList,
EventNumber & aEventMin, size_t & aEventCount,
const Access::SubjectDescriptor & aSubjectDescriptor);

Expand Down
17 changes: 16 additions & 1 deletion src/app/InteractionModelEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ namespace app {
* handlers
*
*/
class InteractionModelEngine : public Messaging::ExchangeDelegate, public CommandHandler::Callback, public ReadHandler::Callback
class InteractionModelEngine : public Messaging::ExchangeDelegate,
public CommandHandler::Callback,
public ReadHandler::ManagementCallback
{
public:
/**
Expand Down Expand Up @@ -157,6 +159,15 @@ class InteractionModelEngine : public Messaging::ExchangeDelegate, public Comman
CommandHandlerInterface * FindCommandHandler(EndpointId endpointId, ClusterId clusterId);
void UnregisterCommandHandlers(EndpointId endpointId);

/*
* Register an application callback to be notified of notable events when handling reads/subscribes.
*/
void RegisterReadHandlerAppCallback(ReadHandler::ApplicationCallback * mpApplicationCallback)
{
mpReadHandlerApplicationCallback = mpApplicationCallback;
}
void UnregisterReadHandlerAppCallback() { mpReadHandlerApplicationCallback = nullptr; }
mrjerryjohns marked this conversation as resolved.
Show resolved Hide resolved

/**
* Called when a timed interaction has failed (i.e. the exchange it was
* happening on has closed while the exchange delegate was the timed
Expand Down Expand Up @@ -252,6 +263,8 @@ class InteractionModelEngine : public Messaging::ExchangeDelegate, public Comman
void OnDone(CommandHandler & apCommandObj) override;
void OnDone(ReadHandler & apReadObj) override;

ReadHandler::ApplicationCallback * GetAppCallback() override { return mpReadHandlerApplicationCallback; }

/**
* Called when Interaction Model receives a Command Request message. Errors processing
* the Command Request are handled entirely within this function. The caller pre-sets status to failure and the callee is
Expand Down Expand Up @@ -325,6 +338,8 @@ class InteractionModelEngine : public Messaging::ExchangeDelegate, public Comman
ObjectPool<ObjectList<DataVersionFilter>, CHIP_IM_SERVER_MAX_NUM_PATH_GROUPS> mDataVersionFilterPool;
ReadClient * mpActiveReadClientList = nullptr;

ReadHandler::ApplicationCallback * mpReadHandlerApplicationCallback = nullptr;

#if CONFIG_IM_BUILD_FOR_UNIT_TEST
int mReadHandlerCapacityOverride = -1;
#endif
Expand Down
3 changes: 2 additions & 1 deletion src/app/ReadClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -766,10 +766,11 @@ CHIP_ERROR ReadClient::ProcessSubscribeResponse(System::PacketBufferHandle && aP
ChipLogValueX64(mPeerNodeId));

ReturnErrorOnFailure(subscribeResponse.ExitContainer());
mpCallback.OnSubscriptionEstablished(subscriptionId);

MoveToState(ClientState::SubscriptionActive);

mpCallback.OnSubscriptionEstablished(subscriptionId);

if (mReadPrepareParams.mResubscribePolicy != nullptr)
{
mNumRetries = 0;
Expand Down
15 changes: 15 additions & 0 deletions src/app/ReadClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,21 @@ class ReadClient : public Messaging::ExchangeDelegate
bool IsReadType() { return mInteractionType == InteractionType::Read; }
bool IsSubscriptionType() const { return mInteractionType == InteractionType::Subscribe; };

/*
* Retrieve the reporting intervals associated with an active subscription. This should only be called if we're of subscription
* interaction type and after a subscription has been established.
*/
CHIP_ERROR GetReportingIntervals(uint16_t & aMinIntervalFloorSeconds, uint16_t & aMaxIntervalCeilingSeconds) const
{
VerifyOrReturnError(IsSubscriptionType(), CHIP_ERROR_INCORRECT_STATE);
VerifyOrReturnError(IsSubscriptionIdle(), CHIP_ERROR_INCORRECT_STATE);

aMinIntervalFloorSeconds = mMinIntervalFloorSeconds;
aMaxIntervalCeilingSeconds = mMaxIntervalCeilingSeconds;

return CHIP_NO_ERROR;
}

ReadClient * GetNextClient() { return mpNext; }
void SetNextClient(ReadClient * apClient) { mpNext = apClient; }

Expand Down
40 changes: 36 additions & 4 deletions src/app/ReadHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@
namespace chip {
namespace app {

ReadHandler::ReadHandler(Callback & apCallback, Messaging::ExchangeContext * apExchangeContext, InteractionType aInteractionType) :
mCallback(apCallback)
ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeContext * apExchangeContext,
InteractionType aInteractionType) :
mManagementCallback(apCallback)
{
mpExchangeMgr = apExchangeContext->GetExchangeMgr();
mpExchangeCtx = apExchangeContext;
Expand Down Expand Up @@ -80,6 +81,12 @@ void ReadHandler::Abort(bool aCalledFromDestructor)

ReadHandler::~ReadHandler()
{
auto * appCallback = mManagementCallback.GetAppCallback();
if (mActiveSubscription && appCallback)
{
appCallback->OnSubscriptionTerminated(*this);
}

Abort(true);

if (IsType(InteractionType::Subscribe))
Expand Down Expand Up @@ -109,7 +116,7 @@ void ReadHandler::Close()
}

MoveToState(HandlerState::AwaitingDestruction);
mCallback.OnDone(*this);
mManagementCallback.OnDone(*this);
}

CHIP_ERROR ReadHandler::OnInitialRequest(System::PacketBufferHandle && aPayload)
Expand Down Expand Up @@ -159,10 +166,18 @@ CHIP_ERROR ReadHandler::OnStatusResponse(Messaging::ExchangeContext * apExchange
{
if (IsPriming())
{
err = SendSubscribeResponse();
err = SendSubscribeResponse();

mpExchangeCtx = nullptr;
SuccessOrExit(err);

mActiveSubscription = true;

auto * appCallback = mManagementCallback.GetAppCallback();
if (appCallback)
{
appCallback->OnSubscriptionEstablished(*this);
}
}
else
{
Expand Down Expand Up @@ -692,6 +707,23 @@ CHIP_ERROR ReadHandler::ProcessSubscribeRequest(System::PacketBufferHandle && aP
ReturnErrorOnFailure(subscribeRequestParser.GetMinIntervalFloorSeconds(&mMinIntervalFloorSeconds));
ReturnErrorOnFailure(subscribeRequestParser.GetMaxIntervalCeilingSeconds(&mMaxIntervalCeilingSeconds));
VerifyOrReturnError(mMinIntervalFloorSeconds <= mMaxIntervalCeilingSeconds, CHIP_ERROR_INVALID_ARGUMENT);

//
// Notify the application (if requested) of the impending subscription and check whether we should still proceed to set it up.
// This also provides the application an opportunity to modify the negotiated min/max intervals set above.
//
auto * appCallback = mManagementCallback.GetAppCallback();
if (appCallback)
{
if (appCallback->OnSubscriptionRequested(*this, *mpExchangeCtx->GetSessionHandle()->AsSecureSession()) != CHIP_NO_ERROR)
{
return CHIP_ERROR_TRANSACTION_CANCELED;
bzbarsky-apple marked this conversation as resolved.
Show resolved Hide resolved
}
}

ChipLogProgress(DataManagement, "Final negotiated min/max parameters: Min = %ds, Max = %ds", mMinIntervalFloorSeconds,
mMaxIntervalCeilingSeconds);

ReturnErrorOnFailure(subscribeRequestParser.GetIsFabricFiltered(&mIsFabricFiltered));
ReturnErrorOnFailure(Crypto::DRBG_get_bytes(reinterpret_cast<uint8_t *>(&mSubscriptionId), sizeof(mSubscriptionId)));
ReturnErrorOnFailure(subscribeRequestParser.ExitContainer());
Expand Down
116 changes: 99 additions & 17 deletions src/app/ReadHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ namespace app {
//
namespace reporting {
class Engine;
}
class TestReportingEngine;
} // namespace reporting

class InteractionModelEngine;

/**
* @class ReadHandler
Expand All @@ -74,34 +77,116 @@ class ReadHandler : public Messaging::ExchangeDelegate
Subscribe,
};

class Callback
/*
* A callback used to interact with the application.
*/
class ApplicationCallback
{
public:
virtual ~ApplicationCallback() = default;

/*
* Called right after a SubscribeRequest has been parsed and processed. This notifies an interested application
* of a subscription that is about to be established. It also provides an avenue for altering the parameters of the
* subscription (specifically, the min/max negotiated intervals) or even outright rejecting the subscription for
* application-specific reasons.
*
* TODO: Need a new IM status code to convey application-rejected subscribes. Currently, a Failure IM status code is sent
* back to the subscriber, which isn't sufficient.
*
* To reject the subscription, a CHIP_ERROR code that is not equivalent to CHIP_NO_ERROR should be returned.
*
* More information about the set of paths associated with this subscription can be retrieved by calling the appropriate
* Get* methods below.
*
* aReadHandler: Reference to the ReadHandler associated with the subscription.
* aSecureSession: A reference to the underlying secure session associated with the subscription.
*
*/
virtual CHIP_ERROR OnSubscriptionRequested(ReadHandler & aReadHandler, Transport::SecureSession & aSecureSession)
{
return CHIP_NO_ERROR;
}

/*
* Called after a subscription has been fully established.
*/
virtual void OnSubscriptionEstablished(ReadHandler & aReadHandler){};

/*
* Called right before a subscription is about to get terminated. This is only called on subscriptions that were terminated
* after they had been fully established (and therefore had called OnSubscriptionEstablished).
* OnSubscriptionEstablishment().
*/
virtual void OnSubscriptionTerminated(ReadHandler & aReadHandler){};
};

/*
* A callback used to manage the lifetime of the ReadHandler object.
*/
class ManagementCallback
{
public:
virtual ~Callback() = default;
virtual ~ManagementCallback() = default;

/*
* Method that signals to a registered callback that this object
* has completed doing useful work and is now safe for release/destruction.
*/
virtual void OnDone(ReadHandler & apReadHandlerObj) = 0;

/*
* Retrieve the ApplicationCallback (if a valid one exists) from our management entity. This avoids
* storing multiple references to the application provided callback and having to subsequently manage lifetime
* issues w.r.t the ReadHandler itself.
*/
virtual ApplicationCallback * GetAppCallback() = 0;
};

/*
* Destructor - as part of destruction, it will abort the exchange context
* if a valid one still exists.
*
* See Abort() for details on when that might occur.
*/
~ReadHandler() override;

/**
*
* Constructor.
*
* The callback passed in has to outlive this handler object.
*
*/
ReadHandler(Callback & apCallback, Messaging::ExchangeContext * apExchangeContext, InteractionType aInteractionType);
ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeContext * apExchangeContext, InteractionType aInteractionType);

const ObjectList<AttributePathParams> * GetAttributePathList() const { return mpAttributePathList; }
mrjerryjohns marked this conversation as resolved.
Show resolved Hide resolved
const ObjectList<EventPathParams> * GetEventPathList() const { return mpEventPathList; }
const ObjectList<DataVersionFilter> * GetDataVersionFilterList() const { return mpDataVersionFilterList; }

void GetReportingIntervals(uint16_t & aMinInterval, uint16_t & aMaxInterval) const
{
aMinInterval = mMinIntervalFloorSeconds;
aMaxInterval = mMaxIntervalCeilingSeconds;
}

/*
* Destructor - as part of destruction, it will abort the exchange context
* if a valid one still exists.
*
* See Abort() for details on when that might occur.
* Set the reporting intervals for the subscription. This SHALL only be called
* from the OnSubscriptionRequested callback above.
*/
~ReadHandler() override;
CHIP_ERROR SetReportingIntervals(uint16_t aMinInterval, uint16_t aMaxInterval)
{
VerifyOrReturnError(IsIdle(), CHIP_ERROR_INCORRECT_STATE);
VerifyOrReturnError(aMinInterval <= aMaxInterval, CHIP_ERROR_INVALID_ARGUMENT);

mMinIntervalFloorSeconds = aMinInterval;
mMaxIntervalCeilingSeconds = aMaxInterval;
return CHIP_NO_ERROR;
}

private:
PriorityLevel GetCurrentPriority() const { return mCurrentPriority; }
EventNumber GetEventMin() const { return mEventMin; }
mrjerryjohns marked this conversation as resolved.
Show resolved Hide resolved

/**
* Process a read/subscribe request. Parts of the processing may end up being asynchronous, but the ReadHandler
Expand Down Expand Up @@ -131,17 +216,13 @@ class ReadHandler : public Messaging::ExchangeDelegate
*/
bool IsFromSubscriber(Messaging::ExchangeContext & apExchangeContext) const;

bool IsIdle() const { return mState == HandlerState::Idle; }
bool IsReportable() const { return mState == HandlerState::GeneratingReports && !mHoldReport && (mDirty || !mHoldSync); }
bool IsGeneratingReports() const { return mState == HandlerState::GeneratingReports; }
bool IsAwaitingReportResponse() const { return mState == HandlerState::AwaitingReportResponse; }

CHIP_ERROR ProcessDataVersionFilterList(DataVersionFilterIBs::Parser & aDataVersionFilterListParser);
ObjectList<AttributePathParams> * GetAttributePathList() { return mpAttributePathList; }
ObjectList<EventPathParams> * GetEventPathList() { return mpEventPathList; }
ObjectList<DataVersionFilter> * GetDataVersionFilterList() const { return mpDataVersionFilterList; }
EventNumber & GetEventMin() { return mEventMin; }
PriorityLevel GetCurrentPriority() { return mCurrentPriority; }


// if current priority is in the middle, it has valid snapshoted last event number, it check cleaness via comparing
// with snapshotted last event number. if current priority is in the end, no valid
// sanpshotted last event, check with latest last event number, re-setup snapshoted checkpoint, and compare again.
Expand Down Expand Up @@ -181,15 +262,16 @@ class ReadHandler : public Messaging::ExchangeDelegate
uint32_t GetLastWrittenEventsBytes() const { return mLastWrittenEventsBytes; }
CHIP_ERROR SendStatusReport(Protocols::InteractionModel::Status aStatus);

private:
friend class TestReadInteraction;
friend class chip::app::reporting::TestReportingEngine;

//
// The engine needs to be able to Abort/Close a ReadHandler instance upon completion of work for a given read/subscribe
// interaction. We do not want to make these methods public just to give an adjacent class in the IM access, since public
// should really be taking application usage considerations as well. Hence, make it a friend.
//
friend class chip::app::reporting::Engine;
friend class chip::app::InteractionModelEngine;

enum class HandlerState
{
Expand Down Expand Up @@ -255,7 +337,7 @@ class ReadHandler : public Messaging::ExchangeDelegate
// The last schedule event number snapshoted in the beginning when preparing to fill new events to reports
EventNumber mLastScheduledEventNumber = 0;
Messaging::ExchangeManager * mpExchangeMgr = nullptr;
Callback & mCallback;
ManagementCallback & mManagementCallback;

// Tracks whether we're in the initial phase of receiving priming
// reports, which is always true for reads and true for subscriptions
Expand Down
12 changes: 6 additions & 6 deletions src/app/reporting/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void Engine::Shutdown()
mGlobalDirtySet.ReleaseAll();
}

bool Engine::IsClusterDataVersionMatch(ObjectList<DataVersionFilter> * aDataVersionFilterList,
bool Engine::IsClusterDataVersionMatch(const ObjectList<DataVersionFilter> * aDataVersionFilterList,
const ConcreteReadAttributePath & aPath)
{
bool existPathMatch = false;
Expand Down Expand Up @@ -267,11 +267,11 @@ CHIP_ERROR Engine::BuildSingleReportDataEventReports(ReportDataMessage::Builder
CHIP_ERROR err = CHIP_NO_ERROR;
size_t eventCount = 0;
TLV::TLVWriter backup;
bool eventClean = true;
ObjectList<EventPathParams> * eventList = apReadHandler->GetEventPathList();
EventNumber & eventMin = apReadHandler->GetEventMin();
EventManagement & eventManager = EventManagement::GetInstance();
bool hasMoreChunks = false;
bool eventClean = true;
const auto* eventList = apReadHandler->GetEventPathList();
EventNumber eventMin = apReadHandler->GetEventMin();
mrjerryjohns marked this conversation as resolved.
Show resolved Hide resolved
EventManagement & eventManager = EventManagement::GetInstance();
bool hasMoreChunks = false;

aReportDataBuilder.Checkpoint(backup);

Expand Down
Loading