Skip to content

Commit

Permalink
fix(rx-stateful): handle errors correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelbe812 committed Jun 30, 2023
1 parent b435efa commit f834d42
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 127 deletions.
7 changes: 7 additions & 0 deletions libs/rx-stateful/src/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,8 @@
export * from './lib/rx-stateful$';
export {
RxStatefulContext,
Stateful,
RxStateful,
RxStatefulWithError,
RxStatefulConfig,
} from './lib/types/types';
46 changes: 30 additions & 16 deletions libs/rx-stateful/src/lib/rx-stateful$.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ describe('rxStateful$', () => {
);
source$.next(10);

refreshTrigger$.next();
refreshTrigger$.next();
refreshTrigger$.next(void 0);
refreshTrigger$.next(void 0);

expect(result.getValues()).toEqual([10, 10, 10]);
});
Expand All @@ -126,8 +126,8 @@ describe('rxStateful$', () => {
);
source$.next(10);

refreshTrigger$.next();
refreshTrigger$.next();
refreshTrigger$.next(void 0);
refreshTrigger$.next(void 0);

expect(result.getValues()).toEqual([10, null, 10, null, 10]);
});
Expand All @@ -141,7 +141,7 @@ describe('rxStateful$', () => {
);

source$.next(10);
refreshTrigger$.next();
refreshTrigger$.next(void 0);

expect(result.getValues()).toEqual([false, true]);
});
Expand All @@ -153,7 +153,7 @@ describe('rxStateful$', () => {
);

source$.next(10);
refreshTrigger$.next();
refreshTrigger$.next(void 0);

expect(result.getValues()).toEqual([false, true, false, true]);
});
Expand All @@ -167,7 +167,7 @@ describe('rxStateful$', () => {
);

source$.next(10);
refreshTrigger$.next();
refreshTrigger$.next(void 0);

expect(result.getValues()).toEqual([true, false, true, false]);
});
Expand All @@ -179,7 +179,7 @@ describe('rxStateful$', () => {
);

source$.next(10);
refreshTrigger$.next();
refreshTrigger$.next(void 0);

expect(result.getValues()).toEqual([true, false, true, false]);
});
Expand All @@ -191,30 +191,44 @@ describe('rxStateful$', () => {
const result = subscribeSpyTo(rxStateful$<number>(source$, { refreshTrigger$ }).context$);

source$.next(10);
refreshTrigger$.next();
refreshTrigger$.next(void 0);

expect(result.getValues()).toEqual(['suspense', 'next', 'suspense', 'next']);
});
});
describe('state$', () => {
it('should return the correct state', () => {
const source$ = new Subject<number>();
const source$ = new Subject<any>();
const refreshTrigger$ = new Subject<void>();
const result = subscribeSpyTo(rxStateful$<number>(source$, { refreshTrigger$ }).state$);

source$.next(10);
refreshTrigger$.next();
refreshTrigger$.next(void 0);
// todo #60
//source$.next(throwError(() => new Error('error')));

expect(result.getValues()).toEqual([
{ hasValue: false, isSuspense: true, hasError: false, context:'suspense' },
{ hasValue: false, isSuspense: true, hasError: false, context: 'suspense' },
{ hasValue: true, isSuspense: false, value: 10, hasError: false, context: 'next' },
{ hasValue: true, isSuspense: true, value: 10, context: 'suspense', hasError: false },
{ hasValue: true, isSuspense: false, value: 10, context: 'next', hasError: false },
// { hasValue: true, isSuspense: true, value: null, context: 'suspense', hasError: false },
//{ hasValue: false, isSuspense: false, value: null, context: 'error', hasError: true, error: 'error' },
]);
});
})
// todo
//describe('hasError$', () => {});
//describe('error$', () => {});
});
describe('hasError$', () => {
it('should return false true false true', () => {
const source$ = new Subject<Observable<any>>();
const refreshTrigger$ = new Subject<void>();
const result = subscribeSpyTo(rxStateful$<number>(source$.pipe(mergeAll()), { refreshTrigger$ }).hasError$);

source$.next(throwError(() => new Error('error')));
refreshTrigger$.next(void 0);
source$.next(throwError(() => new Error('error')));

expect(result.getValues()).toEqual([false, true, false, true]);
});
});
});
});
163 changes: 52 additions & 111 deletions libs/rx-stateful/src/lib/rx-stateful$.ts
Original file line number Diff line number Diff line change
@@ -1,128 +1,91 @@
import {
BehaviorSubject,
catchError,
debounce,
distinctUntilChanged,
filter,
map,
merge,
MonoTypeOperatorFunction,
NEVER,
Observable,
of,
pipe,
ReplaySubject,
scan,
share,
skip,
startWith,
Subject,
switchMap
} from "rxjs";

/**
* @publicApi
*/
export type RxStatefulContext = 'idle' | 'suspense' | 'error' | 'next';

/**
* @publicApi
*/
export interface Stateful<T, E> {
hasError: boolean;
error: E | undefined;

isSuspense: boolean;

context: RxStatefulContext;

value: T | null | undefined;
hasValue: boolean;

}

/**
* @publicApi
*/
export interface RxStateful<T, E>{
hasError$: Observable<boolean>;
error$: Observable<E | never>;

isSuspense$: Observable<boolean>;

value$: Observable<T | null | undefined>;
hasValue$: Observable<boolean>;

context$: Observable<RxStatefulContext>;

state$: Observable<Stateful<T, E>>
}

/**
* @internal
*/
interface InternalRxState<T, E> {
value: T | null | undefined;
isLoading: boolean;
isRefreshing: boolean;
error: E | undefined;
context: RxStatefulContext;
}

/**
* @publicApi
*/
export interface RxStatefulConfig {
refreshTrigger$?: Subject<any>;
keepValueOnRefresh?: boolean;
}
switchMap,
} from 'rxjs';
import {InternalRxState, RxStateful, RxStatefulConfig, RxStatefulWithError,} from './types/types';
import {_handleSyncValue} from './util/handle-sync-value';

/**
* @publicApi
*/
export function rxStateful$<T, E = unknown>(source$: Observable<T>): RxStateful<T, E>;
export function rxStateful$<T, E = unknown>(source$: Observable<T>, config: RxStatefulConfig): RxStateful<T, E>
export function rxStateful$<T, E = unknown>(source$: Observable<T>, config?: RxStatefulConfig): RxStateful<T, E> {

export function rxStateful$<T, E = unknown>(source$: Observable<T>, config: RxStatefulConfig): RxStateful<T, E>;
export function rxStateful$<T, E = unknown>(
source$: Observable<T>,
config?: RxStatefulConfig
): RxStateful<T, E> {
const error$$ = new Subject<RxStatefulWithError<T, E>>();
const mergedConfig: RxStatefulConfig = {
keepValueOnRefresh: true,
...config
}
...config,
};

const refreshTriggerIsBehaivorSubject = (config: RxStatefulConfig) => config.refreshTrigger$ instanceof BehaviorSubject;
const refreshTriggerIsBehaivorSubject = (config: RxStatefulConfig) =>
config.refreshTrigger$ instanceof BehaviorSubject;

const refresh$ = mergedConfig?.refreshTrigger$ ?? new Subject<unknown>();

const sharedSource$ = source$.pipe(
share({
connector: () => new ReplaySubject(1),
}),
catchError((error) => of({ error: error, context: 'error' })),
catchError((error: any) => {
error$$.next({ error: error?.message, context: 'error', hasError: true });
return NEVER;
})
);

const request$: Observable<Partial<InternalRxState<T, E>>> = sharedSource$.pipe(
map((v) => ({ value: v, isLoading: false, isRefreshing: false, context: 'next' } as Partial<InternalRxState<T, E>>)),
startWith({ isLoading: true, isRefreshing: false, context: 'suspense' }as Partial<InternalRxState<T, E>>)
map(
(v) =>
({ value: v, isLoading: false, isRefreshing: false, context: 'next', error: undefined } as Partial<
InternalRxState<T, E>
>)
),
startWith({ isLoading: true, isRefreshing: false, context: 'suspense' } as Partial<InternalRxState<T, E>>)
);


const refreshedRequest$: Observable<Partial<InternalRxState<T, E>>> = refresh$.pipe(
/**
* in case the refreshTrigger$ is a BehaviorSubject, we want to skip the first value
* bc otherwise the emissions are not correct. It will then emit 4 vales instead of 2.
* the 2 additional values come from isRefreshing which is not correct.
*/
// @ts-ignore
// @ts-ignore todo
refreshTriggerIsBehaivorSubject(mergedConfig) ? skip(1) : pipe(),
switchMap(() =>
sharedSource$.pipe(
map((v) => ({ value: v, isLoading: false, isRefreshing: false, context:'next' } as Partial<InternalRxState<T, E>>)),
map(
(v) =>
({ value: v, isLoading: false, isRefreshing: false, context: 'next', error: undefined } as Partial<
InternalRxState<T, E>
>)
),
mergedConfig?.keepValueOnRefresh
? startWith({ isLoading: true, isRefreshing: true, context: 'suspense' }as Partial<InternalRxState<T, E>>)
: startWith({ isLoading: true, isRefreshing: true, value: null, context: 'suspense' }as Partial<InternalRxState<T, E>>),
? startWith({ isLoading: true, isRefreshing: true, context: 'suspense', error: undefined } as Partial<
InternalRxState<T, E>
>)
: startWith({
isLoading: true,
isRefreshing: true,
value: null,
context: 'suspense',
error: undefined,
} as Partial<InternalRxState<T, E>>)
)
)
);

const state$ = merge(request$, refreshedRequest$).pipe(
const state$ = merge(request$, refreshedRequest$, error$$).pipe(
scan(
// @ts-ignore
(acc, curr) => {
Expand All @@ -140,19 +103,15 @@ export function rxStateful$<T, E = unknown>(source$: Observable<T>, config?: RxS
_handleSyncValue()
);

return {
const rxStateful$ = {
value$: state$.pipe(
map((state, index) => {
/**
* todo there is for sure a nicer way to do this.
*
* IF we don't do this we will have two emissions when we refresh and keepValueOnRefresh = true.
*/
if (
index !== 0 &&
!mergedConfig.keepValueOnRefresh &&
(state.isLoading || state.isRefreshing)
) {
if (index !== 0 && !mergedConfig.keepValueOnRefresh && (state.isLoading || state.isRefreshing)) {
return null;
}
if (!state.isLoading || !state.isRefreshing) {
Expand All @@ -167,35 +126,17 @@ export function rxStateful$<T, E = unknown>(source$: Observable<T>, config?: RxS
error$: state$.pipe(map((state) => state.error)),
context$: state$.pipe(map((state) => state.context)),
state$: state$.pipe(
map(state => ({
map((state) => ({
value: state.value,
hasValue: !!state.value,
hasError: !!state.error,
error: state.error,
isSuspense: state.isLoading || state.isRefreshing,
context: state.context
context: state.context,
})),
distinctUntilChanged()
)
}
}
function _handleSyncValue<T>(): MonoTypeOperatorFunction<any> {
return (source$: Observable<T>): Observable<T> => {
return new Observable<T>((observer) => {
const isReadySubject = new ReplaySubject<unknown>(1);

const subscription = source$
.pipe(
/* Wait for all synchronous processing to be done. */
debounce(() => isReadySubject)
)
.subscribe(observer);

/* Sync emitted values have been processed now.
* Mark source as ready and emit last computed state. */
isReadySubject.next(undefined);

return () => subscription.unsubscribe();
});
),
};

return rxStateful$;
}
Loading

0 comments on commit f834d42

Please sign in to comment.