Skip to content

Commit

Permalink
feat: add temporality
Browse files Browse the repository at this point in the history
refactor so many things for this feature...
  • Loading branch information
gfyrag committed Jul 13, 2023
1 parent ec15ffc commit 23a673a
Show file tree
Hide file tree
Showing 134 changed files with 4,474 additions and 4,512 deletions.
3 changes: 3 additions & 0 deletions components/fctl/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
atomicgo.dev/keyboard v0.2.9 // indirect
github.com/cenkalti/backoff/v4 v4.2.0 // indirect
github.com/containerd/console v1.0.4-0.20230313162750-1ae8d489ac81 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatih/color v1.14.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.3.0 // indirect
Expand All @@ -47,11 +48,13 @@ require (
github.com/mattn/go-tty v0.0.4 // indirect
github.com/muhlemmer/gu v0.3.1 // indirect
github.com/pkg/term v1.2.0-beta.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/segmentio/backo-go v1.0.1 // indirect
github.com/sergi/go-diff v1.3.1 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/talal/go-bits v0.0.0-20200204154716-071e9f3e66e1 // indirect
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
golang.org/x/crypto v0.10.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions components/fctl/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/talal/go-bits v0.0.0-20200204154716-071e9f3e66e1 h1:sDCJkxkdgPIDUKcOemdIsP2AnUjtLTVSlUDkAnf3fB4=
github.com/talal/go-bits v0.0.0-20200204154716-071e9f3e66e1/go.mod h1:IaWL8TVo0gKkfVx+4RbcRkzp6FoeMqEtD88+5aCRwyY=
github.com/urfave/cli v1.22.5/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
Expand Down
4 changes: 3 additions & 1 deletion components/ledger/Taskfile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,19 @@ tasks:
cmds:
- mkdir -p {{.BENCH_RESULTS_DIR}}
- >
go test -run BenchmarkParallelWrites -bench=.
go test -run BenchmarkParallelWrites -bench=. {{if eq .VERBOSE "true"}}-v{{end}}
-test.benchmem
-timeout 1h
-memprofile {{.BENCH_RESULTS_DIR}}/{{.BRANCH}}-memprofile-{{if eq .ASYNC "true"}}async{{else}}sync{{end}}.out
-cpuprofile {{.BENCH_RESULTS_DIR}}/{{.BRANCH}}-profile-{{if eq .ASYNC "true"}}async{{else}}sync{{end}}.out
-benchtime={{if .DURATION}}{{.DURATION}}{{else}}15s{{end}}
{{if eq .RACE "true"}}-race{{end}}
-count={{if .COUNT}}{{.COUNT}}{{else}}10{{end}} ./benchmarks | tee {{.BENCH_RESULTS_DIR}}/{{.BRANCH}}-{{if eq .ASYNC "true"}}async{{else}}sync{{end}}.stats
env:
ASYNC: "{{.ASYNC}}"
GOMEMLIMIT: 1GiB
GOMAXPROCS: 2
VERBOSE: false
# GOGC: "1000" # https://dave.cheney.net/tag/gogc
CGO_ENABLED: 0
# GODEBUG: gctrace=1 #,gcpacertrace=1
Expand Down
110 changes: 85 additions & 25 deletions components/ledger/benchmarks/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package benchmarks

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"os"
"runtime"
"sync"
"testing"
"time"

Expand All @@ -18,72 +18,132 @@ import (
"github.com/formancehq/ledger/pkg/core"
"github.com/formancehq/ledger/pkg/ledger"
"github.com/formancehq/ledger/pkg/opentelemetry/metrics"
"github.com/formancehq/ledger/pkg/storage/sqlstoragetesting"
"github.com/formancehq/ledger/pkg/storage/storagetesting"
"github.com/formancehq/stack/libs/go-libs/api"
"github.com/formancehq/stack/libs/go-libs/logging"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

func BenchmarkParallelWrites(b *testing.B) {

driver := sqlstoragetesting.StorageDriver(b)
ctx := logging.TestingContext()

driver := storagetesting.StorageDriver(b)
resolver := ledger.NewResolver(driver)
b.Cleanup(func() {
require.NoError(b, resolver.CloseLedgers(context.Background()))
require.NoError(b, resolver.CloseLedgers(ctx))
})

ledgerName := uuid.NewString()

backend := controllers.NewDefaultBackend(driver, "latest", resolver)
router := routes.NewRouter(backend, nil, nil, metrics.NewNoOpMetricsRegistry())
srv := httptest.NewServer(router)
defer srv.Close()
router := routes.NewRouter(backend, nil, nil, metrics.NewNoOpRegistry())
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := logging.ContextWithLogger(r.Context(), logging.FromContext(ctx))
router.ServeHTTP(w, r.WithContext(ctx))
})

totalDuration := atomic.Int64{}
b.SetParallelism(1000)
runtime.GC()
b.ResetTimer()
startOfBench := time.Now()
counter := atomic.NewInt64(0)
longestTxLock := sync.Mutex{}
longestTransactionID := uint64(0)
longestTransactionDuration := time.Duration(0)
b.RunParallel(func(pb *testing.PB) {
buf := bytes.NewBufferString("")
for pb.Next() {
buf.Reset()
id := counter.Add(1)

err := json.NewEncoder(buf).Encode(controllers.PostTransactionRequest{
Script: controllers.Script{
Script: core.Script{
Plain: `
vars {
account $account
}
send [USD/2 100] (
source = @world
destination = $account
)`,
},
Vars: map[string]any{
"account": fmt.Sprintf("accounts:%d", counter.Add(1)),
},
//script := controllers.Script{
// Script: core.Script{
// Plain: fmt.Sprintf(`
// vars {
// account $account
// }
//
// send [USD/2 100] (
// source = @world:%d allowing unbounded overdraft
// destination = $account
// )`, counter.Load()%100),
// },
// Vars: map[string]any{
// "account": fmt.Sprintf("accounts:%d", counter.Add(1)),
// },
//}

script := controllers.Script{
Script: core.Script{
Plain: `vars {
account $account
}
send [USD/2 100] (
source = @world
destination = $account
)`,
},
Vars: map[string]any{
"account": fmt.Sprintf("accounts:%d", id),
},
}

// script := controllers.Script{
// Script: core.Script{
// Plain: `vars {
// account $account
// account $src
//}
//
//send [USD/2 100] (
// source = $src allowing unbounded overdraft
// destination = $account
//)`,
// },
// Vars: map[string]any{
// "src": fmt.Sprintf("world:%d", id),
// "account": fmt.Sprintf("accounts:%d", id),
// },
// }

err := json.NewEncoder(buf).Encode(controllers.PostTransactionRequest{
Script: script,
})
require.NoError(b, err)

//ctx, _ := context.WithDeadline(ctx, time.Now().Add(10*time.Second))

req := httptest.NewRequest("POST", "/"+ledgerName+"/transactions", buf)
req = req.WithContext(ctx)
req.URL.RawQuery = url.Values{
"async": []string{os.Getenv("ASYNC")},
}.Encode()
rsp := httptest.NewRecorder()

now := time.Now()
router.ServeHTTP(rsp, req)
totalDuration.Add(time.Since(now).Milliseconds())
handler.ServeHTTP(rsp, req)
latency := time.Since(now).Milliseconds()
totalDuration.Add(latency)

require.Equal(b, http.StatusOK, rsp.Code)
tx, _ := api.DecodeSingleResponse[core.Transaction](b, rsp.Body)

longestTxLock.Lock()
if time.Millisecond*time.Duration(latency) > longestTransactionDuration {
longestTransactionID = tx.ID
longestTransactionDuration = time.Duration(latency) * time.Millisecond
}
longestTxLock.Unlock()
}
})

b.StopTimer()
b.Logf("Longest transaction: %d (%s)", longestTransactionID, longestTransactionDuration.String())
b.ReportMetric((float64(time.Duration(b.N))/float64(time.Since(startOfBench)))*float64(time.Second), "t/s")
b.ReportMetric(float64(totalDuration.Load()/int64(b.N)), "ms/transaction")
runtime.GC()
Expand Down
1 change: 1 addition & 0 deletions components/ledger/benchmarks/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
)

func TestMain(m *testing.M) {

if err := pgtesting.CreatePostgresServer(pgtesting.WithDockerHostConfigOption(func(hostConfig *docker.HostConfig) {
hostConfig.CPUCount = 2
})); err != nil {
Expand Down
9 changes: 3 additions & 6 deletions components/ledger/cmd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/formancehq/ledger/pkg/api"
"github.com/formancehq/ledger/pkg/bus"
"github.com/formancehq/ledger/pkg/ledger"
"github.com/formancehq/ledger/pkg/storage"
"github.com/formancehq/ledger/pkg/storage/driver"
"github.com/formancehq/stack/libs/go-libs/otlp/otlpmetrics"
"github.com/formancehq/stack/libs/go-libs/otlp/otlptraces"
"github.com/formancehq/stack/libs/go-libs/publish"
Expand All @@ -25,7 +25,7 @@ func resolveOptions(output io.Writer, userOptions ...fx.Option) []fx.Option {
v := viper.GetViper()
debug := v.GetBool(service.DebugFlag)
if debug {
storage.InstrumentalizeSQLDriver()
driver.InstrumentalizeSQLDriver()
}

options = append(options,
Expand All @@ -36,15 +36,12 @@ func resolveOptions(output io.Writer, userOptions ...fx.Option) []fx.Option {
api.Module(api.Config{
Version: Version,
}),
storage.CLIDriverModule(v, output, debug),
driver.CLIModule(v, output, debug),
internal.NewAnalyticsModule(v, Version),
ledger.Module(ledger.Configuration{
NumscriptCache: ledger.NumscriptCacheConfiguration{
MaxCount: v.GetInt(numscriptCacheMaxCount),
},
Query: ledger.QueryConfiguration{
LimitReadLogs: v.GetInt(queryLimitReadLogsFlag),
},
}),
)

Expand Down
4 changes: 2 additions & 2 deletions components/ledger/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"os"

"github.com/formancehq/ledger/cmd/internal"
"github.com/formancehq/ledger/pkg/storage"
"github.com/formancehq/ledger/pkg/storage/driver"
initschema "github.com/formancehq/ledger/pkg/storage/ledgerstore/migrates/0-init-schema"
"github.com/formancehq/stack/libs/go-libs/otlp/otlpmetrics"
"github.com/formancehq/stack/libs/go-libs/otlp/otlptraces"
Expand Down Expand Up @@ -61,7 +61,7 @@ func NewRootCommand() *cobra.Command {
otlptraces.InitOTLPTracesFlags(root.PersistentFlags())
internal.InitAnalyticsFlags(root, DefaultSegmentWriteKey)
publish.InitCLIFlags(root)
storage.InitCLIFlags(root)
driver.InitCLIFlags(root)
initschema.InitMigrationConfigCLIFlags(root.PersistentFlags())

if err := viper.BindPFlags(root.PersistentFlags()); err != nil {
Expand Down
2 changes: 0 additions & 2 deletions components/ledger/cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
)

const (
queryLimitReadLogsFlag = "query-limit-read-logs"
ballastSizeInBytesFlag = "ballast-size"
numscriptCacheMaxCount = "numscript-cache-max-count"
)
Expand All @@ -38,7 +37,6 @@ func NewServe() *cobra.Command {
)...).Run(cmd.Context())
},
}
cmd.Flags().Int(queryLimitReadLogsFlag, 10000, "Query limit read logs")
cmd.Flags().Uint(ballastSizeInBytesFlag, 0, "Ballast size in bytes, default to 0")
cmd.Flags().Int(numscriptCacheMaxCount, 1024, "Numscript cache max count")
return cmd
Expand Down
10 changes: 5 additions & 5 deletions components/ledger/cmd/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"errors"

"github.com/formancehq/ledger/pkg/storage"
"github.com/formancehq/ledger/pkg/storage/driver"
"github.com/formancehq/stack/libs/go-libs/logging"
"github.com/formancehq/stack/libs/go-libs/service"
"github.com/spf13/cobra"
Expand All @@ -26,7 +26,7 @@ func NewStorageInit() *cobra.Command {
cmd.OutOrStdout(),
resolveOptions(
cmd.OutOrStdout(),
fx.Invoke(func(storageDriver *storage.Driver, lc fx.Lifecycle) {
fx.Invoke(func(storageDriver *driver.Driver, lc fx.Lifecycle) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
name := viper.GetString("name")
Expand Down Expand Up @@ -71,7 +71,7 @@ func NewStorageList() *cobra.Command {
app := service.New(cmd.OutOrStdout(),
resolveOptions(
cmd.OutOrStdout(),
fx.Invoke(func(storageDriver *storage.Driver, lc fx.Lifecycle) {
fx.Invoke(func(storageDriver *driver.Driver, lc fx.Lifecycle) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
ledgers, err := storageDriver.GetSystemStore().ListLedgers(ctx)
Expand Down Expand Up @@ -103,7 +103,7 @@ func NewStorageUpgrade() *cobra.Command {
app := service.New(cmd.OutOrStdout(),
resolveOptions(
cmd.OutOrStdout(),
fx.Invoke(func(storageDriver *storage.Driver, lc fx.Lifecycle) {
fx.Invoke(func(storageDriver *driver.Driver, lc fx.Lifecycle) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
name := args[0]
Expand Down Expand Up @@ -140,7 +140,7 @@ func NewStorageDelete() *cobra.Command {
cmd.OutOrStdout(),
resolveOptions(
cmd.OutOrStdout(),
fx.Invoke(func(storageDriver *storage.Driver, lc fx.Lifecycle) {
fx.Invoke(func(storageDriver *driver.Driver, lc fx.Lifecycle) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
name := args[0]
Expand Down
5 changes: 4 additions & 1 deletion components/ledger/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ go 1.19
require (
github.com/Masterminds/semver/v3 v3.2.0
github.com/ThreeDotsLabs/watermill v1.2.0
github.com/alitto/pond v1.8.3
github.com/antlr/antlr4/runtime/Go/antlr v1.4.10
github.com/bluele/gcache v0.0.2
github.com/formancehq/stack/libs/go-libs v0.0.0-20230517212829-71aaaacfd130
github.com/go-chi/chi/v5 v5.0.8
github.com/go-chi/cors v1.2.1
github.com/golang/mock v1.4.4
github.com/google/go-cmp v0.5.9
github.com/google/uuid v1.3.0
github.com/jackc/pgx/v5 v5.3.0
github.com/lib/pq v1.10.7
Expand All @@ -30,13 +32,13 @@ require (
github.com/uptrace/bun/dialect/pgdialect v1.1.12
github.com/uptrace/bun/extra/bunbig v1.1.12
github.com/uptrace/bun/extra/bundebug v1.1.12
github.com/uptrace/bun/extra/bunotel v1.1.12
go.nhat.io/otelsql v0.9.0
go.opentelemetry.io/otel v1.14.0
go.opentelemetry.io/otel/metric v0.37.0
go.opentelemetry.io/otel/trace v1.14.0
go.uber.org/atomic v1.10.0
go.uber.org/fx v1.19.2
golang.org/x/sync v0.1.0
gopkg.in/segmentio/analytics-go.v3 v3.1.0
)

Expand Down Expand Up @@ -116,6 +118,7 @@ require (
github.com/tklauser/numcpus v0.6.0 // indirect
github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect
github.com/uptrace/opentelemetry-go-extra/otellogrus v0.1.21 // indirect
github.com/uptrace/opentelemetry-go-extra/otelsql v0.1.21 // indirect
github.com/uptrace/opentelemetry-go-extra/otelutil v0.1.21 // indirect
github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
Expand Down
Loading

0 comments on commit 23a673a

Please sign in to comment.