
Scheduler silently replaces ParquetExec with EmptyExec if data path is not correctly mounted in container #353

Closed · Tracked by #273 · Fixed by #414
andygrove opened this issue Oct 15, 2022 · 10 comments
Assignees: andygrove
Labels: bug (Something isn't working)

Comments

@andygrove (Member)

Describe the bug
When I run Ballista in docker-compose and then run the benchmarks, every benchmark query completes very quickly and returns a result set with zero rows and zero columns, and the executed plans contain EmptyExec instead of ParquetExec.

To Reproduce

cargo build --release
docker-compose up --build

Then run benchmarks using instructions in repo.
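
For reference, a benchmark invocation looks roughly like this (recalled from the benchmarks README of that era; treat the exact flags and paths as placeholders):

cargo run --release --bin tpch -- benchmark ballista --host localhost --port 50050 --query 1 --path /path/to/tpch-parquet --format parquet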

Expected behavior
The scheduler should not be replacing ParquetExec with EmptyExec.

Additional context
None

@andygrove added the bug label Oct 15, 2022
@andygrove self-assigned this Oct 15, 2022
@andygrove (Member Author)

My local Docker images do not seem to have been rebuilt:

$ docker images | grep "^ballista"
ballista-benchmarks                                   latest                 cb19c8c820b4   12 minutes ago   220MB
ballista-executor                                     latest                 3270ba3c5215   42 minutes ago   160MB
ballista-scheduler                                    latest                 63cf61dfc749   27 hours ago     231MB
ballista-builder                                      latest                 b062501e26f6   3 weeks ago      1.56GB

@andygrove (Member Author)

I deleted all images and rebuilt, but the same issue occurs. When running in docker-compose, the scheduler shows:

ShuffleWriterExec: Some(Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 2))
  AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))]
    ProjectionExec: expr=[l_extendedprice@1 * 1 - l_discount@2 as lineitem.l_extendedprice * Float64(1) - lineitem.l_discountFloat64(1) - lineitem.l_discountlineitem.l_discountFloat64(1)lineitem.l_extendedprice, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus]
      CoalesceBatchesExec: target_batch_size=4096
        FilterExec: l_shipdate@6 <= 10471
          EmptyExec: produce_one_row=false

When running the scheduler outside of docker-compose, I see:

=========ResolvedStage[stage_id=1.0, partitions=1]=========
ShuffleWriterExec: Some(Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 2))
  AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))]
    ProjectionExec: expr=[l_extendedprice@1 * 1 - l_discount@2 as lineitem.l_extendedprice * Float64(1) - lineitem.l_discountFloat64(1) - lineitem.l_discountlineitem.l_discountFloat64(1)lineitem.l_extendedprice, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus]
      CoalesceBatchesExec: target_batch_size=4096
        FilterExec: l_shipdate@6 <= 10471
          ParquetExec: limit=None, partitions=[mnt/bigdata/tpch/sf1-parquet/lineitem/part-0.parquet], predicate=l_shipdate_min@0 <= 10471, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate]

@andygrove (Member Author) commented Oct 15, 2022

I added debug logging in scheduler/src/state/mod.rs. The scheduler has this optimized logical plan:

Calculated optimized plan: Sort: lineitem.l_returnflag ASC NULLS LAST, lineitem.l_linestatus ASC NULLS LAST
ballista-scheduler_1  |   Projection: lineitem.l_returnflag, lineitem.l_linestatus, SUM(lineitem.l_quantity) AS sum_qty, SUM(lineitem.l_extendedprice) AS sum_base_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS sum_disc_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax) AS sum_charge, AVG(lineitem.l_quantity) AS avg_qty, AVG(lineitem.l_extendedprice) AS avg_price, AVG(lineitem.l_discount) AS avg_disc, COUNT(UInt8(1)) AS count_order
ballista-scheduler_1  |     Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], aggr=[[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Float64(1) - lineitem.l_discountFloat64(1) - lineitem.l_discountlineitem.l_discountFloat64(1)lineitem.l_extendedprice AS lineitem.l_extendedprice * Float64(1) - lineitem.l_discount) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM(lineitem.l_extendedprice * Float64(1) - lineitem.l_discountFloat64(1) - lineitem.l_discountlineitem.l_discountFloat64(1)lineitem.l_extendedprice AS lineitem.l_extendedprice * Float64(1) - lineitem.l_discount * Float64(1) + lineitem.l_tax) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))]]
ballista-scheduler_1  |       Projection: lineitem.l_extendedprice * Float64(1) - lineitem.l_discount AS lineitem.l_extendedprice * Float64(1) - lineitem.l_discountFloat64(1) - lineitem.l_discountlineitem.l_discountFloat64(1)lineitem.l_extendedprice, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_tax, lineitem.l_returnflag, lineitem.l_linestatus
ballista-scheduler_1  |         Filter: lineitem.l_shipdate <= Date32("10471")
ballista-scheduler_1  |           TableScan: lineitem projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], partial_filters=[lineitem.l_shipdate <= Date32("10471")]

Then the following code fails:

// Debug check added locally: build the physical plan from the optimized
// logical plan, then assert that it contains no EmptyExec. This
// assertion fails when running under docker-compose.
let plan = session_ctx.create_physical_plan(&optimized_plan).await?;
let plan_str = format!("{:?}", plan);
assert!(!plan_str.contains("EmptyExec"));

@andygrove (Member Author)

The physical plan contains this:

input: FilterExec { predicate: BinaryExpr { left: Column { name: "l_shipdate", index: 6 }, op: LtEq, right: Literal { value: Date32("10471") } }, 
input: RepartitionExec { 
input: EmptyExec { produce_one_row: false, schema: Schema { fields: [Field { name: "l_quantity" ...

@andygrove (Member Author)

I now suspect this is somehow related to the eliminate_filter optimization rule inserting an EmptyRelation.

@andygrove (Member Author)

I think I now see the root issue: I was running the benchmark against a data set that was not mounted into the containers running under docker-compose. I would expect this to cause the query to fail, but somehow the optimizer determines that no rows can match the filter and simply removes the table scan!
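
For reference, a sketch of the kind of bind mount that would make the data visible inside the containers (a hypothetical docker-compose fragment; service names are illustrative, and the path matches the one shown in the plan above):

# Hypothetical docker-compose fragment: mount the benchmark data into
# both the scheduler and executor containers at the same path the
# query references on the host.
services:
  ballista-scheduler:
    volumes:
      - /mnt/bigdata/tpch/sf1-parquet:/mnt/bigdata/tpch/sf1-parquet
  ballista-executor:
    volumes:
      - /mnt/bigdata/tpch/sf1-parquet:/mnt/bigdata/tpch/sf1-parquet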

@andygrove (Member Author) commented Oct 16, 2022

This is where the EmptyExec comes from:

impl TableProvider for ListingTable {

    async fn scan(
        &self,
        ctx: &SessionState,
        projection: &Option<Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let (partitioned_file_lists, statistics) =
            self.list_files_for_scan(ctx, filters, limit).await?;

        // if no files need to be read, return an `EmptyExec`
        if partitioned_file_lists.is_empty() {
            let schema = self.schema();
            let projected_schema = project_schema(&schema, projection.as_ref())?;
            return Ok(Arc::new(EmptyExec::new(false, projected_schema)));
        }

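For what it's worth, one possible fix (a hypothetical sketch, not necessarily the approach taken in #414) would be to surface an error instead of an EmptyExec when listing finds no files; `table_path` below is a stand-in for however the ListingTable exposes its configured path:

        // Hypothetical guard, sketched against the `scan` shown above:
        // fail loudly when listing produces no files, so a mis-mounted
        // data path surfaces as an error instead of an empty result.
        if partitioned_file_lists.is_empty() {
            return Err(DataFusionError::Plan(format!(
                "no files found at table path '{}'; is the data directory mounted?",
                table_path
            )));
        }
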
@andygrove changed the title from "Cannot run benchmarks when using docker-compose" to "Scheduler silently replaces ParquetExec with EmptyExec if data path is not correctly mounted in container" Oct 16, 2022
@andygrove added this to the Ballista 0.9.0 milestone Oct 17, 2022
@mingmwang (Contributor)

Is it a bug? We do not currently have a catalog service, so if the data path does not exist, I think it is valid to return an empty relation.

@avantgardnerio (Contributor)

> Is it a bug?

IMO, absolutely. Few users would expect this behavior, and those who hit it would spend quite a bit of time tracking it down. I can't think of another piece of software that treats a missing file the same way as an empty one.

@Dandandan (Contributor)

I believe Spark / Presto / etc. commonly return an empty result when given a path/table without any files (on object storage). This makes sense for an empty table.

Looking at the example, though, it shows an actual file that has been listed, so in that case I agree we should return an error.
