Skip to content

Commit

Permalink
Merge pull request #63 from igmagollo/feature/add-outbox-storer
Browse files Browse the repository at this point in the history
feat(outbox): add outbox storer with observability instrumented
  • Loading branch information
TheRafaBonin authored Dec 12, 2023
2 parents d476dcb + d169151 commit 3b2aa5d
Show file tree
Hide file tree
Showing 36 changed files with 5,164 additions and 34 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
test:
test-example:
@go run github.com/onsi/ginkgo/v2/ginkgo -v ./example/testing

test:
@go run github.com/onsi/ginkgo/v2/ginkgo -v -p --race ./tests
36 changes: 25 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,38 @@ module github.com/gothunder/thunder
go 1.19

require (
entgo.io/ent v0.12.4
github.com/99designs/gqlgen v0.17.24
github.com/TheRafaBonin/roxy v0.6.1
github.com/cenkalti/backoff/v4 v4.2.0
github.com/go-chi/chi/v5 v5.0.8
github.com/google/uuid v1.3.0
github.com/mattn/go-sqlite3 v1.14.16
github.com/onsi/ginkgo/v2 v2.8.0
github.com/onsi/gomega v1.25.0
github.com/rabbitmq/amqp091-go v1.7.0
github.com/rotisserie/eris v0.5.4
github.com/rs/zerolog v1.29.0
github.com/stretchr/testify v1.8.1
github.com/stretchr/testify v1.8.4
github.com/vektah/gqlparser/v2 v2.5.1
github.com/vektra/mockery/v2 v2.20.0
github.com/vmihailenco/msgpack/v5 v5.3.5
go.opentelemetry.io/otel v1.21.0
go.opentelemetry.io/otel/trace v1.21.0
go.uber.org/fx v1.19.1
)

require (
ariga.io/atlas v0.14.1-0.20230918065911-83ad451a4935 // indirect
github.com/agext/levenshtein v1.2.1 // indirect
github.com/apparentlymart/go-textseg/v13 v13.0.0 // indirect
github.com/chigopher/pathlib v0.12.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/inflect v0.19.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/hashicorp/hcl/v2 v2.13.0 // indirect
github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7 // indirect
github.com/zclconf/go-cty v1.8.0 // indirect
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect
google.golang.org/protobuf v1.30.0 // indirect
)
Expand All @@ -30,14 +43,14 @@ require (
github.com/agnivade/levenshtein v1.1.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
Expand All @@ -49,24 +62,25 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/cobra v1.4.0 // indirect
github.com/spf13/cobra v1.7.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.12.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/subosito/gotenv v1.4.0 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/dig v1.16.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.23.0 // indirect
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/term v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/mod v0.10.0 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/term v0.7.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/tools v0.8.1-0.20230428195545-5283a0178901 // indirect
google.golang.org/grpc v1.55.0
gopkg.in/ini.v1 v1.66.6 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
80 changes: 58 additions & 22 deletions go.sum

Large diffs are not rendered by default.

171 changes: 171 additions & 0 deletions internal/events/outbox/ent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package outbox

import (
"context"
"errors"
"reflect"

"github.com/TheRafaBonin/roxy"
"github.com/gothunder/thunder/internal/utils"
)

const (
SetTopic = "SetTopic"
SetHeaders = "SetHeaders"
SetPayload = "SetPayload"
Exec = "Exec"

Create = "Create"
CreateBulk = "CreateBulk"
)

var (
ErrNilClient = errors.New("nil OutboxMessageClient")
)

type MessageClient interface {
Create() MessageCreator
CreateBulk(messageCreators ...MessageCreator) MessageBulkCreator
}

type MessageCreator interface {
SetTopic(topic string) MessageCreator
SetPayload(payload []byte) MessageCreator
SetHeaders(headers map[string]string) MessageCreator
Exec(ctx context.Context) error
Unwrap() interface{}
}

type MessageBulkCreator interface {
Exec(ctx context.Context) error
}

type outboxMessageCreateWrapper struct {
outboxMessageCreate interface{}
}

func (omcw *outboxMessageCreateWrapper) SetHeaders(headers map[string]string) MessageCreator {
_, err := utils.SafeCallMethod(omcw.outboxMessageCreate, SetHeaders, []reflect.Value{
reflect.ValueOf(headers),
})
if err != nil {
panic(err)
}
return omcw
}

func (omcw *outboxMessageCreateWrapper) SetPayload(payload []byte) MessageCreator {
_, err := utils.SafeCallMethod(omcw.outboxMessageCreate, SetPayload, []reflect.Value{
reflect.ValueOf(payload),
})
if err != nil {
panic(err)
}
return omcw
}

func (omcw *outboxMessageCreateWrapper) SetTopic(topic string) MessageCreator {
_, err := utils.SafeCallMethod(omcw.outboxMessageCreate, SetTopic, []reflect.Value{
reflect.ValueOf(topic),
})
if err != nil {
panic(err)
}
return omcw
}

func (omcw *outboxMessageCreateWrapper) Exec(ctx context.Context) error {
results, err := utils.SafeCallMethod(omcw.outboxMessageCreate, Exec, []reflect.Value{
reflect.ValueOf(ctx),
})
if err != nil {
return err
}

return results[0].Interface().(error)
}

func (omcw *outboxMessageCreateWrapper) Unwrap() interface{} {
return omcw.outboxMessageCreate
}

func WrapOutboxMessageCreate(omc interface{}) (MessageCreator, error) {
handleError := func(methodName string) (MessageCreator, error) {
return nil, roxy.Wrap(utils.ErrMethodNotFound, methodName)
}

methods := []string{SetTopic, SetHeaders, SetPayload, Exec}
for _, method := range methods {
if !utils.HasMethod(omc, method) {
return handleError(method)
}
}

return &outboxMessageCreateWrapper{outboxMessageCreate: omc}, nil
}

type MessageBuilderInterface[T any] interface {
SetHeaders(headers map[string]string) T
SetPayload(payload []byte) T
SetTopic(topic string) T
Exec(ctx context.Context) error
}

type outboxMessageClientWrapper struct {
OutboxMessageClient interface{}
}

func (c *outboxMessageClientWrapper) Create() MessageCreator {
results, err := utils.SafeCallMethod(c.OutboxMessageClient, Create, []reflect.Value{})
if err != nil {
panic(err)
}
creator, _ := WrapOutboxMessageCreate(results[0].Interface())
return creator
}

func (c *outboxMessageClientWrapper) CreateBulk(messageCreators ...MessageCreator) MessageBulkCreator {
creators := make([]reflect.Value, len(messageCreators))
for i, mc := range messageCreators {
creators[i] = reflect.ValueOf(mc.Unwrap())
}

results, err := utils.SafeCallMethod(c.OutboxMessageClient, CreateBulk, creators)
if err != nil {
panic(err)
}

return results[0].Interface().(MessageBulkCreator)
}

func WrapOutboxMessageClient(client interface{}) (MessageClient, error) {
if err := validateOutboxMessageClient(client); err != nil {
return nil, roxy.Wrap(err, "validating client")
}

return &outboxMessageClientWrapper{OutboxMessageClient: client}, nil
}

func validateOutboxMessageClient(client interface{}) error {
if client == nil {
return ErrNilClient
}

handleError := func(methodName string) error {
return roxy.Wrap(utils.ErrMethodNotFound, methodName)
}

methods := []string{Create, CreateBulk}
for _, method := range methods {
if !utils.HasMethod(client, method) {
return handleError(method)
}
}

return nil
}

type OutboxMessageClientInterface[T MessageBuilderInterface[T], U MessageBulkCreator] interface {
Create() T
CreateBulk(messageCreators ...T) U
}
33 changes: 33 additions & 0 deletions internal/events/outbox/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package outbox

import (
"errors"
)

var (
ErrEmptyTopic = errors.New("empty topic")
ErrEmptyPayload = errors.New("empty payload")
)

type Message struct {
Topic string
Payload []byte
Headers map[string]string
}

func (m Message) BuildEntMessage(creator MessageCreator) MessageCreator {
return creator.
SetTopic(m.Topic).
SetPayload(m.Payload).
SetHeaders(m.Headers)
}

func (m Message) Validate() error {
if m.Topic == "" {
return ErrEmptyTopic
}
if m.Payload == nil || len(m.Payload) == 0 {
return ErrEmptyPayload
}
return nil
}
5 changes: 5 additions & 0 deletions internal/events/outbox/relayer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package outbox

type Relayer interface {
Start()
}
89 changes: 89 additions & 0 deletions internal/events/outbox/storer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package outbox

import (
"context"
"errors"

"github.com/TheRafaBonin/roxy"
)

var (
ErrNoMessages = errors.New("no messages")
)

type Storer interface {
Store(ctx context.Context, txOutboxMessageClient interface{}, messages []Message) error
WithTxClient(txOutboxMessageClient interface{}) (TransactionalStorer, error)
}

type TransactionalStorer interface {
Store(ctx context.Context, messages []Message) error
}

type storer struct{}

// Store implements Storer.
func (s storer) Store(ctx context.Context, txOutboxMessageClient interface{}, messages []Message) error {
if err := validateMessages(messages); err != nil {
return roxy.Wrap(err, "validating messages")
}

txClient, err := WrapOutboxMessageClient(txOutboxMessageClient)
if err != nil {
return roxy.Wrap(err, "wrapping tx client")
}

entMessages := make([]MessageCreator, len(messages))
for i, msg := range messages {
entMessages[i] = msg.BuildEntMessage(txClient.Create())
}

err = txClient.CreateBulk(entMessages...).Exec(ctx)
if err != nil {
return roxy.Wrap(err, "creating messages")
}

return nil
}

// WithTxClient implements Storer.
func (s *storer) WithTxClient(txOutboxMessageClient interface{}) (TransactionalStorer, error) {
return newTransactionalStorer(s, txOutboxMessageClient)
}

func NewStorer() Storer {
return &storer{}
}

type transactionalStorer struct {
storer Storer
txClient interface{}
}

// Store implements TransactionalStorer.
func (t transactionalStorer) Store(ctx context.Context, messages []Message) error {
return t.storer.Store(ctx, t.txClient, messages)
}

func newTransactionalStorer(storer Storer, txOutboxMessageClient interface{}) (TransactionalStorer, error) {
if err := validateOutboxMessageClient(txOutboxMessageClient); err != nil {
return nil, roxy.Wrap(err, "validating tx client")
}

return &transactionalStorer{
storer: storer,
txClient: txOutboxMessageClient,
}, nil
}

func validateMessages(messages []Message) error {
if len(messages) == 0 {
return ErrNoMessages
}

errs := make([]error, len(messages))
for i, msg := range messages {
errs[i] = msg.Validate()
}
return errors.Join(errs...)
}
Loading

0 comments on commit 3b2aa5d

Please sign in to comment.