Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Clean up Client result source logic and allow multiple mutation results #3102

Merged
merged 8 commits into from
Mar 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/soft-glasses-guess.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@urql/core': patch
---

Refactor `Client` result source construction code and allow multiple mutation
results, if `result.hasNext` on a mutation result is set to `true`, indicating
deferred or streamed results.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
"react-is": "^17.0.2",
"styled-components": "^5.2.3",
"vite": "^3.2.4",
"wonka": "^6.2.6"
"wonka": "^6.3.0"
}
},
"devDependencies": {
Expand Down
189 changes: 101 additions & 88 deletions packages/core/src/client.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/* eslint-disable @typescript-eslint/no-use-before-define */

import {
lazy,
filter,
make,
makeSubject,
onEnd,
onPush,
Expand Down Expand Up @@ -602,91 +602,99 @@ export const Client: new (opts: ClientOptions) => Client = function Client(
const makeResultSource = (operation: Operation) => {
let result$ = pipe(
results$,
// Filter by matching key (or _instance if it’s set)
filter(
(res: OperationResult) =>
res.operation.kind === operation.kind &&
res.operation.key === operation.key &&
(!res.operation.context._instance ||
res.operation.context._instance === operation.context._instance)
),
// End the results stream when an active teardown event is sent
takeUntil(
pipe(
operations.source,
filter(op => op.kind === 'teardown' && op.key === operation.key)
)
)
);

// Mask typename properties if the option for it is turned on
if (opts.maskTypename) {
if (operation.kind !== 'query') {
// Interrupt subscriptions and mutations when they have no more results
result$ = pipe(
result$,
map(res => ({ ...res, data: maskTypename(res.data, true) }))
takeWhile(result => !!result.hasNext, true)
);
} else {
result$ = pipe(
result$,
// Add `stale: true` flag when a new operation is sent for queries
switchMap(result => {
const value$ = fromValue(result);
return result.stale
? value$
: merge([
value$,
pipe(
operations.source,
filter(
op =>
op.kind === 'query' &&
op.key === operation.key &&
op.context.requestPolicy !== 'cache-only'
),
take(1),
map(() => ({ ...result, stale: true }))
),
]);
})
);
}

if (operation.kind !== 'query') {
if (operation.kind !== 'mutation') {
result$ = pipe(
result$,
// Store replay result
onPush(result => {
dispatched.delete(operation.key);
replays.set(operation.key, result);
}),
// Cleanup active states on end of source
onEnd(() => {
// Delete the active operation handle
dispatched.delete(operation.key);
replays.delete(operation.key);
active.delete(operation.key);
// Interrupt active queue
isOperationBatchActive = false;
// Delete all queued up operations of the same key on end
for (let i = queue.length - 1; i >= 0; i--)
if (queue[i].key === operation.key) queue.splice(i, 1);
// Dispatch a teardown signal for the stopped operation
nextOperation(
makeOperation('teardown', operation, operation.context)
);
})
);
} else {
result$ = pipe(
result$,
// Send mutation operation on start
onStart(() => {
nextOperation(operation);
})
);
}

// A mutation is always limited to just a single result and is never shared
if (operation.kind === 'mutation') {
return pipe(result$, take(1));
}

if (operation.kind === 'subscription') {
// Mask typename properties if the option for it is turned on
if (opts.maskTypename) {
result$ = pipe(
result$,
takeWhile(result => !!result.hasNext)
map(res => ({ ...res, data: maskTypename(res.data, true) }))
);
}

return pipe(
result$,
// End the results stream when an active teardown event is sent
takeUntil(
pipe(
operations.source,
filter(op => op.kind === 'teardown' && op.key === operation.key)
)
),
switchMap(result => {
if (operation.kind !== 'query' || result.stale) {
return fromValue(result);
}

return merge([
fromValue(result),
// Mark a result as stale when a new operation is sent for it
pipe(
operations.source,
filter(
op =>
op.kind === 'query' &&
op.key === operation.key &&
op.context.requestPolicy !== 'cache-only'
),
take(1),
map(() => ({ ...result, stale: true }))
),
]);
}),
onPush(result => {
dispatched.delete(operation.key);
replays.set(operation.key, result);
}),
onEnd(() => {
// Delete the active operation handle
dispatched.delete(operation.key);
replays.delete(operation.key);
active.delete(operation.key);
// Delete all queued up operations of the same key on end
for (let i = queue.length - 1; i >= 0; i--)
if (queue[i].key === operation.key) queue.splice(i, 1);
// Dispatch a teardown signal for the stopped operation
nextOperation(makeOperation('teardown', operation, operation.context));
}),
share
);
return share(result$);
};

const instance: Client =
Expand Down Expand Up @@ -736,41 +744,46 @@ export const Client: new (opts: ClientOptions) => Client = function Client(
}

return withPromise(
make<OperationResult>(observer => {
lazy<OperationResult>(() => {
let source = active.get(operation.key);
if (!source) {
active.set(operation.key, (source = makeResultSource(operation)));
}

return pipe(
source,
onStart(() => {
const prevReplay = replays.get(operation.key);
const isNetworkOperation =
operation.context.requestPolicy === 'cache-and-network' ||
operation.context.requestPolicy === 'network-only';
if (operation.kind !== 'query') {
return;
} else if (isNetworkOperation) {
dispatchOperation(operation);
if (prevReplay && !prevReplay.hasNext) prevReplay.stale = true;
}

if (
prevReplay != null &&
prevReplay === replays.get(operation.key)
) {
observer.next(prevReplay);
} else if (!isNetworkOperation) {
const isNetworkOperation =
operation.context.requestPolicy === 'cache-and-network' ||
operation.context.requestPolicy === 'network-only';
const replay = replays.get(operation.key);

if (operation.kind !== 'query' || !replay || isNetworkOperation) {
source = pipe(
source,
onStart(() => {
dispatchOperation(operation);
}
}),
onEnd(() => {
isOperationBatchActive = false;
observer.complete();
}),
subscribe(observer.next)
).unsubscribe;
})
);
}

if (operation.kind === 'query' && replay) {
return merge([
source,
pipe(
fromValue(replay),
filter(replay => {
if (replay === replays.get(operation.key)) {
if (isNetworkOperation && !replay.hasNext)
replay.stale = true;
return true;
} else {
if (!isNetworkOperation) dispatchOperation(operation);
return false;
}
})
),
]);
} else {
return source;
}
})
);
},
Expand Down
Loading