Skip to content

Commit

Permalink
feat(rx-stateful): allow custom accumulation function
Browse files Browse the repository at this point in the history
closes #53
  • Loading branch information
michaelbe812 committed Jun 30, 2023
1 parent 83f1c51 commit 04d2d53
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 9 deletions.
1 change: 1 addition & 0 deletions libs/rx-stateful/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ export {
RxStatefulWithError,
RxStatefulConfig,
} from './lib/types/types';
export { RxStatefulAccumulationFn } from './lib/types/accumulation-fn';
16 changes: 8 additions & 8 deletions libs/rx-stateful/src/lib/rx-stateful$.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,26 @@ import {
} from 'rxjs';
import {InternalRxState, RxStateful, RxStatefulConfig, RxStatefulWithError,} from './types/types';
import {_handleSyncValue} from './util/handle-sync-value';
import {defaultAccumulationFn} from "./types/accumulation-fn";

/**
* @publicApi
*/
export function rxStateful$<T, E = unknown>(source$: Observable<T>): RxStateful<T, E>;
export function rxStateful$<T, E = unknown>(source$: Observable<T>, config: RxStatefulConfig): RxStateful<T, E>;
export function rxStateful$<T, E = unknown>(source$: Observable<T>, config: RxStatefulConfig<T,E>): RxStateful<T, E>;
export function rxStateful$<T, E = unknown>(
source$: Observable<T>,
config?: RxStatefulConfig
config?: RxStatefulConfig<T,E>
): RxStateful<T, E> {
const error$$ = new Subject<RxStatefulWithError<T, E>>();
const mergedConfig: RxStatefulConfig = {
const mergedConfig: RxStatefulConfig<T,E> = {
keepValueOnRefresh: true,
...config,
};
const accumulationFn = mergedConfig.accumulationFn ?? defaultAccumulationFn;

const refreshTriggerIsBehaivorSubject = (config: RxStatefulConfig) =>

const refreshTriggerIsBehaivorSubject = (config: RxStatefulConfig<T,E>) =>
config.refreshTrigger$ instanceof BehaviorSubject;

const refresh$ = mergedConfig?.refreshTrigger$ ?? new Subject<unknown>();
Expand Down Expand Up @@ -87,10 +90,7 @@ export function rxStateful$<T, E = unknown>(

const state$ = merge(request$, refreshedRequest$, error$$).pipe(
scan(
// @ts-ignore
(acc, curr) => {
return { ...acc, ...curr };
},
accumulationFn,
{ isLoading: false, isRefreshing: false, value: undefined, error: undefined, context: 'idle' }
),
distinctUntilChanged(),
Expand Down
8 changes: 8 additions & 0 deletions libs/rx-stateful/src/lib/types/accumulation-fn.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import {InternalRxState} from './types';

export type RxStatefulAccumulationFn<T, E> = (
acc: InternalRxState<T, E>,
val: Partial<InternalRxState<T, E>>
) => InternalRxState<T, E>;

export const defaultAccumulationFn: RxStatefulAccumulationFn<any, any> = (acc, val) => ({ ...acc, ...val });
4 changes: 3 additions & 1 deletion libs/rx-stateful/src/lib/types/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {Observable, Subject} from 'rxjs';
import {RxStatefulAccumulationFn} from "./accumulation-fn";

/**
* @publicApi
Expand Down Expand Up @@ -53,7 +54,8 @@ export interface InternalRxState<T, E> {
/**
* @publicApi
*/
export interface RxStatefulConfig {
export interface RxStatefulConfig<T, E> {
refreshTrigger$?: Subject<any>;
keepValueOnRefresh?: boolean;
accumulationFn?: RxStatefulAccumulationFn<T, E>;
}

0 comments on commit 04d2d53

Please sign in to comment.