Skip to content

Commit

Permalink
feat: Task error integration (#830)
Browse files Browse the repository at this point in the history
  • Loading branch information
gouline authored Jan 26, 2022
1 parent b79988e commit 8acfd5f
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 35 deletions.
1 change: 1 addition & 0 deletions docs/resources/task.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ resource snowflake_task serverless_task {
- **after** (String) Specifies the predecessor task in the same database and schema of the current task. When a run of the predecessor task finishes successfully, it triggers this task (after a brief lag). (Conflict with schedule)
- **comment** (String) Specifies a comment for the task.
- **enabled** (Boolean) Specifies if the task should be started (enabled) after creation or should remain suspended (default).
- **error_integration** (String) Specifies the name of the notification integration used for error notifications.
- **id** (String) The ID of this resource.
- **schedule** (String) The schedule for periodically running the task. This can be a cron or interval in minutes. (Conflict with after)
- **session_parameters** (Map of String) Specifies session parameters to set for the session when the task runs. A task supports all session parameters.
Expand Down
23 changes: 23 additions & 0 deletions pkg/resources/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ var taskSchema = map[string]*schema.Schema{
Description: "Specifies the size of the compute resources to provision for the first run of the task, before a task history is available for Snowflake to determine an ideal size. Once a task has successfully completed a few runs, Snowflake ignores this parameter setting. (Conflicts with warehouse)",
ConflictsWith: []string{"warehouse"},
},
"error_integration": {
Type: schema.TypeString,
Optional: true,
Description: "Specifies the name of the notification integration used for error notifications.",
},
}

type taskID struct {
Expand Down Expand Up @@ -317,6 +322,11 @@ func ReadTask(d *schema.ResourceData, meta interface{}) error {
return err
}

err = d.Set("error_integration", t.ErrorIntegration.String)
if err != nil {
return err
}

if t.Predecessors != nil {
err = d.Set("after", t.GetPredecessorName())
if err != nil {
Expand Down Expand Up @@ -427,6 +437,10 @@ func CreateTask(d *schema.ResourceData, meta interface{}) error {
builder.WithComment(v.(string))
}

if v, ok := d.GetOk("error_integration"); ok {
builder.WithErrorIntegration((v.(string)))
}

if v, ok := d.GetOk("after"); ok {
root, err := getActiveRootTaskAndSuspend(d, meta)
if err != nil {
Expand Down Expand Up @@ -517,6 +531,15 @@ func UpdateTask(d *schema.ResourceData, meta interface{}) error {

}

if d.HasChange("error_integration") {
errorIntegration := d.Get("error_integration")
q := builder.ChangeErrorIntegration(errorIntegration.(string))
err := snowflake.Exec(db, q)
if err != nil {
return errors.Wrapf(err, "error updating task error_integration on %v", d.Id())
}
}

// Need to remove dependency before adding schedule if needed
if d.HasChange("after") {
var (
Expand Down
43 changes: 23 additions & 20 deletions pkg/resources/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,22 @@ func TestTaskCreate(t *testing.T) {
r := require.New(t)

in := map[string]interface{}{
"enabled": true,
"name": "test_task",
"database": "test_db",
"schema": "test_schema",
"warehouse": "much_warehouse",
"sql_statement": "select hi from hello",
"comment": "wow comment",
"enabled": true,
"name": "test_task",
"database": "test_db",
"schema": "test_schema",
"warehouse": "much_warehouse",
"sql_statement": "select hi from hello",
"comment": "wow comment",
"error_integration": "test_notification_integration",
}

d := schema.TestResourceDataRaw(t, resources.Task().Schema, in)
r.NotNil(d)

WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) {
mock.ExpectExec(
`^CREATE TASK "test_db"."test_schema"."test_task" WAREHOUSE = "much_warehouse" COMMENT = 'wow comment' AS select hi from hello$`,
`^CREATE TASK "test_db"."test_schema"."test_task" WAREHOUSE = "much_warehouse" COMMENT = 'wow comment' ERROR_INTEGRATION = 'test_notification_integration' AS select hi from hello$`,
).WillReturnResult(sqlmock.NewResult(1, 1))

mock.ExpectExec(
Expand All @@ -55,12 +56,13 @@ func TestTaskCreateManagedWithInitSize(t *testing.T) {
r := require.New(t)

in := map[string]interface{}{
"enabled": true,
"name": "test_task",
"database": "test_db",
"schema": "test_schema",
"sql_statement": "select hi from hello",
"comment": "wow comment",
"enabled": true,
"name": "test_task",
"database": "test_db",
"schema": "test_schema",
"sql_statement": "select hi from hello",
"comment": "wow comment",
"error_integration": "test_notification_integration",
"user_task_managed_initial_warehouse_size": "XSMALL",
}

Expand All @@ -69,7 +71,7 @@ func TestTaskCreateManagedWithInitSize(t *testing.T) {

WithMockDb(t, func(db *sql.DB, mock sqlmock.Sqlmock) {
mock.ExpectExec(
`^CREATE TASK "test_db"."test_schema"."test_task" USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL' COMMENT = 'wow comment' AS select hi from hello$`,
`^CREATE TASK "test_db"."test_schema"."test_task" USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL' COMMENT = 'wow comment' ERROR_INTEGRATION = 'test_notification_integration' AS select hi from hello$`,
).WillReturnResult(sqlmock.NewResult(1, 1))

mock.ExpectExec(
Expand Down Expand Up @@ -116,8 +118,8 @@ func TestTaskCreateManagedWithoutInitSize(t *testing.T) {

func expectReadTask(mock sqlmock.Sqlmock) {
rows := sqlmock.NewRows([]string{
"created_on", "name", "database_name", "schema_name", "owner", "comment", "warehouse", "schedule", "predecessors", "state", "definition", "condition"},
).AddRow("2020-05-14 17:20:50.088 +0000", "test_task", "test_db", "test_schema", "ACCOUNTADMIN", "wow comment", "", "", "", "started", "select hi from hello", "")
"created_on", "name", "database_name", "schema_name", "owner", "comment", "warehouse", "schedule", "predecessors", "state", "definition", "condition", "error_integration"},
).AddRow("2020-05-14 17:20:50.088 +0000", "test_task", "test_db", "test_schema", "ACCOUNTADMIN", "wow comment", "", "", "", "started", "select hi from hello", "", "test_integration")
mock.ExpectQuery(`^SHOW TASKS LIKE 'test_task' IN SCHEMA "test_db"."test_schema"$`).WillReturnRows(rows)
}

Expand All @@ -132,9 +134,10 @@ func TestTaskRead(t *testing.T) {
r := require.New(t)

in := map[string]interface{}{
"name": "test_task",
"database": "test_db",
"schema": "test_schema",
"name": "test_task",
"database": "test_db",
"schema": "test_schema",
"error_integration": "test_notification_integration",
}

d := task(t, "test_db|test_schema|test_task", in)
Expand Down
1 change: 0 additions & 1 deletion pkg/snowflake/external_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ func (tb *ExternalTableBuilder) GetTagValueString() string {
return strings.TrimSuffix(q.String(), ", ")
}


type externalTable struct {
CreatedOn sql.NullString `db:"created_on"`
ExternalTableName sql.NullString `db:"name"`
Expand Down
2 changes: 1 addition & 1 deletion pkg/snowflake/external_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestExternalTableCreate(t *testing.T) {
r.Equal(s.Create(), `CREATE EXTERNAL TABLE "test_db"."test_schema"."test_table" ("column1" OBJECT AS expression1, "column2" VARCHAR AS expression2) WITH LOCATION = location REFRESH_ON_CREATE = false AUTO_REFRESH = false PATTERN = 'pattern' FILE_FORMAT = ( file format ) COMMENT = 'Test Comment'`)
}

func TestExternalTableUpdate(t *testing.T){
func TestExternalTableUpdate(t *testing.T) {
r := require.New(t)
s := ExternalTable("test_table", "test_db", "test_schema")
s.WithTags([]TagValue{{Name: "tag1", Value: "value1", Schema: "test_schema", Database: "test_db"}})
Expand Down
43 changes: 30 additions & 13 deletions pkg/snowflake/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type TaskBuilder struct {
sql_statement string
disabled bool
user_task_managed_initial_warehouse_size string
errorIntegration string
}

// GetFullName prepends db and schema to in parameter
Expand Down Expand Up @@ -102,6 +103,12 @@ func (tb *TaskBuilder) WithInitialWarehouseSize(initialWarehouseSize string) *Ta
return tb
}

/// WithErrorIntegration adds ErrorIntegration specification to the TaskBuilder
func (tb *TaskBuilder) WithErrorIntegration(s string) *TaskBuilder {
tb.errorIntegration = s
return tb
}

// Task returns a pointer to a Builder that abstracts the DDL operations for a task.
//
// Supported DDL operations are:
Expand Down Expand Up @@ -157,6 +164,10 @@ func (tb *TaskBuilder) Create() string {
q.WriteString(fmt.Sprintf(` COMMENT = '%v'`, EscapeString(tb.comment)))
}

if tb.errorIntegration != "" {
q.WriteString(fmt.Sprintf(` ERROR_INTEGRATION = '%v'`, EscapeString(tb.errorIntegration)))
}

if tb.user_task_timeout_ms > 0 {
q.WriteString(fmt.Sprintf(` USER_TASK_TIMEOUT_MS = %v`, tb.user_task_timeout_ms))
}
Expand Down Expand Up @@ -309,20 +320,26 @@ func (tb *TaskBuilder) IsDisabled() bool {
return tb.disabled
}

// ChangeErrorIntegration return SQL query that will update the error_integration on the task.
func (tb *TaskBuilder) ChangeErrorIntegration(c string) string {
return fmt.Sprintf(`ALTER TASK %v SET ERROR_INTEGRATION = %v`, tb.QualifiedName(), EscapeString(c))
}

type task struct {
Id string `db:"id"`
CreatedOn string `db:"created_on"`
Name string `db:"name"`
DatabaseName string `db:"database_name"`
SchemaName string `db:"schema_name"`
Owner string `db:"owner"`
Comment *string `db:"comment"`
Warehouse *string `db:"warehouse"`
Schedule *string `db:"schedule"`
Predecessors *string `db:"predecessors"`
State string `db:"state"`
Definition string `db:"definition"`
Condition *string `db:"condition"`
Id string `db:"id"`
CreatedOn string `db:"created_on"`
Name string `db:"name"`
DatabaseName string `db:"database_name"`
SchemaName string `db:"schema_name"`
Owner string `db:"owner"`
Comment *string `db:"comment"`
Warehouse *string `db:"warehouse"`
Schedule *string `db:"schedule"`
Predecessors *string `db:"predecessors"`
State string `db:"state"`
Definition string `db:"definition"`
Condition *string `db:"condition"`
ErrorIntegration sql.NullString `db:"error_integration"`
}

func (t *task) IsEnabled() bool {
Expand Down

0 comments on commit 8acfd5f

Please sign in to comment.