FEAT-#3603: add experimental read_custom_text function that can read custom line-by-line text files (#3441)

Signed-off-by: Anatoly Myachev <[email protected]>
Co-authored-by: Vasily Litvinov <[email protected]>
anmyachev and vnlitvinov authored Mar 15, 2022
1 parent cb54844 commit 006ccd0
Showing 10 changed files with 282 additions and 4 deletions.
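For context, a minimal usage sketch of the API this commit adds. The file name here is hypothetical and the parser mirrors the test added at the bottom of this diff; line-delimited JSON records are assumed.

import json

import pandas

import modin.experimental.pandas as pd


def parse_lines(io_input, **kwargs):
    # The parser receives a file-like object over one chunk of the file
    # and returns a pandas.DataFrame; here we keep only two columns.
    result = {"col0": [], "col1": []}
    for line in io_input:
        obj = json.loads(line)
        for key in result:
            result[key].append(obj[key])
    return pandas.DataFrame(result)


df = pd.read_custom_text(
    "data.json",  # hypothetical line-delimited JSON file
    columns=["col0", "col1"],
    custom_parser=parse_lines,
    is_quoting=False,
)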
1 change: 1 addition & 0 deletions docs/release_notes/release_notes-0.14.0.rst
@@ -40,6 +40,7 @@ Key Features and Updates
  * REFACTOR-#4093: Refactor base to be smaller (#4220)
  * REFACTOR-#4047: Rename `cluster` directory to `cloud` in examples (#4212)
* Pandas API implementations and improvements
  * FEAT-#3603: add experimental `read_custom_text` function that can read custom line-by-line text files (#3441)
  * FEAT-#979: Enable reading from SQL server (#4279)
* OmniSci enhancements
  *
5 changes: 5 additions & 0 deletions modin/core/execution/dispatching/factories/dispatcher.py
@@ -288,6 +288,11 @@ def to_pickle(cls, *args, **kwargs):
    def to_pickle_distributed(cls, *args, **kwargs):
        return cls.__factory._to_pickle_distributed(*args, **kwargs)

    @classmethod
    @_inherit_docstrings(factories.ExperimentalPandasOnRayFactory._read_custom_text)
    def read_custom_text(cls, **kwargs):
        return cls.__factory._read_custom_text(**kwargs)

    @classmethod
    @_inherit_docstrings(factories.BaseFactory._to_csv)
    def to_csv(cls, *args, **kwargs):
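The new classmethod above is pure indirection: the engine-agnostic dispatcher forwards `read_custom_text` to whichever factory is currently installed. A toy sketch of the pattern, with invented class names rather than Modin's real internals:

class ToyFactory:
    @classmethod
    def _read_custom_text(cls, **kwargs):
        # Engine-specific work would happen here.
        return f"read with {kwargs}"


class ToyDispatcher:
    # Installed once, when the execution engine is initialized.
    _factory = ToyFactory

    @classmethod
    def read_custom_text(cls, **kwargs):
        # The public entry point knows nothing about engines;
        # it only forwards to the active factory.
        return cls._factory._read_custom_text(**kwargs)


print(ToyDispatcher.read_custom_text(nrows=10))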
9 changes: 9 additions & 0 deletions modin/core/execution/dispatching/factories/factories.py
@@ -531,6 +531,15 @@ def _read_csv_glob(cls, **kwargs):
    def _read_pickle_distributed(cls, **kwargs):
        return cls.io_cls.read_pickle_distributed(**kwargs)

    @classmethod
    @doc(
        _doc_io_method_raw_template,
        source="Custom text files",
        params=_doc_io_method_kwargs_params,
    )
    def _read_custom_text(cls, **kwargs):
        return cls.io_cls.read_custom_text(**kwargs)

    @classmethod
    def _to_pickle_distributed(cls, *args, **kwargs):
        """
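The `@doc` decorator used above comes from `pandas.util._decorators`: it substitutes keyword arguments into `{placeholder}` fields of a shared docstring template. A minimal sketch of the mechanism; the template text is invented for illustration and is not Modin's actual `_doc_io_method_raw_template`:

from pandas.util._decorators import doc

_template = """Build query compiler from {source}.

Parameters
----------
{params}
"""


@doc(_template, source="custom text files", params="**kwargs : dict")
def _read_custom_text(**kwargs):
    ...


print(_read_custom_text.__doc__)  # prints the template with both fields filled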
4 changes: 4 additions & 0 deletions modin/core/io/__init__.py
@@ -18,6 +18,9 @@
from .text.csv_glob_dispatcher import CSVGlobDispatcher
from .text.fwf_dispatcher import FWFDispatcher
from .text.json_dispatcher import JSONDispatcher
from .text.custom_text_dispatcher import (
    CustomTextExperimentalDispatcher,
)
from .text.excel_dispatcher import ExcelDispatcher
from .file_dispatcher import FileDispatcher
from .text.text_file_dispatcher import TextFileDispatcher
@@ -41,4 +44,5 @@
"SQLDispatcher",
"ExcelDispatcher",
"PickleExperimentalDispatcher",
"CustomTextExperimentalDispatcher",
]
101 changes: 101 additions & 0 deletions modin/core/io/text/custom_text_dispatcher.py
@@ -0,0 +1,101 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

"""Module houses the `CustomTextExperimentalDispatcher` class that is used for reading custom text files."""

import pandas

from modin.core.io.file_dispatcher import OpenFile
from modin.core.io.text.text_file_dispatcher import TextFileDispatcher
from modin.config import NPartitions


class CustomTextExperimentalDispatcher(TextFileDispatcher):
    """Class handles utils for reading custom text files."""

    read_callback = None

    @classmethod
    def _read(cls, filepath_or_buffer, columns, custom_parser, **kwargs):
        """
        Read data from `filepath_or_buffer` according to the parameters passed to `read_custom_text`.

        Parameters
        ----------
        filepath_or_buffer : str, path object or file-like object
            `filepath_or_buffer` parameter of the `read_custom_text` function.
        columns : list or callable(file-like object, **kwargs) -> list
            Column names as a list, or a callable that creates column names from the
            opened file and the passed `kwargs`.
        custom_parser : callable(file-like object, **kwargs) -> pandas.DataFrame
            Function that takes as input a part of the `filepath_or_buffer` file loaded
            into memory in file-like object form.
        **kwargs : dict
            Parameters of the `read_custom_text` function.

        Returns
        -------
        BaseQueryCompiler
            Query compiler with imported data for further processing.
        """
        filepath_or_buffer_md = (
            cls.get_path(filepath_or_buffer)
            if isinstance(filepath_or_buffer, str)
            else cls.get_path_or_buffer(filepath_or_buffer)
        )
        compression_infered = cls.infer_compression(
            filepath_or_buffer, kwargs["compression"]
        )

        with OpenFile(filepath_or_buffer_md, "rb", compression_infered) as f:
            splits = cls.partitioned_file(
                f,
                num_partitions=NPartitions.get(),
                is_quoting=kwargs.pop("is_quoting"),
                nrows=kwargs["nrows"],
            )

        if callable(columns):
            with OpenFile(filepath_or_buffer_md, "rb", compression_infered) as f:
                columns = columns(f, **kwargs)
        if not isinstance(columns, pandas.Index):
            columns = pandas.Index(columns)

        empty_pd_df = pandas.DataFrame(columns=columns)
        index_name = empty_pd_df.index.name
        column_widths, num_splits = cls._define_metadata(empty_pd_df, columns)

        # kwargs that will be passed to the workers
        partition_kwargs = dict(
            kwargs,
            fname=filepath_or_buffer_md,
            num_splits=num_splits,
            nrows=None,
            compression=compression_infered,
        )

        partition_ids, index_ids, dtypes_ids = cls._launch_tasks(
            splits, callback=custom_parser, **partition_kwargs
        )

        new_query_compiler = cls._get_new_qc(
            partition_ids=partition_ids,
            index_ids=index_ids,
            dtypes_ids=dtypes_ids,
            index_col=None,
            index_name=index_name,
            column_widths=column_widths,
            column_names=columns,
            nrows=kwargs["nrows"],
        )
        return new_query_compiler
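The heart of `_read` is `partitioned_file`, which computes byte ranges that end on line boundaries so each worker can parse its chunk independently. A simplified, standalone sketch of that splitting idea; this is not Modin's actual implementation, which additionally handles quoting, compression, and `nrows`:

import os


def split_on_newlines(f, num_partitions):
    # Return (start, end) byte offsets; every range ends on a line
    # boundary, so no record is ever torn between two workers.
    file_size = os.fstat(f.fileno()).st_size
    chunk = max(file_size // num_partitions, 1)
    splits, start = [], 0
    while start < file_size:
        f.seek(min(start + chunk, file_size))
        f.readline()  # advance to the end of the current line
        end = f.tell()
        splits.append((start, end))
        start = end
    return splits


with open("data.json", "rb") as f:  # hypothetical file from the sketch above
    print(split_on_newlines(f, num_partitions=4))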
12 changes: 10 additions & 2 deletions modin/core/storage_formats/pandas/parsers.py
@@ -146,7 +146,7 @@ def generic_parse(fname, **kwargs):
        num_splits = kwargs.pop("num_splits", None)
        start = kwargs.pop("start", None)
        end = kwargs.pop("end", None)
-        header_size = kwargs.pop("header_size", None)
+        header_size = kwargs.pop("header_size", 0)
        encoding = kwargs.get("encoding", None)
        callback = kwargs.pop("callback")
        if start is None or end is None:
@@ -380,6 +380,14 @@ def parse(fname, **kwargs):
        return _split_result_for_readers(1, num_splits, df) + [length, width]


@doc(_doc_pandas_parser_class, data_type="custom text")
class CustomTextExperimentalParser(PandasParser):
    @staticmethod
    @doc(_doc_parse_func, parameters=_doc_parse_parameters_common)
    def parse(fname, **kwargs):
        return PandasParser.generic_parse(fname, **kwargs)


@doc(_doc_pandas_parser_class, data_type="tables with fixed-width formatted lines")
class PandasFWFParser(PandasParser):
    @staticmethod
@@ -617,7 +625,7 @@ def parse(fname, **kwargs):
            # This only happens when we are reading with only one worker (Default)
            return pandas.read_json(fname, **kwargs)
        if not pandas_df.columns.equals(columns):
-            raise NotImplementedError("Columns must be the same across all rows.")
+            raise ValueError("Columns must be the same across all rows.")
        partition_columns = pandas_df.columns
        return _split_result_for_readers(1, num_splits, pandas_df) + [
            len(pandas_df),
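`generic_parse`, which the new `CustomTextExperimentalParser` above delegates to, is the worker-side half of the protocol: each worker reads only its `[start, end)` byte slice and hands it to the user callback as a file-like object. A simplified sketch of that step, assuming a plain uncompressed file and ignoring how results are split into partitions:

from io import BytesIO


def worker_parse(fname, start, end, callback, **kwargs):
    # Read only this worker's byte slice, which is known to end on a
    # line boundary, then let the user-supplied parser build the frame.
    with open(fname, "rb") as f:
        f.seek(start)
        chunk = f.read(end - start)
    return callback(BytesIO(chunk), **kwargs)

In the real parser the resulting DataFrame is additionally split column-wise into `num_splits` parts and returned together with its length and width.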
@@ -26,10 +26,15 @@
    _split_result_for_readers,
    PandasCSVGlobParser,
    PandasPickleExperimentalParser,
    CustomTextExperimentalParser,
)
from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler
from modin.core.execution.ray.implementations.pandas_on_ray.io import PandasOnRayIO
-from modin.core.io import CSVGlobDispatcher, PickleExperimentalDispatcher
+from modin.core.io import (
+    CSVGlobDispatcher,
+    PickleExperimentalDispatcher,
+    CustomTextExperimentalDispatcher,
+)
from modin.core.execution.ray.implementations.pandas_on_ray.dataframe import (
    PandasOnRayDataframe,
)
@@ -64,6 +69,12 @@ class ExperimentalPandasOnRayIO(PandasOnRayIO):
        build_args,
    )._read

    read_custom_text = type(
        "",
        (RayTask, CustomTextExperimentalParser, CustomTextExperimentalDispatcher),
        build_args,
    )._read

    @classmethod
    def read_sql(
        cls,
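`read_custom_text` above is assembled with the three-argument `type()` call, which builds an anonymous class out of three mixins: the Ray task, the parser, and the dispatcher. A toy illustration of the trick, with invented class names:

class TaskMixin:
    # Stands in for RayTask: how work is shipped to workers.
    pass


class ParserMixin:
    # Stands in for the parser: how one chunk becomes a frame.
    @staticmethod
    def parse(data):
        return data.upper()


class DispatcherMixin:
    # Stands in for the dispatcher: the driver-side `_read` logic.
    @classmethod
    def _read(cls, data):
        # Thanks to the MRO of the composed class, `parse` resolves to
        # ParserMixin even though DispatcherMixin never defines it.
        return cls.parse(data)


reader = type("", (TaskMixin, ParserMixin, DispatcherMixin), {})._read
print(reader("hello"))  # -> HELLO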
1 change: 1 addition & 0 deletions modin/experimental/pandas/__init__.py
@@ -43,6 +43,7 @@
from .io import (  # noqa F401
    read_sql,
    read_csv_glob,
    read_custom_text,
    read_pickle_distributed,
    to_pickle_distributed,
)
Expand Down
41 changes: 41 additions & 0 deletions modin/experimental/pandas/io.py
@@ -102,6 +102,47 @@ def read_sql(
    return DataFrame(query_compiler=FactoryDispatcher.read_sql(**kwargs))


def read_custom_text(
    filepath_or_buffer,
    columns,
    custom_parser,
    compression="infer",
    nrows: Optional[int] = None,
    is_quoting=True,
):
    """
    Load custom text data from a file.

    Parameters
    ----------
    filepath_or_buffer : str
        File path where the custom text data will be loaded from.
    columns : list or callable(file-like object, **kwargs) -> list
        Column names as a list, or a callable that creates column names from the
        opened file and the passed `kwargs`.
    custom_parser : callable(file-like object, **kwargs) -> pandas.DataFrame
        Function that takes as input a part of the `filepath_or_buffer` file loaded
        into memory in file-like object form.
    compression : {'infer', 'gzip', 'bz2', 'zip', 'xz', None}, default: 'infer'
        If 'infer' and `filepath_or_buffer` is path-like, then detect compression from
        the following extensions: '.gz', '.bz2', '.zip', or '.xz' (otherwise no
        compression). If 'infer' and `filepath_or_buffer` is not path-like, then use
        None (= no decompression).
    nrows : int, optional
        Number of rows to read.
    is_quoting : bool, default: True
        Whether or not to consider quotes.

    Returns
    -------
    modin.DataFrame
    """
    Engine.subscribe(_update_engine)
    assert IsExperimental.get(), "This only works in experimental mode"
    _, _, _, kwargs = inspect.getargvalues(inspect.currentframe())
    return DataFrame(query_compiler=FactoryDispatcher.read_custom_text(**kwargs))


# CSV and table
def _make_parser_func(sep: str) -> Callable:
    """
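One detail worth noting in `read_custom_text` above: `inspect.getargvalues(inspect.currentframe())` captures all of the function's parameters as a single dict without restating them, and that dict is forwarded verbatim to the dispatcher. A minimal demonstration of the idiom, with an invented function:

import inspect


def read_custom_text_like(filepath_or_buffer, columns=None, nrows=None):
    # At the top of a function, the frame's locals are exactly the
    # declared parameters, so this captures them as one kwargs dict.
    _, _, _, kwargs = inspect.getargvalues(inspect.currentframe())
    return kwargs


print(read_custom_text_like("file.txt", nrows=5))
# -> {'filepath_or_buffer': 'file.txt', 'columns': None, 'nrows': 5}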
99 changes: 98 additions & 1 deletion modin/experimental/pandas/test/test_io_exp.py
@@ -13,12 +13,19 @@

from contextlib import nullcontext
import glob
import json
import numpy as np
import pandas
import pytest
import modin.experimental.pandas as pd
from modin.config import Engine
from modin.utils import get_current_execution
-from modin.pandas.test.utils import df_equals, teardown_test_files, test_data
+from modin.pandas.test.utils import (
+    df_equals,
+    get_unique_filename,
+    teardown_test_files,
+    test_data,
+)
from modin.test.test_utils import warns_that_defaulting_to_pandas
from modin.pandas.test.utils import parse_dates_values_by_id, time_parsing_csv_path

@@ -223,3 +230,93 @@ def test_distributed_pickling(filename, compression):

    pickle_files = glob.glob(filename)
    teardown_test_files(pickle_files)


@pytest.mark.skipif(
    not Engine.get() == "Ray",
    reason=f"{Engine.get()} does not have experimental read_custom_text API",
)
def test_read_custom_json_text():
    filename = get_unique_filename(extension="json")

    def _generate_json(file_name, nrows, ncols):
        data = np.random.rand(nrows, ncols)
        df = pandas.DataFrame(data, columns=[f"col{x}" for x in range(ncols)])
        df.to_json(file_name, lines=True, orient="records")

    _generate_json(filename, 64, 8)

    # A custom parser lets us tailor how files are read in ways that are
    # not available through the ready-made API. For example, the parser
    # can reduce the amount of RAM required for reading by selecting only
    # a subset of columns.
    def _custom_parser(io_input, **kwargs):
        result = {"col0": [], "col1": [], "col3": []}
        for line in io_input:
            # for example, simdjson could be used here
            obj = json.loads(line)
            for key in result:
                result[key].append(obj[key])
        return pandas.DataFrame(result).rename(columns={"col0": "testID"})

    df1 = pd.read_custom_text(
        filename,
        columns=["testID", "col1", "col3"],
        custom_parser=_custom_parser,
        is_quoting=False,
    )
    df2 = pd.read_json(filename, lines=True)[["col0", "col1", "col3"]].rename(
        columns={"col0": "testID"}
    )
    df_equals(df1, df2)


@pytest.mark.skipif(
    not Engine.get() == "Ray",
    reason=f"{Engine.get()} does not have experimental API",
)
def test_read_evaluated_dict():
    filename = get_unique_filename(extension="json")

    def _generate_evaluated_dict(file_name, nrows, ncols):
        result = {}
        keys = [f"col{x}" for x in range(ncols)]

        with open(file_name, mode="w") as _file:
            for i in range(nrows):
                data = np.random.rand(ncols)
                for idx, key in enumerate(keys):
                    result[key] = data[idx]
                _file.write(str(result))
                _file.write("\n")

    _generate_evaluated_dict(filename, 64, 8)

    # This parser allows us to read a format not supported by other reading functions
    def _custom_parser(io_input, **kwargs):
        cat_list = []
        asin_list = []
        for line in io_input:
            obj = eval(line)
            cat_list.append(obj["col1"])
            asin_list.append(obj["col2"])
        return pandas.DataFrame({"col1": asin_list, "col2": cat_list})

    df1 = pd.read_custom_text(
        filename,
        columns=["col1", "col2"],
        custom_parser=_custom_parser,
    )
    assert df1.shape == (64, 2)

    def columns_callback(io_input, **kwargs):
        columns = None
        for line in io_input:
            columns = list(eval(line).keys())[1:3]
            break
        return columns

    df2 = pd.read_custom_text(
        filename, columns=columns_callback, custom_parser=_custom_parser
    )
    df_equals(df1, df2)
