Skip to content

Commit

Permalink
Refactor notifying store subscriptions for better consistency update …
Browse files Browse the repository at this point in the history
…perf (under feature flag)

Reviewed By: josephsavona

Differential Revision: D24966863

fbshipit-source-id: d22a614dcbc3e457e39bc79185e03803724223bf
  • Loading branch information
Juan Tejada authored and facebook-github-bot committed Nov 19, 2020
1 parent d64b579 commit ad65328
Show file tree
Hide file tree
Showing 6 changed files with 1,028 additions and 8 deletions.
12 changes: 8 additions & 4 deletions packages/relay-runtime/store/RelayModernStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,20 @@
'use strict';

const DataChecker = require('./DataChecker');
const RelayFeatureFlags = require('../util/RelayFeatureFlags');
const RelayModernRecord = require('./RelayModernRecord');
const RelayOptimisticRecordSource = require('./RelayOptimisticRecordSource');
const RelayProfiler = require('../util/RelayProfiler');
const RelayReader = require('./RelayReader');
const RelayReferenceMarker = require('./RelayReferenceMarker');
const RelayStoreReactFlightUtils = require('./RelayStoreReactFlightUtils');
const RelayStoreSubscriptions = require('./RelayStoreSubscriptions');
const RelayStoreSubscriptionsUsingMapByID = require('./RelayStoreSubscriptionsUsingMapByID');
const RelayStoreUtils = require('./RelayStoreUtils');

const deepFreeze = require('../util/deepFreeze');
const defaultGetDataID = require('./defaultGetDataID');
const hasOverlappingIDs = require('./hasOverlappingIDs');
const invariant = require('invariant');
const recycleNodesInto = require('../util/recycleNodesInto');
const resolveImmediate = require('../util/resolveImmediate');

const {ROOT_ID, ROOT_TYPE} = require('./RelayStoreUtils');
Expand All @@ -47,6 +47,7 @@ import type {
SingularReaderSelector,
Snapshot,
Store,
StoreSubscriptions,
UpdatedRecords,
} from './RelayStoreTypes';

Expand Down Expand Up @@ -100,7 +101,7 @@ class RelayModernStore implements Store {
|},
>;
_shouldScheduleGC: boolean;
_storeSubscriptions: RelayStoreSubscriptions;
_storeSubscriptions: StoreSubscriptions;
_updatedRecordIDs: UpdatedRecords;

constructor(
Expand Down Expand Up @@ -143,7 +144,10 @@ class RelayModernStore implements Store {
this._releaseBuffer = [];
this._roots = new Map();
this._shouldScheduleGC = false;
this._storeSubscriptions = new RelayStoreSubscriptions();
this._storeSubscriptions =
RelayFeatureFlags.ENABLE_STORE_SUBSCRIPTIONS_REFACTOR === true
? new RelayStoreSubscriptionsUsingMapByID()
: new RelayStoreSubscriptions();
this._updatedRecordIDs = {};

initializeRecordSource(this._recordSource);
Expand Down
15 changes: 11 additions & 4 deletions packages/relay-runtime/store/RelayStoreSubscriptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,18 @@ import type {
RecordSource,
RequestDescriptor,
Snapshot,
StoreSubscriptions,
UpdatedRecords,
} from './RelayStoreTypes';

export type Subscription = {|
type Subscription = {|
callback: (snapshot: Snapshot) => void,
snapshot: Snapshot,
stale: boolean,
backup: ?Snapshot,
|};

class RelayStoreSubscriptions {
class RelayStoreSubscriptions implements StoreSubscriptions {
_subscriptions: Set<Subscription>;

constructor() {
Expand Down Expand Up @@ -119,8 +120,14 @@ class RelayStoreSubscriptions {
});
}

// Returns the owner (RequestDescriptor) if the subscription was affected by the
// latest update, or null if it was not affected.
/**
* Notifies the callback for the subscription if the data for the associated
* snapshot has changed.
* Additionally, updates the subscription snapshot with the latest snapshot,
* and marks it as not stale.
* Returns the owner (RequestDescriptor) if the subscription was affected by the
* latest update, or null if it was not affected.
*/
_updateSubscription(
source: RecordSource,
subscription: Subscription,
Expand Down
259 changes: 259 additions & 0 deletions packages/relay-runtime/store/RelayStoreSubscriptionsUsingMapByID.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
/**
* Copyright (c) Facebook, Inc. and its affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*
* @flow strict-local
* @format
*/

// flowlint ambiguous-object-type:error

'use strict';

const RelayReader = require('./RelayReader');

const deepFreeze = require('../util/deepFreeze');
const recycleNodesInto = require('../util/recycleNodesInto');

import type {DataID, Disposable} from '../util/RelayRuntimeTypes';
import type {
RecordMap,
RecordSource,
RequestDescriptor,
Snapshot,
StoreSubscriptions,
UpdatedRecords,
} from './RelayStoreTypes';

type Subscription = {|
backup: ?Snapshot,
callback: (snapshot: Snapshot) => void,
notifiedRevision: number,
snapshot: Snapshot,
snapshotRevision: number,
|};

class RelayStoreSubscriptionsUsingMapByID implements StoreSubscriptions {
_notifiedRevision: number;
_snapshotRevision: number;
_subscriptionsByDataId: Map<DataID, Set<Subscription>>;
_staleSubscriptions: Set<Subscription>;

constructor() {
this._notifiedRevision = 0;
this._snapshotRevision = 0;
this._subscriptionsByDataId = new Map();
this._staleSubscriptions = new Set();
}

subscribe(
snapshot: Snapshot,
callback: (snapshot: Snapshot) => void,
): Disposable {
const subscription = {
backup: null,
callback,
notifiedRevision: this._notifiedRevision,
snapshotRevision: this._snapshotRevision,
snapshot,
};
const dispose = () => {
for (const dataId in snapshot.seenRecords) {
const subscriptionsForDataId = this._subscriptionsByDataId.get(dataId);
if (subscriptionsForDataId != null) {
subscriptionsForDataId.delete(subscription);
if (subscriptionsForDataId.size === 0) {
this._subscriptionsByDataId.delete(dataId);
}
}
}
};

for (const dataId in snapshot.seenRecords) {
const subscriptionsForDataId = this._subscriptionsByDataId.get(dataId);
if (subscriptionsForDataId != null) {
subscriptionsForDataId.add(subscription);
} else {
this._subscriptionsByDataId.set(dataId, new Set([subscription]));
}
}

return {dispose};
}

snapshotSubscriptions(source: RecordSource) {
this._snapshotRevision++;
this._subscriptionsByDataId.forEach(subscriptions => {
subscriptions.forEach(subscription => {
if (subscription.snapshotRevision === this._snapshotRevision) {
return;
}
subscription.snapshotRevision = this._snapshotRevision;

// Backup occurs after writing a new "final" payload(s) and before (re)applying
// optimistic changes. Each subscription's `snapshot` represents what was *last
// published to the subscriber*, which notably may include previous optimistic
// updates. Therefore a subscription can be in any of the following states:
// - stale=true: This subscription was restored to a different value than
// `snapshot`. That means this subscription has changes relative to its base,
// but its base has changed (we just applied a final payload): recompute
// a backup so that we can later restore to the state the subscription
// should be in.
// - stale=false: This subscription was restored to the same value than
// `snapshot`. That means this subscription does *not* have changes relative
// to its base, so the current `snapshot` is valid to use as a backup.
if (!this._staleSubscriptions.has(subscription)) {
subscription.backup = subscription.snapshot;
return;
}
const snapshot = subscription.snapshot;
const backup = RelayReader.read(source, snapshot.selector);
const nextData = recycleNodesInto(snapshot.data, backup.data);
(backup: $FlowFixMe).data = nextData; // backup owns the snapshot and can safely mutate
subscription.backup = backup;
});
});
}

restoreSubscriptions() {
this._snapshotRevision++;
this._subscriptionsByDataId.forEach(subscriptions => {
subscriptions.forEach(subscription => {
if (subscription.snapshotRevision === this._snapshotRevision) {
return;
}
subscription.snapshotRevision = this._snapshotRevision;

const backup = subscription.backup;
subscription.backup = null;
if (backup) {
if (backup.data !== subscription.snapshot.data) {
this._staleSubscriptions.add(subscription);
}
const prevSeenRecords = subscription.snapshot.seenRecords;
subscription.snapshot = {
data: subscription.snapshot.data,
isMissingData: backup.isMissingData,
seenRecords: backup.seenRecords,
selector: backup.selector,
missingRequiredFields: backup.missingRequiredFields,
};
this._updateSubscriptionsMap(subscription, prevSeenRecords);
} else {
this._staleSubscriptions.add(subscription);
}
});
});
}

updateSubscriptions(
source: RecordSource,
updatedRecordIDs: UpdatedRecords,
updatedOwners: Array<RequestDescriptor>,
) {
this._notifiedRevision++;
Object.keys(updatedRecordIDs).forEach(updatedRecordId => {
const subcriptionsForDataId = this._subscriptionsByDataId.get(
updatedRecordId,
);
if (subcriptionsForDataId == null) {
return;
}
subcriptionsForDataId.forEach(subscription => {
if (subscription.notifiedRevision === this._notifiedRevision) {
return;
}
const owner = this._updateSubscription(source, subscription, false);
if (owner != null) {
updatedOwners.push(owner);
}
});
});
this._staleSubscriptions.forEach(subscription => {
if (subscription.notifiedRevision === this._notifiedRevision) {
return;
}
const owner = this._updateSubscription(source, subscription, true);
if (owner != null) {
updatedOwners.push(owner);
}
});
this._staleSubscriptions.clear();
}

/**
* Notifies the callback for the subscription if the data for the associated
* snapshot has changed.
* Additionally, updates the subscription snapshot with the latest snapshot,
* amarks it as not stale, and updates the subscription tracking for any
* any new ids observed in the latest data snapshot.
* Returns the owner (RequestDescriptor) if the subscription was affected by the
* latest update, or null if it was not affected.
*/
_updateSubscription(
source: RecordSource,
subscription: Subscription,
stale: boolean,
): ?RequestDescriptor {
const {backup, callback, snapshot} = subscription;
let nextSnapshot: Snapshot =
stale && backup != null
? backup
: RelayReader.read(source, snapshot.selector);
const nextData = recycleNodesInto(snapshot.data, nextSnapshot.data);
nextSnapshot = ({
data: nextData,
isMissingData: nextSnapshot.isMissingData,
seenRecords: nextSnapshot.seenRecords,
selector: nextSnapshot.selector,
missingRequiredFields: nextSnapshot.missingRequiredFields,
}: Snapshot);
if (__DEV__) {
deepFreeze(nextSnapshot);
}

const prevSeenRecords = subscription.snapshot.seenRecords;
subscription.snapshot = nextSnapshot;
subscription.notifiedRevision = this._notifiedRevision;
this._updateSubscriptionsMap(subscription, prevSeenRecords);

if (nextSnapshot.data !== snapshot.data) {
callback(nextSnapshot);
return snapshot.selector.owner;
}
}

/**
* Updates the Map that tracks subscriptions by id.
* Given an updated subscription and the records that where seen
* on the previous subscription snapshot, updates our tracking
* to track the subscription for the newly and no longer seen ids.
*/
_updateSubscriptionsMap(
subscription: Subscription,
prevSeenRecords: RecordMap,
) {
for (const dataId in prevSeenRecords) {
const subscriptionsForDataId = this._subscriptionsByDataId.get(dataId);
if (subscriptionsForDataId != null) {
subscriptionsForDataId.delete(subscription);
if (subscriptionsForDataId.size === 0) {
this._subscriptionsByDataId.delete(dataId);
}
}
}

for (const dataId in subscription.snapshot.seenRecords) {
const subscriptionsForDataId = this._subscriptionsByDataId.get(dataId);
if (subscriptionsForDataId != null) {
subscriptionsForDataId.add(subscription);
} else {
this._subscriptionsByDataId.set(dataId, new Set([subscription]));
}
}
}
}

module.exports = RelayStoreSubscriptionsUsingMapByID;
34 changes: 34 additions & 0 deletions packages/relay-runtime/store/RelayStoreTypes.js
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,40 @@ export interface Store {
): Disposable;
}

export interface StoreSubscriptions {
/**
* Subscribe to changes to the results of a selector. The callback is called
* when `updateSubscriptions()` is called *and* records have been published that affect the
* selector results relative to the last update.
*/
subscribe(
snapshot: Snapshot,
callback: (snapshot: Snapshot) => void,
): Disposable;

/**
* Record a backup/snapshot of the current state of the subscriptions.
* This state can be restored with restore().
*/
snapshotSubscriptions(source: RecordSource): void;

/**
* Reset the state of the subscriptions to the point that snapshot() was last called.
*/
restoreSubscriptions(): void;

/**
* Notifies each subscription if the snapshot for the subscription selector has changed.
* Mutates the updatedOwners array with any owners (RequestDescriptors) associated
* with the subscriptions that were notifed; i.e. the owners affected by the changes.
*/
updateSubscriptions(
source: RecordSource,
updatedRecordIDs: UpdatedRecords,
updatedOwners: Array<RequestDescriptor>,
): void;
}

/**
* A type that accepts a callback and schedules it to run at some future time.
* By convention, implementations should not execute the callback immediately.
Expand Down
Loading

0 comments on commit ad65328

Please sign in to comment.