Skip to content

Commit

Permalink
handle module removal
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Dec 5, 2024
1 parent faa2ffb commit 966b746
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 157 deletions.
47 changes: 32 additions & 15 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -834,8 +834,7 @@ func hashConfigurationMap(h hash.Hash, m map[string][]byte) error {
return nil
}

// hashRoutesTable computes an order invariant checksum on the configuration
// settings supplied in the map.
// hashRoutesTable computes an order-invariant checksum on the routes supplied in the map.
func hashRoutesTable(h hash.Hash, m map[string]string) error {
keys := maps.Keys(m)
sort.Strings(keys)
Expand Down Expand Up @@ -1557,9 +1556,9 @@ func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(respon
hash []byte
minReplicas int
}
moduleState := map[string]moduleStateEntry{}
deploymentState := map[string]moduleStateEntry{}
moduleByDeploymentKey := map[string]string{}
mostRecentDeploymentByModule := map[string]string{}
aliveDeploymentsForModule := map[string]map[string]bool{}
schemaByDeploymentKey := map[string]*schemapb.Module{}

// Seed the notification channel with the current deployments.
Expand Down Expand Up @@ -1602,20 +1601,24 @@ func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(respon
if deletion, ok := notification.Deleted.Get(); ok {
name := moduleByDeploymentKey[deletion.String()]
schema := schemaByDeploymentKey[deletion.String()]
moduleRemoved := mostRecentDeploymentByModule[name] == deletion.String()
moduleRemoved := true
if aliveDeploymentsForModule[name] != nil {
delete(aliveDeploymentsForModule[name], deletion.String())
moduleRemoved = len(aliveDeploymentsForModule[name]) == 0
if moduleRemoved {
delete(aliveDeploymentsForModule, name)
}
}
response = &ftlv1.PullSchemaResponse{
ModuleName: name,
DeploymentKey: proto.String(deletion.String()),
ChangeType: ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGE_TYPE_REMOVED,
ModuleRemoved: moduleRemoved,
Schema: schema,
}
delete(moduleState, name)
delete(deploymentState, deletion.String())
delete(moduleByDeploymentKey, deletion.String())
delete(schemaByDeploymentKey, deletion.String())
if moduleRemoved {
delete(mostRecentDeploymentByModule, name)
}
} else if message, ok := notification.Message.Get(); ok {
if message.Schema.Runtime == nil {
message.Schema.Runtime = &schema.ModuleRuntime{}
Expand All @@ -1640,14 +1643,25 @@ func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(respon
hash: hasher.Sum(nil),
minReplicas: message.MinReplicas,
}
if current, ok := moduleState[message.Schema.Name]; ok {
if current, ok := deploymentState[message.Key.String()]; ok {
if !bytes.Equal(current.hash, newState.hash) || current.minReplicas != newState.minReplicas {
alive := aliveDeploymentsForModule[moduleSchema.Name]
if alive == nil {
alive = map[string]bool{}
aliveDeploymentsForModule[moduleSchema.Name] = alive
}
if newState.minReplicas > 0 {
alive[message.Key.String()] = true
} else {
delete(alive, message.Key.String())
}
changeType := ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGE_TYPE_CHANGED
// A deployment is considered removed if its minReplicas is set to 0.
moduleRemoved := false
if current.minReplicas > 0 && message.MinReplicas == 0 {
changeType = ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGE_TYPE_REMOVED
moduleRemoved = mostRecentDeploymentByModule[message.Schema.Name] == message.Key.String()
moduleRemoved = len(alive) == 0
logger.Infof("Deployment %s was deleted via update notfication with module removed %v", deletion, moduleRemoved)
}
response = &ftlv1.PullSchemaResponse{
ModuleName: moduleSchema.Name,
Expand All @@ -1658,7 +1672,12 @@ func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(respon
}
}
} else {
mostRecentDeploymentByModule[message.Schema.Name] = message.Key.String()
alive := aliveDeploymentsForModule[moduleSchema.Name]
if alive == nil {
alive = map[string]bool{}
aliveDeploymentsForModule[moduleSchema.Name] = alive
}
alive[message.Key.String()] = true
response = &ftlv1.PullSchemaResponse{
ModuleName: moduleSchema.Name,
DeploymentKey: proto.String(message.Key.String()),
Expand All @@ -1670,9 +1689,7 @@ func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(respon
initialCount--
}
}
moduleState[message.Schema.Name] = newState
delete(moduleByDeploymentKey, message.Key.String()) // The deployment may have changed.
delete(schemaByDeploymentKey, message.Key.String())
deploymentState[message.Key.String()] = newState
moduleByDeploymentKey[message.Key.String()] = message.Schema.Name
schemaByDeploymentKey[message.Key.String()] = moduleSchema
}
Expand Down
38 changes: 0 additions & 38 deletions backend/controller/encryption/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,44 +30,6 @@ func WithEncryption() in.Option {
return in.WithEnvar("FTL_KMS_URI", "fake-kms://CKbvh_ILElQKSAowdHlwZS5nb29nbGVhcGlzLmNvbS9nb29nbGUuY3J5cHRvLnRpbmsuQWVzR2NtS2V5EhIaEE6tD2yE5AWYOirhmkY-r3sYARABGKbvh_ILIAE")
}

// TestEncryptionForLogs deploys the "encryption" module with encryption
// enabled, calls its echo verb with the name "Alice", and then checks two
// things: that the call event read back through the console contains the
// request value, and that the raw payload stored in the timeline table does
// not contain it. The test is currently skipped unconditionally.
func TestEncryptionForLogs(t *testing.T) {
	t.Skip("This test needs the timeline service refactoring done")

	request := map[string]any{"name": "Alice"}

	// Confirm that an event for the call can be read back via the console.
	readCallEvent := func(t testing.TB, ic in.TestContext) {
		in.Infof("Read Logs")
		resp, err := ic.Console.GetEvents(ic.Context, connect.NewRequest(&pbconsole.GetEventsRequest{
			Limit: 10,
		}))
		assert.NoError(t, err, "could not get events")
		_, found := slices.Find(resp.Msg.Events, func(e *pbtimeline.Event) bool {
			if call, isCall := e.Entry.(*pbtimeline.Event_Call); isCall {
				assert.Contains(t, call.Call.Request, "Alice", "request does not contain expected value")
				return true
			}
			return false
		})
		assert.True(t, found, "could not find event")
	}

	// Confirm that the raw request string is not present in the stored row.
	checkStoredPayload := func(t testing.TB, ic in.TestContext) {
		row := in.GetRow(t, ic, "ftl", "SELECT payload FROM timeline WHERE type = 'call' LIMIT 1", 1)
		raw, isBytes := row[0].([]byte)
		assert.True(t, isBytes, "could not convert payload to string")
		assert.NotContains(t, string(raw), "Alice", "raw request string should not be stored in the table")
	}

	in.Run(t,
		WithEncryption(),
		in.CopyModule("encryption"),
		in.Deploy("encryption"),
		in.Call[map[string]any, any]("encryption", "echo", request, nil),
		readCallEvent,
		in.QueryRow("ftl", "SELECT COUNT(*) FROM timeline WHERE type = 'call'", int64(1)),
		checkStoredPayload,
	)
}

func TestEncryptionForPubSub(t *testing.T) {
in.Run(t,
WithEncryption(),
Expand Down
20 changes: 13 additions & 7 deletions backend/provisioner/runner_scaling_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func provisionRunner(scaling scaling.RunnerScaling) InMemResourceProvisionerFn {
return nil, fmt.Errorf("failed to parse schema: %w", err)
}
logger.Debugf("provisioning runner: %s.%s for deployment %s", module, id, deployment)
err = scaling.StartDeployment(ctx, module, deployment, schema)
err = scaling.StartDeployment(ctx, module, deployment, schema, false, false)
if err != nil {
logger.Infof("failed to start deployment: %v", err)
return nil, fmt.Errorf("failed to start deployment: %w", err)
Expand Down Expand Up @@ -80,18 +80,24 @@ func provisionRunner(scaling scaling.RunnerScaling) InMemResourceProvisionerFn {
}
}

schemaClient := rpc.ClientFromContext[ftlv1connect.SchemaServiceClient](ctx)
controllerClient := rpc.ClientFromContext[ftlv1connect.ControllerServiceClient](ctx)
runner.Runner.Output = &provisioner.RunnerResource_RunnerResourceOutput{
RunnerUri: endpointURI,
DeploymentKey: deployment,
}
if previous != nil && previous.GetRunner().GetOutput().GetDeploymentKey() != deployment {
logger.Debugf("terminating previous deployment: %s", previous.GetRunner().GetOutput().GetDeploymentKey())
err := scaling.TerminateDeployment(ctx, module, previous.GetRunner().GetOutput().GetDeploymentKey())
if err != nil {
logger.Errorf(err, "failed to terminate previous deployment")
deps, err := scaling.TerminatePreviousDeployments(ctx, module, deployment)
if err != nil {
logger.Errorf(err, "failed to terminate previous deployments")
} else {
var zero int32
for _, dep := range deps {
_, err := controllerClient.UpdateDeploy(ctx, connect.NewRequest(&ftlv1.UpdateDeployRequest{DeploymentKey: dep, MinReplicas: &zero}))
if err != nil {
logger.Errorf(err, "failed to update deployment %s", dep)
}
}
}
schemaClient := rpc.ClientFromContext[ftlv1connect.SchemaServiceClient](ctx)

logger.Infof("updating module runtime for %s with endpoint %s", module, endpointURI)
_, err = schemaClient.UpdateDeploymentRuntime(ctx, connect.NewRequest(&ftlv1.UpdateDeploymentRuntimeRequest{Deployment: deployment, Event: &schemapb.ModuleRuntimeEvent{Value: &schemapb.ModuleRuntimeEvent_ModuleRuntimeDeployment{ModuleRuntimeDeployment: &schemapb.ModuleRuntimeDeployment{DeploymentKey: deployment, Endpoint: endpointURI}}}}))
Expand Down
100 changes: 40 additions & 60 deletions backend/provisioner/scaling/k8sscaling/k8s_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewK8sScaling(disableIstio bool, controllerURL string) scaling.RunnerScalin
return &k8sScaling{disableIstio: disableIstio, controller: controllerURL}
}

func (r *k8sScaling) StartDeployment(ctx context.Context, module string, deploymentKey string, sch *schema.Module) error {
func (r *k8sScaling) StartDeployment(ctx context.Context, module string, deploymentKey string, sch *schema.Module, hasCron bool, hasIngress bool) error {
logger := log.FromContext(ctx)
logger = logger.Module(module)
ctx = log.ContextWithLogger(ctx, logger)
Expand All @@ -84,57 +84,45 @@ func (r *k8sScaling) StartDeployment(ctx context.Context, module string, deploym
return r.handleExistingDeployment(ctx, deployment)

}
err = r.handleNewDeployment(ctx, module, deploymentKey, sch)
err = r.handleNewDeployment(ctx, module, deploymentKey, sch, hasCron, hasIngress)
if err != nil {
return err
}
err = r.waitForDeploymentReady(ctx, deploymentKey, deployTimeout)
if err != nil {
return err
}
delCtx := log.ContextWithLogger(context.Background(), logger)
go func() {
time.Sleep(time.Second * 20)
err := r.deleteOldDeployments(delCtx, module, deploymentKey)
if err != nil {
logger.Errorf(err, "Failed to delete old deployments")
}
}()

return nil
}

func (r *k8sScaling) TerminateDeployment(ctx context.Context, module string, deploymentKey string) error {
func (r *k8sScaling) TerminatePreviousDeployments(ctx context.Context, module string, deploymentKey string) ([]string, error) {
logger := log.FromContext(ctx)
logger = logger.Module(module)
logger.Debugf("Handling schema change for %s", deploymentKey)
delCtx := log.ContextWithLogger(context.Background(), logger)
deploymentClient := r.client.AppsV1().Deployments(r.namespace)
_, err := deploymentClient.Get(ctx, deploymentKey, v1.GetOptions{})
deploymentExists := true
deployments, err := deploymentClient.List(ctx, v1.ListOptions{LabelSelector: moduleLabel + "=" + module})
var ret []string
if err != nil {
if errors.IsNotFound(err) {
deploymentExists = false
} else {
return fmt.Errorf("failed to get deployment %s: %w", deploymentKey, err)
return nil, fmt.Errorf("failed to list deployments: %w", err)
}
for _, deploy := range deployments.Items {
if deploy.Name != deploymentKey {
logger.Debugf("Queing old deployment %s for deletion", deploy.Name)
ret = append(ret, deploy.Name)
}
}

if deploymentExists {
go func() {

// Nasty hack, we want all the controllers to have updated their route tables before we kill the runner
// so we add a slight delay here
time.Sleep(time.Second * 10)
r.knownDeployments.Delete(deploymentKey)
logger.Debugf("Deleting service %s", module)
err = r.client.CoreV1().Services(r.namespace).Delete(ctx, deploymentKey, v1.DeleteOptions{})
// So hacky, all this needs to change when the provisioner is a proper schema observer
go func() {
time.Sleep(time.Second * 20)
for _, dep := range ret {
err = deploymentClient.Delete(delCtx, dep, v1.DeleteOptions{})
if err != nil {
if !errors.IsNotFound(err) {
logger.Errorf(err, "Failed to delete service %s", module)
}
logger.Errorf(err, "Failed to delete deployment %s", dep)
}
}()
}
return nil
}
}()
return ret, nil
}

func (r *k8sScaling) Start(ctx context.Context) error {
Expand Down Expand Up @@ -283,7 +271,7 @@ func (r *k8sScaling) thisContainerImage(ctx context.Context) (string, error) {
return thisDeployment.Spec.Template.Spec.Containers[0].Image, nil
}

func (r *k8sScaling) handleNewDeployment(ctx context.Context, module string, name string, sch *schema.Module) error {
func (r *k8sScaling) handleNewDeployment(ctx context.Context, module string, name string, sch *schema.Module, cron bool, ingress bool) error {
logger := log.FromContext(ctx)

cm, err := r.client.CoreV1().ConfigMaps(r.namespace).Get(ctx, configMapName, v1.GetOptions{})
Expand Down Expand Up @@ -355,7 +343,7 @@ func (r *k8sScaling) handleNewDeployment(ctx context.Context, module string, nam

// Sync the istio policy if applicable
if sec, ok := r.istioSecurity.Get(); ok {
err = r.syncIstioPolicy(ctx, sec, module, name, service, controllerDeployment, provisionerDeployment, sch)
err = r.syncIstioPolicy(ctx, sec, module, name, service, controllerDeployment, provisionerDeployment, sch, cron, ingress)
if err != nil {
return err
}
Expand Down Expand Up @@ -548,7 +536,7 @@ func (r *k8sScaling) updateEnvVar(deployment *kubeapps.Deployment, envVerName st
return changes
}

func (r *k8sScaling) syncIstioPolicy(ctx context.Context, sec istioclient.Clientset, module string, name string, service *kubecore.Service, controllerDeployment *kubeapps.Deployment, provisionerDeployment *kubeapps.Deployment, sch *schema.Module) error {
func (r *k8sScaling) syncIstioPolicy(ctx context.Context, sec istioclient.Clientset, module string, name string, service *kubecore.Service, controllerDeployment *kubeapps.Deployment, provisionerDeployment *kubeapps.Deployment, sch *schema.Module, hasCron bool, hasIngress bool) error {
logger := log.FromContext(ctx)
logger.Debugf("Creating new istio policy for %s", name)

Expand Down Expand Up @@ -580,15 +568,26 @@ func (r *k8sScaling) syncIstioPolicy(ctx context.Context, sec istioclient.Client
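// By default only the controller and provisioner service accounts are allowed to call the runner;
// the ingress and cron principals are appended below when requested.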
policy.Spec.Selector = &v1beta1.WorkloadSelector{MatchLabels: map[string]string{"app": name}}
policy.Spec.Action = istiosecmodel.AuthorizationPolicy_ALLOW
principals := []string{
"cluster.local/ns/" + r.namespace + "/sa/" + controllerDeployment.Spec.Template.Spec.ServiceAccountName,
"cluster.local/ns/" + r.namespace + "/sa/" + provisionerDeployment.Spec.Template.Spec.ServiceAccountName,
}
// TODO: fix hard coded service account names
if hasIngress {
// Allow ingress from the ingress gateway
principals = append(principals, "cluster.local/ns/"+r.namespace+"/sa/ftl-http-ingress")
}

if hasCron {
// Allow cron invocations
principals = append(principals, "cluster.local/ns/"+r.namespace+"/sa/ftl-cron")
}
policy.Spec.Rules = []*istiosecmodel.Rule{
{
From: []*istiosecmodel.Rule_From{
{
Source: &istiosecmodel.Source{
Principals: []string{
"cluster.local/ns/" + r.namespace + "/sa/" + controllerDeployment.Spec.Template.Spec.ServiceAccountName,
"cluster.local/ns/" + r.namespace + "/sa/" + provisionerDeployment.Spec.Template.Spec.ServiceAccountName,
},
Principals: principals,
},
},
},
Expand Down Expand Up @@ -692,25 +691,6 @@ func (r *k8sScaling) waitForDeploymentReady(ctx context.Context, key string, tim
}
}

// deleteOldDeployments lists the deployments in the scaler's namespace that
// carry the module label for the given module and deletes every one whose name
// differs from deployment.
//
// Deletion is best-effort: a failure to delete an individual deployment is
// logged and the loop continues. An error is returned only when the initial
// list call fails.
func (r *k8sScaling) deleteOldDeployments(ctx context.Context, module string, deployment string) error {
	logger := log.FromContext(ctx)
	client := r.client.AppsV1().Deployments(r.namespace)

	selector := moduleLabel + "=" + module
	found, err := client.List(ctx, v1.ListOptions{LabelSelector: selector})
	if err != nil {
		return fmt.Errorf("failed to list deployments: %w", err)
	}

	for _, candidate := range found.Items {
		// The deployment we were asked to keep is never touched.
		if candidate.Name == deployment {
			continue
		}
		logger.Debugf("Deleting old deployment %s", candidate.Name)
		if delErr := client.Delete(ctx, candidate.Name, v1.DeleteOptions{}); delErr != nil {
			logger.Errorf(delErr, "Failed to delete deployment %s", candidate.Name)
		}
	}
	return nil
}

func extractTag(image string) (string, error) {
idx := strings.LastIndex(image, ":")
if idx == -1 {
Expand Down
Loading

0 comments on commit 966b746

Please sign in to comment.