diff --git a/backend/controller/timeline/timeline.go b/backend/controller/timeline/timeline.go index b4655eb374..3e186a2c31 100644 --- a/backend/controller/timeline/timeline.go +++ b/backend/controller/timeline/timeline.go @@ -6,6 +6,8 @@ import ( "fmt" "time" + "github.com/alecthomas/atomic" + "github.com/TBD54566975/ftl/backend/controller/encryption" "github.com/TBD54566975/ftl/backend/controller/observability" "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" @@ -42,10 +44,12 @@ type InEvent interface { } type Service struct { - ctx context.Context - conn *stdsql.DB - encryption *encryption.Service - events chan InEvent + ctx context.Context + conn *stdsql.DB + encryption *encryption.Service + events chan InEvent + lastDroppedError atomic.Value[time.Time] + lastFailedError atomic.Value[time.Time] } func New(ctx context.Context, conn *stdsql.DB, encryption *encryption.Service) *Service { @@ -71,7 +75,10 @@ func (s *Service) EnqueueEvent(ctx context.Context, event InEvent) { select { case s.events <- event: default: - log.FromContext(ctx).Warnf("Dropping event %T due to full queue", event) + if time.Since(s.lastDroppedError.Load()) > 10*time.Second { + log.FromContext(ctx).Warnf("Dropping event %T due to full queue", event) + s.lastDroppedError.Store(time.Now()) + } } } @@ -133,7 +140,10 @@ func (s *Service) flushEvents(events []InEvent) { lastError = err } if lastError != nil { - logger.Errorf(lastError, "Failed to insert %d events, most recent error", failures) + if time.Since(s.lastFailedError.Load()) > 10*time.Second { + logger.Errorf(lastError, "Failed to insert %d events, most recent error", failures) + s.lastFailedError.Store(time.Now()) + } observability.Timeline.Failed(s.ctx, failures) } observability.Timeline.Inserted(s.ctx, len(events)-failures) diff --git a/frontend/cli/cmd_bench.go b/frontend/cli/cmd_bench.go new file mode 100644 index 0000000000..3df6265163 --- /dev/null +++ b/frontend/cli/cmd_bench.go @@ -0,0 +1,129 @@ +package main + +import ( + "context" + "fmt" + "math" + "sort" + "sync/atomic" + "time" + + "connectrpc.com/connect" + "github.com/jpillora/backoff" + "github.com/titanous/json5" + "golang.org/x/sync/errgroup" + + ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" + "github.com/TBD54566975/ftl/go-runtime/ftl/reflection" + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/rpc" +) + +type benchCmd struct { + Count int `short:"c" help:"Number of times to call the Verb in each thread." default:"128"` + Parallelism int `short:"j" help:"Number of concurrent benchmarks to create." default:"${numcpu}"` + Wait time.Duration `short:"w" help:"Wait up to this elapsed time for the FTL cluster to become available." default:"1m"` + Verb reflection.Ref `arg:"" required:"" help:"Full path of Verb to call." predictor:"verbs"` + Request string `arg:"" optional:"" help:"JSON5 request payload." default:"{}"` +} + +func (c *benchCmd) Run(ctx context.Context, client ftlv1connect.VerbServiceClient) error { + ctx, cancel := context.WithTimeout(ctx, c.Wait) + defer cancel() + if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, client); err != nil { + return fmt.Errorf("FTL cluster did not become ready: %w", err) + } + logger := log.FromContext(ctx) + request := map[string]any{} + err := json5.Unmarshal([]byte(c.Request), &request) + if err != nil { + return fmt.Errorf("invalid request: %w", err) + } + + fmt.Printf("Starting benchmark\n") + fmt.Printf(" Verb: %s\n", c.Verb) + fmt.Printf(" Count: %d\n", c.Count) + fmt.Printf(" Parallelism: %d\n", c.Parallelism) + + var errors int64 + var success int64 + wg := errgroup.Group{} + timings := make([][]time.Duration, c.Parallelism) + for job := range c.Parallelism { + wg.Go(func() error { + for range c.Count { + start := time.Now() + // otherwise, we have a match so call the verb + _, err := client.Call(ctx, connect.NewRequest(&ftlv1.CallRequest{ + Verb: c.Verb.ToProto(), + Body: []byte(c.Request), + })) + if err != nil { + // Only log error once. + if atomic.AddInt64(&errors, 1) == 1 { + logger.Errorf(err, "Error calling %s", c.Verb) + } + } else { + atomic.AddInt64(&success, 1) + } + timings[job] = append(timings[job], time.Since(start)) + } + return nil + }) + } + _ = wg.Wait() //nolint: errcheck + + // Display timing percentiles. + var allTimings []time.Duration + for _, t := range timings { + allTimings = append(allTimings, t...) + } + sort.Slice(allTimings, func(i, j int) bool { return allTimings[i] < allTimings[j] }) + fmt.Printf("Results:\n") + fmt.Printf(" Successes: %d\n", success) + fmt.Printf(" Errors: %d\n", errors) + fmt.Printf("Timing percentiles:\n") + for p, t := range computePercentiles(allTimings) { + fmt.Printf(" %d%%: %s\n", p, t) + } + fmt.Printf("Standard deviation: ±%v\n", computeStandardDeviation(allTimings)) + return nil +} + +func computePercentiles(timings []time.Duration) map[int]time.Duration { + percentiles := map[int]time.Duration{} + for _, p := range []int{50, 90, 95, 99} { + percentiles[p] = percentile(timings, p) + } + return percentiles +} + +func percentile(timings []time.Duration, p int) time.Duration { + if len(timings) == 0 { + return 0 + } + i := int(float64(len(timings)) * float64(p) / 100) + return timings[i] +} + +func computeStandardDeviation(timings []time.Duration) time.Duration { + if len(timings) == 0 { + return 0 + } + + var sum time.Duration + for _, t := range timings { + sum += t + } + mean := float64(sum) / float64(len(timings)) + + var varianceSum float64 + for _, t := range timings { + diff := float64(t) - mean + varianceSum += diff * diff + } + variance := varianceSum / float64(len(timings)) + + return time.Duration(math.Sqrt(variance)) +} diff --git a/frontend/cli/cmd_call.go b/frontend/cli/cmd_call.go index 2c5b96be35..faf8e9bb4e 100644 --- a/frontend/cli/cmd_call.go +++ b/frontend/cli/cmd_call.go @@ -62,7 +62,7 @@ func callVerb(ctx context.Context, client ftlv1connect.VerbServiceClient, ctlCli if cerr := new(connect.Error); errors.As(err, &cerr) && cerr.Code() == connect.CodeNotFound { suggestions, err := findSuggestions(ctx, ctlCli, verb) - // if we have suggestions, return a helpful error message. otherwise continue to the original error + // If we have suggestions, return a helpful error message, otherwise continue to the original error. if err == nil { return fmt.Errorf("verb not found: %s\n\nDid you mean one of these?\n%s", verb, strings.Join(suggestions, "\n")) } diff --git a/frontend/cli/main.go b/frontend/cli/main.go index 1e3f8c9411..5f74d69960 100644 --- a/frontend/cli/main.go +++ b/frontend/cli/main.go @@ -39,6 +39,7 @@ type InteractiveCLI struct { New newCmd `cmd:"" help:"Create a new FTL module."` PS psCmd `cmd:"" help:"List deployments."` Call callCmd `cmd:"" help:"Call an FTL function."` + Bench benchCmd `cmd:"" help:"Benchmark an FTL function."` Replay replayCmd `cmd:"" help:"Call an FTL function with the same request body as the last invocation."` Update updateCmd `cmd:"" help:"Update a deployment."` Kill killCmd `cmd:"" help:"Kill a deployment."`