Skip to content

Commit

Permalink
finished etcd migration
Browse files Browse the repository at this point in the history
  • Loading branch information
jabolina committed Feb 27, 2021
1 parent 958e288 commit 0194b13
Show file tree
Hide file tree
Showing 10 changed files with 321 additions and 63 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ endif

test_rule: # @HELP execute tests
@echo "executing tests"
GOTRACEBACK=all go test $(TESTARGS) -count=5 -timeout=120s -race ./test/...
GOTRACEBACK=all go test $(TESTARGS) -count=5 -timeout=120s -tags batchtest -race ./test/...
GOTRACEBACK=all go test $(TESTARGS) -count=1 -timeout=60s -race ./test/...
GOTRACEBACK=all go test $(TESTARGS) -count=1 -timeout=60s -tags batchtest -race ./test/...

lint: # @HELP lint files and format if possible
@echo "executing linter"
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ require (
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/google/uuid v1.2.0 // indirect
github.com/google/uuid v1.2.0
github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/jonboulle/clockwork v0.2.2 // indirect
github.com/prometheus/client_golang v1.9.0 // indirect
github.com/prometheus/common v0.15.0
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
go.uber.org/goleak v1.1.10
go.uber.org/zap v1.16.0 // indirect
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 // indirect
google.golang.org/genproto v0.0.0-20210212180131-e7f2df4ecc2d // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A=
Expand Down Expand Up @@ -450,6 +452,7 @@ golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBn
golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200207183749-b753a1ba74fa/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
Expand Down
51 changes: 44 additions & 7 deletions internal/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,18 @@ import (
"time"
)

// A single write requests to be applied to etcd.
type request struct {
// Issuer writer context.
ctx context.Context

// Event to be sent to etcd.
event Event

// Channel to send response back.
response chan error
}

// Configuration for the coordinator.
type CoordinatorConfiguration struct {
// Each Coordinator will handle only a single partition.
Expand Down Expand Up @@ -36,7 +48,7 @@ type Coordinator interface {
Watch(received chan<- Event) error

// Issues an Event.
Write(ctx context.Context, event Event) error
Write(ctx context.Context, event Event) <-chan error
}

// Create a new Coordinator using the given configuration.
Expand All @@ -57,7 +69,9 @@ func NewCoordinator(configuration CoordinatorConfiguration) (Coordinator, error)
kv: kv,
ctx: ctx,
cancel: cancel,
writeChan: make(chan request),
}
configuration.Handler.Spawn(coord.writer)
return coord, nil
}

Expand All @@ -77,6 +91,25 @@ type EtcdCoordinator struct {

// The key-value entry point for issuing requests.
kv clientv3.KV

// Channel to receive write requests.
writeChan chan request
}

// Listen and apply write requests.
// This will keep running while the application context is available.
// Receiving commands through the channel will ensure that they are
// applied synchronously to the etcd.
func (e *EtcdCoordinator) writer() {
for {
select {
case <-e.ctx.Done():
return
case req := <-e.writeChan:
_, err := e.kv.Put(req.ctx, req.event.Key, string(req.event.Value))
req.response <- err
}
}
}

// Starts a new coroutine for watching the Coordinator partition.
Expand All @@ -87,12 +120,11 @@ type EtcdCoordinator struct {
func (e *EtcdCoordinator) Watch(received chan<- Event) error {
watchChan := e.cli.Watch(e.ctx, e.configuration.Partition)
watchChanges := func() {
defer e.cancel()
for {
for response := range watchChan {
select {
case <-e.ctx.Done():
return
case response := <-watchChan:
default:
e.handleResponse(response, received)
}
}
Expand All @@ -102,9 +134,14 @@ func (e *EtcdCoordinator) Watch(received chan<- Event) error {
}

// Write the given event using the KV interface.
func (e *EtcdCoordinator) Write(ctx context.Context, event Event) error {
_, err := e.kv.Put(ctx, event.Key, string(event.Value))
return err
func (e *EtcdCoordinator) Write(ctx context.Context, event Event) <-chan error {
res := make(chan error)
e.writeChan <- request{
ctx: ctx,
event: event,
response: res,
}
return res
}

// Stop the etcd client connection.
Expand Down
35 changes: 30 additions & 5 deletions internal/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package internal
import (
"context"
"errors"
"fmt"
"io"
"time"
)

var (
Expand All @@ -20,6 +22,9 @@ type CoreConfiguration struct {

// Server address for the Coordinator.
Server string

// Default timeout to be applied when handling channels.
DefaultTimeout time.Duration
}

// Holds all flags used to manage the Core state.
Expand Down Expand Up @@ -68,6 +73,9 @@ type ReltCore struct {

// Flags for handling state.
flags CoreFlags

// Core configuration parameters.
configuration CoreConfiguration
}

// Create a new ReltCore using the given configuration.
Expand All @@ -92,15 +100,31 @@ func NewCore(configuration CoreConfiguration) (Core, error) {
finish: cancel,
handler: handler,
coord: coord,
output: make(chan Message),
output: make(chan Message, 100),
flags: CoreFlags{
shutdown: Flag{},
watching: Flag{},
},
configuration: configuration,
}
return core, nil
}

// After receiving an event from the Coordinator, the element
// should be parsed and sent back through the output channel.
func (r *ReltCore) publishMessageToListener(message Message) {
select {
case <-r.ctx.Done():
return
case r.output <- message:
return
case <-time.After(r.configuration.DefaultTimeout):
// TODO: create something to handle timed out responses.
fmt.Printf("not published %#v\n", message)
return
}
}

// The Listen method can be called only once.
// This will start a new goroutine that receives updates
// from the Coordinator and parse the information to a Message object.
Expand All @@ -112,7 +136,7 @@ func (r *ReltCore) Listen() (<-chan Message, error) {
}

if r.flags.watching.Inactivate() {
events := make(chan Event)
events := make(chan Event, 100)
if err := r.coord.Watch(events); err != nil {
return nil, err
}
Expand All @@ -123,7 +147,7 @@ func (r *ReltCore) Listen() (<-chan Message, error) {
case <-r.ctx.Done():
return
case event := <-events:
r.output <- event.toMessage()
r.publishMessageToListener(event.toMessage())
}
}
}
Expand All @@ -140,13 +164,13 @@ func (r *ReltCore) Listen() (<-chan Message, error) {
func (r *ReltCore) Send(ctx context.Context, dest string, data []byte) <-chan error {
response := make(chan error, 1)
writeRequest := func() {
defer close(response)
if r.flags.shutdown.IsActive() {
event := Event{
Key: dest,
Value: data,
}
response <- r.coord.Write(ctx, event)
err := <-r.coord.Write(ctx, event)
response <- err
} else {
response <- coreWasShutdown
}
Expand All @@ -162,6 +186,7 @@ func (r *ReltCore) Send(ctx context.Context, dest string, data []byte) <-chan er
// *This method will block until everything is finished.*
func (r *ReltCore) Close() error {
if r.flags.shutdown.Inactivate() {
defer close(r.output)
if err := r.coord.Close(); err != nil {
return err
}
Expand Down
23 changes: 16 additions & 7 deletions pkg/relt/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package relt

import (
"errors"
"time"
)

var (
Expand All @@ -11,7 +12,7 @@ var (

// Configuration used for creating a new instance
// of the Relt.
type ReltConfiguration struct {
type Configuration struct {
// The Relt name. Is not required, if empty a
// random string will be generated to be used.
// This must be unique, since it will be used to declare
Expand All @@ -31,23 +32,27 @@ type ReltConfiguration struct {
// messages. If all peers are using the same exchange then
// is the same as all peers are a single partition.
Exchange GroupAddress

// Default timeout to be applied when handling asynchronous methods.
DefaultTimeout time.Duration
}

// Creates the default configuration for the Relt.
// The peer, thus the queue name will be randomly generated,
// the connection Url will connect to a local broker using
// the user `guest` and password `guest`.
// The default exchange will fallback to `relt`.
func DefaultReltConfiguration() *ReltConfiguration {
return &ReltConfiguration{
Name: GenerateUID(),
Url: "localhost:2379",
Exchange: DefaultExchangeName,
func DefaultReltConfiguration() *Configuration {
return &Configuration{
Name: GenerateUID(),
Url: "localhost:2379",
Exchange: DefaultExchangeName,
DefaultTimeout: time.Second,
}
}

// Verify if the given Relt configuration is valid.
func (c *ReltConfiguration) ValidateConfiguration() error {
func (c *Configuration) ValidateConfiguration() error {
if len(c.Name) == 0 {
c.Name = GenerateUID()
}
Expand All @@ -64,5 +69,9 @@ func (c *ReltConfiguration) ValidateConfiguration() error {
return ErrInvalidConfiguration
}

if c.DefaultTimeout.Microseconds() <= 0 {
return ErrInvalidConfiguration
}

return nil
}
18 changes: 10 additions & 8 deletions pkg/relt/relt.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"github.com/jabolina/relt/internal"
"time"
)

var (
Expand All @@ -19,19 +18,19 @@ var (
type Relt struct {
// Holds the configuration about the core
// and the Relt transport.
configuration ReltConfiguration
configuration Configuration

// Holds the Core structure.
core internal.Core
}

// Implements the Transport interface.
func (r Relt) Consume() (<-chan internal.Message, error) {
func (r *Relt) Consume() (<-chan internal.Message, error) {
return r.core.Listen()
}

// Implements the Transport interface.
func (r Relt) Broadcast(ctx context.Context, message Send) error {
func (r *Relt) Broadcast(ctx context.Context, message Send) error {
if len(message.Address) == 0 {
return ErrInvalidGroupAddress
}
Expand All @@ -40,10 +39,12 @@ func (r Relt) Broadcast(ctx context.Context, message Send) error {
return ErrInvalidMessage
}

timeout, cancel := context.WithTimeout(ctx, time.Second)
timeout, cancel := context.WithTimeout(ctx, r.configuration.DefaultTimeout)
defer cancel()

select {
case <-ctx.Done():
return ErrContextClosed
case <-timeout.Done():
return ErrPublishTimeout
case err := <-r.core.Send(timeout, string(message.Address), message.Data):
Expand All @@ -58,10 +59,11 @@ func (r *Relt) Close() error {

// Creates a new instance of the reliable transport,
// and start all needed routines.
func NewRelt(configuration ReltConfiguration) (*Relt, error) {
func NewRelt(configuration Configuration) (*Relt, error) {
conf := internal.CoreConfiguration{
Partition: string(configuration.Exchange),
Server: configuration.Url,
Partition: string(configuration.Exchange),
Server: configuration.Url,
DefaultTimeout: configuration.DefaultTimeout,
}
core, err := internal.NewCore(conf)
if err != nil {
Expand Down
Loading

0 comments on commit 0194b13

Please sign in to comment.