From 0194b13d55bdcacaa42c6fc73b62f2204f55a1f7 Mon Sep 17 00:00:00 2001 From: jabolina Date: Fri, 26 Feb 2021 22:02:20 -0300 Subject: [PATCH] finished etcd migration --- Makefile | 4 +- go.mod | 3 +- go.sum | 3 ++ internal/coordinator.go | 51 ++++++++++++++++--- internal/core.go | 35 +++++++++++-- pkg/relt/configuration.go | 23 ++++++--- pkg/relt/relt.go | 18 ++++--- test/helper.go | 55 ++++++++++++++++++++ test/relt_test.go | 93 +++++++++++++++++++++------------ test/synchronization_test.go | 99 ++++++++++++++++++++++++++++++++++++ 10 files changed, 321 insertions(+), 63 deletions(-) create mode 100644 test/helper.go create mode 100644 test/synchronization_test.go diff --git a/Makefile b/Makefile index 335eb69..2ceba37 100644 --- a/Makefile +++ b/Makefile @@ -14,8 +14,8 @@ endif test_rule: # @HELP execute tests @echo "executing tests" - GOTRACEBACK=all go test $(TESTARGS) -count=5 -timeout=120s -race ./test/... - GOTRACEBACK=all go test $(TESTARGS) -count=5 -timeout=120s -tags batchtest -race ./test/... + GOTRACEBACK=all go test $(TESTARGS) -count=1 -timeout=60s -race ./test/... + GOTRACEBACK=all go test $(TESTARGS) -count=1 -timeout=60s -tags batchtest -race ./test/... lint: # @HELP lint files and format if possible @echo "executing linter" diff --git a/go.mod b/go.mod index b5aa5ad..c070455 100644 --- a/go.mod +++ b/go.mod @@ -11,13 +11,14 @@ require ( github.com/dustin/go-humanize v1.0.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect - github.com/google/uuid v1.2.0 // indirect + github.com/google/uuid v1.2.0 github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect github.com/jonboulle/clockwork v0.2.2 // indirect github.com/prometheus/client_golang v1.9.0 // indirect github.com/prometheus/common v0.15.0 github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect + go.uber.org/goleak v1.1.10 go.uber.org/zap v1.16.0 // indirect golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 // indirect google.golang.org/genproto v0.0.0-20210212180131-e7f2df4ecc2d // indirect diff --git a/go.sum b/go.sum index 71ce071..739a6af 100644 --- a/go.sum +++ b/go.sum @@ -334,6 +334,8 @@ go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0= +go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A= @@ -450,6 +452,7 @@ golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBn golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200207183749-b753a1ba74fa/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= diff --git a/internal/coordinator.go b/internal/coordinator.go index c8e56af..9131229 100644 --- a/internal/coordinator.go +++ b/internal/coordinator.go @@ -7,6 +7,18 @@ import ( "time" ) +// A single write requests to be applied to etcd. +type request struct { + // Issuer writer context. + ctx context.Context + + // Event to be sent to etcd. + event Event + + // Channel to send response back. + response chan error +} + // Configuration for the coordinator. type CoordinatorConfiguration struct { // Each Coordinator will handle only a single partition. @@ -36,7 +48,7 @@ type Coordinator interface { Watch(received chan<- Event) error // Issues an Event. - Write(ctx context.Context, event Event) error + Write(ctx context.Context, event Event) <-chan error } // Create a new Coordinator using the given configuration. @@ -57,7 +69,9 @@ func NewCoordinator(configuration CoordinatorConfiguration) (Coordinator, error) kv: kv, ctx: ctx, cancel: cancel, + writeChan: make(chan request), } + configuration.Handler.Spawn(coord.writer) return coord, nil } @@ -77,6 +91,25 @@ type EtcdCoordinator struct { // The key-value entry point for issuing requests. kv clientv3.KV + + // Channel to receive write requests. + writeChan chan request +} + +// Listen and apply write requests. +// This will keep running while the application context is available. +// Receiving commands through the channel will ensure that they are +// applied synchronously to the etcd. +func (e *EtcdCoordinator) writer() { + for { + select { + case <-e.ctx.Done(): + return + case req := <-e.writeChan: + _, err := e.kv.Put(req.ctx, req.event.Key, string(req.event.Value)) + req.response <- err + } + } } // Starts a new coroutine for watching the Coordinator partition. @@ -87,12 +120,11 @@ type EtcdCoordinator struct { func (e *EtcdCoordinator) Watch(received chan<- Event) error { watchChan := e.cli.Watch(e.ctx, e.configuration.Partition) watchChanges := func() { - defer e.cancel() - for { + for response := range watchChan { select { case <-e.ctx.Done(): return - case response := <-watchChan: + default: e.handleResponse(response, received) } } @@ -102,9 +134,14 @@ func (e *EtcdCoordinator) Watch(received chan<- Event) error { } // Write the given event using the KV interface. -func (e *EtcdCoordinator) Write(ctx context.Context, event Event) error { - _, err := e.kv.Put(ctx, event.Key, string(event.Value)) - return err +func (e *EtcdCoordinator) Write(ctx context.Context, event Event) <-chan error { + res := make(chan error) + e.writeChan <- request{ + ctx: ctx, + event: event, + response: res, + } + return res } // Stop the etcd client connection. diff --git a/internal/core.go b/internal/core.go index 59b1bb2..1ea1723 100644 --- a/internal/core.go +++ b/internal/core.go @@ -3,7 +3,9 @@ package internal import ( "context" "errors" + "fmt" "io" + "time" ) var ( @@ -20,6 +22,9 @@ type CoreConfiguration struct { // Server address for the Coordinator. Server string + + // Default timeout to be applied when handling channels. + DefaultTimeout time.Duration } // Holds all flags used to manage the Core state. @@ -68,6 +73,9 @@ type ReltCore struct { // Flags for handling state. flags CoreFlags + + // Core configuration parameters. + configuration CoreConfiguration } // Create a new ReltCore using the given configuration. @@ -92,15 +100,31 @@ func NewCore(configuration CoreConfiguration) (Core, error) { finish: cancel, handler: handler, coord: coord, - output: make(chan Message), + output: make(chan Message, 100), flags: CoreFlags{ shutdown: Flag{}, watching: Flag{}, }, + configuration: configuration, } return core, nil } +// After receiving an event from the Coordinator, the element +// should be parsed and sent back through the output channel. +func (r *ReltCore) publishMessageToListener(message Message) { + select { + case <-r.ctx.Done(): + return + case r.output <- message: + return + case <-time.After(r.configuration.DefaultTimeout): + // TODO: create something to handle timed out responses. + fmt.Printf("not published %#v\n", message) + return + } +} + // The Listen method can be called only once. // This will start a new goroutine that receives updates // from the Coordinator and parse the information to a Message object. @@ -112,7 +136,7 @@ func (r *ReltCore) Listen() (<-chan Message, error) { } if r.flags.watching.Inactivate() { - events := make(chan Event) + events := make(chan Event, 100) if err := r.coord.Watch(events); err != nil { return nil, err } @@ -123,7 +147,7 @@ func (r *ReltCore) Listen() (<-chan Message, error) { case <-r.ctx.Done(): return case event := <-events: - r.output <- event.toMessage() + r.publishMessageToListener(event.toMessage()) } } } @@ -140,13 +164,13 @@ func (r *ReltCore) Listen() (<-chan Message, error) { func (r *ReltCore) Send(ctx context.Context, dest string, data []byte) <-chan error { response := make(chan error, 1) writeRequest := func() { - defer close(response) if r.flags.shutdown.IsActive() { event := Event{ Key: dest, Value: data, } - response <- r.coord.Write(ctx, event) + err := <-r.coord.Write(ctx, event) + response <- err } else { response <- coreWasShutdown } @@ -162,6 +186,7 @@ func (r *ReltCore) Send(ctx context.Context, dest string, data []byte) <-chan er // *This method will block until everything is finished.* func (r *ReltCore) Close() error { if r.flags.shutdown.Inactivate() { + defer close(r.output) if err := r.coord.Close(); err != nil { return err } diff --git a/pkg/relt/configuration.go b/pkg/relt/configuration.go index 24f788b..964a9a3 100644 --- a/pkg/relt/configuration.go +++ b/pkg/relt/configuration.go @@ -2,6 +2,7 @@ package relt import ( "errors" + "time" ) var ( @@ -11,7 +12,7 @@ var ( // Configuration used for creating a new instance // of the Relt. -type ReltConfiguration struct { +type Configuration struct { // The Relt name. Is not required, if empty a // random string will be generated to be used. // This must be unique, since it will be used to declare @@ -31,6 +32,9 @@ type ReltConfiguration struct { // messages. If all peers are using the same exchange then // is the same as all peers are a single partition. Exchange GroupAddress + + // Default timeout to be applied when handling asynchronous methods. + DefaultTimeout time.Duration } // Creates the default configuration for the Relt. @@ -38,16 +42,17 @@ type ReltConfiguration struct { // the connection Url will connect to a local broker using // the user `guest` and password `guest`. // The default exchange will fallback to `relt`. -func DefaultReltConfiguration() *ReltConfiguration { - return &ReltConfiguration{ - Name: GenerateUID(), - Url: "localhost:2379", - Exchange: DefaultExchangeName, +func DefaultReltConfiguration() *Configuration { + return &Configuration{ + Name: GenerateUID(), + Url: "localhost:2379", + Exchange: DefaultExchangeName, + DefaultTimeout: time.Second, } } // Verify if the given Relt configuration is valid. -func (c *ReltConfiguration) ValidateConfiguration() error { +func (c *Configuration) ValidateConfiguration() error { if len(c.Name) == 0 { c.Name = GenerateUID() } @@ -64,5 +69,9 @@ func (c *ReltConfiguration) ValidateConfiguration() error { return ErrInvalidConfiguration } + if c.DefaultTimeout.Microseconds() <= 0 { + return ErrInvalidConfiguration + } + return nil } diff --git a/pkg/relt/relt.go b/pkg/relt/relt.go index ac428be..f0fa03b 100644 --- a/pkg/relt/relt.go +++ b/pkg/relt/relt.go @@ -4,7 +4,6 @@ import ( "context" "errors" "github.com/jabolina/relt/internal" - "time" ) var ( @@ -19,19 +18,19 @@ var ( type Relt struct { // Holds the configuration about the core // and the Relt transport. - configuration ReltConfiguration + configuration Configuration // Holds the Core structure. core internal.Core } // Implements the Transport interface. -func (r Relt) Consume() (<-chan internal.Message, error) { +func (r *Relt) Consume() (<-chan internal.Message, error) { return r.core.Listen() } // Implements the Transport interface. -func (r Relt) Broadcast(ctx context.Context, message Send) error { +func (r *Relt) Broadcast(ctx context.Context, message Send) error { if len(message.Address) == 0 { return ErrInvalidGroupAddress } @@ -40,10 +39,12 @@ func (r Relt) Broadcast(ctx context.Context, message Send) error { return ErrInvalidMessage } - timeout, cancel := context.WithTimeout(ctx, time.Second) + timeout, cancel := context.WithTimeout(ctx, r.configuration.DefaultTimeout) defer cancel() select { + case <-ctx.Done(): + return ErrContextClosed case <-timeout.Done(): return ErrPublishTimeout case err := <-r.core.Send(timeout, string(message.Address), message.Data): @@ -58,10 +59,11 @@ func (r *Relt) Close() error { // Creates a new instance of the reliable transport, // and start all needed routines. -func NewRelt(configuration ReltConfiguration) (*Relt, error) { +func NewRelt(configuration Configuration) (*Relt, error) { conf := internal.CoreConfiguration{ - Partition: string(configuration.Exchange), - Server: configuration.Url, + Partition: string(configuration.Exchange), + Server: configuration.Url, + DefaultTimeout: configuration.DefaultTimeout, } core, err := internal.NewCore(conf) if err != nil { diff --git a/test/helper.go b/test/helper.go new file mode 100644 index 0000000..473c63c --- /dev/null +++ b/test/helper.go @@ -0,0 +1,55 @@ +package test + +import "sync" + +type MessageHist struct { + mutex *sync.Mutex + history []string + data map[string]bool +} + +func (m *MessageHist) insert(message string) { + m.mutex.Lock() + defer m.mutex.Unlock() + m.history = append(m.history, message) + m.data[message] = true +} + +func (m *MessageHist) contains(message string) bool { + m.mutex.Lock() + defer m.mutex.Unlock() + _, ok := m.data[message] + return ok +} + +func (m *MessageHist) values() []string { + m.mutex.Lock() + defer m.mutex.Unlock() + var copied []string + copied = append(copied, m.history...) + return copied +} + +func (m *MessageHist) compare(other MessageHist) int { + m.mutex.Lock() + defer m.mutex.Unlock() + different := 0 + for i, s := range other.values() { + if len(m.history)-1 < i { + different += 1 + continue + } + + if m.history[i] != s { + different += 1 + } + } + return different +} + +func NewHistory() *MessageHist { + return &MessageHist{ + mutex: &sync.Mutex{}, + data: make(map[string]bool), + } +} diff --git a/test/relt_test.go b/test/relt_test.go index c11ab8a..65944b5 100644 --- a/test/relt_test.go +++ b/test/relt_test.go @@ -5,6 +5,8 @@ import ( "context" "fmt" "github.com/jabolina/relt/pkg/relt" + "go.uber.org/goleak" + "runtime" "sync" "testing" "time" @@ -15,6 +17,19 @@ type hist struct { data map[string]bool } +func (h *hist) validate(interval int) bool { + h.mutex.Lock() + defer h.mutex.Unlock() + for i := 0; i < interval; i++ { + key := fmt.Sprintf("%d", i) + _, ok := h.data[key] + if !ok { + return false + } + } + return true +} + func (h *hist) exists(key string) bool { h.mutex.Lock() defer h.mutex.Unlock() @@ -28,17 +43,26 @@ func (h *hist) insert(key string) { h.data[key] = true } +func PrintStackTrace(t *testing.T) { + t.Log("get stacktrace\n") + buf := make([]byte, 1<<16) + runtime.Stack(buf, true) + t.Errorf("%s", buf) +} + func TestRelt_StartAndStop(t *testing.T) { + defer goleak.VerifyNone(t) conf := relt.DefaultReltConfiguration() - relt, err := relt.NewRelt(*conf) + r, err := relt.NewRelt(*conf) if err != nil { t.Fatalf("failed connecting. %v", err) return } - relt.Close() + r.Close() } func TestRelt_PublishAndReceiveMessage(t *testing.T) { + defer goleak.VerifyNone(t) conf := relt.DefaultReltConfiguration() conf.Name = "random-test-name" r, err := relt.NewRelt(*conf) @@ -80,6 +104,7 @@ func TestRelt_PublishAndReceiveMessage(t *testing.T) { } func Test_MultipleConnections(t *testing.T) { + defer goleak.VerifyNone(t) var connections []*relt.Relt testSize := 150 for i := 0; i < testSize; i++ { @@ -112,6 +137,7 @@ func Test_MultipleConnections(t *testing.T) { } func Test_LoadPublishAndReceiveMessage(t *testing.T) { + defer goleak.VerifyNone(t) conf := relt.DefaultReltConfiguration() conf.Name = "random-test-name" r, err := relt.NewRelt(*conf) @@ -121,7 +147,7 @@ func Test_LoadPublishAndReceiveMessage(t *testing.T) { } defer r.Close() group := &sync.WaitGroup{} - testSize := 5000 + testSize := 1000 listener, err := r.Consume() if err != nil { @@ -172,6 +198,7 @@ func Test_LoadPublishAndReceiveMessage(t *testing.T) { select { case <-done: + received.validate(testSize) return case <-time.After(20 * time.Second): t.Errorf("too long to routines to finish") @@ -180,6 +207,7 @@ func Test_LoadPublishAndReceiveMessage(t *testing.T) { } func Test_LoadPublishAndReceiveMultipleConnections(t *testing.T) { + defer goleak.VerifyNone(t) conf1 := relt.DefaultReltConfiguration() conf1.Name = "random-test-name-st" first, err := relt.NewRelt(*conf1) @@ -200,21 +228,26 @@ func Test_LoadPublishAndReceiveMultipleConnections(t *testing.T) { defer second.Close() group := &sync.WaitGroup{} - testSize := 5000 + testSize := 150 listener, err := second.Consume() if err != nil { t.Fatalf("failed listening. %#v", err) } - group.Add(testSize) + done, cancel := context.WithCancel(context.TODO()) received := hist{ mutex: &sync.Mutex{}, data: make(map[string]bool), } - for i := 0; i < testSize; i++ { - read := func() { - defer group.Done() + // See that when handling multiple request concurrently, + // the relt application is bounded by how much a client can + // consume. We can only publish through a channel while the client + // can consume the channel. + // In this example we have only a single lonely consumer handling + // all messages. This is why the testSize is not an excessive value. + go func() { + for { select { case recv := <-listener: if recv.Data == nil || len(recv.Data) == 0 { @@ -224,42 +257,36 @@ func Test_LoadPublishAndReceiveMultipleConnections(t *testing.T) { if recv.Error != nil { t.Errorf("error on consumed response. %v", recv.Error) } + key := string(recv.Data) if received.exists(key) { t.Errorf("data %s already received", key) } received.insert(key) - return - case <-time.After(500 * time.Millisecond): - t.Errorf("timed out receiving") + case <-done.Done(): return } } + }() - data := []byte(fmt.Sprintf("%d", i)) - go read() - - err = first.Broadcast(context.TODO(), relt.Send{ - Address: conf2.Exchange, - Data: data, - }) + group.Add(testSize) + for i := 0; i < testSize; i++ { + write := func(data []byte) { + defer group.Done() + err := first.Broadcast(done, relt.Send{ + Address: conf2.Exchange, + Data: data, + }) - if err != nil { - t.Errorf("failed broadcasting. %v", err) + if err != nil { + t.Errorf("failed broadcasting. %v", err) + } } + go write([]byte(fmt.Sprintf("%d", i))) } - done := make(chan bool) - go func() { - group.Wait() - done <- true - }() - - select { - case <-done: - return - case <-time.After(20 * time.Second): - t.Errorf("too long to routines to finish") - return - } + group.Wait() + time.Sleep(time.Second) + cancel() + received.validate(testSize) } diff --git a/test/synchronization_test.go b/test/synchronization_test.go new file mode 100644 index 0000000..d3399b0 --- /dev/null +++ b/test/synchronization_test.go @@ -0,0 +1,99 @@ +package test + +import ( + "context" + "fmt" + "github.com/google/uuid" + "github.com/jabolina/relt/pkg/relt" + "go.uber.org/goleak" + "sync" + "testing" + "time" +) + +// This test should validate that both replicas will be synchronized after +// a sequence of concurrent writes. At the end, both replicas should have +// the same message history, applied in the same order. +func Test_ReplicasShouldReceiveSameOrder(t *testing.T) { + defer goleak.VerifyNone(t) + partition := "synchronized-replicas-" + uuid.New().String() + testSize := 100 + + conf1 := relt.DefaultReltConfiguration() + conf1.Name = partition + "-first" + conf1.Exchange = relt.GroupAddress(partition) + first, err := relt.NewRelt(*conf1) + if err != nil { + t.Fatalf("failed connecting. %v", err) + return + } + defer first.Close() + + conf2 := relt.DefaultReltConfiguration() + conf2.Name = partition + "-second" + conf2.Exchange = relt.GroupAddress(partition) + second, err := relt.NewRelt(*conf2) + if err != nil { + t.Fatalf("failed connecting. %v", err) + return + } + defer second.Close() + + group := &sync.WaitGroup{} + ctx, cancel := context.WithCancel(context.TODO()) + firstHistory := NewHistory() + secondHistory := NewHistory() + + initialize := func(r *relt.Relt, history *MessageHist) { + listener, err := r.Consume() + if err != nil { + t.Fatalf("failed starting consumer. %#v", err) + } + + go func() { + for { + select { + case recv := <-listener: + if recv.Data == nil || len(recv.Data) == 0 { + t.Errorf("received wrong data") + } + + if recv.Error != nil { + t.Errorf("error on consumed response. %v", recv.Error) + } + + history.insert(string(recv.Data)) + case <-ctx.Done(): + return + } + } + }() + } + + initialize(first, firstHistory) + initialize(second, secondHistory) + + group.Add(testSize) + for i := 0; i < testSize; i++ { + write := func(data []byte) { + defer group.Done() + err := first.Broadcast(ctx, relt.Send{ + Address: conf2.Exchange, + Data: data, + }) + + if err != nil { + t.Errorf("failed broadcasting. %v", err) + } + } + go write([]byte(fmt.Sprintf("%d", i))) + } + + group.Wait() + time.Sleep(time.Second) + cancel() + diff := firstHistory.compare(*secondHistory) + if diff > 0 { + t.Fatalf("message history do not match. %d messages do not match", diff) + } +}