Skip to content

Commit

Permalink
Create integration for using a atomic broadcast. (#4)
Browse files Browse the repository at this point in the history
The current implementation is backed by etcd. Using this
approach we should have a primitive that is consistent and totally
orders all messages.

Along with this changes, added a generic interface to be possible to
use different atomic broadcast protocols.
  • Loading branch information
jabolina authored Feb 27, 2021
1 parent 0d99c86 commit 0e7cc81
Show file tree
Hide file tree
Showing 18 changed files with 1,378 additions and 377 deletions.
19 changes: 8 additions & 11 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,11 @@ jobs:
- name: Check out code into the Go module directory
uses: actions/checkout@v2

- name: Setup RabbitMQ with username and password
uses: getong/[email protected]
with:
rabbitmq version: '3.8.2-management-alpine'
host port: 5672
rabbitmq user: 'guest'
rabbitmq password: 'guest'
rabbitmq vhost: '/'

- name: All
run: make ci
- name: Setup etcd server and make
run: |
ETCD_VER=$(curl --silent https://api.github.com/repos/etcd-io/etcd/releases/latest | grep "tag_name" | cut -d ' ' -f4 | awk -F'"' '$0=$2')
curl -sL https://storage.googleapis.com/etcd/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz -o ./etcd.tar.gz
mkdir etcd && tar xzvf etcd.tar.gz -C etcd --strip-components=1
etcd/etcd > /dev/null &
sleep 30
make ci
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ endif

test_rule: # @HELP execute tests
@echo "executing tests"
GOTRACEBACK=all go test $(TESTARGS) -count=5 -timeout=120s -race ./test/...
GOTRACEBACK=all go test $(TESTARGS) -count=5 -timeout=120s -tags batchtest -race ./test/...
GOTRACEBACK=all go test $(TESTARGS) -count=1 -timeout=60s -race ./test/...
GOTRACEBACK=all go test $(TESTARGS) -count=1 -timeout=60s -tags batchtest -race ./test/...

lint: # @HELP lint files and format if possible
@echo "executing linter"
Expand Down
16 changes: 11 additions & 5 deletions _examples/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,32 @@ import (
"os/signal"
)

func produce(r *relt.Relt, reader io.Reader) {
func produce(r *relt.Relt, reader io.Reader, ctx context.Context) {
for {
println("Write message:")
scan := bufio.NewScanner(reader)
for scan.Scan() {
message := relt.Send{
Address: relt.DefaultExchangeName,
Data: scan.Bytes(),
}
log.Infof("Publishing message %s to group %s", string(message.Data), message.Address)
if err := r.Broadcast(message); err != nil {
if err := r.Broadcast(ctx, message); err != nil {
log.Errorf("failed sending %#v: %v", message, err)
}
}
}
}

func consume(r *relt.Relt, ctx context.Context) {
listener, err := r.Consume()
if err != nil {
return
}

for {
select {
case message := <-r.Consume():
case message := <-listener:
if message.Error != nil {
log.Errorf("message with error: %#v", message)
}
Expand All @@ -43,11 +49,11 @@ func consume(r *relt.Relt, ctx context.Context) {
func main() {
conf := relt.DefaultReltConfiguration()
conf.Name = "local-test"
relt := relt.NewRelt(*conf)
relt, _ := relt.NewRelt(*conf)
ctx, done := context.WithCancel(context.Background())

go func() {
produce(relt, os.Stdin)
produce(relt, os.Stdin, ctx)
}()

go func() {
Expand Down
32 changes: 26 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,30 @@ module github.com/jabolina/relt
go 1.14

require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275
github.com/sirupsen/logrus v1.6.0 // indirect
github.com/streadway/amqp v1.0.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect
github.com/coreos/bbolt v0.0.0-00010101000000-000000000000 // indirect
github.com/coreos/etcd v3.3.25+incompatible
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/google/uuid v1.2.0
github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/jonboulle/clockwork v0.2.2 // indirect
github.com/prometheus/client_golang v1.9.0 // indirect
github.com/prometheus/common v0.15.0
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
go.uber.org/goleak v1.1.10
go.uber.org/zap v1.16.0 // indirect
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 // indirect
google.golang.org/genproto v0.0.0-20210212180131-e7f2df4ecc2d // indirect
google.golang.org/grpc v1.35.0 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
)

replace (
github.com/coreos/bbolt => go.etcd.io/bbolt v1.3.5
google.golang.org/grpc v1.35.0 => google.golang.org/grpc v1.26.0
)
533 changes: 507 additions & 26 deletions go.sum

Large diffs are not rendered by default.

31 changes: 31 additions & 0 deletions internal/atomic_flag.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package internal

import "sync/atomic"

const (
// Constant to represent the `active` state on the Flag.
active = 0x0

// Constant to represent the `inactive` state on the Flag.
inactive = 0x1
)

// An atomic boolean implementation, to act specifically as a flag.
type Flag struct {
flag int32
}

// Verify if the flag still on `active` state.
func (f *Flag) IsActive() bool {
return atomic.LoadInt32(&f.flag) == active
}

// Verify if the flag is on `inactive` state.
func (f *Flag) IsInactive() bool {
return atomic.LoadInt32(&f.flag) == inactive
}

// Transition the flag from `active` to `inactive`.
func (f *Flag) Inactivate() bool {
return atomic.CompareAndSwapInt32(&f.flag, active, inactive)
}
177 changes: 177 additions & 0 deletions internal/coordinator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package internal

import (
"context"
"github.com/coreos/etcd/clientv3"
"io"
"time"
)

// A single write requests to be applied to etcd.
type request struct {
// Issuer writer context.
ctx context.Context

// Event to be sent to etcd.
event Event

// Channel to send response back.
response chan error
}

// Configuration for the coordinator.
type CoordinatorConfiguration struct {
// Each Coordinator will handle only a single partition.
// This will avoid peers with overlapping partitions.
Partition string

// Address for etcd server.
Server string

// Parent context that the Coordinator will derive it's own context.
Ctx context.Context

// Handler for managing goroutines.
Handler *GoRoutineHandler
}

// Coordinator interface that should be implemented by the
// atomic broadcast handler.
// Commands should be issued through the coordinator to be delivered
// to other peers
type Coordinator interface {
io.Closer

// Watch for changes on the partition.
// After called, this method will start a new goroutine that only
// returns when the Coordinator context is done.
Watch(received chan<- Event) error

// Issues an Event.
Write(ctx context.Context, event Event) <-chan error
}

// Create a new Coordinator using the given configuration.
// The current implementation is the EtcdCoordinator, backed by etcd.
func NewCoordinator(configuration CoordinatorConfiguration) (Coordinator, error) {
cli, err := clientv3.New(clientv3.Config{
DialTimeout: 30 * time.Second,
Endpoints: []string{configuration.Server},
})
if err != nil {
return nil, err
}
kv := clientv3.NewKV(cli)
ctx, cancel := context.WithCancel(configuration.Ctx)
coord := &EtcdCoordinator{
configuration: configuration,
cli: cli,
kv: kv,
ctx: ctx,
cancel: cancel,
writeChan: make(chan request),
}
configuration.Handler.Spawn(coord.writer)
return coord, nil
}

// EtcdCoordinator will use etcd for atomic broadcast.
type EtcdCoordinator struct {
// Configuration parameters.
configuration CoordinatorConfiguration

// Current Coordinator context, created from the parent context.
ctx context.Context

// Function to cancel the current context.
cancel context.CancelFunc

// A client for the etcd server.
cli *clientv3.Client

// The key-value entry point for issuing requests.
kv clientv3.KV

// Channel to receive write requests.
writeChan chan request
}

// Listen and apply write requests.
// This will keep running while the application context is available.
// Receiving commands through the channel will ensure that they are
// applied synchronously to the etcd.
func (e *EtcdCoordinator) writer() {
for {
select {
case <-e.ctx.Done():
return
case req := <-e.writeChan:
_, err := e.kv.Put(req.ctx, req.event.Key, string(req.event.Value))
req.response <- err
}
}
}

// Starts a new coroutine for watching the Coordinator partition.
// All received information will be published back through the channel
// received as parameter.
//
// After calling a routine will run bounded to the application lifetime.
func (e *EtcdCoordinator) Watch(received chan<- Event) error {
watchChan := e.cli.Watch(e.ctx, e.configuration.Partition)
watchChanges := func() {
for response := range watchChan {
select {
case <-e.ctx.Done():
return
default:
e.handleResponse(response, received)
}
}
}
e.configuration.Handler.Spawn(watchChanges)
return nil
}

// Write the given event using the KV interface.
func (e *EtcdCoordinator) Write(ctx context.Context, event Event) <-chan error {
res := make(chan error)
e.writeChan <- request{
ctx: ctx,
event: event,
response: res,
}
return res
}

// Stop the etcd client connection.
func (e *EtcdCoordinator) Close() error {
e.cancel()
return e.cli.Close()
}

// This method is responsible for handling events from the etcd client.
//
// This method will transform each received event into Event object and
// publish it back using the given channel. A buffered channel will be created
// and a goroutine will be spawned, so we can publish the received messages
// asynchronously without blocking. This can cause the Close to hold, if there
// exists pending messages to be consumed by the channel, this method can cause a deadlock.
func (e *EtcdCoordinator) handleResponse(response clientv3.WatchResponse, received chan<- Event) {
buffered := make(chan Event, len(response.Events))
defer close(buffered)

e.configuration.Handler.Spawn(func() {
for ev := range buffered {
received <- ev
}
})

for _, event := range response.Events {
buffered <- Event{
Key: string(event.Kv.Key),
Value: event.Kv.Value,
Error: nil,
}
}
}
Loading

0 comments on commit 0e7cc81

Please sign in to comment.