From 4a30e970ac4ec9c03ef43a829781ee7da838f6c2 Mon Sep 17 00:00:00 2001 From: Jon Johnson <113393155+jonathanj-square@users.noreply.github.com> Date: Wed, 26 Jun 2024 12:23:39 -0700 Subject: [PATCH] feat: ModuleContext periodically refreshed on the controller (#1865) fixes #1699 `GetModuleContext` will stream updated `ModuleContext` to the runner. --------- Co-authored-by: worstell Co-authored-by: Alec Thomas Co-authored-by: Matt Toohey --- backend/controller/controller.go | 51 ++++++++++++++++++++++++++++---- 1 file changed, 45 insertions(+), 6 deletions(-) diff --git a/backend/controller/controller.go b/backend/controller/controller.go index e4e6f42b91..a699fe31a1 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -2,6 +2,7 @@ package controller import ( "context" + "encoding/binary" "errors" "fmt" "io" @@ -674,11 +675,12 @@ nextModule: func (s *Service) GetModuleContext(ctx context.Context, req *connect.Request[ftlv1.ModuleContextRequest], resp *connect.ServerStream[ftlv1.ModuleContextResponse]) error { name := req.Msg.Module - // TODO migrate to a polling implementation that only emits responses when the configuration changes - cm := cf.ConfigFromContext(ctx) sm := cf.SecretsFromContext(ctx) + // Initialize checksum to -1; a zero checksum does occur when the context contains no settings + lastChecksum := int64(-1) + for { configs, err := cm.MapForModule(ctx, name) if err != nil { @@ -693,16 +695,53 @@ func (s *Service) GetModuleContext(ctx context.Context, req *connect.Request[ftl return connect.NewError(connect.CodeInternal, fmt.Errorf("could not get databases: %w", err)) } - response := modulecontext.NewBuilder(name).AddConfigs(configs).AddSecrets(secrets).AddDatabases(databases).Build().ToProto() + checksum := configurationMapChecksum(configs) + checksum = (checksum * 115163) + configurationMapChecksum(secrets) + checksum = (checksum * 454213) + configurationDatabaseChecksum(databases) + + if checksum != lastChecksum { + response := modulecontext.NewBuilder(name).AddConfigs(configs).AddSecrets(secrets).AddDatabases(databases).Build().ToProto() + + if err := resp.Send(response); err != nil { + return connect.NewError(connect.CodeInternal, fmt.Errorf("could not send response: %w", err)) + } - if err := resp.Send(response); err != nil { - return connect.NewError(connect.CodeInternal, fmt.Errorf("could not send response: %w", err)) + lastChecksum = checksum } - time.Sleep(s.config.ModuleUpdateFrequency) + select { + case <-ctx.Done(): + return nil + case <-time.After(s.config.ModuleUpdateFrequency): + } } } +// configurationMapChecksum computes a checksum on the map that is order invariant. +// +// This operation is used to detect configuration change. +func configurationMapChecksum(m map[string][]byte) int64 { + sum := int64(0) + for k, v := range m { + data := sha256.Sum(append([]byte(k), v...)) + sum += int64(binary.BigEndian.Uint64(data[0:8])) + } + return sum +} + +// configurationMapChecksum computes a checksum on the database map that is order invariant. +// +// This operation is used to detect configuration change. +func configurationDatabaseChecksum(m map[string]modulecontext.Database) int64 { + sum := int64(0) + for k, v := range m { + // currently, only the DSN is treated as mutable configuration + data := sha256.Sum(append([]byte(k), []byte(v.DSN)...)) + sum += int64(binary.BigEndian.Uint64(data[0:8])) + } + return sum +} + // AcquireLease acquires a lease on behalf of a module. // // This is a bidirectional stream where each request from the client must be