FTE query failure due to "Dynamic filters present in join were not fully consumed by it's probe side" #18383

Closed
raunaqmorarka opened this issue Jul 24, 2023 · 0 comments · Fixed by #18385
Labels
bug Something isn't working


@raunaqmorarka
Member

Repro steps

CREATE TABLE item (item_id varchar);

SET SESSION retry_policy='TASK';

EXPLAIN
WITH
t1 AS (
    SELECT
        NULL AS address_id
    FROM item i1
    INNER JOIN
        item i2
        ON i1.item_id = i2.item_id
),
t2 AS (
    SELECT item_id AS address_id
    FROM item
    UNION
    SELECT *
    FROM t1
)
SELECT
    *
    FROM t2
    INNER JOIN item ON item.item_id = t2.address_id;

Error stack trace

Current plan:
                Output[columnNames = [address_id, item_id]]
                │   Layout: [address_id:varchar, address_id:varchar]
                │   item_id := address_id
                └─ InnerJoin[criteria = ("address_id" = "item_id_9"), hash = [$hashvalue, $hashvalue_22], distribution = PARTITIONED]
                   │   Layout: [address_id:varchar]
                   │   Distribution: PARTITIONED
                   ├─ Aggregate[type = FINAL, keys = [address_id], hash = [$hashvalue]]
                   │  │   Layout: [address_id:varchar, $hashvalue:bigint]
                   │  └─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue], arguments = ["address_id"]]
                   │     │   Layout: [address_id:varchar, $hashvalue:bigint]
                   │     ├─ Project[]
                   │     │  │   Layout: [address_id:varchar, $hashvalue_13:bigint]
                   │     │  │   address_id := "item_id"
                   │     │  └─ RemoteExchange[partitionCount = 4, type = REPARTITION, hashColumn = [$hashvalue_13]]
                   │     │     │   Layout: [item_id:varchar, $hashvalue_13:bigint]
                   │     │     └─ Aggregate[type = PARTIAL, keys = [item_id], hash = [$hashvalue_14]]
                   │     │        │   Layout: [item_id:varchar, $hashvalue_14:bigint]
                   │     │        └─ ScanFilterProject[table = delta:default.item, dynamicFilters = {"item_id" = #df_693 await}]
                   │     │               Layout: [item_id:varchar, $hashvalue_14:bigint]
                   │     │               $hashvalue_14 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("item_id"), 0))
                   │     │               item_id := item_id:varchar:REGULAR
                   │     └─ Project[]
                   │        │   Layout: [address_id:varchar, $hashvalue_15:bigint]
                   │        │   address_id := "expr_8"
                   │        └─ RemoteExchange[partitionCount = 4, type = REPARTITION, hashColumn = [$hashvalue_15]]
                   │           │   Layout: [expr_8:varchar, $hashvalue_15:bigint]
                   │           └─ Aggregate[type = PARTIAL, keys = [expr_8], hash = [$hashvalue_21]]
                   │              │   Layout: [expr_8:varchar, $hashvalue_21:bigint]
                   │              └─ Project[]
                   │                 │   Layout: [expr_8:varchar, $hashvalue_21:bigint]
                   │                 │   $hashvalue_21 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr_8"), 0))
                   │                 └─ Project[]
                   │                    │   Layout: [expr_8:varchar]
                   │                    │   expr_8 := CAST(null AS varchar)
                   │                    └─ InnerJoin[criteria = ("item_id_0" = "item_id_4"), hash = [$hashvalue_16, $hashvalue_18], distribution = PARTITIONED]
                   │                       │   Layout: []
                   │                       │   Distribution: PARTITIONED
                   │                       │   maySkipOutputDuplicates = true
                   │                       ├─ RemoteExchange[partitionCount = 4, type = REPARTITION, hashColumn = [$hashvalue_16]]
                   │                       │  │   Layout: [item_id_0:varchar, $hashvalue_16:bigint]
                   │                       │  └─ Project[]
                   │                       │     │   Layout: [item_id_0:varchar, $hashvalue_17:bigint]
                   │                       │     │   $hashvalue_17 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("item_id_0"), 0))
                   │                       │     └─ Values[]
                   │                       │            Layout: [item_id_0:varchar]
                   │                       └─ LocalExchange[partitioning = SINGLE]
                   │                          │   Layout: [item_id_4:varchar, $hashvalue_18:bigint]
                   │                          └─ RemoteExchange[partitionCount = 4, type = REPARTITION, hashColumn = [$hashvalue_19]]
                   │                             │   Layout: [item_id_4:varchar, $hashvalue_19:bigint]
                   │                             └─ DynamicFilterSource[dynamicFilterAssignments = {item_id_4 -> #df_695}]
                   │                                │   Layout: [item_id_4:varchar, $hashvalue_20:bigint]
                   │                                └─ Project[]
                   │                                   │   Layout: [item_id_4:varchar, $hashvalue_20:bigint]
                   │                                   │   $hashvalue_20 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("item_id_4"), 0))
                   │                                   └─ Values[]
                   │                                          Layout: [item_id_4:varchar]
                   └─ LocalExchange[partitioning = SINGLE]
                      │   Layout: [item_id_9:varchar, $hashvalue_22:bigint]
                      └─ RemoteExchange[partitionCount = 4, type = REPARTITION, hashColumn = [$hashvalue_23]]
                         │   Layout: [item_id_9:varchar, $hashvalue_23:bigint]
                         └─ DynamicFilterSource[dynamicFilterAssignments = {item_id_9 -> #df_693}]
                            │   Layout: [item_id_9:varchar, $hashvalue_24:bigint]
                            └─ ScanProject[table = delta:default.item]
                                   Layout: [item_id_9:varchar, $hashvalue_24:bigint]
                                   $hashvalue_24 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("item_id_9"), 0))
                                   item_id_9 := item_id:varchar:REGULAR

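Using set session allow_pushdown_into_connectors=true; shows that the probe side of the i1/i2 join in t1 starts out as ScanFilterProject[table = delta:default.item, filterPredicate = CAST(null AS boolean), dynamicFilters = {"item_id_0" = #df_695}]. PushPredicateIntoTableScan later turns this scan into an empty ValuesNode, but that run happens after RemoveUnsupportedDynamicFilters has already run. As a result, the build side still has DynamicFilterSource[dynamicFilterAssignments = {item_id_4 -> #df_695}] even though nothing on the probe side consumes #df_695 any more.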
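The ordering hazard can be illustrated with a toy plan tree. This is a minimal Python sketch under stated assumptions, not Trino code: Node, push_predicate_into_table_scan, remove_unsupported_dynamic_filters, validate and plan_is_valid are hypothetical stand-ins named after the rules mentioned above, and the plan is reduced to the single t1 join (a probe scan with filterPredicate = CAST(null AS boolean) that consumes #df_695, and a build side that publishes it).

```python
from __future__ import annotations

from dataclasses import dataclass, replace
from typing import Callable, Sequence


@dataclass(frozen=True)
class Node:
    """Hypothetical toy plan node; not Trino's PlanNode API."""
    kind: str  # "Scan", "Values" or "Join" (children[0] = probe, children[1] = build)
    children: tuple[Node, ...] = ()
    consumed: frozenset[str] = frozenset()  # dynamic filters a scan waits on
    produced: frozenset[str] = frozenset()  # dynamic filters a join's build side publishes
    always_false: bool = False  # scan filter such as CAST(null AS boolean)


def consumed_ids(node: Node) -> frozenset[str]:
    """Collect every dynamic filter ID consumed anywhere in the subtree."""
    ids = set(node.consumed)
    for child in node.children:
        ids |= consumed_ids(child)
    return frozenset(ids)


def remove_unsupported_dynamic_filters(node: Node) -> Node:
    """Hypothetical cleanup: drop published filters that the probe side never consumes."""
    children = tuple(remove_unsupported_dynamic_filters(c) for c in node.children)
    if node.kind == "Join":
        return replace(node, children=children,
                       produced=node.produced & consumed_ids(children[0]))
    return replace(node, children=children)


def push_predicate_into_table_scan(node: Node) -> Node:
    """Hypothetical pushdown: an always-false scan becomes empty Values, losing its consumers."""
    if node.kind == "Scan" and node.always_false:
        return Node("Values")
    return replace(node, children=tuple(push_predicate_into_table_scan(c) for c in node.children))


def validate(node: Node) -> None:
    """Invariant check: every filter a join publishes must be consumed by its probe side."""
    if node.kind == "Join":
        missing = node.produced - consumed_ids(node.children[0])
        if missing:
            raise ValueError(f"dynamic filters {sorted(missing)} not consumed by probe side")
    for child in node.children:
        validate(child)


def build_plan() -> Node:
    """The t1 join: probe scan consumes df_695, build side (Values) publishes it."""
    probe = Node("Scan", consumed=frozenset({"df_695"}), always_false=True)
    build = Node("Values")
    return Node("Join", children=(probe, build), produced=frozenset({"df_695"}))


def plan_is_valid(rules: Sequence[Callable[[Node], Node]]) -> bool:
    """Apply the rules in the given order, then report whether the invariant holds."""
    plan = build_plan()
    for rule in rules:
        plan = rule(plan)
    try:
        validate(plan)
        return True
    except ValueError:
        return False
```

In this model, plan_is_valid([push_predicate_into_table_scan, remove_unsupported_dynamic_filters]) is True because the cleanup sees the empty probe side and drops df_695 from the join. The reverse order is False: the cleanup runs while the scan still consumes df_695, and the later pushdown removes that consumer, leaving a published filter with no probe-side consumer. The sketch only demonstrates the ordering problem; it does not describe how #18385 resolves it.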

Trino version: dev
 Fragment 0 [HASH]
     Output layout: [address_id, address_id]
     Output partitioning: SINGLE []
     Partition count: 4
     Output[columnNames = [address_id, item_id]]
     │   Layout: [address_id:varchar, address_id:varchar]
     │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
     │   item_id := address_id
     └─ InnerJoin[criteria = ("address_id" = "item_id_9"), hash = [$hashvalue, $hashvalue_22], distribution = PARTITIONED]
        │   Layout: [address_id:varchar]
        │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
        │   Distribution: PARTITIONED
        ├─ Aggregate[type = FINAL, keys = [address_id], hash = [$hashvalue]]
        │  │   Layout: [address_id:varchar, $hashvalue:bigint]
        │  │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
        │  └─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue], arguments = ["address_id"]]
        │     │   Layout: [address_id:varchar, $hashvalue:bigint]
        │     │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
        │     ├─ Project[]
        │     │  │   Layout: [address_id:varchar, $hashvalue_13:bigint]
        │     │  │   Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
        │     │  │   address_id := "item_id"
        │     │  └─ RemoteSource[sourceFragmentIds = [1]]
        │     │         Layout: [item_id:varchar, $hashvalue_13:bigint]
        │     └─ Project[]
        │        │   Layout: [address_id:varchar, $hashvalue_15:bigint]
        │        │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
        │        │   address_id := "expr_8"
        │        └─ RemoteSource[sourceFragmentIds = [2]]
        │               Layout: [expr_8:varchar, $hashvalue_15:bigint]
        └─ LocalExchange[partitioning = SINGLE]
           │   Layout: [item_id_9:varchar, $hashvalue_22:bigint]
           │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
           └─ RemoteSource[sourceFragmentIds = [5]]
                  Layout: [item_id_9:varchar, $hashvalue_23:bigint]

 Fragment 1 [SOURCE]
     Output layout: [item_id, $hashvalue_14]
     Output partitioning: HASH [item_id][$hashvalue_14]
     Aggregate[type = PARTIAL, keys = [item_id], hash = [$hashvalue_14]]
     │   Layout: [item_id:varchar, $hashvalue_14:bigint]
     │   Estimates: {rows: 0 (0B), cpu: ?, memory: ?, network: ?}
     └─ ScanFilterProject[table = delta:default.item, dynamicFilters = {"item_id" = #df_693 await}]
            Layout: [item_id:varchar, $hashvalue_14:bigint]
            Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
            $hashvalue_14 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("item_id"), 0))
            item_id := item_id:varchar:REGULAR

 Fragment 2 [HASH]
     Output layout: [expr_8, $hashvalue_21]
     Output partitioning: HASH [expr_8][$hashvalue_21]
     Partition count: 4
     Aggregate[type = PARTIAL, keys = [expr_8], hash = [$hashvalue_21]]
     │   Layout: [expr_8:varchar, $hashvalue_21:bigint]
     └─ Project[]
        │   Layout: [expr_8:varchar, $hashvalue_21:bigint]
        │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
        │   $hashvalue_21 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr_8"), 0))
        └─ Project[]
           │   Layout: [expr_8:varchar]
           │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
           │   expr_8 := CAST(null AS varchar)
           └─ InnerJoin[criteria = ("item_id_0" = "item_id_4"), hash = [$hashvalue_16, $hashvalue_18], distribution = PARTITIONED]
              │   Layout: []
              │   Estimates: {rows: ? (0B), cpu: ?, memory: ?, network: 0B}
              │   Distribution: PARTITIONED
              │   maySkipOutputDuplicates = true
              ├─ RemoteSource[sourceFragmentIds = [3]]
              │      Layout: [item_id_0:varchar, $hashvalue_16:bigint]
              └─ LocalExchange[partitioning = SINGLE]
                 │   Layout: [item_id_4:varchar, $hashvalue_18:bigint]
                 │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
                 └─ RemoteSource[sourceFragmentIds = [4]]
                        Layout: [item_id_4:varchar, $hashvalue_19:bigint]

 Fragment 3 [SOURCE]
     Output layout: [item_id_0, $hashvalue_17]
     Output partitioning: HASH [item_id_0][$hashvalue_17]
     ScanFilterProject[table = delta:default.item, filterPredicate = CAST(null AS boolean), dynamicFilters = {"item_id_0" = #df_695}]
         Layout: [item_id_0:varchar, $hashvalue_17:bigint]
         Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
         $hashvalue_17 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("item_id_0"), 0))
         item_id_0 := item_id:varchar:REGULAR

 Fragment 4 [SINGLE]
     Output layout: [item_id_4, $hashvalue_20]
     Output partitioning: HASH [item_id_4][$hashvalue_20]
     DynamicFilterSource[dynamicFilterAssignments = {item_id_4 -> #df_695}]
     │   Layout: [item_id_4:varchar, $hashvalue_20:bigint]
     └─ Project[]
        │   Layout: [item_id_4:varchar, $hashvalue_20:bigint]
        │   Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
        │   $hashvalue_20 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("item_id_4"), 0))
        └─ Values[]
               Layout: [item_id_4:varchar]
               Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}

 Fragment 5 [SOURCE]
     Output layout: [item_id_9, $hashvalue_24]
     Output partitioning: HASH [item_id_9][$hashvalue_24]
     DynamicFilterSource[dynamicFilterAssignments = {item_id_9 -> #df_693}]
     │   Layout: [item_id_9:varchar, $hashvalue_24:bigint]
     └─ ScanProject[table = delta:default.item]
            Layout: [item_id_9:varchar, $hashvalue_24:bigint]
            Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
            $hashvalue_24 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("item_id_9"), 0))