Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update launch condition column #4903

Merged
merged 11 commits into from
Feb 27, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,18 @@
if err := validateSchedule(request, expectedInputs); err != nil {
return err
}

// Augment default inputs with the unbound workflow inputs.
request.Spec.DefaultInputs = expectedInputs
if request.Spec.EntityMetadata != nil {
if err := validateNotifications(request.Spec.EntityMetadata.Notifications); err != nil {
return err
}
if request.GetSpec().GetEntityMetadata().GetLaunchConditions() != nil {
return errors.NewFlyteAdminErrorf(
codes.InvalidArgument,
"Launch condition must be empty, found %v", request.GetSpec().GetEntityMetadata().GetLaunchConditions())
}

Check warning on line 68 in flyteadmin/pkg/manager/impl/validation/launch_plan_validator.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/manager/impl/validation/launch_plan_validator.go#L64-L68

Added lines #L64 - L68 were not covered by tests
}
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func TestValidateSchedule_ArgNotFixed(t *testing.T) {
})
t.Run("with rate", func(t *testing.T) {
request := testutils.GetLaunchPlanRequestWithFixedRateSchedule(2, admin.FixedRateUnit_HOUR)

err := validateSchedule(request, inputMap)
assert.NotNil(t, err)
})
Expand Down
43 changes: 42 additions & 1 deletion flyteadmin/pkg/repositories/config/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -1182,7 +1182,48 @@
},
}

var Migrations = append(LegacyMigrations, NoopMigrations...)
// ContinuedMigrations - Above are a series of migrations labeled as no-op migrations. These are migrations that we
// wrote to bring the then-existing migrations up to the Gorm standard, which is to write from scratch, each struct
// that we want auto-migrated, inside each function. Previously we had not been doing that. The idea is that we will
// one day delete the migrations prior to the no-op series. New migrations should continue in this array here, again
// using the proper Gorm methodology of including the struct definitions inside each migration function.
var ContinuedMigrations = []*gormigrate.Migration{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a comment here explaining what ContinuedMigrations is and that we should now add new migrations here?

{
ID: "pg-continue-2024-02-launchplan",
Migrate: func(tx *gorm.DB) error {
type LaunchPlanScheduleType string
type LaunchConditionType string

type LaunchPlan struct {
ID uint `gorm:"index;autoIncrement;not null"`
CreatedAt time.Time `gorm:"type:time"`
UpdatedAt time.Time `gorm:"type:time"`
DeletedAt *time.Time `gorm:"index"`
Project string `gorm:"primary_key;index:lp_project_domain_name_idx,lp_project_domain_idx" valid:"length(0|255)"`
Domain string `gorm:"primary_key;index:lp_project_domain_name_idx,lp_project_domain_idx" valid:"length(0|255)"`
Name string `gorm:"primary_key;index:lp_project_domain_name_idx" valid:"length(0|255)"`
Version string `gorm:"primary_key" valid:"length(0|255)"`
Spec []byte `gorm:"not null"`
WorkflowID uint `gorm:"index"`
Closure []byte `gorm:"not null"`
// GORM doesn't save the zero value for ints, so we use a pointer for the State field
State *int32 `gorm:"default:0"`
// Hash of the launch plan
Digest []byte
ScheduleType LaunchPlanScheduleType
// store the type of event that this launch plan is triggered by, can be empty, or SCHED
LaunchConditionType *LaunchConditionType `gorm:"type:varchar(32);index:idx_launch_plans_launch_conditions,where:launch_condition_type is not null"`
}
return tx.AutoMigrate(&LaunchPlan{})
},
Rollback: func(tx *gorm.DB) error {
return tx.Table("launch_plans").Migrator().DropColumn(&models.LaunchPlan{}, "launch_condition_type")
},

Check warning on line 1221 in flyteadmin/pkg/repositories/config/migrations.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/config/migrations.go#L1193-L1221

Added lines #L1193 - L1221 were not covered by tests
},
}

var m = append(LegacyMigrations, NoopMigrations...)
var Migrations = append(m, ContinuedMigrations...)

func alterTableColumnType(db *sql.DB, columnName, columnType string) error {
var err error
Expand Down
8 changes: 4 additions & 4 deletions flyteadmin/pkg/repositories/gormimpl/launch_plan_repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func TestListLaunchPlans_Pagination(t *testing.T) {
GlobalMock := mocket.Catcher.Reset()

GlobalMock.NewMock().WithQuery(
`SELECT "launch_plans"."id","launch_plans"."created_at","launch_plans"."updated_at","launch_plans"."deleted_at","launch_plans"."project","launch_plans"."domain","launch_plans"."name","launch_plans"."version","launch_plans"."spec","launch_plans"."workflow_id","launch_plans"."closure","launch_plans"."state","launch_plans"."digest","launch_plans"."schedule_type" FROM "launch_plans" inner join workflows on launch_plans.workflow_id = workflows.id WHERE launch_plans.project = $1 AND launch_plans.domain = $2 AND launch_plans.name = $3 LIMIT 2 OFFSET 1`).WithReply(launchPlans)
`SELECT "launch_plans"."id","launch_plans"."created_at","launch_plans"."updated_at","launch_plans"."deleted_at","launch_plans"."project","launch_plans"."domain","launch_plans"."name","launch_plans"."version","launch_plans"."spec","launch_plans"."workflow_id","launch_plans"."closure","launch_plans"."state","launch_plans"."digest","launch_plans"."schedule_type","launch_plans"."launch_condition_type" FROM "launch_plans" inner join workflows on launch_plans.workflow_id = workflows.id WHERE launch_plans.project = $1 AND launch_plans.domain = $2 AND launch_plans.name = $3 LIMIT 2 OFFSET 1`).WithReply(launchPlans)

collection, err := launchPlanRepo.List(context.Background(), interfaces.ListResourceInput{
InlineFilters: []common.InlineFilter{
Expand Down Expand Up @@ -311,7 +311,7 @@ func TestListLaunchPlans_Filters(t *testing.T) {
GlobalMock := mocket.Catcher.Reset()
GlobalMock.Logging = true
// Only match on queries that append the name filter
GlobalMock.NewMock().WithQuery(`SELECT "launch_plans"."id","launch_plans"."created_at","launch_plans"."updated_at","launch_plans"."deleted_at","launch_plans"."project","launch_plans"."domain","launch_plans"."name","launch_plans"."version","launch_plans"."spec","launch_plans"."workflow_id","launch_plans"."closure","launch_plans"."state","launch_plans"."digest","launch_plans"."schedule_type" FROM "launch_plans" inner join workflows on launch_plans.workflow_id = workflows.id WHERE launch_plans.project = $1 AND launch_plans.domain = $2 AND launch_plans.name = $3 AND launch_plans.version = $4 LIMIT 20`).WithReply(launchPlans[0:1])
GlobalMock.NewMock().WithQuery(`SELECT "launch_plans"."id","launch_plans"."created_at","launch_plans"."updated_at","launch_plans"."deleted_at","launch_plans"."project","launch_plans"."domain","launch_plans"."name","launch_plans"."version","launch_plans"."spec","launch_plans"."workflow_id","launch_plans"."closure","launch_plans"."state","launch_plans"."digest","launch_plans"."schedule_type","launch_plans"."launch_condition_type" FROM "launch_plans" inner join workflows on launch_plans.workflow_id = workflows.id WHERE launch_plans.project = $1 AND launch_plans.domain = $2 AND launch_plans.name = $3 AND launch_plans.version = $4 LIMIT 20`).WithReply(launchPlans[0:1])

collection, err := launchPlanRepo.List(context.Background(), interfaces.ListResourceInput{
InlineFilters: []common.InlineFilter{
Expand Down Expand Up @@ -403,8 +403,8 @@ func TestListLaunchPlansForWorkflow(t *testing.T) {
// HACK: gorm orders the filters on join clauses non-deterministically. Ordering of filters doesn't affect
// correctness, but because the mocket library only pattern matches on substrings, both variations of the (valid)
// SQL that gorm produces are checked below.
query := `SELECT "launch_plans"."id","launch_plans"."created_at","launch_plans"."updated_at","launch_plans"."deleted_at","launch_plans"."project","launch_plans"."domain","launch_plans"."name","launch_plans"."version","launch_plans"."spec","launch_plans"."workflow_id","launch_plans"."closure","launch_plans"."state","launch_plans"."digest","launch_plans"."schedule_type" FROM "launch_plans" inner join workflows on launch_plans.workflow_id = workflows.id WHERE launch_plans.project = $1 AND launch_plans.domain = $2 AND launch_plans.name = $3 AND workflows.deleted_at = $4 LIMIT 20`
alternateQuery := `SELECT "launch_plans"."id","launch_plans"."created_at","launch_plans"."updated_at","launch_plans"."deleted_at","launch_plans"."project","launch_plans"."domain","launch_plans"."name","launch_plans"."version","launch_plans"."spec","launch_plans"."workflow_id","launch_plans"."closure","launch_plans"."state","launch_plans"."digest","launch_plans"."schedule_type" FROM "launch_plans" inner join workflows on launch_plans.workflow_id = workflows.id WHERE launch_plans.project = $1 AND launch_plans.domain = $2 AND launch_plans.name = $3 AND workflows.deleted_at = $4 LIMIT 20`
query := `SELECT "launch_plans"."id","launch_plans"."created_at","launch_plans"."updated_at","launch_plans"."deleted_at","launch_plans"."project","launch_plans"."domain","launch_plans"."name","launch_plans"."version","launch_plans"."spec","launch_plans"."workflow_id","launch_plans"."closure","launch_plans"."state","launch_plans"."digest","launch_plans"."schedule_type","launch_plans"."launch_condition_type" FROM "launch_plans" inner join workflows on launch_plans.workflow_id = workflows.id WHERE launch_plans.project = $1 AND launch_plans.domain = $2 AND launch_plans.name = $3 AND workflows.deleted_at = $4 LIMIT 20`
alternateQuery := `SELECT "launch_plans"."id","launch_plans"."created_at","launch_plans"."updated_at","launch_plans"."deleted_at","launch_plans"."project","launch_plans"."domain","launch_plans"."name","launch_plans"."version","launch_plans"."spec","launch_plans"."workflow_id","launch_plans"."closure","launch_plans"."state","launch_plans"."digest","launch_plans"."schedule_type","launch_plans"."launch_condition_type" FROM "launch_plans" inner join workflows on launch_plans.workflow_id = workflows.id WHERE launch_plans.project = $1 AND launch_plans.domain = $2 AND launch_plans.name = $3 AND workflows.deleted_at = $4 LIMIT 20`
GlobalMock.NewMock().WithQuery(query).WithReply(launchPlans)
GlobalMock.NewMock().WithQuery(alternateQuery).WithReply(launchPlans)

Expand Down
14 changes: 11 additions & 3 deletions flyteadmin/pkg/repositories/models/launch_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,14 @@ const (
LaunchPlanScheduleTypeRATE LaunchPlanScheduleType = "RATE"
)

// Database model to encapsulate a launch plan.
type LaunchConditionType string

const (
// LaunchConditionTypeSCHED is the const representing the launch plan has a trigger type of schedule
LaunchConditionTypeSCHED LaunchConditionType = "SCHED"
)

// LaunchPlan Database model to encapsulate a launch plan.
type LaunchPlan struct {
BaseModel
LaunchPlanKey
Expand All @@ -29,8 +36,9 @@ type LaunchPlan struct {
// GORM doesn't save the zero value for ints, so we use a pointer for the State field
State *int32 `gorm:"default:0"`
// Hash of the launch plan
Digest []byte
ScheduleType LaunchPlanScheduleType
Digest []byte
ScheduleType LaunchPlanScheduleType
LaunchConditionType *LaunchConditionType `gorm:"type:varchar(32);index:idx_launch_plans_launch_conditions,where:launch_condition_type is not null"`
}

var LaunchPlanColumns = modelColumns(LaunchPlan{})
11 changes: 9 additions & 2 deletions flyteadmin/pkg/repositories/transformers/launch_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,21 @@ func CreateLaunchPlanModel(
return models.LaunchPlan{}, errors.NewFlyteAdminError(codes.Internal, "Failed to serialize launch plan closure")
}

var launchConditionType models.LaunchConditionType
scheduleType := models.LaunchPlanScheduleTypeNONE
if launchPlan.Spec.EntityMetadata != nil && launchPlan.Spec.EntityMetadata.Schedule != nil {
if launchPlan.Spec.EntityMetadata.Schedule.GetCronExpression() != "" || launchPlan.Spec.EntityMetadata.Schedule.GetCronSchedule() != nil {
scheduleType = models.LaunchPlanScheduleTypeCRON
launchConditionType = models.LaunchConditionTypeSCHED
} else if launchPlan.Spec.EntityMetadata.Schedule.GetRate() != nil {
scheduleType = models.LaunchPlanScheduleTypeRATE
launchConditionType = models.LaunchConditionTypeSCHED
}
}

state := int32(initState)

return models.LaunchPlan{
lpModel := models.LaunchPlan{
LaunchPlanKey: models.LaunchPlanKey{
Project: launchPlan.Id.Project,
Domain: launchPlan.Id.Domain,
Expand All @@ -64,7 +67,11 @@ func CreateLaunchPlanModel(
WorkflowID: workflowRepoID,
Digest: digest,
ScheduleType: scheduleType,
}, nil
}
if launchConditionType != "" {
lpModel.LaunchConditionType = &launchConditionType
}
return lpModel, nil
}

// Transforms a LaunchPlanModel to a LaunchPlan
Expand Down
9 changes: 3 additions & 6 deletions flyteidl/clients/go/assets/admin.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -4273,10 +4273,11 @@
"SYSTEM",
"RELAUNCH",
"CHILD_WORKFLOW",
"RECOVERED"
"RECOVERED",
"TRIGGER"
],
"default": "MANUAL",
"description": "The method by which this execution was launched.\n\n - MANUAL: The default execution mode, MANUAL implies that an execution was launched by an individual.\n - SCHEDULED: A schedule triggered this execution launch.\n - SYSTEM: A system process was responsible for launching this execution rather an individual.\n - RELAUNCH: This execution was launched with identical inputs as a previous execution.\n - CHILD_WORKFLOW: This execution was triggered by another execution.\n - RECOVERED: This execution was recovered from another execution."
"description": "The method by which this execution was launched.\n\n - MANUAL: The default execution mode, MANUAL implies that an execution was launched by an individual.\n - SCHEDULED: A schedule triggered this execution launch.\n - SYSTEM: A system process was responsible for launching this execution rather an individual.\n - RELAUNCH: This execution was launched with identical inputs as a previous execution.\n - CHILD_WORKFLOW: This execution was triggered by another execution.\n - RECOVERED: This execution was recovered from another execution.\n - TRIGGER: Execution was kicked off by the artifact trigger system"
},
"IOStrategyDownloadMode": {
"type": "string",
Expand Down Expand Up @@ -6474,10 +6475,6 @@
"coreArtifactBindingData": {
"type": "object",
"properties": {
"index": {
"type": "integer",
"format": "int64"
},
"partition_key": {
"type": "string"
},
Expand Down
8 changes: 8 additions & 0 deletions flyteidl/gen/pb-es/flyteidl/admin/execution_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 0 additions & 6 deletions flyteidl/gen/pb-es/flyteidl/core/artifact_id_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading