From 9bf19399c0ae245e099bd115d700a862d633e28f Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Wed, 8 Jul 2020 16:42:11 -0700 Subject: [PATCH 1/6] quality of service --- flyteadmin_config.yaml | 13 ++ go.mod | 4 +- go.sum | 3 + pkg/manager/impl/execution_manager.go | 109 +++++---- pkg/manager/impl/execution_manager_test.go | 10 +- .../impl/executions/quality_of_service.go | 148 ++++++++++++ .../executions/quality_of_service_test.go | 212 ++++++++++++++++++ pkg/manager/mocks/resource.go | 7 +- pkg/runtime/configuration_provider.go | 6 + pkg/runtime/interfaces/configuration.go | 1 + .../quality_of_service_configuration.go | 22 ++ .../mocks/mock_configuration_provider.go | 10 + .../mocks/mock_quality_of_service_provider.go | 26 +++ pkg/runtime/quality_of_service_provider.go | 60 +++++ pkg/workflowengine/impl/propeller_executor.go | 12 + pkg/workflowengine/interfaces/executor.go | 32 +-- 16 files changed, 614 insertions(+), 61 deletions(-) create mode 100644 pkg/manager/impl/executions/quality_of_service.go create mode 100644 pkg/manager/impl/executions/quality_of_service_test.go create mode 100644 pkg/runtime/interfaces/quality_of_service_configuration.go create mode 100644 pkg/runtime/mocks/mock_quality_of_service_provider.go create mode 100644 pkg/runtime/quality_of_service_provider.go diff --git a/flyteadmin_config.yaml b/flyteadmin_config.yaml index 69e27873a..11749e42d 100644 --- a/flyteadmin_config.yaml +++ b/flyteadmin_config.yaml @@ -159,3 +159,16 @@ cluster_resources: valueFrom: env: SHELL refresh: 3s +qualityOfService: + tierExecutionValues: + LOW: + queueingBudget: 30m + MEDIUM: + queueingBudget: 10m + HIGH: + queueingBudget: 1m + # By default UNDEFINED has no queueing budget when it is omitted from the configuration + defaultTiers: + development: LOW + staging: MEDIUM + # by default production has an UNDEFINED tier when it is omitted from the configuration diff --git a/go.mod b/go.mod index 2841dcfbe..c5a69457a 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/jinzhu/gorm v1.9.12 github.com/kelseyhightower/envconfig v1.4.0 // indirect github.com/lib/pq v1.3.0 - github.com/lyft/flyteidl v0.17.34 + github.com/lyft/flyteidl v0.17.36 github.com/lyft/flytepropeller v0.2.64 github.com/lyft/flytestdlib v0.3.9 github.com/magiconair/properties v1.8.1 @@ -53,3 +53,5 @@ replace ( k8s.io/apimachinery => github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f k8s.io/client-go => k8s.io/client-go v0.0.0-20191016111102-bec269661e48 ) + +replace github.com/lyft/flyteidl => ../flyteidl/ diff --git a/go.sum b/go.sum index 362fe33f7..91faa36c1 100644 --- a/go.sum +++ b/go.sum @@ -457,11 +457,14 @@ github.com/lyft/flyteidl v0.17.32/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1 github.com/lyft/flyteidl v0.17.34 h1:8ERT/8vY40dOPPJrdD8ossBb30WkvzUx/IAFMR/7+9U= github.com/lyft/flyteidl v0.17.34/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flyteplugins v0.3.11/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA= +github.com/lyft/flyteplugins v0.3.21/go.mod h1:NDhdkOAn2q6p7YLq9a0/lxyS0dburoAEgipOY5TiO8A= github.com/lyft/flyteplugins v0.3.33/go.mod h1:HHO6KC/2z77n9o9KM697YvSP85IWDe6jl6tAIrMLqWU= github.com/lyft/flyteplugins v0.3.35 h1:9s2BrJ82RoTJa1Cy02vqQy+ajxS+d4MQAkuUFoaiCuQ= github.com/lyft/flyteplugins v0.3.35/go.mod h1:Dk9rnPCbgR7rC9dNM49260TQ51TvRsljDBJ6uBjZ9ys= github.com/lyft/flytepropeller v0.2.13 h1:RDFM8ps5bHWdHYK87NLyYX4iyF16ahkxerI0X9DZSfM= github.com/lyft/flytepropeller v0.2.13/go.mod h1:QJ9txCCxHnzvwQoG4TbcldVs1in4+C943prLZVDmmIA= +github.com/lyft/flytepropeller v0.2.32-0.20200514160943-ec7a5eee620b h1:Yn0PCPtKNV8jV1yZBc3812arIEk92WjtsvI39OARFgE= +github.com/lyft/flytepropeller v0.2.32-0.20200514160943-ec7a5eee620b/go.mod h1:Vkn/YNyMr6NEbOSfKF4xcsvrT188WbcHmUEi73GbSjE= github.com/lyft/flytepropeller v0.2.62/go.mod h1:O0fezjbTCsbKiFoiRAbCwdZsXdYOeE56xnW4oc9Vj5c= github.com/lyft/flytepropeller v0.2.64-0.20200623171054-601fca02a398 h1:KSYZKkJdGiMOk/rFAbG1HAtDI/w1WS7kWmO/RFa12mA= github.com/lyft/flytepropeller v0.2.64-0.20200623171054-601fca02a398/go.mod h1:TY3KCHDtOtuj1Z55sScGb2ICCpH5MtUpeHWuYE7qzQ8= diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index 43d745d2b..bafe0ae17 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -71,18 +71,20 @@ type executionUserMetrics struct { } type ExecutionManager struct { - db repositories.RepositoryInterface - config runtimeInterfaces.Configuration - storageClient *storage.DataStore - workflowExecutor workflowengineInterfaces.Executor - queueAllocator executions.QueueAllocator - _clock clock.Clock - systemMetrics executionSystemMetrics - userMetrics executionUserMetrics - notificationClient notificationInterfaces.Publisher - urlData dataInterfaces.RemoteURLInterface - workflowManager interfaces.WorkflowInterface - namedEntityManager interfaces.NamedEntityInterface + db repositories.RepositoryInterface + config runtimeInterfaces.Configuration + storageClient *storage.DataStore + workflowExecutor workflowengineInterfaces.Executor + queueAllocator executions.QueueAllocator + _clock clock.Clock + systemMetrics executionSystemMetrics + userMetrics executionUserMetrics + notificationClient notificationInterfaces.Publisher + urlData dataInterfaces.RemoteURLInterface + workflowManager interfaces.WorkflowInterface + namedEntityManager interfaces.NamedEntityInterface + resourceManager interfaces.ResourceInterface + qualityOfServiceAllocator executions.QualityOfServiceAllocator } func getExecutionContext(ctx context.Context, id *core.WorkflowExecutionIdentifier) context.Context { @@ -264,9 +266,7 @@ func assignResourcesIfUnset(ctx context.Context, identifier *core.Identifier, // Note: The system will assign a system-default value for request but for limit it will deduce it from the request // itself => Limit := Min([Some-Multiplier X Request], System-Max). For now we are using a multiplier of 1. In // general we recommend the users to set limits close to requests for more predictability in the system. -func setCompiledTaskDefaults(ctx context.Context, config runtimeInterfaces.Configuration, - task *core.CompiledTask, db repositories.RepositoryInterface, workflowName string) { - resourceManager := resources.NewResourceManager(db, config.ApplicationConfiguration()) +func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *core.CompiledTask, workflowName string) { if task == nil { logger.Warningf(ctx, "Can't set default resources for nil task.") return @@ -276,7 +276,7 @@ func setCompiledTaskDefaults(ctx context.Context, config runtimeInterfaces.Confi logger.Debugf(ctx, "Not setting default resources for task [%+v], no container resources found to check", task) return } - resource, err := resourceManager.GetResource(ctx, interfaces.ResourceRequest{ + resource, err := m.resourceManager.GetResource(ctx, interfaces.ResourceRequest{ Project: task.Template.Id.Project, Domain: task.Template.Id.Domain, Workflow: workflowName, @@ -293,7 +293,7 @@ func setCompiledTaskDefaults(ctx context.Context, config runtimeInterfaces.Confi taskResourceSpec = resource.Attributes.GetTaskResourceAttributes().Defaults } task.Template.GetContainer().Resources.Requests = assignResourcesIfUnset( - ctx, task.Template.Id, config.TaskResourceConfiguration().GetDefaults(), task.Template.GetContainer().Resources.Requests, + ctx, task.Template.Id, m.config.TaskResourceConfiguration().GetDefaults(), task.Template.GetContainer().Resources.Requests, taskResourceSpec) logger.Debugf(ctx, "Assigning task resource limits for [%+v]", task.Template.Id) @@ -371,7 +371,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( // Dynamically assign task resource defaults. for _, task := range workflow.Closure.CompiledWorkflow.Tasks { - setCompiledTaskDefaults(ctx, m.config, task, m.db, name) + m.setCompiledTaskDefaults(ctx, task, name) } // Dynamically assign execution queues. @@ -385,13 +385,23 @@ func (m *ExecutionManager) launchSingleTaskExecution( if err != nil { return nil, nil, err } + qualityOfService, err := m.qualityOfServiceAllocator.GetQualityOfService(ctx, executions.GetQualityOfServiceInput{ + Workflow: &workflow, + LaunchPlan: launchPlan, + ExecutionCreateRequest: &request, + }) + if err != nil { + logger.Errorf(ctx, "Failed to get quality of service for [%+v] with error: %v", workflowExecutionID, err) + return nil, nil, err + } executeTaskInputs := workflowengineInterfaces.ExecuteTaskInput{ - ExecutionID: &workflowExecutionID, - WfClosure: *workflow.Closure.CompiledWorkflow, - Inputs: request.Inputs, - ReferenceName: taskIdentifier.Name, - AcceptedAt: requestedAt, - Auth: request.Spec.AuthRole, + ExecutionID: &workflowExecutionID, + WfClosure: *workflow.Closure.CompiledWorkflow, + Inputs: request.Inputs, + ReferenceName: taskIdentifier.Name, + AcceptedAt: requestedAt, + Auth: request.Spec.AuthRole, + QueueingBudget: qualityOfService.QueuingBudget, } if request.Spec.Labels != nil { executeTaskInputs.Labels = request.Spec.Labels.Values @@ -523,7 +533,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( // Dynamically assign task resource defaults. for _, task := range workflow.Closure.CompiledWorkflow.Tasks { - setCompiledTaskDefaults(ctx, m.config, task, m.db, name) + m.setCompiledTaskDefaults(ctx, task, name) } // Dynamically assign execution queues. @@ -538,13 +548,24 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( return nil, nil, err } + qualityOfService, err := m.qualityOfServiceAllocator.GetQualityOfService(ctx, executions.GetQualityOfServiceInput{ + Workflow: workflow, + LaunchPlan: launchPlan, + ExecutionCreateRequest: &request, + }) + if err != nil { + logger.Errorf(ctx, "Failed to get quality of service for [%+v] with error: %v", workflowExecutionID, err) + return nil, nil, err + } + // TODO: Reduce CRD size and use offloaded input URI to blob store instead. executeWorkflowInputs := workflowengineInterfaces.ExecuteWorkflowInput{ - ExecutionID: &workflowExecutionID, - WfClosure: *workflow.Closure.CompiledWorkflow, - Inputs: executionInputs, - Reference: *launchPlan, - AcceptedAt: requestedAt, + ExecutionID: &workflowExecutionID, + WfClosure: *workflow.Closure.CompiledWorkflow, + Inputs: executionInputs, + Reference: *launchPlan, + AcceptedAt: requestedAt, + QueueingBudget: qualityOfService.QueuingBudget, } err = m.addLabelsAndAnnotations(request.Spec, &executeWorkflowInputs) if err != nil { @@ -1214,18 +1235,22 @@ func NewExecutionManager( ScheduledExecutionDelays: make(map[string]map[string]*promutils.StopWatch), WorkflowExecutionDurations: make(map[string]map[string]*promutils.StopWatch), } + + resourceManager := resources.NewResourceManager(db, config.ApplicationConfiguration()) return &ExecutionManager{ - db: db, - config: config, - storageClient: storageClient, - workflowExecutor: workflowExecutor, - queueAllocator: queueAllocator, - _clock: clock.New(), - systemMetrics: systemMetrics, - userMetrics: userMetrics, - notificationClient: publisher, - urlData: urlData, - workflowManager: workflowManager, - namedEntityManager: namedEntityManager, + db: db, + config: config, + storageClient: storageClient, + workflowExecutor: workflowExecutor, + queueAllocator: queueAllocator, + _clock: clock.New(), + systemMetrics: systemMetrics, + userMetrics: userMetrics, + notificationClient: publisher, + urlData: urlData, + workflowManager: workflowManager, + namedEntityManager: namedEntityManager, + resourceManager: resourceManager, + qualityOfServiceAllocator: executions.NewQualityOfServiceAllocator(config, resourceManager), } } diff --git a/pkg/manager/impl/execution_manager_test.go b/pkg/manager/impl/execution_manager_test.go index 753b796f0..afe70a57b 100644 --- a/pkg/manager/impl/execution_manager_test.go +++ b/pkg/manager/impl/execution_manager_test.go @@ -2449,7 +2449,10 @@ func TestSetDefaults(t *testing.T) { mockConfig := runtimeMocks.NewMockConfigurationProvider( testutils.GetApplicationConfigWithDefaultDomains(), nil, nil, &taskConfig, runtimeMocks.NewMockWhitelistConfiguration(), nil) - setCompiledTaskDefaults(context.Background(), mockConfig, task, repositoryMocks.NewMockRepository(), "workflow") + execManager := NewExecutionManager( + repositoryMocks.NewMockRepository(), mockConfig, getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), + mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager.(*ExecutionManager).setCompiledTaskDefaults(context.Background(), task, "workflow") assert.True(t, proto.Equal( &core.Container{ Resources: &core.Resources{ @@ -2515,7 +2518,10 @@ func TestSetDefaults_MissingDefaults(t *testing.T) { mockConfig := runtimeMocks.NewMockConfigurationProvider( testutils.GetApplicationConfigWithDefaultDomains(), nil, nil, &taskConfig, runtimeMocks.NewMockWhitelistConfiguration(), nil) - setCompiledTaskDefaults(context.Background(), mockConfig, task, repositoryMocks.NewMockRepository(), "workflow") + execManager := NewExecutionManager( + repositoryMocks.NewMockRepository(), mockConfig, getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(), + mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) + execManager.(*ExecutionManager).setCompiledTaskDefaults(context.Background(), task, "workflow") assert.True(t, proto.Equal( &core.Container{ Resources: &core.Resources{ diff --git a/pkg/manager/impl/executions/quality_of_service.go b/pkg/manager/impl/executions/quality_of_service.go new file mode 100644 index 000000000..80df081fb --- /dev/null +++ b/pkg/manager/impl/executions/quality_of_service.go @@ -0,0 +1,148 @@ +package executions + +import ( + "context" + "time" + + "github.com/golang/protobuf/ptypes" + "github.com/lyft/flyteadmin/pkg/errors" + "github.com/lyft/flyteadmin/pkg/manager/interfaces" + runtimeInterfaces "github.com/lyft/flyteadmin/pkg/runtime/interfaces" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + "github.com/lyft/flytestdlib/logger" + "google.golang.org/grpc/codes" +) + +type QualityOfServiceSpec struct { + QueuingBudget time.Duration +} + +type GetQualityOfServiceInput struct { + Workflow *admin.Workflow + LaunchPlan *admin.LaunchPlan + ExecutionCreateRequest *admin.ExecutionCreateRequest +} + +type QualityOfServiceAllocator interface { + GetQualityOfService(ctx context.Context, input GetQualityOfServiceInput) (QualityOfServiceSpec, error) +} + +type qualityOfServiceAllocator struct { + config runtimeInterfaces.Configuration + resourceManager interfaces.ResourceInterface +} + +func (q qualityOfServiceAllocator) GetQualityOfService(ctx context.Context, input GetQualityOfServiceInput) (QualityOfServiceSpec, error) { + workflowIdentifier := input.Workflow.Id + + var qualityOfServiceTier core.QualityOfService_Tier + if input.ExecutionCreateRequest.Spec.QualityOfService != nil { + if input.ExecutionCreateRequest.Spec.QualityOfService.GetSpec() != nil { + duration, err := ptypes.Duration(input.ExecutionCreateRequest.Spec.QualityOfService.GetSpec().QueueingBudget) + if err != nil { + return QualityOfServiceSpec{}, errors.NewFlyteAdminErrorf(codes.InvalidArgument, + "Invalid custom quality of service set in create execution request [%s/%s/%s], failed to parse duration [%v] with: %v", + input.ExecutionCreateRequest.Project, input.ExecutionCreateRequest.Domain, + input.ExecutionCreateRequest.Name, + input.ExecutionCreateRequest.Spec.QualityOfService.GetSpec().QueueingBudget, err) + } + return QualityOfServiceSpec{ + QueuingBudget: duration, + }, nil + } + qualityOfServiceTier = input.ExecutionCreateRequest.Spec.QualityOfService.GetTier() + } else if input.LaunchPlan.Spec.QualityOfService != nil { + if input.LaunchPlan.Spec.QualityOfService.GetSpec() != nil { + duration, err := ptypes.Duration(input.LaunchPlan.Spec.QualityOfService.GetSpec().QueueingBudget) + if err != nil { + return QualityOfServiceSpec{}, errors.NewFlyteAdminErrorf(codes.InvalidArgument, + "Invalid custom quality of service set in launch plan [%v], failed to parse duration [%v] with: %v", + input.LaunchPlan.Id, + input.ExecutionCreateRequest.Spec.QualityOfService.GetSpec().QueueingBudget, err) + } + return QualityOfServiceSpec{ + QueuingBudget: duration, + }, nil + } + qualityOfServiceTier = input.LaunchPlan.Spec.QualityOfService.GetTier() + } else if input.Workflow.Closure.CompiledWorkflow.Primary.Template.Metadata != nil && + input.Workflow.Closure.CompiledWorkflow.Primary.Template.Metadata.QualityOfService != nil { + if input.Workflow.Closure.CompiledWorkflow.Primary.Template.Metadata.QualityOfService.GetSpec() != nil { + duration, err := ptypes.Duration(input.Workflow.Closure.CompiledWorkflow.Primary.Template.Metadata.QualityOfService. + GetSpec().QueueingBudget) + if err != nil { + return QualityOfServiceSpec{}, errors.NewFlyteAdminErrorf(codes.InvalidArgument, + "Invalid custom quality of service set in workflow [%v], failed to parse duration [%v] with: %v", + workflowIdentifier, + input.ExecutionCreateRequest.Spec.QualityOfService.GetSpec().QueueingBudget, err) + } + return QualityOfServiceSpec{ + QueuingBudget: duration, + }, nil + } + qualityOfServiceTier = input.Workflow.Closure.CompiledWorkflow.Primary.Template.Metadata.QualityOfService.GetTier() + } + + // If nothing in the hierarchy has set the quality of service, see if an override exists in the matchable attributes + // resource table. + resource, err := q.resourceManager.GetResource(ctx, interfaces.ResourceRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Workflow: workflowIdentifier.Name, + ResourceType: admin.MatchableResource_QUALITY_OF_SERVICE_SPECIFICATION, + }) + if err != nil { + if _, ok := err.(errors.FlyteAdminError); !ok || err.(errors.FlyteAdminError).Code() != codes.NotFound { + logger.Warningf(ctx, + "Failed to fetch override values when assigning quality of service values for [%+v] with err: %v", + workflowIdentifier, err) + } + } + + if resource != nil && resource.Attributes != nil && resource.Attributes.GetQualityOfService() != nil && + resource.Attributes.GetQualityOfService().GetSpec() != nil { + // Use custom override value in database rather than from registered entities or the admin application config. + duration, err := ptypes.Duration(resource.Attributes.GetQualityOfService().GetSpec().QueueingBudget) + if err != nil { + return QualityOfServiceSpec{}, errors.NewFlyteAdminErrorf(codes.InvalidArgument, + "Invalid custom quality of service set for [%+v], failed to parse duration [%v] with: %v", + workflowIdentifier, resource.Attributes.GetQualityOfService().GetSpec().QueueingBudget, err) + } + return QualityOfServiceSpec{ + QueuingBudget: duration, + }, nil + } else if resource != nil && resource.Attributes != nil && resource.Attributes.GetQualityOfService() != nil && + resource.Attributes.GetQualityOfService().GetTier() != core.QualityOfService_UNDEFINED { + qualityOfServiceTier = resource.Attributes.GetQualityOfService().GetTier() + } + + if qualityOfServiceTier == core.QualityOfService_UNDEFINED { + // If we've come all this way and at no layer is an overridable configuration for the quality of service tier + // set, use the default values from the admin application config. + var ok bool + qualityOfServiceTier, ok = q.config.QualityOfServiceConfiguration().GetDefaultTiers()[input.ExecutionCreateRequest.Domain] + if !ok { + // No queueing budget to set when no default is specified + return QualityOfServiceSpec{}, nil + } + } + executionValues, ok := q.config.QualityOfServiceConfiguration().GetTierExecutionValues()[qualityOfServiceTier] + if !ok { + // No queueing budget to set when no default is specified + return QualityOfServiceSpec{}, nil + } + // Config values should always be vetted so there's no need to check the error from conversion. + duration, _ := ptypes.Duration(executionValues.QueueingBudget) + + return QualityOfServiceSpec{ + QueuingBudget: duration, + }, nil +} + +func NewQualityOfServiceAllocator(config runtimeInterfaces.Configuration, resourceManager interfaces.ResourceInterface) QualityOfServiceAllocator { + return &qualityOfServiceAllocator{ + config: config, + resourceManager: resourceManager, + } +} diff --git a/pkg/manager/impl/executions/quality_of_service_test.go b/pkg/manager/impl/executions/quality_of_service_test.go new file mode 100644 index 000000000..738e19680 --- /dev/null +++ b/pkg/manager/impl/executions/quality_of_service_test.go @@ -0,0 +1,212 @@ +package executions + +import ( + "context" + "testing" + "time" + + "github.com/golang/protobuf/ptypes" + "github.com/lyft/flyteadmin/pkg/manager/interfaces" + managerMocks "github.com/lyft/flyteadmin/pkg/manager/mocks" + runtimeInterfaces "github.com/lyft/flyteadmin/pkg/runtime/interfaces" + "github.com/lyft/flyteadmin/pkg/runtime/mocks" + runtimeMocks "github.com/lyft/flyteadmin/pkg/runtime/mocks" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + "github.com/stretchr/testify/assert" +) + +var workflowIdentifier = &core.Identifier{ + ResourceType: core.ResourceType_WORKFLOW, + Project: "project", + Domain: "development", + Name: "worky", +} + +func getQualityOfServiceWithDuration(duration time.Duration) *core.QualityOfService { + return &core.QualityOfService{ + Designation: &core.QualityOfService_Spec{ + Spec: &core.QualityOfServiceSpec{ + QueueingBudget: ptypes.DurationProto(duration), + }, + }, + } +} + +func getMockConfig() runtimeInterfaces.Configuration { + mockConfig := mocks.NewMockConfigurationProvider(nil, nil, nil, nil, nil, nil) + provider := runtimeMocks.NewMockQualityOfServiceProvider() + provider.(*runtimeMocks.MockQualityOfServiceProvider).TierExecutionValues = map[core.QualityOfService_Tier]core.QualityOfServiceSpec{ + core.QualityOfService_HIGH: { + QueueingBudget: ptypes.DurationProto(10 * time.Minute), + }, + core.QualityOfService_MEDIUM: { + QueueingBudget: ptypes.DurationProto(20 * time.Minute), + }, + core.QualityOfService_LOW: { + QueueingBudget: ptypes.DurationProto(30 * time.Minute), + }, + } + + provider.(*runtimeMocks.MockQualityOfServiceProvider).DefaultTiers = map[string]core.QualityOfService_Tier{ + "production": core.QualityOfService_HIGH, + "development": core.QualityOfService_LOW, + } + + mockConfig.(*runtimeMocks.MockConfigurationProvider).AddQualityOfServiceConfiguration(provider) + return mockConfig +} + +func addGetResourceFunc(t *testing.T, resourceManager interfaces.ResourceInterface) { + resourceManager.(*managerMocks.MockResourceManager).GetResourceFunc = func(ctx context.Context, + request interfaces.ResourceRequest) (*interfaces.ResourceResponse, error) { + assert.EqualValues(t, request, interfaces.ResourceRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Workflow: workflowIdentifier.Name, + ResourceType: admin.MatchableResource_QUALITY_OF_SERVICE_SPECIFICATION, + }) + return &interfaces.ResourceResponse{ + Attributes: &admin.MatchingAttributes{ + Target: &admin.MatchingAttributes_QualityOfService{ + QualityOfService: getQualityOfServiceWithDuration(5 * time.Minute), + }, + }, + }, nil + } +} + +func getWorkflowWithQosSpec(qualityOfService *core.QualityOfService) *admin.Workflow { + return &admin.Workflow{ + Id: workflowIdentifier, + Closure: &admin.WorkflowClosure{ + CompiledWorkflow: &core.CompiledWorkflowClosure{ + Primary: &core.CompiledWorkflow{ + Template: &core.WorkflowTemplate{ + Metadata: &core.WorkflowMetadata{ + QualityOfService: qualityOfService, + }, + }, + }, + }, + }, + } +} + +func TestGetQualityOfService_ExecutionCreateRequest(t *testing.T) { + resourceManager := managerMocks.MockResourceManager{} + addGetResourceFunc(t, &resourceManager) + + allocator := NewQualityOfServiceAllocator(getMockConfig(), &resourceManager) + spec, err := allocator.GetQualityOfService(context.Background(), GetQualityOfServiceInput{ + Workflow: getWorkflowWithQosSpec(getQualityOfServiceWithDuration(4 * time.Minute)), + LaunchPlan: &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{ + QualityOfService: getQualityOfServiceWithDuration(2 * time.Minute), + }, + }, + ExecutionCreateRequest: &admin.ExecutionCreateRequest{ + Domain: "production", + Spec: &admin.ExecutionSpec{ + QualityOfService: getQualityOfServiceWithDuration(3 * time.Minute), + }, + }, + }) + assert.Nil(t, err) + assert.EqualValues(t, spec.QueuingBudget, 3*time.Minute) +} + +func TestGetQualityOfService_LaunchPlan(t *testing.T) { + resourceManager := managerMocks.MockResourceManager{} + addGetResourceFunc(t, &resourceManager) + + allocator := NewQualityOfServiceAllocator(getMockConfig(), &resourceManager) + spec, err := allocator.GetQualityOfService(context.Background(), GetQualityOfServiceInput{ + Workflow: getWorkflowWithQosSpec(getQualityOfServiceWithDuration(4 * time.Minute)), + LaunchPlan: &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{ + QualityOfService: getQualityOfServiceWithDuration(2 * time.Minute), + }, + }, + ExecutionCreateRequest: &admin.ExecutionCreateRequest{ + Domain: "production", + Spec: &admin.ExecutionSpec{}, + }, + }) + assert.Nil(t, err) + assert.EqualValues(t, spec.QueuingBudget, 2*time.Minute) +} + +func TestGetQualityOfService_Workflow(t *testing.T) { + resourceManager := managerMocks.MockResourceManager{} + addGetResourceFunc(t, &resourceManager) + + allocator := NewQualityOfServiceAllocator(getMockConfig(), &resourceManager) + spec, err := allocator.GetQualityOfService(context.Background(), GetQualityOfServiceInput{ + Workflow: getWorkflowWithQosSpec(getQualityOfServiceWithDuration(4 * time.Minute)), + LaunchPlan: &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{}, + }, + ExecutionCreateRequest: &admin.ExecutionCreateRequest{ + Domain: "production", + Spec: &admin.ExecutionSpec{}, + }, + }) + assert.Nil(t, err) + assert.EqualValues(t, spec.QueuingBudget, 4*time.Minute) +} + +func TestGetQualityOfService_MatchableResource(t *testing.T) { + resourceManager := managerMocks.MockResourceManager{} + addGetResourceFunc(t, &resourceManager) + + allocator := NewQualityOfServiceAllocator(getMockConfig(), &resourceManager) + spec, err := allocator.GetQualityOfService(context.Background(), GetQualityOfServiceInput{ + Workflow: getWorkflowWithQosSpec(nil), + LaunchPlan: &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{}, + }, + ExecutionCreateRequest: &admin.ExecutionCreateRequest{ + Domain: "production", + Spec: &admin.ExecutionSpec{}, + }, + }) + assert.Nil(t, err) + assert.EqualValues(t, spec.QueuingBudget, 5*time.Minute) +} + +func TestGetQualityOfService_ConfigValues(t *testing.T) { + resourceManager := managerMocks.MockResourceManager{} + + allocator := NewQualityOfServiceAllocator(getMockConfig(), &resourceManager) + spec, err := allocator.GetQualityOfService(context.Background(), GetQualityOfServiceInput{ + Workflow: getWorkflowWithQosSpec(nil), + LaunchPlan: &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{}, + }, + ExecutionCreateRequest: &admin.ExecutionCreateRequest{ + Domain: "production", + Spec: &admin.ExecutionSpec{}, + }, + }) + assert.Nil(t, err) + assert.EqualValues(t, spec.QueuingBudget, 10*time.Minute) +} + +func TestGetQualityOfService_NoDefault(t *testing.T) { + resourceManager := managerMocks.MockResourceManager{} + + allocator := NewQualityOfServiceAllocator(getMockConfig(), &resourceManager) + spec, err := allocator.GetQualityOfService(context.Background(), GetQualityOfServiceInput{ + Workflow: getWorkflowWithQosSpec(nil), + LaunchPlan: &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{}, + }, + ExecutionCreateRequest: &admin.ExecutionCreateRequest{ + Domain: "staging", // Nothing configured to match in the application config. + Spec: &admin.ExecutionSpec{}, + }, + }) + assert.Nil(t, err) + assert.EqualValues(t, spec.QueuingBudget.Seconds(), 0) +} diff --git a/pkg/manager/mocks/resource.go b/pkg/manager/mocks/resource.go index fca248ec2..5901d8768 100644 --- a/pkg/manager/mocks/resource.go +++ b/pkg/manager/mocks/resource.go @@ -16,16 +16,21 @@ type DeleteProjectDomainFunc func(ctx context.Context, request admin.ProjectDoma *admin.ProjectDomainAttributesDeleteResponse, error) type ListResourceFunc func(ctx context.Context, request admin.ListMatchableAttributesRequest) ( *admin.ListMatchableAttributesResponse, error) +type GetResourceFunc func(ctx context.Context, request interfaces.ResourceRequest) (*interfaces.ResourceResponse, error) type MockResourceManager struct { updateProjectDomainFunc UpdateProjectDomainFunc GetFunc GetProjectDomainFunc DeleteFunc DeleteProjectDomainFunc ListFunc ListResourceFunc + GetResourceFunc GetResourceFunc } func (m *MockResourceManager) GetResource(ctx context.Context, request interfaces.ResourceRequest) (*interfaces.ResourceResponse, error) { - panic("implement me") + if m.GetResourceFunc != nil { + return m.GetResourceFunc(ctx, request) + } + return nil, nil } func (m *MockResourceManager) UpdateWorkflowAttributes(ctx context.Context, request admin.WorkflowAttributesUpdateRequest) ( diff --git a/pkg/runtime/configuration_provider.go b/pkg/runtime/configuration_provider.go index ba1c237e6..96d7de1a7 100644 --- a/pkg/runtime/configuration_provider.go +++ b/pkg/runtime/configuration_provider.go @@ -14,6 +14,7 @@ type ConfigurationProvider struct { registrationValidationConfiguration interfaces.RegistrationValidationConfiguration clusterResourceConfiguration interfaces.ClusterResourceConfiguration namespaceMappingConfiguration interfaces.NamespaceMappingConfiguration + qualityOfServiceConfiguration interfaces.QualityOfServiceConfiguration } func (p *ConfigurationProvider) ApplicationConfiguration() interfaces.ApplicationConfiguration { @@ -48,6 +49,10 @@ func (p *ConfigurationProvider) NamespaceMappingConfiguration() interfaces.Names return p.namespaceMappingConfiguration } +func (p *ConfigurationProvider) QualityOfServiceConfiguration() interfaces.QualityOfServiceConfiguration { + return p.qualityOfServiceConfiguration +} + func NewConfigurationProvider() interfaces.Configuration { return &ConfigurationProvider{ applicationConfiguration: NewApplicationConfigurationProvider(), @@ -58,5 +63,6 @@ func NewConfigurationProvider() interfaces.Configuration { registrationValidationConfiguration: NewRegistrationValidationProvider(), clusterResourceConfiguration: NewClusterResourceConfigurationProvider(), namespaceMappingConfiguration: NewNamespaceMappingConfigurationProvider(), + qualityOfServiceConfiguration: NewQualityOfServiceConfigProvider(), } } diff --git a/pkg/runtime/interfaces/configuration.go b/pkg/runtime/interfaces/configuration.go index 4c4151584..7fe694be0 100644 --- a/pkg/runtime/interfaces/configuration.go +++ b/pkg/runtime/interfaces/configuration.go @@ -10,4 +10,5 @@ type Configuration interface { RegistrationValidationConfiguration() RegistrationValidationConfiguration ClusterResourceConfiguration() ClusterResourceConfiguration NamespaceMappingConfiguration() NamespaceMappingConfiguration + QualityOfServiceConfiguration() QualityOfServiceConfiguration } diff --git a/pkg/runtime/interfaces/quality_of_service_configuration.go b/pkg/runtime/interfaces/quality_of_service_configuration.go new file mode 100644 index 000000000..4d5265801 --- /dev/null +++ b/pkg/runtime/interfaces/quality_of_service_configuration.go @@ -0,0 +1,22 @@ +package interfaces + +import ( + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + "github.com/lyft/flytestdlib/config" +) + +type TierName = string + +type QualityOfServiceSpec struct { + QueueingBudget config.Duration `json:"queueingBudget"` +} + +type QualityOfServiceConfig struct { + TierExecutionValues map[TierName]QualityOfServiceSpec `json:"tierExecutionValues"` + DefaultTiers map[DomainName]TierName `json:"defaultTiers"` +} + +type QualityOfServiceConfiguration interface { + GetTierExecutionValues() map[core.QualityOfService_Tier]core.QualityOfServiceSpec + GetDefaultTiers() map[DomainName]core.QualityOfService_Tier +} diff --git a/pkg/runtime/mocks/mock_configuration_provider.go b/pkg/runtime/mocks/mock_configuration_provider.go index 85129c981..47c948f41 100644 --- a/pkg/runtime/mocks/mock_configuration_provider.go +++ b/pkg/runtime/mocks/mock_configuration_provider.go @@ -11,6 +11,7 @@ type MockConfigurationProvider struct { registrationValidationConfiguration interfaces.RegistrationValidationConfiguration clusterResourceConfiguration interfaces.ClusterResourceConfiguration namespaceMappingConfiguration interfaces.NamespaceMappingConfiguration + qualityOfServiceConfiguration interfaces.QualityOfServiceConfiguration } func (p *MockConfigurationProvider) ApplicationConfiguration() interfaces.ApplicationConfiguration { @@ -57,6 +58,14 @@ func (p *MockConfigurationProvider) AddNamespaceMappingConfiguration(config inte p.namespaceMappingConfiguration = config } +func (p *MockConfigurationProvider) QualityOfServiceConfiguration() interfaces.QualityOfServiceConfiguration { + return p.qualityOfServiceConfiguration +} + +func (p *MockConfigurationProvider) AddQualityOfServiceConfiguration(config interfaces.QualityOfServiceConfiguration) { + p.qualityOfServiceConfiguration = config +} + func NewMockConfigurationProvider( applicationConfiguration interfaces.ApplicationConfiguration, queueConfiguration interfaces.QueueConfiguration, @@ -71,5 +80,6 @@ func NewMockConfigurationProvider( taskResourceConfiguration: taskResourceConfiguration, whitelistConfiguration: whitelistConfiguration, namespaceMappingConfiguration: namespaceMappingConfiguration, + qualityOfServiceConfiguration: NewMockQualityOfServiceProvider(), } } diff --git a/pkg/runtime/mocks/mock_quality_of_service_provider.go b/pkg/runtime/mocks/mock_quality_of_service_provider.go new file mode 100644 index 000000000..e572ba642 --- /dev/null +++ b/pkg/runtime/mocks/mock_quality_of_service_provider.go @@ -0,0 +1,26 @@ +package mocks + +import ( + "github.com/lyft/flyteadmin/pkg/runtime/interfaces" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" +) + +type MockQualityOfServiceProvider struct { + TierExecutionValues map[core.QualityOfService_Tier]core.QualityOfServiceSpec + DefaultTiers map[string]core.QualityOfService_Tier +} + +func (p MockQualityOfServiceProvider) GetTierExecutionValues() map[core.QualityOfService_Tier]core.QualityOfServiceSpec { + return p.TierExecutionValues +} + +func (p MockQualityOfServiceProvider) GetDefaultTiers() map[string]core.QualityOfService_Tier { + return p.DefaultTiers +} + +func NewMockQualityOfServiceProvider() interfaces.QualityOfServiceConfiguration { + return &MockQualityOfServiceProvider{ + TierExecutionValues: make(map[core.QualityOfService_Tier]core.QualityOfServiceSpec), + DefaultTiers: make(map[string]core.QualityOfService_Tier), + } +} diff --git a/pkg/runtime/quality_of_service_provider.go b/pkg/runtime/quality_of_service_provider.go new file mode 100644 index 000000000..fad06fb81 --- /dev/null +++ b/pkg/runtime/quality_of_service_provider.go @@ -0,0 +1,60 @@ +package runtime + +import ( + "fmt" + + "github.com/golang/protobuf/ptypes" + "github.com/lyft/flyteadmin/pkg/runtime/interfaces" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + "github.com/lyft/flytestdlib/config" +) + +const qualityOfServiceKey = "qualityOfService" + +var qualityOfServiceConfig = config.MustRegisterSection(qualityOfServiceKey, &interfaces.QualityOfServiceConfig{}) + +// Implementation of an interfaces.QualityOfServiceConfiguration +type QualityOfServiceConfigProvider struct { +} + +func (p *QualityOfServiceConfigProvider) GetTierExecutionValues() map[core.QualityOfService_Tier]core.QualityOfServiceSpec { + tierExecutionValues := make(map[core.QualityOfService_Tier]core.QualityOfServiceSpec) + if qualityOfServiceConfig != nil { + values := qualityOfServiceConfig.GetConfig().(*interfaces.QualityOfServiceConfig).TierExecutionValues + for tierName, spec := range values { + tierExecutionValues[core.QualityOfService_Tier(core.QualityOfService_Tier_value[tierName])] = + core.QualityOfServiceSpec{ + QueueingBudget: ptypes.DurationProto(spec.QueueingBudget.Duration), + } + } + } + return tierExecutionValues +} + +func (p *QualityOfServiceConfigProvider) GetDefaultTiers() map[interfaces.DomainName]core.QualityOfService_Tier { + defaultTiers := make(map[interfaces.DomainName]core.QualityOfService_Tier) + if qualityOfServiceConfig != nil { + tiers := qualityOfServiceConfig.GetConfig().(*interfaces.QualityOfServiceConfig).DefaultTiers + for domainName, tierName := range tiers { + defaultTiers[domainName] = core.QualityOfService_Tier(core.QualityOfService_Tier_value[tierName]) + } + } + return defaultTiers +} + +func validateConfigValues() { + if qualityOfServiceConfig != nil { + values := qualityOfServiceConfig.GetConfig().(*interfaces.QualityOfServiceConfig).TierExecutionValues + for tierName, spec := range values { + _, err := ptypes.Duration(ptypes.DurationProto(spec.QueueingBudget.Duration)) + if err != nil { + panic(fmt.Sprintf("Invalid duration [%+v] specified for %s", spec.QueueingBudget.Duration, tierName)) + } + } + } +} + +func NewQualityOfServiceConfigProvider() interfaces.QualityOfServiceConfiguration { + validateConfigValues() + return &QualityOfServiceConfigProvider{} +} diff --git a/pkg/workflowengine/impl/propeller_executor.go b/pkg/workflowengine/impl/propeller_executor.go index 4efc1e24f..9ff59cc5b 100644 --- a/pkg/workflowengine/impl/propeller_executor.go +++ b/pkg/workflowengine/impl/propeller_executor.go @@ -122,6 +122,12 @@ func (c *FlytePropeller) ExecuteWorkflow(ctx context.Context, input interfaces.E annotations := addMapValues(input.Annotations, flyteWf.Annotations) flyteWf.Annotations = annotations + /* + TODO(katrogan): uncomment once propeller has updated the flyte workflow CRD. + queueingBudgetSeconds := int64(input.QueueingBudget.Seconds()) + flyteWf.QueuingBudgetSeconds = &queueingBudgetSeconds + */ + executionTargetSpec := executioncluster.ExecutionTargetSpec{ Project: input.ExecutionID.Project, Domain: input.ExecutionID.Domain, @@ -191,6 +197,12 @@ func (c *FlytePropeller) ExecuteTask(ctx context.Context, input interfaces.Execu annotations := addMapValues(input.Annotations, flyteWf.Annotations) flyteWf.Annotations = annotations + /* + TODO(katrogan): uncomment once propeller has updated the flyte workflow CRD. + queueingBudgetSeconds := int64(input.QueueingBudget.Seconds()) + flyteWf.QueuingBudgetSeconds = &queueingBudgetSeconds + */ + executionTargetSpec := executioncluster.ExecutionTargetSpec{ Project: input.ExecutionID.Project, Domain: input.ExecutionID.Domain, diff --git a/pkg/workflowengine/interfaces/executor.go b/pkg/workflowengine/interfaces/executor.go index 42892b42c..471ec98b5 100644 --- a/pkg/workflowengine/interfaces/executor.go +++ b/pkg/workflowengine/interfaces/executor.go @@ -10,24 +10,26 @@ import ( ) type ExecuteWorkflowInput struct { - ExecutionID *core.WorkflowExecutionIdentifier - WfClosure core.CompiledWorkflowClosure - Inputs *core.LiteralMap - Reference admin.LaunchPlan - AcceptedAt time.Time - Labels map[string]string - Annotations map[string]string + ExecutionID *core.WorkflowExecutionIdentifier + WfClosure core.CompiledWorkflowClosure + Inputs *core.LiteralMap + Reference admin.LaunchPlan + AcceptedAt time.Time + Labels map[string]string + Annotations map[string]string + QueueingBudget time.Duration } type ExecuteTaskInput struct { - ExecutionID *core.WorkflowExecutionIdentifier - WfClosure core.CompiledWorkflowClosure - Inputs *core.LiteralMap - ReferenceName string - Auth *admin.AuthRole - AcceptedAt time.Time - Labels map[string]string - Annotations map[string]string + ExecutionID *core.WorkflowExecutionIdentifier + WfClosure core.CompiledWorkflowClosure + Inputs *core.LiteralMap + ReferenceName string + Auth *admin.AuthRole + AcceptedAt time.Time + Labels map[string]string + Annotations map[string]string + QueueingBudget time.Duration } type TerminateWorkflowInput struct { From 800486e57fcbda8abdacba61057d22491eb5c816 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Wed, 8 Jul 2020 16:52:41 -0700 Subject: [PATCH 2/6] nits --- go.mod | 2 -- go.sum | 1 + .../impl/executions/quality_of_service.go | 16 ++++++++++++++++ 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index c5a69457a..c83db22ff 100644 --- a/go.mod +++ b/go.mod @@ -53,5 +53,3 @@ replace ( k8s.io/apimachinery => github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f k8s.io/client-go => k8s.io/client-go v0.0.0-20191016111102-bec269661e48 ) - -replace github.com/lyft/flyteidl => ../flyteidl/ diff --git a/go.sum b/go.sum index 91faa36c1..3ca031d05 100644 --- a/go.sum +++ b/go.sum @@ -456,6 +456,7 @@ github.com/lyft/flyteidl v0.17.27/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1 github.com/lyft/flyteidl v0.17.32/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flyteidl v0.17.34 h1:8ERT/8vY40dOPPJrdD8ossBb30WkvzUx/IAFMR/7+9U= github.com/lyft/flyteidl v0.17.34/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= +github.com/lyft/flyteidl v0.17.36/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flyteplugins v0.3.11/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA= github.com/lyft/flyteplugins v0.3.21/go.mod h1:NDhdkOAn2q6p7YLq9a0/lxyS0dburoAEgipOY5TiO8A= github.com/lyft/flyteplugins v0.3.33/go.mod h1:HHO6KC/2z77n9o9KM697YvSP85IWDe6jl6tAIrMLqWU= diff --git a/pkg/manager/impl/executions/quality_of_service.go b/pkg/manager/impl/executions/quality_of_service.go index 80df081fb..237bed1d5 100644 --- a/pkg/manager/impl/executions/quality_of_service.go +++ b/pkg/manager/impl/executions/quality_of_service.go @@ -33,6 +33,22 @@ type qualityOfServiceAllocator struct { resourceManager interfaces.ResourceInterface } +/* +Users can specify the quality of service for an execution (in order of decreasing specificity) + +- At CreateExecution request time +- In the LaunchPlan spec +- In the Workflow spec +- As an overridable MatchableResource (https://lyft.github.io/flyte/administrator/install/managing_customizable_resources.html) + for the underlying workflow + +System administrators can specify default QualityOfService specs +(https://github.com/lyft/flyteidl/blob/e9727afcedf8d4c30a1fc2eeac45593e426d9bb0/protos/flyteidl/core/execution.proto#L92)s +for different QualityOfService tiers. The execution domain determines the tier, which in turn determines the configured +QualityOfService spec to apply. + +This method handles resolving the QualityOfService for an execution given the above rules. + */ func (q qualityOfServiceAllocator) GetQualityOfService(ctx context.Context, input GetQualityOfServiceInput) (QualityOfServiceSpec, error) { workflowIdentifier := input.Workflow.Id From 308ed5c0bd91f3c0374c4c9aebdc7851e48a9f76 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Thu, 9 Jul 2020 14:44:19 -0700 Subject: [PATCH 3/6] review comments, another test --- go.sum | 1 + pkg/manager/impl/execution_manager_test.go | 22 ++++- .../impl/executions/quality_of_service.go | 82 +++++++++++-------- 3 files changed, 72 insertions(+), 33 deletions(-) diff --git a/go.sum b/go.sum index 3ca031d05..76efec4f5 100644 --- a/go.sum +++ b/go.sum @@ -456,6 +456,7 @@ github.com/lyft/flyteidl v0.17.27/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1 github.com/lyft/flyteidl v0.17.32/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flyteidl v0.17.34 h1:8ERT/8vY40dOPPJrdD8ossBb30WkvzUx/IAFMR/7+9U= github.com/lyft/flyteidl v0.17.34/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= +github.com/lyft/flyteidl v0.17.36 h1:frCsRL9h4aoe+VnQSUhWM9FqZXjCAXcmR96Jt3Y+qVE= github.com/lyft/flyteidl v0.17.36/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flyteplugins v0.3.11/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA= github.com/lyft/flyteplugins v0.3.21/go.mod h1:NDhdkOAn2q6p7YLq9a0/lxyS0dburoAEgipOY5TiO8A= diff --git a/pkg/manager/impl/execution_manager_test.go b/pkg/manager/impl/execution_manager_test.go index afe70a57b..4440d575f 100644 --- a/pkg/manager/impl/execution_manager_test.go +++ b/pkg/manager/impl/execution_manager_test.go @@ -217,12 +217,32 @@ func TestCreateExecution(t *testing.T) { "annotation3": "3", "annotation4": "4", }, inputs.Annotations) + assert.EqualValues(t, 10*time.Minute, inputs.QueueingBudget) return &workflowengineInterfaces.ExecutionInfo{ Cluster: testCluster, }, nil }) + qosProvider := runtimeMocks.NewMockQualityOfServiceProvider() + qosProvider.(*runtimeMocks.MockQualityOfServiceProvider).TierExecutionValues = map[core.QualityOfService_Tier]core.QualityOfServiceSpec{ + core.QualityOfService_HIGH: { + QueueingBudget: ptypes.DurationProto(10 * time.Minute), + }, + core.QualityOfService_MEDIUM: { + QueueingBudget: ptypes.DurationProto(20 * time.Minute), + }, + core.QualityOfService_LOW: { + QueueingBudget: ptypes.DurationProto(30 * time.Minute), + }, + } + + qosProvider.(*runtimeMocks.MockQualityOfServiceProvider).DefaultTiers = map[string]core.QualityOfService_Tier{ + "domain": core.QualityOfService_HIGH, + } + + mockConfig := getMockExecutionsConfigProvider() + mockConfig.(*runtimeMocks.MockConfigurationProvider).AddQualityOfServiceConfiguration(qosProvider) execManager := NewExecutionManager( - repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockExecutor, + repository, mockConfig, getMockStorageForExecTest(context.Background()), mockExecutor, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil) request := testutils.GetExecutionRequest() request.Spec.Metadata = &admin.ExecutionMetadata{ diff --git a/pkg/manager/impl/executions/quality_of_service.go b/pkg/manager/impl/executions/quality_of_service.go index 237bed1d5..9530b990c 100644 --- a/pkg/manager/impl/executions/quality_of_service.go +++ b/pkg/manager/impl/executions/quality_of_service.go @@ -33,6 +33,34 @@ type qualityOfServiceAllocator struct { resourceManager interfaces.ResourceInterface } +func (q qualityOfServiceAllocator) getQualityOfServiceFromDb(ctx context.Context, workflowIdentifier *core.Identifier) ( + *core.QualityOfService, error) { + resource, err := q.resourceManager.GetResource(ctx, interfaces.ResourceRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Workflow: workflowIdentifier.Name, + ResourceType: admin.MatchableResource_QUALITY_OF_SERVICE_SPECIFICATION, + }) + if err != nil { + if _, ok := err.(errors.FlyteAdminError); !ok || err.(errors.FlyteAdminError).Code() != codes.NotFound { + logger.Warningf(ctx, + "Failed to fetch override values when assigning quality of service values for [%+v] with err: %v", + workflowIdentifier, err) + return nil, err + } + logger.Debugf(ctx, "No quality of service specified as an overridable matching attribute in db") + return nil, nil + } + + if resource != nil && resource.Attributes != nil && resource.Attributes.GetQualityOfService() != nil && + resource.Attributes.GetQualityOfService() != nil { + // Use custom override value in database rather than from registered entities or the admin application config. + return resource.Attributes.GetQualityOfService(), nil + } + logger.Warningf(ctx, "Empty quality of service specified as an overridable matching attribute in db") + return nil, nil +} + /* Users can specify the quality of service for an execution (in order of decreasing specificity) @@ -48,7 +76,7 @@ for different QualityOfService tiers. The execution domain determines the tier, QualityOfService spec to apply. This method handles resolving the QualityOfService for an execution given the above rules. - */ +*/ func (q qualityOfServiceAllocator) GetQualityOfService(ctx context.Context, input GetQualityOfServiceInput) (QualityOfServiceSpec, error) { workflowIdentifier := input.Workflow.Id @@ -100,42 +128,32 @@ func (q qualityOfServiceAllocator) GetQualityOfService(ctx context.Context, inpu qualityOfServiceTier = input.Workflow.Closure.CompiledWorkflow.Primary.Template.Metadata.QualityOfService.GetTier() } - // If nothing in the hierarchy has set the quality of service, see if an override exists in the matchable attributes - // resource table. - resource, err := q.resourceManager.GetResource(ctx, interfaces.ResourceRequest{ - Project: workflowIdentifier.Project, - Domain: workflowIdentifier.Domain, - Workflow: workflowIdentifier.Name, - ResourceType: admin.MatchableResource_QUALITY_OF_SERVICE_SPECIFICATION, - }) - if err != nil { - if _, ok := err.(errors.FlyteAdminError); !ok || err.(errors.FlyteAdminError).Code() != codes.NotFound { - logger.Warningf(ctx, - "Failed to fetch override values when assigning quality of service values for [%+v] with err: %v", - workflowIdentifier, err) - } - } - - if resource != nil && resource.Attributes != nil && resource.Attributes.GetQualityOfService() != nil && - resource.Attributes.GetQualityOfService().GetSpec() != nil { - // Use custom override value in database rather than from registered entities or the admin application config. - duration, err := ptypes.Duration(resource.Attributes.GetQualityOfService().GetSpec().QueueingBudget) + // If nothing in the hierarchy of registrable entities has set the quality of service, + // see if an override exists in the matchable attributes resource table. + if qualityOfServiceTier == core.QualityOfService_UNDEFINED { + qualityOfService, err := q.getQualityOfServiceFromDb(ctx, workflowIdentifier) if err != nil { - return QualityOfServiceSpec{}, errors.NewFlyteAdminErrorf(codes.InvalidArgument, - "Invalid custom quality of service set for [%+v], failed to parse duration [%v] with: %v", - workflowIdentifier, resource.Attributes.GetQualityOfService().GetSpec().QueueingBudget, err) + return QualityOfServiceSpec{}, err + } + if qualityOfService != nil && qualityOfService.GetSpec() != nil { + duration, err := ptypes.Duration(qualityOfService.GetSpec().QueueingBudget) + if err != nil { + return QualityOfServiceSpec{}, errors.NewFlyteAdminErrorf(codes.InvalidArgument, + "Invalid custom quality of service set in overridable matching attributes for [%v],"+ + "failed to parse duration [%v] with: %v", workflowIdentifier, + input.ExecutionCreateRequest.Spec.QualityOfService.GetSpec().QueueingBudget, err) + } + return QualityOfServiceSpec{ + QueuingBudget: duration, + }, nil + } else if qualityOfService != nil && qualityOfService.GetTier() != core.QualityOfService_UNDEFINED { + qualityOfServiceTier = input.Workflow.Closure.CompiledWorkflow.Primary.Template.Metadata.QualityOfService.GetTier() } - return QualityOfServiceSpec{ - QueuingBudget: duration, - }, nil - } else if resource != nil && resource.Attributes != nil && resource.Attributes.GetQualityOfService() != nil && - resource.Attributes.GetQualityOfService().GetTier() != core.QualityOfService_UNDEFINED { - qualityOfServiceTier = resource.Attributes.GetQualityOfService().GetTier() } + // If we've come all this way and at no layer is an overridable configuration for the quality of service tier + // set, use the default values from the admin application config. if qualityOfServiceTier == core.QualityOfService_UNDEFINED { - // If we've come all this way and at no layer is an overridable configuration for the quality of service tier - // set, use the default values from the admin application config. var ok bool qualityOfServiceTier, ok = q.config.QualityOfServiceConfiguration().GetDefaultTiers()[input.ExecutionCreateRequest.Domain] if !ok { From f0f6133a8ab7e33961a367dbae922b78c09924f9 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Tue, 21 Jul 2020 13:49:08 -0700 Subject: [PATCH 4/6] logs --- .../impl/executions/quality_of_service.go | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/pkg/manager/impl/executions/quality_of_service.go b/pkg/manager/impl/executions/quality_of_service.go index 9530b990c..e916a5d47 100644 --- a/pkg/manager/impl/executions/quality_of_service.go +++ b/pkg/manager/impl/executions/quality_of_service.go @@ -83,6 +83,9 @@ func (q qualityOfServiceAllocator) GetQualityOfService(ctx context.Context, inpu var qualityOfServiceTier core.QualityOfService_Tier if input.ExecutionCreateRequest.Spec.QualityOfService != nil { if input.ExecutionCreateRequest.Spec.QualityOfService.GetSpec() != nil { + logger.Debugf(ctx, "Determining quality of service from execution spec for [%s/%s/%s]", + input.ExecutionCreateRequest.Project, input.ExecutionCreateRequest.Domain, + input.ExecutionCreateRequest.Name) duration, err := ptypes.Duration(input.ExecutionCreateRequest.Spec.QualityOfService.GetSpec().QueueingBudget) if err != nil { return QualityOfServiceSpec{}, errors.NewFlyteAdminErrorf(codes.InvalidArgument, @@ -98,6 +101,9 @@ func (q qualityOfServiceAllocator) GetQualityOfService(ctx context.Context, inpu qualityOfServiceTier = input.ExecutionCreateRequest.Spec.QualityOfService.GetTier() } else if input.LaunchPlan.Spec.QualityOfService != nil { if input.LaunchPlan.Spec.QualityOfService.GetSpec() != nil { + logger.Debugf(ctx, "Determining quality of service from launch plan spec for [%s/%s/%s]", + input.ExecutionCreateRequest.Project, input.ExecutionCreateRequest.Domain, + input.ExecutionCreateRequest.Name) duration, err := ptypes.Duration(input.LaunchPlan.Spec.QualityOfService.GetSpec().QueueingBudget) if err != nil { return QualityOfServiceSpec{}, errors.NewFlyteAdminErrorf(codes.InvalidArgument, @@ -112,6 +118,9 @@ func (q qualityOfServiceAllocator) GetQualityOfService(ctx context.Context, inpu qualityOfServiceTier = input.LaunchPlan.Spec.QualityOfService.GetTier() } else if input.Workflow.Closure.CompiledWorkflow.Primary.Template.Metadata != nil && input.Workflow.Closure.CompiledWorkflow.Primary.Template.Metadata.QualityOfService != nil { + logger.Debugf(ctx, "Determining quality of service from workflow spec for [%s/%s/%s]", + input.ExecutionCreateRequest.Project, input.ExecutionCreateRequest.Domain, + input.ExecutionCreateRequest.Name) if input.Workflow.Closure.CompiledWorkflow.Primary.Template.Metadata.QualityOfService.GetSpec() != nil { duration, err := ptypes.Duration(input.Workflow.Closure.CompiledWorkflow.Primary.Template.Metadata.QualityOfService. GetSpec().QueueingBudget) @@ -136,6 +145,9 @@ func (q qualityOfServiceAllocator) GetQualityOfService(ctx context.Context, inpu return QualityOfServiceSpec{}, err } if qualityOfService != nil && qualityOfService.GetSpec() != nil { + logger.Debugf(ctx, "Determining quality of service from spec database override for [%s/%s/%s]", + input.ExecutionCreateRequest.Project, input.ExecutionCreateRequest.Domain, + input.ExecutionCreateRequest.Name) duration, err := ptypes.Duration(qualityOfService.GetSpec().QueueingBudget) if err != nil { return QualityOfServiceSpec{}, errors.NewFlyteAdminErrorf(codes.InvalidArgument, @@ -147,6 +159,9 @@ func (q qualityOfServiceAllocator) GetQualityOfService(ctx context.Context, inpu QueuingBudget: duration, }, nil } else if qualityOfService != nil && qualityOfService.GetTier() != core.QualityOfService_UNDEFINED { + logger.Debugf(ctx, "Determining quality of service tier from database override for [%s/%s/%s]", + input.ExecutionCreateRequest.Project, input.ExecutionCreateRequest.Domain, + input.ExecutionCreateRequest.Name) qualityOfServiceTier = input.Workflow.Closure.CompiledWorkflow.Primary.Template.Metadata.QualityOfService.GetTier() } } @@ -154,6 +169,9 @@ func (q qualityOfServiceAllocator) GetQualityOfService(ctx context.Context, inpu // If we've come all this way and at no layer is an overridable configuration for the quality of service tier // set, use the default values from the admin application config. if qualityOfServiceTier == core.QualityOfService_UNDEFINED { + logger.Debugf(ctx, "Determining quality of service tier from application config override for [%s/%s/%s]", + input.ExecutionCreateRequest.Project, input.ExecutionCreateRequest.Domain, + input.ExecutionCreateRequest.Name) var ok bool qualityOfServiceTier, ok = q.config.QualityOfServiceConfiguration().GetDefaultTiers()[input.ExecutionCreateRequest.Domain] if !ok { @@ -166,6 +184,9 @@ func (q qualityOfServiceAllocator) GetQualityOfService(ctx context.Context, inpu // No queueing budget to set when no default is specified return QualityOfServiceSpec{}, nil } + logger.Debugf(ctx, "Determining quality of service spec from application config override for [%s/%s/%s] with tier [%v]", + input.ExecutionCreateRequest.Project, input.ExecutionCreateRequest.Domain, + input.ExecutionCreateRequest.Name, qualityOfServiceTier) // Config values should always be vetted so there's no need to check the error from conversion. duration, _ := ptypes.Duration(executionValues.QueueingBudget) From 335bcad401938e8b16cb38c9ef91158860a3988d Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 21 Jul 2020 14:24:55 -0700 Subject: [PATCH 5/6] make generate and add one mockery call (#108) --- .../mocks/authentication_context.go | 186 +++++++++++++++++- pkg/config/serverconfig_flags.go | 5 + pkg/config/serverconfig_flags_test.go | 110 +++++++++++ pkg/manager/impl/execution_manager_test.go | 12 +- .../executions/quality_of_service_test.go | 11 +- .../mocks/quality_of_service_configuration.go | 82 ++++++++ .../quality_of_service_configuration.go | 3 + .../mocks/mock_configuration_provider.go | 13 +- .../mocks/mock_quality_of_service_provider.go | 26 --- 9 files changed, 402 insertions(+), 46 deletions(-) create mode 100644 pkg/runtime/interfaces/mocks/quality_of_service_configuration.go delete mode 100644 pkg/runtime/mocks/mock_quality_of_service_provider.go diff --git a/pkg/auth/interfaces/mocks/authentication_context.go b/pkg/auth/interfaces/mocks/authentication_context.go index 759b492ad..34b6ef044 100644 --- a/pkg/auth/interfaces/mocks/authentication_context.go +++ b/pkg/auth/interfaces/mocks/authentication_context.go @@ -1,20 +1,46 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. +// Code generated by mockery v1.0.1. DO NOT EDIT. package mocks -import config "github.com/lyft/flyteadmin/pkg/auth/config" -import http "net/http" -import interfaces "github.com/lyft/flyteadmin/pkg/auth/interfaces" -import mock "github.com/stretchr/testify/mock" -import oauth2 "golang.org/x/oauth2" -import oidc "github.com/coreos/go-oidc" -import url "net/url" +import ( + http "net/http" + + config "github.com/lyft/flyteadmin/pkg/auth/config" + + interfaces "github.com/lyft/flyteadmin/pkg/auth/interfaces" + + mock "github.com/stretchr/testify/mock" + + oauth2 "golang.org/x/oauth2" + + oidc "github.com/coreos/go-oidc" + + url "net/url" +) // AuthenticationContext is an autogenerated mock type for the AuthenticationContext type type AuthenticationContext struct { mock.Mock } +type AuthenticationContext_Claims struct { + *mock.Call +} + +func (_m AuthenticationContext_Claims) Return(_a0 config.Claims) *AuthenticationContext_Claims { + return &AuthenticationContext_Claims{Call: _m.Call.Return(_a0)} +} + +func (_m *AuthenticationContext) OnClaims() *AuthenticationContext_Claims { + c := _m.On("Claims") + return &AuthenticationContext_Claims{Call: c} +} + +func (_m *AuthenticationContext) OnClaimsMatch(matchers ...interface{}) *AuthenticationContext_Claims { + c := _m.On("Claims", matchers...) + return &AuthenticationContext_Claims{Call: c} +} + // Claims provides a mock function with given fields: func (_m *AuthenticationContext) Claims() config.Claims { ret := _m.Called() @@ -29,6 +55,24 @@ func (_m *AuthenticationContext) Claims() config.Claims { return r0 } +type AuthenticationContext_CookieManager struct { + *mock.Call +} + +func (_m AuthenticationContext_CookieManager) Return(_a0 interfaces.CookieHandler) *AuthenticationContext_CookieManager { + return &AuthenticationContext_CookieManager{Call: _m.Call.Return(_a0)} +} + +func (_m *AuthenticationContext) OnCookieManager() *AuthenticationContext_CookieManager { + c := _m.On("CookieManager") + return &AuthenticationContext_CookieManager{Call: c} +} + +func (_m *AuthenticationContext) OnCookieManagerMatch(matchers ...interface{}) *AuthenticationContext_CookieManager { + c := _m.On("CookieManager", matchers...) + return &AuthenticationContext_CookieManager{Call: c} +} + // CookieManager provides a mock function with given fields: func (_m *AuthenticationContext) CookieManager() interfaces.CookieHandler { ret := _m.Called() @@ -45,6 +89,24 @@ func (_m *AuthenticationContext) CookieManager() interfaces.CookieHandler { return r0 } +type AuthenticationContext_GetBaseURL struct { + *mock.Call +} + +func (_m AuthenticationContext_GetBaseURL) Return(_a0 *url.URL) *AuthenticationContext_GetBaseURL { + return &AuthenticationContext_GetBaseURL{Call: _m.Call.Return(_a0)} +} + +func (_m *AuthenticationContext) OnGetBaseURL() *AuthenticationContext_GetBaseURL { + c := _m.On("GetBaseURL") + return &AuthenticationContext_GetBaseURL{Call: c} +} + +func (_m *AuthenticationContext) OnGetBaseURLMatch(matchers ...interface{}) *AuthenticationContext_GetBaseURL { + c := _m.On("GetBaseURL", matchers...) + return &AuthenticationContext_GetBaseURL{Call: c} +} + // GetBaseURL provides a mock function with given fields: func (_m *AuthenticationContext) GetBaseURL() *url.URL { ret := _m.Called() @@ -61,6 +123,24 @@ func (_m *AuthenticationContext) GetBaseURL() *url.URL { return r0 } +type AuthenticationContext_GetHTTPClient struct { + *mock.Call +} + +func (_m AuthenticationContext_GetHTTPClient) Return(_a0 *http.Client) *AuthenticationContext_GetHTTPClient { + return &AuthenticationContext_GetHTTPClient{Call: _m.Call.Return(_a0)} +} + +func (_m *AuthenticationContext) OnGetHTTPClient() *AuthenticationContext_GetHTTPClient { + c := _m.On("GetHTTPClient") + return &AuthenticationContext_GetHTTPClient{Call: c} +} + +func (_m *AuthenticationContext) OnGetHTTPClientMatch(matchers ...interface{}) *AuthenticationContext_GetHTTPClient { + c := _m.On("GetHTTPClient", matchers...) + return &AuthenticationContext_GetHTTPClient{Call: c} +} + // GetHTTPClient provides a mock function with given fields: func (_m *AuthenticationContext) GetHTTPClient() *http.Client { ret := _m.Called() @@ -77,6 +157,24 @@ func (_m *AuthenticationContext) GetHTTPClient() *http.Client { return r0 } +type AuthenticationContext_GetMetadataURL struct { + *mock.Call +} + +func (_m AuthenticationContext_GetMetadataURL) Return(_a0 *url.URL) *AuthenticationContext_GetMetadataURL { + return &AuthenticationContext_GetMetadataURL{Call: _m.Call.Return(_a0)} +} + +func (_m *AuthenticationContext) OnGetMetadataURL() *AuthenticationContext_GetMetadataURL { + c := _m.On("GetMetadataURL") + return &AuthenticationContext_GetMetadataURL{Call: c} +} + +func (_m *AuthenticationContext) OnGetMetadataURLMatch(matchers ...interface{}) *AuthenticationContext_GetMetadataURL { + c := _m.On("GetMetadataURL", matchers...) + return &AuthenticationContext_GetMetadataURL{Call: c} +} + // GetMetadataURL provides a mock function with given fields: func (_m *AuthenticationContext) GetMetadataURL() *url.URL { ret := _m.Called() @@ -93,6 +191,24 @@ func (_m *AuthenticationContext) GetMetadataURL() *url.URL { return r0 } +type AuthenticationContext_GetUserInfoURL struct { + *mock.Call +} + +func (_m AuthenticationContext_GetUserInfoURL) Return(_a0 *url.URL) *AuthenticationContext_GetUserInfoURL { + return &AuthenticationContext_GetUserInfoURL{Call: _m.Call.Return(_a0)} +} + +func (_m *AuthenticationContext) OnGetUserInfoURL() *AuthenticationContext_GetUserInfoURL { + c := _m.On("GetUserInfoURL") + return &AuthenticationContext_GetUserInfoURL{Call: c} +} + +func (_m *AuthenticationContext) OnGetUserInfoURLMatch(matchers ...interface{}) *AuthenticationContext_GetUserInfoURL { + c := _m.On("GetUserInfoURL", matchers...) + return &AuthenticationContext_GetUserInfoURL{Call: c} +} + // GetUserInfoURL provides a mock function with given fields: func (_m *AuthenticationContext) GetUserInfoURL() *url.URL { ret := _m.Called() @@ -109,6 +225,24 @@ func (_m *AuthenticationContext) GetUserInfoURL() *url.URL { return r0 } +type AuthenticationContext_OAuth2Config struct { + *mock.Call +} + +func (_m AuthenticationContext_OAuth2Config) Return(_a0 *oauth2.Config) *AuthenticationContext_OAuth2Config { + return &AuthenticationContext_OAuth2Config{Call: _m.Call.Return(_a0)} +} + +func (_m *AuthenticationContext) OnOAuth2Config() *AuthenticationContext_OAuth2Config { + c := _m.On("OAuth2Config") + return &AuthenticationContext_OAuth2Config{Call: c} +} + +func (_m *AuthenticationContext) OnOAuth2ConfigMatch(matchers ...interface{}) *AuthenticationContext_OAuth2Config { + c := _m.On("OAuth2Config", matchers...) + return &AuthenticationContext_OAuth2Config{Call: c} +} + // OAuth2Config provides a mock function with given fields: func (_m *AuthenticationContext) OAuth2Config() *oauth2.Config { ret := _m.Called() @@ -125,6 +259,24 @@ func (_m *AuthenticationContext) OAuth2Config() *oauth2.Config { return r0 } +type AuthenticationContext_OidcProvider struct { + *mock.Call +} + +func (_m AuthenticationContext_OidcProvider) Return(_a0 *oidc.Provider) *AuthenticationContext_OidcProvider { + return &AuthenticationContext_OidcProvider{Call: _m.Call.Return(_a0)} +} + +func (_m *AuthenticationContext) OnOidcProvider() *AuthenticationContext_OidcProvider { + c := _m.On("OidcProvider") + return &AuthenticationContext_OidcProvider{Call: c} +} + +func (_m *AuthenticationContext) OnOidcProviderMatch(matchers ...interface{}) *AuthenticationContext_OidcProvider { + c := _m.On("OidcProvider", matchers...) + return &AuthenticationContext_OidcProvider{Call: c} +} + // OidcProvider provides a mock function with given fields: func (_m *AuthenticationContext) OidcProvider() *oidc.Provider { ret := _m.Called() @@ -141,6 +293,24 @@ func (_m *AuthenticationContext) OidcProvider() *oidc.Provider { return r0 } +type AuthenticationContext_Options struct { + *mock.Call +} + +func (_m AuthenticationContext_Options) Return(_a0 config.OAuthOptions) *AuthenticationContext_Options { + return &AuthenticationContext_Options{Call: _m.Call.Return(_a0)} +} + +func (_m *AuthenticationContext) OnOptions() *AuthenticationContext_Options { + c := _m.On("Options") + return &AuthenticationContext_Options{Call: c} +} + +func (_m *AuthenticationContext) OnOptionsMatch(matchers ...interface{}) *AuthenticationContext_Options { + c := _m.On("Options", matchers...) + return &AuthenticationContext_Options{Call: c} +} + // Options provides a mock function with given fields: func (_m *AuthenticationContext) Options() config.OAuthOptions { ret := _m.Called() diff --git a/pkg/config/serverconfig_flags.go b/pkg/config/serverconfig_flags.go index 5d1dc16de..5a90a790f 100755 --- a/pkg/config/serverconfig_flags.go +++ b/pkg/config/serverconfig_flags.go @@ -64,8 +64,13 @@ func (cfg ServerConfig) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.String(fmt.Sprintf("%v%v", prefix, "security.oauth.redirectUrl"), defaultServerConfig.Security.Oauth.RedirectURL, "") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "security.oauth.httpAuthorizationHeader"), defaultServerConfig.Security.Oauth.HTTPAuthorizationHeader, "") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "security.oauth.grpcAuthorizationHeader"), defaultServerConfig.Security.Oauth.GrpcAuthorizationHeader, "") + cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "security.oauth.disableForHttp"), defaultServerConfig.Security.Oauth.DisableForHTTP, "") + cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "security.oauth.disableForGrpc"), defaultServerConfig.Security.Oauth.DisableForGrpc, "") + cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "security.auditAccess"), defaultServerConfig.Security.AuditAccess, "") cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "security.allowCors"), defaultServerConfig.Security.AllowCors, "") cmdFlags.StringSlice(fmt.Sprintf("%v%v", prefix, "security.allowedOrigins"), []string{}, "") cmdFlags.StringSlice(fmt.Sprintf("%v%v", prefix, "security.allowedHeaders"), []string{}, "") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "thirdPartyConfig.flyteClient.clientId"), defaultServerConfig.ThirdPartyConfig.FlyteClientConfig.ClientID, "public identifier for the app which handles authorization for a Flyte deployment") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "thirdPartyConfig.flyteClient.redirectUri"), defaultServerConfig.ThirdPartyConfig.FlyteClientConfig.RedirectURI, "This is the callback uri registered with the app which handles authorization for a Flyte deployment") return cmdFlags } diff --git a/pkg/config/serverconfig_flags_test.go b/pkg/config/serverconfig_flags_test.go index 6031e805c..b98784bef 100755 --- a/pkg/config/serverconfig_flags_test.go +++ b/pkg/config/serverconfig_flags_test.go @@ -605,6 +605,72 @@ func TestServerConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_security.oauth.disableForHttp", func(t *testing.T) { + t.Run("DefaultValue", func(t *testing.T) { + // Test that default value is set properly + if vBool, err := cmdFlags.GetBool("security.oauth.disableForHttp"); err == nil { + assert.Equal(t, bool(defaultServerConfig.Security.Oauth.DisableForHTTP), vBool) + } else { + assert.FailNow(t, err.Error()) + } + }) + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("security.oauth.disableForHttp", testValue) + if vBool, err := cmdFlags.GetBool("security.oauth.disableForHttp"); err == nil { + testDecodeJson_ServerConfig(t, fmt.Sprintf("%v", vBool), &actual.Security.Oauth.DisableForHTTP) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) + t.Run("Test_security.oauth.disableForGrpc", func(t *testing.T) { + t.Run("DefaultValue", func(t *testing.T) { + // Test that default value is set properly + if vBool, err := cmdFlags.GetBool("security.oauth.disableForGrpc"); err == nil { + assert.Equal(t, bool(defaultServerConfig.Security.Oauth.DisableForGrpc), vBool) + } else { + assert.FailNow(t, err.Error()) + } + }) + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("security.oauth.disableForGrpc", testValue) + if vBool, err := cmdFlags.GetBool("security.oauth.disableForGrpc"); err == nil { + testDecodeJson_ServerConfig(t, fmt.Sprintf("%v", vBool), &actual.Security.Oauth.DisableForGrpc) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) + t.Run("Test_security.auditAccess", func(t *testing.T) { + t.Run("DefaultValue", func(t *testing.T) { + // Test that default value is set properly + if vBool, err := cmdFlags.GetBool("security.auditAccess"); err == nil { + assert.Equal(t, bool(defaultServerConfig.Security.AuditAccess), vBool) + } else { + assert.FailNow(t, err.Error()) + } + }) + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("security.auditAccess", testValue) + if vBool, err := cmdFlags.GetBool("security.auditAccess"); err == nil { + testDecodeJson_ServerConfig(t, fmt.Sprintf("%v", vBool), &actual.Security.AuditAccess) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) t.Run("Test_security.allowCors", func(t *testing.T) { t.Run("DefaultValue", func(t *testing.T) { // Test that default value is set properly @@ -671,4 +737,48 @@ func TestServerConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_thirdPartyConfig.flyteClient.clientId", func(t *testing.T) { + t.Run("DefaultValue", func(t *testing.T) { + // Test that default value is set properly + if vString, err := cmdFlags.GetString("thirdPartyConfig.flyteClient.clientId"); err == nil { + assert.Equal(t, string(defaultServerConfig.ThirdPartyConfig.FlyteClientConfig.ClientID), vString) + } else { + assert.FailNow(t, err.Error()) + } + }) + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("thirdPartyConfig.flyteClient.clientId", testValue) + if vString, err := cmdFlags.GetString("thirdPartyConfig.flyteClient.clientId"); err == nil { + testDecodeJson_ServerConfig(t, fmt.Sprintf("%v", vString), &actual.ThirdPartyConfig.FlyteClientConfig.ClientID) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) + t.Run("Test_thirdPartyConfig.flyteClient.redirectUri", func(t *testing.T) { + t.Run("DefaultValue", func(t *testing.T) { + // Test that default value is set properly + if vString, err := cmdFlags.GetString("thirdPartyConfig.flyteClient.redirectUri"); err == nil { + assert.Equal(t, string(defaultServerConfig.ThirdPartyConfig.FlyteClientConfig.RedirectURI), vString) + } else { + assert.FailNow(t, err.Error()) + } + }) + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("thirdPartyConfig.flyteClient.redirectUri", testValue) + if vString, err := cmdFlags.GetString("thirdPartyConfig.flyteClient.redirectUri"); err == nil { + testDecodeJson_ServerConfig(t, fmt.Sprintf("%v", vString), &actual.ThirdPartyConfig.FlyteClientConfig.RedirectURI) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) } diff --git a/pkg/manager/impl/execution_manager_test.go b/pkg/manager/impl/execution_manager_test.go index 4440d575f..d8ec76386 100644 --- a/pkg/manager/impl/execution_manager_test.go +++ b/pkg/manager/impl/execution_manager_test.go @@ -35,6 +35,7 @@ import ( repositoryMocks "github.com/lyft/flyteadmin/pkg/repositories/mocks" "github.com/lyft/flyteadmin/pkg/repositories/models" runtimeInterfaces "github.com/lyft/flyteadmin/pkg/runtime/interfaces" + runtimeIFaceMocks "github.com/lyft/flyteadmin/pkg/runtime/interfaces/mocks" runtimeMocks "github.com/lyft/flyteadmin/pkg/runtime/mocks" workflowengineInterfaces "github.com/lyft/flyteadmin/pkg/workflowengine/interfaces" workflowengineMocks "github.com/lyft/flyteadmin/pkg/workflowengine/mocks" @@ -222,8 +223,9 @@ func TestCreateExecution(t *testing.T) { Cluster: testCluster, }, nil }) - qosProvider := runtimeMocks.NewMockQualityOfServiceProvider() - qosProvider.(*runtimeMocks.MockQualityOfServiceProvider).TierExecutionValues = map[core.QualityOfService_Tier]core.QualityOfServiceSpec{ + //qosProvider := runtimeMocks.NewMockQualityOfServiceProvider() + qosProvider := &runtimeIFaceMocks.QualityOfServiceConfiguration{} + qosProvider.OnGetTierExecutionValues().Return(map[core.QualityOfService_Tier]core.QualityOfServiceSpec{ core.QualityOfService_HIGH: { QueueingBudget: ptypes.DurationProto(10 * time.Minute), }, @@ -233,11 +235,11 @@ func TestCreateExecution(t *testing.T) { core.QualityOfService_LOW: { QueueingBudget: ptypes.DurationProto(30 * time.Minute), }, - } + }) - qosProvider.(*runtimeMocks.MockQualityOfServiceProvider).DefaultTiers = map[string]core.QualityOfService_Tier{ + qosProvider.OnGetDefaultTiers().Return(map[string]core.QualityOfService_Tier{ "domain": core.QualityOfService_HIGH, - } + }) mockConfig := getMockExecutionsConfigProvider() mockConfig.(*runtimeMocks.MockConfigurationProvider).AddQualityOfServiceConfiguration(qosProvider) diff --git a/pkg/manager/impl/executions/quality_of_service_test.go b/pkg/manager/impl/executions/quality_of_service_test.go index 738e19680..b0d2886ce 100644 --- a/pkg/manager/impl/executions/quality_of_service_test.go +++ b/pkg/manager/impl/executions/quality_of_service_test.go @@ -9,6 +9,7 @@ import ( "github.com/lyft/flyteadmin/pkg/manager/interfaces" managerMocks "github.com/lyft/flyteadmin/pkg/manager/mocks" runtimeInterfaces "github.com/lyft/flyteadmin/pkg/runtime/interfaces" + runtimeIFaceMocks "github.com/lyft/flyteadmin/pkg/runtime/interfaces/mocks" "github.com/lyft/flyteadmin/pkg/runtime/mocks" runtimeMocks "github.com/lyft/flyteadmin/pkg/runtime/mocks" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" @@ -35,8 +36,8 @@ func getQualityOfServiceWithDuration(duration time.Duration) *core.QualityOfServ func getMockConfig() runtimeInterfaces.Configuration { mockConfig := mocks.NewMockConfigurationProvider(nil, nil, nil, nil, nil, nil) - provider := runtimeMocks.NewMockQualityOfServiceProvider() - provider.(*runtimeMocks.MockQualityOfServiceProvider).TierExecutionValues = map[core.QualityOfService_Tier]core.QualityOfServiceSpec{ + provider := &runtimeIFaceMocks.QualityOfServiceConfiguration{} + provider.OnGetTierExecutionValues().Return(map[core.QualityOfService_Tier]core.QualityOfServiceSpec{ core.QualityOfService_HIGH: { QueueingBudget: ptypes.DurationProto(10 * time.Minute), }, @@ -46,12 +47,12 @@ func getMockConfig() runtimeInterfaces.Configuration { core.QualityOfService_LOW: { QueueingBudget: ptypes.DurationProto(30 * time.Minute), }, - } + }) - provider.(*runtimeMocks.MockQualityOfServiceProvider).DefaultTiers = map[string]core.QualityOfService_Tier{ + provider.OnGetDefaultTiers().Return(map[string]core.QualityOfService_Tier{ "production": core.QualityOfService_HIGH, "development": core.QualityOfService_LOW, - } + }) mockConfig.(*runtimeMocks.MockConfigurationProvider).AddQualityOfServiceConfiguration(provider) return mockConfig diff --git a/pkg/runtime/interfaces/mocks/quality_of_service_configuration.go b/pkg/runtime/interfaces/mocks/quality_of_service_configuration.go new file mode 100644 index 000000000..ff87aecb5 --- /dev/null +++ b/pkg/runtime/interfaces/mocks/quality_of_service_configuration.go @@ -0,0 +1,82 @@ +// Code generated by mockery v1.0.1. DO NOT EDIT. + +package mocks + +import ( + core "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + + mock "github.com/stretchr/testify/mock" +) + +// QualityOfServiceConfiguration is an autogenerated mock type for the QualityOfServiceConfiguration type +type QualityOfServiceConfiguration struct { + mock.Mock +} + +type QualityOfServiceConfiguration_GetDefaultTiers struct { + *mock.Call +} + +func (_m QualityOfServiceConfiguration_GetDefaultTiers) Return(_a0 map[string]core.QualityOfService_Tier) *QualityOfServiceConfiguration_GetDefaultTiers { + return &QualityOfServiceConfiguration_GetDefaultTiers{Call: _m.Call.Return(_a0)} +} + +func (_m *QualityOfServiceConfiguration) OnGetDefaultTiers() *QualityOfServiceConfiguration_GetDefaultTiers { + c := _m.On("GetDefaultTiers") + return &QualityOfServiceConfiguration_GetDefaultTiers{Call: c} +} + +func (_m *QualityOfServiceConfiguration) OnGetDefaultTiersMatch(matchers ...interface{}) *QualityOfServiceConfiguration_GetDefaultTiers { + c := _m.On("GetDefaultTiers", matchers...) + return &QualityOfServiceConfiguration_GetDefaultTiers{Call: c} +} + +// GetDefaultTiers provides a mock function with given fields: +func (_m *QualityOfServiceConfiguration) GetDefaultTiers() map[string]core.QualityOfService_Tier { + ret := _m.Called() + + var r0 map[string]core.QualityOfService_Tier + if rf, ok := ret.Get(0).(func() map[string]core.QualityOfService_Tier); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[string]core.QualityOfService_Tier) + } + } + + return r0 +} + +type QualityOfServiceConfiguration_GetTierExecutionValues struct { + *mock.Call +} + +func (_m QualityOfServiceConfiguration_GetTierExecutionValues) Return(_a0 map[core.QualityOfService_Tier]core.QualityOfServiceSpec) *QualityOfServiceConfiguration_GetTierExecutionValues { + return &QualityOfServiceConfiguration_GetTierExecutionValues{Call: _m.Call.Return(_a0)} +} + +func (_m *QualityOfServiceConfiguration) OnGetTierExecutionValues() *QualityOfServiceConfiguration_GetTierExecutionValues { + c := _m.On("GetTierExecutionValues") + return &QualityOfServiceConfiguration_GetTierExecutionValues{Call: c} +} + +func (_m *QualityOfServiceConfiguration) OnGetTierExecutionValuesMatch(matchers ...interface{}) *QualityOfServiceConfiguration_GetTierExecutionValues { + c := _m.On("GetTierExecutionValues", matchers...) + return &QualityOfServiceConfiguration_GetTierExecutionValues{Call: c} +} + +// GetTierExecutionValues provides a mock function with given fields: +func (_m *QualityOfServiceConfiguration) GetTierExecutionValues() map[core.QualityOfService_Tier]core.QualityOfServiceSpec { + ret := _m.Called() + + var r0 map[core.QualityOfService_Tier]core.QualityOfServiceSpec + if rf, ok := ret.Get(0).(func() map[core.QualityOfService_Tier]core.QualityOfServiceSpec); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[core.QualityOfService_Tier]core.QualityOfServiceSpec) + } + } + + return r0 +} diff --git a/pkg/runtime/interfaces/quality_of_service_configuration.go b/pkg/runtime/interfaces/quality_of_service_configuration.go index 4d5265801..a8b7055bc 100644 --- a/pkg/runtime/interfaces/quality_of_service_configuration.go +++ b/pkg/runtime/interfaces/quality_of_service_configuration.go @@ -7,6 +7,9 @@ import ( type TierName = string +// Just incrementally start using mockery, replace with -all when working on https://github.com/lyft/flyte/issues/149 +//go:generate mockery -name QualityOfServiceConfiguration -output=mocks -case=underscore + type QualityOfServiceSpec struct { QueueingBudget config.Duration `json:"queueingBudget"` } diff --git a/pkg/runtime/mocks/mock_configuration_provider.go b/pkg/runtime/mocks/mock_configuration_provider.go index 47c948f41..7c06af698 100644 --- a/pkg/runtime/mocks/mock_configuration_provider.go +++ b/pkg/runtime/mocks/mock_configuration_provider.go @@ -1,6 +1,10 @@ package mocks -import "github.com/lyft/flyteadmin/pkg/runtime/interfaces" +import ( + "github.com/lyft/flyteadmin/pkg/runtime/interfaces" + ifaceMocks "github.com/lyft/flyteadmin/pkg/runtime/interfaces/mocks" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" +) type MockConfigurationProvider struct { applicationConfiguration interfaces.ApplicationConfiguration @@ -73,6 +77,11 @@ func NewMockConfigurationProvider( taskResourceConfiguration interfaces.TaskResourceConfiguration, whitelistConfiguration interfaces.WhitelistConfiguration, namespaceMappingConfiguration interfaces.NamespaceMappingConfiguration) interfaces.Configuration { + + mockQualityOfServiceConfiguration := &ifaceMocks.QualityOfServiceConfiguration{} + mockQualityOfServiceConfiguration.OnGetDefaultTiers().Return(make(map[string]core.QualityOfService_Tier)) + mockQualityOfServiceConfiguration.OnGetTierExecutionValues().Return(make(map[core.QualityOfService_Tier]core.QualityOfServiceSpec)) + return &MockConfigurationProvider{ applicationConfiguration: applicationConfiguration, queueConfiguration: queueConfiguration, @@ -80,6 +89,6 @@ func NewMockConfigurationProvider( taskResourceConfiguration: taskResourceConfiguration, whitelistConfiguration: whitelistConfiguration, namespaceMappingConfiguration: namespaceMappingConfiguration, - qualityOfServiceConfiguration: NewMockQualityOfServiceProvider(), + qualityOfServiceConfiguration: mockQualityOfServiceConfiguration, } } diff --git a/pkg/runtime/mocks/mock_quality_of_service_provider.go b/pkg/runtime/mocks/mock_quality_of_service_provider.go deleted file mode 100644 index e572ba642..000000000 --- a/pkg/runtime/mocks/mock_quality_of_service_provider.go +++ /dev/null @@ -1,26 +0,0 @@ -package mocks - -import ( - "github.com/lyft/flyteadmin/pkg/runtime/interfaces" - "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" -) - -type MockQualityOfServiceProvider struct { - TierExecutionValues map[core.QualityOfService_Tier]core.QualityOfServiceSpec - DefaultTiers map[string]core.QualityOfService_Tier -} - -func (p MockQualityOfServiceProvider) GetTierExecutionValues() map[core.QualityOfService_Tier]core.QualityOfServiceSpec { - return p.TierExecutionValues -} - -func (p MockQualityOfServiceProvider) GetDefaultTiers() map[string]core.QualityOfService_Tier { - return p.DefaultTiers -} - -func NewMockQualityOfServiceProvider() interfaces.QualityOfServiceConfiguration { - return &MockQualityOfServiceProvider{ - TierExecutionValues: make(map[core.QualityOfService_Tier]core.QualityOfServiceSpec), - DefaultTiers: make(map[string]core.QualityOfService_Tier), - } -} From ae66d065114bbe5afb21e74a2226bcf2ddb07316 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Wed, 22 Jul 2020 12:57:26 -0700 Subject: [PATCH 6/6] rm --- pkg/manager/impl/execution_manager_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/manager/impl/execution_manager_test.go b/pkg/manager/impl/execution_manager_test.go index d8ec76386..9b42efe1a 100644 --- a/pkg/manager/impl/execution_manager_test.go +++ b/pkg/manager/impl/execution_manager_test.go @@ -223,7 +223,6 @@ func TestCreateExecution(t *testing.T) { Cluster: testCluster, }, nil }) - //qosProvider := runtimeMocks.NewMockQualityOfServiceProvider() qosProvider := &runtimeIFaceMocks.QualityOfServiceConfiguration{} qosProvider.OnGetTierExecutionValues().Return(map[core.QualityOfService_Tier]core.QualityOfServiceSpec{ core.QualityOfService_HIGH: {