Incorrect results in datafusion #1441
In the file above I removed NULLs from the Parquet files (missing strings were replaced with "NULL" and ints with 0). Querying these files produces the desired results, which further confirms my theory that there is something wrong with NULL handling.
hi @franeklubi thanks for the detailed sharing. to simplify bug reproduction, can you help me understand the difference between the parquet and csv data? specifically: they seem to have a different number of rows. same thing applies to the trips data.
Hi @jimexist! Thanks for the reply. The issue results from the headers kept in the CSV files. That's my bad - please remove them before testing. I will amend the issue info so others won't run into the same problem as you did.
I've updated the issue info.
thanks for the update. to people reading this, i'm still trying to minimize the reproduction steps, so i guess below is a simpler statement:

```sql
CREATE EXTERNAL TABLE stop_parquet STORED AS PARQUET LOCATION './parquets/stops';
CREATE EXTERNAL TABLE stop_csv (time TEXT, trip_tid TEXT, trip_line TEXT, stop_name TEXT) STORED AS CSV LOCATION './csvs/stop.csv';
```

i.e. even without a join, we can tell that parquet and csv read differently.
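For anyone who wants to script this comparison rather than drive datafusion-cli by hand, here is a minimal sketch using the DataFusion Rust API. The paths come from the repro data; the API names follow recent DataFusion releases and have shifted between versions, so treat it as a sketch rather than version-exact code:

```rust
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // The parquet reader infers its schema from the file metadata.
    ctx.register_parquet(
        "stop_parquet",
        "./parquets/stops",
        ParquetReadOptions::default(),
    )
    .await?;

    // The csv files (headers stripped, per the repro) need an explicit schema.
    let schema = Schema::new(vec![
        Field::new("time", DataType::Utf8, true),
        Field::new("trip_tid", DataType::Utf8, true),
        Field::new("trip_line", DataType::Utf8, true),
        Field::new("stop_name", DataType::Utf8, true),
    ]);
    ctx.register_csv(
        "stop_csv",
        "./csvs/stop.csv",
        CsvReadOptions::new().schema(&schema).has_header(false),
    )
    .await?;

    // If the two readers agree, these two result sets should match.
    for table in ["stop_parquet", "stop_csv"] {
        ctx.sql(&format!(
            "SELECT COUNT(*), COUNT(DISTINCT stop_name) FROM {table}"
        ))
        .await?
        .show()
        .await?;
    }
    Ok(())
}
```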
thinking out loud: doing the following in pandas:

```python
In [10]: import pandas as pd

In [11]: csv_pd = pd.read_csv('./csvs/stop.csv')

In [12]: pq_pd = pd.read_parquet('./parquets/stops')

In [13]: csv_pd.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 33254 entries, 0 to 33253
Data columns (total 4 columns):
 #   Column     Non-Null Count  Dtype
---  ------     --------------  -----
 0   time       33254 non-null  object
 1   trip_tid   32113 non-null  float64
 2   trip_line  32126 non-null  object
 3   stop_name  705 non-null    object
dtypes: float64(1), object(3)
memory usage: 1.0+ MB

In [14]: pq_pd.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 33254 entries, 0 to 33253
Data columns (total 4 columns):
 #   Column     Non-Null Count  Dtype
---  ------     --------------  -----
 0   time       33254 non-null  datetime64[ns]
 1   trip_tid   32113 non-null  float64
 2   trip_line  32126 non-null  object
 3   stop_name  705 non-null    object
dtypes: datetime64[ns](1), float64(1), object(2)
memory usage: 1.0+ MB
```

and at least the `time` column is read with a different dtype by the two readers.
seems like these values are absent in the parquet readings:

and the scanning process was invalid:

notice the …
Given the investigation above, I believe this might be related to the single-distinct-to-group-by optimization or the hash aggregation steps. cc @Dandandan @houqp
I'll try to check this out in more detail tomorrow. Thanks for the investigation @jimexist
I spent some time looking at the parquet data and the csv data, and it looks to me like there may be something wrong with the parquet reader. Specifically, I just ran a query that did a `select *`.

dump_parquet.sql:

```sql
CREATE EXTERNAL TABLE stops_parquet
STORED AS PARQUET
LOCATION '/Users/alamb/Documents/Wrong-answer-datafusion-1141/issue_data/parquets/stops';
show columns from stops_parquet;

CREATE EXTERNAL TABLE trips_parquet
STORED AS PARQUET
LOCATION '/Users/alamb/Documents/Wrong-answer-datafusion-1141/issue_data/parquets/trips';
show columns from trips_parquet;

select * from stops_parquet order by time, trip_tid, trip_line, stop_name;
select * from trips_parquet order by tid, line, base_day;
```

and dump_csv.sql:

```sql
CREATE EXTERNAL TABLE stops_csv (time timestamp, trip_tid bigint, trip_line TEXT, stop_name TEXT)
STORED AS CSV WITH HEADER ROW
LOCATION '/Users/alamb/Documents/Wrong-answer-datafusion-1141/issue_data/csvs/stop.csv';
show columns from stops_csv;

CREATE EXTERNAL TABLE trips_csv (tid bigint, line TEXT, base_day date)
STORED AS CSV WITH HEADER ROW
LOCATION '/Users/alamb/Documents/Wrong-answer-datafusion-1141/issue_data/csvs/trip.csv';
show columns from trips_csv;

select * from stops_csv order by time, trip_tid, trip_line, stop_name;
select * from trips_csv order by tid, line, base_day;
```

Like this:

```shell
~/Software/arrow-datafusion/target/debug/datafusion-cli -f dump_csv.sql > dump_csv.txt
~/Software/arrow-datafusion/target/debug/datafusion-cli -f dump_parquet.sql > dump_parquet.txt
```

The results are here:

And a quick visual diff shows they aren't the same. The first few lines of …
While the first few lines of …

(note that the stop name is different)

However, when I look for that mismatched line, here is the raw data in csv:

```shell
$ grep 54788307 issue_data/csvs/stop.csv
2021-11-15 00:00:00,54788307,186,RONDO ZESŁAŃCÓW SYBERYJSKICH
```
Here is what comes out when using pandas:

```python
Python 3.8.12 (default, Oct 13 2021, 06:42:42)
[Clang 13.0.0 (clang-1300.0.29.3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import pandas as pd
>>> df = pd.read_parquet('issue_data/parquets/stops/2021-11.parquet')
>>> df.to_csv('/tmp/2021-11.csv')
```

```shell
$ grep 54788307 /tmp/2021-11.csv
16591,2021-11-15 00:00:00,54788307.0,186,RONDO ZESŁAŃCÓW SYBERYJSKICH
```
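A related cross-check, for anyone following along: the parquet crate's record (row) API decodes rows without going through the arrow array readers, so it gives an independent reading of the same file. A sketch, assuming a recent parquet crate where the row iterator yields `Result<Row>`:

```rust
use std::fs::File;

use parquet::file::reader::{FileReader, SerializedFileReader};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("issue_data/parquets/stops/2021-11.parquet")?;
    let reader = SerializedFileReader::new(file)?;

    // The record API decodes rows without the arrow reading path, so a
    // mismatch against `select *` output points at the arrow array
    // readers rather than at the file itself.
    for row in reader.get_row_iter(None)? {
        let row = row?;
        let text = row.to_string();
        if text.contains("54788307") {
            println!("{text}");
        }
    }
    Ok(())
}
```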
Oh weird, but then a direct query

seems to get the correct answer 🤔
When I just do `select *` the answer is not correct either:

```
❯ select * from stops_parquet;
+---------------------+----------+-----------+------------------------------+
| time                | trip_tid | trip_line | stop_name                    |
+---------------------+----------+-----------+------------------------------+
...
| 2021-11-15 00:00:00 | 54788307 | 186       | Armatnia                     |
...
+---------------------+----------+-----------+------------------------------+
33254 rows in set. Query took 1.553 seconds.
```
I have a smaller reproducer and am trying to narrow down where the problem is, but I probably won't be able to work on this until sometime next week at the earliest. In case anyone else is interested, here is the repro: repro.zip

```shell
cargo run --bin datafusion-cli -- -f repro.sql | grep 54788307
```

It should print out …

but actually prints out …
I plan to focus on this issue tomorrow
A guess: it might be an issue with reading statistics / predicate push down?
The plot thickens! 🕵️ Regarding parquet predicate pruning, amusingly, in this case I think row group pruning actually helps avoid the problem. As you may recall, when a filter is applied like this

```sql
select * from stops_parquet where trip_tid=54788307;
```

the answer is correct:

```
+---------------------+----------+-----------+------------------------------+
| time                | trip_tid | trip_line | stop_name                    |
+---------------------+----------+-----------+------------------------------+
| 2021-11-15 00:00:00 | 54788307 | 186       | RONDO ZESŁAŃCÓW SYBERYJSKICH |
+---------------------+----------+-----------+------------------------------+
```

However, when I disable pruning, the wrong answer comes out!

```
+---------------------+----------+-----------+-----------+
| time                | trip_tid | trip_line | stop_name |
+---------------------+----------+-----------+-----------+
| 2021-11-15 00:00:00 | 54788307 | 186       | Armatnia  |
+---------------------+----------+-----------+-----------+
```

I added some debugging, and verified that the query does in fact skip several row groups:
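For context on what pruning consults here: each row group's column chunks carry optional min/max statistics in the file footer, and a row group is skipped when its range cannot contain the filter value. A sketch that dumps those statistics with the parquet crate (recent crate versions assumed; path from the repro data):

```rust
use std::fs::File;

use parquet::file::reader::{FileReader, SerializedFileReader};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("issue_data/parquets/stops/2021-11.parquet")?;
    let reader = SerializedFileReader::new(file)?;

    // Pruning compares the literal in `trip_tid = 54788307` against each
    // row group's min/max; any group whose range cannot contain the value
    // is skipped without being decoded at all.
    for (i, rg) in reader.metadata().row_groups().iter().enumerate() {
        for col in rg.columns() {
            if let Some(stats) = col.statistics() {
                println!("row group {i}, column {}: {stats:?}", col.column_path());
            }
        }
    }
    Ok(())
}
```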
Cross referencing the data output from pandas and the parquet reader. Pandas:

And arrow says the following (interestingly, note that the …
This discrepancy looks like it comes out of … I have not studied the code enough yet to fully understand what it is doing, and I need to attend to some other items now. If anyone has ideas (cc @tustvold) on where to look next, I would appreciate it.
I will probably file a ticket in arrow-rs shortly with the slimmed-down reproducer
@alamb this reads like a detective story 🕵️♂️
I can also open the parquet files from arrow2, so I think that this is something in the parquet crate. The below, pasted into this example:

```rust
// imports needed if run outside the example's scope
use std::collections::HashSet;
use std::time::SystemTime;

use arrow2::array::Utf8Array;

let mut distinct = HashSet::<String>::new();
let start = SystemTime::now();
for maybe_batch in reader {
    let batch = maybe_batch?;
    // stop_name is column 3 in this schema
    let a = batch
        .column(3)
        .as_any()
        .downcast_ref::<Utf8Array<i32>>()
        .unwrap();
    // iterate Option<&str> values, keeping only the valid (non-null) ones
    for i in a {
        if let Some(i) = i {
            distinct.insert(i.to_string());
        }
    }
}
println!("{}", distinct.len());
println!("{:#?}", distinct);
```

using … yields 132 valid stop_names (over all row groups):
For reference, the file heavily uses RLE encoding (i.e. the RLE part of the RLE/bit-packing hybrid parquet encoding), both for the validity and for the dictionary indices, so that would be a good place to look.
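For readers unfamiliar with the encoding @jorgecarleitao describes: the hybrid format interleaves two kinds of runs, each introduced by a ULEB128 header whose low bit selects RLE or bit-packed. The following is an illustrative sketch of that framing based on the Parquet spec, not the parquet crate's actual decoder:

```rust
/// Reads a ULEB128 varint, advancing `pos`.
fn read_uleb128(data: &[u8], pos: &mut usize) -> u64 {
    let (mut value, mut shift) = (0u64, 0);
    loop {
        let byte = data[*pos];
        *pos += 1;
        value |= u64::from(byte & 0x7f) << shift;
        if byte & 0x80 == 0 {
            return value;
        }
        shift += 7;
    }
}

/// Walks the runs of an RLE/bit-packed hybrid buffer and describes each.
fn describe_runs(data: &[u8], bit_width: usize) {
    let mut pos = 0;
    while pos < data.len() {
        let header = read_uleb128(data, &mut pos);
        if header & 1 == 0 {
            // Low bit 0: an RLE run. One fixed-width value follows,
            // logically repeated `header >> 1` times.
            println!("RLE run: {} copies of one value", header >> 1);
            pos += (bit_width + 7) / 8;
        } else {
            // Low bit 1: a bit-packed run of `8 * (header >> 1)` values,
            // stored in groups of 8, i.e. `bit_width` bytes per group.
            let groups = (header >> 1) as usize;
            println!("bit-packed run: {} values", groups * 8);
            pos += groups * bit_width;
        }
    }
}
```

A decoder bug in either branch (miscounting a run length, or mis-advancing the position) would shift every subsequent value, which would match the symptom of correct values appearing against the wrong rows.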
You could try using ComplexObjectArrayReader to decode the column instead of ArrowArrayReader. This might help narrow down whether the bug lies in the RLE decoding or something higher up in ArrowArrayReader. Alternatively, you could see if the issue occurs with apache/arrow-rs#1082, which replaces ArrowArrayReader with an alternative that shares more with the PrimitiveArrayReader/ComplexObjectArrayReader implementations; again this might help narrow down the origin. Not at a computer at the moment, otherwise I would try myself.
Thanks @tustvold and @jorgecarleitao for the tips. Will give them a try.
FYI apache/arrow-rs#1110 runs into a similar issue, which appears to be fixed by switching to ComplexObjectArrayReader instead of ArrowArrayReader. I will have a poke around tomorrow if I have time, and see if I can spot what is going wrong.
I tested the fix from @yordan-pavlov in apache/arrow-rs#1130 against my reproducer, and with #1130 it now gets the correct answer ❤️ so that seems like progress.
I believe that with the update to arrow 7.0.0, which contains @yordan-pavlov's fix, this should now be fixed in DataFusion?
Yes, that is my understanding -- someone just needs to rerun the (wonderful) reproducer from @franeklubi to confirm.
I reran the queries from @franeklubi at 2e91818:

Setup

```sql
CREATE EXTERNAL TABLE stop (time TEXT, trip_tid TEXT, trip_line TEXT, stop_name TEXT) STORED AS CSV WITH HEADER ROW LOCATION '/Users/alamb/Downloads/issue_data/csvs/stop.csv';
CREATE EXTERNAL TABLE trip (tid TEXT, line TEXT, base_day TEXT) STORED AS CSV WITH HEADER ROW LOCATION '/Users/alamb/Downloads/issue_data/csvs/trip.csv';
CREATE EXTERNAL TABLE stop_parquet STORED AS PARQUET LOCATION '/Users/alamb/Downloads/issue_data/parquets/stops/';
CREATE EXTERNAL TABLE trip_parquet STORED AS PARQUET LOCATION '/Users/alamb/Downloads/issue_data/parquets/trips/';
```

Now the results are consistent when using csv and parquet:

Parquet

```
❯ SELECT DISTINCT stop_name FROM stop_parquet INNER JOIN trip_parquet ON tid = trip_tid WHERE line = '176' ORDER BY stop_name NULLS LAST;
+----------------------+
| stop_name            |
+----------------------+
| Bartnicza            |
| Bazyliańska          |
| Bolesławicka         |
| Brzezińska           |
| Budowlana            |
| Choszczówka          |
| Chłodnia             |
| Daniszewska          |
| Fabryka Pomp         |
| Insurekcji           |
| Marcelin             |
| Marywilska-Las       |
| Ołówkowa             |
| PKP Płudy            |
| PKP Żerań            |
| Parowozowa           |
| Pelcowizna           |
| Polnych Kwiatów      |
| Raciborska           |
| Rembielińska         |
| Sadkowska            |
| Smugowa              |
| Starego Dębu         |
| Zyndrama z Maszkowic |
| os.Marywilska        |
| Śpiewaków            |
|                      |
+----------------------+
27 rows in set. Query took 0.042 seconds.
```

Csv

```
❯ SELECT DISTINCT stop_name FROM stop INNER JOIN trip ON tid = trip_tid WHERE line = '176' ORDER BY stop_name NULLS LAST;
+----------------------+
| stop_name            |
+----------------------+
|                      |
| Bartnicza            |
| Bazyliańska          |
| Bolesławicka         |
| Brzezińska           |
| Budowlana            |
| Choszczówka          |
| Chłodnia             |
| Daniszewska          |
| Fabryka Pomp         |
| Insurekcji           |
| Marcelin             |
| Marywilska-Las       |
| Ołówkowa             |
| PKP Płudy            |
| PKP Żerań            |
| Parowozowa           |
| Pelcowizna           |
| Polnych Kwiatów      |
| Raciborska           |
| Rembielińska         |
| Sadkowska            |
| Smugowa              |
| Starego Dębu         |
| Zyndrama z Maszkowic |
| os.Marywilska        |
| Śpiewaków            |
+----------------------+
27 rows in set. Query took 0.116 seconds.
```

Interestingly, the CSV results don't seem to have the empty `stop_name` sorted last despite the `NULLS LAST` clause (it sorts first instead).
Describe the bug
I came upon a bug while querying my custom Parquet dataset, which causes DataFusion to produce incoherent and incorrect results.
I tested my dataset in various ways, all of which produced the desired results:

To Reproduce
Steps to reproduce the behavior:
issue_data.zip
Inside there are the Parquet files and CSVs with exactly the same data (also, there's an sqlite database created from the provided CSV files).
See README.md to reproduce the issue. The query that fails when querying Parquet files with datafusion-cli:

Changing only `line` to `trip_line` in the `where` clause produces the desired results.

Expected behavior
Should produce these 27 rows:

Query 1 from README.md (mentioned above) produces this incorrect set of 33 rows:

Additional context
Datafusion version:

My guess
Since the Parquet files have encoded NULLs, and reading the CSV files with datafusion-cli gets rid of those, my best bet is on the usage of NULLs and some weird behavior when joining.