Skip to content

Commit

Permalink
feat: extract Execution Worker (#5923)
Browse files Browse the repository at this point in the history
* chore: delete unused
* chore: delete subscription checker from API instance
* fix: simplify applying the global template
* chore: extract cloud/storage machines in Test Workflow executor
* chore: rename `id` variable to `executionId`
* chore: extract workflow machine in Test Workflow executor
* chore: use the earliest timestamp for execution schedulement
* chore: extract resource machine
* chore: make schedule time UTC
* chore: extract helper for validating the TestWorkflow
* chore: serialize propagated execution tags as JSON
* chore: a bit of grouping
* chore: clean namespace from resolved workflow too
* chore: detach secret's batch from namespace
* chore: add some comments
* chore: move configuration resolution higher
* fix: determine execution namespace after the templates are applied
* chore: avoid unnecessary variables
* chore: avoid pre-validating the workflow
* chore: move place where the resolved workflow is preserved
* chore: move place where the default service account is applied
* feat: always create TestWorkflowExecution before deployment
* chore: decouple a bit creating TestWorkflow execution
* chore: split a bit logic of updating execution
* chore: organize better the internal configuration passed to Test Workflow execution
  - use single annotation instead of multiple duplicated environment variables
  - group the variables nicely
  - handle that recursively
* chore: reorder configs
* chore: rename RuntimeConfig to WorkerConfig
* feat: add `groupId` to the TestWorkflowExecution's model
* chore: add todo
* feat: add basic ExecutionWorker with Execute() method only
* chore: pass execution worker to the Test Workflow Executor
* fix: delete unused variables for Test Workflow Executor, return namespace from worker
* feat: implement Notifications(), Destroy(), Pause() and Resume() for ExecutionWorker
* chore: use the ExecutionWorker more
* chore: avoid using testworkflowcontroller in Executor
* chore: rename labels/annotations
* chore: add todo
* chore: extract LogsReader and NotificationsWatcher to separate files
* feat: add LRU cache for last namespaces and pod IPs
* feat: add ResumeMany() method to the ExecutionWorker, use ExecutionWorker for parallel operations
* feat: expose containers readiness in status notifications
* feat: use ExecutionWorker for services too
* chore: use ExecutionWorker for Test Triggers
* feat: reuse Test Workflow controllers
* chore: move Execution Worker to pkg/testworkflows
* chore: move TestWorkflowController to ExecutionWorker
* fix: handle error in the logs watcher
* fix: list resources correctly in Execution Worker
* chore: delete some unused code
* chore: delete unused machine resolution
* fix: use pro context for determining organization/environment ID in Test Workflow executor
* fix: handle better unique execution names
* fix: unit/integration tests
* chore: delete unused code
* chore: avoid unnecessary fallback
* chore: clean up worker interface a bit
* chore: separate functions for creating a service and execution
* chore: unify Execute() and Service() a bit
* chore: reorder a bit
* feat: persist scheduledAt in the Job/Pod annotations
* fix: CLI for initialization error watch
* chore: isolate Kubernetes Execution Worker, so we can have another engine
* feat: add Abort() option for Execution Worker, that may be useful later for different drivers than Kubernetes
  • Loading branch information
rangoo94 authored Oct 17, 2024
1 parent c0d0afd commit 0b8bae9
Show file tree
Hide file tree
Showing 94 changed files with 3,316 additions and 1,630 deletions.
5 changes: 5 additions & 0 deletions api/v1/testkube.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8060,6 +8060,11 @@ components:
description: unique execution identifier
format: bson objectId
example: "62f395e004109209b50edfc1"
groupId:
type: string
description: identifier for group of correlated executions
format: bson objectId
example: "62f395e004109209b50edfc1"
name:
type: string
description: execution name
Expand Down
72 changes: 59 additions & 13 deletions cmd/api-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@ import (

corev1 "k8s.io/api/core/v1"

"github.com/kubeshop/testkube/pkg/cache"

"github.com/nats-io/nats.go"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/kubeshop/testkube/pkg/cache"
"github.com/kubeshop/testkube/pkg/testworkflows/executionworker"
"github.com/kubeshop/testkube/pkg/testworkflows/executionworker/kubernetesworker"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowconfig"

executorsclientv1 "github.com/kubeshop/testkube-operator/pkg/client/executors/v1"
cloudtestworkflow "github.com/kubeshop/testkube/pkg/cloud/data/testworkflow"
"github.com/kubeshop/testkube/pkg/imageinspector"
Expand Down Expand Up @@ -585,27 +588,70 @@ func main() {
if mode == common.ModeAgent {
testWorkflowProcessor = presets.NewPro(inspector)
}

// Build internal execution worker
namespacesConfig := map[string]kubernetesworker.NamespaceConfig{}
for n, s := range serviceAccountNames {
namespacesConfig[n] = kubernetesworker.NamespaceConfig{DefaultServiceAccountName: s}
}
cloudUrl := cfg.TestkubeProURL
cloudApiKey := cfg.TestkubeProAPIKey
objectStorageConfig := testworkflowconfig.ObjectStorageConfig{}
if cloudApiKey == "" {
cloudUrl = ""
objectStorageConfig = testworkflowconfig.ObjectStorageConfig{
Endpoint: cfg.StorageEndpoint,
AccessKeyID: cfg.StorageAccessKeyID,
SecretAccessKey: cfg.StorageSecretAccessKey,
Region: cfg.StorageRegion,
Token: cfg.StorageToken,
Bucket: cfg.StorageBucket,
Ssl: cfg.StorageSSL,
SkipVerify: cfg.StorageSkipVerify,
CertFile: cfg.StorageCertFile,
KeyFile: cfg.StorageKeyFile,
CAFile: cfg.StorageCAFile,
}
}
executionWorker := executionworker.NewKubernetes(clientset, testWorkflowProcessor, kubernetesworker.Config{
Cluster: kubernetesworker.ClusterConfig{
Id: clusterId,
DefaultNamespace: cfg.TestkubeNamespace,
DefaultRegistry: cfg.TestkubeRegistry,
Namespaces: namespacesConfig,
},
ImageInspector: kubernetesworker.ImageInspectorConfig{
CacheEnabled: cfg.EnableImageDataPersistentCache,
CacheKey: cfg.ImageDataPersistentCacheKey,
CacheTTL: cfg.TestkubeImageCredentialsCacheTTL,
},
Connection: testworkflowconfig.WorkerConnectionConfig{
Url: cloudUrl,
ApiKey: cloudApiKey,
SkipVerify: cfg.TestkubeProSkipVerify,
TlsInsecure: cfg.TestkubeProTLSInsecure,

// TODO: Prepare ControlPlane interface for OSS, so we may unify the communication
LocalApiUrl: fmt.Sprintf("http://%s:%s", cfg.APIServerFullname, cfg.APIServerPort),
ObjectStorage: objectStorageConfig,
},
})

testWorkflowExecutor := testworkflowexecutor.New(
eventsEmitter,
executionWorker,
clientset,
testWorkflowResultsRepository,
testWorkflowOutputRepository,
testWorkflowTemplatesClient,
testWorkflowProcessor,
configMapConfig,
testWorkflowTemplatesClient,
testWorkflowExecutionsClient,
testWorkflowsClient,
metrics,
secretManager,
serviceAccountNames,
cfg.GlobalWorkflowTemplateName,
cfg.TestkubeNamespace,
"http://"+cfg.APIServerFullname+":"+cfg.APIServerPort,
cfg.TestkubeRegistry,
cfg.EnableImageDataPersistentCache,
cfg.ImageDataPersistentCacheKey,
cfg.TestkubeDashboardURI,
clusterId,
&proContext,
)

go testWorkflowExecutor.Recover(context.Background())
Expand Down Expand Up @@ -633,10 +679,10 @@ func main() {
executor,
containerExecutor,
testWorkflowExecutor,
executionWorker,
metrics,
sched,
slackLoader,
storageClient,
cfg.GraphqlPort,
artifactStorage,
templatesClient,
Expand All @@ -648,7 +694,6 @@ func main() {
features,
logsStream,
logGrpcClient,
subscriptionChecker,
serviceAccountNames,
envs,
cfg.TestkubeDockerImageVersion,
Expand Down Expand Up @@ -701,6 +746,7 @@ func main() {
executor,
eventBus,
metrics,
executionWorker,
testWorkflowExecutor,
testWorkflowResultsRepository,
triggers.WithHostnameIdentifier(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func printPrettyOutput(ui *ui.UI, execution testkube.TestWorkflowExecution) {
}
if !execution.Result.FinishedAt.IsZero() {
ui.Warn("Finished at: ", execution.Result.FinishedAt.String())
ui.Warn("Duration: ", execution.Result.FinishedAt.Sub(execution.Result.QueuedAt).String())
ui.Warn("Duration: ", execution.Result.Duration)
}
}
}
Expand Down
18 changes: 10 additions & 8 deletions cmd/kubectl-testkube/commands/testworkflows/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,16 @@ func NewRunTestWorkflowCmd() *cobra.Command {
var exitCode = 0
if outputPretty {
ui.NL()
if watchEnabled {
exitCode = uiWatch(execution, client)
ui.NL()
if downloadArtifactsEnabled {
tests.DownloadTestWorkflowArtifacts(execution.Id, downloadDir, format, masks, client, outputPretty)
if !execution.FailedToInitialize() {
if watchEnabled {
exitCode = uiWatch(execution, client)
ui.NL()
if downloadArtifactsEnabled {
tests.DownloadTestWorkflowArtifacts(execution.Id, downloadDir, format, masks, client, outputPretty)
}
} else {
uiShellWatchExecution(execution.Id)
}
} else {
uiShellWatchExecution(execution.Id)
}

execution, err = client.GetTestWorkflowExecution(execution.Id)
Expand Down Expand Up @@ -291,7 +293,7 @@ func trimTimestamp(line string) string {
if strings.Index(line, "T") == 10 {
idx := strings.Index(line, " ")
if len(line) >= idx {
return line[idx:]
return line[idx+1:]
}
}
return line
Expand Down
27 changes: 12 additions & 15 deletions cmd/tcl/testworkflow-toolkit/commands/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/kubeshop/testkube/cmd/testworkflow-init/data"
"github.com/kubeshop/testkube/cmd/testworkflow-init/instructions"
"github.com/kubeshop/testkube/cmd/testworkflow-toolkit/env"
"github.com/kubeshop/testkube/cmd/testworkflow-toolkit/env/config"
"github.com/kubeshop/testkube/cmd/testworkflow-toolkit/transfer"
"github.com/kubeshop/testkube/internal/common"
"github.com/kubeshop/testkube/pkg/api/v1/client"
Expand All @@ -32,7 +33,6 @@ import (
"github.com/kubeshop/testkube/pkg/mapper/testworkflows"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/constants"
"github.com/kubeshop/testkube/pkg/ui"
"github.com/kubeshop/testkube/pkg/utils"
)

const (
Expand Down Expand Up @@ -73,7 +73,7 @@ func buildTestExecution(test testworkflowsv1.StepExecuteTest, async bool) (func(
exec, err := c.ExecuteTest(test.Name, test.ExecutionRequest.Name, client.ExecuteTestOptions{
RunningContext: &testkube.RunningContext{
Type_: "testworkflow",
Context: fmt.Sprintf("%s/executions/%s", env.WorkflowName(), env.ExecutionId()),
Context: fmt.Sprintf("%s/executions/%s", config.WorkflowName(), config.ExecutionId()),
},
IsVariablesFileUploaded: test.ExecutionRequest.IsVariablesFileUploaded,
ExecutionLabels: test.ExecutionRequest.ExecutionLabels,
Expand All @@ -94,15 +94,15 @@ func buildTestExecution(test testworkflowsv1.StepExecuteTest, async bool) (func(
EnvConfigMaps: common.MapSlice(test.ExecutionRequest.EnvConfigMaps, testworkflows.MapTestEnvReferenceKubeToAPI),
EnvSecrets: common.MapSlice(test.ExecutionRequest.EnvSecrets, testworkflows.MapTestEnvReferenceKubeToAPI),
ExecutionNamespace: test.ExecutionRequest.ExecutionNamespace,
DisableWebhooks: env.ExecutionDisableWebhooks(),
DisableWebhooks: config.ExecutionDisableWebhooks(),
})
execName := exec.Name
if err != nil {
ui.Errf("failed to execute test: %s: %s", test.Name, err)
return
}

instructions.PrintOutput(env.Ref(), "test-start", &testExecutionDetails{
instructions.PrintOutput(config.Ref(), "test-start", &testExecutionDetails{
Id: exec.Id,
Name: exec.Name,
TestName: exec.TestName,
Expand Down Expand Up @@ -136,7 +136,7 @@ func buildTestExecution(test testworkflowsv1.StepExecuteTest, async bool) (func(
break loop
}
if prevStatus != status {
instructions.PrintOutput(env.Ref(), "test-status", &executionResult{Id: exec.Id, Status: string(status)})
instructions.PrintOutput(config.Ref(), "test-status", &executionResult{Id: exec.Id, Status: string(status)})
}
prevStatus = status
}
Expand All @@ -150,7 +150,7 @@ func buildTestExecution(test testworkflowsv1.StepExecuteTest, async bool) (func(
color = ui.Red
}

instructions.PrintOutput(env.Ref(), "test-end", &executionResult{Id: exec.Id, Status: string(status)})
instructions.PrintOutput(config.Ref(), "test-end", &executionResult{Id: exec.Id, Status: string(status)})
fmt.Printf("%s • %s\n", color(execName), string(status))
return
}, nil
Expand All @@ -160,17 +160,14 @@ func buildWorkflowExecution(workflow testworkflowsv1.StepExecuteWorkflow, async
return func() (err error) {
c := env.Testkube()

tags, err := utils.DecodeEnvVarToStringMap(env.ExecutionTags())
if err != nil {
ui.Errf("failed to decode tags: %s: %s", workflow.Name, err.Error())
}
tags := config.ExecutionTags()

var exec testkube.TestWorkflowExecution
for i := 0; i < CreateExecutionRetryOnFailureMaxAttempts; i++ {
exec, err = c.ExecuteTestWorkflow(workflow.Name, testkube.TestWorkflowExecutionRequest{
Name: workflow.ExecutionName,
Config: testworkflows.MapConfigValueKubeToAPI(workflow.Config),
DisableWebhooks: env.ExecutionDisableWebhooks(),
DisableWebhooks: config.ExecutionDisableWebhooks(),
Tags: tags,
})
if err == nil {
Expand All @@ -188,7 +185,7 @@ func buildWorkflowExecution(workflow testworkflowsv1.StepExecuteWorkflow, async
}
execName := exec.Name

instructions.PrintOutput(env.Ref(), "testworkflow-start", &testWorkflowExecutionDetails{
instructions.PrintOutput(config.Ref(), "testworkflow-start", &testWorkflowExecutionDetails{
Id: exec.Id,
Name: exec.Name,
TestWorkflowName: exec.Workflow.Name,
Expand Down Expand Up @@ -233,7 +230,7 @@ func buildWorkflowExecution(workflow testworkflowsv1.StepExecuteWorkflow, async
break loop
}
if prevStatus != status {
instructions.PrintOutput(env.Ref(), "testworkflow-status", &executionResult{Id: exec.Id, Status: string(status)})
instructions.PrintOutput(config.Ref(), "testworkflow-status", &executionResult{Id: exec.Id, Status: string(status)})
}
prevStatus = status
}
Expand All @@ -247,7 +244,7 @@ func buildWorkflowExecution(workflow testworkflowsv1.StepExecuteWorkflow, async
color = ui.Red
}

instructions.PrintOutput(env.Ref(), "testworkflow-end", &executionResult{Id: exec.Id, Status: string(status)})
instructions.PrintOutput(config.Ref(), "testworkflow-end", &executionResult{Id: exec.Id, Status: string(status)})
fmt.Printf("%s • %s\n", color(execName), string(status))
return
}, nil
Expand Down Expand Up @@ -311,7 +308,7 @@ func NewExecuteCmd() *cobra.Command {
baseMachine := data.GetBaseTestWorkflowMachine()

// Initialize transfer server
transferSrv := transfer.NewServer(constants.DefaultTransferDirPath, env.IP(), constants.DefaultTransferPort)
transferSrv := transfer.NewServer(constants.DefaultTransferDirPath, config.IP(), constants.DefaultTransferPort)

// Build operations to run
operations := make([]func() error, 0)
Expand Down
38 changes: 24 additions & 14 deletions cmd/tcl/testworkflow-toolkit/commands/kill.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,15 @@ import (
"strings"

"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

commontcl "github.com/kubeshop/testkube/cmd/tcl/testworkflow-toolkit/common"
"github.com/kubeshop/testkube/cmd/tcl/testworkflow-toolkit/spawn"
"github.com/kubeshop/testkube/cmd/testworkflow-init/data"
"github.com/kubeshop/testkube/cmd/testworkflow-init/instructions"
"github.com/kubeshop/testkube/cmd/testworkflow-toolkit/artifacts"
"github.com/kubeshop/testkube/cmd/testworkflow-toolkit/env"
"github.com/kubeshop/testkube/cmd/testworkflow-toolkit/env/config"
"github.com/kubeshop/testkube/pkg/expressions"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowcontroller"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/constants"
"github.com/kubeshop/testkube/pkg/testworkflows/executionworker/executionworkertypes"
"github.com/kubeshop/testkube/pkg/ui"
)

Expand All @@ -40,7 +38,6 @@ func NewKillCmd() *cobra.Command {
Run: func(cmd *cobra.Command, args []string) {
machine := expressions.CombinedMachines(data.AliasMachine, data.GetBaseTestWorkflowMachine())
groupRef := args[0]
clientSet := env.Kubernetes()

conditions := make(map[string]expressions.Expression)
for _, l := range logs {
Expand All @@ -57,16 +54,27 @@ func NewKillCmd() *cobra.Command {
}

// Fetch the services when needed
namespace := ""
if len(logs) > 0 {
jobs, err := clientSet.BatchV1().Jobs(env.Namespace()).List(context.Background(), metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", constants.GroupIdLabelName, groupRef),
items, err := spawn.ExecutionWorker().List(context.Background(), executionworkertypes.ListOptions{
GroupId: groupRef,
})
ui.ExitOnError("listing service resources", err)
ui.ExitOnError("listing service instances", err)

if len(items) > 0 {
namespace = items[0].Namespace
for _, item := range items {
if item.Namespace != namespace {
namespace = ""
break
}
}
}

services := make(map[string]int64)
ids := make([]string, 0)
for _, job := range jobs.Items {
service, index := spawn.GetServiceByResourceId(job.Name)
for _, item := range items {
service, index := spawn.GetServiceByResourceId(item.Resource.Id)
if _, ok := conditions[service]; !ok {
continue
}
Expand All @@ -84,7 +92,7 @@ func NewKillCmd() *cobra.Command {
fmt.Printf("warning: service '%s': could not resolve condition '%s': %s", service, log.String(), err.Error())
} else if v, _ := log.BoolValue(); v {
services[service]++
ids = append(ids, job.Name)
ids = append(ids, item.Resource.Id)
}
}

Expand All @@ -103,17 +111,19 @@ func NewKillCmd() *cobra.Command {
}
log := spawn.CreateLogger(service, "", index, count)

logsFilePath, err := spawn.SaveLogs(context.Background(), clientSet, storage, env.Namespace(), id, service+"/", index)
logsFilePath, err := spawn.SaveLogs(context.Background(), storage, config.Namespace(), id, service+"/", index)
if err == nil {
instructions.PrintOutput(env.Ref(), "service", ServiceInfo{Group: groupRef, Name: service, Index: index, Logs: storage.FullPath(logsFilePath)})
instructions.PrintOutput(config.Ref(), "service", ServiceInfo{Group: groupRef, Name: service, Index: index, Logs: storage.FullPath(logsFilePath)})
log("saved logs")
} else {
log("warning", "problem saving the logs", err.Error())
}
}
}

err := testworkflowcontroller.CleanupGroup(context.Background(), clientSet, env.Namespace(), groupRef)
err := spawn.ExecutionWorker().DestroyGroup(context.Background(), groupRef, executionworkertypes.DestroyOptions{
Namespace: namespace,
})
ui.ExitOnError("cleaning up resources", err)
},
}
Expand Down
Loading

0 comments on commit 0b8bae9

Please sign in to comment.