Skip to content

Commit

Permalink
feat: Map returns cached outputs if the inputs are unchanged
Browse files Browse the repository at this point in the history
This incurs a slight penalty due to the JSON serialisation + hashing,
but it should be relatively insignificant in most cases.
  • Loading branch information
alecthomas committed Jun 7, 2024
1 parent 6f08f0f commit 8b004a2
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 7 deletions.
2 changes: 1 addition & 1 deletion Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ integration-tests *test:
#!/bin/bash
set -euo pipefail
testName=${1:-}
go test -fullpath -count 1 -v -tags integration -run "$testName" -p 1 ./...
go test -fullpath -count 1 -v -tags integration -run "$testName" -p 1 $(git ls-files -z | xargs -0 grep -r -l "$testName" | xargs grep -l '//go:build integration' | xargs -I {} dirname './{}')

# Run README doc tests
test-readme *args:
Expand Down
2 changes: 1 addition & 1 deletion go-runtime/ftl/ftltest/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (f *fakeFTL) startAllowingMapCalls() {
f.allowMapCalls = true
}

func (f *fakeFTL) CallMap(ctx context.Context, mapper any, mapImpl func(context.Context) (any, error)) any {
func (f *fakeFTL) CallMap(ctx context.Context, mapper any, value any, mapImpl func(context.Context) (any, error)) any {
key := makeMapKey(mapper)
mockMap, ok := f.mockMaps[key]
if ok {
Expand Down
11 changes: 9 additions & 2 deletions go-runtime/ftl/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,16 @@ type MapHandle[T, U any] struct {
getter Handle[T]
}

// Underlying returns the underlying value of the handle being mapped.
func (mh *MapHandle[T, U]) Underlying(ctx context.Context) T {
return mh.getter.Get(ctx)
}

// Get the mapped value.
func (mh *MapHandle[T, U]) Get(ctx context.Context) U {
out := internal.FromContext(ctx).CallMap(ctx, mh, func(ctx context.Context) (any, error) {
return mh.fn(ctx, mh.getter.Get(ctx))
value := mh.getter.Get(ctx)
out := internal.FromContext(ctx).CallMap(ctx, mh, value, func(ctx context.Context) (any, error) {
return mh.fn(ctx, value)
})
u, ok := out.(U)
if !ok {
Expand Down
6 changes: 5 additions & 1 deletion go-runtime/internal/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ type FTL interface {
PublishEvent(ctx context.Context, topic string, event any) error

// CallMap calls Get on an instance of an ftl.Map.
CallMap(ctx context.Context, mapper any, mapImpl func(context.Context) (any, error)) any
//
// "mapper" is a pointer to an instance of an ftl.MapHandle. "value" is the
// value being mapped. "mapImpl" is a function that will be called to
// compute the mapped value.
CallMap(ctx context.Context, mapper any, value any, mapImpl func(context.Context) (any, error)) any

// GetConfig unmarshals a configuration value into dest.
GetConfig(ctx context.Context, name string, dest any) error
Expand Down
41 changes: 39 additions & 2 deletions go-runtime/internal/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ package internal

import (
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"reflect"

"connectrpc.com/connect"

"github.com/puzpuzpuz/xsync/v3"

ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
Expand All @@ -17,13 +21,25 @@ import (
"github.com/TBD54566975/ftl/internal/rpc"
)

type mapCacheEntry struct {
checksum [32]byte
output any
}

// RealFTL is the real implementation of the [internal.FTL] interface using the Controller.
type RealFTL struct {
mctx modulecontext.ModuleContext
// Cache for Map() calls
mapped *xsync.MapOf[uintptr, mapCacheEntry]
}

// New creates a new [RealFTL]
func New(mctx modulecontext.ModuleContext) *RealFTL { return &RealFTL{mctx: mctx} }
func New(mctx modulecontext.ModuleContext) *RealFTL {
return &RealFTL{
mctx: mctx,
mapped: xsync.NewMapOf[uintptr, mapCacheEntry](),
}
}

var _ FTL = &RealFTL{}

Expand Down Expand Up @@ -65,10 +81,31 @@ func (r *RealFTL) PublishEvent(ctx context.Context, topic string, event any) err
return nil
}

func (r *RealFTL) CallMap(ctx context.Context, mapper any, mapImpl func(context.Context) (any, error)) any {
func (r *RealFTL) CallMap(ctx context.Context, mapper any, value any, mapImpl func(context.Context) (any, error)) any {
// Compute checksum of the input.
inputData, err := json.Marshal(value)
if err != nil {
return fmt.Errorf("failed to marshal input data: %w", err)
}
checksum := sha256.Sum256(inputData)

// Check cache.
key := reflect.ValueOf(mapper).Pointer()
cached, ok := r.mapped.Load(key)
if ok && checksum == cached.checksum {
return cached.output
}

// Map the value
t, err := mapImpl(ctx)
if err != nil {
panic(err)
}

// Write the cache back.
r.mapped.Store(key, mapCacheEntry{
checksum: checksum,
output: t,
})
return t
}

0 comments on commit 8b004a2

Please sign in to comment.