Skip to content

Commit

Permalink
feat: Add subscription on publish shared lib and update components le…
Browse files Browse the repository at this point in the history
…dger/payments/webhooks (#45)
  • Loading branch information
gfyrag committed Feb 23, 2023
1 parent 8d5dd36 commit 2f4b7c0
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 142 deletions.
60 changes: 1 addition & 59 deletions cmd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,15 @@ package cmd
import (
"crypto/tls"
"fmt"
"log"
"net/http"
"os"
"strings"

"github.com/Shopify/sarama"
"github.com/formancehq/stack/libs/go-libs/auth"
"github.com/formancehq/stack/libs/go-libs/logging"
"github.com/formancehq/stack/libs/go-libs/logging/logginglogrus"
"github.com/formancehq/stack/libs/go-libs/oauth2/oauth2introspect"
"github.com/formancehq/stack/libs/go-libs/otlp/otlptraces"
"github.com/formancehq/stack/libs/go-libs/publish"
"github.com/formancehq/stack/libs/go-libs/publish/publishhttp"
"github.com/formancehq/stack/libs/go-libs/publish/publishkafka"
"github.com/gin-contrib/cors"
"github.com/gin-gonic/gin"
"github.com/numary/ledger/cmd/internal"
Expand All @@ -31,7 +26,6 @@ import (
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/uptrace/opentelemetry-go-extra/otellogrus"
"github.com/xdg-go/scram"
"go.opentelemetry.io/otel/trace"
"go.uber.org/fx"
)
Expand Down Expand Up @@ -66,59 +60,7 @@ func NewContainer(v *viper.Viper, userOptions ...fx.Option) *fx.App {
sqlstorage.InstrumentalizeSQLDrivers()
}

topics := v.GetStringSlice(publisherTopicMappingFlag)
mapping := make(map[string]string)
for _, topic := range topics {
parts := strings.SplitN(topic, ":", 2)
if len(parts) != 2 {
panic("invalid topic flag")
}
mapping[parts[0]] = parts[1]
}

options = append(options, publish.Module(), bus.LedgerMonitorModule())
options = append(options, publish.TopicMapperPublisherModule(mapping))

switch {
case v.GetBool(publisherHttpEnabledFlag):
options = append(options, publishhttp.Module())
case v.GetBool(publisherKafkaEnabledFlag):
sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
options = append(options,
publishkafka.Module(ServiceName, v.GetStringSlice(publisherKafkaBrokerFlag)...),
publishkafka.ProvideSaramaOption(
publishkafka.WithConsumerReturnErrors(),
publishkafka.WithProducerReturnSuccess(),
),
)
if v.GetBool(publisherKafkaTLSEnabled) {
options = append(options, publishkafka.ProvideSaramaOption(publishkafka.WithTLS()))
}
if v.GetBool(publisherKafkaSASLEnabled) {
options = append(options, publishkafka.ProvideSaramaOption(
publishkafka.WithSASLEnabled(),
publishkafka.WithSASLCredentials(
v.GetString(publisherKafkaSASLUsername),
v.GetString(publisherKafkaSASLPassword),
),
publishkafka.WithSASLMechanism(sarama.SASLMechanism(v.GetString(publisherKafkaSASLMechanism))),
publishkafka.WithSASLScramClient(func() sarama.SCRAMClient {
var fn scram.HashGeneratorFcn
switch v.GetInt(publisherKafkaSASLScramSHASize) {
case 512:
fn = publishkafka.SHA512
case 256:
fn = publishkafka.SHA256
default:
panic("sha size not handled")
}
return &publishkafka.XDGSCRAMClient{
HashGeneratorFcn: fn,
}
}),
))
}
}
options = append(options, publish.CLIPublisherModule(v, ServiceName), bus.LedgerMonitorModule())

// Handle OpenTelemetry
options = append(options, otlptraces.CLITracesModule(v))
Expand Down
23 changes: 2 additions & 21 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"path"

"github.com/formancehq/stack/libs/go-libs/otlp/otlptraces"
"github.com/formancehq/stack/libs/go-libs/publish"
"github.com/numary/ledger/cmd/internal"
"github.com/numary/ledger/pkg/redis"
_ "github.com/numary/ledger/pkg/storage/sqlstorage/migrates/9-add-pre-post-volumes"
Expand All @@ -31,17 +32,6 @@ const (
lockStrategyRedisTLSEnabledFlag = "lock-strategy-redis-tls-enabled"
lockStrategyRedisTLSInsecureFlag = "lock-strategy-redis-tls-insecure"

publisherKafkaEnabledFlag = "publisher-kafka-enabled"
publisherKafkaBrokerFlag = "publisher-kafka-broker"
publisherKafkaSASLEnabled = "publisher-kafka-sasl-enabled"
publisherKafkaSASLUsername = "publisher-kafka-sasl-username"
publisherKafkaSASLPassword = "publisher-kafka-sasl-password"
publisherKafkaSASLMechanism = "publisher-kafka-sasl-mechanism"
publisherKafkaSASLScramSHASize = "publisher-kafka-sasl-scram-sha-size"
publisherKafkaTLSEnabled = "publisher-kafka-tls-enabled"
publisherTopicMappingFlag = "publisher-topic-mapping"
publisherHttpEnabledFlag = "publisher-http-enabled"

authBearerEnabledFlag = "auth-bearer-enabled"
authBearerIntrospectUrlFlag = "auth-bearer-introspect-url"
authBearerAudienceFlag = "auth-bearer-audience"
Expand Down Expand Up @@ -124,16 +114,6 @@ func NewRootCommand() *cobra.Command {
root.PersistentFlags().Duration(lockStrategyRedisRetryFlag, redis.DefaultRetryInterval, "Retry lock period")
root.PersistentFlags().Bool(lockStrategyRedisTLSEnabledFlag, false, "Use tls on redis")
root.PersistentFlags().Bool(lockStrategyRedisTLSInsecureFlag, false, "Whether redis is trusted or not")
root.PersistentFlags().Bool(publisherKafkaEnabledFlag, false, "Publish write events to kafka")
root.PersistentFlags().StringSlice(publisherKafkaBrokerFlag, []string{}, "Kafka address is kafka enabled")
root.PersistentFlags().StringSlice(publisherTopicMappingFlag, []string{}, "Define mapping between internal event types and topics")
root.PersistentFlags().Bool(publisherHttpEnabledFlag, false, "Sent write event to http endpoint")
root.PersistentFlags().Bool(publisherKafkaSASLEnabled, false, "Enable SASL authentication on kafka publisher")
root.PersistentFlags().String(publisherKafkaSASLUsername, "", "SASL username")
root.PersistentFlags().String(publisherKafkaSASLPassword, "", "SASL password")
root.PersistentFlags().String(publisherKafkaSASLMechanism, "", "SASL authentication mechanism")
root.PersistentFlags().Int(publisherKafkaSASLScramSHASize, 512, "SASL SCRAM SHA size")
root.PersistentFlags().Bool(publisherKafkaTLSEnabled, false, "Enable TLS to connect on kafka")
root.PersistentFlags().Bool(authBearerEnabledFlag, false, "Enable bearer auth")
root.PersistentFlags().String(authBearerIntrospectUrlFlag, "", "OAuth2 introspect URL")
root.PersistentFlags().StringSlice(authBearerAudienceFlag, []string{}, "Allowed audiences")
Expand All @@ -148,6 +128,7 @@ func NewRootCommand() *cobra.Command {
otlptraces.InitOTLPTracesFlags(root.PersistentFlags())
internal.InitHTTPBasicFlags(root)
internal.InitAnalyticsFlags(root, DefaultSegmentWriteKey)
publish.InitCLIFlags(root)

if err = viper.BindPFlags(root.PersistentFlags()); err != nil {
panic(err)
Expand Down
39 changes: 21 additions & 18 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ go 1.18
require (
github.com/DmitriyVTitov/size v1.5.0
github.com/Masterminds/semver/v3 v3.2.0
github.com/Shopify/sarama v1.37.2
github.com/ThreeDotsLabs/watermill v1.1.1
github.com/ThreeDotsLabs/watermill v1.2.0
github.com/buger/jsonparser v1.1.1
github.com/dgraph-io/ristretto v0.1.1
github.com/formancehq/machine v1.4.5
Expand All @@ -31,16 +30,15 @@ require (
github.com/psanford/memfs v0.0.0-20210214183328-a001468d78ef
github.com/sirupsen/logrus v1.9.0
github.com/spf13/cobra v1.6.1
github.com/spf13/viper v1.14.0
github.com/spf13/viper v1.15.0
github.com/stretchr/testify v1.8.1
github.com/uptrace/opentelemetry-go-extra/otellogrus v0.1.17
github.com/xdg-go/scram v1.1.2
go.nhat.io/otelsql v0.7.0
go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.36.4
go.opentelemetry.io/otel v1.11.2
go.opentelemetry.io/otel v1.12.0
go.opentelemetry.io/otel/sdk v1.11.2
go.opentelemetry.io/otel/trace v1.11.2
go.uber.org/fx v1.18.2
go.opentelemetry.io/otel/trace v1.12.0
go.uber.org/fx v1.19.1
gopkg.in/segmentio/analytics-go.v3 v3.1.0
gopkg.in/yaml.v3 v3.0.1
)
Expand All @@ -49,8 +47,10 @@ require (
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/Microsoft/go-winio v0.5.2 // indirect
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
github.com/Shopify/sarama v1.38.1 // indirect
github.com/ThreeDotsLabs/watermill-http v1.1.4 // indirect
github.com/ThreeDotsLabs/watermill-kafka/v2 v2.2.2 // indirect
github.com/ThreeDotsLabs/watermill-nats/v2 v2.0.0 // indirect
github.com/ajg/form v1.5.1 // indirect
github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 // indirect
github.com/cenkalti/backoff/v4 v4.2.0 // indirect
Expand All @@ -64,7 +64,7 @@ require (
github.com/docker/go-units v0.4.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/eapache/go-resiliency v1.3.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
Expand Down Expand Up @@ -102,7 +102,7 @@ require (
github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.15.13 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
github.com/logrusorgru/aurora v2.0.3+incompatible // indirect
Expand All @@ -111,11 +111,13 @@ require (
github.com/moby/term v0.0.0-20220808134915-39b0c02b01ae // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nats-io/nats.go v1.23.0 // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/opencontainers/runc v1.1.3 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.6 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand All @@ -125,16 +127,17 @@ require (
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.4.1 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
github.com/ugorji/go/codec v1.2.7 // indirect
github.com/uptrace/opentelemetry-go-extra/otelutil v0.1.17 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c // indirect
go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.37.0 // indirect
go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.38.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.37.0 // indirect
go.opentelemetry.io/contrib/propagators/b3 v1.12.0 // indirect
go.opentelemetry.io/otel/exporters/jaeger v1.11.2 // indirect
Expand All @@ -146,15 +149,15 @@ require (
go.opentelemetry.io/otel/metric v0.34.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/dig v1.15.0 // indirect
go.uber.org/dig v1.16.1 // indirect
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.4.0 // indirect
golang.org/x/net v0.4.0 // indirect
golang.org/x/sys v0.4.0 // indirect
golang.org/x/text v0.5.0 // indirect
golang.org/x/crypto v0.5.0 // indirect
golang.org/x/net v0.5.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.6.0 // indirect
google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef // indirect
google.golang.org/grpc v1.51.0 // indirect
google.golang.org/grpc v1.52.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
Loading

0 comments on commit 2f4b7c0

Please sign in to comment.