From e8ae745bbad273483d715a0ced779d2b7fb238a6 Mon Sep 17 00:00:00 2001 From: Phil Pluckthun Date: Wed, 28 Apr 2021 13:21:24 +0100 Subject: [PATCH] (core) - Make Client more forgiving by sharing result sources that emit cached results (#1515) * Replace operation semaphore with shared behavior source Replace the client's counting of active operations with a single, shared operation result source in a Map. This achieves the same semaphore behaviour but replaces it with a binary semaphore-like cache, where the cached source remembers the last active result. * Prevent emitting cached result if a new result emitted first * Reuse client.query() in client.readQuery() * Add tests for shared source behaviour * Abstract replay logic in Client * Add test for shared subscription behaviour * Fix linting issues --- .../src/__snapshots__/client.test.ts.snap | 2 +- packages/core/src/client.test.ts | 224 +++++++++++++++++- packages/core/src/client.ts | 98 ++++---- packages/core/src/utils/index.ts | 2 +- packages/core/src/utils/streamUtils.ts | 54 +++++ packages/core/src/utils/withPromise.ts | 8 - 6 files changed, 324 insertions(+), 64 deletions(-) create mode 100644 packages/core/src/utils/streamUtils.ts delete mode 100644 packages/core/src/utils/withPromise.ts diff --git a/packages/core/src/__snapshots__/client.test.ts.snap b/packages/core/src/__snapshots__/client.test.ts.snap index 88745fc402..d49c1ab21f 100644 --- a/packages/core/src/__snapshots__/client.test.ts.snap +++ b/packages/core/src/__snapshots__/client.test.ts.snap @@ -2,7 +2,7 @@ exports[`createClient passes snapshot 1`] = ` Client { - "activeOperations": Object {}, + "activeOperations": Map {}, "createOperationContext": [Function], "createRequestOperation": [Function], "dispatchOperation": [Function], diff --git a/packages/core/src/client.test.ts b/packages/core/src/client.test.ts index 7104463516..7bf8f20a89 100755 --- a/packages/core/src/client.test.ts +++ b/packages/core/src/client.test.ts @@ -11,12 +11,14 @@ import { filter, toArray, tap, + take, } from 'wonka'; + import { gql } from './gql'; import { Exchange, Operation, OperationResult } from './types'; import { makeOperation } from './utils'; import { createClient } from './client'; -import { queryOperation } from './test-utils'; +import { queryOperation, subscriptionOperation } from './test-utils'; const url = 'https://hostname.com'; @@ -507,3 +509,223 @@ describe('queuing behavior', () => { unsubscribe(); }); }); + +describe('shared sources behavior', () => { + beforeEach(() => { + jest.useFakeTimers(); + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + it('replays results from prior operation result as needed', async () => { + const exchange: Exchange = () => ops$ => { + let i = 0; + return pipe( + ops$, + map(op => ({ + data: ++i, + operation: op, + })), + delay(1) + ); + }; + + const client = createClient({ + url: 'test', + exchanges: [exchange], + }); + + const resultOne = jest.fn(); + const resultTwo = jest.fn(); + + pipe(client.executeRequestOperation(queryOperation), subscribe(resultOne)); + + expect(resultOne).toHaveBeenCalledTimes(0); + + jest.advanceTimersByTime(1); + + expect(resultOne).toHaveBeenCalledTimes(1); + expect(resultOne).toHaveBeenCalledWith({ + data: 1, + operation: queryOperation, + }); + + pipe(client.executeRequestOperation(queryOperation), subscribe(resultTwo)); + + expect(resultTwo).toHaveBeenCalledWith({ + data: 1, + stale: true, + operation: queryOperation, + }); + + jest.advanceTimersByTime(1); + + expect(resultTwo).toHaveBeenCalledWith({ + data: 2, + operation: queryOperation, + }); + }); + + it('replayed results are not emitted on the shared source', () => { + const exchange: Exchange = () => ops$ => { + let i = 0; + return pipe( + ops$, + map(op => ({ + data: ++i, + operation: op, + })), + take(1) + ); + }; + + const client = createClient({ + url: 'test', + exchanges: [exchange], + }); + + const resultOne = jest.fn(); + const resultTwo = jest.fn(); + + pipe(client.executeRequestOperation(queryOperation), subscribe(resultOne)); + + pipe(client.executeRequestOperation(queryOperation), subscribe(resultTwo)); + + expect(resultOne).toHaveBeenCalledTimes(1); + expect(resultTwo).toHaveBeenCalledTimes(1); + + expect(resultTwo).toHaveBeenCalledWith({ + data: 1, + operation: queryOperation, + stale: true, + }); + }); + + it('does nothing when no operation result has been emitted yet', () => { + const exchange: Exchange = () => ops$ => { + return pipe( + ops$, + map(op => ({ data: 1, operation: op })), + filter(() => false) + ); + }; + + const client = createClient({ + url: 'test', + exchanges: [exchange], + }); + + const resultOne = jest.fn(); + const resultTwo = jest.fn(); + + pipe(client.executeRequestOperation(queryOperation), subscribe(resultOne)); + + pipe(client.executeRequestOperation(queryOperation), subscribe(resultTwo)); + + expect(resultOne).toHaveBeenCalledTimes(0); + expect(resultTwo).toHaveBeenCalledTimes(0); + }); + + it('skips replaying results when a result is emitted immediately', () => { + const exchange: Exchange = () => ops$ => { + let i = 0; + return pipe( + ops$, + map(op => ({ data: ++i, operation: op })) + ); + }; + + const client = createClient({ + url: 'test', + exchanges: [exchange], + }); + + const resultOne = jest.fn(); + const resultTwo = jest.fn(); + + pipe(client.executeRequestOperation(queryOperation), subscribe(resultOne)); + + expect(resultOne).toHaveBeenCalledWith({ + data: 1, + operation: queryOperation, + }); + + pipe(client.executeRequestOperation(queryOperation), subscribe(resultTwo)); + + expect(resultTwo).toHaveBeenCalledWith({ + data: 2, + operation: queryOperation, + }); + + expect(resultOne).toHaveBeenCalledWith({ + data: 2, + operation: queryOperation, + }); + }); + + it('replays stale results as needed', () => { + const exchange: Exchange = () => ops$ => { + return pipe( + ops$, + map(op => ({ stale: true, data: 1, operation: op })), + take(1) + ); + }; + + const client = createClient({ + url: 'test', + exchanges: [exchange], + }); + + const resultOne = jest.fn(); + const resultTwo = jest.fn(); + + pipe(client.executeRequestOperation(queryOperation), subscribe(resultOne)); + + expect(resultOne).toHaveBeenCalledWith({ + data: 1, + operation: queryOperation, + stale: true, + }); + + pipe(client.executeRequestOperation(queryOperation), subscribe(resultTwo)); + + expect(resultTwo).toHaveBeenCalledWith({ + data: 1, + operation: queryOperation, + stale: true, + }); + }); + + it('does nothing when operation is a subscription has been emitted yet', () => { + const exchange: Exchange = () => ops$ => { + return pipe( + ops$, + map(op => ({ data: 1, operation: op })), + take(1) + ); + }; + + const client = createClient({ + url: 'test', + exchanges: [exchange], + }); + + const resultOne = jest.fn(); + const resultTwo = jest.fn(); + + pipe( + client.executeRequestOperation(subscriptionOperation), + subscribe(resultOne) + ); + expect(resultOne).toHaveBeenCalledTimes(1); + + pipe( + client.executeRequestOperation(subscriptionOperation), + subscribe(resultTwo) + ); + expect(resultTwo).toHaveBeenCalledTimes(0); + }); +}); diff --git a/packages/core/src/client.ts b/packages/core/src/client.ts index 8951950e01..7c25a63bc0 100755 --- a/packages/core/src/client.ts +++ b/packages/core/src/client.ts @@ -41,6 +41,7 @@ import { import { createRequest, withPromise, + replayOnStart, maskTypename, noop, makeOperation, @@ -66,10 +67,6 @@ export interface ClientOptions { maskTypename?: boolean; } -interface ActiveOperations { - [operationKey: string]: number; -} - export const createClient = (opts: ClientOptions) => new Client(opts); /** The URQL application-wide client library. Each execute method starts a GraphQL request and returns a stream of results. */ @@ -93,7 +90,7 @@ export class Client { dispatchOperation: (operation?: Operation | void) => void; operations$: Source; results$: Source; - activeOperations = Object.create(null) as ActiveOperations; + activeOperations: Map> = new Map(); queue: Operation[] = []; constructor(opts: ClientOptions) { @@ -138,7 +135,7 @@ export class Client { // operation's exchange results if ( operation.kind === 'mutation' || - (this.activeOperations[operation.key] || 0) > 0 + this.activeOperations.has(operation.key) ) { this.queue.push(operation); if (!isOperationBatchActive) { @@ -197,54 +194,28 @@ export class Client { this.createOperationContext(opts) ); - /** Counts up the active operation key and dispatches the operation */ - private onOperationStart(operation: Operation) { - const { key } = operation; - this.activeOperations[key] = (this.activeOperations[key] || 0) + 1; - this.dispatchOperation(operation); - } - - /** Deletes an active operation's result observable and sends a teardown signal through the exchange pipeline */ - private onOperationEnd(operation: Operation) { - const { key } = operation; - const prevActive = this.activeOperations[key] || 0; - const newActive = (this.activeOperations[key] = - prevActive <= 0 ? 0 : prevActive - 1); - // Check whether this operation has now become inactive - if (newActive <= 0) { - // Delete all related queued up operations for the inactive one - for (let i = this.queue.length - 1; i >= 0; i--) - if (this.queue[i].key === operation.key) this.queue.splice(i, 1); - // Issue the cancellation teardown operation - this.dispatchOperation( - makeOperation('teardown', operation, operation.context) - ); - } - } - /** Executes an Operation by sending it through the exchange pipeline It returns an observable that emits all related exchange results and keeps track of this observable's subscribers. A teardown signal will be emitted when no subscribers are listening anymore. */ executeRequestOperation( operation: Operation ): Source> { - let operationResults$ = pipe( + let active = this.activeOperations.get(operation.key); + if (active) return active; + + let result$ = pipe( this.results$, filter((res: OperationResult) => res.operation.key === operation.key) ) as Source>; - if (this.maskTypename) { - operationResults$ = pipe( - operationResults$, - map(res => { - res.data = maskTypename(res.data); - return res; - }) + result$ = pipe( + result$, + map(res => ({ ...res, data: maskTypename(res.data) })) ); } + // A mutation is always limited to just a single result and is never shared if (operation.kind === 'mutation') { - // A mutation is always limited to just a single result and is never shared return pipe( - operationResults$, + result$, onStart(() => this.dispatchOperation(operation)), take(1) ); @@ -263,34 +234,55 @@ export class Client { (op: Operation) => op.kind === operation.kind && op.key === operation.key && - op.context.requestPolicy !== 'cache-only' - ) + (op.context.requestPolicy === 'network-only' || + op.context.requestPolicy === 'cache-and-network') + ), + take(1) ); - const result$ = pipe( - operationResults$, + result$ = pipe( + result$, takeUntil(teardown$), switchMap(result => { if (result.stale) return fromValue(result); - return merge([ fromValue(result), pipe( refetch$, - take(1), map(() => ({ ...result, stale: true })) ), ]); }), - onStart(() => { - this.onOperationStart(operation); - }), onEnd(() => { - this.onOperationEnd(operation); + this.activeOperations.delete(operation.key); + for (let i = this.queue.length - 1; i >= 0; i--) + if (this.queue[i].key === operation.key) this.queue.splice(i, 1); + this.dispatchOperation( + makeOperation('teardown', operation, operation.context) + ); }) ); - return result$; + if (operation.kind === 'subscription') { + active = pipe( + result$, + onStart(() => { + this.activeOperations.set(operation.key, active!); + this.dispatchOperation(operation); + }), + share + ); + } else { + active = pipe( + result$, + replayOnStart(() => { + this.activeOperations.set(operation.key, active!); + this.dispatchOperation(operation); + }) + ); + } + + return active; } query( @@ -318,7 +310,7 @@ export class Client { let result: OperationResult | null = null; pipe( - this.executeQuery(createRequest(query, variables), context), + this.query(query, variables, context), subscribe(res => { result = res; }) diff --git a/packages/core/src/utils/index.ts b/packages/core/src/utils/index.ts index bba813b8fe..6b8469f1ba 100644 --- a/packages/core/src/utils/index.ts +++ b/packages/core/src/utils/index.ts @@ -4,7 +4,7 @@ export * from './result'; export * from './typenames'; export * from './stringifyVariables'; export * from './maskTypename'; -export * from './withPromise'; +export * from './streamUtils'; export * from './operation'; export const noop = () => { diff --git a/packages/core/src/utils/streamUtils.ts b/packages/core/src/utils/streamUtils.ts new file mode 100644 index 0000000000..1e18311eb0 --- /dev/null +++ b/packages/core/src/utils/streamUtils.ts @@ -0,0 +1,54 @@ +import { + Operator, + Source, + pipe, + toPromise, + take, + share, + onPush, + onStart, + onEnd, + subscribe, + make, +} from 'wonka'; + +import { OperationResult, PromisifiedSource } from '../types'; + +export function withPromise(source$: Source): PromisifiedSource { + (source$ as PromisifiedSource).toPromise = () => + pipe(source$, take(1), toPromise); + return source$ as PromisifiedSource; +} + +export function replayOnStart( + start?: () => void +): Operator { + return source$ => { + let replay: T | void; + + const shared$ = pipe( + source$, + onPush(value => { + replay = value; + }), + share + ); + + return make(observer => { + const prevReplay = replay; + + const subscription = pipe( + shared$, + onEnd(observer.complete), + onStart(() => { + if (start) start(); + if (prevReplay !== undefined && prevReplay === replay) + observer.next({ ...prevReplay, stale: true }); + }), + subscribe(observer.next) + ); + + return subscription.unsubscribe; + }); + }; +} diff --git a/packages/core/src/utils/withPromise.ts b/packages/core/src/utils/withPromise.ts deleted file mode 100644 index e91fccc1a0..0000000000 --- a/packages/core/src/utils/withPromise.ts +++ /dev/null @@ -1,8 +0,0 @@ -import { Source, pipe, toPromise, take } from 'wonka'; -import { PromisifiedSource } from '../types'; - -export function withPromise(source$: Source): PromisifiedSource { - (source$ as PromisifiedSource).toPromise = () => - pipe(source$, take(1), toPromise); - return source$ as PromisifiedSource; -}