Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add temporality #428

Merged
merged 7 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions components/fctl/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
atomicgo.dev/keyboard v0.2.9 // indirect
github.com/cenkalti/backoff/v4 v4.2.0 // indirect
github.com/containerd/console v1.0.4-0.20230313162750-1ae8d489ac81 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatih/color v1.14.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.3.0 // indirect
Expand All @@ -47,11 +48,13 @@ require (
github.com/mattn/go-tty v0.0.4 // indirect
github.com/muhlemmer/gu v0.3.1 // indirect
github.com/pkg/term v1.2.0-beta.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/segmentio/backo-go v1.0.1 // indirect
github.com/sergi/go-diff v1.3.1 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/talal/go-bits v0.0.0-20200204154716-071e9f3e66e1 // indirect
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
golang.org/x/crypto v0.10.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions components/fctl/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/talal/go-bits v0.0.0-20200204154716-071e9f3e66e1 h1:sDCJkxkdgPIDUKcOemdIsP2AnUjtLTVSlUDkAnf3fB4=
github.com/talal/go-bits v0.0.0-20200204154716-071e9f3e66e1/go.mod h1:IaWL8TVo0gKkfVx+4RbcRkzp6FoeMqEtD88+5aCRwyY=
github.com/urfave/cli v1.22.5/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
Expand Down
4 changes: 3 additions & 1 deletion components/ledger/Taskfile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,19 @@ tasks:
cmds:
- mkdir -p {{.BENCH_RESULTS_DIR}}
- >
go test -run BenchmarkParallelWrites -bench=.
go test -run BenchmarkParallelWrites -bench=. {{if eq .VERBOSE "true"}}-v{{end}}
-test.benchmem
-timeout 1h
-memprofile {{.BENCH_RESULTS_DIR}}/{{.BRANCH}}-memprofile-{{if eq .ASYNC "true"}}async{{else}}sync{{end}}.out
-cpuprofile {{.BENCH_RESULTS_DIR}}/{{.BRANCH}}-profile-{{if eq .ASYNC "true"}}async{{else}}sync{{end}}.out
-benchtime={{if .DURATION}}{{.DURATION}}{{else}}15s{{end}}
{{if eq .RACE "true"}}-race{{end}}
-count={{if .COUNT}}{{.COUNT}}{{else}}10{{end}} ./benchmarks | tee {{.BENCH_RESULTS_DIR}}/{{.BRANCH}}-{{if eq .ASYNC "true"}}async{{else}}sync{{end}}.stats
env:
ASYNC: "{{.ASYNC}}"
GOMEMLIMIT: 1GiB
GOMAXPROCS: 2
VERBOSE: false
# GOGC: "1000" # https://dave.cheney.net/tag/gogc
CGO_ENABLED: 0
# GODEBUG: gctrace=1 #,gcpacertrace=1
Expand Down
112 changes: 86 additions & 26 deletions components/ledger/benchmarks/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package benchmarks

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"os"
"runtime"
"sync"
"testing"
"time"

Expand All @@ -18,72 +18,132 @@ import (
"github.com/formancehq/ledger/pkg/core"
"github.com/formancehq/ledger/pkg/ledger"
"github.com/formancehq/ledger/pkg/opentelemetry/metrics"
"github.com/formancehq/ledger/pkg/storage/sqlstoragetesting"
"github.com/formancehq/ledger/pkg/storage/storagetesting"
"github.com/formancehq/stack/libs/go-libs/api"
"github.com/formancehq/stack/libs/go-libs/logging"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

func BenchmarkParallelWrites(b *testing.B) {

driver := sqlstoragetesting.StorageDriver(b)
resolver := ledger.NewResolver(driver)
ctx := logging.TestingContext()

driver := storagetesting.StorageDriver(b)
resolver := ledger.NewResolver(driver, ledger.WithLogger(logging.FromContext(ctx)))
b.Cleanup(func() {
require.NoError(b, resolver.CloseLedgers(context.Background()))
require.NoError(b, resolver.CloseLedgers(ctx))
})

ledgerName := uuid.NewString()

backend := controllers.NewDefaultBackend(driver, "latest", resolver)
router := routes.NewRouter(backend, nil, nil, metrics.NewNoOpMetricsRegistry())
srv := httptest.NewServer(router)
defer srv.Close()
router := routes.NewRouter(backend, nil, metrics.NewNoOpRegistry())
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := logging.ContextWithLogger(r.Context(), logging.FromContext(ctx))
router.ServeHTTP(w, r.WithContext(ctx))
})

totalDuration := atomic.Int64{}
b.SetParallelism(1000)
runtime.GC()
b.ResetTimer()
startOfBench := time.Now()
counter := atomic.NewInt64(0)
longestTxLock := sync.Mutex{}
longestTransactionID := uint64(0)
longestTransactionDuration := time.Duration(0)
b.RunParallel(func(pb *testing.PB) {
buf := bytes.NewBufferString("")
for pb.Next() {
buf.Reset()
id := counter.Add(1)

err := json.NewEncoder(buf).Encode(controllers.PostTransactionRequest{
Script: controllers.Script{
Script: core.Script{
Plain: `
vars {
account $account
}

send [USD/2 100] (
source = @world
destination = $account
)`,
},
Vars: map[string]any{
"account": fmt.Sprintf("accounts:%d", counter.Add(1)),
},
//script := controllers.Script{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this code

// Script: core.Script{
// Plain: fmt.Sprintf(`
// vars {
// account $account
// }
//
// send [USD/2 100] (
// source = @world:%d allowing unbounded overdraft
// destination = $account
// )`, counter.Load()%100),
// },
// Vars: map[string]any{
// "account": fmt.Sprintf("accounts:%d", counter.Add(1)),
// },
//}

script := controllers.Script{
Script: core.Script{
Plain: `vars {
account $account
}

send [USD/2 100] (
source = @world
destination = $account
)`,
},
Vars: map[string]any{
"account": fmt.Sprintf("accounts:%d", id),
},
}

// script := controllers.Script{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this code

// Script: core.Script{
// Plain: `vars {
// account $account
// account $src
//}
//
//send [USD/2 100] (
// source = $src allowing unbounded overdraft
// destination = $account
//)`,
// },
// Vars: map[string]any{
// "src": fmt.Sprintf("world:%d", id),
// "account": fmt.Sprintf("accounts:%d", id),
// },
// }

err := json.NewEncoder(buf).Encode(controllers.PostTransactionRequest{
Script: script,
})
require.NoError(b, err)

//ctx, _ := context.WithDeadline(ctx, time.Now().Add(10*time.Second))

req := httptest.NewRequest("POST", "/"+ledgerName+"/transactions", buf)
req = req.WithContext(ctx)
req.URL.RawQuery = url.Values{
"async": []string{os.Getenv("ASYNC")},
}.Encode()
rsp := httptest.NewRecorder()

now := time.Now()
router.ServeHTTP(rsp, req)
totalDuration.Add(time.Since(now).Milliseconds())
handler.ServeHTTP(rsp, req)
latency := time.Since(now).Milliseconds()
totalDuration.Add(latency)

require.Equal(b, http.StatusOK, rsp.Code)
tx, _ := api.DecodeSingleResponse[core.Transaction](b, rsp.Body)

longestTxLock.Lock()
if time.Millisecond*time.Duration(latency) > longestTransactionDuration {
longestTransactionID = tx.ID
longestTransactionDuration = time.Duration(latency) * time.Millisecond
}
longestTxLock.Unlock()
}
})

b.StopTimer()
b.Logf("Longest transaction: %d (%s)", longestTransactionID, longestTransactionDuration.String())
b.ReportMetric((float64(time.Duration(b.N))/float64(time.Since(startOfBench)))*float64(time.Second), "t/s")
b.ReportMetric(float64(totalDuration.Load()/int64(b.N)), "ms/transaction")
runtime.GC()
Expand Down
1 change: 1 addition & 0 deletions components/ledger/benchmarks/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
)

func TestMain(m *testing.M) {

if err := pgtesting.CreatePostgresServer(pgtesting.WithDockerHostConfigOption(func(hostConfig *docker.HostConfig) {
hostConfig.CPUCount = 2
})); err != nil {
Expand Down
11 changes: 3 additions & 8 deletions components/ledger/cmd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import (

"github.com/formancehq/ledger/cmd/internal"
"github.com/formancehq/ledger/pkg/api"
"github.com/formancehq/ledger/pkg/bus"
"github.com/formancehq/ledger/pkg/ledger"
"github.com/formancehq/ledger/pkg/storage"
"github.com/formancehq/ledger/pkg/storage/driver"
"github.com/formancehq/stack/libs/go-libs/otlp/otlpmetrics"
"github.com/formancehq/stack/libs/go-libs/otlp/otlptraces"
"github.com/formancehq/stack/libs/go-libs/publish"
Expand All @@ -25,26 +24,22 @@ func resolveOptions(output io.Writer, userOptions ...fx.Option) []fx.Option {
v := viper.GetViper()
debug := v.GetBool(service.DebugFlag)
if debug {
storage.InstrumentalizeSQLDriver()
driver.InstrumentalizeSQLDriver()
}

options = append(options,
publish.CLIPublisherModule(v, ServiceName),
bus.LedgerMonitorModule(),
otlptraces.CLITracesModule(v),
otlpmetrics.CLIMetricsModule(v),
api.Module(api.Config{
Version: Version,
}),
storage.CLIDriverModule(v, output, debug),
driver.CLIModule(v, output, debug),
internal.NewAnalyticsModule(v, Version),
ledger.Module(ledger.Configuration{
NumscriptCache: ledger.NumscriptCacheConfiguration{
MaxCount: v.GetInt(numscriptCacheMaxCount),
},
Query: ledger.QueryConfiguration{
LimitReadLogs: v.GetInt(queryLimitReadLogsFlag),
},
}),
)

Expand Down
5 changes: 3 additions & 2 deletions components/ledger/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"os"

"github.com/formancehq/ledger/cmd/internal"
"github.com/formancehq/ledger/pkg/storage"
"github.com/formancehq/ledger/pkg/storage/driver"
initschema "github.com/formancehq/ledger/pkg/storage/ledgerstore/migrates/0-init-schema"
"github.com/formancehq/stack/libs/go-libs/otlp/otlpmetrics"
"github.com/formancehq/stack/libs/go-libs/otlp/otlptraces"
Expand Down Expand Up @@ -44,6 +44,7 @@ func NewRootCommand() *cobra.Command {
store.AddCommand(NewStorageInit())
store.AddCommand(NewStorageList())
store.AddCommand(NewStorageUpgrade())
store.AddCommand(NewStorageUpgradeAll())
store.AddCommand(NewStorageDelete())

root.AddCommand(serve)
Expand All @@ -61,7 +62,7 @@ func NewRootCommand() *cobra.Command {
otlptraces.InitOTLPTracesFlags(root.PersistentFlags())
internal.InitAnalyticsFlags(root, DefaultSegmentWriteKey)
publish.InitCLIFlags(root)
storage.InitCLIFlags(root)
driver.InitCLIFlags(root)
initschema.InitMigrationConfigCLIFlags(root.PersistentFlags())

if err := viper.BindPFlags(root.PersistentFlags()); err != nil {
Expand Down
13 changes: 10 additions & 3 deletions components/ledger/cmd/serve.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package cmd

import (
"net/http"

"github.com/formancehq/ledger/pkg/api/middlewares"
"github.com/formancehq/stack/libs/go-libs/ballast"
"github.com/formancehq/stack/libs/go-libs/httpserver"
"github.com/formancehq/stack/libs/go-libs/logging"
app "github.com/formancehq/stack/libs/go-libs/service"
"github.com/go-chi/chi/v5"
"github.com/spf13/cobra"
Expand All @@ -12,7 +15,6 @@ import (
)

const (
queryLimitReadLogsFlag = "query-limit-read-logs"
ballastSizeInBytesFlag = "ballast-size"
numscriptCacheMaxCount = "numscript-cache-max-count"
)
Expand All @@ -24,10 +26,16 @@ func NewServe() *cobra.Command {
return app.New(cmd.OutOrStdout(), resolveOptions(
cmd.OutOrStdout(),
ballast.Module(viper.GetUint(ballastSizeInBytesFlag)),
fx.Invoke(func(lc fx.Lifecycle, h chi.Router) {
fx.Invoke(func(lc fx.Lifecycle, h chi.Router, logger logging.Logger) {

if viper.GetBool(app.DebugFlag) {
wrappedRouter := chi.NewRouter()
wrappedRouter.Use(func(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r = r.WithContext(logging.ContextWithLogger(r.Context(), logger))
handler.ServeHTTP(w, r)
})
})
wrappedRouter.Use(middlewares.Log())
wrappedRouter.Mount("/", h)
h = wrappedRouter
Expand All @@ -38,7 +46,6 @@ func NewServe() *cobra.Command {
)...).Run(cmd.Context())
},
}
cmd.Flags().Int(queryLimitReadLogsFlag, 10000, "Query limit read logs")
cmd.Flags().Uint(ballastSizeInBytesFlag, 0, "Ballast size in bytes, default to 0")
cmd.Flags().Int(numscriptCacheMaxCount, 1024, "Numscript cache max count")
return cmd
Expand Down
Loading