Skip to content

Commit

Permalink
Merge pull request #1451 from sodality-tech/dedup-poller-zino
Browse files Browse the repository at this point in the history
feat(graphql): Configurable option to deduplicate pollers
  • Loading branch information
vincenzopalazzo authored Aug 5, 2024
2 parents b365a83 + e4967c0 commit e68adbe
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 12 deletions.
2 changes: 2 additions & 0 deletions packages/graphql/lib/src/core/query_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ class QueryManager {
required this.link,
required this.cache,
this.alwaysRebroadcast = false,
bool deduplicatePollers = false,
}) {
scheduler = QueryScheduler(
queryManager: this,
deduplicatePollers: deduplicatePollers,
);
}

Expand Down
2 changes: 2 additions & 0 deletions packages/graphql/lib/src/graphql_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ class GraphQLClient implements GraphQLDataProxy {
required this.cache,
DefaultPolicies? defaultPolicies,
bool alwaysRebroadcast = false,
bool deduplicatePollers = false,
}) : defaultPolicies = defaultPolicies ?? DefaultPolicies(),
queryManager = QueryManager(
link: link,
cache: cache,
alwaysRebroadcast: alwaysRebroadcast,
deduplicatePollers: deduplicatePollers,
);

/// The default [Policies] to set for each client action
Expand Down
84 changes: 72 additions & 12 deletions packages/graphql/lib/src/scheduler/scheduler.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import 'dart:async';

import 'package:collection/collection.dart';
import 'package:gql_exec/gql_exec.dart';
import 'package:graphql/src/core/query_manager.dart';
import 'package:graphql/src/core/query_options.dart';
import 'package:graphql/src/core/observable_query.dart';
Expand All @@ -8,9 +10,11 @@ import 'package:graphql/src/core/observable_query.dart';
class QueryScheduler {
QueryScheduler({
this.queryManager,
});
bool deduplicatePollers = false,
}) : _deduplicatePollers = deduplicatePollers;

QueryManager? queryManager;
final bool _deduplicatePollers;

/// Map going from query ids to the [WatchQueryOptions] associated with those queries.
Map<String, WatchQueryOptions> registeredQueries =
Expand Down Expand Up @@ -68,25 +72,81 @@ class QueryScheduler {
options.pollInterval != null && options.pollInterval! > Duration.zero,
);

registeredQueries[queryId] = options;
final existingEntry = _fastestEntryForRequest(options.asRequest);
final String? existingQueryId = existingEntry?.key;
final Duration? existingInterval = existingEntry?.value.pollInterval;

final interval = options.pollInterval;
// Update or add the query in registeredQueries
registeredQueries[queryId] = options;

if (intervalQueries.containsKey(interval)) {
intervalQueries[interval]!.add(queryId);
final Duration interval;

if (existingInterval != null && _deduplicatePollers) {
if (existingInterval > options.pollInterval!) {
// The new one is faster, remove the old one and add the new one
intervalQueries[existingInterval]!.remove(existingQueryId);
interval = options.pollInterval!;
} else {
// The new one is slower or the same. Don't add it to the list
return;
}
} else {
intervalQueries[interval] = Set<String>.of([queryId]);

_pollingTimers[interval] = Timer.periodic(
interval!,
(Timer timer) => fetchQueriesOnInterval(timer, interval),
);
// If there is no existing interval, we'll add the new one
interval = options.pollInterval!;
}

// Add new query to intervalQueries
_addInterval(queryId, interval);
}

/// Removes the [ObservableQuery] from one of the registered queries.
/// The fetchQueriesOnInterval will then take care of not firing it anymore.
void stopPollingQuery(String queryId) {
registeredQueries.remove(queryId);
final removedQuery = registeredQueries.remove(queryId);

if (removedQuery == null ||
removedQuery.pollInterval == null ||
!_deduplicatePollers) {
return;
}

// If there is a registered query that has the same `asRequest` as this one
// Add the next fastest duration to the intervalQueries
final fastestEntry = _fastestEntryForRequest(removedQuery.asRequest);
final String? fastestQueryId = fastestEntry?.key;
final Duration? fastestInterval = fastestEntry?.value.pollInterval;

if (fastestQueryId == null || fastestInterval == null) {
// There is no other query, return.
return;
}

_addInterval(fastestQueryId, fastestInterval);
}

/// Adds a [queryId] to the [intervalQueries] for a specific [interval]
/// and starts the timer if it doesn't exist.
void _addInterval(String queryId, Duration interval) {
final existingSet = intervalQueries[interval];
if (existingSet != null) {
existingSet.add(queryId);
} else {
intervalQueries[interval] = {queryId};
_pollingTimers[interval] = Timer.periodic(
interval, (Timer timer) => fetchQueriesOnInterval(timer, interval));
}
}

/// Returns the fastest query that matches the [request] or null if none exists.
MapEntry<String, WatchQueryOptions<Object?>>? _fastestEntryForRequest(
Request request) {
return registeredQueries.entries
// All existing queries mapping to the same request.
.where((entry) =>
entry.value.asRequest == request &&
entry.value.pollInterval != null)
// Ascending is default (shortest poll interval first)
.sortedBy((entry) => entry.value.pollInterval!)
.firstOrNull;
}
}

0 comments on commit e68adbe

Please sign in to comment.