From 27ec52ecdb2770bd340f117c5d0f4e97e7c24942 Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Thu, 6 Oct 2022 17:14:17 -0400 Subject: [PATCH] Notify subscribers on close --- events/events.go | 2 -- events/notification.go | 37 +++++++++++++++++++++++++++++++++++++ events/simple.go | 16 ++++++++-------- events/simple_test.go | 20 +++++++++++--------- events/subscription.go | 13 +++++++++++++ net/peer.go | 10 ++++++++-- 6 files changed, 77 insertions(+), 21 deletions(-) create mode 100644 events/notification.go create mode 100644 events/subscription.go diff --git a/events/events.go b/events/events.go index f283937677..f7f6391267 100644 --- a/events/events.go +++ b/events/events.go @@ -10,8 +10,6 @@ package events -type Subscription[T any] chan T - // Channel represents a subscribable type that will expose inputted items to subscribers. type Channel[T any] interface { // Subscribe subscribes to the Channel, returning a channel by which events can diff --git a/events/notification.go b/events/notification.go new file mode 100644 index 0000000000..32eb37bc62 --- /dev/null +++ b/events/notification.go @@ -0,0 +1,37 @@ +// Copyright 2022 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package events + +// Notification represents an actionable event in the subscription. +// This can either be a closing of the subscription, or an subscription item. +type Notification[T any] struct { + hasValue bool + value T +} + +func NewNotification[T any](value T) Notification[T] { + return Notification[T]{ + hasValue: true, + value: value, + } +} + +// Closed returns true if this is a notification that the source subscription +// has been closed, otherwise true. +func (n Notification[T]) Closed() bool { + return !n.hasValue +} + +// Value returns the value of this notification. Will be default if this is a +// close notification. +func (n Notification[T]) Value() T { + return n.value +} diff --git a/events/simple.go b/events/simple.go index 153f848c11..bda9bed9fd 100644 --- a/events/simple.go +++ b/events/simple.go @@ -13,9 +13,9 @@ package events import "github.com/sourcenetwork/defradb/errors" type simpleChannel[T any] struct { - subscribers []chan T - subscriptionChannel chan chan T - unsubscribeChannel chan chan T + subscribers []Subscription[T] + subscriptionChannel chan Subscription[T] + unsubscribeChannel chan Subscription[T] eventChannel chan T eventBufferSize int closeChannel chan struct{} @@ -28,8 +28,8 @@ type simpleChannel[T any] struct { // Should the buffers be filled subsequent calls to functions on this object may start to block. func NewSimpleChannel[T any](subscriberBufferSize int, eventBufferSize int) Channel[T] { c := simpleChannel[T]{ - subscriptionChannel: make(chan chan T, subscriberBufferSize), - unsubscribeChannel: make(chan chan T, subscriberBufferSize), + subscriptionChannel: make(chan Subscription[T], subscriberBufferSize), + unsubscribeChannel: make(chan Subscription[T], subscriberBufferSize), eventChannel: make(chan T, eventBufferSize), eventBufferSize: eventBufferSize, closeChannel: make(chan struct{}), @@ -46,7 +46,7 @@ func (c *simpleChannel[T]) Subscribe() (Subscription[T], error) { } // It is important to set this buffer size too, else we may end up blocked in the handleChannel func - ch := make(chan T, c.eventBufferSize) + ch := make(chan Notification[T], c.eventBufferSize) c.subscriptionChannel <- ch return ch, nil @@ -70,6 +70,7 @@ func (c *simpleChannel[T]) Close() { if c.isClosed { return } + c.isClosed = true c.closeChannel <- struct{}{} } @@ -77,7 +78,6 @@ func (c *simpleChannel[T]) handleChannel() { for { select { case <-c.closeChannel: - c.isClosed = true close(c.closeChannel) for _, subscriber := range c.subscribers { close(subscriber) @@ -112,7 +112,7 @@ func (c *simpleChannel[T]) handleChannel() { case item := <-c.eventChannel: for _, subscriber := range c.subscribers { - subscriber <- item + subscriber <- NewNotification(item) } } } diff --git a/events/simple_test.go b/events/simple_test.go index d4cc91047b..05b722a42e 100644 --- a/events/simple_test.go +++ b/events/simple_test.go @@ -59,11 +59,11 @@ func TestSimpleEachSubscribersRecievesEachItem(t *testing.T) { output2Ch1 := <-ch1 output2Ch2 := <-ch2 - assert.Equal(t, input1, output1Ch1) - assert.Equal(t, input1, output1Ch2) + assert.Equal(t, input1, output1Ch1.Value()) + assert.Equal(t, input1, output1Ch2.Value()) - assert.Equal(t, input2, output2Ch1) - assert.Equal(t, input2, output2Ch2) + assert.Equal(t, input2, output2Ch1.Value()) + assert.Equal(t, input2, output2Ch2.Value()) } func TestSimpleEachSubscribersRecievesEachItemGivenBufferedEventChan(t *testing.T) { @@ -86,11 +86,11 @@ func TestSimpleEachSubscribersRecievesEachItemGivenBufferedEventChan(t *testing. output2Ch1 := <-ch1 output2Ch2 := <-ch2 - assert.Equal(t, input1, output1Ch1) - assert.Equal(t, input1, output1Ch2) + assert.Equal(t, input1, output1Ch1.Value()) + assert.Equal(t, input1, output1Ch2.Value()) - assert.Equal(t, input2, output2Ch1) - assert.Equal(t, input2, output2Ch2) + assert.Equal(t, input2, output2Ch1.Value()) + assert.Equal(t, input2, output2Ch2.Value()) } func TestSimpleSubscribersDontRecieveItemsAfterUnsubscribing(t *testing.T) { @@ -105,6 +105,8 @@ func TestSimpleSubscribersDontRecieveItemsAfterUnsubscribing(t *testing.T) { // to do its thing with the pushed item. time.Sleep(5 * time.Millisecond) + result := <-ch + // closing the channel will result in reads yielding the default value - assert.Equal(t, 0, <-ch) + assert.True(t, result.Closed()) } diff --git a/events/subscription.go b/events/subscription.go new file mode 100644 index 0000000000..f59ad1c406 --- /dev/null +++ b/events/subscription.go @@ -0,0 +1,13 @@ +// Copyright 2022 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package events + +type Subscription[T any] chan Notification[T] diff --git a/net/peer.go b/net/peer.go index 10ca543c16..864281338b 100644 --- a/net/peer.go +++ b/net/peer.go @@ -33,6 +33,7 @@ import ( "github.com/sourcenetwork/defradb/core" corenet "github.com/sourcenetwork/defradb/core/net" "github.com/sourcenetwork/defradb/errors" + "github.com/sourcenetwork/defradb/events" "github.com/sourcenetwork/defradb/logging" "github.com/sourcenetwork/defradb/merkle/clock" pb "github.com/sourcenetwork/defradb/net/pb" @@ -48,7 +49,7 @@ type Peer struct { //config?? db client.DB - updateChannel chan client.UpdateEvent + updateChannel events.Subscription[client.UpdateEvent] host host.Host ps *pubsub.PubSub @@ -189,8 +190,13 @@ func (p *Peer) Close() error { func (p *Peer) handleBroadcastLoop() { log.Debug(p.ctx, "Waiting for messages on internal broadcaster") for { + notification := <-p.updateChannel + if notification.Closed() { + return + } + + update := notification.Value() log.Debug(p.ctx, "Handling internal broadcast bus message") - update := <-p.updateChannel // check log priority, 1 is new doc log // 2 is update log