diff --git a/packages/rstream/src/pubsub.ts b/packages/rstream/src/pubsub.ts index e411bc87de..788f67053c 100644 --- a/packages/rstream/src/pubsub.ts +++ b/packages/rstream/src/pubsub.ts @@ -118,9 +118,12 @@ export class PubSub extends Subscription { !t && this.topics.set( topicID, - (t = subscription(undefined, { - closeOut: CloseMode.NEVER, - })) + (t = subscription( + undefined, + optsWithID("topic", { + closeOut: CloseMode.NEVER, + }) + )) ); return t.subscribe(sub, opts); } @@ -166,6 +169,7 @@ export class PubSub extends Subscription { protected dispatch(x: B) { LOGGER.debug(this.id, "dispatch", x); + this.cacheLast && (this.last = x); const t = this.topicfn(x); if (t !== undefined) { const sub = this.topics.get(t); @@ -173,7 +177,9 @@ export class PubSub extends Subscription { try { sub.next && sub.next(x); } catch (e) { - sub.error ? sub.error(e) : this.error(e); + if (!sub.error || !sub.error(e)) { + return this.unhandledError(e); + } } } } diff --git a/packages/rstream/src/subscription.ts b/packages/rstream/src/subscription.ts index 52560f18b0..99d5cefdcb 100644 --- a/packages/rstream/src/subscription.ts +++ b/packages/rstream/src/subscription.ts @@ -97,7 +97,7 @@ export class Subscription implements ISubscription { protected wrapped?: Partial>, opts?: Partial> ) { - opts = optsWithID(`$sub`, { + opts = optsWithID(`sub`, { closeIn: CloseMode.LAST, closeOut: CloseMode.LAST, cache: true,