Skip to content

Commit

Permalink
fix: subscriptions enhancements
Browse files Browse the repository at this point in the history
  • Loading branch information
logaretm committed Mar 9, 2023
1 parent 8389d18 commit b88190b
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 19 deletions.
10 changes: 6 additions & 4 deletions packages/villus/src/handleSubscriptions.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { normalizeQuery } from '../../shared/src/utils';
import { ClientPlugin, ClientPluginOperation, ObservableLike, StandardOperationResult } from './types';
import { ClientPlugin, ClientPluginOperation, ObservableLike, StandardOperationResult, MaybePromise } from './types';

export type SubscriptionForwarder<TData = any> = (
operation: ClientPluginOperation & { query: string }
) => ObservableLike<StandardOperationResult<TData>>;
) => MaybePromise<ObservableLike<StandardOperationResult<TData>>>;

export function handleSubscriptions(forwarder: SubscriptionForwarder): ClientPlugin {
const forward = forwarder;

return function subscriptionsHandlerPlugin({ operation, useResult }) {
return async function subscriptionsHandlerPlugin({ operation, useResult }) {
if (operation.type !== 'subscription') {
return;
}
Expand All @@ -22,6 +22,8 @@ export function handleSubscriptions(forwarder: SubscriptionForwarder): ClientPlu
throw new Error('A query must be provided.');
}

useResult(forward({ ...operation, query: normalizedQuery }) as any, true);
const result = await forward({ ...operation, query: normalizedQuery });

useResult(result as any, true);
};
}
2 changes: 2 additions & 0 deletions packages/villus/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ export type StandardOperationResult<TData = any> = ExecutionResult<TData>;

export type QueryVariables = Record<string, any>;

export type MaybePromise<T> = T | Promise<T>;

export interface ObserverLike<T> {
next: (value: T) => void;
error: (err: any) => void;
Expand Down
57 changes: 43 additions & 14 deletions packages/villus/src/useSubscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@ import {
MaybeRef,
StandardOperationResult,
QueryPredicateOrSignal,
MaybeLazyOrRef,
} from './types';
import { CombinedError, isWatchable, unravel } from './utils';
import { CombinedError, isWatchable, unravel, unwrap, debounceAsync, isEqual } from './utils';
import { Operation } from '../../shared/src';
import { Client, resolveClient } from './client';

interface SubscriptionCompositeOptions<TData, TVars> {
query: MaybeRef<Operation<TData, TVars>['query']>;
variables?: MaybeRef<TVars>;
variables?: MaybeLazyOrRef<TVars>;
skip?: QueryPredicateOrSignal<TVars>;
paused?: QueryPredicateOrSignal<TVars>;
client?: Client;
}
Expand All @@ -27,7 +29,7 @@ export function useSubscription<TData = any, TResult = TData, TVars = QueryVaria
reduce: Reducer<TData, TResult> = defaultReducer
) {
const client = opts.client ?? resolveClient();
const { query, variables, paused } = opts;
const { query, variables, paused, skip } = opts;
const data = ref<TResult | null>(reduce(null, { data: null, error: null }));
const error: Ref<CombinedError | null> = ref(null);
const isPaused = computed(() => unravel(paused, variables as TVars));
Expand All @@ -41,13 +43,17 @@ export function useSubscription<TData = any, TResult = TData, TVars = QueryVaria
* if can not getCurrentInstance, the func use outside of setup, cannot get onMounted
* when outside of setup initObserver immediately.
*/
let observer: Unsubscribable;
let observer: Unsubscribable | undefined;

const subscribe = debounceAsync(async function subscribe() {
unsubscribe();
if (shouldSkip()) {
return;
}

async function initObserver() {
observer?.unsubscribe();
const result = await client.executeSubscription<TData, TVars>({
query: unref(query),
variables: unref(variables) as TVars,
variables: unwrap(variables),
});

observer = result.subscribe({
Expand All @@ -74,30 +80,53 @@ export function useSubscription<TData = any, TResult = TData, TVars = QueryVaria
});

return observer;
});

function unsubscribe() {
observer?.unsubscribe();
observer = undefined;
}

const vm = getCurrentInstance();
if (!isPaused.value) {
vm ? onMounted(initObserver) : initObserver();
if (!isPaused.value && !shouldSkip()) {
vm ? onMounted(subscribe) : subscribe();
}

// TODO: if outside of setup, it should be recommend manually pause it(or some action else)
vm && onBeforeUnmount(() => observer?.unsubscribe());
vm && onBeforeUnmount(unsubscribe);

function shouldSkip() {
return unravel(skip, unwrap(variables) || {});
}

if (isWatchable(paused)) {
watch(paused, val => {
if (!val) {
initObserver();
subscribe();
}
});
}

if (isRef(query)) {
watch(query, initObserver);
watch(query, subscribe);
}

if (isRef(variables)) {
watch(variables, initObserver);
if (isWatchable(variables)) {
watch(variables, (value, oldValue) => {
if (!isEqual(value, oldValue)) {
subscribe();
}
});
}

if (isWatchable(skip)) {
watch(shouldSkip, (value, oldValue) => {
if (value === oldValue) {
return;
}

value ? unsubscribe() : subscribe();
});
}

return { data, error, paused: isPaused };
Expand Down
29 changes: 28 additions & 1 deletion packages/villus/src/utils/common.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { isReactive, isRef, Ref, unref } from 'vue';
import { isReactive, isRef, Ref, unref, nextTick } from 'vue';
import { MaybeLazyOrRef, QueryVariables, QueryPredicateOrSignal, MaybeRef } from '../types';
import stringify from 'fast-json-stable-stringify';

export function unravel<TVars = QueryVariables>(
signal: QueryPredicateOrSignal<TVars> | undefined,
Expand Down Expand Up @@ -36,3 +37,29 @@ export function arrayToExistHash<T extends string | number>(items: T[]): Record<
return acc;
}, {} as Record<string, boolean>);
}

export function debounceAsync<TFunction extends (...args: any) => Promise<any>, TResult = ReturnType<TFunction>>(
inner: TFunction
): (...args: Parameters<TFunction>) => Promise<TResult> {
let resolves: any[] = [];
let ticking = false;

return function (...args: Parameters<TFunction>) {
if (!ticking) {
nextTick(() => {
const result = inner(...(args as any));
resolves.forEach(r => r(result));
resolves = [];
ticking = false;
});

ticking = true;
}

return new Promise<TResult>(resolve => resolves.push(resolve));
};
}

export function isEqual(lhs: unknown, rhs: unknown) {
return stringify(lhs) === stringify(rhs);
}
41 changes: 41 additions & 0 deletions packages/villus/test/useSubscription.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,47 @@ test('Can pause subscriptions initially', async () => {
expect(document.querySelectorAll('li')).toHaveLength(2);
});

test('Skips subscribing if skip is true', async () => {
const unSubSpy = jest.fn();
const subSpy = jest.fn(() => ({
subscribe() {
return {
unsubscribe: unSubSpy,
};
},
}));

const id = ref(0);

mount({
setup() {
useClient({
url: 'https://test.com/graphql',
use: [handleSubscriptions(subSpy), ...defaultPlugins()],
});

const query = `subscription (id) { newMessages }`;

const { data } = useSubscription<Message>({ query, variables: () => ({ id: id.value }), skip: ({ id }) => !id });

return { messages: data };
},
template: `<div></div>`,
});

await flushPromises();
expect(subSpy).not.toHaveBeenCalled();
expect(unSubSpy).not.toHaveBeenCalled();
id.value++;
await flushPromises();
expect(subSpy).toHaveBeenCalledTimes(1);
expect(unSubSpy).toHaveBeenCalledTimes(0);
id.value--;
await flushPromises();
expect(subSpy).toHaveBeenCalledTimes(1);
expect(unSubSpy).toHaveBeenCalledTimes(1);
});

test('Fails if provider was not resolved', () => {
try {
mount({
Expand Down

0 comments on commit b88190b

Please sign in to comment.