Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-40921][SQL] Add WHEN NOT MATCHED BY SOURCE clause to MERGE INTO #38400

Conversation

johanl-db
Copy link
Contributor

@johanl-db johanl-db commented Oct 26, 2022

What changes were proposed in this pull request?

This change adds a third type of WHEN clause to the MERGE INTO command that allows updating or deleting rows from the target table that have no match in the source table based on the merge condition.

The following example updates all rows from the target table that have a match in the source table using the source value. For target rows that have no match in the source table, the 'state' column of rows that were created before '2022-10-26' is set to 'active', while rows created before that date are deleted from the target table.

MERGE INTO target
USING source
ON target.key = source.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED BY SOURCE AND target.create_at > '2022-10-26' THEN UPDATE SET target.status = 'active'
WHEN NOT MATCHED BY SOURCE THEN DELETE 

In addition, the existing WHEN NOT MATCHED clause can now also include an optional BY TARGET qualifier that has no effect on its semantic other than allowing a more consistent use of the clauses together:

WHEN NOT MATCHED BY TARGET THEN INSERT * 

is equivalent to:

WHEN NOT MATCHED THEN INSERT *

The updated SQL syntax for the MERGE INTO command is described more precisely in the user-facing change section below.

The changes proposed in this pull request are two-fold:

  1. Update SQL parsing to handle the new clause introduced. This results in a new field notMatchedBySourceActions being populated in the logical plan node of the MERGE INTO command MergeIntoTable.
  2. Handle the newly added merge clause during analysis. In particular, resolve all attribute references used in WHEN NOT MATCHED BY SOURCE conditions and actions. The attributes used in a NOT MATCHED BY SOURCE clause may only refer to the target table.

Why are the changes needed?

The new clause opens up uses cases leveraging the merge command to sync a target from a source table by conditionally deleting or updating records that are not present in the source. As an example, the following command incrementally syncs the target table from the source table for the past 5 days:

MERGE INTO target
USING (SELECT `columns`  FROM source  WHERE created_at >= (current_date - INTERVAL ‘5’ DAY)  AS tmp_name
ON FALSE
WHEN NOT MATCHED BY SOURCE AND (current_date - INTERVAL ‘5’ DAY) THEN DELETE
WHEN NOT MATCHED BY TARGET THEN INSERT `columns`

After running this command, all rows older than 5 days in the target table are left unmodified while rows newer than 5 days that are either not already in the target table or not in the source table anymore are inserted and deleted, respectively.

Does this PR introduce any user-facing change?

Two user-facing changes are introduced in the MERGE INTO syntax:

  • WHEN NOT MATCHED BY SOURCE clause.
  • Optional BY TARGET qualifier for WHEN NOT MATCHED clauses.

The updated Spark SQL syntax is:

MERGE INTO target_table_name [target_alias]
   USING source_table_reference [source_alias]
   ON merge_condition
   { WHEN MATCHED [ AND matched_condition ] THEN matched_action |
     WHEN NOT MATCHED [BY TARGET] [ AND not_matched_condition ] THEN not_matched_action
     WHEN NOT MATCHED BY SOURCE [ AND not_matched_by_source_condition ] THEN not_matched_by_source_action } [...]

matched_action
 { DELETE |
   UPDATE SET * |
   UPDATE SET { column = [ expr | DEFAULT ] } [, ...] }

not_matched_action
 { INSERT * |
   INSERT (column1 [, ...] ) VALUES ( expr | DEFAULT ] [, ...] )

not_matched_by_source_action
 { DELETE |
   UPDATE SET { column = [ expr | DEFAULT ] } [, ...] }

This syntax replicates the semantics used by other vendors, see:

How was this patch tested?

Tests are extended or added to cover the following aspect of the change:

  • Parsing (in DDLParserSuite.scala):
    • Existing tests are extended to cover parsing of WHEN NOT MATCHED BY SOURCE clauses in a range of cases. This covers parsing the clause with optional conditions and a variety of UPDATE and DELETE actions.
    • New tests are added to cover NOT MATCHED BY TARGET and invalid action UPDATE SET * for WHEN NOT MATCHED BY SOURCE.
  • Analysis (in PlanResolutionSuite.scala):
    • Existing tests are extended to also cover attribute reference resolution for WHEN NOT MATCHED BY SOURCE conditions and actions together with other type of clauses.
    • New tests are added to cover reference resolution specific to WHEN NOT MATCHED BY SOURCE clauses:
      • Unqualified reference to a column present both in the target and source table is not ambiguous in WHEN NOT MATCHED BY SOURCE conditions or actions since it can only refer to the target table.
      • Reference to columns in the source table are invalid in WHEN NOT MATCHED BY SOURCE.

…O command

This change adds a third type of WHEN clause to the MERGE INTO that allows
updating or deleting rows from the target table that have no match in the
source table based on the merge condition.
Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AmplabJenkins
Copy link

Can one of the admins verify this patch?

resolveMergeExprOrFail(c, mergeInto)
val resolveFromChildren = resolveValuesFrom match {
case m: MergeIntoTable => m
case p => Project(Nil, p)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this pattern match looks a bit hacky. How about adding a new method?

def resolveAssignmentsByPlanOutput ... {
  resolveAssignments(..., Project(Nil, p))
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or we pass an enum:

  object MergeResolvePolicy extends Enumeration {
    val BOTH, TARGET, SOURCE = Value
  }

then the code here can be

val basePlan = policy match {
  case BOTH => m
  case SOURCE => Project(Nil, m.source)
  case TARGET => Project(Nil, m.target)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went the enum route, introduced MergeResolvePolicy.

Copy link
Contributor

@cloud-fan cloud-fan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM except for a few minor comments

@cloud-fan
Copy link
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in c7fa847 Nov 1, 2022
SandishKumarHN pushed a commit to SandishKumarHN/spark that referenced this pull request Dec 12, 2022
### What changes were proposed in this pull request?
This change adds a third type of WHEN clause to the MERGE INTO command that allows updating or deleting rows from the target table that have no match in the source table based on the merge condition.

The following example updates all rows from the target table that have a match in the source table using the source value. For target rows that have no match in the source table, the 'state' column of rows that were created before '2022-10-26' is set to 'active', while rows created before that date are deleted from the target table.
```
MERGE INTO target
USING source
ON target.key = source.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED BY SOURCE AND target.create_at > '2022-10-26' THEN UPDATE SET target.status = 'active'
WHEN NOT MATCHED BY SOURCE THEN DELETE
```

In addition, the existing WHEN NOT MATCHED clause can now also include an optional BY TARGET qualifier that has no effect on its semantic other than allowing a more consistent use of the clauses together:
```
WHEN NOT MATCHED BY TARGET THEN INSERT *
```
is equivalent to:
```
WHEN NOT MATCHED THEN INSERT *
```
The updated SQL syntax for the MERGE INTO command is described more precisely in the user-facing change section below.

The changes proposed in this pull request are two-fold:
1. Update SQL parsing to handle the new clause introduced. This results in a new field `notMatchedBySourceActions` being populated in the logical plan node of the MERGE INTO command `MergeIntoTable`.
2. Handle the newly added merge clause during analysis. In particular, resolve all attribute references used in WHEN NOT MATCHED BY SOURCE conditions and actions. The attributes used in a NOT MATCHED BY SOURCE clause may only refer to the target table.

### Why are the changes needed?
The new clause opens up uses cases leveraging the merge command to sync a target from a source table by conditionally deleting or updating records that are not present in the source. As an example, the following command incrementally syncs the target table from the source table for the past 5 days:
```
MERGE INTO target
USING (SELECT `columns`  FROM source  WHERE created_at >= (current_date - INTERVAL ‘5’ DAY)  AS tmp_name
ON FALSE
WHEN NOT MATCHED BY SOURCE AND (current_date - INTERVAL ‘5’ DAY) THEN DELETE
WHEN NOT MATCHED BY TARGET THEN INSERT `columns`
```
After running this command, all rows older than 5 days in the target table are left unmodified while rows newer than 5 days that are either not already in the target table or not in the source table anymore are inserted and deleted, respectively.

### Does this PR introduce _any_ user-facing change?
Two user-facing changes are introduced in the MERGE INTO syntax:
- WHEN NOT MATCHED BY SOURCE clause.
- Optional BY TARGET qualifier for WHEN NOT MATCHED clauses.

The updated Spark SQL syntax is:
```
MERGE INTO target_table_name [target_alias]
   USING source_table_reference [source_alias]
   ON merge_condition
   { WHEN MATCHED [ AND matched_condition ] THEN matched_action |
     WHEN NOT MATCHED [BY TARGET] [ AND not_matched_condition ] THEN not_matched_action
     WHEN NOT MATCHED BY SOURCE [ AND not_matched_by_source_condition ] THEN not_matched_by_source_action } [...]

matched_action
 { DELETE |
   UPDATE SET * |
   UPDATE SET { column = [ expr | DEFAULT ] } [, ...] }

not_matched_action
 { INSERT * |
   INSERT (column1 [, ...] ) VALUES ( expr | DEFAULT ] [, ...] )

not_matched_by_source_action
 { DELETE |
   UPDATE SET { column = [ expr | DEFAULT ] } [, ...] }
```
This syntax replicates the semantics used by other vendors, see:
- [Microsoft T-SQL Merge](https://learn.microsoft.com/en-us/sql/t-sql/statements/merge-transact-sql)
- [Google BigQuery Merge](https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#merge_statement)

### How was this patch tested?
Tests are extended or added to cover the following aspect of the change:
- Parsing (in DDLParserSuite.scala):
  - Existing tests are extended to cover parsing of WHEN NOT MATCHED BY SOURCE clauses in a range of cases. This covers parsing the clause with optional conditions and a variety of UPDATE and DELETE actions.
  - New tests are added to cover NOT MATCHED BY TARGET and invalid action UPDATE SET * for WHEN NOT MATCHED BY SOURCE.
- Analysis (in PlanResolutionSuite.scala):
  - Existing tests are extended to also cover attribute reference resolution for WHEN NOT MATCHED BY SOURCE conditions and actions together with other type of clauses.
  - New tests are added to cover reference resolution specific to WHEN NOT MATCHED BY SOURCE clauses:
    - Unqualified reference to a column present both in the target and source table is not ambiguous in WHEN NOT MATCHED BY SOURCE conditions or actions since it can only refer to the target table.
    - Reference to columns in the source table are invalid in WHEN NOT MATCHED BY SOURCE.

Closes apache#38400 from johanl-db/SPARK-40921-when-not-matched-by-source.

Authored-by: Johan Lasperas <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants