Skip to content

Commit

Permalink
[Darwin] MTRDevice should coalesce reads and avoid duplicates
Browse files Browse the repository at this point in the history
  • Loading branch information
jtung-apple committed Jun 1, 2023
1 parent 6d5111b commit 6d7b16f
Show file tree
Hide file tree
Showing 5 changed files with 391 additions and 26 deletions.
46 changes: 45 additions & 1 deletion src/darwin/Framework/CHIP/MTRAsyncCallbackWorkQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ NS_ASSUME_NONNULL_BEGIN

typedef void (^MTRAsyncCallbackReadyHandler)(id context, NSUInteger retryCount);

// The batching handler is called by the work queue when a work item is batchable. The handler will be passed the opaque data from
// the current and the next work item, and should the next item's data be fully merged into the first, the fullyMerged BOOL should
// be set to YES, so that the work queue can remove the "next item" from the queue. And when fullyMerged is set to YES, this handler
// will be called again, if the following item is also batchable with the same ID.
typedef void (^MTRAsyncCallbackBatchingHandler)(id opaqueDataCurrent, id opaqueDataNext, BOOL * fullyMerged);

// The duplicate check handler is called by the work queue when the client wishes to verify if a work item is a duplicate of an
// existing one, so that the client can decide to not enqueue the new duplicate. The work queue will
typedef void (^MTRAsyncCallbackDuplicateCheckHandler)(id opaqueItemData, BOOL * isDuplicate);

// MTRAsyncCallbackQueue high level description
// The MTRAsyncCallbackQueue was made to call one readyHandler
// block at a time asynchronously, and the readyHandler is
Expand All @@ -42,6 +52,23 @@ typedef void (^MTRAsyncCallbackReadyHandler)(id context, NSUInteger retryCount);
// - Set the readyHandler block on the WorkItem object
// - Call enqueueWorkItem on a MTRAsyncCallbackQueue

// Optional feature: Work Item Batching
// When a work item is dequeued to run, if it is of a type that can be combined with similar work items in a batch, this facility
// gives the client of this API an opportunity to coalesce and merge work items.
// - The "batching ID" is used for grouping mergeable work items with unique merging strategies. The ID value is opaque to this
// API, and the API client is responsible for assinging them.
// - Each work item will only be asked to batch before it's first dequeued to run readyHandler.
// See the MTRAsyncCallbackBatchingHandler definition above and the WorkItem's -setBatchingID:data:handler: method description for
// more details.

// Optional feature: Duplicate Filtering
// This is a facility that enables the API client to check if a potential work item has already been enqueued. By providing a
// handler that can answer if a work item's relevant data is a duplicate, it can avoid redundant queuing of requests.
// - The "duplicate type ID" is used for grouping different types of work items for duplicate checking. The ID value is opaque
// to this API, and the API client is responsible for assinging them.
// See the MTRAsyncCallbackDuplicateCheckHandler definition above and the WorkItem's -setDuplicateTypeID:handler: method description
// for more details.

// A serial one-at-a-time queue for performing work items
@interface MTRAsyncCallbackWorkQueue : NSObject
- (instancetype)init NS_UNAVAILABLE;
Expand All @@ -57,7 +84,12 @@ typedef void (^MTRAsyncCallbackReadyHandler)(id context, NSUInteger retryCount);
// Note: Once a work item is enqueued, its handlers cannot be modified
- (void)enqueueWorkItem:(MTRAsyncCallbackQueueWorkItem *)item;

// TODO: Add a "set concurrency width" method to allow for more than 1 work item at a time
// Before creating a work item, a client may call this method to check with existing work items that the new potential work item
// data is not a duplicate request. The work queue will then look for all work items matching the duplicate type ID, and call their
// duplicateCheckHandler.
//
// Returns YES if any item's duplicateCheckHandler returns a match.
- (BOOL)isDuplicateForTypeID:(NSUInteger)opaqueDuplicateTypeID workItemData:(id)opaqueWorkItemData;
@end

// An item in the work queue
Expand All @@ -70,6 +102,18 @@ typedef void (^MTRAsyncCallbackReadyHandler)(id context, NSUInteger retryCount);
@property (nonatomic, strong) MTRAsyncCallbackReadyHandler readyHandler;
@property (nonatomic, strong) dispatch_block_t cancelHandler;

// For work items that can be merged into a batch, set this handler with an identifier and an object that represents the mergeable
// data. When the work queue processes a batchable item, if the next item is also batchable with the same batching identifier, the
// work queue will call the batchingHandler to give the work item an opportunity to merge the data before readyHandler is called.
// Should the two items be completely merged into one batch, the batchingHandler can signal that through the out argument
// "fullyMerged", and the work queue will remove the second item.
- (void)setBatchingID:(NSUInteger)opaqueBatchingID
data:(id)opaqueBatchableData
handler:(MTRAsyncCallbackBatchingHandler)batchingHandler;

// For work items that may have duplicates, set this handler with an identifier and a handler
- (void)setDuplicateTypeID:(NSUInteger)opaqueDuplicateTypeID handler:(MTRAsyncCallbackDuplicateCheckHandler)duplicateCheckHandler;

// Called by the creater of the work item when async work is done and should
// be removed from the queue. The work queue will run the next work item.
// Note: This must only be called from within the readyHandler
Expand Down
64 changes: 63 additions & 1 deletion src/darwin/Framework/CHIP/MTRAsyncCallbackWorkQueue.mm
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#import <dispatch/dispatch.h>
#import <os/lock.h>

#import "MTRAsyncCallbackWorkQueue.h"
#import "MTRAsyncCallbackWorkQueue_Internal.h"
#import "MTRLogging_Internal.h"

#pragma mark - Class extensions
Expand Down Expand Up @@ -169,9 +169,51 @@ - (void)_callNextReadyWorkItem
self.runningWorkItemCount = 1;

MTRAsyncCallbackQueueWorkItem * workItem = self.items.firstObject;

// Check if batching is possible or needed. Only ask work item to batch once for simplicity
if (workItem.batchable && workItem.batchingHandler && (workItem.retryCount == 0)) {
while (self.items.count >= 2) {
MTRAsyncCallbackQueueWorkItem * nextWorkItem = self.items[1];
if (!nextWorkItem.batchable || (nextWorkItem.batchingID != workItem.batchingID)) {
// next item is not eligible to merge with this one
break;
}

BOOL fullyMerged = NO;
workItem.batchingHandler(workItem.batchableData, nextWorkItem.batchableData, &fullyMerged);
MTR_LOG_DEFAULT(
"MTRAsyncCallbackWorkQueue: merged items for batching - fully merged %@", fullyMerged ? @"YES" : @"NO");
if (!fullyMerged) {
// if some parts of the next item is
break;
}

[self.items removeObjectAtIndex:1];
}
}

[workItem callReadyHandlerWithContext:self.context];
}
}

- (BOOL)isDuplicateForTypeID:(NSUInteger)opaqueDuplicateTypeID workItemData:(id)opaqueWorkItemData
{
os_unfair_lock_lock(&_lock);
int i = 0;
for (MTRAsyncCallbackQueueWorkItem * item in self.items) {
BOOL isDuplicate = NO;
if (item.supportsDuplicateCheck && (item.duplicateTypeID == opaqueDuplicateTypeID) && item.duplicateCheckHandler) {
item.duplicateCheckHandler(opaqueWorkItemData, &isDuplicate);
if (isDuplicate) {
os_unfair_lock_unlock(&_lock);
return YES;
}
}
i++;
}
os_unfair_lock_unlock(&_lock);
return NO;
}
@end

@implementation MTRAsyncCallbackQueueWorkItem
Expand Down Expand Up @@ -277,4 +319,24 @@ - (void)cancel
});
}
}

- (void)setBatchingID:(NSUInteger)opaqueBatchingID
data:(id)opaqueBatchableData
handler:(MTRAsyncCallbackBatchingHandler)batchingHandler
{
os_unfair_lock_lock(&self->_lock);
_batchable = YES;
_batchingID = opaqueBatchingID;
_batchableData = opaqueBatchableData;
_batchingHandler = batchingHandler;
os_unfair_lock_unlock(&self->_lock);
}

- (void)setDuplicateTypeID:(NSUInteger)opaqueDuplicateTypeID handler:(MTRAsyncCallbackDuplicateCheckHandler)duplicateCheckHandler
{
_supportsDuplicateCheck = YES;
_duplicateTypeID = opaqueDuplicateTypeID;
_duplicateCheckHandler = duplicateCheckHandler;
}

@end
13 changes: 13 additions & 0 deletions src/darwin/Framework/CHIP/MTRAsyncCallbackWorkQueue_Internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,17 @@ NS_ASSUME_NONNULL_BEGIN
- (instancetype)initWithContext:(id _Nullable)context queue:(dispatch_queue_t)queue;
@end

@interface MTRAsyncCallbackQueueWorkItem ()
// Batching
@property (nonatomic, readonly) BOOL batchable;
@property (nonatomic, readonly) NSUInteger batchingID;
@property (nonatomic, readonly) id batchableData;
@property (nonatomic, readonly) MTRAsyncCallbackBatchingHandler batchingHandler;

// Duplicate filter
@property (nonatomic, readonly) BOOL supportsDuplicateCheck;
@property (nonatomic, readonly) NSUInteger duplicateTypeID;
@property (nonatomic, readonly) MTRAsyncCallbackDuplicateCheckHandler duplicateCheckHandler;
@end

NS_ASSUME_NONNULL_END
138 changes: 114 additions & 24 deletions src/darwin/Framework/CHIP/MTRDevice.mm
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,21 @@ typedef NS_ENUM(NSUInteger, MTRDeviceExpectedValueFieldIndex) {
MTRDeviceExpectedValueFieldIDIndex = 2
};

typedef NS_ENUM(NSUInteger, MTRDeviceReadRequestFieldIndex) {
MTRDeviceReadRequestFieldEndpointIDIndex = 0,
MTRDeviceReadRequestFieldClusterIDIndex = 1,
MTRDeviceReadRequestFieldAttributeIDIndex = 2,
MTRDeviceReadRequestFieldParamsIndex = 3
};

typedef NS_ENUM(NSUInteger, MTRDeviceWorkItemBatchingID) {
MTRDeviceWorkItemBatchingReadID = 1,
};

typedef NS_ENUM(NSUInteger, MTRDeviceWorkItemDuplicateTypeID) {
MTRDeviceWorkItemDuplicateReadTypeID = 1,
};

@interface MTRDevice ()
@property (nonatomic, readonly) os_unfair_lock lock; // protects the caches and device state
@property (nonatomic) chip::FabricIndex fabricIndex;
Expand Down Expand Up @@ -775,37 +790,112 @@ static BOOL AttributeHasChangesOmittedQuality(MTRAttributePath * attributePath)
// 4. Cache has no entry
// TODO: add option for BaseSubscriptionCallback to report during priming, to reduce when case 4 is hit
if (!attributeIsSpecified || ![self _subscriptionAbleToReport] || hasChangesOmittedQuality || !attributeValueToReturn) {
// Read requests container will be an array of items, each being an array containing:
// [endpoint ID, cluster ID, attribute ID, params]
// Batching handler should only coalesce when params are equal.

// For this single read API there's only 1 array item. Use NSNull to stand in for nil params for easy comparison.
NSMutableArray<NSArray *> * readRequests =
[NSMutableArray arrayWithObject:@[ endpointID, clusterID, attributeID, params ?: [NSNull null] ]];

// Create work item, set ready handler to perform task, then enqueue the work
MTRAsyncCallbackQueueWorkItem * workItem = [[MTRAsyncCallbackQueueWorkItem alloc] initWithQueue:self.queue];
MTRAsyncCallbackBatchingHandler batchingHandler = ^(id opaqueDataCurrent, id opaqueDataNext, BOOL * fullyMerged) {
NSMutableArray<NSArray *> * readRequestsCurrent = opaqueDataCurrent;
NSMutableArray<NSArray *> * readRequestsNext = opaqueDataNext;

*fullyMerged = NO;

// Can only read up to 9 paths at a time, per spec
if (readRequestsCurrent.count >= 9) {
return;
}

while (readRequestsNext.count) {
// if params don't match then they cannot be merged
if (![readRequestsNext[0][MTRDeviceReadRequestFieldParamsIndex]
isEqual:readRequestsCurrent[0][MTRDeviceReadRequestFieldParamsIndex]]) {
return;
}

// merge the next item's first request into the current item's list
[readRequestsCurrent addObject:readRequestsNext[0]];
[readRequestsNext removeObjectAtIndex:0];

// Can only read up to 9 paths at a time, per spec
if (readRequestsCurrent.count == 9) {
break;
}
}

if (readRequestsNext.count == 0) {
*fullyMerged = YES;
}
};
MTRAsyncCallbackDuplicateCheckHandler duplicateCheckHandler = ^(id opaqueItemData, BOOL * isDuplicate) {
*isDuplicate = NO;
for (NSArray * readItem in readRequests) {
if ([readItem isEqual:opaqueItemData]) {
*isDuplicate = YES;
return;
}
}
};
MTRAsyncCallbackReadyHandler readyHandler = ^(MTRDevice * device, NSUInteger retryCount) {
MTR_LOG_DEFAULT("%@ dequeueWorkItem %@", logPrefix, self->_asyncCallbackWorkQueue);

// Sanity check
if (readRequests.count == 0) {
MTR_LOG_ERROR("%@ dequeueWorkItem no read requests", logPrefix);
[workItem endWork];
return;
}

// Build the attribute paths from the read requests
NSMutableArray * attributePaths = [NSMutableArray array];
for (NSArray * readItem in readRequests) {
// Sanity check
if (readItem.count < 4) {
MTR_LOG_ERROR("%@ dequeueWorkItem read item missing info %@", logPrefix, readItem);
[workItem endWork];
return;
}
[attributePaths addObject:[MTRAttributeRequestPath
requestPathWithEndpointID:readItem[MTRDeviceReadRequestFieldEndpointIDIndex]
clusterID:readItem[MTRDeviceReadRequestFieldClusterIDIndex]
attributeID:readItem[MTRDeviceReadRequestFieldAttributeIDIndex]]];
}
// If param is the NSNull stand-in, then just use nil
id readParamObject = readRequests[0][MTRDeviceReadRequestFieldParamsIndex];
MTRReadParams * readParams = (![readParamObject isEqual:[NSNull null]]) ? readParamObject : nil;

MTRBaseDevice * baseDevice = [self newBaseDevice];
[baseDevice readAttributesWithEndpointID:endpointID
clusterID:clusterID
attributeID:attributeID
params:params
queue:self.queue
completion:^(NSArray<NSDictionary<NSString *, id> *> * _Nullable values,
NSError * _Nullable error) {
if (values) {
// Since the format is the same data-value dictionary, this looks like an
// attribute report
MTR_LOG_INFO("%@ completion values %@", logPrefix, values);
[self _handleAttributeReport:values];
}

// TODO: better retry logic
if (error && (retryCount < 2)) {
MTR_LOG_ERROR("%@ completion error %@ retryWork %lu", logPrefix, error,
(unsigned long) retryCount);
[workItem retryWork];
} else {
MTR_LOG_DEFAULT("%@ completion error %@ endWork", logPrefix, error);
[workItem endWork];
}
}];
[baseDevice
readAttributePaths:attributePaths
eventPaths:nil
params:readParams
queue:self.queue
completion:^(NSArray<NSDictionary<NSString *, id> *> * _Nullable values, NSError * _Nullable error) {
if (values) {
// Since the format is the same data-value dictionary, this looks like an
// attribute report
MTR_LOG_INFO("%@ completion values %@", logPrefix, values);
[self _handleAttributeReport:values];
}

// TODO: better retry logic
if (error && (retryCount < 2)) {
MTR_LOG_ERROR("%@ completion error %@ retryWork %lu", logPrefix, error, (unsigned long) retryCount);
[workItem retryWork];
} else {
MTR_LOG_DEFAULT("%@ completion error %@ endWork", logPrefix, error);
[workItem endWork];
}
}];
};
workItem.readyHandler = readyHandler;
[workItem setBatchingID:MTRDeviceWorkItemBatchingReadID data:readRequests handler:batchingHandler];
[workItem setDuplicateTypeID:MTRDeviceWorkItemDuplicateReadTypeID handler:duplicateCheckHandler];
MTR_LOG_DEFAULT("%@ enqueueWorkItem %@", logPrefix, _asyncCallbackWorkQueue);
[_asyncCallbackWorkQueue enqueueWorkItem:workItem];
}
Expand Down
Loading

0 comments on commit 6d7b16f

Please sign in to comment.