From 29982fd1e8868d5bbfef19214b432db835d75c7a Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Wed, 20 Sep 2023 17:55:08 -0500 Subject: [PATCH] fix(config): onStoppedNotification and onUnhandledError will now always async dispatch Resolves #7343 --- packages/rxjs/spec/operators/share-spec.ts | 176 +++++++++--------- packages/rxjs/src/internal/Subscriber.ts | 3 +- .../src/internal/util/reportUnhandledError.ts | 3 +- 3 files changed, 88 insertions(+), 94 deletions(-) diff --git a/packages/rxjs/spec/operators/share-spec.ts b/packages/rxjs/spec/operators/share-spec.ts index b38aed486c..c423c60f76 100644 --- a/packages/rxjs/spec/operators/share-spec.ts +++ b/packages/rxjs/spec/operators/share-spec.ts @@ -18,25 +18,12 @@ import { } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from '../helpers/observableMatcher'; -import { SinonSpy, spy } from 'sinon'; +import { spy } from 'sinon'; const syncNotify = of(1); const asapNotify = scheduled(syncNotify, asapScheduler); const syncError = throwError(() => new Error()); -function spyOnUnhandledError(fn: (spy: SinonSpy) => void): void { - const prevOnUnhandledError = config.onUnhandledError; - - try { - const onUnhandledError = spy(); - config.onUnhandledError = onUnhandledError; - - fn(onUnhandledError); - } finally { - config.onUnhandledError = prevOnUnhandledError; - } -} - /** @test {share} */ describe('share', () => { let rxTest: TestScheduler; @@ -810,9 +797,22 @@ describe('share', () => { }); }); - it('should not reset on refCount 0 if reset notifier errors before emitting any value', () => { - spyOnUnhandledError((onUnhandledError) => { + describe('when config.onUnhandledError is set', () => { + afterEach(() => { + config.onUnhandledError = null; + }); + + it('should not reset on refCount 0 if reset notifier errors before emitting any value', (done) => { const error = new Error(); + let calls = 0; + + config.onUnhandledError = spy((err) => { + calls++; + expect(err).to.equal(error); + if (calls === 2) { + done(); + } + }); rxTest.run(({ hot, cold, expectObservable, expectSubscriptions }) => { const source = hot(' ---1---2---3---4---(5 )---|'); @@ -830,17 +830,16 @@ describe('share', () => { expectObservable(result, subscription).toBe(expected); expectSubscriptions(source.subscriptions).toBe(sourceSubs); }); - - expect(onUnhandledError).to.have.been.calledTwice; - expect(onUnhandledError.getCall(0)).to.have.been.calledWithExactly(error); - expect(onUnhandledError.getCall(1)).to.have.been.calledWithExactly(error); }); - }); - it('should not reset on error if reset notifier errors before emitting any value', () => { - spyOnUnhandledError((onUnhandledError) => { + it('should not reset on error if reset notifier errors before emitting any value', (done) => { const error = new Error(); + config.onUnhandledError = spy((err) => { + expect(err).to.equal(error); + done(); + }); + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { const source = cold(' ---1---2---# '); const sourceSubs = ' ^----------! '; @@ -856,89 +855,86 @@ describe('share', () => { expectObservable(result, subscription).toBe(expected); expectSubscriptions(source.subscriptions).toBe(sourceSubs); }); - - expect(onUnhandledError).to.have.been.calledOnce; - expect(onUnhandledError.getCall(0)).to.have.been.calledWithExactly(error); }); }); - it('should not reset on complete if reset notifier errors before emitting any value', () => { - spyOnUnhandledError((onUnhandledError) => { - const error = new Error(); - - rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { - const source = cold(' ---1---2---| '); - const sourceSubs = ' ^----------! '; - const expected = ' ---1---2------|'; - const subscription = ' ^--------------'; - const firstPause = cold(' -------|'); - const reset = cold(' --# ', undefined, error); - - const sharedSource = source.pipe(share({ resetOnComplete: () => reset, resetOnRefCountZero: false }), take(2)); + it('should not reset on complete if reset notifier errors before emitting any value', (done) => { + const error = new Error(); - const result = concat(sharedSource, firstPause, sharedSource); - - expectObservable(result, subscription).toBe(expected); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); - }); - - expect(onUnhandledError).to.have.been.calledOnce; - expect(onUnhandledError.getCall(0)).to.have.been.calledWithExactly(error); + config.onUnhandledError = spy((err) => { + expect(err).to.equal(error); + done(); }); - }); - it('should not call "resetOnRefCountZero" on error', () => { rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { - const resetOnRefCountZero = spy(() => EMPTY); - - const source = cold(' ---1---(2#) '); - // source: ' ---1---(2#) ' - const sourceSubs = [ - ' ^------(! ) ', - // break the line, please - ' -------(- )---^------(! ) ', - ]; - const expected = ' ---1---(2 )------1---(2#) '; - const subscription = ' ^------(- )----------(- ) '; - const firstPause = cold(' (- )---| '); - const reset = cold(' (- )-r '); - // reset: ' (- )-r' + const source = cold(' ---1---2---| '); + const sourceSubs = ' ^----------! '; + const expected = ' ---1---2------|'; + const subscription = ' ^--------------'; + const firstPause = cold(' -------|'); + const reset = cold(' --# ', undefined, error); - const sharedSource = source.pipe(share({ resetOnError: () => reset, resetOnRefCountZero })); + const sharedSource = source.pipe(share({ resetOnComplete: () => reset, resetOnRefCountZero: false }), take(2)); - const result = concat(sharedSource.pipe(onErrorResumeNextWith(firstPause)), sharedSource); + const result = concat(sharedSource, firstPause, sharedSource); expectObservable(result, subscription).toBe(expected); expectSubscriptions(source.subscriptions).toBe(sourceSubs); - expect(resetOnRefCountZero).to.not.have.been.called; }); }); + }); - it('should not call "resetOnRefCountZero" on complete', () => { - rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { - const resetOnRefCountZero = spy(() => EMPTY); - - const source = cold(' ---1---(2|) '); - // source: ' ---1---(2|) ' - const sourceSubs = [ - ' ^------(! ) ', - // break the line, please - ' -------(- )---^------(! ) ', - ]; - const expected = ' ---1---(2 )------1---(2|) '; - const subscription = ' ^------(- )----------(- ) '; - const firstPause = cold(' (- )---| '); - const reset = cold(' (- )-r '); - // reset: ' (- )-r' - - const sharedSource = source.pipe(share({ resetOnComplete: () => reset, resetOnRefCountZero })); - - const result = concat(sharedSource, firstPause, sharedSource); + it('should not call "resetOnRefCountZero" on error', () => { + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const resetOnRefCountZero = spy(() => EMPTY); + + const source = cold(' ---1---(2#) '); + // source: ' ---1---(2#) ' + const sourceSubs = [ + ' ^------(! ) ', + // break the line, please + ' -------(- )---^------(! ) ', + ]; + const expected = ' ---1---(2 )------1---(2#) '; + const subscription = ' ^------(- )----------(- ) '; + const firstPause = cold(' (- )---| '); + const reset = cold(' (- )-r '); + // reset: ' (- )-r' + + const sharedSource = source.pipe(share({ resetOnError: () => reset, resetOnRefCountZero })); + + const result = concat(sharedSource.pipe(onErrorResumeNextWith(firstPause)), sharedSource); + + expectObservable(result, subscription).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + expect(resetOnRefCountZero).to.not.have.been.called; + }); + }); - expectObservable(result, subscription).toBe(expected); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); - expect(resetOnRefCountZero).to.not.have.been.called; - }); + it('should not call "resetOnRefCountZero" on complete', () => { + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const resetOnRefCountZero = spy(() => EMPTY); + + const source = cold(' ---1---(2|) '); + // source: ' ---1---(2|) ' + const sourceSubs = [ + ' ^------(! ) ', + // break the line, please + ' -------(- )---^------(! ) ', + ]; + const expected = ' ---1---(2 )------1---(2|) '; + const subscription = ' ^------(- )----------(- ) '; + const firstPause = cold(' (- )---| '); + const reset = cold(' (- )-r '); + // reset: ' (- )-r' + + const sharedSource = source.pipe(share({ resetOnComplete: () => reset, resetOnRefCountZero })); + + const result = concat(sharedSource, firstPause, sharedSource); + + expectObservable(result, subscription).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + expect(resetOnRefCountZero).to.not.have.been.called; }); }); }); diff --git a/packages/rxjs/src/internal/Subscriber.ts b/packages/rxjs/src/internal/Subscriber.ts index a429d81a34..c166456e0f 100644 --- a/packages/rxjs/src/internal/Subscriber.ts +++ b/packages/rxjs/src/internal/Subscriber.ts @@ -4,7 +4,6 @@ import { Subscription } from './Subscription'; import { config } from './config'; import { reportUnhandledError } from './util/reportUnhandledError'; import { nextNotification, errorNotification, COMPLETE_NOTIFICATION } from './NotificationFactories'; -import { timeoutProvider } from './scheduler/timeoutProvider'; export interface SubscriberOverrides { /** @@ -267,7 +266,7 @@ function createSafeObserver(observerOrNext?: Partial> | ((value: */ function handleStoppedNotification(notification: ObservableNotification, subscriber: Subscriber) { const { onStoppedNotification } = config; - onStoppedNotification && timeoutProvider.setTimeout(() => onStoppedNotification(notification, subscriber)); + onStoppedNotification && setTimeout(() => onStoppedNotification(notification, subscriber)); } function hasAddAndUnsubscribe(value: any): value is Subscription { diff --git a/packages/rxjs/src/internal/util/reportUnhandledError.ts b/packages/rxjs/src/internal/util/reportUnhandledError.ts index d9969563e4..aa23f58e2a 100644 --- a/packages/rxjs/src/internal/util/reportUnhandledError.ts +++ b/packages/rxjs/src/internal/util/reportUnhandledError.ts @@ -1,5 +1,4 @@ import { config } from '../config'; -import { timeoutProvider } from '../scheduler/timeoutProvider'; /** * Handles an error on another job either with the user-configured {@link onUnhandledError}, @@ -11,7 +10,7 @@ import { timeoutProvider } from '../scheduler/timeoutProvider'; * @param err the error to report */ export function reportUnhandledError(err: any) { - timeoutProvider.setTimeout(() => { + setTimeout(() => { const { onUnhandledError } = config; if (onUnhandledError) { // Execute the user-configured error handler.