Skip to content

Commit

Permalink
(vue) - Simplify useQuery implementation (#1758)
Browse files Browse the repository at this point in the history
* Replace useQuery implementation

* Fix fetching.value timings in useQuery and useSubscription

* Align useQuery and useSubscription implementations

* Add changeset
  • Loading branch information
kitten authored Jul 6, 2021
1 parent 891a17a commit a0fcea4
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 126 deletions.
5 changes: 5 additions & 0 deletions .changeset/strange-windows-repair.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@urql/vue': patch
---

Refactor `useQuery` implementation to utilise the single-source implementation of `@urql/[email protected]`. This should improve the stability of promisified `useQuery()` calls and prevent operations from not being issued in some edge cases.
152 changes: 50 additions & 102 deletions packages/vue-urql/src/useQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,7 @@ import { DocumentNode } from 'graphql';

import { WatchStopHandle, Ref, ref, watchEffect, reactive, isRef } from 'vue';

import {
Source,
concat,
switchAll,
share,
fromValue,
makeSubject,
filter,
map,
pipe,
take,
publish,
onEnd,
onStart,
onPush,
toPromise,
} from 'wonka';
import { Source, map, pipe, take, subscribe, onEnd, toPromise } from 'wonka';

import {
Client,
Expand Down Expand Up @@ -70,26 +54,6 @@ const watchOptions = {
flush: 'pre' as const,
};

/** Wonka Operator to replay the most recent value to sinks */
function replayOne<T>(source: Source<T>): Source<T> {
let cached: undefined | T;

return concat([
pipe(
fromValue(cached!),
map(() => cached!),
filter(x => x !== undefined)
),
pipe(
source,
onPush(value => {
cached = value;
}),
share
),
]);
}

export function useQuery<T = any, V = object>(
args: UseQueryArgs<T, V>
): UseQueryResponse<T, V> {
Expand Down Expand Up @@ -118,10 +82,7 @@ export function callUseQuery<T = any, V = object>(
createRequest<T, V>(args.query, args.variables as V) as any
);

const source: Ref<Source<Source<any>>> = ref(null as any);
const next: Ref<
(query$: undefined | Source<OperationResult<T, V>>) => void
> = ref(null as any);
const source: Ref<Source<OperationResult> | undefined> = ref();

stops.push(
watchEffect(() => {
Expand All @@ -132,6 +93,17 @@ export function callUseQuery<T = any, V = object>(
}, watchOptions)
);

stops.push(
watchEffect(() => {
source.value = !isPaused.value
? client.executeQuery<T, V>(request.value, {
requestPolicy: args.requestPolicy,
...args.context,
})
: undefined;
}, watchOptions)
);

const state: UseQueryState<T, V> = {
data,
stale,
Expand All @@ -141,13 +113,11 @@ export function callUseQuery<T = any, V = object>(
fetching,
isPaused,
executeQuery(opts?: Partial<OperationContext>): UseQueryResponse<T, V> {
next.value(
client.executeQuery<T, V>(request.value, {
requestPolicy: args.requestPolicy,
...args.context,
...opts,
})
);
source.value = client.executeQuery<T, V>(request.value, {
requestPolicy: args.requestPolicy,
...args.context,
...opts,
});

return response;
},
Expand All @@ -159,45 +129,34 @@ export function callUseQuery<T = any, V = object>(
},
};

const getState = () => state;

stops.push(
watchEffect(
onInvalidate => {
const subject = makeSubject<Source<any>>();
source.value = pipe(subject.source, replayOne);
next.value = (value: undefined | Source<any>) => {
const query$ = pipe(
value
? pipe(
value,
onStart(() => {
fetching.value = true;
stale.value = false;
}),
onPush(res => {
data.value = res.data;
stale.value = !!res.stale;
fetching.value = false;
error.value = res.error;
operation.value = res.operation;
extensions.value = res.extensions;
}),
share
)
: fromValue(undefined),
onEnd(() => {
fetching.value = false;
stale.value = false;
})
if (source.value) {
fetching.value = true;
stale.value = false;

onInvalidate(
pipe(
source.value,
onEnd(() => {
fetching.value = false;
stale.value = false;
}),
subscribe(res => {
data.value = res.data;
stale.value = !!res.stale;
fetching.value = false;
error.value = res.error;
operation.value = res.operation;
extensions.value = res.extensions;
})
).unsubscribe
);

subject.next(query$);
};

onInvalidate(
pipe(source.value, switchAll, map(getState), publish).unsubscribe
);
} else {
fetching.value = false;
stale.value = false;
}
},
{
// NOTE: This part of the query pipeline is only initialised once and will need
Expand All @@ -207,28 +166,17 @@ export function callUseQuery<T = any, V = object>(
)
);

stops.push(
watchEffect(() => {
next.value(
!isPaused.value
? client.executeQuery<T, V>(request.value, {
requestPolicy: args.requestPolicy,
...args.context,
})
: undefined
);
}, watchOptions)
);

const response: UseQueryResponse<T, V> = {
...state,
then(onFulfilled, onRejected) {
return pipe(
source.value,
switchAll,
map(getState),
take(1),
toPromise
return (source.value
? pipe(
source.value,
take(1),
map(() => state),
toPromise
)
: Promise.resolve(state)
).then(onFulfilled, onRejected);
},
};
Expand Down
38 changes: 14 additions & 24 deletions packages/vue-urql/src/useSubscription.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* eslint-disable react-hooks/rules-of-hooks */

import { DocumentNode } from 'graphql';
import { Source, pipe, publish, share, onStart, onPush, onEnd } from 'wonka';
import { Source, pipe, subscribe, onEnd } from 'wonka';

import { WatchStopHandle, Ref, ref, watchEffect, reactive, isRef } from 'vue';

Expand Down Expand Up @@ -98,32 +98,24 @@ export function callUseSubscription<T = any, R = T, V = object>(

stops.push(
watchEffect(() => {
if (!isPaused.value) {
source.value = pipe(
client.executeSubscription<T, V>(request.value, {
...args.context,
}),
share
);
} else {
source.value = undefined;
}
source.value = !isPaused.value
? client.executeSubscription<T, V>(request.value, { ...args.context })
: undefined;
}, watchOptions)
);

stops.push(
watchEffect(onInvalidate => {
if (source.value) {
fetching.value = true;

onInvalidate(
pipe(
source.value,
onStart(() => {
fetching.value = true;
}),
onEnd(() => {
fetching.value = false;
}),
onPush(result => {
subscribe(result => {
fetching.value = true;
(data.value =
result.data !== undefined
Expand All @@ -135,10 +127,11 @@ export function callUseSubscription<T = any, R = T, V = object>(
extensions.value = result.extensions;
stale.value = !!result.stale;
operation.value = result.operation;
}),
publish
})
).unsubscribe
);
} else {
fetching.value = false;
}
}, watchOptions)
);
Expand All @@ -154,13 +147,10 @@ export function callUseSubscription<T = any, R = T, V = object>(
executeSubscription(
opts?: Partial<OperationContext>
): UseSubscriptionState<T, R, V> {
source.value = pipe(
client.executeSubscription<T, V>(request.value, {
...args.context,
...opts,
}),
share
);
source.value = client.executeSubscription<T, V>(request.value, {
...args.context,
...opts,
});

return state;
},
Expand Down

0 comments on commit a0fcea4

Please sign in to comment.