Skip to content

Commit

Permalink
feat: ModuleContext periodically refreshed on the controller (#1865)
Browse files Browse the repository at this point in the history
fixes #1699

`GetModuleContext` will stream updated `ModuleContext` to the runner.

---------

Co-authored-by: worstell <[email protected]>
Co-authored-by: Alec Thomas <[email protected]>
Co-authored-by: Matt Toohey <[email protected]>
  • Loading branch information
4 people authored Jun 26, 2024
1 parent 6296412 commit 4a30e97
Showing 1 changed file with 45 additions and 6 deletions.
51 changes: 45 additions & 6 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controller

import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -674,11 +675,12 @@ nextModule:
func (s *Service) GetModuleContext(ctx context.Context, req *connect.Request[ftlv1.ModuleContextRequest], resp *connect.ServerStream[ftlv1.ModuleContextResponse]) error {
name := req.Msg.Module

// TODO migrate to a polling implementation that only emits responses when the configuration changes

cm := cf.ConfigFromContext(ctx)
sm := cf.SecretsFromContext(ctx)

// Initialize checksum to -1; a zero checksum does occur when the context contains no settings
lastChecksum := int64(-1)

for {
configs, err := cm.MapForModule(ctx, name)
if err != nil {
Expand All @@ -693,16 +695,53 @@ func (s *Service) GetModuleContext(ctx context.Context, req *connect.Request[ftl
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not get databases: %w", err))
}

response := modulecontext.NewBuilder(name).AddConfigs(configs).AddSecrets(secrets).AddDatabases(databases).Build().ToProto()
checksum := configurationMapChecksum(configs)
checksum = (checksum * 115163) + configurationMapChecksum(secrets)
checksum = (checksum * 454213) + configurationDatabaseChecksum(databases)

if checksum != lastChecksum {
response := modulecontext.NewBuilder(name).AddConfigs(configs).AddSecrets(secrets).AddDatabases(databases).Build().ToProto()

if err := resp.Send(response); err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not send response: %w", err))
}

if err := resp.Send(response); err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not send response: %w", err))
lastChecksum = checksum
}

time.Sleep(s.config.ModuleUpdateFrequency)
select {
case <-ctx.Done():
return nil
case <-time.After(s.config.ModuleUpdateFrequency):
}
}
}

// configurationMapChecksum computes a checksum on the map that is order invariant.
//
// This operation is used to detect configuration change.
func configurationMapChecksum(m map[string][]byte) int64 {
sum := int64(0)
for k, v := range m {
data := sha256.Sum(append([]byte(k), v...))
sum += int64(binary.BigEndian.Uint64(data[0:8]))
}
return sum
}

// configurationMapChecksum computes a checksum on the database map that is order invariant.
//
// This operation is used to detect configuration change.
func configurationDatabaseChecksum(m map[string]modulecontext.Database) int64 {
sum := int64(0)
for k, v := range m {
// currently, only the DSN is treated as mutable configuration
data := sha256.Sum(append([]byte(k), []byte(v.DSN)...))
sum += int64(binary.BigEndian.Uint64(data[0:8]))
}
return sum
}

// AcquireLease acquires a lease on behalf of a module.
//
// This is a bidirectional stream where each request from the client must be
Expand Down

0 comments on commit 4a30e97

Please sign in to comment.