[BUG] "Schemas are inconsistent" error for parquet files which have the same dtype, but are different in not null setting #429

Closed

gabrielspmoreira opened this issue Nov 12, 2020 · 10 comments
Labels: bug (Something isn't working)

@gabrielspmoreira (Member) commented:

Describe the bug
This bug did not occur with NVT 0.2, but it does occur with the main branch (future NVT 0.3).
The error "Schemas are inconsistent" is raised when the parquet files in the same folder share the same columns and dtypes, but some columns contain null values in one of the parquet files and not in the corresponding columns of another parquet file.
The error is raised when an NVT Dataset is instantiated and we try to head() its first elements, like

ds = nvt.Dataset(PATH, engine="parquet", part_size="1000MB")
ds.to_ddf().head()

The error raised then is:

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py in _append_row_groups(metadata, md)
     33     try:
---> 34         metadata.append_row_groups(md)
     35     except RuntimeError as err:

/opt/conda/envs/rapids/lib/python3.7/site-packages/pyarrow/_parquet.pyx in pyarrow._parquet.FileMetaData.append_row_groups()

RuntimeError: AppendRowGroups requires equal schemas.

The above exception was the direct cause of the following exception:

RuntimeError                              Traceback (most recent call last)
<ipython-input-4-b3788a1214d6> in <module>
----> 1 ds.to_ddf().head()

/nvtabular0.3/NVTabular/nvtabular/io/dataset.py in to_ddf(self, columns, shuffle, seed)
    263         """
    264         # Use DatasetEngine to create ddf
--> 265         ddf = self.engine.to_ddf(columns=columns)
    266 
    267         # Shuffle the partitions of ddf (optional)

/nvtabular0.3/NVTabular/nvtabular/io/parquet.py in to_ddf(self, columns)
    103             gather_statistics=False,
    104             split_row_groups=self.row_groups_per_part,
--> 105             storage_options=self.storage_options,
    106         )
    107 

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask_cudf/io/parquet.py in read_parquet(path, columns, split_row_groups, row_groups_per_part, **kwargs)
    192         split_row_groups=split_row_groups,
    193         engine=CudfEngine,
--> 194         **kwargs,
    195     )
    196 

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, chunksize, **kwargs)
    237         filters=filters,
    238         split_row_groups=split_row_groups,
--> 239         **kwargs,
    240     )
    241 

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask_cudf/io/parquet.py in read_metadata(*args, **kwargs)
     15     @staticmethod
     16     def read_metadata(*args, **kwargs):
---> 17         meta, stats, parts, index = ArrowEngine.read_metadata(*args, **kwargs)
     18 
     19         # If `strings_to_categorical==True`, convert objects to int32

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py in read_metadata(cls, fs, paths, categories, index, gather_statistics, filters, split_row_groups, **kwargs)
    654             gather_statistics,
    655         ) = _gather_metadata(
--> 656             paths, fs, split_row_groups, gather_statistics, filters, dataset_kwargs
    657         )
    658 

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py in _gather_metadata(paths, fs, split_row_groups, gather_statistics, filters, dataset_kwargs)
    246                 md.set_file_path(fn)
    247             if metadata:
--> 248                 _append_row_groups(metadata, md)
    249             else:
    250                 metadata = md

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py in _append_row_groups(metadata, md)
     40                 "pyarrow schema. Such as "
     41                 '`to_parquet(..., schema={"column1": pa.string()})`'
---> 42             ) from err
     43         else:
     44             raise err

RuntimeError: Schemas are inconsistent, try using `to_parquet(..., schema="infer")`, or pass an explicit pyarrow schema. Such as `to_parquet(..., schema={"column1": pa.string()})`

Using this script from @rjzamora, it was possible to check that the metadata for the parquet files differs: the columns are marked "not null" in one file and nullable in the other, which contains nulls (a minimal sketch of such a check is shown after the schema output below).

ValueError: Schema in /gfn-merlin/gmoreira/data/debug/gfn_problematic_columns//2020-08-30.parquet was different. 
DayOfWeekUTC: string not null
  -- field metadata --
  PARQUET:field_id: '1'
MonthUTC: string not null
  -- field metadata --
  PARQUET:field_id: '2'
HourOfDayUTC: float not null
  -- field metadata --
  PARQUET:field_id: '3'
WeekNumberUTC: float not null
  -- field metadata --
  PARQUET:field_id: '4'
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [{"name": null, "field_n' + 685

vs

DayOfWeekUTC: string
  -- field metadata --
  PARQUET:field_id: '1'
MonthUTC: string
  -- field metadata --
  PARQUET:field_id: '2'
HourOfDayUTC: float
  -- field metadata --
  PARQUET:field_id: '3'
WeekNumberUTC: float
  -- field metadata --
  PARQUET:field_id: '4'
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [{"name": null, "field_n' + 685

BTW, the two parquet files can be loaded individually using dask_cudf. But when they are loaded together (e.g. by pointing to a directory containing the two files)

df = dask_cudf.read_parquet(PATH).compute()

the following error is raised by dask_cudf

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-8-a4ba9b6fbe05> in <module>
----> 1 df2 = df.compute()

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    165         dask.base.compute
    166         """
--> 167         (result,) = compute(self, traverse=False, **kwargs)
    168         return result
    169 

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    450         postcomputes.append(x.__dask_postcompute__())
    451 
--> 452     results = schedule(dsk, keys, **kwargs)
    453     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    454 

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/local.py in get_sync(dsk, keys, **kwargs)
    525     """
    526     kwargs.pop("num_workers", None)  # if num_workers present, remove it
--> 527     return get_async(apply_sync, 1, dsk, keys, **kwargs)
    528 
    529 

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    492 
    493                 while state["ready"] and len(state["running"]) < num_workers:
--> 494                     fire_task()
    495 
    496             succeeded = True

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/local.py in fire_task()
    464                         pack_exception,
    465                     ),
--> 466                     callback=queue.put,
    467                 )
    468 

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/local.py in apply_sync(func, args, kwds, callback)
    514 def apply_sync(func, args=(), kwds={}, callback=None):
    515     """ A naive synchronous version of apply_async """
--> 516     res = func(*args, **kwds)
    517     if callback is not None:
    518         callback(res)

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    225         failed = False
    226     except BaseException as e:
--> 227         result = pack_exception(e, dumps)
    228         failed = True
    229     return key, result, failed

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    220     try:
    221         task, data = loads(task_info)
--> 222         result = _execute_task(task, data)
    223         id = get_id()
    224         result = dumps((result, id))

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in read_parquet_part(func, fs, meta, part, columns, index, kwargs)
    274     This function is used by `read_parquet`."""
    275     if isinstance(part, list):
--> 276         dfs = [func(fs, rg, columns.copy(), index, **kwargs) for rg in part]
    277         df = concat(dfs, axis=0)
    278     else:

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in <listcomp>(.0)
    274     This function is used by `read_parquet`."""
    275     if isinstance(part, list):
--> 276         dfs = [func(fs, rg, columns.copy(), index, **kwargs) for rg in part]
    277         df = concat(dfs, axis=0)
    278     else:

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask_cudf/io/parquet.py in read_partition(fs, piece, columns, index, categories, partitions, **kwargs)
     55                 row_groups=row_group,
     56                 strings_to_categorical=strings_to_cats,
---> 57                 **kwargs.get("read", {}),
     58             )
     59         else:

/opt/conda/envs/rapids/lib/python3.7/site-packages/cudf/io/parquet.py in read_parquet(filepath_or_buffer, engine, columns, filters, row_groups, skiprows, num_rows, strings_to_categorical, use_pandas_metadata, *args, **kwargs)
    248             num_rows=num_rows,
    249             strings_to_categorical=strings_to_categorical,
--> 250             use_pandas_metadata=use_pandas_metadata,
    251         )
    252     else:

cudf/_lib/parquet.pyx in cudf._lib.parquet.read_parquet()

cudf/_lib/parquet.pyx in cudf._lib.parquet.read_parquet()

**RuntimeError: cuDF failure at: /opt/conda/envs/rapids/conda-bld/libcudf_1603354682602/work/cpp/src/io/parquet/reader_impl.cu:229: Corrupted header or footer**

Steps/Code to reproduce bug
Here is a folder with a minimal notebook and two small parquet files that reproduce the issue (internal access only for the NVT team)

Expected behavior
NVT should be able to load a dataset whose parquet files share the same dtypes, even if a column is marked "not null" in some files and nullable in others

Environment details (please complete the following information):
nvtabular in the main branch (future 0.3)
cudf==0.16
dask_cudf==0.16
pyarrow==1.0.1

@vinhngx (Contributor) commented Nov 12, 2020

+1 I've seen this issue many times, but only when loading NVTabular's parquet output.
If we've got some tools to inspect and ensure data sanity (like the script from @rjzamora), it would be good to document and share them in a "best practices" document somewhere.

@EvenOldridge (Member) commented:

Let's get these best practices into the doc related to data prep that's being prepared in:
#424

@rjzamora (Collaborator) commented Nov 12, 2020

Thanks for the nice reproducer, @gabrielspmoreira!

I cannot reproduce the Corrupted header or footer you get with dask_cudf, but I certainly get the same Schemas are inconsistent problem. The general problem here is that we are using pyarrow to aggregate the metadata from each individual file into a single metadata object, and pyarrow is very picky about schema consistency in its append_row_groups API. In the future, dask.dataframe will be moving to the pyarrow.dataset API, and this should no longer be an issue. Until then, it is up to the user to supply a dataset with a consistent schema if they want to parse/use the parquet metadata to produce row-group-based partitions (as NVTabular does).

I think Dask, Dask-CuDF, and NVTabular could all use better documentation on the subject of parquet metadata handling and preparation. I am in the process of working out a simple set of utilities to (1) generate a global _metadata file and (2) rewrite a dataset to ensure a consistent schema.

Note that if each of your files can fit comfortably in GPU memory (one at a time), you can always generate a clean dataset by round-tripping the data with dask_cudf (without parsing metadata on the read side):

import dask_cudf

PATH = '/.../gfn_problematic_columns'
PATH_FIX = '/.../gfn_problematic_columns_fixed'

ddf = dask_cudf.read_parquet(PATH, gather_statistics=False)
ddf.to_parquet(PATH_FIX)

@gabrielspmoreira (Member, Author) commented:

Thanks for the investigation @rjzamora. It would be very helpful if we could at least provide an error message that highlights either the full schemas (so that the user can check the internal metadata differences themselves, like the example below from your checker script) or describes which columns do not match between the schemas.

ValueError: Schema in /gfn-merlin/gmoreira/data/debug/gfn_problematic_columns//2020-08-30.parquet was different. 
DayOfWeekUTC: string not null
  -- field metadata --
  PARQUET:field_id: '1'
MonthUTC: string not null
  -- field metadata --
  PARQUET:field_id: '2'
HourOfDayUTC: float not null
  -- field metadata --
  PARQUET:field_id: '3'
WeekNumberUTC: float not null
  -- field metadata --
  PARQUET:field_id: '4'
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [{"name": null, "field_n' + 685

vs

DayOfWeekUTC: string
  -- field metadata --
  PARQUET:field_id: '1'
MonthUTC: string
  -- field metadata --
  PARQUET:field_id: '2'
HourOfDayUTC: float
  -- field metadata --
  PARQUET:field_id: '3'
WeekNumberUTC: float
  -- field metadata --
  PARQUET:field_id: '4'
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [{"name": null, "field_n' + 685

@gabrielspmoreira (Member, Author) commented:

@rjzamora I like your suggestion of using ddf = dask_cudf.read_parquet(PATH, gather_statistics=False).to_parquet(PATH_FIX) to fix the dataset. However, I need to keep the data separated by date, because our model evaluation protocol with incremental training requires sliding over days. Maybe I could use the date as a partition column for the parquet files and keep them as a single dataset. Does NVT support loading data from parquet datasets partitioned by columns?

@rjzamora (Collaborator) commented Nov 13, 2020

It would be very helpful if we could at least provide an error message for the user that highlights either the full schemas

That makes sense. In dask.dataframe we suggest that the user try schema="infer" at write time, but more-specific information would be better.
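
For example, a minimal sketch of that write-time suggestion (paths are placeholders); schema="infer" asks the pyarrow engine to infer one common schema across the partitions before writing:

import dask.dataframe as dd

# Rewrite the dataset with a single, inferred schema
ddf = dd.read_parquet("/path/to/inconsistent_dataset", engine="pyarrow")
ddf.to_parquet("/path/to/consistent_dataset", engine="pyarrow", schema="infer")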

Does NVT support loading data from parquet datasets partitioned by columns?

Good question - It is supported upstream, but I haven't tried yet. I'll update this comment after I test :)

EDIT: It does seem that NVTabular handles hive-partitioned datasets just fine. For example, I am able to do ddf.to_parquet(PATH_FIX, partition_on="MonthUTC") in the example above, and then I can read back PATH_FIX with nvt.Dataset without any errors. One thing to keep in mind is that dask_cudf.read_parquet (which is used by NVTabular) is still unable to aggregate multiple files into a single ddf partition. Therefore, restricting the size of the files too much will effectively limit the maximum partition size NVTabular can use (hurting performance). This is the reason why it may make sense to create a utility that can generate a clean dataset with partitions of a user-specified size. This is also another motivation for dask#6765
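
A minimal sketch of that workflow, reusing PATH and PATH_FIX from the earlier example (and assuming each file fits in GPU memory):

import dask_cudf
import nvtabular as nvt

# Round-trip the data, partitioning the output by the MonthUTC column
ddf = dask_cudf.read_parquet(PATH, gather_statistics=False)
ddf.to_parquet(PATH_FIX, partition_on="MonthUTC")

# Read the hive-partitioned output back through NVTabular
ds = nvt.Dataset(PATH_FIX, engine="parquet")
ds.to_ddf().head()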

@gabrielspmoreira (Member, Author) commented:

@rjzamora There might be scenarios where my original parquet dataset is partitioned by a column and I'd like to keep the same column partitioning in the output parquet dataset preprocessed by NVTabular.
For example, it is common for recommender systems to have user interactions split by day, so that we can run incremental model training and evaluation.
Would it be possible for NVTabular to keep the same column partitioning, or provide a way to let users set the partition column for the output dataset?

@rjzamora (Collaborator) commented:

Would it be possible for NVTabular to keep the same column partitioning, or provide a way to let users set the partition column for the output dataset?

NVTabular doesn't really support this right now, but it seems very doable. The easiest case to support is unshuffled output, where we can just use dask_cudf.to_parquet. If we want to support nvt-style shuffling, the partition_on code path will require a bit more work.

@gabrielspmoreira (Member, Author) commented:

Thanks @rjzamora. I have opened a feature request for a partition column option in the NVT output: #431

kkraus14 pushed a commit to rapidsai/cudf that referenced this issue Nov 20, 2020
Depends on [dask#6851](dask/dask#6851)

[dask#6851](dask/dask#6851) introduces a new `create_metadata_file` utility which can generate a global `_metadata` file from a list of parquet files.  Since Dask's `read_parquet` code is optimized to leverage this shared metadata source, it makes a lot of sense to make this file easy to generate.

**Why have this utility in dask_cudf?**
Although I originally planned to keep this entirely upstream, it eventually became clear that cudf's **schema-agnostic** mechanism for aggregating metadata is advantageous when the dataset in question comprises files with inconsistent schema information.  For example, a pyarrow-written dataset may have an inconsistent schema if only a few partitions contain null elements.  In this case, the upstream version of `create_metadata_file` will fail with an "inconsistent schema" error, while the `dask_cudf` version will not.  This means the user can use the dask_cudf version in lieu of rewriting the entire dataset, because once the `_metadata` file is created, the schemas will no longer be validated at read time.


**Use Example**

```python
import glob
import dask_cudf

# Specify the list of parquet files to collect metadata from
paths = glob.glob("/my/dataset/*.parquet")

# Use `create_metadata_file` to generate `_metadata`
dask_cudf.io.parquet.create_metadata_file(paths)
...
```
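
Once the `_metadata` file has been written, a subsequent read of the directory can pick it up directly, without aggregating per-file schemas at read time (a sketch; the path is the same placeholder as above):

```python
# The generated `_metadata` is used automatically when reading the directory
ddf = dask_cudf.read_parquet("/my/dataset/")
```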

Addresses [nvtabular#429](NVIDIA-Merlin/NVTabular#429)
@benfred (Member) commented Oct 6, 2021

@gabrielspmoreira @rjzamora - closing this, re-open if there is still an issue

benfred closed this as completed Oct 6, 2021