From 8b004a20ef81062147166a4c5d4e2c0d12cd09c9 Mon Sep 17 00:00:00 2001 From: Alec Thomas Date: Sat, 8 Jun 2024 08:22:09 +1000 Subject: [PATCH] feat: Map returns cached outputs if the inputs are unchanged This incurs a slight penalty due to the JSON serialisation + hashing, but it should be relatively insignificant in most cases. --- Justfile | 2 +- go-runtime/ftl/ftltest/fake.go | 2 +- go-runtime/ftl/map.go | 11 +++++++-- go-runtime/internal/api.go | 6 ++++- go-runtime/internal/impl.go | 41 ++++++++++++++++++++++++++++++++-- 5 files changed, 55 insertions(+), 7 deletions(-) diff --git a/Justfile b/Justfile index 87932967a7..3d3e527a9f 100644 --- a/Justfile +++ b/Justfile @@ -114,7 +114,7 @@ integration-tests *test: #!/bin/bash set -euo pipefail testName=${1:-} - go test -fullpath -count 1 -v -tags integration -run "$testName" -p 1 ./... + go test -fullpath -count 1 -v -tags integration -run "$testName" -p 1 $(git ls-files -z | xargs -0 grep -r -l "$testName" | xargs grep -l '//go:build integration' | xargs -I {} dirname './{}') # Run README doc tests test-readme *args: diff --git a/go-runtime/ftl/ftltest/fake.go b/go-runtime/ftl/ftltest/fake.go index c5b3d94b4a..52754f9e10 100644 --- a/go-runtime/ftl/ftltest/fake.go +++ b/go-runtime/ftl/ftltest/fake.go @@ -71,7 +71,7 @@ func (f *fakeFTL) startAllowingMapCalls() { f.allowMapCalls = true } -func (f *fakeFTL) CallMap(ctx context.Context, mapper any, mapImpl func(context.Context) (any, error)) any { +func (f *fakeFTL) CallMap(ctx context.Context, mapper any, value any, mapImpl func(context.Context) (any, error)) any { key := makeMapKey(mapper) mockMap, ok := f.mockMaps[key] if ok { diff --git a/go-runtime/ftl/map.go b/go-runtime/ftl/map.go index 4e296b676c..3367fb2aa7 100644 --- a/go-runtime/ftl/map.go +++ b/go-runtime/ftl/map.go @@ -12,9 +12,16 @@ type MapHandle[T, U any] struct { getter Handle[T] } +// Underlying returns the underlying value of the handle being mapped. +func (mh *MapHandle[T, U]) Underlying(ctx context.Context) T { + return mh.getter.Get(ctx) +} + +// Get the mapped value. func (mh *MapHandle[T, U]) Get(ctx context.Context) U { - out := internal.FromContext(ctx).CallMap(ctx, mh, func(ctx context.Context) (any, error) { - return mh.fn(ctx, mh.getter.Get(ctx)) + value := mh.getter.Get(ctx) + out := internal.FromContext(ctx).CallMap(ctx, mh, value, func(ctx context.Context) (any, error) { + return mh.fn(ctx, value) }) u, ok := out.(U) if !ok { diff --git a/go-runtime/internal/api.go b/go-runtime/internal/api.go index 5c1e80064d..f86de391b0 100644 --- a/go-runtime/internal/api.go +++ b/go-runtime/internal/api.go @@ -19,7 +19,11 @@ type FTL interface { PublishEvent(ctx context.Context, topic string, event any) error // CallMap calls Get on an instance of an ftl.Map. - CallMap(ctx context.Context, mapper any, mapImpl func(context.Context) (any, error)) any + // + // "mapper" is a pointer to an instance of an ftl.MapHandle. "value" is the + // value being mapped. "mapImpl" is a function that will be called to + // compute the mapped value. + CallMap(ctx context.Context, mapper any, value any, mapImpl func(context.Context) (any, error)) any // GetConfig unmarshals a configuration value into dest. GetConfig(ctx context.Context, name string, dest any) error diff --git a/go-runtime/internal/impl.go b/go-runtime/internal/impl.go index b2d2e27f9e..61a29e6a7e 100644 --- a/go-runtime/internal/impl.go +++ b/go-runtime/internal/impl.go @@ -2,11 +2,15 @@ package internal import ( "context" + "crypto/sha256" + "encoding/json" "fmt" "reflect" "connectrpc.com/connect" + "github.com/puzpuzpuz/xsync/v3" + ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema" @@ -17,13 +21,25 @@ import ( "github.com/TBD54566975/ftl/internal/rpc" ) +type mapCacheEntry struct { + checksum [32]byte + output any +} + // RealFTL is the real implementation of the [internal.FTL] interface using the Controller. type RealFTL struct { mctx modulecontext.ModuleContext + // Cache for Map() calls + mapped *xsync.MapOf[uintptr, mapCacheEntry] } // New creates a new [RealFTL] -func New(mctx modulecontext.ModuleContext) *RealFTL { return &RealFTL{mctx: mctx} } +func New(mctx modulecontext.ModuleContext) *RealFTL { + return &RealFTL{ + mctx: mctx, + mapped: xsync.NewMapOf[uintptr, mapCacheEntry](), + } +} var _ FTL = &RealFTL{} @@ -65,10 +81,31 @@ func (r *RealFTL) PublishEvent(ctx context.Context, topic string, event any) err return nil } -func (r *RealFTL) CallMap(ctx context.Context, mapper any, mapImpl func(context.Context) (any, error)) any { +func (r *RealFTL) CallMap(ctx context.Context, mapper any, value any, mapImpl func(context.Context) (any, error)) any { + // Compute checksum of the input. + inputData, err := json.Marshal(value) + if err != nil { + return fmt.Errorf("failed to marshal input data: %w", err) + } + checksum := sha256.Sum256(inputData) + + // Check cache. + key := reflect.ValueOf(mapper).Pointer() + cached, ok := r.mapped.Load(key) + if ok && checksum == cached.checksum { + return cached.output + } + + // Map the value t, err := mapImpl(ctx) if err != nil { panic(err) } + + // Write the cache back. + r.mapped.Store(key, mapCacheEntry{ + checksum: checksum, + output: t, + }) return t }