Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Add quality of service logic to admin #107

Merged
merged 6 commits into from
Jul 22, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions flyteadmin_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,16 @@ cluster_resources:
valueFrom:
env: SHELL
refresh: 3s
qualityOfService:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a fan of this name - I just don't like the connotation of a low quality of service. Is it too late to change this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not used so technically i can go make a breaking change :) any suggestions instead? basic/baseline?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually we've had bad luck with supposedly no-op breaking changes so I would rather not (this is checked into flyteidl, unfortunately)

tierExecutionValues:
LOW:
queueingBudget: 30m
MEDIUM:
queueingBudget: 10m
HIGH:
queueingBudget: 1m
# By default UNDEFINED has no queueing budget when it is omitted from the configuration
defaultTiers:
development: LOW
staging: MEDIUM
# by default production has an UNDEFINED tier when it is omitted from the configuration
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/jinzhu/gorm v1.9.12
github.com/kelseyhightower/envconfig v1.4.0 // indirect
github.com/lib/pq v1.3.0
github.com/lyft/flyteidl v0.17.34
github.com/lyft/flyteidl v0.17.36
github.com/lyft/flytepropeller v0.2.64
github.com/lyft/flytestdlib v0.3.9
github.com/magiconair/properties v1.8.1
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -456,12 +456,17 @@ github.com/lyft/flyteidl v0.17.27/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1
github.com/lyft/flyteidl v0.17.32/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.34 h1:8ERT/8vY40dOPPJrdD8ossBb30WkvzUx/IAFMR/7+9U=
github.com/lyft/flyteidl v0.17.34/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.36 h1:frCsRL9h4aoe+VnQSUhWM9FqZXjCAXcmR96Jt3Y+qVE=
github.com/lyft/flyteidl v0.17.36/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.3.11/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA=
github.com/lyft/flyteplugins v0.3.21/go.mod h1:NDhdkOAn2q6p7YLq9a0/lxyS0dburoAEgipOY5TiO8A=
github.com/lyft/flyteplugins v0.3.33/go.mod h1:HHO6KC/2z77n9o9KM697YvSP85IWDe6jl6tAIrMLqWU=
github.com/lyft/flyteplugins v0.3.35 h1:9s2BrJ82RoTJa1Cy02vqQy+ajxS+d4MQAkuUFoaiCuQ=
github.com/lyft/flyteplugins v0.3.35/go.mod h1:Dk9rnPCbgR7rC9dNM49260TQ51TvRsljDBJ6uBjZ9ys=
github.com/lyft/flytepropeller v0.2.13 h1:RDFM8ps5bHWdHYK87NLyYX4iyF16ahkxerI0X9DZSfM=
github.com/lyft/flytepropeller v0.2.13/go.mod h1:QJ9txCCxHnzvwQoG4TbcldVs1in4+C943prLZVDmmIA=
github.com/lyft/flytepropeller v0.2.32-0.20200514160943-ec7a5eee620b h1:Yn0PCPtKNV8jV1yZBc3812arIEk92WjtsvI39OARFgE=
github.com/lyft/flytepropeller v0.2.32-0.20200514160943-ec7a5eee620b/go.mod h1:Vkn/YNyMr6NEbOSfKF4xcsvrT188WbcHmUEi73GbSjE=
github.com/lyft/flytepropeller v0.2.62/go.mod h1:O0fezjbTCsbKiFoiRAbCwdZsXdYOeE56xnW4oc9Vj5c=
github.com/lyft/flytepropeller v0.2.64-0.20200623171054-601fca02a398 h1:KSYZKkJdGiMOk/rFAbG1HAtDI/w1WS7kWmO/RFa12mA=
github.com/lyft/flytepropeller v0.2.64-0.20200623171054-601fca02a398/go.mod h1:TY3KCHDtOtuj1Z55sScGb2ICCpH5MtUpeHWuYE7qzQ8=
Expand Down
109 changes: 67 additions & 42 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,20 @@ type executionUserMetrics struct {
}

type ExecutionManager struct {
db repositories.RepositoryInterface
config runtimeInterfaces.Configuration
storageClient *storage.DataStore
workflowExecutor workflowengineInterfaces.Executor
queueAllocator executions.QueueAllocator
_clock clock.Clock
systemMetrics executionSystemMetrics
userMetrics executionUserMetrics
notificationClient notificationInterfaces.Publisher
urlData dataInterfaces.RemoteURLInterface
workflowManager interfaces.WorkflowInterface
namedEntityManager interfaces.NamedEntityInterface
db repositories.RepositoryInterface
config runtimeInterfaces.Configuration
storageClient *storage.DataStore
workflowExecutor workflowengineInterfaces.Executor
queueAllocator executions.QueueAllocator
_clock clock.Clock
systemMetrics executionSystemMetrics
userMetrics executionUserMetrics
notificationClient notificationInterfaces.Publisher
urlData dataInterfaces.RemoteURLInterface
workflowManager interfaces.WorkflowInterface
namedEntityManager interfaces.NamedEntityInterface
resourceManager interfaces.ResourceInterface
qualityOfServiceAllocator executions.QualityOfServiceAllocator
}

func getExecutionContext(ctx context.Context, id *core.WorkflowExecutionIdentifier) context.Context {
Expand Down Expand Up @@ -264,9 +266,7 @@ func assignResourcesIfUnset(ctx context.Context, identifier *core.Identifier,
// Note: The system will assign a system-default value for request but for limit it will deduce it from the request
// itself => Limit := Min([Some-Multiplier X Request], System-Max). For now we are using a multiplier of 1. In
// general we recommend the users to set limits close to requests for more predictability in the system.
func setCompiledTaskDefaults(ctx context.Context, config runtimeInterfaces.Configuration,
task *core.CompiledTask, db repositories.RepositoryInterface, workflowName string) {
resourceManager := resources.NewResourceManager(db, config.ApplicationConfiguration())
func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *core.CompiledTask, workflowName string) {
if task == nil {
logger.Warningf(ctx, "Can't set default resources for nil task.")
return
Expand All @@ -276,7 +276,7 @@ func setCompiledTaskDefaults(ctx context.Context, config runtimeInterfaces.Confi
logger.Debugf(ctx, "Not setting default resources for task [%+v], no container resources found to check", task)
return
}
resource, err := resourceManager.GetResource(ctx, interfaces.ResourceRequest{
resource, err := m.resourceManager.GetResource(ctx, interfaces.ResourceRequest{
Project: task.Template.Id.Project,
Domain: task.Template.Id.Domain,
Workflow: workflowName,
Expand All @@ -293,7 +293,7 @@ func setCompiledTaskDefaults(ctx context.Context, config runtimeInterfaces.Confi
taskResourceSpec = resource.Attributes.GetTaskResourceAttributes().Defaults
}
task.Template.GetContainer().Resources.Requests = assignResourcesIfUnset(
ctx, task.Template.Id, config.TaskResourceConfiguration().GetDefaults(), task.Template.GetContainer().Resources.Requests,
ctx, task.Template.Id, m.config.TaskResourceConfiguration().GetDefaults(), task.Template.GetContainer().Resources.Requests,
taskResourceSpec)

logger.Debugf(ctx, "Assigning task resource limits for [%+v]", task.Template.Id)
Expand Down Expand Up @@ -371,7 +371,7 @@ func (m *ExecutionManager) launchSingleTaskExecution(

// Dynamically assign task resource defaults.
for _, task := range workflow.Closure.CompiledWorkflow.Tasks {
setCompiledTaskDefaults(ctx, m.config, task, m.db, name)
m.setCompiledTaskDefaults(ctx, task, name)
}

// Dynamically assign execution queues.
Expand All @@ -385,13 +385,23 @@ func (m *ExecutionManager) launchSingleTaskExecution(
if err != nil {
return nil, nil, err
}
qualityOfService, err := m.qualityOfServiceAllocator.GetQualityOfService(ctx, executions.GetQualityOfServiceInput{
Workflow: &workflow,
LaunchPlan: launchPlan,
ExecutionCreateRequest: &request,
})
if err != nil {
logger.Errorf(ctx, "Failed to get quality of service for [%+v] with error: %v", workflowExecutionID, err)
return nil, nil, err
}
executeTaskInputs := workflowengineInterfaces.ExecuteTaskInput{
ExecutionID: &workflowExecutionID,
WfClosure: *workflow.Closure.CompiledWorkflow,
Inputs: request.Inputs,
ReferenceName: taskIdentifier.Name,
AcceptedAt: requestedAt,
Auth: request.Spec.AuthRole,
ExecutionID: &workflowExecutionID,
WfClosure: *workflow.Closure.CompiledWorkflow,
Inputs: request.Inputs,
ReferenceName: taskIdentifier.Name,
AcceptedAt: requestedAt,
Auth: request.Spec.AuthRole,
QueueingBudget: qualityOfService.QueuingBudget,
}
if request.Spec.Labels != nil {
executeTaskInputs.Labels = request.Spec.Labels.Values
Expand Down Expand Up @@ -523,7 +533,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(

// Dynamically assign task resource defaults.
for _, task := range workflow.Closure.CompiledWorkflow.Tasks {
setCompiledTaskDefaults(ctx, m.config, task, m.db, name)
m.setCompiledTaskDefaults(ctx, task, name)
}

// Dynamically assign execution queues.
Expand All @@ -538,13 +548,24 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
return nil, nil, err
}

qualityOfService, err := m.qualityOfServiceAllocator.GetQualityOfService(ctx, executions.GetQualityOfServiceInput{
Workflow: workflow,
LaunchPlan: launchPlan,
ExecutionCreateRequest: &request,
})
if err != nil {
logger.Errorf(ctx, "Failed to get quality of service for [%+v] with error: %v", workflowExecutionID, err)
return nil, nil, err
}

// TODO: Reduce CRD size and use offloaded input URI to blob store instead.
executeWorkflowInputs := workflowengineInterfaces.ExecuteWorkflowInput{
ExecutionID: &workflowExecutionID,
WfClosure: *workflow.Closure.CompiledWorkflow,
Inputs: executionInputs,
Reference: *launchPlan,
AcceptedAt: requestedAt,
ExecutionID: &workflowExecutionID,
WfClosure: *workflow.Closure.CompiledWorkflow,
Inputs: executionInputs,
Reference: *launchPlan,
AcceptedAt: requestedAt,
QueueingBudget: qualityOfService.QueuingBudget,
}
err = m.addLabelsAndAnnotations(request.Spec, &executeWorkflowInputs)
if err != nil {
Expand Down Expand Up @@ -1214,18 +1235,22 @@ func NewExecutionManager(
ScheduledExecutionDelays: make(map[string]map[string]*promutils.StopWatch),
WorkflowExecutionDurations: make(map[string]map[string]*promutils.StopWatch),
}

resourceManager := resources.NewResourceManager(db, config.ApplicationConfiguration())
return &ExecutionManager{
db: db,
config: config,
storageClient: storageClient,
workflowExecutor: workflowExecutor,
queueAllocator: queueAllocator,
_clock: clock.New(),
systemMetrics: systemMetrics,
userMetrics: userMetrics,
notificationClient: publisher,
urlData: urlData,
workflowManager: workflowManager,
namedEntityManager: namedEntityManager,
db: db,
config: config,
storageClient: storageClient,
workflowExecutor: workflowExecutor,
queueAllocator: queueAllocator,
_clock: clock.New(),
systemMetrics: systemMetrics,
userMetrics: userMetrics,
notificationClient: publisher,
urlData: urlData,
workflowManager: workflowManager,
namedEntityManager: namedEntityManager,
resourceManager: resourceManager,
qualityOfServiceAllocator: executions.NewQualityOfServiceAllocator(config, resourceManager),
}
}
32 changes: 29 additions & 3 deletions pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,32 @@ func TestCreateExecution(t *testing.T) {
"annotation3": "3",
"annotation4": "4",
}, inputs.Annotations)
assert.EqualValues(t, 10*time.Minute, inputs.QueueingBudget)
return &workflowengineInterfaces.ExecutionInfo{
Cluster: testCluster,
}, nil
})
qosProvider := runtimeMocks.NewMockQualityOfServiceProvider()
qosProvider.(*runtimeMocks.MockQualityOfServiceProvider).TierExecutionValues = map[core.QualityOfService_Tier]core.QualityOfServiceSpec{
core.QualityOfService_HIGH: {
QueueingBudget: ptypes.DurationProto(10 * time.Minute),
},
core.QualityOfService_MEDIUM: {
QueueingBudget: ptypes.DurationProto(20 * time.Minute),
},
core.QualityOfService_LOW: {
QueueingBudget: ptypes.DurationProto(30 * time.Minute),
},
}

qosProvider.(*runtimeMocks.MockQualityOfServiceProvider).DefaultTiers = map[string]core.QualityOfService_Tier{
"domain": core.QualityOfService_HIGH,
}

mockConfig := getMockExecutionsConfigProvider()
mockConfig.(*runtimeMocks.MockConfigurationProvider).AddQualityOfServiceConfiguration(qosProvider)
execManager := NewExecutionManager(
repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockExecutor,
repository, mockConfig, getMockStorageForExecTest(context.Background()), mockExecutor,
mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
request := testutils.GetExecutionRequest()
request.Spec.Metadata = &admin.ExecutionMetadata{
Expand Down Expand Up @@ -2449,7 +2469,10 @@ func TestSetDefaults(t *testing.T) {
mockConfig := runtimeMocks.NewMockConfigurationProvider(
testutils.GetApplicationConfigWithDefaultDomains(), nil, nil, &taskConfig,
runtimeMocks.NewMockWhitelistConfiguration(), nil)
setCompiledTaskDefaults(context.Background(), mockConfig, task, repositoryMocks.NewMockRepository(), "workflow")
execManager := NewExecutionManager(
repositoryMocks.NewMockRepository(), mockConfig, getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
execManager.(*ExecutionManager).setCompiledTaskDefaults(context.Background(), task, "workflow")
assert.True(t, proto.Equal(
&core.Container{
Resources: &core.Resources{
Expand Down Expand Up @@ -2515,7 +2538,10 @@ func TestSetDefaults_MissingDefaults(t *testing.T) {
mockConfig := runtimeMocks.NewMockConfigurationProvider(
testutils.GetApplicationConfigWithDefaultDomains(), nil, nil, &taskConfig,
runtimeMocks.NewMockWhitelistConfiguration(), nil)
setCompiledTaskDefaults(context.Background(), mockConfig, task, repositoryMocks.NewMockRepository(), "workflow")
execManager := NewExecutionManager(
repositoryMocks.NewMockRepository(), mockConfig, getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
execManager.(*ExecutionManager).setCompiledTaskDefaults(context.Background(), task, "workflow")
assert.True(t, proto.Equal(
&core.Container{
Resources: &core.Resources{
Expand Down
Loading