From 75d6420695dd88142252b0f71bd8212e6d1fa417 Mon Sep 17 00:00:00 2001 From: Scott Winkler Date: Fri, 4 Nov 2022 05:51:31 -0700 Subject: [PATCH 01/21] task after dag support --- pkg/resources/task.go | 276 +++++++++++++------------- pkg/resources/task_acceptance_test.go | 18 +- pkg/snowflake/task.go | 123 +++++++++--- pkg/snowflake/task_test.go | 10 +- 4 files changed, 246 insertions(+), 181 deletions(-) diff --git a/pkg/resources/task.go b/pkg/resources/task.go index 61ef5f6d5c..a2e6d0e0f6 100644 --- a/pkg/resources/task.go +++ b/pkg/resources/task.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" "github.com/pkg/errors" + "golang.org/x/exp/slices" ) const ( @@ -75,9 +76,10 @@ var taskSchema = map[string]*schema.Schema{ Description: "Specifies a comment for the task.", }, "after": { - Type: schema.TypeString, + Type: schema.TypeList, + Elem: &schema.Schema{Type: schema.TypeString}, Optional: true, - Description: "Specifies the predecessor task in the same database and schema of the current task. When a run of the predecessor task finishes successfully, it triggers this task (after a brief lag). (Conflict with schedule)", + Description: "Specifies one or more predecessor tasks for the current task. Use this option to create a DAG of tasks or add this task to an existing DAG. A DAG is a series of tasks that starts with a scheduled root task and is linked together by dependencies.", ConflictsWith: []string{"schedule"}, }, "when": { @@ -146,88 +148,6 @@ func difference(a, b map[string]interface{}) map[string]interface{} { return diff } -// getActiveRootTask tries to retrieve the root of current task or returns the current (standalone) task. -func getActiveRootTask(data *schema.ResourceData, meta interface{}) (*snowflake.TaskBuilder, error) { - log.Println("[DEBUG] retrieving root task") - - db := meta.(*sql.DB) - database := data.Get("database").(string) - dbSchema := data.Get("schema").(string) - name := data.Get("name").(string) - after := data.Get("after").(string) - - if name == "" { - return nil, nil - } - - // always start from first predecessor - // or the current task when standalone - if after != "" { - name = after - } - - for { - builder := snowflake.Task(name, database, dbSchema) - q := builder.Show() - row := snowflake.QueryRow(db, q) - task, err := snowflake.ScanTask(row) - - if err != nil && name != data.Get("name").(string) { - return nil, errors.Wrapf(err, "failed to locate the root node of: %v", name) - } - - currentName := task.GetPredecessorName() - if currentName == "" { - log.Printf("[DEBUG] found root task: %v", name) - // we only want to deal with suspending the root task when its enabled (started) - if task.IsEnabled() { - return snowflake.Task(name, database, dbSchema), nil - } - return nil, nil - } - - name = currentName - } -} - -// getActiveRootTaskAndSuspend retrieves the root task and suspends it. -func getActiveRootTaskAndSuspend(data *schema.ResourceData, meta interface{}) (*snowflake.TaskBuilder, error) { - db := meta.(*sql.DB) - name := data.Get("name").(string) - - root, err := getActiveRootTask(data, meta) - if err != nil { - return nil, errors.Wrapf(err, "error retrieving root task %v", name) - } - - if root != nil { - qr := root.Suspend() - err = snowflake.Exec(db, qr) - if err != nil { - return nil, errors.Wrapf(err, "error suspending root task %v", name) - } - } - - return root, nil -} - -func resumeTask(root *snowflake.TaskBuilder, meta interface{}) { - if root == nil { - return - } - - if root.IsDisabled() { - return - } - - db := meta.(*sql.DB) - qr := root.Resume() - err := snowflake.Exec(db, qr) - if err != nil { - log.Fatal(errors.Wrapf(err, "error resuming root task %v", root.QualifiedName())) - } -} - // taskIDFromString() takes in a pipe-delimited string: DatabaseName|SchemaName|TaskName // and returns a taskID object. func taskIDFromString(stringID string) (*taskID, error) { @@ -360,12 +280,13 @@ func ReadTask(d *schema.ResourceData, meta interface{}) error { return err } - predecessorName := t.GetPredecessorName() - if predecessorName != "" { - err = d.Set("after", predecessorName) - if err != nil { - return err - } + predecessors, err := t.GetPredecessors() + if err != nil { + return err + } + err = d.Set("predecessors", predecessors) + if err != nil { + return err } err = d.Set("when", t.Condition) @@ -479,13 +400,38 @@ func CreateTask(d *schema.ResourceData, meta interface{}) error { } if v, ok := d.GetOk("after"); ok { - root, err := getActiveRootTaskAndSuspend(d, meta) + a := v.([]interface{}) + after := make([]string, len(a)) + for i, v := range a { + after[i] = v.(string) + } + rootTasks, err := snowflake.GetRootTasks(name, database, dbSchema, db) if err != nil { return err } - defer resumeTask(root, meta) + for _, rootTask := range rootTasks { + // if a root task is enabled, then it needs to be suspended before the child tasks can be created + if rootTask.IsEnabled() { + q := rootTask.Suspend() + err = snowflake.Exec(db, q) + if err != nil { + return err + } + + // resume the task after modifications are complete as long as it is not a standalone task + if !(rootTask.Name == name){ + defer func() { + q = rootTask.Resume() + err = snowflake.Exec(db, q) + if err != nil { + log.Printf("[WARN] failed to resume task %s", rootTask.Name) + } + }() + } + } - builder.WithDependency(v.(string)) + builder.WithAfter(after) + } } if v, ok := d.GetOk("when"); ok { @@ -498,14 +444,6 @@ func CreateTask(d *schema.ResourceData, meta interface{}) error { return errors.Wrapf(err, "error creating task %v", name) } - if enabled { - q = builder.Resume() - err = snowflake.Exec(db, q) - if err != nil { - return errors.Wrapf(err, "error starting task %v", name) - } - } - taskID := &taskID{ DatabaseName: database, SchemaName: dbSchema, @@ -517,6 +455,14 @@ func CreateTask(d *schema.ResourceData, meta interface{}) error { } d.SetId(dataIDInput) + if enabled { + q = builder.Resume() + err = snowflake.Exec(db, q) + if err != nil { + return errors.Wrapf(err, "error starting task %v", name) + } + } + return ReadTask(d, meta) } @@ -533,10 +479,32 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error { dbSchema := taskID.SchemaName name := taskID.TaskName builder := snowflake.Task(name, database, dbSchema) - root, err := getActiveRootTaskAndSuspend(d, meta) + + rootTasks, err := snowflake.GetRootTasks(name, database, dbSchema, db) if err != nil { return err } + for _, rootTask := range rootTasks { + // if a root task is enabled, then it needs to be suspended before the child tasks can be created + if rootTask.IsEnabled() { + q := rootTask.Suspend() + err = snowflake.Exec(db, q) + if err != nil { + return err + } + + if !(rootTask.Name == name) { + // resume the task after modifications are complete, as long as it is not a standalone task + defer func() { + q = rootTask.Resume() + err = snowflake.Exec(db, q) + if err != nil { + log.Printf("[WARN] failed to resume task %s", rootTask.Name) + } + }() + } + } + } if d.HasChange("warehouse") { var q string @@ -580,27 +548,53 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error { } } - // Need to remove dependency before adding schedule if needed if d.HasChange("after") { - var ( - q string - err error - ) - - old, _ := d.GetChange("after") - - q = builder.Suspend() + // making changes to after require suspending the current task + q := builder.Suspend() err = snowflake.Exec(db, q) if err != nil { return errors.Wrapf(err, "error suspending task %v", d.Id()) } needResumeCurrentTask = d.Get("enabled").(bool) - if old != "" { - q = builder.RemoveDependency(old.(string)) - err = snowflake.Exec(db, q) + old, new := d.GetChange("after") + var oldAfter []string + if len(old.([]interface{})) > 0 { + oldAfter = expandStringList(old.([]interface{})) + } + + var newAfter []string + if len(new.([]interface{})) > 0 { + newAfter = expandStringList(new.([]interface{})) + } + + // Remove old dependencies that are not in new dependencies + var toRemove []string + for _, dep := range oldAfter { + if !slices.Contains(newAfter, dep) { + toRemove = append(toRemove, dep) + } + } + if len(toRemove) > 0 { + q := builder.RemoveAfter(toRemove) + err := snowflake.Exec(db, q) + if err != nil { + return errors.Wrapf(err, "error removing after dependencies from task %v", d.Id()) + } + } + + // Add new dependencies that are not in old dependencies + var toAdd []string + for _, dep := range newAfter { + if !slices.Contains(oldAfter, dep) { + toAdd = append(toAdd, dep) + } + } + if len(toAdd) > 0 { + q := builder.AddAfter(toAdd) + err := snowflake.Exec(db, q) if err != nil { - return errors.Wrapf(err, "error removing old after dependency from task %v", d.Id()) + return errors.Wrapf(err, "error adding after dependencies to task %v", d.Id()) } } } @@ -662,21 +656,6 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error { } } - if d.HasChange("after") { - var ( - q string - ) - new := d.Get("after") - - if new != "" { - q = builder.AddDependency(new.(string)) - err := snowflake.Exec(db, q) - if err != nil { - return errors.Wrapf(err, "error adding after dependency on task %v", d.Id()) - } - } - } - if d.HasChange("session_parameters") { var q string o, n := d.GetChange("session_parameters") @@ -737,12 +716,6 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error { q = builder.Resume() } else { q = builder.Suspend() - // make sure defer doesn't enable task again - // when standalone or root task and status is suspended - needResumeCurrentTask = false - if root != nil && builder.QualifiedName() == root.QualifiedName() { - root = root.SetDisabled() //nolint - } } err := snowflake.Exec(db, q) @@ -752,10 +725,13 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error { } if needResumeCurrentTask { - resumeTask(builder, meta) + q := builder.Resume() + err := snowflake.Exec(db, q) + if err != nil { + return errors.Wrapf(err, "error resuming task %v", d.Id()) + } } - resumeTask(root, meta) return ReadTask(d, meta) } @@ -771,14 +747,30 @@ func DeleteTask(d *schema.ResourceData, meta interface{}) error { schema := taskID.SchemaName name := taskID.TaskName - root, err := getActiveRootTaskAndSuspend(d, meta) + rootTasks, err := snowflake.GetRootTasks(name, database, schema, db) if err != nil { return err } + for _, rootTask := range rootTasks { + // if a root task is enabled, then it needs to be suspended before the child tasks can be created + if rootTask.IsEnabled() { + q := rootTask.Suspend() + err = snowflake.Exec(db, q) + if err != nil { + return err + } - // only resume the root when not a standalone task - if root != nil && name != root.Name() { - defer resumeTask(root, meta) + if !(rootTask.Name == name) { + // resume the task after modifications are complete, as long as it is not a standalone task + defer func() { + q = rootTask.Resume() + err = snowflake.Exec(db, q) + if err != nil { + log.Printf("[WARN] failed to resume task %s", rootTask.Name) + } + }() + } + } } q := snowflake.Task(name, database, schema).Drop() diff --git a/pkg/resources/task_acceptance_test.go b/pkg/resources/task_acceptance_test.go index e6ae88208b..1ad7a270cb 100644 --- a/pkg/resources/task_acceptance_test.go +++ b/pkg/resources/task_acceptance_test.go @@ -323,7 +323,7 @@ resource "snowflake_task" "child_task" { warehouse = snowflake_task.root_task.warehouse sql_statement = "{{ .ChildTask.SQL }}" enabled = {{ .ChildTask.Enabled }} - after = snowflake_task.root_task.name + after = [snowflake_task.root_task.name] comment = "{{ .ChildTask.Comment }}" {{ if .ChildTask.UserTaskTimeoutMs }} user_task_timeout_ms = {{ .ChildTask.UserTaskTimeoutMs }} @@ -555,7 +555,7 @@ func TestAcc_Task_SwitchScheduled(t *testing.T) { resource.TestCheckResourceAttr("snowflake_task.test_task", "schema", accName), resource.TestCheckResourceAttr("snowflake_task.test_task", "sql_statement", "SELECT 1"), resource.TestCheckResourceAttr("snowflake_task.test_task", "schedule", ""), - resource.TestCheckResourceAttr("snowflake_task.test_task", "after", taskRootName), + resource.TestCheckResourceAttr("snowflake_task.test_task", "after.#", "1"), ), }, { @@ -566,7 +566,7 @@ func TestAcc_Task_SwitchScheduled(t *testing.T) { resource.TestCheckResourceAttr("snowflake_task.test_task", "schema", accName), resource.TestCheckResourceAttr("snowflake_task.test_task", "sql_statement", "SELECT 1"), resource.TestCheckResourceAttr("snowflake_task.test_task", "schedule", "5 MINUTE"), - resource.TestCheckResourceAttr("snowflake_task.test_task", "after", ""), + resource.TestCheckResourceAttr("snowflake_task.test_task", "after.#", "0"), ), }, { @@ -577,7 +577,7 @@ func TestAcc_Task_SwitchScheduled(t *testing.T) { resource.TestCheckResourceAttr("snowflake_task.test_task", "schema", accName), resource.TestCheckResourceAttr("snowflake_task.test_task", "sql_statement", "SELECT 1"), resource.TestCheckResourceAttr("snowflake_task.test_task", "schedule", ""), - resource.TestCheckResourceAttr("snowflake_task.test_task", "after", taskRootName), + resource.TestCheckResourceAttr("snowflake_task.test_task", "after.#", "1"), ), }, }, @@ -587,7 +587,7 @@ func TestAcc_Task_SwitchScheduled(t *testing.T) { func taskConfigManagedScheduled(name string, taskRootName string) string { s := ` resource "snowflake_database" "test_database" { - name = "%s" + name = "tst-terraform-%s" comment = "Terraform acceptance test" } @@ -621,7 +621,7 @@ resource "snowflake_task" "test_task" { func taskConfigManagedScheduled2(name string, taskRootName string) string { s := ` resource "snowflake_database" "test_database" { - name = "%s" + name = "tst-terraform-%s" comment = "Terraform acceptance test" } @@ -646,7 +646,7 @@ resource "snowflake_task" "test_task" { schema = snowflake_schema.test_schema.name sql_statement = "SELECT 1" enabled = true - after = snowflake_task.test_task_root.name + after = [snowflake_task.test_task_root.name] } ` return fmt.Sprintf(s, name, name, taskRootName, name) @@ -655,7 +655,7 @@ resource "snowflake_task" "test_task" { func taskConfigManagedScheduled3(name string, taskRootName string) string { s := ` resource "snowflake_database" "test_database" { - name = "%s" + name = "tst-terraform-%s" comment = "Terraform acceptance test" } @@ -679,7 +679,7 @@ resource "snowflake_task" "test_task" { schema = snowflake_schema.test_schema.name sql_statement = "SELECT 1" enabled = false - after = snowflake_task.test_task_root.name + after = [snowflake_task.test_task_root.name] } ` diff --git a/pkg/snowflake/task.go b/pkg/snowflake/task.go index afbcd2bdba..f637ebef2d 100644 --- a/pkg/snowflake/task.go +++ b/pkg/snowflake/task.go @@ -23,7 +23,7 @@ type TaskBuilder struct { sessionParameters map[string]interface{} userTaskTimeoutMS int comment string - after string + after []string when string SQLStatement string disabled bool @@ -33,10 +33,10 @@ type TaskBuilder struct { } // GetFullName prepends db and schema to in parameter. -func (tb *TaskBuilder) GetFullName(in string) string { +func (tb *TaskBuilder) GetFullName(name string) string { var n strings.Builder - n.WriteString(fmt.Sprintf(`"%v"."%v"."%v"`, tb.db, tb.schema, in)) + n.WriteString(fmt.Sprintf(`"%v"."%v"."%v"`, tb.db, tb.schema, name)) return n.String() } @@ -87,8 +87,8 @@ func (tb *TaskBuilder) WithTimeout(t int) *TaskBuilder { return tb } -// WithDependency adds an after task dependency to the TaskBuilder. -func (tb *TaskBuilder) WithDependency(after string) *TaskBuilder { +// WithAfter adds after task dependencies to the TaskBuilder. +func (tb *TaskBuilder) WithAfter(after []string) *TaskBuilder { tb.after = after return tb } @@ -184,8 +184,12 @@ func (tb *TaskBuilder) Create() string { q.WriteString(fmt.Sprintf(` USER_TASK_TIMEOUT_MS = %v`, tb.userTaskTimeoutMS)) } - if tb.after != "" { - q.WriteString(fmt.Sprintf(` AFTER %v`, tb.GetFullName(tb.after))) + if len(tb.after) > 0 { + after := make([]string, 0) + for _, a := range tb.after { + after = append(after, tb.GetFullName(a)) + } + q.WriteString(fmt.Sprintf(` AFTER %v`, strings.Join(after, ", "))) } if tb.when != "" { @@ -254,14 +258,22 @@ func (tb *TaskBuilder) UnsetAllowOverlappingExecutionParameter() string { return fmt.Sprintf(`ALTER TASK %v UNSET ALLOW_OVERLAPPING_EXECUTION`, tb.QualifiedName()) } -// AddDependency returns the sql that will add the after dependency for the task. -func (tb *TaskBuilder) AddDependency(after string) string { - return fmt.Sprintf(`ALTER TASK %v ADD AFTER %v`, tb.QualifiedName(), tb.GetFullName(after)) +// AddAfter returns the sql that will add the after dependency for the task. +func (tb *TaskBuilder) AddAfter(after []string) string { + afterTasks := make([]string, 0) + for _, a := range after { + afterTasks = append(afterTasks, tb.GetFullName(a)) + } + return fmt.Sprintf(`ALTER TASK %v ADD AFTER %v`, tb.QualifiedName(), strings.Join(afterTasks, ", ")) } -// RemoveDependency returns the sql that will remove the after dependency for the task. -func (tb *TaskBuilder) RemoveDependency(after string) string { - return fmt.Sprintf(`ALTER TASK %v REMOVE AFTER %v`, tb.QualifiedName(), tb.GetFullName(after)) +// RemoveAfter returns the sql that will remove the after dependency for the task. +func (tb *TaskBuilder) RemoveAfter(after []string) string { + afterTasks := make([]string, 0) + for _, a := range after { + afterTasks = append(afterTasks, tb.GetFullName(a)) + } + return fmt.Sprintf(`ALTER TASK %v REMOVE AFTER %v`, tb.QualifiedName(), strings.Join(afterTasks, ", ")) } // AddSessionParameters returns the sql that will remove the session parameters for the task. @@ -379,31 +391,47 @@ type task struct { AllowOverlappingExecution sql.NullString `db:"allow_overlapping_execution"` } +func (t *task) QualifiedName() string { + return fmt.Sprintf(`"%v"."%v"."%v"`, EscapeString(t.DatabaseName), EscapeString(t.SchemaName), EscapeString(t.Name)) +} + +func (t *task) Suspend() string { + return fmt.Sprintf(`ALTER TASK %v SUSPEND`, t.QualifiedName()) +} + +func (t *task) Resume() string { + return fmt.Sprintf(`ALTER TASK %v RESUME`, t.QualifiedName()) +} + func (t *task) IsEnabled() bool { return strings.ToLower(t.State) == "started" } -func (t *task) GetPredecessorName() string { +func (t *task) GetPredecessors() ([]string, error) { if t.Predecessors == nil { - return "" + return []string{}, nil } // Since 2022_03, Snowflake returns this as a JSON array (even empty) - var fullNames []string - if err := json.Unmarshal([]byte(*t.Predecessors), &fullNames); err == nil { - for _, fullName := range fullNames { - name := fullName[strings.LastIndex(fullName, ".")+1:] - return strings.Trim(name, "\\\"") + var predecessorNames []string + if err := json.Unmarshal([]byte(*t.Predecessors), &predecessorNames); err == nil { + for i, predecessorName := range predecessorNames { + formattedName := predecessorName[strings.LastIndex(predecessorName, ".")+1:] + formattedName = strings.Trim(formattedName, "\\\"") + predecessorNames[i] = formattedName } - return "" + return predecessorNames, nil } pre := strings.Split(*t.Predecessors, ".") - name, err := strconv.Unquote(pre[len(pre)-1]) - if err != nil { - return pre[len(pre)-1] + for _, p := range pre { + predecessorName, err := strconv.Unquote(p) + if err != nil { + return nil, err + } + predecessorNames = append(predecessorNames, predecessorName) } - return name + return predecessorNames, nil } // ScanTask turns a sql row into a task object. @@ -453,3 +481,48 @@ func ListTasks(databaseName string, schemaName string, db *sql.DB) ([]task, erro } return dbs, errors.Wrapf(err, "unable to scan row for %s", stmt) } + +// GetRootTasks tries to retrieve the root of current task or returns the current (standalone) task. +func GetRootTasks(name string, databaseName string, schemaName string, db *sql.DB) ([]*task, error) { + builder := Task(name, databaseName, schemaName) + log.Printf("[DEBUG] retrieving predecessors for task %s\n", builder.QualifiedName()) + + q := builder.Show() + row := QueryRow(db, q) + t, err := ScanTask(row) + if err != nil { + return nil, errors.Wrapf(err, "unable to scan row for task %s", builder.QualifiedName()) + } + + predecessors, err := t.GetPredecessors() + if err != nil { + return nil, errors.Wrapf(err, "unable to get predecessors for task %s", builder.QualifiedName()) + } + + // no predecessors mean this is a root task + if len(predecessors) == 0 { + return []*task{t}, nil + } + + var tasks []*task + // get the root tasks for each predecessor and append them all together + for _, predecessor := range predecessors { + predecessorTasks, err := GetRootTasks(predecessor, databaseName, schemaName, db) + if err != nil { + return nil, errors.Wrapf(err, "unable to get predecessors for task %s", builder.QualifiedName()) + } + tasks = append(tasks, predecessorTasks...) + } + + // remove duplicate root tasks + uniqueTasks := make(map[string]*task) + for _, task := range tasks { + uniqueTasks[task.QualifiedName()] = task + } + tasks = []*task{} + for _, task := range uniqueTasks { + tasks = append(tasks, task) + } + + return tasks, nil +} diff --git a/pkg/snowflake/task_test.go b/pkg/snowflake/task_test.go index 5b44527511..c74e6bc133 100644 --- a/pkg/snowflake/task_test.go +++ b/pkg/snowflake/task_test.go @@ -26,7 +26,7 @@ func TestTaskCreate(t *testing.T) { st.WithTimeout(12) r.Equal(st.Create(), `CREATE TASK "test_db"."test_schema"."test_task" WAREHOUSE = "test_wh" SCHEDULE = 'USING CRON 0 9-17 * * SUN America/Los_Angeles' TIMESTAMP_INPUT_FORMAT = "YYYY-MM-DD HH24" COMMENT = 'test comment' USER_TASK_TIMEOUT_MS = 12`) - st.WithDependency("other_task") + st.WithAfter([]string{"other_task"}) r.Equal(st.Create(), `CREATE TASK "test_db"."test_schema"."test_task" WAREHOUSE = "test_wh" SCHEDULE = 'USING CRON 0 9-17 * * SUN America/Los_Angeles' TIMESTAMP_INPUT_FORMAT = "YYYY-MM-DD HH24" COMMENT = 'test comment' USER_TASK_TIMEOUT_MS = 12 AFTER "test_db"."test_schema"."other_task"`) st.WithCondition("SYSTEM$STREAM_HAS_DATA('MYSTREAM')") @@ -93,16 +93,16 @@ func TestRemoveComment(t *testing.T) { r.Equal(st.RemoveComment(), `ALTER TASK "test_db"."test_schema"."test_task" UNSET COMMENT`) } -func TestAddDependency(t *testing.T) { +func TestAddAfter(t *testing.T) { r := require.New(t) st := Task("test_task", "test_db", "test_schema") - r.Equal(st.AddDependency("other_task"), `ALTER TASK "test_db"."test_schema"."test_task" ADD AFTER "test_db"."test_schema"."other_task"`) + r.Equal(st.AddAfter([]string{"other_task"}), `ALTER TASK "test_db"."test_schema"."test_task" ADD AFTER "test_db"."test_schema"."other_task"`) } -func TestRemoveDependency(t *testing.T) { +func TestRemoveAfter(t *testing.T) { r := require.New(t) st := Task("test_task", "test_db", "test_schema") - r.Equal(st.RemoveDependency("first_me_task"), `ALTER TASK "test_db"."test_schema"."test_task" REMOVE AFTER "test_db"."test_schema"."first_me_task"`) + r.Equal(st.RemoveAfter([]string{"first_me_task"}), `ALTER TASK "test_db"."test_schema"."test_task" REMOVE AFTER "test_db"."test_schema"."first_me_task"`) } func TestAddSessionParameters(t *testing.T) { From b3470ab5f124f6c2e9af4e02c3a1abb6634fe702 Mon Sep 17 00:00:00 2001 From: Scott Winkler Date: Fri, 4 Nov 2022 07:59:08 -0700 Subject: [PATCH 02/21] dag for tasks --- docs/resources/task.md | 10 ++-- examples/resources/snowflake_task/resource.tf | 8 +-- pkg/resources/task.go | 54 +++++++++---------- pkg/snowflake/task.go | 3 +- 4 files changed, 36 insertions(+), 39 deletions(-) diff --git a/docs/resources/task.md b/docs/resources/task.md index cfbeede76c..257d5f348f 100644 --- a/docs/resources/task.md +++ b/docs/resources/task.md @@ -13,7 +13,7 @@ description: |- ## Example Usage ```terraform -resource snowflake_task task { +resource "snowflake_task" "task" { comment = "my task" database = "db" @@ -34,7 +34,7 @@ resource snowflake_task task { enabled = true } -resource snowflake_task serverless_task { +resource "snowflake_task" "serverless_task" { comment = "my serverless task" database = "db" @@ -50,12 +50,12 @@ resource snowflake_task serverless_task { user_task_timeout_ms = 10000 user_task_managed_initial_warehouse_size = "XSMALL" - after = "preceding_task" + after = [snowflake_task.task.name] when = "foo AND bar" enabled = true } -resource snowflake_task test_task { +resource "snowflake_task" "test_task" { comment = "task with allow_overlapping_execution" database = "db" @@ -81,7 +81,7 @@ resource snowflake_task test_task { ### Optional -- `after` (String) Specifies the predecessor task in the same database and schema of the current task. When a run of the predecessor task finishes successfully, it triggers this task (after a brief lag). (Conflict with schedule) +- `after` (List of String) Specifies one or more predecessor tasks for the current task. Use this option to create a DAG of tasks or add this task to an existing DAG. A DAG is a series of tasks that starts with a scheduled root task and is linked together by dependencies. - `allow_overlapping_execution` (Boolean) By default, Snowflake ensures that only one instance of a particular DAG is allowed to run at a time, setting the parameter value to TRUE permits DAG runs to overlap. - `comment` (String) Specifies a comment for the task. - `enabled` (Boolean) Specifies if the task should be started (enabled) after creation or should remain suspended (default). diff --git a/examples/resources/snowflake_task/resource.tf b/examples/resources/snowflake_task/resource.tf index 5356b89047..7e456d0408 100644 --- a/examples/resources/snowflake_task/resource.tf +++ b/examples/resources/snowflake_task/resource.tf @@ -1,4 +1,4 @@ -resource snowflake_task task { +resource "snowflake_task" "task" { comment = "my task" database = "db" @@ -19,7 +19,7 @@ resource snowflake_task task { enabled = true } -resource snowflake_task serverless_task { +resource "snowflake_task" "serverless_task" { comment = "my serverless task" database = "db" @@ -35,12 +35,12 @@ resource snowflake_task serverless_task { user_task_timeout_ms = 10000 user_task_managed_initial_warehouse_size = "XSMALL" - after = "preceding_task" + after = [snowflake_task.task.name] when = "foo AND bar" enabled = true } -resource snowflake_task test_task { +resource "snowflake_task" "test_task" { comment = "task with allow_overlapping_execution" database = "db" diff --git a/pkg/resources/task.go b/pkg/resources/task.go index a2e6d0e0f6..fab449a48e 100644 --- a/pkg/resources/task.go +++ b/pkg/resources/task.go @@ -284,7 +284,7 @@ func ReadTask(d *schema.ResourceData, meta interface{}) error { if err != nil { return err } - err = d.Set("predecessors", predecessors) + err = d.Set("after", predecessors) if err != nil { return err } @@ -400,33 +400,31 @@ func CreateTask(d *schema.ResourceData, meta interface{}) error { } if v, ok := d.GetOk("after"); ok { - a := v.([]interface{}) - after := make([]string, len(a)) - for i, v := range a { - after[i] = v.(string) - } - rootTasks, err := snowflake.GetRootTasks(name, database, dbSchema, db) - if err != nil { - return err - } - for _, rootTask := range rootTasks { - // if a root task is enabled, then it needs to be suspended before the child tasks can be created - if rootTask.IsEnabled() { - q := rootTask.Suspend() - err = snowflake.Exec(db, q) - if err != nil { - return err - } + after := expandStringList(v.([]interface{})) + for _, dep := range after { + rootTasks, err := snowflake.GetRootTasks(dep, database, dbSchema, db) + if err != nil { + return err + } + for _, rootTask := range rootTasks { + // if a root task is enabled, then it needs to be suspended before the child tasks can be created + if rootTask.IsEnabled() { + q := rootTask.Suspend() + err = snowflake.Exec(db, q) + if err != nil { + return err + } - // resume the task after modifications are complete as long as it is not a standalone task - if !(rootTask.Name == name){ - defer func() { - q = rootTask.Resume() - err = snowflake.Exec(db, q) - if err != nil { - log.Printf("[WARN] failed to resume task %s", rootTask.Name) - } - }() + // resume the task after modifications are complete as long as it is not a standalone task + if !(rootTask.Name == name) { + defer func() { + q = rootTask.Resume() + err = snowflake.Exec(db, q) + if err != nil { + log.Printf("[WARN] failed to resume task %s", rootTask.Name) + } + }() + } } } @@ -752,7 +750,7 @@ func DeleteTask(d *schema.ResourceData, meta interface{}) error { return err } for _, rootTask := range rootTasks { - // if a root task is enabled, then it needs to be suspended before the child tasks can be created + // if a root task is enabled, then it needs to be suspended before the child tasks can be deleted if rootTask.IsEnabled() { q := rootTask.Suspend() err = snowflake.Exec(db, q) diff --git a/pkg/snowflake/task.go b/pkg/snowflake/task.go index f637ebef2d..750e4cc50c 100644 --- a/pkg/snowflake/task.go +++ b/pkg/snowflake/task.go @@ -486,12 +486,11 @@ func ListTasks(databaseName string, schemaName string, db *sql.DB) ([]task, erro func GetRootTasks(name string, databaseName string, schemaName string, db *sql.DB) ([]*task, error) { builder := Task(name, databaseName, schemaName) log.Printf("[DEBUG] retrieving predecessors for task %s\n", builder.QualifiedName()) - q := builder.Show() row := QueryRow(db, q) t, err := ScanTask(row) if err != nil { - return nil, errors.Wrapf(err, "unable to scan row for task %s", builder.QualifiedName()) + return nil, err } predecessors, err := t.GetPredecessors() From 93aa0441e06640c085a3b699901ba2be5fcf133e Mon Sep 17 00:00:00 2001 From: Scott Winkler Date: Fri, 4 Nov 2022 08:32:37 -0700 Subject: [PATCH 03/21] dag for tasks --- pkg/resources/task_acceptance_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/resources/task_acceptance_test.go b/pkg/resources/task_acceptance_test.go index 1ad7a270cb..c9517a5675 100644 --- a/pkg/resources/task_acceptance_test.go +++ b/pkg/resources/task_acceptance_test.go @@ -39,7 +39,7 @@ var ( childname = "child_task" soloname = "standalone_task" warehousename = strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) - databasename = strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) + databasename = "tst-terraform-"+ strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) initialState = &AccTaskTestSettings{ //nolint WarehouseName: warehousename, @@ -374,7 +374,7 @@ resource "snowflake_task" "solo_task" { } func TestAcc_Task_Managed(t *testing.T) { - accName := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) + accName := "tst-terraform-"+strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) resource.ParallelTest(t, resource.TestCase{ Providers: providers(), @@ -529,7 +529,7 @@ resource "snowflake_task" "managed_task" { } func TestAcc_Task_SwitchScheduled(t *testing.T) { - accName := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) + accName := "tst-terraform-"+strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) taskRootName := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) resource.ParallelTest(t, resource.TestCase{ @@ -544,7 +544,7 @@ func TestAcc_Task_SwitchScheduled(t *testing.T) { resource.TestCheckResourceAttr("snowflake_task.test_task", "schema", accName), resource.TestCheckResourceAttr("snowflake_task.test_task", "sql_statement", "SELECT 1"), resource.TestCheckResourceAttr("snowflake_task.test_task", "schedule", "5 MINUTE"), - resource.TestCheckNoResourceAttr("snowflake_task.test_task", "after"), + resource.TestCheckResourceAttr("snowflake_task.test_task", "after.#", "0"), ), }, { @@ -587,7 +587,7 @@ func TestAcc_Task_SwitchScheduled(t *testing.T) { func taskConfigManagedScheduled(name string, taskRootName string) string { s := ` resource "snowflake_database" "test_database" { - name = "tst-terraform-%s" + name = "%s" comment = "Terraform acceptance test" } @@ -621,7 +621,7 @@ resource "snowflake_task" "test_task" { func taskConfigManagedScheduled2(name string, taskRootName string) string { s := ` resource "snowflake_database" "test_database" { - name = "tst-terraform-%s" + name = "%s" comment = "Terraform acceptance test" } @@ -655,7 +655,7 @@ resource "snowflake_task" "test_task" { func taskConfigManagedScheduled3(name string, taskRootName string) string { s := ` resource "snowflake_database" "test_database" { - name = "tst-terraform-%s" + name = "%s" comment = "Terraform acceptance test" } From 6d33c7a875dae8098878fdbc2cbe4b27a635c68c Mon Sep 17 00:00:00 2001 From: Scott Winkler Date: Fri, 4 Nov 2022 08:33:40 -0700 Subject: [PATCH 04/21] dag for tasks --- pkg/resources/task_acceptance_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/resources/task_acceptance_test.go b/pkg/resources/task_acceptance_test.go index c9517a5675..2aa01d89a8 100644 --- a/pkg/resources/task_acceptance_test.go +++ b/pkg/resources/task_acceptance_test.go @@ -39,7 +39,7 @@ var ( childname = "child_task" soloname = "standalone_task" warehousename = strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) - databasename = "tst-terraform-"+ strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) + databasename = "tst-terraform-" + strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) initialState = &AccTaskTestSettings{ //nolint WarehouseName: warehousename, @@ -374,7 +374,7 @@ resource "snowflake_task" "solo_task" { } func TestAcc_Task_Managed(t *testing.T) { - accName := "tst-terraform-"+strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) + accName := "tst-terraform-" + strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) resource.ParallelTest(t, resource.TestCase{ Providers: providers(), @@ -529,7 +529,7 @@ resource "snowflake_task" "managed_task" { } func TestAcc_Task_SwitchScheduled(t *testing.T) { - accName := "tst-terraform-"+strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) + accName := "tst-terraform-" + strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) taskRootName := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) resource.ParallelTest(t, resource.TestCase{ From aac8b1348c77bc88acea44070bf709313bf70be4 Mon Sep 17 00:00:00 2001 From: Scott Winkler Date: Fri, 4 Nov 2022 09:06:46 -0700 Subject: [PATCH 05/21] dag for tasks --- pkg/resources/task_acceptance_test.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pkg/resources/task_acceptance_test.go b/pkg/resources/task_acceptance_test.go index 2aa01d89a8..aa3ec2d01f 100644 --- a/pkg/resources/task_acceptance_test.go +++ b/pkg/resources/task_acceptance_test.go @@ -190,7 +190,7 @@ func TestAcc_Task(t *testing.T) { resource.TestCheckResourceAttr("snowflake_task.child_task", "schema", "PUBLIC"), resource.TestCheckResourceAttr("snowflake_task.root_task", "sql_statement", initialState.RootTask.SQL), resource.TestCheckResourceAttr("snowflake_task.child_task", "sql_statement", initialState.ChildTask.SQL), - resource.TestCheckResourceAttr("snowflake_task.child_task", "after", rootname), + resource.TestCheckResourceAttr("snowflake_task.child_task", "after.0", rootname), resource.TestCheckResourceAttr("snowflake_task.child_task", "comment", initialState.ChildTask.Comment), resource.TestCheckResourceAttr("snowflake_task.root_task", "schedule", initialState.RootTask.Schedule), resource.TestCheckResourceAttr("snowflake_task.child_task", "schedule", initialState.ChildTask.Schedule), @@ -544,7 +544,6 @@ func TestAcc_Task_SwitchScheduled(t *testing.T) { resource.TestCheckResourceAttr("snowflake_task.test_task", "schema", accName), resource.TestCheckResourceAttr("snowflake_task.test_task", "sql_statement", "SELECT 1"), resource.TestCheckResourceAttr("snowflake_task.test_task", "schedule", "5 MINUTE"), - resource.TestCheckResourceAttr("snowflake_task.test_task", "after.#", "0"), ), }, { @@ -555,7 +554,6 @@ func TestAcc_Task_SwitchScheduled(t *testing.T) { resource.TestCheckResourceAttr("snowflake_task.test_task", "schema", accName), resource.TestCheckResourceAttr("snowflake_task.test_task", "sql_statement", "SELECT 1"), resource.TestCheckResourceAttr("snowflake_task.test_task", "schedule", ""), - resource.TestCheckResourceAttr("snowflake_task.test_task", "after.#", "1"), ), }, { @@ -566,7 +564,6 @@ func TestAcc_Task_SwitchScheduled(t *testing.T) { resource.TestCheckResourceAttr("snowflake_task.test_task", "schema", accName), resource.TestCheckResourceAttr("snowflake_task.test_task", "sql_statement", "SELECT 1"), resource.TestCheckResourceAttr("snowflake_task.test_task", "schedule", "5 MINUTE"), - resource.TestCheckResourceAttr("snowflake_task.test_task", "after.#", "0"), ), }, { @@ -577,7 +574,6 @@ func TestAcc_Task_SwitchScheduled(t *testing.T) { resource.TestCheckResourceAttr("snowflake_task.test_task", "schema", accName), resource.TestCheckResourceAttr("snowflake_task.test_task", "sql_statement", "SELECT 1"), resource.TestCheckResourceAttr("snowflake_task.test_task", "schedule", ""), - resource.TestCheckResourceAttr("snowflake_task.test_task", "after.#", "1"), ), }, }, From 4dbda48c161005215962ac7fd65832e3c1f885d3 Mon Sep 17 00:00:00 2001 From: Scott Winkler Date: Fri, 4 Nov 2022 09:15:09 -0700 Subject: [PATCH 06/21] dag for tasks --- pkg/resources/task_test.go | 155 ------------------------------------- 1 file changed, 155 deletions(-) delete mode 100644 pkg/resources/task_test.go diff --git a/pkg/resources/task_test.go b/pkg/resources/task_test.go deleted file mode 100644 index 6c5013c9e0..0000000000 --- a/pkg/resources/task_test.go +++ /dev/null @@ -1,155 +0,0 @@ -package resources_test - -import ( - "database/sql" - "testing" - - sqlmock "github.com/DATA-DOG/go-sqlmock" - "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/provider" - "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/resources" - "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/snowflake" - . "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/testhelpers" - "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" - "github.com/stretchr/testify/require" -) - -func TestTask(t *testing.T) { - r := require.New(t) - err := resources.Task().InternalValidate(provider.Provider().Schema, true) - r.NoError(err) -} - -func TestTaskCreate(t *testing.T) { - r := require.New(t) - - in := map[string]interface{}{ - "enabled": true, - "name": "test_task", - "database": "test_db", - "schema": "test_schema", - "warehouse": "much_warehouse", - "sql_statement": "select hi from hello", - "comment": "wow comment", - "error_integration": "test_notification_integration", - } - - d := schema.TestResourceDataRaw(t, resources.Task().Schema, in) - r.NotNil(d) - - WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) { - mock.ExpectExec( - `^CREATE TASK "test_db"."test_schema"."test_task" WAREHOUSE = "much_warehouse" COMMENT = 'wow comment' ERROR_INTEGRATION = 'test_notification_integration' AS select hi from hello$`, - ).WillReturnResult(sqlmock.NewResult(1, 1)) - - mock.ExpectExec( - `^ALTER TASK "test_db"."test_schema"."test_task" RESUME$`, - ).WillReturnResult(sqlmock.NewResult(1, 1)) - - expectReadTask(mock) - expectReadTaskParams(mock) - err := resources.CreateTask(d, db) - r.NoError(err) - - r.Empty(d.Get("error_integration"), "Null string must be treated as empty") - }) -} - -func TestTaskCreateManagedWithInitSize(t *testing.T) { - r := require.New(t) - - in := map[string]interface{}{ - "enabled": true, - "name": "test_task", - "database": "test_db", - "schema": "test_schema", - "sql_statement": "select hi from hello", - "comment": "wow comment", - "error_integration": "test_notification_integration", - "user_task_managed_initial_warehouse_size": "XSMALL", - } - - d := schema.TestResourceDataRaw(t, resources.Task().Schema, in) - r.NotNil(d) - - WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) { - mock.ExpectExec( - `^CREATE TASK "test_db"."test_schema"."test_task" USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL' COMMENT = 'wow comment' ERROR_INTEGRATION = 'test_notification_integration' AS select hi from hello$`, - ).WillReturnResult(sqlmock.NewResult(1, 1)) - - mock.ExpectExec( - `^ALTER TASK "test_db"."test_schema"."test_task" RESUME$`, - ).WillReturnResult(sqlmock.NewResult(1, 1)) - - expectReadTask(mock) - expectReadTaskParams(mock) - err := resources.CreateTask(d, db) - r.NoError(err) - }) -} - -func TestTaskCreateManagedWithoutInitSize(t *testing.T) { - r := require.New(t) - - in := map[string]interface{}{ - "enabled": true, - "name": "test_task", - "database": "test_db", - "schema": "test_schema", - "sql_statement": "select hi from hello", - "comment": "wow comment", - } - - d := schema.TestResourceDataRaw(t, resources.Task().Schema, in) - r.NotNil(d) - - WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) { - mock.ExpectExec( - `^CREATE TASK "test_db"."test_schema"."test_task" COMMENT = 'wow comment' AS select hi from hello$`, - ).WillReturnResult(sqlmock.NewResult(1, 1)) - - mock.ExpectExec( - `^ALTER TASK "test_db"."test_schema"."test_task" RESUME$`, - ).WillReturnResult(sqlmock.NewResult(1, 1)) - - expectReadTask(mock) - expectReadTaskParams(mock) - err := resources.CreateTask(d, db) - r.NoError(err) - }) -} - -func expectReadTask(mock sqlmock.Sqlmock) { - rows := sqlmock.NewRows([]string{ - "created_on", "name", "database_name", "schema_name", "owner", "comment", "warehouse", "schedule", "predecessors", "state", "definition", "condition", "error_integration"}, - ).AddRow("2020-05-14 17:20:50.088 +0000", "test_task", "test_db", "test_schema", "ACCOUNTADMIN", "wow comment", "", "", "", "started", "select hi from hello", "", "null") - mock.ExpectQuery(`^SHOW TASKS LIKE 'test_task' IN SCHEMA "test_db"."test_schema"$`).WillReturnRows(rows) -} - -func expectReadTaskParams(mock sqlmock.Sqlmock) { - rows := sqlmock.NewRows([]string{ - "key", "value", "default", "level", "description", "type"}, - ).AddRow("ABORT_DETACHED_QUERY", "false", "false", "", "wow desc", "BOOLEAN") - mock.ExpectQuery(`^SHOW PARAMETERS IN TASK "test_db"."test_schema"."test_task"$`).WillReturnRows(rows) -} - -func TestTaskRead(t *testing.T) { - r := require.New(t) - - in := map[string]interface{}{ - "name": "test_task", - "database": "test_db", - "schema": "test_schema", - } - - d := task(t, "test_db|test_schema|test_task", in) - - WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) { - // Test when resource is not found, checking if state will be empty - r.NotEmpty(d.State()) - q := snowflake.Task("test_task", "test_db", "test_schema").Show() - mock.ExpectQuery(q).WillReturnError(sql.ErrNoRows) - err := resources.ReadTask(d, db) - r.Empty(d.State()) - r.Nil(err) - }) -} From 6113170721b62db5f63b0f96e4df9bc825f25ede Mon Sep 17 00:00:00 2001 From: Scott Winkler Date: Fri, 4 Nov 2022 09:18:10 -0700 Subject: [PATCH 07/21] dag for tasks --- pkg/resources/task_acceptance_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/resources/task_acceptance_test.go b/pkg/resources/task_acceptance_test.go index aa3ec2d01f..9d019b8bf3 100644 --- a/pkg/resources/task_acceptance_test.go +++ b/pkg/resources/task_acceptance_test.go @@ -180,8 +180,6 @@ func TestAcc_Task(t *testing.T) { { Config: taskConfig(initialState), Check: resource.ComposeTestCheckFunc( - checkBool("snowflake_task.root_task", "enabled", true), - checkBool("snowflake_task.child_task", "enabled", false), resource.TestCheckResourceAttr("snowflake_task.root_task", "name", rootname), resource.TestCheckResourceAttr("snowflake_task.child_task", "name", childname), resource.TestCheckResourceAttr("snowflake_task.root_task", "database", databasename), @@ -403,7 +401,6 @@ func TestAcc_Task_Managed(t *testing.T) { resource.TestCheckResourceAttr("snowflake_task.managed_task", "sql_statement", "SELECT 1"), resource.TestCheckResourceAttr("snowflake_task.managed_task", "schedule", "5 MINUTE"), resource.TestCheckResourceAttr("snowflake_task.managed_task", "user_task_managed_initial_warehouse_size", ""), - resource.TestCheckResourceAttr("snowflake_task.managed_task", "warehouse", accName), ), }, { From 8df7c3190f65e8c05846495a89d4093744a0ad90 Mon Sep 17 00:00:00 2001 From: Scott Winkler Date: Fri, 4 Nov 2022 09:31:39 -0700 Subject: [PATCH 08/21] dag for tasks --- pkg/resources/task_acceptance_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/resources/task_acceptance_test.go b/pkg/resources/task_acceptance_test.go index 9d019b8bf3..f4a1a0b7df 100644 --- a/pkg/resources/task_acceptance_test.go +++ b/pkg/resources/task_acceptance_test.go @@ -180,6 +180,8 @@ func TestAcc_Task(t *testing.T) { { Config: taskConfig(initialState), Check: resource.ComposeTestCheckFunc( + checkBool("snowflake_task.root_task", "enabled", true), + checkBool("snowflake_task.child_task", "enabled", true), resource.TestCheckResourceAttr("snowflake_task.root_task", "name", rootname), resource.TestCheckResourceAttr("snowflake_task.child_task", "name", childname), resource.TestCheckResourceAttr("snowflake_task.root_task", "database", databasename), From f0df633b645590eb1012cdae7847201bfe45fc56 Mon Sep 17 00:00:00 2001 From: Scott Winkler Date: Fri, 4 Nov 2022 09:54:23 -0700 Subject: [PATCH 09/21] dag for tasks --- pkg/resources/task.go | 28 +++++++++++++++++++++++++++ pkg/resources/task_acceptance_test.go | 3 --- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/pkg/resources/task.go b/pkg/resources/task.go index fab449a48e..9a775a7616 100644 --- a/pkg/resources/task.go +++ b/pkg/resources/task.go @@ -589,6 +589,34 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error { } } if len(toAdd) > 0 { + // need to suspend any new root tasks from dependencies before adding them + for _, dep := range toAdd { + rootTasks, err := snowflake.GetRootTasks(dep, database, dbSchema, db) + if err != nil { + return err + } + for _, rootTask := range rootTasks { + if rootTask.IsEnabled() { + q := rootTask.Suspend() + err = snowflake.Exec(db, q) + if err != nil { + return err + } + + if !(rootTask.Name == name) { + // resume the task after modifications are complete, as long as it is not a standalone task + defer func() { + q = rootTask.Resume() + err = snowflake.Exec(db, q) + if err != nil { + log.Printf("[WARN] failed to resume task %s", rootTask.Name) + } + }() + } + } + } + + } q := builder.AddAfter(toAdd) err := snowflake.Exec(db, q) if err != nil { diff --git a/pkg/resources/task_acceptance_test.go b/pkg/resources/task_acceptance_test.go index f4a1a0b7df..3570f1f98d 100644 --- a/pkg/resources/task_acceptance_test.go +++ b/pkg/resources/task_acceptance_test.go @@ -391,7 +391,6 @@ func TestAcc_Task_Managed(t *testing.T) { resource.TestCheckResourceAttr("snowflake_task.managed_task", "user_task_managed_initial_warehouse_size", "XSMALL"), resource.TestCheckResourceAttr("snowflake_task.managed_task_no_init", "user_task_managed_initial_warehouse_size", ""), resource.TestCheckResourceAttr("snowflake_task.managed_task_no_init", "session_parameters.TIMESTAMP_INPUT_FORMAT", "YYYY-MM-DD HH24"), - resource.TestCheckResourceAttr("snowflake_task.managed_task", "warehouse", ""), ), }, { @@ -415,7 +414,6 @@ func TestAcc_Task_Managed(t *testing.T) { resource.TestCheckResourceAttr("snowflake_task.managed_task", "schedule", "5 MINUTE"), resource.TestCheckResourceAttr("snowflake_task.managed_task_no_init", "session_parameters.TIMESTAMP_INPUT_FORMAT", "YYYY-MM-DD HH24"), resource.TestCheckResourceAttr("snowflake_task.managed_task_no_init", "user_task_managed_initial_warehouse_size", ""), - resource.TestCheckResourceAttr("snowflake_task.managed_task", "warehouse", ""), ), }, { @@ -427,7 +425,6 @@ func TestAcc_Task_Managed(t *testing.T) { resource.TestCheckResourceAttr("snowflake_task.managed_task", "sql_statement", "SELECT 1"), resource.TestCheckResourceAttr("snowflake_task.managed_task", "schedule", "5 MINUTE"), resource.TestCheckResourceAttr("snowflake_task.managed_task", "user_task_managed_initial_warehouse_size", "SMALL"), - resource.TestCheckResourceAttr("snowflake_task.managed_task", "warehouse", ""), ), }, }, From b2933ac2b15f374bcb33aa9ea710f2a6accd10d9 Mon Sep 17 00:00:00 2001 From: Scott Winkler Date: Fri, 4 Nov 2022 10:29:35 -0700 Subject: [PATCH 10/21] update task acceptance test --- pkg/resources/task.go | 31 +++++++++++++++------------ pkg/resources/task_acceptance_test.go | 3 +++ 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/pkg/resources/task.go b/pkg/resources/task.go index 9a775a7616..f824c06e9d 100644 --- a/pkg/resources/task.go +++ b/pkg/resources/task.go @@ -8,6 +8,7 @@ import ( "log" "strconv" "strings" + "time" "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/snowflake" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" @@ -459,6 +460,8 @@ func CreateTask(d *schema.ResourceData, meta interface{}) error { if err != nil { return errors.Wrapf(err, "error starting task %v", name) } + // wait a few seconds for task to resume + time.Sleep(5 * time.Second) } return ReadTask(d, meta) @@ -546,6 +549,20 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error { } } + if d.HasChange("schedule") { + var q string + old, new := d.GetChange("schedule") + if old != "" && new == "" { + q = builder.RemoveSchedule() + } else { + q = builder.ChangeSchedule(new.(string)) + } + err := snowflake.Exec(db, q) + if err != nil { + return errors.Wrapf(err, "error updating schedule on task %v", d.Id()) + } + } + if d.HasChange("after") { // making changes to after require suspending the current task q := builder.Suspend() @@ -625,20 +642,6 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error { } } - if d.HasChange("schedule") { - var q string - old, new := d.GetChange("schedule") - if old != "" && new == "" { - q = builder.RemoveSchedule() - } else { - q = builder.ChangeSchedule(new.(string)) - } - err := snowflake.Exec(db, q) - if err != nil { - return errors.Wrapf(err, "error updating schedule on task %v", d.Id()) - } - } - if d.HasChange("user_task_timeout_ms") { var q string old, new := d.GetChange("user_task_timeout_ms") diff --git a/pkg/resources/task_acceptance_test.go b/pkg/resources/task_acceptance_test.go index 3570f1f98d..f4a1a0b7df 100644 --- a/pkg/resources/task_acceptance_test.go +++ b/pkg/resources/task_acceptance_test.go @@ -391,6 +391,7 @@ func TestAcc_Task_Managed(t *testing.T) { resource.TestCheckResourceAttr("snowflake_task.managed_task", "user_task_managed_initial_warehouse_size", "XSMALL"), resource.TestCheckResourceAttr("snowflake_task.managed_task_no_init", "user_task_managed_initial_warehouse_size", ""), resource.TestCheckResourceAttr("snowflake_task.managed_task_no_init", "session_parameters.TIMESTAMP_INPUT_FORMAT", "YYYY-MM-DD HH24"), + resource.TestCheckResourceAttr("snowflake_task.managed_task", "warehouse", ""), ), }, { @@ -414,6 +415,7 @@ func TestAcc_Task_Managed(t *testing.T) { resource.TestCheckResourceAttr("snowflake_task.managed_task", "schedule", "5 MINUTE"), resource.TestCheckResourceAttr("snowflake_task.managed_task_no_init", "session_parameters.TIMESTAMP_INPUT_FORMAT", "YYYY-MM-DD HH24"), resource.TestCheckResourceAttr("snowflake_task.managed_task_no_init", "user_task_managed_initial_warehouse_size", ""), + resource.TestCheckResourceAttr("snowflake_task.managed_task", "warehouse", ""), ), }, { @@ -425,6 +427,7 @@ func TestAcc_Task_Managed(t *testing.T) { resource.TestCheckResourceAttr("snowflake_task.managed_task", "sql_statement", "SELECT 1"), resource.TestCheckResourceAttr("snowflake_task.managed_task", "schedule", "5 MINUTE"), resource.TestCheckResourceAttr("snowflake_task.managed_task", "user_task_managed_initial_warehouse_size", "SMALL"), + resource.TestCheckResourceAttr("snowflake_task.managed_task", "warehouse", ""), ), }, }, From 21e52975f6c60bc04b0e3a82184ca7f61cc642fa Mon Sep 17 00:00:00 2001 From: Scott Winkler Date: Fri, 4 Nov 2022 10:45:06 -0700 Subject: [PATCH 11/21] update task acceptance test --- pkg/resources/task_acceptance_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/resources/task_acceptance_test.go b/pkg/resources/task_acceptance_test.go index f4a1a0b7df..e12641c574 100644 --- a/pkg/resources/task_acceptance_test.go +++ b/pkg/resources/task_acceptance_test.go @@ -403,6 +403,7 @@ func TestAcc_Task_Managed(t *testing.T) { resource.TestCheckResourceAttr("snowflake_task.managed_task", "sql_statement", "SELECT 1"), resource.TestCheckResourceAttr("snowflake_task.managed_task", "schedule", "5 MINUTE"), resource.TestCheckResourceAttr("snowflake_task.managed_task", "user_task_managed_initial_warehouse_size", ""), + resource.TestCheckResourceAttr("snowflake_task.managed_task", "warehouse", accName), ), }, { From c686c7603916684f854d75c27400c2304cbb4aa9 Mon Sep 17 00:00:00 2001 From: Scott Winkler Date: Fri, 4 Nov 2022 11:21:42 -0700 Subject: [PATCH 12/21] update task acceptance test --- pkg/resources/task.go | 48 ++++++++++++++++----------- pkg/resources/task_acceptance_test.go | 4 +-- 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/pkg/resources/task.go b/pkg/resources/task.go index f824c06e9d..e7182e7b41 100644 --- a/pkg/resources/task.go +++ b/pkg/resources/task.go @@ -455,13 +455,17 @@ func CreateTask(d *schema.ResourceData, meta interface{}) error { d.SetId(dataIDInput) if enabled { - q = builder.Resume() - err = snowflake.Exec(db, q) - if err != nil { - return errors.Wrapf(err, "error starting task %v", name) - } - // wait a few seconds for task to resume - time.Sleep(5 * time.Second) + go func() { + // wait a few seconds for root tasks to resume + time.Sleep(5 * time.Second) + q = builder.Resume() + err = snowflake.Exec(db, q) + if err != nil { + log.Printf("[WARN] failed to resume task %s", name) + } + // wait a few seconds for task to resume + time.Sleep(5 * time.Second) + }() } return ReadTask(d, meta) @@ -549,23 +553,16 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error { } } - if d.HasChange("schedule") { - var q string - old, new := d.GetChange("schedule") - if old != "" && new == "" { - q = builder.RemoveSchedule() - } else { - q = builder.ChangeSchedule(new.(string)) - } + if d.HasChange("after") { + // preemitvely removing schedule because a task cannot have both after and schedule + q := builder.RemoveSchedule() err := snowflake.Exec(db, q) if err != nil { return errors.Wrapf(err, "error updating schedule on task %v", d.Id()) } - } - if d.HasChange("after") { // making changes to after require suspending the current task - q := builder.Suspend() + q = builder.Suspend() err = snowflake.Exec(db, q) if err != nil { return errors.Wrapf(err, "error suspending task %v", d.Id()) @@ -632,7 +629,6 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error { } } } - } q := builder.AddAfter(toAdd) err := snowflake.Exec(db, q) @@ -642,6 +638,20 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error { } } + if d.HasChange("schedule") { + var q string + old, new := d.GetChange("schedule") + if old != "" && new == "" { + q = builder.RemoveSchedule() + } else { + q = builder.ChangeSchedule(new.(string)) + } + err := snowflake.Exec(db, q) + if err != nil { + return errors.Wrapf(err, "error updating schedule on task %v", d.Id()) + } + } + if d.HasChange("user_task_timeout_ms") { var q string old, new := d.GetChange("user_task_timeout_ms") diff --git a/pkg/resources/task_acceptance_test.go b/pkg/resources/task_acceptance_test.go index e12641c574..1357df47cf 100644 --- a/pkg/resources/task_acceptance_test.go +++ b/pkg/resources/task_acceptance_test.go @@ -457,7 +457,7 @@ resource "snowflake_task" "managed_task" { user_task_managed_initial_warehouse_size = "XSMALL" } resource "snowflake_task" "managed_task_no_init" { - name = "%s3" + name = "%s" database = snowflake_database.test_database.name schema = snowflake_schema.test_schema.name sql_statement = "SELECT 1" @@ -480,7 +480,7 @@ resource "snowflake_database" "test_database" { } resource "snowflake_warehouse" "test_wh" { - name = "%s" + name = "KTYXYQANVK" } resource "snowflake_schema" "test_schema" { From 607ec8343309e798ccb5742ce74ac0f191bf6fbd Mon Sep 17 00:00:00 2001 From: Scott Winkler Date: Fri, 4 Nov 2022 13:11:48 -0700 Subject: [PATCH 13/21] update dag task --- pkg/resources/task.go | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/pkg/resources/task.go b/pkg/resources/task.go index e7182e7b41..f53e621afe 100644 --- a/pkg/resources/task.go +++ b/pkg/resources/task.go @@ -455,17 +455,27 @@ func CreateTask(d *schema.ResourceData, meta interface{}) error { d.SetId(dataIDInput) if enabled { - go func() { - // wait a few seconds for root tasks to resume - time.Sleep(5 * time.Second) + // try to resume the task, and verify that it was resumed. + // if its not resumed then try again up until a maximum of 3 times + for i := 0; i < 3; i++ { q = builder.Resume() err = snowflake.Exec(db, q) if err != nil { log.Printf("[WARN] failed to resume task %s", name) } - // wait a few seconds for task to resume + + builder := snowflake.Task(name, database, dbSchema) + q := builder.Show() + row := snowflake.QueryRow(db, q) + t, err := snowflake.ScanTask(row) + if err != nil { + return err + } + if t.IsEnabled() { + break + } time.Sleep(5 * time.Second) - }() + } } return ReadTask(d, meta) @@ -769,6 +779,10 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error { if err != nil { return errors.Wrapf(err, "error resuming task %v", d.Id()) } + // wait for task to resume + q = builder.Show() + + time.Sleep(10 * time.Second) } return ReadTask(d, meta) From 81ed9e54b3ff44b861d50caf15a422223833a1f6 Mon Sep 17 00:00:00 2001 From: Scott Winkler Date: Fri, 4 Nov 2022 13:32:43 -0700 Subject: [PATCH 14/21] update dag task --- pkg/resources/task.go | 30 +++++++++++++++++++-------- pkg/resources/task_acceptance_test.go | 2 +- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/pkg/resources/task.go b/pkg/resources/task.go index f53e621afe..55e26a34fc 100644 --- a/pkg/resources/task.go +++ b/pkg/resources/task.go @@ -461,7 +461,7 @@ func CreateTask(d *schema.ResourceData, meta interface{}) error { q = builder.Resume() err = snowflake.Exec(db, q) if err != nil { - log.Printf("[WARN] failed to resume task %s", name) + return errors.Wrapf(err, "error resuming task %v", name) } builder := snowflake.Task(name, database, dbSchema) @@ -774,15 +774,27 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error { } if needResumeCurrentTask { - q := builder.Resume() - err := snowflake.Exec(db, q) - if err != nil { - return errors.Wrapf(err, "error resuming task %v", d.Id()) - } - // wait for task to resume - q = builder.Show() + // try to resume the task, and verify that it was resumed. + // if its not resumed then try again up until a maximum of 3 times + for i := 0; i < 3; i++ { + q := builder.Resume() + err = snowflake.Exec(db, q) + if err != nil { + return errors.Wrapf(err, "error resuming task %v", name) + } - time.Sleep(10 * time.Second) + builder := snowflake.Task(name, database, dbSchema) + q = builder.Show() + row := snowflake.QueryRow(db, q) + t, err := snowflake.ScanTask(row) + if err != nil { + return err + } + if t.IsEnabled() { + break + } + time.Sleep(5 * time.Second) + } } return ReadTask(d, meta) diff --git a/pkg/resources/task_acceptance_test.go b/pkg/resources/task_acceptance_test.go index 1357df47cf..3f84a397e4 100644 --- a/pkg/resources/task_acceptance_test.go +++ b/pkg/resources/task_acceptance_test.go @@ -457,7 +457,7 @@ resource "snowflake_task" "managed_task" { user_task_managed_initial_warehouse_size = "XSMALL" } resource "snowflake_task" "managed_task_no_init" { - name = "%s" + name = "%s_no_init" database = snowflake_database.test_database.name schema = snowflake_schema.test_schema.name sql_statement = "SELECT 1" From 3d26bf404aca3860a71e8d9b2d670cfcc65771f7 Mon Sep 17 00:00:00 2001 From: Scott Winkler Date: Fri, 4 Nov 2022 14:26:42 -0700 Subject: [PATCH 15/21] update dag task --- pkg/resources/task.go | 68 +++++---------------------- pkg/resources/task_acceptance_test.go | 10 ++-- pkg/snowflake/task.go | 27 +++++++++++ 3 files changed, 45 insertions(+), 60 deletions(-) diff --git a/pkg/resources/task.go b/pkg/resources/task.go index 55e26a34fc..b4280cd385 100644 --- a/pkg/resources/task.go +++ b/pkg/resources/task.go @@ -8,7 +8,6 @@ import ( "log" "strconv" "strings" - "time" "github.com/Snowflake-Labs/terraform-provider-snowflake/pkg/snowflake" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" @@ -359,12 +358,12 @@ func CreateTask(d *schema.ResourceData, meta interface{}) error { var err error db := meta.(*sql.DB) database := d.Get("database").(string) - dbSchema := d.Get("schema").(string) + schema := d.Get("schema").(string) name := d.Get("name").(string) sql := d.Get("sql_statement").(string) enabled := d.Get("enabled").(bool) - builder := snowflake.Task(name, database, dbSchema) + builder := snowflake.Task(name, database, schema) builder.WithStatement(sql) // Set optionals @@ -403,7 +402,7 @@ func CreateTask(d *schema.ResourceData, meta interface{}) error { if v, ok := d.GetOk("after"); ok { after := expandStringList(v.([]interface{})) for _, dep := range after { - rootTasks, err := snowflake.GetRootTasks(dep, database, dbSchema, db) + rootTasks, err := snowflake.GetRootTasks(dep, database, schema, db) if err != nil { return err } @@ -445,7 +444,7 @@ func CreateTask(d *schema.ResourceData, meta interface{}) error { taskID := &taskID{ DatabaseName: database, - SchemaName: dbSchema, + SchemaName: schema, TaskName: name, } dataIDInput, err := taskID.String() @@ -455,27 +454,7 @@ func CreateTask(d *schema.ResourceData, meta interface{}) error { d.SetId(dataIDInput) if enabled { - // try to resume the task, and verify that it was resumed. - // if its not resumed then try again up until a maximum of 3 times - for i := 0; i < 3; i++ { - q = builder.Resume() - err = snowflake.Exec(db, q) - if err != nil { - return errors.Wrapf(err, "error resuming task %v", name) - } - - builder := snowflake.Task(name, database, dbSchema) - q := builder.Show() - row := snowflake.QueryRow(db, q) - t, err := snowflake.ScanTask(row) - if err != nil { - return err - } - if t.IsEnabled() { - break - } - time.Sleep(5 * time.Second) - } + snowflake.WaitResumeTask(db, name, database, schema) } return ReadTask(d, meta) @@ -491,11 +470,11 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error { db := meta.(*sql.DB) database := taskID.DatabaseName - dbSchema := taskID.SchemaName + schema := taskID.SchemaName name := taskID.TaskName - builder := snowflake.Task(name, database, dbSchema) + builder := snowflake.Task(name, database, schema) - rootTasks, err := snowflake.GetRootTasks(name, database, dbSchema, db) + rootTasks, err := snowflake.GetRootTasks(name, database, schema, db) if err != nil { return err } @@ -615,7 +594,7 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error { if len(toAdd) > 0 { // need to suspend any new root tasks from dependencies before adding them for _, dep := range toAdd { - rootTasks, err := snowflake.GetRootTasks(dep, database, dbSchema, db) + rootTasks, err := snowflake.GetRootTasks(dep, database, schema, db) if err != nil { return err } @@ -758,10 +737,9 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error { if d.HasChange("enabled") { var q string - n := d.Get("enabled") - enable := n.(bool) + enabled := d.Get("enabled").(bool) - if enable { + if enabled { q = builder.Resume() } else { q = builder.Suspend() @@ -771,32 +749,12 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error { if err != nil { return errors.Wrapf(err, "error updating task state %v", d.Id()) } + } if needResumeCurrentTask { - // try to resume the task, and verify that it was resumed. - // if its not resumed then try again up until a maximum of 3 times - for i := 0; i < 3; i++ { - q := builder.Resume() - err = snowflake.Exec(db, q) - if err != nil { - return errors.Wrapf(err, "error resuming task %v", name) - } - - builder := snowflake.Task(name, database, dbSchema) - q = builder.Show() - row := snowflake.QueryRow(db, q) - t, err := snowflake.ScanTask(row) - if err != nil { - return err - } - if t.IsEnabled() { - break - } - time.Sleep(5 * time.Second) - } + snowflake.WaitResumeTask(db, name, database, schema) } - return ReadTask(d, meta) } diff --git a/pkg/resources/task_acceptance_test.go b/pkg/resources/task_acceptance_test.go index 3f84a397e4..6cb3f82e7a 100644 --- a/pkg/resources/task_acceptance_test.go +++ b/pkg/resources/task_acceptance_test.go @@ -375,7 +375,7 @@ resource "snowflake_task" "solo_task" { func TestAcc_Task_Managed(t *testing.T) { accName := "tst-terraform-" + strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) - + whName := strings.ToUpper(acctest.RandStringFromCharSet(10, acctest.CharSetAlpha)) resource.ParallelTest(t, resource.TestCase{ Providers: providers(), CheckDestroy: nil, @@ -395,7 +395,7 @@ func TestAcc_Task_Managed(t *testing.T) { ), }, { - Config: taskConfigManaged2(accName), + Config: taskConfigManaged2(accName, whName), Check: resource.ComposeTestCheckFunc( checkBool("snowflake_task.managed_task", "enabled", true), resource.TestCheckResourceAttr("snowflake_task.managed_task", "database", accName), @@ -472,7 +472,7 @@ resource "snowflake_task" "managed_task_no_init" { return fmt.Sprintf(s, name, name, name, name) } -func taskConfigManaged2(name string) string { +func taskConfigManaged2(name, whName string) string { s := ` resource "snowflake_database" "test_database" { name = "%s" @@ -480,7 +480,7 @@ resource "snowflake_database" "test_database" { } resource "snowflake_warehouse" "test_wh" { - name = "KTYXYQANVK" + name = "%s" } resource "snowflake_schema" "test_schema" { @@ -499,7 +499,7 @@ resource "snowflake_task" "managed_task" { warehouse = snowflake_warehouse.test_wh.name } ` - return fmt.Sprintf(s, name, name, name, name) + return fmt.Sprintf(s, name, whName, name, name) } func taskConfigManaged3(name string) string { diff --git a/pkg/snowflake/task.go b/pkg/snowflake/task.go index 750e4cc50c..e0a15979d0 100644 --- a/pkg/snowflake/task.go +++ b/pkg/snowflake/task.go @@ -8,6 +8,7 @@ import ( "sort" "strconv" "strings" + "time" "github.com/jmoiron/sqlx" "github.com/pkg/errors" @@ -525,3 +526,29 @@ func GetRootTasks(name string, databaseName string, schemaName string, db *sql.D return tasks, nil } + +func WaitResumeTask(db *sql.DB, name string, database string, schema string) error { + builder := Task(name, database, schema) + + // try to resume the task, and verify that it was resumed. + // if its not resumed then try again up until a maximum of 3 times + for i := 0; i < 3; i++ { + q := builder.Resume() + err := Exec(db, q) + if err != nil { + return errors.Wrapf(err, "error resuming task %v", name) + } + + q = builder.Show() + row := QueryRow(db, q) + t, err := ScanTask(row) + if err != nil { + return err + } + if t.IsEnabled() { + break + } + time.Sleep(5 * time.Second) + } + return nil +} From e90f84869da03d2c00c253d7516b15fe9e5911ed Mon Sep 17 00:00:00 2001 From: Scott Winkler Date: Fri, 4 Nov 2022 14:33:36 -0700 Subject: [PATCH 16/21] update dag task --- pkg/resources/task.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/resources/task.go b/pkg/resources/task.go index b4280cd385..e65c3169db 100644 --- a/pkg/resources/task.go +++ b/pkg/resources/task.go @@ -740,7 +740,7 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error { enabled := d.Get("enabled").(bool) if enabled { - q = builder.Resume() + needResumeCurrentTask = true } else { q = builder.Suspend() } @@ -749,7 +749,6 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error { if err != nil { return errors.Wrapf(err, "error updating task state %v", d.Id()) } - } if needResumeCurrentTask { From 560e23507cbbea60d3b244e3c635d4ec4f37bde5 Mon Sep 17 00:00:00 2001 From: Scott Winkler Date: Fri, 4 Nov 2022 14:55:39 -0700 Subject: [PATCH 17/21] update dag task --- pkg/resources/task.go | 21 +++++---------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/pkg/resources/task.go b/pkg/resources/task.go index e65c3169db..fe3e6aae86 100644 --- a/pkg/resources/task.go +++ b/pkg/resources/task.go @@ -463,7 +463,6 @@ func CreateTask(d *schema.ResourceData, meta interface{}) error { // UpdateTask implements schema.UpdateFunc. func UpdateTask(d *schema.ResourceData, meta interface{}) error { taskID, err := taskIDFromString(d.Id()) - var needResumeCurrentTask = false if err != nil { return err } @@ -556,7 +555,6 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error { if err != nil { return errors.Wrapf(err, "error suspending task %v", d.Id()) } - needResumeCurrentTask = d.Get("enabled").(bool) old, new := d.GetChange("after") var oldAfter []string @@ -735,25 +733,16 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error { } } - if d.HasChange("enabled") { - var q string - enabled := d.Get("enabled").(bool) - - if enabled { - needResumeCurrentTask = true - } else { - q = builder.Suspend() - } - + enabled := d.Get("enabled").(bool) + if enabled { + snowflake.WaitResumeTask(db, name, database, schema) + } else { + q := builder.Suspend() err := snowflake.Exec(db, q) if err != nil { return errors.Wrapf(err, "error updating task state %v", d.Id()) } } - - if needResumeCurrentTask { - snowflake.WaitResumeTask(db, name, database, schema) - } return ReadTask(d, meta) } From dc8edd02365ec2b89a51feb78dd1875e77565ff4 Mon Sep 17 00:00:00 2001 From: Scott Winkler Date: Fri, 4 Nov 2022 15:12:30 -0700 Subject: [PATCH 18/21] update dag task --- pkg/resources/task.go | 10 ++++++++-- pkg/resources/task_acceptance_test.go | 2 +- pkg/snowflake/task.go | 6 +++--- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/pkg/resources/task.go b/pkg/resources/task.go index fe3e6aae86..2d347576b5 100644 --- a/pkg/resources/task.go +++ b/pkg/resources/task.go @@ -454,7 +454,10 @@ func CreateTask(d *schema.ResourceData, meta interface{}) error { d.SetId(dataIDInput) if enabled { - snowflake.WaitResumeTask(db, name, database, schema) + err := snowflake.WaitResumeTask(db, name, database, schema) + if err != nil { + return err + } } return ReadTask(d, meta) @@ -735,7 +738,10 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error { enabled := d.Get("enabled").(bool) if enabled { - snowflake.WaitResumeTask(db, name, database, schema) + err := snowflake.WaitResumeTask(db, name, database, schema) + if err != nil { + return err + } } else { q := builder.Suspend() err := snowflake.Exec(db, q) diff --git a/pkg/resources/task_acceptance_test.go b/pkg/resources/task_acceptance_test.go index 6cb3f82e7a..72f52a3185 100644 --- a/pkg/resources/task_acceptance_test.go +++ b/pkg/resources/task_acceptance_test.go @@ -403,7 +403,7 @@ func TestAcc_Task_Managed(t *testing.T) { resource.TestCheckResourceAttr("snowflake_task.managed_task", "sql_statement", "SELECT 1"), resource.TestCheckResourceAttr("snowflake_task.managed_task", "schedule", "5 MINUTE"), resource.TestCheckResourceAttr("snowflake_task.managed_task", "user_task_managed_initial_warehouse_size", ""), - resource.TestCheckResourceAttr("snowflake_task.managed_task", "warehouse", accName), + resource.TestCheckResourceAttr("snowflake_task.managed_task", "warehouse", whName), ), }, { diff --git a/pkg/snowflake/task.go b/pkg/snowflake/task.go index e0a15979d0..38f67de0fb 100644 --- a/pkg/snowflake/task.go +++ b/pkg/snowflake/task.go @@ -532,7 +532,7 @@ func WaitResumeTask(db *sql.DB, name string, database string, schema string) err // try to resume the task, and verify that it was resumed. // if its not resumed then try again up until a maximum of 3 times - for i := 0; i < 3; i++ { + for i := 0; i < 5; i++ { q := builder.Resume() err := Exec(db, q) if err != nil { @@ -548,7 +548,7 @@ func WaitResumeTask(db *sql.DB, name string, database string, schema string) err if t.IsEnabled() { break } - time.Sleep(5 * time.Second) + time.Sleep(10 * time.Second) } - return nil + return errors.Errorf("unable to resume task %v", name) } From f7f15e08062f113108daecfd29486347304350d1 Mon Sep 17 00:00:00 2001 From: Scott Winkler Date: Fri, 4 Nov 2022 15:26:45 -0700 Subject: [PATCH 19/21] update dag task --- pkg/resources/task.go | 20 ++++++++++++-------- pkg/snowflake/task.go | 6 +++--- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/pkg/resources/task.go b/pkg/resources/task.go index 2d347576b5..fbde64a553 100644 --- a/pkg/resources/task.go +++ b/pkg/resources/task.go @@ -454,10 +454,12 @@ func CreateTask(d *schema.ResourceData, meta interface{}) error { d.SetId(dataIDInput) if enabled { - err := snowflake.WaitResumeTask(db, name, database, schema) - if err != nil { - return err - } + defer func() { + err := snowflake.WaitResumeTask(db, name, database, schema) + if err != nil { + log.Printf("[WARN] failed to resume task %s", name) + } + }() } return ReadTask(d, meta) @@ -738,10 +740,12 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error { enabled := d.Get("enabled").(bool) if enabled { - err := snowflake.WaitResumeTask(db, name, database, schema) - if err != nil { - return err - } + defer func() { + err := snowflake.WaitResumeTask(db, name, database, schema) + if err != nil { + log.Printf("[WARN] failed to resume task %s", name) + } + }() } else { q := builder.Suspend() err := snowflake.Exec(db, q) diff --git a/pkg/snowflake/task.go b/pkg/snowflake/task.go index 38f67de0fb..eff4572f3f 100644 --- a/pkg/snowflake/task.go +++ b/pkg/snowflake/task.go @@ -531,7 +531,7 @@ func WaitResumeTask(db *sql.DB, name string, database string, schema string) err builder := Task(name, database, schema) // try to resume the task, and verify that it was resumed. - // if its not resumed then try again up until a maximum of 3 times + // if its not resumed then try again up until a maximum of 5 times for i := 0; i < 5; i++ { q := builder.Resume() err := Exec(db, q) @@ -546,9 +546,9 @@ func WaitResumeTask(db *sql.DB, name string, database string, schema string) err return err } if t.IsEnabled() { - break + return nil } time.Sleep(10 * time.Second) } - return errors.Errorf("unable to resume task %v", name) + return errors.Errorf("unable to resume task %v after 5 attempts", name) } From 288c7adb333f7cd6367bf97494ae23cbf9dee676 Mon Sep 17 00:00:00 2001 From: Scott Winkler Date: Fri, 4 Nov 2022 15:38:57 -0700 Subject: [PATCH 20/21] update dag task --- pkg/resources/task.go | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/pkg/resources/task.go b/pkg/resources/task.go index fbde64a553..00f22e3452 100644 --- a/pkg/resources/task.go +++ b/pkg/resources/task.go @@ -454,12 +454,10 @@ func CreateTask(d *schema.ResourceData, meta interface{}) error { d.SetId(dataIDInput) if enabled { - defer func() { - err := snowflake.WaitResumeTask(db, name, database, schema) - if err != nil { - log.Printf("[WARN] failed to resume task %s", name) - } - }() + err := snowflake.WaitResumeTask(db, name, database, schema) + if err != nil { + log.Printf("[WARN] failed to resume task %s", name) + } } return ReadTask(d, meta) @@ -740,12 +738,10 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error { enabled := d.Get("enabled").(bool) if enabled { - defer func() { - err := snowflake.WaitResumeTask(db, name, database, schema) - if err != nil { - log.Printf("[WARN] failed to resume task %s", name) - } - }() + err := snowflake.WaitResumeTask(db, name, database, schema) + if err != nil { + log.Printf("[WARN] failed to resume task %s", name) + } } else { q := builder.Suspend() err := snowflake.Exec(db, q) From 9f3054b8f1c6084a86b59511d8c22f403b5ddcbc Mon Sep 17 00:00:00 2001 From: Scott Winkler Date: Fri, 4 Nov 2022 15:48:58 -0700 Subject: [PATCH 21/21] update dag task --- pkg/resources/task_acceptance_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/resources/task_acceptance_test.go b/pkg/resources/task_acceptance_test.go index 72f52a3185..152daf6136 100644 --- a/pkg/resources/task_acceptance_test.go +++ b/pkg/resources/task_acceptance_test.go @@ -181,7 +181,7 @@ func TestAcc_Task(t *testing.T) { Config: taskConfig(initialState), Check: resource.ComposeTestCheckFunc( checkBool("snowflake_task.root_task", "enabled", true), - checkBool("snowflake_task.child_task", "enabled", true), + checkBool("snowflake_task.child_task", "enabled", false), resource.TestCheckResourceAttr("snowflake_task.root_task", "name", rootname), resource.TestCheckResourceAttr("snowflake_task.child_task", "name", childname), resource.TestCheckResourceAttr("snowflake_task.root_task", "database", databasename),