Skip to content

Commit

Permalink
refactor: Clean up Client result source logic and allow multiple muta…
Browse files Browse the repository at this point in the history
…tion results (#3102)
  • Loading branch information
kitten authored Mar 27, 2023
1 parent 85fb3ab commit 849025f
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 122 deletions.
7 changes: 7 additions & 0 deletions .changeset/soft-glasses-guess.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@urql/core': patch
---

Refactor `Client` result source construction code and allow multiple mutation
results, if `result.hasNext` on a mutation result is set to `true`, indicating
deferred or streamed results.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
"react-is": "^17.0.2",
"styled-components": "^5.2.3",
"vite": "^3.2.4",
"wonka": "^6.2.6"
"wonka": "^6.3.0"
}
},
"devDependencies": {
Expand Down
189 changes: 101 additions & 88 deletions packages/core/src/client.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/* eslint-disable @typescript-eslint/no-use-before-define */

import {
lazy,
filter,
make,
makeSubject,
onEnd,
onPush,
Expand Down Expand Up @@ -602,91 +602,99 @@ export const Client: new (opts: ClientOptions) => Client = function Client(
const makeResultSource = (operation: Operation) => {
let result$ = pipe(
results$,
// Filter by matching key (or _instance if it’s set)
filter(
(res: OperationResult) =>
res.operation.kind === operation.kind &&
res.operation.key === operation.key &&
(!res.operation.context._instance ||
res.operation.context._instance === operation.context._instance)
),
// End the results stream when an active teardown event is sent
takeUntil(
pipe(
operations.source,
filter(op => op.kind === 'teardown' && op.key === operation.key)
)
)
);

// Mask typename properties if the option for it is turned on
if (opts.maskTypename) {
if (operation.kind !== 'query') {
// Interrupt subscriptions and mutations when they have no more results
result$ = pipe(
result$,
map(res => ({ ...res, data: maskTypename(res.data, true) }))
takeWhile(result => !!result.hasNext, true)
);
} else {
result$ = pipe(
result$,
// Add `stale: true` flag when a new operation is sent for queries
switchMap(result => {
const value$ = fromValue(result);
return result.stale
? value$
: merge([
value$,
pipe(
operations.source,
filter(
op =>
op.kind === 'query' &&
op.key === operation.key &&
op.context.requestPolicy !== 'cache-only'
),
take(1),
map(() => ({ ...result, stale: true }))
),
]);
})
);
}

if (operation.kind !== 'query') {
if (operation.kind !== 'mutation') {
result$ = pipe(
result$,
// Store replay result
onPush(result => {
dispatched.delete(operation.key);
replays.set(operation.key, result);
}),
// Cleanup active states on end of source
onEnd(() => {
// Delete the active operation handle
dispatched.delete(operation.key);
replays.delete(operation.key);
active.delete(operation.key);
// Interrupt active queue
isOperationBatchActive = false;
// Delete all queued up operations of the same key on end
for (let i = queue.length - 1; i >= 0; i--)
if (queue[i].key === operation.key) queue.splice(i, 1);
// Dispatch a teardown signal for the stopped operation
nextOperation(
makeOperation('teardown', operation, operation.context)
);
})
);
} else {
result$ = pipe(
result$,
// Send mutation operation on start
onStart(() => {
nextOperation(operation);
})
);
}

// A mutation is always limited to just a single result and is never shared
if (operation.kind === 'mutation') {
return pipe(result$, take(1));
}

if (operation.kind === 'subscription') {
// Mask typename properties if the option for it is turned on
if (opts.maskTypename) {
result$ = pipe(
result$,
takeWhile(result => !!result.hasNext)
map(res => ({ ...res, data: maskTypename(res.data, true) }))
);
}

return pipe(
result$,
// End the results stream when an active teardown event is sent
takeUntil(
pipe(
operations.source,
filter(op => op.kind === 'teardown' && op.key === operation.key)
)
),
switchMap(result => {
if (operation.kind !== 'query' || result.stale) {
return fromValue(result);
}

return merge([
fromValue(result),
// Mark a result as stale when a new operation is sent for it
pipe(
operations.source,
filter(
op =>
op.kind === 'query' &&
op.key === operation.key &&
op.context.requestPolicy !== 'cache-only'
),
take(1),
map(() => ({ ...result, stale: true }))
),
]);
}),
onPush(result => {
dispatched.delete(operation.key);
replays.set(operation.key, result);
}),
onEnd(() => {
// Delete the active operation handle
dispatched.delete(operation.key);
replays.delete(operation.key);
active.delete(operation.key);
// Delete all queued up operations of the same key on end
for (let i = queue.length - 1; i >= 0; i--)
if (queue[i].key === operation.key) queue.splice(i, 1);
// Dispatch a teardown signal for the stopped operation
nextOperation(makeOperation('teardown', operation, operation.context));
}),
share
);
return share(result$);
};

const instance: Client =
Expand Down Expand Up @@ -736,41 +744,46 @@ export const Client: new (opts: ClientOptions) => Client = function Client(
}

return withPromise(
make<OperationResult>(observer => {
lazy<OperationResult>(() => {
let source = active.get(operation.key);
if (!source) {
active.set(operation.key, (source = makeResultSource(operation)));
}

return pipe(
source,
onStart(() => {
const prevReplay = replays.get(operation.key);
const isNetworkOperation =
operation.context.requestPolicy === 'cache-and-network' ||
operation.context.requestPolicy === 'network-only';
if (operation.kind !== 'query') {
return;
} else if (isNetworkOperation) {
dispatchOperation(operation);
if (prevReplay && !prevReplay.hasNext) prevReplay.stale = true;
}

if (
prevReplay != null &&
prevReplay === replays.get(operation.key)
) {
observer.next(prevReplay);
} else if (!isNetworkOperation) {
const isNetworkOperation =
operation.context.requestPolicy === 'cache-and-network' ||
operation.context.requestPolicy === 'network-only';
const replay = replays.get(operation.key);

if (operation.kind !== 'query' || !replay || isNetworkOperation) {
source = pipe(
source,
onStart(() => {
dispatchOperation(operation);
}
}),
onEnd(() => {
isOperationBatchActive = false;
observer.complete();
}),
subscribe(observer.next)
).unsubscribe;
})
);
}

if (operation.kind === 'query' && replay) {
return merge([
source,
pipe(
fromValue(replay),
filter(replay => {
if (replay === replays.get(operation.key)) {
if (isNetworkOperation && !replay.hasNext)
replay.stale = true;
return true;
} else {
if (!isNetworkOperation) dispatchOperation(operation);
return false;
}
})
),
]);
} else {
return source;
}
})
);
},
Expand Down
Loading

0 comments on commit 849025f

Please sign in to comment.