Skip to content

Commit

Permalink
Hide generated launch plans starting with .flytegen in the UI (#5949)
Browse files Browse the repository at this point in the history
Signed-off-by: troychiu <[email protected]>
  • Loading branch information
troychiu authored Nov 6, 2024
1 parent 96c467e commit f143481
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 26 deletions.
13 changes: 13 additions & 0 deletions flyteadmin/pkg/common/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type GormQueryExpr struct {
// Complete set of filters available for database queries.
const (
Contains FilterExpression = iota
NotLike
GreaterThan
GreaterThanOrEqual
LessThan
Expand All @@ -37,6 +38,7 @@ const (
joinArgsFormat = "%s.%s"
containsQuery = "%s LIKE ?"
containsArgs = "%%%s%%"
notLikeQuery = "%s NOT LIKE ?"
greaterThanQuery = "%s > ?"
greaterThanOrEqualQuery = "%s >= ?"
lessThanQuery = "%s < ?"
Expand All @@ -50,6 +52,7 @@ const (
// Set of available filters which exclusively accept a single argument value.
var singleValueFilters = map[FilterExpression]bool{
Contains: true,
NotLike: true,
GreaterThan: true,
GreaterThanOrEqual: true,
LessThan: true,
Expand All @@ -68,6 +71,7 @@ const EqualExpression = "eq"

var filterNameMappings = map[string]FilterExpression{
"contains": Contains,
"not_like": NotLike,
"gt": GreaterThan,
"gte": GreaterThanOrEqual,
"lt": LessThan,
Expand All @@ -80,6 +84,7 @@ var filterNameMappings = map[string]FilterExpression{

var filterQueryMappings = map[FilterExpression]string{
Contains: containsQuery,
NotLike: notLikeQuery,
GreaterThan: greaterThanQuery,
GreaterThanOrEqual: greaterThanOrEqualQuery,
LessThan: lessThanQuery,
Expand Down Expand Up @@ -117,6 +122,8 @@ func getFilterExpressionName(expression FilterExpression) string {
switch expression {
case Contains:
return "contains"
case NotLike:
return "not like"
case GreaterThan:
return "greater than"
case GreaterThanOrEqual:
Expand Down Expand Up @@ -208,6 +215,12 @@ func (f *inlineFilterImpl) getGormQueryExpr(formattedField string) (GormQueryExp
// args renders to something like: "%value%"
Args: fmt.Sprintf(containsArgs, f.value),
}, nil
case NotLike:
return GormQueryExpr{
// WHERE field NOT LIKE value
Query: fmt.Sprintf(notLikeQuery, formattedField),
Args: f.value,
}, nil
case GreaterThan:
return GormQueryExpr{
// WHERE field > value
Expand Down
16 changes: 16 additions & 0 deletions flyteadmin/pkg/common/filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func TestGetGormJoinTableQueryExpr(t *testing.T) {

var expectedArgsForFilters = map[FilterExpression]string{
Contains: "%value%",
NotLike: "value",
GreaterThan: "value",
GreaterThanOrEqual: "value",
LessThan: "value",
Expand Down Expand Up @@ -169,3 +170,18 @@ func TestWithDefaultValueFilter(t *testing.T) {
assert.Equal(t, "COALESCE(named_entity_metadata.state, 0) = ?", queryExpression.Query)
assert.Equal(t, 1, queryExpression.Args)
}

func TestNotLikeFilter(t *testing.T) {
filter, err := NewSingleValueFilter(NamedEntityMetadata, NotLike, "name", ".flytegen%")
assert.NoError(t, err)

queryExpression, err := filter.GetGormQueryExpr()
assert.NoError(t, err)
assert.Equal(t, "name NOT LIKE ?", queryExpression.Query)
assert.Equal(t, ".flytegen%", queryExpression.Args)

queryExpression, err = filter.GetGormJoinTableQueryExpr("named_entity_metadata")
assert.NoError(t, err)
assert.Equal(t, "named_entity_metadata.name NOT LIKE ?", queryExpression.Query)
assert.Equal(t, ".flytegen%", queryExpression.Args)
}
2 changes: 1 addition & 1 deletion flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ func (m *ExecutionManager) launchSingleTaskExecution(
return nil, nil, err
}

launchPlan, err := util.CreateOrGetLaunchPlan(ctx, m.db, m.config, taskIdentifier,
launchPlan, err := util.CreateOrGetLaunchPlan(ctx, m.db, m.config, m.namedEntityManager, taskIdentifier,
workflow.Closure.CompiledWorkflow.Primary.Template.Interface, workflowModel.ID, request.Spec)
if err != nil {
return nil, nil, err
Expand Down
21 changes: 7 additions & 14 deletions flyteadmin/pkg/manager/impl/named_entity_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package impl

import (
"context"
"fmt"
"strconv"
"strings"

Expand All @@ -17,21 +18,13 @@ import (
"github.com/flyteorg/flyte/flyteadmin/pkg/repositories/transformers"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
)

const state = "state"

// System-generated workflows are meant to be hidden from the user by default. Therefore we always only show
// workflow-type named entities that have been user generated only.
var nonSystemGeneratedWorkflowsFilter, _ = common.NewSingleValueFilter(
common.NamedEntityMetadata, common.NotEqual, state, admin.NamedEntityState_SYSTEM_GENERATED)
var defaultWorkflowsFilter, _ = common.NewWithDefaultValueFilter(
strconv.Itoa(int(admin.NamedEntityState_NAMED_ENTITY_ACTIVE)), nonSystemGeneratedWorkflowsFilter)

type NamedEntityMetrics struct {
Scope promutils.Scope
}
Expand Down Expand Up @@ -75,12 +68,8 @@ func (m *NamedEntityManager) GetNamedEntity(ctx context.Context, request *admin.
return util.GetNamedEntity(ctx, m.db, request.ResourceType, request.Id)
}

func (m *NamedEntityManager) getQueryFilters(referenceEntity core.ResourceType, requestFilters string) ([]common.InlineFilter, error) {
func (m *NamedEntityManager) getQueryFilters(requestFilters string) ([]common.InlineFilter, error) {
filters := make([]common.InlineFilter, 0)
if referenceEntity == core.ResourceType_WORKFLOW {
filters = append(filters, defaultWorkflowsFilter)
}

if len(requestFilters) == 0 {
return filters, nil
}
Expand Down Expand Up @@ -111,10 +100,14 @@ func (m *NamedEntityManager) ListNamedEntities(ctx context.Context, request *adm
}
ctx = contextutils.WithProjectDomain(ctx, request.Project, request.Domain)

if len(request.Filters) == 0 {
// Add implicit filter to exclude system generated workflows
request.Filters = fmt.Sprintf("not_like(name,%s)", ".flytegen%")
}
// HACK: In order to filter by state (if requested) - we need to amend the filter to use COALESCE
// e.g. eq(state, 1) becomes 'WHERE (COALESCE(state, 0) = '1')' since not every NamedEntity necessarily
// has an entry, and therefore the default state value '0' (active), should be assumed.
filters, err := m.getQueryFilters(request.ResourceType, request.Filters)
filters, err := m.getQueryFilters(request.Filters)
if err != nil {
return nil, err
}
Expand Down
10 changes: 3 additions & 7 deletions flyteadmin/pkg/manager/impl/named_entity_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestNamedEntityManager_Get_BadRequest(t *testing.T) {
func TestNamedEntityManager_getQueryFilters(t *testing.T) {
repository := getMockRepositoryForNETest()
manager := NewNamedEntityManager(repository, getMockConfigForNETest(), mockScope.NewTestScope())
updatedFilters, err := manager.(*NamedEntityManager).getQueryFilters(core.ResourceType_TASK, "eq(state, 0)")
updatedFilters, err := manager.(*NamedEntityManager).getQueryFilters("eq(state, 0)")
assert.NoError(t, err)
assert.Len(t, updatedFilters, 1)

Expand All @@ -97,13 +97,9 @@ func TestNamedEntityManager_getQueryFilters(t *testing.T) {
assert.Equal(t, "COALESCE(state, 0) = ?", queryExp.Query)
assert.Equal(t, "0", queryExp.Args)

updatedFilters, err = manager.(*NamedEntityManager).getQueryFilters(core.ResourceType_WORKFLOW, "")
updatedFilters, err = manager.(*NamedEntityManager).getQueryFilters("")
assert.NoError(t, err)
assert.Len(t, updatedFilters, 1)
queryExp, err = updatedFilters[0].GetGormQueryExpr()
assert.NoError(t, err)
assert.Equal(t, "COALESCE(state, 0) <> ?", queryExp.Query)
assert.Equal(t, admin.NamedEntityState_SYSTEM_GENERATED, queryExp.Args)
assert.Len(t, updatedFilters, 0)
}

func TestNamedEntityManager_Update(t *testing.T) {
Expand Down
15 changes: 14 additions & 1 deletion flyteadmin/pkg/manager/impl/util/single_task_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func CreateOrGetWorkflowModel(
}

func CreateOrGetLaunchPlan(ctx context.Context,
db repositoryInterfaces.Repository, config runtimeInterfaces.Configuration, taskIdentifier *core.Identifier,
db repositoryInterfaces.Repository, config runtimeInterfaces.Configuration, namedEntityManager interfaces.NamedEntityInterface, taskIdentifier *core.Identifier,
workflowInterface *core.TypedInterface, workflowID uint, spec *admin.ExecutionSpec) (*admin.LaunchPlan, error) {
var launchPlan *admin.LaunchPlan
var err error
Expand Down Expand Up @@ -226,6 +226,19 @@ func CreateOrGetLaunchPlan(ctx context.Context,
logger.Errorf(ctx, "Failed to save launch plan model [%+v] with err: %v", launchPlanIdentifier, err)
return nil, err
}
_, err = namedEntityManager.UpdateNamedEntity(ctx, &admin.NamedEntityUpdateRequest{
ResourceType: core.ResourceType_LAUNCH_PLAN,
Id: &admin.NamedEntityIdentifier{
Project: launchPlan.GetId().GetProject(),
Domain: launchPlan.GetId().GetDomain(),
Name: launchPlan.GetId().GetName(),
},
Metadata: &admin.NamedEntityMetadata{State: admin.NamedEntityState_SYSTEM_GENERATED},
})
if err != nil {
logger.Warningf(ctx, "Failed to set launch plan state to system-generated: %v", err)
return nil, err
}
}

return launchPlan, nil
Expand Down
17 changes: 16 additions & 1 deletion flyteadmin/pkg/manager/impl/util/single_task_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,21 @@ func TestCreateOrGetLaunchPlan(t *testing.T) {
},
}
workflowID := uint(12)

mockNamedEntityManager := managerMocks.NamedEntityManager{}
mockNamedEntityManager.UpdateNamedEntityFunc = func(ctx context.Context, request *admin.NamedEntityUpdateRequest) (*admin.NamedEntityUpdateResponse, error) {
assert.Equal(t, request.ResourceType, core.ResourceType_LAUNCH_PLAN)
assert.True(t, proto.Equal(request.Id, &admin.NamedEntityIdentifier{
Project: "flytekit",
Domain: "production",
Name: ".flytegen.app.workflows.MyWorkflow.my_task",
}), fmt.Sprintf("%+v", request.Id))
assert.True(t, proto.Equal(request.Metadata, &admin.NamedEntityMetadata{
State: admin.NamedEntityState_SYSTEM_GENERATED,
}))
return &admin.NamedEntityUpdateResponse{}, nil
}

taskIdentifier := &core.Identifier{
ResourceType: core.ResourceType_TASK,
Project: "flytekit",
Expand All @@ -233,7 +248,7 @@ func TestCreateOrGetLaunchPlan(t *testing.T) {
},
}
launchPlan, err := CreateOrGetLaunchPlan(
context.Background(), repository, config, taskIdentifier, workflowInterface, workflowID, &spec)
context.Background(), repository, config, &mockNamedEntityManager, taskIdentifier, workflowInterface, workflowID, &spec)
assert.NoError(t, err)
assert.True(t, proto.Equal(&core.Identifier{
ResourceType: core.ResourceType_LAUNCH_PLAN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
)

var archivableResourceTypes = sets.NewInt32(int32(core.ResourceType_WORKFLOW), int32(core.ResourceType_TASK))
var archivableResourceTypes = sets.NewInt32(int32(core.ResourceType_WORKFLOW), int32(core.ResourceType_TASK), int32(core.ResourceType_LAUNCH_PLAN))

func ValidateNamedEntityGetRequest(request *admin.NamedEntityGetRequest) error {
if err := ValidateResourceType(request.ResourceType); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestValidateNamedEntityUpdateRequest(t *testing.T) {
},
}))
assert.Equal(t, codes.InvalidArgument, ValidateNamedEntityUpdateRequest(&admin.NamedEntityUpdateRequest{
ResourceType: core.ResourceType_LAUNCH_PLAN,
ResourceType: core.ResourceType_DATASET,
Id: &admin.NamedEntityIdentifier{
Project: "project",
Domain: "domain",
Expand Down Expand Up @@ -141,6 +141,30 @@ func TestValidateNamedEntityUpdateRequest(t *testing.T) {
State: admin.NamedEntityState_NAMED_ENTITY_ARCHIVED,
},
}))

assert.Nil(t, ValidateNamedEntityUpdateRequest(&admin.NamedEntityUpdateRequest{
ResourceType: core.ResourceType_LAUNCH_PLAN,
Id: &admin.NamedEntityIdentifier{
Project: "project",
Domain: "domain",
Name: "name",
},
Metadata: &admin.NamedEntityMetadata{
Description: "description",
},
}))

assert.Nil(t, ValidateNamedEntityUpdateRequest(&admin.NamedEntityUpdateRequest{
ResourceType: core.ResourceType_LAUNCH_PLAN,
Id: &admin.NamedEntityIdentifier{
Project: "project",
Domain: "domain",
Name: "name",
},
Metadata: &admin.NamedEntityMetadata{
State: admin.NamedEntityState_NAMED_ENTITY_ARCHIVED,
},
}))
}

func TestValidateNamedEntityListRequest(t *testing.T) {
Expand Down

0 comments on commit f143481

Please sign in to comment.