Skip to content

Commit

Permalink
fix: confirm block import notifier is closed properly (#1736)
Browse files Browse the repository at this point in the history
* add TODOs to identify where block imported channel is handled

* added comments for imported channels

* create constructor for listeners

* added close channel to defer in listen

* move imported chan to block_notify

* remove comments, lint

* handle lint issues

* replace imported channel map with sync.Map

* fix mocks in listeners test

* fix mock functions for new imported notification channel

* fix deep source issues

* add debugging printf

* remove sync.Pool, and sync.Map

* handle channel closing

* add sleep before close

* remove channel close

* run go imported

* defer importedLock unlock

* wrap notifier channel in struct

* store channel by interface{} key

* update storage key for imported block listeners

* refacter GetImportedBlockNotifierChannel arugments

* GetImportedBlockNotifierChannel doesn't return error, fixed related test

* remove un-needed comments

* remove close for FinalisedChannel listener

* added test for free imported channel

* add mocks paths to .deepsource exclude_patterns
  • Loading branch information
edwardmack authored Sep 24, 2021
1 parent a96f06a commit ad2d85e
Show file tree
Hide file tree
Showing 22 changed files with 174 additions and 255 deletions.
4 changes: 3 additions & 1 deletion .deepsource.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ exclude_patterns = [
"dot/config/**/*",
"dot/rpc/modules/test_data",
"lib/runtime/test_data",
"**/*_test.go"
"**/*_test.go",
"**/mocks/*",
"**/mock_*"
]

[[analyzers]]
Expand Down
4 changes: 2 additions & 2 deletions dot/core/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ type BlockState interface {
GetSlotForBlock(common.Hash) (uint64, error)
GetFinalisedHeader(uint64, uint64) (*types.Header, error)
GetFinalisedHash(uint64, uint64) (common.Hash, error)
RegisterImportedChannel(ch chan<- *types.Block) (byte, error)
UnregisterImportedChannel(id byte)
GetImportedBlockNotifierChannel() chan *types.Block
FreeImportedBlockNotifierChannel(ch chan *types.Block)
RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error)
UnregisterFinalisedChannel(id byte)
HighestCommonAncestor(a, b common.Hash) (common.Hash, error)
Expand Down
61 changes: 21 additions & 40 deletions dot/core/mocks/block_state.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 3 additions & 9 deletions dot/digest/digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type Handler struct {

// block notification channels
imported chan *types.Block
importedID byte
finalised chan *types.FinalisationInfo
finalisedID byte

Expand All @@ -74,12 +73,9 @@ type resume struct {

// NewHandler returns a new Handler
func NewHandler(blockState BlockState, epochState EpochState, grandpaState GrandpaState) (*Handler, error) {
imported := make(chan *types.Block, 16)
imported := blockState.GetImportedBlockNotifierChannel()

finalised := make(chan *types.FinalisationInfo, 16)
iid, err := blockState.RegisterImportedChannel(imported)
if err != nil {
return nil, err
}

fid, err := blockState.RegisterFinalizedChannel(finalised)
if err != nil {
Expand All @@ -95,7 +91,6 @@ func NewHandler(blockState BlockState, epochState EpochState, grandpaState Grand
epochState: epochState,
grandpaState: grandpaState,
imported: imported,
importedID: iid,
finalised: finalised,
finalisedID: fid,
}, nil
Expand All @@ -111,9 +106,8 @@ func (h *Handler) Start() error {
// Stop stops the Handler
func (h *Handler) Stop() error {
h.cancel()
h.blockState.UnregisterImportedChannel(h.importedID)
h.blockState.FreeImportedBlockNotifierChannel(h.imported)
h.blockState.UnregisterFinalisedChannel(h.finalisedID)
close(h.imported)
close(h.finalised)
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions dot/digest/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
// BlockState interface for block state methods
type BlockState interface {
BestBlockHeader() (*types.Header, error)
RegisterImportedChannel(ch chan<- *types.Block) (byte, error)
UnregisterImportedChannel(id byte)
GetImportedBlockNotifierChannel() chan *types.Block
FreeImportedBlockNotifierChannel(ch chan *types.Block)
RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error)
UnregisterFinalisedChannel(id byte)
}
Expand Down
3 changes: 1 addition & 2 deletions dot/rpc/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,7 @@ func (h *HTTPServer) Stop() error {
case *subscription.StorageObserver:
h.serverConfig.StorageAPI.UnregisterStorageObserver(v)
case *subscription.BlockListener:
h.serverConfig.BlockAPI.UnregisterImportedChannel(v.ChanID)
close(v.Channel)
h.serverConfig.BlockAPI.FreeImportedBlockNotifierChannel(v.Channel)
}
}

Expand Down
4 changes: 2 additions & 2 deletions dot/rpc/modules/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ type BlockAPI interface {
GetHighestFinalisedHash() (common.Hash, error)
HasJustification(hash common.Hash) (bool, error)
GetJustification(hash common.Hash) ([]byte, error)
RegisterImportedChannel(ch chan<- *types.Block) (byte, error)
UnregisterImportedChannel(id byte)
GetImportedBlockNotifierChannel() chan *types.Block
FreeImportedBlockNotifierChannel(ch chan *types.Block)
RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error)
UnregisterFinalisedChannel(id byte)
SubChain(start, end common.Hash) ([]common.Hash, error)
Expand Down
8 changes: 5 additions & 3 deletions dot/rpc/modules/api_mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package modules

import (
modulesmocks "github.com/ChainSafe/gossamer/dot/rpc/modules/mocks"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
runtimemocks "github.com/ChainSafe/gossamer/lib/runtime/mocks"
"github.com/stretchr/testify/mock"
Expand All @@ -21,15 +22,16 @@ func NewMockStorageAPI() *modulesmocks.MockStorageAPI {
}

// NewMockBlockAPI creates and return an rpc BlockAPI interface mock
func NewMockBlockAPI() *modulesmocks.BlockAPI {
m := new(modulesmocks.BlockAPI)
func NewMockBlockAPI() *modulesmocks.MockBlockAPI {
m := new(modulesmocks.MockBlockAPI)
m.On("GetHeader", mock.AnythingOfType("common.Hash")).Return(nil, nil)
m.On("BestBlockHash").Return(common.Hash{})
m.On("GetBlockByHash", mock.AnythingOfType("common.Hash")).Return(nil, nil)
m.On("GetBlockHash", mock.AnythingOfType("*big.Int")).Return(nil, nil)
m.On("GetFinalisedHash", mock.AnythingOfType("uint64"), mock.AnythingOfType("uint64")).Return(common.Hash{}, nil)
m.On("GetHighestFinalisedHash").Return(common.Hash{}, nil)
m.On("RegisterImportedChannel", mock.AnythingOfType("chan<- *types.Block")).Return(byte(0), nil)
m.On("GetImportedBlockNotifierChannel").Return(make(chan *types.Block, 5))
m.On("FreeImportedBlockNotifierChannel", mock.AnythingOfType("chan *types.Block"))
m.On("UnregisterImportedChannel", mock.AnythingOfType("uint8"))
m.On("RegisterFinalizedChannel", mock.AnythingOfType("chan<- *types.FinalisationInfo")).Return(byte(0), nil)
m.On("UnregisterFinalizedChannel", mock.AnythingOfType("uint8"))
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion dot/rpc/modules/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func TestSyncState(t *testing.T) {
Number: big.NewInt(int64(49)),
}

blockapiMock := new(mocks.BlockAPI)
blockapiMock := new(mocks.MockBlockAPI)
blockapiMock.On("BestBlockHash").Return(fakeCommonHash)
blockapiMock.On("GetHeader", fakeCommonHash).Return(fakeHeader, nil).Once()

Expand Down
Loading

0 comments on commit ad2d85e

Please sign in to comment.