Skip to content

Commit

Permalink
feat: add message system (#3)
Browse files Browse the repository at this point in the history
* Setup repo

* Create LICENSE

* Bump golangci/golangci-lint-action from 3 to 4 (#1)

Bumps [golangci/golangci-lint-action](https://github.com/golangci/golangci-lint-action) from 3 to 4.
- [Release notes](https://github.com/golangci/golangci-lint-action/releases)
- [Commits](golangci/golangci-lint-action@v3...v4)

---
updated-dependencies:
- dependency-name: golangci/golangci-lint-action
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Initial foundation pseudocode

* Bump go version

* Add message proto definitions

* Add initial collector logic

* Add temporary fetch messages to the collector

* Significantly simplify the message storage

* Add unit tests for the collector

* Remove dupl lint

* Split up the subscribe unit test

* Simplify remove case

* Drop old message queue

* Change AddMessage sig

* Tidy go mod

* Remove leftover pb.go

* Tidy go mod

---------

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
  • Loading branch information
zivkovicmilos and dependabot[bot] authored Feb 25, 2024
1 parent 161a584 commit 7fb4c91
Show file tree
Hide file tree
Showing 11 changed files with 750 additions and 197 deletions.
3 changes: 2 additions & 1 deletion .github/golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ run:
modules-download-mode: readonly
allow-parallel-runners: false
go: ""
skip-dirs: # Temporary
- core

output:
uniq-by-line: false
Expand Down Expand Up @@ -77,7 +79,6 @@ linters:
- goimports # Unused imports
- goconst # Repeated strings that could be replaced by a constant
- dogsled # Checks assignments with too many blank identifiers (e.g. x, , , _, := f())
- dupl # Code clone detection
- errname # Checks that sentinel errors are prefixed with the Err and error types are suffixed with the Error
- errorlint # errorlint is a linter for that can be used to find code that will cause problems with the error wrapping scheme introduced in Go 1.13
- unused # Checks Go code for unused constants, variables, functions and types
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ fixalign:
protoc:
# Make sure the following prerequisites are installed before running these commands:
# https://grpc.io/docs/languages/go/quickstart/#prerequisites
protoc --proto_path=./ --go_out=./ --go-grpc_out=./ ./messages/proto/*.proto
protoc --go_out=./ ./messages/proto/*.proto
44 changes: 0 additions & 44 deletions core/message_queue.go

This file was deleted.

81 changes: 0 additions & 81 deletions core/message_queue_test.go

This file was deleted.

12 changes: 11 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,14 @@ module github.com/gnolang/go-tendermint

go 1.21

require google.golang.org/protobuf v1.32.0
require (
github.com/rs/xid v1.5.0
github.com/stretchr/testify v1.8.4
google.golang.org/protobuf v1.32.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
117 changes: 117 additions & 0 deletions messages/collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package messages

import (
"fmt"
"sync"
)

// msgType is the combined message type interface,
// for easy reference and type safety
type msgType interface {
ProposalMessage | PrevoteMessage | PrecommitMessage
}

type (
// collection are the actual received messages.
// Maps a unique identifier -> their message (of a specific type) to avoid duplicates.
// Identifiers are derived from <sender ID, height, round>.
// Each validator in the consensus needs to send at most 1 message of every type
// (minus the PROPOSAL, which is only sent by the proposer),
// so the message system needs to keep track of only 1 message per type, per validator, per view
collection[T msgType] map[string]*T
)

// Collector is a single message type collector
type Collector[T msgType] struct {
collection collection[T] // the message storage
subscriptions subscriptions[T] // the active message subscriptions

collectionMux sync.RWMutex
subscriptionsMux sync.RWMutex
}

// NewCollector creates a new message collector
func NewCollector[T msgType]() *Collector[T] {
return &Collector[T]{
collection: make(collection[T]),
subscriptions: make(subscriptions[T]),
}
}

// Subscribe creates a new collector subscription.
// Returns the channel for receiving messages,
// as well as the unsubscribe method
func (c *Collector[T]) Subscribe() (<-chan MsgCallback[T], func()) {
c.subscriptionsMux.Lock()
defer c.subscriptionsMux.Unlock()

// Create a new subscription
id, ch := c.subscriptions.add()

// Create the unsubscribe callback
unsubscribeFn := func() {
c.subscriptionsMux.Lock()
defer c.subscriptionsMux.Unlock()

c.subscriptions.remove(id)
}

// Notify the subscription immediately,
// since there can be existing messages in the collection.
// This action assumes the channel is not blocking (created with initial size),
// since the calling context does not have access to it yet at this point
notifySubscription(ch, c.GetMessages)

return ch, unsubscribeFn
}

// GetMessages returns the currently present messages in the collector
func (c *Collector[T]) GetMessages() []*T {
c.collectionMux.RLock()
defer c.collectionMux.RUnlock()

// Fetch the messages in the collection
return c.collection.getMessages()
}

// getMessages fetches the messages in the collection
func (c *collection[T]) getMessages() []*T {
messages := make([]*T, 0, len(*c))

for _, senderMessage := range *c {
messages = append(messages, senderMessage)
}

return messages
}

// AddMessage adds a new message to the collector
func (c *Collector[T]) AddMessage(view *View, from []byte, message *T) {
c.collectionMux.Lock()

// Add the message
c.collection.addMessage(
getCollectionKey(from, view),
message,
)

c.collectionMux.Unlock()

// Notify the subscriptions
c.subscriptionsMux.RLock()
defer c.subscriptionsMux.RUnlock()

c.subscriptions.notify(c.GetMessages)
}

// addMessage adds a new message to the collection
func (c *collection[T]) addMessage(key string, message *T) {
(*c)[key] = message
}

// getCollectionKey constructs a key based on the
// message sender and view information.
// This key guarantees uniqueness in the message store
func getCollectionKey(from []byte, view *View) string {
return fmt.Sprintf("%s_%d_%d", from, view.Height, view.Round)
}
Loading

0 comments on commit 7fb4c91

Please sign in to comment.