Skip to content

Commit

Permalink
(core) - Avoid dispatching an operation for already active cache-firs…
Browse files Browse the repository at this point in the history
…t/only operations (#1600)
  • Loading branch information
kitten authored May 3, 2021
1 parent e51daa5 commit f02178f
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 22 deletions.
5 changes: 5 additions & 0 deletions .changeset/large-frogs-tease.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@urql/core': minor
---

With the "single-source behavior" the `Client` will now also avoid executing an operation if it's already active, has a previous result available, and is either run with the `cache-first` or `cache-only` request policies. This is similar to a "short circuiting" behavior, where unnecessary work is avoided by not issuing more operations into the exchange pipeline than expected.
134 changes: 121 additions & 13 deletions packages/core/src/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ describe('shared sources behavior', () => {
jest.useRealTimers();
});

it('replays results from prior operation result as needed', async () => {
it('replays results from prior operation result as needed (cache-first)', async () => {
const exchange: Exchange = () => ops$ => {
let i = 0;
return pipe(
Expand Down Expand Up @@ -556,10 +556,111 @@ describe('shared sources behavior', () => {

expect(resultTwo).toHaveBeenCalledWith({
data: 1,
operation: queryOperation,
});

jest.advanceTimersByTime(1);

// With cache-first we don't expect a new operation to be issued
expect(resultTwo).toHaveBeenCalledTimes(1);
});

it('replays results from prior operation result as needed (network-only)', async () => {
const exchange: Exchange = () => ops$ => {
let i = 0;
return pipe(
ops$,
map(op => ({
data: ++i,
operation: op,
})),
delay(1)
);
};

const client = createClient({
url: 'test',
exchanges: [exchange],
});

const operation = makeOperation('query', queryOperation, {
...queryOperation.context,
requestPolicy: 'network-only',
});

const resultOne = jest.fn();
const resultTwo = jest.fn();

pipe(client.executeRequestOperation(operation), subscribe(resultOne));

expect(resultOne).toHaveBeenCalledTimes(0);

jest.advanceTimersByTime(1);

expect(resultOne).toHaveBeenCalledTimes(1);
expect(resultOne).toHaveBeenCalledWith({
data: 1,
operation,
});

pipe(client.executeRequestOperation(operation), subscribe(resultTwo));

expect(resultTwo).toHaveBeenCalledWith({
data: 1,
operation,
stale: true,
});

jest.advanceTimersByTime(1);

// With network-only we expect a new operation to be issued, hence a new result
expect(resultTwo).toHaveBeenCalledTimes(2);

expect(resultTwo).toHaveBeenCalledWith({
data: 2,
operation,
});
});

it('does not replay values from a past subscription', async () => {
const exchange: Exchange = () => ops$ => {
let i = 0;
return pipe(
ops$,
filter(op => op.kind !== 'teardown'),
map(op => ({
data: ++i,
operation: op,
})),
delay(1)
);
};

const client = createClient({
url: 'test',
exchanges: [exchange],
});

// We keep the source in-memory
const source = client.executeRequestOperation(queryOperation);
const resultOne = jest.fn();
let subscription;

subscription = pipe(source, subscribe(resultOne));

expect(resultOne).toHaveBeenCalledTimes(0);
jest.advanceTimersByTime(1);

expect(resultOne).toHaveBeenCalledWith({
data: 1,
operation: queryOperation,
});

subscription.unsubscribe();
const resultTwo = jest.fn();
subscription = pipe(source, subscribe(resultTwo));

expect(resultTwo).toHaveBeenCalledTimes(0);
jest.advanceTimersByTime(1);

expect(resultTwo).toHaveBeenCalledWith({
Expand All @@ -586,19 +687,21 @@ describe('shared sources behavior', () => {
exchanges: [exchange],
});

const operation = makeOperation('query', queryOperation, {
...queryOperation.context,
requestPolicy: 'network-only',
});

const resultOne = jest.fn();
const resultTwo = jest.fn();

pipe(client.executeRequestOperation(queryOperation), subscribe(resultOne));

pipe(client.executeRequestOperation(queryOperation), subscribe(resultTwo));
pipe(client.executeRequestOperation(operation), subscribe(resultOne));
pipe(client.executeRequestOperation(operation), subscribe(resultTwo));

expect(resultOne).toHaveBeenCalledTimes(1);
expect(resultTwo).toHaveBeenCalledTimes(1);

expect(resultTwo).toHaveBeenCalledWith({
data: 1,
operation: queryOperation,
operation,
stale: true,
});
});
Expand Down Expand Up @@ -628,7 +731,7 @@ describe('shared sources behavior', () => {
expect(resultTwo).toHaveBeenCalledTimes(0);
});

it('skips replaying results when a result is emitted immediately', () => {
it('skips replaying results when a result is emitted immediately (network-only)', () => {
const exchange: Exchange = () => ops$ => {
let i = 0;
return pipe(
Expand All @@ -642,26 +745,31 @@ describe('shared sources behavior', () => {
exchanges: [exchange],
});

const operation = makeOperation('query', queryOperation, {
...queryOperation.context,
requestPolicy: 'network-only',
});

const resultOne = jest.fn();
const resultTwo = jest.fn();

pipe(client.executeRequestOperation(queryOperation), subscribe(resultOne));
pipe(client.executeRequestOperation(operation), subscribe(resultOne));

expect(resultOne).toHaveBeenCalledWith({
data: 1,
operation: queryOperation,
operation,
});

pipe(client.executeRequestOperation(queryOperation), subscribe(resultTwo));
pipe(client.executeRequestOperation(operation), subscribe(resultTwo));

expect(resultTwo).toHaveBeenCalledWith({
data: 2,
operation: queryOperation,
operation,
});

expect(resultOne).toHaveBeenCalledWith({
data: 2,
operation: queryOperation,
operation,
});
});

Expand Down
7 changes: 6 additions & 1 deletion packages/core/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,14 @@ export class Client {
share
);
} else {
const mode =
operation.context.requestPolicy === 'cache-and-network' ||
operation.context.requestPolicy === 'network-only'
? 'pre'
: 'post';
active = pipe(
result$,
replayOnStart(() => {
replayOnStart(mode, () => {
this.activeOperations.set(operation.key, active!);
this.dispatchOperation(operation);
})
Expand Down
28 changes: 20 additions & 8 deletions packages/core/src/utils/streamUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,20 @@ export function withPromise<T>(source$: Source<T>): PromisifiedSource<T> {
return source$ as PromisifiedSource<T>;
}

export type ReplayMode = 'pre' | 'post';

export function replayOnStart<T extends OperationResult>(
start?: () => void
mode: ReplayMode,
start: () => void
): Operator<T, T> {
return source$ => {
let replay: T | void;

const shared$ = pipe(
source$,
onEnd(() => {
replay = undefined;
}),
onPush(value => {
replay = value;
}),
Expand All @@ -37,18 +43,24 @@ export function replayOnStart<T extends OperationResult>(
return make<T>(observer => {
const prevReplay = replay;

const subscription = pipe(
return pipe(
shared$,
onEnd(observer.complete),
onStart(() => {
if (start) start();
if (prevReplay !== undefined && prevReplay === replay)
observer.next({ ...prevReplay, stale: true });
if (mode === 'pre') {
start();
}

if (prevReplay !== undefined && prevReplay === replay) {
observer.next(
mode === 'pre' ? { ...prevReplay, stale: true } : prevReplay
);
} else if (mode === 'post') {
start();
}
}),
subscribe(observer.next)
);

return subscription.unsubscribe;
).unsubscribe;
});
};
}

0 comments on commit f02178f

Please sign in to comment.