Skip to content

Commit

Permalink
fix(rstream): PubSub dispatch & error handling
Browse files Browse the repository at this point in the history
- store last received value (if caching enabled)
- update error handler logic
  • Loading branch information
postspectacular committed Mar 12, 2021
1 parent 014bf20 commit cca0f34
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 5 deletions.
14 changes: 10 additions & 4 deletions packages/rstream/src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,12 @@ export class PubSub<A, B = A, T = any> extends Subscription<A, B> {
!t &&
this.topics.set(
topicID,
(t = subscription(undefined, {
closeOut: CloseMode.NEVER,
}))
(t = subscription(
undefined,
optsWithID("topic", {
closeOut: CloseMode.NEVER,
})
))
);
return t.subscribe(sub, opts);
}
Expand Down Expand Up @@ -166,14 +169,17 @@ export class PubSub<A, B = A, T = any> extends Subscription<A, B> {

protected dispatch(x: B) {
LOGGER.debug(this.id, "dispatch", x);
this.cacheLast && (this.last = x);
const t = this.topicfn(x);
if (t !== undefined) {
const sub = this.topics.get(t);
if (sub) {
try {
sub.next && sub.next(x);
} catch (e) {
sub.error ? sub.error(e) : this.error(e);
if (!sub.error || !sub.error(e)) {
return this.unhandledError(e);
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/rstream/src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ export class Subscription<A, B> implements ISubscription<A, B> {
protected wrapped?: Partial<ISubscriber<B>>,
opts?: Partial<SubscriptionOpts<A, B>>
) {
opts = optsWithID(`$sub`, {
opts = optsWithID(`sub`, {
closeIn: CloseMode.LAST,
closeOut: CloseMode.LAST,
cache: true,
Expand Down

0 comments on commit cca0f34

Please sign in to comment.