When_matched_update causes records to be lost with explicit predicate #2158

Closed
sebdiem opened this issue Feb 1, 2024 · 10 comments · Fixed by #2326
Labels
binding/python Issues for the Python package bug Something isn't working

Comments

@sebdiem
Contributor

sebdiem commented Feb 1, 2024

Environment

Delta-rs version: 0.15.0

Binding: python

Environment:

  • Cloud provider:
  • OS: OSX, linux
  • Other:

Bug

What happened:

I have a table with several columns, among them instance_id and cost.
I want to update rows for which cost is null based on data contained in a pandas dataframe.
For this purpose I use a merge operation like this:

    delta_table.merge(
        pa.Table.from_pandas(costs),
        predicate="t.instance_id = s.instance_id and t.cost is null",
        source_alias="s",
        target_alias="t",
    ).when_matched_update({"cost": "s.cost"}).execute()

In 0.14.0 it worked perfectly fine: rows for which cost was already set stayed untouched, while the others were updated.
In 0.15.0, however, the behavior is different: the rows with cost set are deleted, and the others are updated.

What you expected to happen:
Same behavior as in 0.14.0
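To make the expected semantics concrete, here is a toy, in-memory model of a MERGE that has only a WHEN MATCHED UPDATE clause. It uses plain Python lists of dicts, and merge_update is a hypothetical helper written for illustration, not the deltalake API. The point it shows: a target row that fails the merge predicate is simply "not matched", and with no delete or insert clause it must be copied through unchanged, never dropped.

```python
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


def merge_update(
    target: List[Row],
    source: List[Row],
    predicate: Callable[[Row, Row], bool],
    updates: Dict[str, Callable[[Row, Row], Any]],
) -> List[Row]:
    """Toy MERGE with a single WHEN MATCHED UPDATE clause (hypothetical helper).

    Matched target rows are updated; unmatched target rows are kept as-is.
    No target row is ever removed.
    """
    result: List[Row] = []
    for t in target:
        matches = [s for s in source if predicate(t, s)]
        if len(matches) > 1:
            # Delta MERGE refuses to update one target row from several source rows.
            raise ValueError("a target row matched more than one source row")
        if matches:
            s = matches[0]
            new_row = dict(t)
            for column, expr in updates.items():
                new_row[column] = expr(t, s)
            result.append(new_row)
        else:
            # Not matched: with no other clauses, the row is copied unchanged.
            result.append(dict(t))
    return result


target = [
    {"instance_id": "1", "month": "2024-02", "cost": 32.0},
    {"instance_id": "2", "month": "2024-02", "cost": None},
]
source = [
    {"instance_id": "1", "month": "2024-02", "cost": 38.0},
    {"instance_id": "2", "month": "2024-02", "cost": 38.0},
]

merged = merge_update(
    target,
    source,
    # Models: t.instance_id = s.instance_id and t.cost is null
    predicate=lambda t, s: t["instance_id"] == s["instance_id"] and t["cost"] is None,
    # Models: {"cost": "s.cost"}
    updates={"cost": lambda t, s: s["cost"]},
)
print(merged)  # instance 1 keeps 32.0, instance 2 becomes 38.0
```

This matches the 0.14.0 output shown in the reproduction below (row order aside).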

How to reproduce it:
Here is a minimal Python script with the following dependencies:

pandas==1.4.4
pyarrow==15.0.0
deltalake==0.15.0

    import os
    import shutil

    import pandas as pd
    import pyarrow as pa
    from deltalake import DeltaTable, write_deltalake
    from deltalake.schema import (
        Field,
        PrimitiveType,
        Schema,
    )

    # first create a table with the given schema
    path = "/tmp/delta-rs-bug-3"
    # ignore_errors=True so the script also runs when the path does not exist yet
    shutil.rmtree(path, ignore_errors=True)
    os.makedirs(path)
    schema = Schema(
        [
            Field("instance_id", PrimitiveType("string"), nullable=False),
            Field("month", PrimitiveType("string"), nullable=False),
            Field("cost", PrimitiveType("float"), nullable=True),
        ]
    )
    write_deltalake(
        path,
        data=[],
        schema=schema,
        name="test_table",
        partition_by=["month"],
        overwrite_schema=False,
    )
    table = DeltaTable(path)

    # then fill this table with some entries
    initial_df = pd.DataFrame(
        [
            {"instance_id": "1", "month": "2024-02", "cost": 32.0},
            {"instance_id": "2", "month": "2024-02", "cost": None},
        ]
    )
    table.merge(
        pa.Table.from_pandas(initial_df),
        predicate="t.instance_id = s.instance_id",
        source_alias="s",
        target_alias="t",
    ).when_not_matched_insert_all().execute()

    # now try to merge new data into this table: boom
    new_df = pd.DataFrame(
        [
            {"instance_id": "1", "month": "2024-02", "cost": 38.0},
            {"instance_id": "2", "month": "2024-02", "cost": 38.0},
        ]
    )
    table.merge(
        pa.Table.from_pandas(new_df),
        predicate="t.instance_id = s.instance_id and t.cost is null",
        source_alias="s",
        target_alias="t",
    ).when_matched_update({"cost": "s.cost"}).execute()
    print(table.to_pandas())
    # delta lake 0.14.0
    # >>> print(table.to_pandas())
    #   instance_id    month  cost
    # 0           2  2024-02  38.0
    # 1           1  2024-02  32.0
    # ===================================
    # delta lake 0.15.0
    # >>> print(table.to_pandas())
    #   instance_id    month  cost
    # 0           2  2024-02  38.0

More details:

@sebdiem sebdiem added the bug Something isn't working label Feb 1, 2024
@Blajda
Collaborator

Blajda commented Feb 2, 2024

Hi @sebdiem,
This regression was fixed in this PR: #2149

@Blajda Blajda closed this as completed Feb 2, 2024
@sebdiem
Contributor Author

sebdiem commented Feb 2, 2024

Thanks a lot!

@sebdiem
Contributor Author

sebdiem commented Feb 6, 2024

Hmm, actually I've just tested the newly released 0.15.2, and IMO the bug described here is still present.

@ion-elgreco
Collaborator

ion-elgreco commented Feb 6, 2024

Yup, can confirm.

Edit:
Actually, I am not sure the current behaviour is incorrect: the merge predicate only matches one row from the target, and you passed only when_matched_update().

But passing the t.cost is null predicate to when_matched_update instead returns both rows, with the null cost now updated:

    table.merge(
        pa.Table.from_pandas(new_df),
        predicate="t.instance_id = s.instance_id",
        source_alias="s",
        target_alias="t",
    ).when_matched_update({"cost": "s.cost"}, predicate="t.cost is null").execute()
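As a sanity check on the two formulations, here is a toy model in plain Python (merge_update is a hypothetical helper, not the deltalake API). It assumes the merge has only a WHEN MATCHED UPDATE clause. Under that assumption, putting t.cost is null in the merge predicate or in the clause predicate should give the same table: a target row that fails the filter is either unmatched or matched-but-skipped, and in both cases it is kept unchanged.

```python
from typing import Any, Callable, Dict, List, Optional

Row = Dict[str, Any]
Pred = Callable[[Row, Row], bool]


def merge_update(
    target: List[Row],
    source: List[Row],
    on: Pred,
    when: Optional[Pred] = None,
) -> List[Row]:
    """Toy MERGE ... WHEN MATCHED [AND when] THEN UPDATE SET cost = s.cost.

    Hypothetical helper for illustration only. Rows that do not match `on`,
    or match but fail `when`, are kept unchanged. For brevity this toy takes
    the first matching source row and does not check for duplicates.
    """
    out: List[Row] = []
    for t in target:
        s = next((s for s in source if on(t, s)), None)
        if s is not None and (when is None or when(t, s)):
            out.append({**t, "cost": s["cost"]})
        else:
            out.append(dict(t))
    return out


def same_id(t: Row, s: Row) -> bool:
    return t["instance_id"] == s["instance_id"]


def cost_is_null(t: Row, s: Row) -> bool:
    return t["cost"] is None


target = [{"instance_id": "1", "cost": 32.0}, {"instance_id": "2", "cost": None}]
source = [{"instance_id": "1", "cost": 38.0}, {"instance_id": "2", "cost": 38.0}]

# Filter inside the merge predicate (the original report).
in_join = merge_update(target, source, on=lambda t, s: same_id(t, s) and cost_is_null(t, s))
# Filter inside the when_matched_update predicate (the workaround).
in_clause = merge_update(target, source, on=same_id, when=cost_is_null)

assert in_join == in_clause
print(in_clause)
```

If a when_not_matched_insert clause were also present, the two forms would no longer be equivalent, since moving the filter changes which source rows count as "not matched".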

@ion-elgreco ion-elgreco reopened this Feb 6, 2024
@ion-elgreco
Collaborator

@sebdiem, do you have some time to cross-check the behavior with Spark Delta?

@sebdiem
Contributor Author

sebdiem commented Feb 6, 2024

> @sebdiem do you have some time to cross check the behavior with spark delta?

Yes, I will try to do that tomorrow.

@rtyler rtyler added the binding/python Issues for the Python package label Feb 6, 2024
@Blajda
Collaborator

Blajda commented Feb 6, 2024

The correct behavior is the one from 0.14. Staring at this, I think the root cause is that predicate pushdown was enabled, which pushes a t.cost is null filter into the scan of the target table. This will be a simple fix.
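A toy model of why a pushed-down t.cost is null filter would lose rows. It rests on an assumption inferred from the comment above, not taken from the delta-rs source: that merge replaces each touched target file with the rows its target scan returned, updated where matched. rewrite_file and its hard-coded merge predicate are hypothetical, for illustration only.

```python
from typing import Any, Callable, Dict, List, Optional

Row = Dict[str, Any]


def rewrite_file(
    file_rows: List[Row],
    source: List[Row],
    scan_filter: Optional[Callable[[Row], bool]] = None,
) -> List[Row]:
    """Toy rewrite of one target file during MERGE (hypothetical model).

    The merge predicate is hard-coded to
    `target.id = source.id and target.cost is null`.
    The old file is replaced by the returned rows. If `scan_filter` is
    pushed into the target scan, rows it rejects are never read, so they
    are never written back and silently disappear.
    """
    scanned = [r for r in file_rows if scan_filter is None or scan_filter(r)]
    out: List[Row] = []
    for t in scanned:
        s = next(
            (s for s in source if s["id"] == t["id"] and t["cost"] is None), None
        )
        out.append({**t, "cost": s["cost"]} if s is not None else dict(t))
    return out


target_file = [{"id": "A", "cost": 10.15}, {"id": "B", "cost": None}]
source = [{"id": "A", "cost": 12.15}, {"id": "B", "cost": 11.15}]

# No pushdown: every row of the file is read, A is copied, B is updated.
correct = rewrite_file(target_file, source)
# Pushdown of `cost is null` into the scan: A is never read, so it is lost.
pushed_down = rewrite_file(target_file, source, scan_filter=lambda r: r["cost"] is None)

print(correct)      # A keeps 10.15, B becomes 11.15
print(pushed_down)  # only B survives: the 0.15.0 symptom
```

The design point the model captures: a filter that only decides which rows are updated can be pushed down safely only if the file being rewritten still gets all of its other rows copied through.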

@Blajda Blajda assigned Blajda and unassigned Blajda Feb 6, 2024
@Blajda
Collaborator

Blajda commented Feb 7, 2024

Oddly, I can't reproduce the issue when using Rust:

    #[tokio::test]
    async fn test_merge_pushdowns() {
        // See #2158
        let schema = vec![
            StructField::new(
                "id".to_string(),
                DataType::Primitive(PrimitiveType::String),
                true,
            ),
            StructField::new(
                "cost".to_string(),
                DataType::Primitive(PrimitiveType::Float),
                true,
            ),
            StructField::new(
                "month".to_string(),
                DataType::Primitive(PrimitiveType::String),
                true,
            ),
        ];

        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", ArrowDataType::Utf8, true),
            Field::new("cost", ArrowDataType::Float32, true),
            Field::new("month", ArrowDataType::Utf8, true),
        ]));

        let table = DeltaOps::new_in_memory()
            .create()
            .with_columns(schema)
            .await
            .unwrap();

        let ctx = SessionContext::new();

        // Target data: A already has a cost, B's cost is null.
        let batch = RecordBatch::try_new(
            Arc::clone(&arrow_schema),
            vec![
                Arc::new(arrow::array::StringArray::from(vec!["A", "B"])),
                Arc::new(arrow::array::Float32Array::from(vec![Some(10.15), None])),
                Arc::new(arrow::array::StringArray::from(vec![
                    "2023-07-04",
                    "2023-07-04",
                ])),
            ],
        )
        .unwrap();

        let table = DeltaOps(table)
            .write(vec![batch])
            .with_save_mode(SaveMode::Append)
            .await
            .unwrap();
        assert_eq!(table.version(), 1);
        assert_eq!(table.get_files_count(), 1);

        // Source data: new costs for both A and B.
        let batch = RecordBatch::try_new(
            Arc::clone(&arrow_schema),
            vec![
                Arc::new(arrow::array::StringArray::from(vec!["A", "B"])),
                Arc::new(arrow::array::Float32Array::from(vec![
                    Some(12.15),
                    Some(11.15),
                ])),
                Arc::new(arrow::array::StringArray::from(vec![
                    "2023-07-04",
                    "2023-07-04",
                ])),
            ],
        )
        .unwrap();
        let source = ctx.read_batch(batch).unwrap();

        // Only B satisfies `target.cost is null`, so only B should be updated;
        // A is not matched and must be kept unchanged.
        let (table, _metrics) = DeltaOps(table)
            .merge(source, "target.id = source.id and target.cost is null")
            .with_source_alias("source")
            .with_target_alias("target")
            .when_matched_update(|update| {
                update
                    .update("id", "target.id")
                    .update("cost", "source.cost")
                    .update("month", "target.month")
            })
            .unwrap()
            .await
            .unwrap();

        let expected = vec![
            "+----+-------+------------+",
            "| id | cost  | month      |",
            "+----+-------+------------+",
            "| A  | 10.15 | 2023-07-04 |",
            "| B  | 11.15 | 2023-07-04 |",
            "+----+-------+------------+",
        ];
        let actual = get_data(&table).await;
        assert_batches_sorted_eq!(&expected, &actual);
    }

@ion-elgreco
Collaborator

ion-elgreco commented Feb 24, 2024

@Blajda Weirdly enough, using this predicate works and gives the old, correct behavior: "t.instance_id = s.instance_id, t.cost is null". Not sure if that helps narrow it down.

@ion-elgreco
Collaborator

@Blajda I ran your Rust test, but it doesn't pass on my side:

    running 1 test
    thread 'operations::merge::tests::test_merge_pushdowns' panicked at crates/core/src/operations/merge/mod.rs:2966:9:
    assertion `left == right` failed:

    expected:

    [
        "+----+-------+------------+",
        "| id | cost  | month      |",
        "+----+-------+------------+",
        "| A  | 10.15 | 2023-07-04 |",
        "| B  | 11.15 | 2023-07-04 |",
        "+----+-------+------------+",
    ]
    actual:

    [
        "+----+-------+------------+",
        "| id | cost  | month      |",
        "+----+-------+------------+",
        "| B  | 11.15 | 2023-07-04 |",
        "+----+-------+------------+",
    ]

@ion-elgreco ion-elgreco changed the title Possible regression in merge .when_matched_update after 0.15.0 When_matched_update causes records to be lost with explicit predicate Mar 23, 2024
Blajda added a commit to Blajda/delta-rs that referenced this issue Mar 23, 2024
Blajda added a commit that referenced this issue Mar 23, 2024
Reverts #2291.

#2291 reintroduces a regression to merge that occurred before (#2158)
and causes the added test case to fail.
ion-elgreco added a commit to ion-elgreco/delta-rs that referenced this issue Mar 23, 2024
Blajda added a commit that referenced this issue Mar 23, 2024
# Description
Fix broken test case with partitions

- fixes #2158

---------

Co-authored-by: ion-elgreco

4 participants