From 7b0f51409c8f7cc45d13e72a136d41be71ba9de8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Wa=C5=9Bko?= Date: Wed, 14 Jun 2023 13:45:15 +0200 Subject: [PATCH 01/11] Implementing Update --- .../Database/0.0.0-dev/src/Errors.enso | 4 ++-- .../src/Internal/Base_Generator.enso | 14 +++++++++++++ .../0.0.0-dev/src/Internal/IR/Query.enso | 3 +++ .../0.0.0-dev/src/Internal/Upload_Table.enso | 20 +++++++++++++++++-- .../Table_Tests/src/Database/Upload_Spec.enso | 6 +++--- 5 files changed, 40 insertions(+), 7 deletions(-) diff --git a/distribution/lib/Standard/Database/0.0.0-dev/src/Errors.enso b/distribution/lib/Standard/Database/0.0.0-dev/src/Errors.enso index e96ffdceb0f7..5fde4ad7f0a5 100644 --- a/distribution/lib/Standard/Database/0.0.0-dev/src/Errors.enso +++ b/distribution/lib/Standard/Database/0.0.0-dev/src/Errors.enso @@ -180,13 +180,13 @@ type Unmatched_Rows ## PRIVATE Indicates that the `Update` operation encountered input rows that did not have any matching rows in the target table. - Error + Error (count : Integer) ## PRIVATE Pretty print the rows already present error. to_display_text : Text to_display_text self = - "The `Update` operation encountered input rows that did not have any matching rows in the target table. The operation has been rolled back. Consider `Update_Or_Insert` if you want to insert rows that do not match any existing rows." + "The `Update` operation encountered " + self.count.to_text + " input rows that did not have any matching rows in the target table. The operation has been rolled back. Consider `Update_Or_Insert` if you want to insert rows that do not match any existing rows." type Rows_Already_Present ## PRIVATE diff --git a/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Base_Generator.enso b/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Base_Generator.enso index 4deb208560ca..9112343f3041 100644 --- a/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Base_Generator.enso +++ b/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Base_Generator.enso @@ -442,6 +442,20 @@ generate_query dialect query = case query of columns_part = Builder.join ", " (column_names.map dialect.wrap_identifier) . paren Builder.code "INSERT INTO " ++ dialect.wrap_identifier table_name ++ " " ++ columns_part ++ " " ++ inner_query _ -> Panic.throw (Illegal_State.Error "The inner query of `Query.Insert_From_Select` must be `Query.Select`, but it was "+select_query.to_display_text+". This is a bug in the Database library.") + Query.Update_From_Table target_table_name source_table_name column_names key_columns -> + target = dialect.wrap_identifier target_table_name + source = dialect.wrap_identifier source_table_name + if column_names.is_empty then + Panic.throw (Illegal_State.Error "Invalid IR: `Query.Update_From_Table` must have at least one column to update. This is a bug in the Database library.") + if key_columns.is_empty then + Panic.throw (Illegal_State.Error "Invalid IR: `Query.Update_From_Table` must have at least one key column. This is a bug in the Database library.") + set_part = Builder.join ", " <| column_names.map column_name-> + id = dialect.wrap_identifier column_name + Builder.code id ++ "=" ++ source ++ "." ++ id + key_part = Builder.join " AND " <| column_names.map column_name-> + id = dialect.wrap_identifier column_name + Builder.code target ++ "." ++ id ++ "=" ++ source ++ "." ++ id + Builder.code "UPDATE " ++ target ++ " SET " ++ set_part ++ " FROM " ++ source ++ " WHERE " ++ key_part _ -> Error.throw <| Unsupported_Database_Operation.Error "Unsupported query type: "+query.to_text ## PRIVATE diff --git a/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/IR/Query.enso b/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/IR/Query.enso index 5ef57db05cfc..de8ffe67e4c3 100644 --- a/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/IR/Query.enso +++ b/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/IR/Query.enso @@ -62,3 +62,6 @@ type Query The columns in that query should correspond to the columns in specified in `column_names`, matching by position. Insert_From_Select (table_name:Text) (column_names : Vector Text) (select:Query) + + ## PRIVATE + Update_From_Table (target_table_name:Text) (source_table_name:Text) (column_names : Vector Text) (key_columns : Vector Text) diff --git a/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso b/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso index d643300a4ab9..88946889dfac 100644 --- a/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso +++ b/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso @@ -227,13 +227,16 @@ common_update_table source_table connection table_name update_action key_columns Error.throw new_error if target_table.is_error then handle_error else tmp_table_name = "temporary-source-table-"+random_uuid - tmp_table = internal_upload_table source_table connection tmp_table_name key_columns temporary=True on_problems=Problem_Behavior.Report_Error + tmp_table = internal_upload_table source_table connection tmp_table_name primary_key=key_columns temporary=True on_problems=Problem_Behavior.Report_Error tmp_table.if_not_error <| resulting_table = append_to_existing_table tmp_table target_table update_action key_columns error_on_missing_columns on_problems connection.drop_table tmp_table.name resulting_table ## PRIVATE + Assumes that `source_table` is a simple table query without any filters, + joins and other composite operations - if a complex query is needed, it + should be first materialized into a temporary table. append_to_existing_table source_table target_table update_action key_columns error_on_missing_columns on_problems = In_Transaction.ensure_in_transaction <| effective_key_columns = if key_columns.is_nothing then [] else key_columns check_update_arguments source_table target_table effective_key_columns update_action error_on_missing_columns on_problems <| @@ -251,12 +254,25 @@ append_to_existing_table source_table target_table update_action key_columns err insert_statement = connection.dialect.generate_sql <| Query.Insert_From_Select target_table.name source_table.column_names source_table.to_select_query Panic.rethrow <| connection.execute_update insert_statement - Update_Action.Update -> throw_not_implemented + Update_Action.Update -> + rows_missing_from_target = source_table.join target_table on=key_columns join_kind=Join_Kind.Left_Exclusive + rows_missing_from_target_count = rows_missing_from_target.row_count + if rows_missing_from_target_count != 0 then Error.throw (Unmatched_Rows.Error rows_missing_from_target_count) else + connection = target_table.connection + update_statement = connection.dialect.generate_sql <| + Query.Update_From_Table target_table.name source_table.name source_table.column_names key_columns + Panic.rethrow <| connection.execute_update update_statement Update_Action.Update_Or_Insert -> throw_not_implemented Update_Action.Align_Records -> throw_not_implemented upload_status.if_not_error target_table ## PRIVATE + This helper ensures that all arguments are valid. + + The `action` is run only if the input invariants are satisfied: + - all columns in `source_table` have a corresponding column in `target_table` + (with the same name), + - all `key_columns` are present in both source and target tables. check_update_arguments source_table target_table key_columns update_action error_on_missing_columns on_problems ~action = check_source_column source_column = # The column must exist because it was verified earlier. diff --git a/test/Table_Tests/src/Database/Upload_Spec.enso b/test/Table_Tests/src/Database/Upload_Spec.enso index ac8f7df1ac82..61bc1e180fb1 100644 --- a/test/Table_Tests/src/Database/Upload_Spec.enso +++ b/test/Table_Tests/src/Database/Upload_Spec.enso @@ -412,7 +412,7 @@ spec make_new_connection prefix persistent_connector=True = dest = target_table_builder [["X", [1, 2, 3]], ["Y", ['a', 'b', 'c']]] primary_key=["Y"] default_key_columns connection dest.name . should_fail_with Illegal_Argument - Test.specify "should be able to Update existing rows in a table" pending="TODO: Update" <| + Test.specify "should be able to Update existing rows in a table" <| dest = target_table_builder [["X", [1, 2, 3]], ["Y", ['a', 'b', 'c']]] src = source_table_builder [["X", [2]], ["Y", ['ZZZ']]] @@ -422,7 +422,7 @@ spec make_new_connection prefix persistent_connector=True = rows = dest.rows.to_vector.map .to_vector rows.should_contain_the_same_elements_as [[1, 'a'], [2, 'ZZZ'], [3, 'c']] - Test.specify "should fail on unmatched rows in Update mode" pending="TODO: Update" <| + Test.specify "should fail on unmatched rows in Update mode" <| dest = target_table_builder [["X", [1, 2, 3]], ["Y", ['a', 'b', 'c']]] src = source_table_builder [["X", [2, 100]], ["Y", ['d', 'e']]] r1 = src.update_database_table connection dest.name update_action=Update_Action.Update key_columns=["X"] @@ -478,7 +478,7 @@ spec make_new_connection prefix persistent_connector=True = rows1 = result.rows.to_vector.map .to_vector rows1.should_contain_the_same_elements_as expected_rows - Test.specify "should match columns by name, reordering to destination order if needed (Update)" pending="TODO: Update" <| + Test.specify "should match columns by name, reordering to destination order if needed (Update)" <| dest = target_table_builder [["X", [1, 2, 3]], ["Y", ['a', 'b', 'c']]] primary_key=["X"] src = source_table_builder [["Y", ['d', 'e', 'f']], ["X", [3, 2, 1]]] result = src.update_database_table connection dest.name update_action=Update_Action.Update key_columns=["X"] From 4b1b882f70725cfeb684be6fa062360b30d9463e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Wa=C5=9Bko?= Date: Wed, 14 Jun 2023 14:10:39 +0200 Subject: [PATCH 02/11] fix --- .../Database/0.0.0-dev/src/Internal/Base_Generator.enso | 6 +++--- test/Table_Tests/src/Database/Upload_Spec.enso | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Base_Generator.enso b/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Base_Generator.enso index 9112343f3041..2217b497d9b8 100644 --- a/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Base_Generator.enso +++ b/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Base_Generator.enso @@ -451,10 +451,10 @@ generate_query dialect query = case query of Panic.throw (Illegal_State.Error "Invalid IR: `Query.Update_From_Table` must have at least one key column. This is a bug in the Database library.") set_part = Builder.join ", " <| column_names.map column_name-> id = dialect.wrap_identifier column_name - Builder.code id ++ "=" ++ source ++ "." ++ id - key_part = Builder.join " AND " <| column_names.map column_name-> + id ++ "=" ++ source ++ "." ++ id + key_part = Builder.join " AND " <| key_columns.map column_name-> id = dialect.wrap_identifier column_name - Builder.code target ++ "." ++ id ++ "=" ++ source ++ "." ++ id + target ++ "." ++ id ++ "=" ++ source ++ "." ++ id Builder.code "UPDATE " ++ target ++ " SET " ++ set_part ++ " FROM " ++ source ++ " WHERE " ++ key_part _ -> Error.throw <| Unsupported_Database_Operation.Error "Unsupported query type: "+query.to_text diff --git a/test/Table_Tests/src/Database/Upload_Spec.enso b/test/Table_Tests/src/Database/Upload_Spec.enso index 61bc1e180fb1..5f9dc4a73f40 100644 --- a/test/Table_Tests/src/Database/Upload_Spec.enso +++ b/test/Table_Tests/src/Database/Upload_Spec.enso @@ -418,6 +418,7 @@ spec make_new_connection prefix persistent_connector=True = r1 = src.update_database_table connection dest.name update_action=Update_Action.Update key_columns=["X"] r1.column_names . should_equal ["X", "Y"] + r1.should_succeed rows = dest.rows.to_vector.map .to_vector rows.should_contain_the_same_elements_as [[1, 'a'], [2, 'ZZZ'], [3, 'c']] From 6f7c1daf31956be2a719caee0838c4d22f1c7c7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Wa=C5=9Bko?= Date: Wed, 14 Jun 2023 14:11:22 +0200 Subject: [PATCH 03/11] enable upsert tests --- test/Table_Tests/src/Database/Upload_Spec.enso | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/Table_Tests/src/Database/Upload_Spec.enso b/test/Table_Tests/src/Database/Upload_Spec.enso index 5f9dc4a73f40..9b0e80704a6e 100644 --- a/test/Table_Tests/src/Database/Upload_Spec.enso +++ b/test/Table_Tests/src/Database/Upload_Spec.enso @@ -380,7 +380,7 @@ spec make_new_connection prefix persistent_connector=True = sqlite_temporary_primary_key_pending = if prefix.contains "SQLite" then "Fix get_primary_key on SQLite temporary tables" test_table_append source_table_builder target_table_builder = - Test.specify "should be able to append new rows to a table" pending="TODO: Upsert" <| + Test.specify "should be able to append new rows to a table" <| dest = target_table_builder [["X", [1, 2, 3]], ["Y", ['a', 'b', 'c']]] primary_key=["X"] src = source_table_builder [["X", [4, 5, 6]], ["Y", ['d', 'e', 'f']]] result = src.update_database_table connection dest.name key_columns=["X"] @@ -396,7 +396,7 @@ spec make_new_connection prefix persistent_connector=True = r1.should_fail_with Rows_Already_Present # pending=sqlite_temporary_primary_key_pending - Test.specify "should use the target table primary key for the key by default" pending="TODO: Upsert" <| + Test.specify "should use the target table primary key for the key by default" <| dest1 = target_table_builder [["X", [1, 2, 3]], ["Y", ['a', 'b', 'c']], ["Z", [4, 5, 6]]] primary_key=["Y", "Z"] default_key_columns connection dest1.name . should_equal ["Y", "Z"] @@ -433,7 +433,7 @@ spec make_new_connection prefix persistent_connector=True = rows = dest.rows.to_vector.map .to_vector rows.should_contain_the_same_elements_as [[1, 'a'], [2, 'b'], [3, 'c']] - Test.specify "should upsert by default (update existing rows, insert new rows)" pending="TODO: Upsert" <| + Test.specify "should upsert by default (update existing rows, insert new rows)" <| dest = target_table_builder [["X", [1, 2, 3]], ["Y", ['a', 'b', 'c']]] primary_key=["X"] src = source_table_builder [["X", [2, 100]], ["Y", ['D', 'E']]] r1 = src.update_database_table connection dest.name key_columns=["X"] @@ -469,7 +469,7 @@ spec make_new_connection prefix persistent_connector=True = rows1 = result.rows.to_vector.map .to_vector rows1.should_contain_the_same_elements_as expected_rows - Test.specify "should match columns by name, reordering to destination order if needed (Upsert)" pending="TODO: Upsert" <| + Test.specify "should match columns by name, reordering to destination order if needed (Upsert)" <| dest = target_table_builder [["X", [1, 2, 3]], ["Y", ['a', 'b', 'c']]] primary_key=["X"] src = source_table_builder [["Y", ['d', 'e', 'f']], ["X", [1, 5, 6]]] result = src.update_database_table connection dest.name key_columns=["X"] @@ -499,7 +499,7 @@ spec make_new_connection prefix persistent_connector=True = rows1 = result.rows.to_vector.map .to_vector rows1.should_contain_the_same_elements_as expected_rows - Test.specify "should allow to use a transformed table, with computed fields, as a source" pending="TODO: Upsert" <| + Test.specify "should allow to use a transformed table, with computed fields, as a source" <| dest = target_table_builder [["X", [1, 2, 3]], ["Y", ['a', 'b', 'c']]] primary_key=["X"] t1 = source_table_builder [["Z", [10, 20]], ["Y", ['D', 'E']]] t2 = source_table_builder [["Z", [20, 10]], ["X", [-99, 10]]] @@ -556,7 +556,7 @@ spec make_new_connection prefix persistent_connector=True = r2 = src.update_database_table connection d2.name key_columns=["X"] update_action=Update_Action.Insert r2.should_fail_with Non_Unique_Primary_Key - Test.specify "should fail if the key causes update of multiple values (it's not unique in the target table)" pending="TODO: Upsert" <| + Test.specify "should fail if the key causes update of multiple values (it's not unique in the target table)" <| dest = target_table_builder [["X", [1, 1, 2]], ["Y", ['a', 'b', 'c']]] src = source_table_builder [["X", [1, 2, 3]], ["Y", ['d', 'e', 'f']]] r1 = src.update_database_table connection dest.name key_columns=["X"] From e042b1788d51e596e72938409ab05d04275f685d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Wa=C5=9Bko?= Date: Wed, 14 Jun 2023 14:26:34 +0200 Subject: [PATCH 04/11] basic upsert impl --- .../0.0.0-dev/src/Internal/Upload_Table.enso | 23 +++++++++++++++---- .../Table_Tests/src/Database/Upload_Spec.enso | 3 +-- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso b/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso index 88946889dfac..e05fadd01d70 100644 --- a/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso +++ b/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso @@ -241,7 +241,7 @@ append_to_existing_table source_table target_table update_action key_columns err effective_key_columns = if key_columns.is_nothing then [] else key_columns check_update_arguments source_table target_table effective_key_columns update_action error_on_missing_columns on_problems <| throw_not_implemented = - Error.throw (Illegal_State.Error "Not implemented yet. Only `Insert` is implemented.") + Error.throw (Illegal_State.Error "Not implemented yet.") upload_status = case update_action of Update_Action.Insert -> existing_rows_check = if effective_key_columns.is_empty then True else @@ -255,14 +255,27 @@ append_to_existing_table source_table target_table update_action key_columns err Query.Insert_From_Select target_table.name source_table.column_names source_table.to_select_query Panic.rethrow <| connection.execute_update insert_statement Update_Action.Update -> - rows_missing_from_target = source_table.join target_table on=key_columns join_kind=Join_Kind.Left_Exclusive - rows_missing_from_target_count = rows_missing_from_target.row_count - if rows_missing_from_target_count != 0 then Error.throw (Unmatched_Rows.Error rows_missing_from_target_count) else + unmatched_rows = source_table.join target_table on=key_columns join_kind=Join_Kind.Left_Exclusive + unmatched_rows_count = unmatched_rows.row_count + if unmatched_rows_count != 0 then Error.throw (Unmatched_Rows.Error unmatched_rows_count) else connection = target_table.connection update_statement = connection.dialect.generate_sql <| Query.Update_From_Table target_table.name source_table.name source_table.column_names key_columns Panic.rethrow <| connection.execute_update update_statement - Update_Action.Update_Or_Insert -> throw_not_implemented + Update_Action.Update_Or_Insert -> + connection = target_table.connection + + # The update will only affect matched rows and ignore unmatched ones. + update_statement = connection.dialect.generate_sql <| + Query.Update_From_Table target_table.name source_table.name source_table.column_names key_columns + Panic.rethrow <| connection.execute_update update_statement + + # Having updated the matched rows, I now insert the unmatched ones. + unmatched_rows = source_table.join target_table on=key_columns join_kind=Join_Kind.Left_Exclusive + # assert source_table.column_names == unmatched_rows.column_names + insert_statement = connection.dialect.generate_sql <| + Query.Insert_From_Select target_table.name unmatched_rows.column_names unmatched_rows.to_select_query + Panic.rethrow <| connection.execute_update insert_statement Update_Action.Align_Records -> throw_not_implemented upload_status.if_not_error target_table diff --git a/test/Table_Tests/src/Database/Upload_Spec.enso b/test/Table_Tests/src/Database/Upload_Spec.enso index 9b0e80704a6e..32767f3f499d 100644 --- a/test/Table_Tests/src/Database/Upload_Spec.enso +++ b/test/Table_Tests/src/Database/Upload_Spec.enso @@ -395,8 +395,7 @@ spec make_new_connection prefix persistent_connector=True = r1 = src.update_database_table connection dest.name update_action=Update_Action.Insert key_columns=["X"] r1.should_fail_with Rows_Already_Present - # pending=sqlite_temporary_primary_key_pending - Test.specify "should use the target table primary key for the key by default" <| + Test.specify "should use the target table primary key for the key by default" pending=sqlite_temporary_primary_key_pending <| dest1 = target_table_builder [["X", [1, 2, 3]], ["Y", ['a', 'b', 'c']], ["Z", [4, 5, 6]]] primary_key=["Y", "Z"] default_key_columns connection dest1.name . should_equal ["Y", "Z"] From 44e7a822566b56804a3167d36d3d2017a368f2e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Wa=C5=9Bko?= Date: Wed, 14 Jun 2023 17:08:06 +0200 Subject: [PATCH 05/11] Multiple_Target_Rows_Matched_For_Update --- .../Database/0.0.0-dev/src/Errors.enso | 4 +- .../0.0.0-dev/src/Internal/Upload_Table.enso | 52 ++++++++++++++----- .../Standard/Table/0.0.0-dev/src/Errors.enso | 3 +- .../Table_Tests/src/Database/Upload_Spec.enso | 18 ++++++- 4 files changed, 59 insertions(+), 18 deletions(-) diff --git a/distribution/lib/Standard/Database/0.0.0-dev/src/Errors.enso b/distribution/lib/Standard/Database/0.0.0-dev/src/Errors.enso index 5fde4ad7f0a5..5e23a2dbeed3 100644 --- a/distribution/lib/Standard/Database/0.0.0-dev/src/Errors.enso +++ b/distribution/lib/Standard/Database/0.0.0-dev/src/Errors.enso @@ -204,10 +204,10 @@ type Multiple_Target_Rows_Matched_For_Update ## PRIVATE Indicates that the source table had rows matching multiple rows in the target table by the specified key. - Error + Error (example_key : Vector Any) (example_count : Integer) ## PRIVATE Pretty print the multiple target rows matched for update error. to_display_text : Text to_display_text self = - "The update operation encountered input rows that matched multiple rows in the target table. The operation has been rolled back. You may need to use a more specific key for matching." + "The update operation encountered input rows that matched multiple rows in the target table (for example, the key " + self.example_key.to_display_text + " matched " + self.example_count.to_text + " rows). The operation has been rolled back. You may need to use a more specific key for matching." diff --git a/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso b/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso index e05fadd01d70..a72bc48ecee3 100644 --- a/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso +++ b/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso @@ -258,27 +258,53 @@ append_to_existing_table source_table target_table update_action key_columns err unmatched_rows = source_table.join target_table on=key_columns join_kind=Join_Kind.Left_Exclusive unmatched_rows_count = unmatched_rows.row_count if unmatched_rows_count != 0 then Error.throw (Unmatched_Rows.Error unmatched_rows_count) else + check_multiple_target_rows_match source_table target_table key_columns <| + connection = target_table.connection + update_statement = connection.dialect.generate_sql <| + Query.Update_From_Table target_table.name source_table.name source_table.column_names key_columns + Panic.rethrow <| connection.execute_update update_statement + Update_Action.Update_Or_Insert -> + check_multiple_target_rows_match source_table target_table key_columns <| connection = target_table.connection + # The update will only affect matched rows and ignore unmatched ones. update_statement = connection.dialect.generate_sql <| Query.Update_From_Table target_table.name source_table.name source_table.column_names key_columns Panic.rethrow <| connection.execute_update update_statement - Update_Action.Update_Or_Insert -> - connection = target_table.connection - - # The update will only affect matched rows and ignore unmatched ones. - update_statement = connection.dialect.generate_sql <| - Query.Update_From_Table target_table.name source_table.name source_table.column_names key_columns - Panic.rethrow <| connection.execute_update update_statement - # Having updated the matched rows, I now insert the unmatched ones. - unmatched_rows = source_table.join target_table on=key_columns join_kind=Join_Kind.Left_Exclusive - # assert source_table.column_names == unmatched_rows.column_names - insert_statement = connection.dialect.generate_sql <| - Query.Insert_From_Select target_table.name unmatched_rows.column_names unmatched_rows.to_select_query - Panic.rethrow <| connection.execute_update insert_statement + # Having updated the matched rows, I now insert the unmatched ones. + unmatched_rows = source_table.join target_table on=key_columns join_kind=Join_Kind.Left_Exclusive + # assert source_table.column_names == unmatched_rows.column_names + insert_statement = connection.dialect.generate_sql <| + Query.Insert_From_Select target_table.name unmatched_rows.column_names unmatched_rows.to_select_query + Panic.rethrow <| connection.execute_update insert_statement Update_Action.Align_Records -> throw_not_implemented upload_status.if_not_error target_table +## PRIVATE +check_multiple_target_rows_match source_table target_table key_columns ~action = + connection = target_table.connection + agg = (key_columns.map (Aggregate_Column.Group_By _)) + [Aggregate_Column.Count] + duplicated_key_in_target = target_table.aggregate agg . filter -1 (Filter_Condition.Greater than=1) + example = duplicated_key_in_target.read max_rows=1 + case example.row_count == 0 of + False -> + row = duplicated_key_in_target.first_row . to_vector + offending_key = row.drop (Index_Sub_Range.Last 1) + count = row.last + Error.throw (Multiple_Target_Rows_Matched_For_Update.Error offending_key count) + True -> action + +type Unmatched_Source_Rows_Behavior + Insert + Error + +type Unmatched_Target_Rows_Behavior + Keep + Delete + +perform_update source_table target_table key_columns unmatched_rows_behavior unmatched_source_rows_behavior = + "TODO" + ## PRIVATE This helper ensures that all arguments are valid. diff --git a/distribution/lib/Standard/Table/0.0.0-dev/src/Errors.enso b/distribution/lib/Standard/Table/0.0.0-dev/src/Errors.enso index d7b80a8366eb..1b209dc92031 100644 --- a/distribution/lib/Standard/Table/0.0.0-dev/src/Errors.enso +++ b/distribution/lib/Standard/Table/0.0.0-dev/src/Errors.enso @@ -477,13 +477,12 @@ type Unmatched_Columns merged tables. Error (column_names : Vector Text) - # TODO [RW] the message is wrong for update_database_table_case, needs updating ## PRIVATE Create a human-readable version of the error. to_display_text : Text to_display_text self = - "The following columns were not present in some of the provided tables: " + (self.column_names.map (n -> "["+n+"]") . join ", ") + ". The missing values have been filled with `Nothing`." + "The following columns were not present in some of the provided tables: " + (self.column_names.map (n -> "["+n+"]") . join ", ") + "." type Cross_Join_Row_Limit_Exceeded ## PRIVATE diff --git a/test/Table_Tests/src/Database/Upload_Spec.enso b/test/Table_Tests/src/Database/Upload_Spec.enso index 32767f3f499d..532860e45fc6 100644 --- a/test/Table_Tests/src/Database/Upload_Spec.enso +++ b/test/Table_Tests/src/Database/Upload_Spec.enso @@ -502,7 +502,7 @@ spec make_new_connection prefix persistent_connector=True = dest = target_table_builder [["X", [1, 2, 3]], ["Y", ['a', 'b', 'c']]] primary_key=["X"] t1 = source_table_builder [["Z", [10, 20]], ["Y", ['D', 'E']]] t2 = source_table_builder [["Z", [20, 10]], ["X", [-99, 10]]] - src = t1.join t2 on=["Z"] . remove_columns "Z" . set "[X] + 100" "X" + src = t1.join t2 on=["Z"] join_kind=Join_Kind.Inner . remove_columns "Z" . set "[X] + 100" "X" src.at "X" . to_vector . should_contain_the_same_elements_as [1, 110] r1 = src.update_database_table connection dest.name key_columns=["X"] @@ -560,6 +560,20 @@ spec make_new_connection prefix persistent_connector=True = src = source_table_builder [["X", [1, 2, 3]], ["Y", ['d', 'e', 'f']]] r1 = src.update_database_table connection dest.name key_columns=["X"] r1.should_fail_with Multiple_Target_Rows_Matched_For_Update + r1.catch.to_display_text . should_contain "key [1] matched 2 rows" + + src2 = source_table_builder [["X", [1]], ["Y", ['d']]] + r2 = src2.update_database_table connection dest.name key_columns=["X"] update_action=Update_Action.Update + r2.should_fail_with Multiple_Target_Rows_Matched_For_Update + + ## In the future we may consider `Align_Records` to remove the + duplicated rows and keep just one of them. But that probably + should not be a default, so maybe only if we introduce a + parameter like `multi_row_update`. + # TODO + Nothing + #r3 = src.update_database_table connection dest.name key_columns=["X"] update_action=Update_Action.Align_Records + #r3.should_fail_with Multiple_Target_Rows_Matched_For_Update Test.specify "should fail if the source table contains columns not present in the target (data loss)" <| dest = target_table_builder [["X", [0, 10, 100]]] primary_key=["X"] @@ -567,6 +581,8 @@ spec make_new_connection prefix persistent_connector=True = r1 = src.update_database_table connection dest.name key_columns=["X"] r1.should_fail_with Unmatched_Columns r1.catch.column_names . should_equal ["Y"] + r1.catch.to_display_text . should_contain "columns were not present" + r1.catch.to_display_text . should_contain "Y" Test.specify "by default should keep columns missing from source as-is and use defaults when inserting" <| dest_name = Name_Generator.random_name "table-defaults" From 1c571e882c031dea12d0f6bfcbfbab123718c5b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Wa=C5=9Bko?= Date: Wed, 14 Jun 2023 18:53:23 +0200 Subject: [PATCH 06/11] extract common logic into a single function --- .../0.0.0-dev/src/Internal/Upload_Table.enso | 106 +++++++++++------- .../Table_Tests/src/Database/Upload_Spec.enso | 20 ++-- 2 files changed, 79 insertions(+), 47 deletions(-) diff --git a/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso b/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso index a72bc48ecee3..ac3ab8e604e2 100644 --- a/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso +++ b/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso @@ -240,8 +240,6 @@ common_update_table source_table connection table_name update_action key_columns append_to_existing_table source_table target_table update_action key_columns error_on_missing_columns on_problems = In_Transaction.ensure_in_transaction <| effective_key_columns = if key_columns.is_nothing then [] else key_columns check_update_arguments source_table target_table effective_key_columns update_action error_on_missing_columns on_problems <| - throw_not_implemented = - Error.throw (Illegal_State.Error "Not implemented yet.") upload_status = case update_action of Update_Action.Insert -> existing_rows_check = if effective_key_columns.is_empty then True else @@ -255,55 +253,85 @@ append_to_existing_table source_table target_table update_action key_columns err Query.Insert_From_Select target_table.name source_table.column_names source_table.to_select_query Panic.rethrow <| connection.execute_update insert_statement Update_Action.Update -> - unmatched_rows = source_table.join target_table on=key_columns join_kind=Join_Kind.Left_Exclusive - unmatched_rows_count = unmatched_rows.row_count - if unmatched_rows_count != 0 then Error.throw (Unmatched_Rows.Error unmatched_rows_count) else - check_multiple_target_rows_match source_table target_table key_columns <| - connection = target_table.connection - update_statement = connection.dialect.generate_sql <| - Query.Update_From_Table target_table.name source_table.name source_table.column_names key_columns - Panic.rethrow <| connection.execute_update update_statement + perform_update source_table target_table key_columns Unmatched_Source_Rows_Behavior.Error Unmatched_Target_Rows_Behavior.Ignore Update_Action.Update_Or_Insert -> - check_multiple_target_rows_match source_table target_table key_columns <| - connection = target_table.connection - # The update will only affect matched rows and ignore unmatched ones. - update_statement = connection.dialect.generate_sql <| - Query.Update_From_Table target_table.name source_table.name source_table.column_names key_columns - Panic.rethrow <| connection.execute_update update_statement - - # Having updated the matched rows, I now insert the unmatched ones. - unmatched_rows = source_table.join target_table on=key_columns join_kind=Join_Kind.Left_Exclusive - # assert source_table.column_names == unmatched_rows.column_names - insert_statement = connection.dialect.generate_sql <| - Query.Insert_From_Select target_table.name unmatched_rows.column_names unmatched_rows.to_select_query - Panic.rethrow <| connection.execute_update insert_statement - Update_Action.Align_Records -> throw_not_implemented + perform_update source_table target_table key_columns Unmatched_Source_Rows_Behavior.Insert Unmatched_Target_Rows_Behavior.Ignore + Update_Action.Align_Records -> + perform_update source_table target_table key_columns Unmatched_Source_Rows_Behavior.Insert Unmatched_Target_Rows_Behavior.Delete upload_status.if_not_error target_table ## PRIVATE -check_multiple_target_rows_match source_table target_table key_columns ~action = +type Unmatched_Source_Rows_Behavior + ## PRIVATE + Insert + + ## PRIVATE + Error + +## PRIVATE +type Unmatched_Target_Rows_Behavior + ## PRIVATE + Ignore + + ## PRIVATE + Delete + +## PRIVATE + Common logic for `Update`, `Update_Or_Insert` and `Align_Records` actions. +perform_update source_table target_table key_columns unmatched_source_rows_behavior unmatched_target_rows_behavior = Panic.recover [Unmatched_Rows, Multiple_Target_Rows_Matched_For_Update] <| + unmatched_source_rows = source_table.join target_table on=key_columns join_kind=Join_Kind.Left_Exclusive + + # First, check if there are unmatched source rows and report an error if needed. + case unmatched_source_rows_behavior of + Unmatched_Source_Rows_Behavior.Error -> + count = unmatched_source_rows.row_count + if count != 0 then Panic.throw (Unmatched_Rows.Error count) else Nothing + + # The insert will happen _after_ updates. + Unmatched_Source_Rows_Behavior.Insert -> Nothing + + # Check if there are rows in source that match multiple rows in the target. + check_multiple_target_rows_match source_table target_table key_columns + connection = target_table.connection + + # The update will only affect matched rows and ignore unmatched ones. + update_statement = connection.dialect.generate_sql <| + Query.Update_From_Table target_table.name source_table.name source_table.column_names key_columns + Panic.rethrow <| connection.execute_update update_statement + + case unmatched_target_rows_behavior of + Unmatched_Target_Rows_Behavior.Ignore -> Nothing + Unmatched_Target_Rows_Behavior.Delete -> + Error.throw (Illegal_State.Error "Not implemented yet.") + + case unmatched_source_rows_behavior of + # This check was handled before. + Unmatched_Source_Rows_Behavior.Error -> Nothing + + # Having updated the matched rows, I now insert the unmatched ones. + Unmatched_Source_Rows_Behavior.Insert -> + # assert source_table.column_names == unmatched_rows.column_names + insert_statement = connection.dialect.generate_sql <| + Query.Insert_From_Select target_table.name unmatched_source_rows.column_names unmatched_source_rows.to_select_query + Panic.rethrow <| connection.execute_update insert_statement + +## PRIVATE +check_multiple_target_rows_match source_table target_table key_columns = + matched_rows = source_table.join target_table on=key_columns join_kind=Join_Kind.Inner agg = (key_columns.map (Aggregate_Column.Group_By _)) + [Aggregate_Column.Count] - duplicated_key_in_target = target_table.aggregate agg . filter -1 (Filter_Condition.Greater than=1) + ## This aggregation will only find duplicated in target, not in the source, + because the source is already guaranteed to be unique - that was checked + when uploading the temporary table with the key as its primary key. + duplicated_key_in_target = matched_rows.aggregate agg . filter -1 (Filter_Condition.Greater than=1) example = duplicated_key_in_target.read max_rows=1 case example.row_count == 0 of False -> row = duplicated_key_in_target.first_row . to_vector offending_key = row.drop (Index_Sub_Range.Last 1) count = row.last - Error.throw (Multiple_Target_Rows_Matched_For_Update.Error offending_key count) - True -> action - -type Unmatched_Source_Rows_Behavior - Insert - Error - -type Unmatched_Target_Rows_Behavior - Keep - Delete - -perform_update source_table target_table key_columns unmatched_rows_behavior unmatched_source_rows_behavior = - "TODO" + Panic.throw (Multiple_Target_Rows_Matched_For_Update.Error offending_key count) + True -> Nothing ## PRIVATE This helper ensures that all arguments are valid. diff --git a/test/Table_Tests/src/Database/Upload_Spec.enso b/test/Table_Tests/src/Database/Upload_Spec.enso index 532860e45fc6..d4abae10f1f2 100644 --- a/test/Table_Tests/src/Database/Upload_Spec.enso +++ b/test/Table_Tests/src/Database/Upload_Spec.enso @@ -446,7 +446,7 @@ spec make_new_connection prefix persistent_connector=True = rows2 = dest.rows.to_vector.map .to_vector rows2.should_contain_the_same_elements_as expected_rows - Test.specify "should allow to align an existing table with a source (upsert + delete rows missing from source)" pending="TODO: Align_Records" <| + Test.specify "should allow to align an existing table with a source (upsert + delete rows missing from source)" <| dest = target_table_builder [["X", [1, 2, 3]], ["Y", ['a', 'b', 'c']]] primary_key=["X"] src = source_table_builder [["X", [2, 100]], ["Y", ['D', 'E']]] r1 = src.update_database_table connection dest.name update_action=Update_Action.Align_Records key_columns=["X"] @@ -488,7 +488,7 @@ spec make_new_connection prefix persistent_connector=True = rows1 = result.rows.to_vector.map .to_vector rows1.should_contain_the_same_elements_as expected_rows - Test.specify "should match columns by name, reordering to destination order if needed (Align)" pending="TODO: Align_Records" <| + Test.specify "should match columns by name, reordering to destination order if needed (Align)" <| dest = target_table_builder [["X", [1, 2, 3]], ["Y", ['a', 'b', 'c']]] primary_key=["X"] src = source_table_builder [["Y", ['d', 'e', 'f']], ["X", [2, 1, 6]]] result = src.update_database_table connection dest.name update_action=Update_Action.Align_Records key_columns=["X"] @@ -520,7 +520,7 @@ spec make_new_connection prefix persistent_connector=True = result.column_names . should_equal ["X"] result.at "X" . to_vector . should_contain_the_same_elements_as [1, 10, 100, 1, 2, 3] - Test.specify "should fail if no key is specified in other modes" pending="TODO: other modes" <| + Test.specify "should fail if no key is specified in other modes" <| dest = target_table_builder [["X", [1, 10, 100]]] src = source_table_builder [["X", [1, 2, 3]]] @@ -570,10 +570,14 @@ spec make_new_connection prefix persistent_connector=True = duplicated rows and keep just one of them. But that probably should not be a default, so maybe only if we introduce a parameter like `multi_row_update`. - # TODO - Nothing - #r3 = src.update_database_table connection dest.name key_columns=["X"] update_action=Update_Action.Align_Records - #r3.should_fail_with Multiple_Target_Rows_Matched_For_Update + r3 = src.update_database_table connection dest.name key_columns=["X"] update_action=Update_Action.Align_Records + r3.should_fail_with Multiple_Target_Rows_Matched_For_Update + + ## BUT the check should not throw an error if the duplicated key is on an unaffected row! + (here key 1 is duplicated, but we are NOT updating it) + src3 = source_table_builder [["X", [2]], ["Y", ['f']]] + Problems.assume_no_problems <| + src3.update_database_table connection dest.name key_columns=["X"] Test.specify "should fail if the source table contains columns not present in the target (data loss)" <| dest = target_table_builder [["X", [0, 10, 100]]] primary_key=["X"] @@ -596,7 +600,7 @@ spec make_new_connection prefix persistent_connector=True = rows1.should_contain_the_same_elements_as expected_rows connection.drop_table dest_name - Test.specify "should use defaults for missing input columns for newly inserted rows when Aligning the tables, but keep existing values for existing rows" pending="TODO: Align_Records" <| + Test.specify "should use defaults for missing input columns for newly inserted rows when Aligning the tables, but keep existing values for existing rows" <| dest_name = Name_Generator.random_name "table-defaults-align" connection.create_table dest_name [Column_Description.Value "X" Value_Type.Integer, Column_Description.Value "Y" Value_Type.Integer [Column_Constraint.Default_Expression "42"], Column_Description.Value "Z" Value_Type.Integer] temporary=True . should_succeed initial_data = Table.new [["X", [10, 20]], ["Y", [100, 200]], ["Z", [1000, 2000]]] From 3dfa598820fbe4c87ea9ad0ec58b7b5a6806e1aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Wa=C5=9Bko?= Date: Wed, 14 Jun 2023 19:05:13 +0200 Subject: [PATCH 07/11] rewrite to be cleaner --- .../0.0.0-dev/src/Internal/Upload_Table.enso | 139 +++++++++--------- 1 file changed, 66 insertions(+), 73 deletions(-) diff --git a/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso b/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso index ac3ab8e604e2..15198c66c109 100644 --- a/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso +++ b/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso @@ -240,98 +240,91 @@ common_update_table source_table connection table_name update_action key_columns append_to_existing_table source_table target_table update_action key_columns error_on_missing_columns on_problems = In_Transaction.ensure_in_transaction <| effective_key_columns = if key_columns.is_nothing then [] else key_columns check_update_arguments source_table target_table effective_key_columns update_action error_on_missing_columns on_problems <| + helper = Append_Helper.Context source_table target_table effective_key_columns upload_status = case update_action of Update_Action.Insert -> - existing_rows_check = if effective_key_columns.is_empty then True else - joined = source_table.join target_table on=key_columns join_kind=Join_Kind.Inner - count = joined.row_count - if count == 0 then True else - Error.throw (Rows_Already_Present.Error count) - existing_rows_check.if_not_error <| - connection = target_table.connection - insert_statement = connection.dialect.generate_sql <| - Query.Insert_From_Select target_table.name source_table.column_names source_table.to_select_query - Panic.rethrow <| connection.execute_update insert_statement + helper.check_already_existing_rows <| + helper.insert_new_rows Update_Action.Update -> - perform_update source_table target_table key_columns Unmatched_Source_Rows_Behavior.Error Unmatched_Target_Rows_Behavior.Ignore + helper.check_rows_unmatched_in_target <| + helper.check_multiple_target_rows_matched <| + helper.update_common_rows Update_Action.Update_Or_Insert -> - perform_update source_table target_table key_columns Unmatched_Source_Rows_Behavior.Insert Unmatched_Target_Rows_Behavior.Ignore + helper.check_multiple_target_rows_matched <| + helper.update_common_rows + helper.insert_new_rows Update_Action.Align_Records -> - perform_update source_table target_table key_columns Unmatched_Source_Rows_Behavior.Insert Unmatched_Target_Rows_Behavior.Delete + helper.check_multiple_target_rows_matched <| + helper.update_common_rows + helper.insert_new_rows + helper.delete_unmatched_target_rows upload_status.if_not_error target_table ## PRIVATE -type Unmatched_Source_Rows_Behavior +type Append_Helper ## PRIVATE - Insert + Context source_table target_table key_columns ## PRIVATE - Error + connection = self.target_table.connection -## PRIVATE -type Unmatched_Target_Rows_Behavior ## PRIVATE - Ignore + The update only affects matched rows, unmatched rows are ignored. + update_common_rows self = + update_statement = self.connection.dialect.generate_sql <| + Query.Update_From_Table self.target_table.name self.source_table.name self.source_table.column_names self.key_columns + Panic.rethrow <| self.connection.execute_update update_statement ## PRIVATE - Delete - -## PRIVATE - Common logic for `Update`, `Update_Or_Insert` and `Align_Records` actions. -perform_update source_table target_table key_columns unmatched_source_rows_behavior unmatched_target_rows_behavior = Panic.recover [Unmatched_Rows, Multiple_Target_Rows_Matched_For_Update] <| - unmatched_source_rows = source_table.join target_table on=key_columns join_kind=Join_Kind.Left_Exclusive - - # First, check if there are unmatched source rows and report an error if needed. - case unmatched_source_rows_behavior of - Unmatched_Source_Rows_Behavior.Error -> - count = unmatched_source_rows.row_count - if count != 0 then Panic.throw (Unmatched_Rows.Error count) else Nothing - - # The insert will happen _after_ updates. - Unmatched_Source_Rows_Behavior.Insert -> Nothing - - # Check if there are rows in source that match multiple rows in the target. - check_multiple_target_rows_match source_table target_table key_columns - - connection = target_table.connection + Inserts rows from source that were not already present in the target table. + Does not affect matching rows. + insert_new_rows self = + insert_statement = self.connection.dialect.generate_sql <| + Query.Insert_From_Select self.target_table.name self.source_table.column_names self.source_table.to_select_query + Panic.rethrow <| self.connection.execute_update insert_statement - # The update will only affect matched rows and ignore unmatched ones. - update_statement = connection.dialect.generate_sql <| - Query.Update_From_Table target_table.name source_table.name source_table.column_names key_columns - Panic.rethrow <| connection.execute_update update_statement - - case unmatched_target_rows_behavior of - Unmatched_Target_Rows_Behavior.Ignore -> Nothing - Unmatched_Target_Rows_Behavior.Delete -> - Error.throw (Illegal_State.Error "Not implemented yet.") + ## PRIVATE + Deletes rows from target table that were not present in the source. + delete_unmatched_target_rows self = + Error.throw (Illegal_State.Error "Not implemented yet.") - case unmatched_source_rows_behavior of - # This check was handled before. - Unmatched_Source_Rows_Behavior.Error -> Nothing + ## PRIVATE + Checks if any rows from the source table already exist in the target, and + if they do - raises an error. + + Does nothing if `key_columns` is empty, as then there is no notion of + 'matching' rows. + check_already_existing_rows self ~continuation = + if self.key_columns.is_empty then True else + joined = self.source_table.join self.target_table on=self.key_columns join_kind=Join_Kind.Inner + count = joined.row_count + if count == 0 then continuation else + Error.throw (Rows_Already_Present.Error count) - # Having updated the matched rows, I now insert the unmatched ones. - Unmatched_Source_Rows_Behavior.Insert -> - # assert source_table.column_names == unmatched_rows.column_names - insert_statement = connection.dialect.generate_sql <| - Query.Insert_From_Select target_table.name unmatched_source_rows.column_names unmatched_source_rows.to_select_query - Panic.rethrow <| connection.execute_update insert_statement + ## PRIVATE + check_rows_unmatched_in_target self ~continuation = + # assert key_columns.not_empty + unmatched_rows = self.source_table.join self.target_table on=self.key_columns join_kind=Join_Kind.Left_Exclusive + count = unmatched_rows.row_count + if count != 0 then Error.throw (Unmatched_Rows.Error count) else continuation -## PRIVATE -check_multiple_target_rows_match source_table target_table key_columns = - matched_rows = source_table.join target_table on=key_columns join_kind=Join_Kind.Inner - agg = (key_columns.map (Aggregate_Column.Group_By _)) + [Aggregate_Column.Count] - ## This aggregation will only find duplicated in target, not in the source, - because the source is already guaranteed to be unique - that was checked - when uploading the temporary table with the key as its primary key. - duplicated_key_in_target = matched_rows.aggregate agg . filter -1 (Filter_Condition.Greater than=1) - example = duplicated_key_in_target.read max_rows=1 - case example.row_count == 0 of - False -> - row = duplicated_key_in_target.first_row . to_vector - offending_key = row.drop (Index_Sub_Range.Last 1) - count = row.last - Panic.throw (Multiple_Target_Rows_Matched_For_Update.Error offending_key count) - True -> Nothing + ## PRIVATE + Check if there are rows in source that match multiple rows in the target. + check_multiple_target_rows_match self ~continuation = + matched_rows = self.source_table.join self.target_table on=self.key_columns join_kind=Join_Kind.Inner + agg = (self.key_columns.map (Aggregate_Column.Group_By _)) + [Aggregate_Column.Count] + ## This aggregation will only find duplicated in target, not in the source, + because the source is already guaranteed to be unique - that was checked + when uploading the temporary table with the key as its primary key. + duplicated_key_in_target = matched_rows.aggregate agg . filter -1 (Filter_Condition.Greater than=1) + example = duplicated_key_in_target.read max_rows=1 + case example.row_count == 0 of + False -> + row = duplicated_key_in_target.first_row . to_vector + offending_key = row.drop (Index_Sub_Range.Last 1) + count = row.last + Panic.throw (Multiple_Target_Rows_Matched_For_Update.Error offending_key count) + True -> continuation ## PRIVATE This helper ensures that all arguments are valid. From 0e6cb1e41234a0b29277222fe0a6549f8f69d29f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Wa=C5=9Bko?= Date: Wed, 14 Jun 2023 19:49:40 +0200 Subject: [PATCH 08/11] fixes --- .../0.0.0-dev/src/Internal/Upload_Table.enso | 39 +++++++++++-------- .../Table_Tests/src/Database/Upload_Spec.enso | 17 +++++++- 2 files changed, 39 insertions(+), 17 deletions(-) diff --git a/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso b/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso index 15198c66c109..d1f41e4cb1f2 100644 --- a/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso +++ b/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso @@ -1,7 +1,6 @@ from Standard.Base import all from Standard.Base.Random import random_uuid import Standard.Base.Errors.Illegal_Argument.Illegal_Argument -import Standard.Base.Errors.Illegal_State.Illegal_State import Standard.Table.Data.Table.Table as In_Memory_Table from Standard.Table import Aggregate_Column, Join_Kind, Value_Type, Column_Selector @@ -244,19 +243,19 @@ append_to_existing_table source_table target_table update_action key_columns err upload_status = case update_action of Update_Action.Insert -> helper.check_already_existing_rows <| - helper.insert_new_rows + helper.insert_rows source_table Update_Action.Update -> helper.check_rows_unmatched_in_target <| - helper.check_multiple_target_rows_matched <| + helper.check_multiple_target_rows_match <| helper.update_common_rows Update_Action.Update_Or_Insert -> - helper.check_multiple_target_rows_matched <| + helper.check_multiple_target_rows_match <| helper.update_common_rows - helper.insert_new_rows + helper.insert_rows helper.new_source_rows Update_Action.Align_Records -> - helper.check_multiple_target_rows_matched <| + helper.check_multiple_target_rows_match <| helper.update_common_rows - helper.insert_new_rows + helper.insert_rows helper.new_source_rows helper.delete_unmatched_target_rows upload_status.if_not_error target_table @@ -266,7 +265,7 @@ type Append_Helper Context source_table target_table key_columns ## PRIVATE - connection = self.target_table.connection + connection self = self.target_table.connection ## PRIVATE The update only affects matched rows, unmatched rows are ignored. @@ -276,17 +275,25 @@ type Append_Helper Panic.rethrow <| self.connection.execute_update update_statement ## PRIVATE - Inserts rows from source that were not already present in the target table. - Does not affect matching rows. - insert_new_rows self = + Inserts all rows from the source. + + Behaviour is ill-defined if any of the rows already exist in the target. + If only new rows are supposed to be inserted, they have to be filtered + before inserting. + insert_rows self table_to_insert = insert_statement = self.connection.dialect.generate_sql <| - Query.Insert_From_Select self.target_table.name self.source_table.column_names self.source_table.to_select_query + Query.Insert_From_Select self.target_table.name table_to_insert.column_names table_to_insert.to_select_query Panic.rethrow <| self.connection.execute_update insert_statement + ## PRIVATE + Finds rows that are present in the source but not in the target. + new_source_rows self = + self.source_table.join self.target_table on=self.key_columns join_kind=Join_Kind.Left_Exclusive + ## PRIVATE Deletes rows from target table that were not present in the source. delete_unmatched_target_rows self = - Error.throw (Illegal_State.Error "Not implemented yet.") + Nothing ## PRIVATE Checks if any rows from the source table already exist in the target, and @@ -295,7 +302,7 @@ type Append_Helper Does nothing if `key_columns` is empty, as then there is no notion of 'matching' rows. check_already_existing_rows self ~continuation = - if self.key_columns.is_empty then True else + if self.key_columns.is_empty then continuation else joined = self.source_table.join self.target_table on=self.key_columns join_kind=Join_Kind.Inner count = joined.row_count if count == 0 then continuation else @@ -304,7 +311,7 @@ type Append_Helper ## PRIVATE check_rows_unmatched_in_target self ~continuation = # assert key_columns.not_empty - unmatched_rows = self.source_table.join self.target_table on=self.key_columns join_kind=Join_Kind.Left_Exclusive + unmatched_rows = self.new_source_rows count = unmatched_rows.row_count if count != 0 then Error.throw (Unmatched_Rows.Error count) else continuation @@ -323,7 +330,7 @@ type Append_Helper row = duplicated_key_in_target.first_row . to_vector offending_key = row.drop (Index_Sub_Range.Last 1) count = row.last - Panic.throw (Multiple_Target_Rows_Matched_For_Update.Error offending_key count) + Error.throw (Multiple_Target_Rows_Matched_For_Update.Error offending_key count) True -> continuation ## PRIVATE diff --git a/test/Table_Tests/src/Database/Upload_Spec.enso b/test/Table_Tests/src/Database/Upload_Spec.enso index d4abae10f1f2..bae4c395e4d8 100644 --- a/test/Table_Tests/src/Database/Upload_Spec.enso +++ b/test/Table_Tests/src/Database/Upload_Spec.enso @@ -588,7 +588,7 @@ spec make_new_connection prefix persistent_connector=True = r1.catch.to_display_text . should_contain "columns were not present" r1.catch.to_display_text . should_contain "Y" - Test.specify "by default should keep columns missing from source as-is and use defaults when inserting" <| + Test.specify "should use defaults when inserting" <| dest_name = Name_Generator.random_name "table-defaults" connection.create_table dest_name [Column_Description.Value "Y" Value_Type.Integer [Column_Constraint.Default_Expression "42"], Column_Description.Value "X" Value_Type.Integer] temporary=True primary_key=[] . should_succeed src = source_table_builder [["X", [1, 2, 3]]] @@ -600,6 +600,21 @@ spec make_new_connection prefix persistent_connector=True = rows1.should_contain_the_same_elements_as expected_rows connection.drop_table dest_name + Test.specify "should use defaults when inserting new values in upsert, but retain existing values" <| + dest_name = Name_Generator.random_name "table-defaults" + connection.create_table dest_name [Column_Description.Value "Y" Value_Type.Integer [Column_Constraint.Default_Expression "42"], Column_Description.Value "X" Value_Type.Integer, Column_Description.Value "Z" Value_Type.Integer] temporary=True primary_key=[] . should_succeed + Problems.assume_no_problems <| + (Table.from_rows ["X", "Y", "Z"] [[1, 1000, 10]]).update_database_table connection dest_name key_columns=[] update_action=Update_Action.Insert + + src = source_table_builder [["X", [1, 2, 3]], ["Z", [100, 200, 300]]] + r1 = src.update_database_table connection dest_name key_columns=["X"] update_action=Update_Action.Update_Or_Insert + Problems.assume_no_problems r1 + r1.column_names . should_equal ["Y", "X", "Z"] + expected_rows = [[1000, 1, 100], [42, 2, 200], [42, 3, 300]] + rows1 = r1.rows.to_vector.map .to_vector + rows1.should_contain_the_same_elements_as expected_rows + connection.drop_table dest_name + Test.specify "should use defaults for missing input columns for newly inserted rows when Aligning the tables, but keep existing values for existing rows" <| dest_name = Name_Generator.random_name "table-defaults-align" connection.create_table dest_name [Column_Description.Value "X" Value_Type.Integer, Column_Description.Value "Y" Value_Type.Integer [Column_Constraint.Default_Expression "42"], Column_Description.Value "Z" Value_Type.Integer] temporary=True . should_succeed From 4d036873a9c9b07dff65cca99aa32d35c2a7aadf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Wa=C5=9Bko?= Date: Wed, 14 Jun 2023 20:25:43 +0200 Subject: [PATCH 09/11] finish Align --- .../Database/0.0.0-dev/src/Data/SQL.enso | 2 +- .../0.0.0-dev/src/Internal/Base_Generator.enso | 7 +++++++ .../0.0.0-dev/src/Internal/IR/Query.enso | 16 ++++++++++++++++ .../0.0.0-dev/src/Internal/Upload_Table.enso | 4 +++- 4 files changed, 27 insertions(+), 2 deletions(-) diff --git a/distribution/lib/Standard/Database/0.0.0-dev/src/Data/SQL.enso b/distribution/lib/Standard/Database/0.0.0-dev/src/Data/SQL.enso index c5429fc6d7c0..f49d8fd89984 100644 --- a/distribution/lib/Standard/Database/0.0.0-dev/src/Data/SQL.enso +++ b/distribution/lib/Standard/Database/0.0.0-dev/src/Data/SQL.enso @@ -72,7 +72,7 @@ type Builder join separator statements = sep = case separator of Builder.Value _ -> separator - _ -> Builder.code separator + _ : Text -> Builder.code separator if statements.length == 0 then Builder.empty else (1.up_to statements.length . fold (statements.at 0) acc-> i-> acc ++ sep ++ statements.at i) diff --git a/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Base_Generator.enso b/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Base_Generator.enso index 2217b497d9b8..250625b8856c 100644 --- a/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Base_Generator.enso +++ b/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Base_Generator.enso @@ -456,6 +456,13 @@ generate_query dialect query = case query of id = dialect.wrap_identifier column_name target ++ "." ++ id ++ "=" ++ source ++ "." ++ id Builder.code "UPDATE " ++ target ++ " SET " ++ set_part ++ " FROM " ++ source ++ " WHERE " ++ key_part + Query.Delete_Unmatched_Rows target_table_name source_table_name key_columns -> + target = dialect.wrap_identifier target_table_name + source = dialect.wrap_identifier source_table_name + if key_columns.is_empty then + Panic.throw (Illegal_State.Error "Invalid IR: `Query.Delete_Unmatched_Rows` must have at least one key column. This is a bug in the Database library.") + key_part = Builder.join ", " key_columns + Builder.code "DELETE FROM " ++ target ++ " WHERE (" ++ key_part ++ ") NOT IN (SELECT " ++ key_part ++ " FROM " ++ source ++ ")" _ -> Error.throw <| Unsupported_Database_Operation.Error "Unsupported query type: "+query.to_text ## PRIVATE diff --git a/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/IR/Query.enso b/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/IR/Query.enso index de8ffe67e4c3..75ebdf9cd22e 100644 --- a/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/IR/Query.enso +++ b/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/IR/Query.enso @@ -64,4 +64,20 @@ type Query Insert_From_Select (table_name:Text) (column_names : Vector Text) (select:Query) ## PRIVATE + An SQL UPDATE query that updates rows in the target table with values + from the source table when the key columns match. Target table rows that + do not match the source are left unaffected. + + This will usually be a query of the form + `UPDATE target SET c1=source.c1, ... FROM source WHERE target.key1=source.key1 AND ...` + where `c1` comes from `column_names` and `key1` comes from `key_columns`. Update_From_Table (target_table_name:Text) (source_table_name:Text) (column_names : Vector Text) (key_columns : Vector Text) + + ## PRIVATE + An SQL DELETE query that deletes from target table rows that are _not_ + present in the source table, based on the specified key columns. + They key columns must be present under the same name in both tables. + + This will usually be a query of the form + `DELETE FROM target WHERE (key_columns...) NOT IN (SELECT key_columns... FROM source)`. + Delete_Unmatched_Rows (target_table_name:Text) (source_table_name:Text) (key_columns : Vector Text) diff --git a/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso b/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso index d1f41e4cb1f2..a13da970ad16 100644 --- a/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso +++ b/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Upload_Table.enso @@ -293,7 +293,9 @@ type Append_Helper ## PRIVATE Deletes rows from target table that were not present in the source. delete_unmatched_target_rows self = - Nothing + delete_statement = self.connection.dialect.generate_sql <| + Query.Delete_Unmatched_Rows self.target_table.name self.source_table.name self.key_columns + Panic.rethrow <| self.connection.execute_update delete_statement ## PRIVATE Checks if any rows from the source table already exist in the target, and From 2f88a8d923253344ab84b01e92eb415adbfa2cf6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Wa=C5=9Bko?= Date: Wed, 14 Jun 2023 20:35:01 +0200 Subject: [PATCH 10/11] fixes --- .../Database/0.0.0-dev/src/Internal/Base_Generator.enso | 2 +- test/Table_Tests/src/Database/Upload_Spec.enso | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Base_Generator.enso b/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Base_Generator.enso index 250625b8856c..bb09881e6d74 100644 --- a/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Base_Generator.enso +++ b/distribution/lib/Standard/Database/0.0.0-dev/src/Internal/Base_Generator.enso @@ -461,7 +461,7 @@ generate_query dialect query = case query of source = dialect.wrap_identifier source_table_name if key_columns.is_empty then Panic.throw (Illegal_State.Error "Invalid IR: `Query.Delete_Unmatched_Rows` must have at least one key column. This is a bug in the Database library.") - key_part = Builder.join ", " key_columns + key_part = Builder.join ", " (key_columns.map dialect.wrap_identifier) Builder.code "DELETE FROM " ++ target ++ " WHERE (" ++ key_part ++ ") NOT IN (SELECT " ++ key_part ++ " FROM " ++ source ++ ")" _ -> Error.throw <| Unsupported_Database_Operation.Error "Unsupported query type: "+query.to_text diff --git a/test/Table_Tests/src/Database/Upload_Spec.enso b/test/Table_Tests/src/Database/Upload_Spec.enso index bae4c395e4d8..c651bbbb5a17 100644 --- a/test/Table_Tests/src/Database/Upload_Spec.enso +++ b/test/Table_Tests/src/Database/Upload_Spec.enso @@ -517,8 +517,13 @@ spec make_new_connection prefix persistent_connector=True = src = source_table_builder [["X", [1, 2, 3]]] result = src.update_database_table connection dest.name update_action=Update_Action.Insert key_columns=[] + expected = [1, 10, 100, 1, 2, 3] result.column_names . should_equal ["X"] - result.at "X" . to_vector . should_contain_the_same_elements_as [1, 10, 100, 1, 2, 3] + result.at "X" . to_vector . should_contain_the_same_elements_as expected + + r2 = src.update_database_table connection dest.name update_action=Update_Action.Insert key_columns=Nothing + r2.column_names . should_equal ["X"] + r2.at "X" . to_vector . should_contain_the_same_elements_as expected Test.specify "should fail if no key is specified in other modes" <| dest = target_table_builder [["X", [1, 10, 100]]] @@ -529,7 +534,7 @@ spec make_new_connection prefix persistent_connector=True = if sqlite_temporary_primary_key_pending.is_nothing then # The default will also fail because no primary key is detected in the DB. - default_key_columns connection dest.name . should_equal [] + default_key_columns connection dest.name . should_equal Nothing r2 = src.update_database_table connection dest.name update_action=Update_Action.Update r2.should_fail_with Illegal_Argument From e130f028092c8c735eb036ea6f7afd85745149cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Wa=C5=9Bko?= Date: Wed, 14 Jun 2023 20:39:03 +0200 Subject: [PATCH 11/11] changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d0ff80e2f9a..eca4da324ead 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -485,6 +485,7 @@ - [Speed improvements to `Column` `.truncate`, `.ceil`, and `.floor`.][6941] - [Implemented addition and subtraction for `Date_Period` and `Time_Period`.][6956] +- [Implemented `Table.update_database_table`.][7035] [debug-shortcuts]: https://github.com/enso-org/enso/blob/develop/app/gui/docs/product/shortcuts.md#debug @@ -704,6 +705,7 @@ [6925]: https://github.com/enso-org/enso/pull/6925 [6941]: https://github.com/enso-org/enso/pull/6941 [6956]: https://github.com/enso-org/enso/pull/6956 +[7035]: https://github.com/enso-org/enso/pull/7035 #### Enso Compiler