WIP: Support batch-reading for data-types with chunksize parameter #206
Conversation
Left some comments to explain what I changed, hopefully it helps with the review!
mlem/contrib/pandas.py
Outdated
def read_html(*args, **kwargs):
    # read_html returns list of dataframes
    return pd.read_html(*args, **kwargs)[0]


PANDAS_FORMATS = {
Refactored to make this a non-global variable to avoid needing hacks around tests for batch-reading.
def test_simple_batch_df(data, format):
    writer = PandasWriter(format=format)
    # Batch-reading JSON files require line-delimited data
    if format == "json":
        writer.fmt.write_args = {"orient": "records", "lines": True}
    dataset_write_read_check(
        DatasetType.create(data), writer, PandasReader, pd.DataFrame.equals, batch=2
    )
    # Need reset if PANDAS_FORMATS is a global variable
    if format == "csv":
        writer.fmt.write_args = {"index": False}
        writer.fmt.read_args = {}
        writer.fmt.read_func = read_csv_with_unnamed
    if format == "json":
        writer.fmt.write_args = {"date_format": "iso", "date_unit": "ns"}
        writer.fmt.read_args = {}
        writer.fmt.read_func = read_json_reset_index
Not sure why you need to change something for tests? This means you're testing something else, no?
And you don't need a function anyway. You can just do writer.fmt = PandasFormat(<whatever you need>) before the check, or maybe use fmt = writer.fmt.copy() and then change what you need.
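For illustration, the copy-based variant might look roughly like this (a sketch only; it assumes PandasFormat is a pydantic model, so writer.fmt.copy() returns an independent instance that can be mutated freely):

def test_simple_batch_df(data, format):
    writer = PandasWriter(format=format)
    fmt = writer.fmt.copy()  # work on a copy so the shared format entry is never mutated
    if format == "json":
        # batch-reading JSON requires line-delimited data
        fmt.write_args = {"orient": "records", "lines": True}
    writer.fmt = fmt
    dataset_write_read_check(
        DatasetType.create(data), writer, PandasReader, pd.DataFrame.equals, batch=2
    )
    # no reset step needed afterwards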
mlem/core/dataset_type.py
Outdated
dataset: DatasetType,
storage: Storage,
path: str,
writer_fmt_args: Optional[Dict[str, Any]] = None,
Added an optional parameter to update writer_fmt_args, which is required for JSON batch-reading (the data needs to be in line-delimited form).
This parameter is currently only used in the batch-reading tests, to write the JSON dataset in line-delimited form for batch-reading. It can be further exposed to users here - https://github.com/iterative/mlem/blob/main/mlem/core/objects.py#L659, via the write_value call.
Let me know if you want me to look into adding this as part of the PR.
Let's leave batch writing out of the scope of this PR. If you're only using it for tests, let's remove this for now. In tests you can write the test dataset manually without mlem, like our forefathers did.
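For example, the JSON case could write its fixture by hand (a hedged sketch; data is assumed to be a DataFrame fixture and tmp_path is pytest's built-in temporary directory):

import pandas as pd

def test_batch_read_json_manual(tmp_path, data):
    # write the fixture directly, line-delimited, without going through mlem's writer
    path = str(tmp_path / "data.json")
    data.to_json(path, orient="records", lines=True)
    chunks = list(pd.read_json(path, orient="records", lines=True, chunksize=2))
    assert sum(len(c) for c in chunks) == len(data)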
I commented in the code a bit, but the main idea is the following: you implemented batch as a flag for a regular read, but actually we need a separate type of data loading. It should return some lazy iterable object that would yield batches, preferably of the same type as the whole dataset. For example, the apply flow should change to something like this:
if batch is None:
    # existing logic
    res = apply(model, dataset.get_value())
else:
    res = [apply(model, batch) for batch in dataset.iter_batches(batch)]
You don't need to worry about batch writing or merging different batches for now
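For the pandas case, the lazy object could be as simple as a generator over pandas' own chunked readers (a sketch under that assumption, not the final API):

from typing import Iterator

import pandas as pd

def iter_batches(path: str, batch: int) -> Iterator[pd.DataFrame]:
    # pandas returns a lazy iterator of DataFrame chunks when chunksize is set,
    # so only one chunk lives in memory at a time
    yield from pd.read_csv(path, chunksize=batch)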
mlem/contrib/pandas.py
Outdated
unnamed = {}
for i, df_chunk in enumerate(df_iterator):
    # Instantiate Pandas DataFrame with columns if it is the first chunk
    if i == 0:
this may be df is None
mlem/contrib/pandas.py
Outdated
for col in df_chunk.columns:
    if col.startswith("Unnamed: "):
        unnamed[col] = ""
df = pd.concat([df, df_chunk], ignore_index=True)
You still read the whole file into memory. The idea is to apply the model to each part before you read the next one.
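In other words, each chunk should be consumed as soon as it is read (a rough sketch; model, path and batch are placeholders):

import pandas as pd

def apply_in_chunks(model, path: str, batch: int) -> list:
    # only one chunk is ever in memory; results are collected per chunk
    return [model.predict(chunk) for chunk in pd.read_csv(path, chunksize=batch)]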
I see, sorry I misunderstood - I'll refactor and implement an iterator for these batch functions.
mlem/contrib/pandas.py
Outdated
@@ -505,16 +548,38 @@ def read(self, artifacts: Artifacts) -> DatasetType:
            self.dataset_type.align(self.fmt.read(artifacts))
        )

    def read_batch(self, artifacts: Artifacts, batch: int) -> DatasetType:
        fmt = update_batch_args(self.format, self.fmt, batch)
I think I get this thing above now. I'd say we just need a separate set of args for batch and non-batch reading, so you don't have to change the state every time.
Refactored this bit - so there's no longer a need to update args based on batch/non-batch reads.
mlem/contrib/pandas.py
Outdated
# Pandas supports batch-reading for JSON only if the JSON file is line-delimited
# https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#line-delimited-json
if self.format == "json":
    dataset_lines = sum(1 for line in open(artifacts["data"].uri))
- you are reading the whole file here, but you actually only need 2 lines to know there are enough
- if it is one-line JSON, you still read the whole dataset, and then you actually read it again if everything is OK
- it will fail if your dataset has just 1 row
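A cheaper check would peek at just the first couple of lines (an illustrative sketch; path stands in for artifacts["data"].uri):

from itertools import islice

with open(path) as f:
    first_lines = list(islice(f, 2))  # stop after two lines, never scan the whole file
# note: a one-row dataset still produces a single line, so this alone cannot
# distinguish it from non-line-delimited JSON
looks_line_delimited = len(first_lines) > 1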
let's assume the dataset file is in the right format for now
@mike0sv I think my forked repo is no longer synced with this repository because it was made public (my commits are no longer updated here). Shall I make a new PR?
if df is None:
    df = pd.DataFrame(columns=chunk.columns, dtype=col_types)
col_types = {
    chunk.columns[idx]: chunk.dtypes[idx]
zip would fit nicely here
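i.e. roughly (sketch):

col_types = dict(zip(chunk.columns, chunk.dtypes))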
@@ -404,6 +414,10 @@ class Config:
    def read(self, artifacts: Artifacts) -> DatasetType:
        raise NotImplementedError

    @abstractmethod
    def read_batch(self, artifacts: Artifacts, batch: int) -> Iterator:
This should actually be Iterator[DatasetType]
@@ -142,6 +145,7 @@ def load_meta(
    follow_links: bool = True,
    load_value: bool = False,
    fs: Optional[AbstractFileSystem] = None,
    batch: Optional[int] = None,
load_meta should not have a batch arg: you are only loading metadata. load_value here is for convenience. If you need batching, you should set load_value=False and then call read_batch on DatasetMeta directly.
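So caller-side usage would look roughly like this (a sketch of the suggested API, not existing code; process is a placeholder for whatever happens per batch):

meta = load_meta(path, load_value=False)   # metadata only, nothing loaded into memory
for batch_dt in meta.read_batch(batch):    # lazy Iterator[DatasetType]
    process(batch_dt)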
@@ -668,6 +668,9 @@ def write_value(self) -> Artifacts:
    def load_value(self):
        self.dataset = self.reader.read(self.relative_artifacts)

    def load_batch_value(self, batch: int):
This should be changed to something like read_batch, return Iterator[DatasetType] and not set any values. The self.dataset field is for the whole dataset value, and if you wish to use batching, that means you don't want to load the whole dataset into memory. So, read_batch will return a lazy iterator that you can iterate over to get DatasetTypes to work with. For now let's say that those DatasetTypes should be the same as the one the reader is holding (== the DatasetType of the whole dataset).
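A minimal sketch of that on DatasetMeta, mirroring the existing load_value and assuming the reader grows a matching read_batch:

def read_batch(self, batch: int) -> Iterator[DatasetType]:
    # lazily delegate to the reader; nothing is assigned to self.dataset
    return self.reader.read_batch(self.relative_artifacts, batch)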
repo=data_repo,
rev=data_rev,
type_=import_type,
batch=batch,
for now let's ignore this branch and focus on the other one without import. Once we're done, we can discuss how to approach this (I am not sure myself)
@@ -92,6 +99,7 @@ def apply(
    data_rev,
    load_value=True,
as per comments in metadata.py and objects.py, this should be load_meta(..., load_value=batch is None)
@@ -85,7 +86,7 @@ def apply(
    resolved_method = PREDICT_METHOD_NAME
    echo(EMOJI_APPLY + f"Applying `{resolved_method}` method...")
    res = [
-       w.call_method(resolved_method, get_dataset_value(part))
+       w.call_method(resolved_method, get_dataset_value(part, batch))
Here you should flatten all the batches if there are any. Here is an example of how I see it:
Suppose you have dataframe=[1,2,3,4] (I mean 1 column, 4 rows) saved to a csv file. You load its metadata without loading the value, let's say dt = DatasetMeta(dataset_type=DataFrameType(...), ...). And you call apply with data=[dt]. If the batch arg is not provided, what will happen is get_dataset_value will load the actual dataframe and in the end res = [w.call_method(..., dataframe([1,2,3,4]))]. But if you provided batch=2, dt.read_batch should be called. If you iterate through it, you will get 2 parts of the dataframe, and in the end res = [w.call_method(..., dataframe([1,2])), w.call_method(..., dataframe([3,4]))]
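Put as code, the branching in apply could look something like this (a sketch under the assumptions above; how the raw value is pulled out of each yielded DatasetType depends on the final API):

if batch is None:
    res = [
        w.call_method(resolved_method, get_dataset_value(part))
        for part in data
    ]
else:
    # flatten: one call per batch, across all dataset parts
    res = [
        w.call_method(resolved_method, chunk)
        for part in data
        for chunk in part.read_batch(batch)
    ]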
👍🏻 Makes sense, I implemented something similar yesterday, but I couldn't sync the changes to this PR because the private fork no longer points to this repository.
Closing the PR because private forks can't sync to this repository anymore since it went public. I've pushed the changes to a separate PR (#216) after forking the public repository.
Context
Some datasets are large, and rather than dealing with them in one big block, we can split the data into chunks. This PR adds batch-reading support for data formats that provide the chunksize parameter in the Pandas API.
Supported formats: CSV, JSON
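For reference, this is the pandas mechanism the PR builds on (plain pandas, independent of mlem; the file name is made up):

import pandas as pd

# chunksize turns the read into a lazy iterator of DataFrames
for chunk in pd.read_csv("large_dataset.csv", chunksize=10_000):
    print(len(chunk))  # each chunk holds at most 10,000 rows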
Modifications
- mlem/cli/apply.py - Added batch parameter when calling the apply method - supports both importing data on/off-the-fly workflows
- mlem/api/utils.py - Added batch reading support when getting the Dataset value
- mlem/core/errors.py - Added new errors UnsupportedDatasetBatchLoadingType and DatasetBatchLoadingJSONError for batch-reading workflows
- mlem/contrib/pandas.py - Added batch-reading support for CSV, JSON data formats
- tests/contrib/test_pandas.py - Added tests for supported batch-reading data formats, exception tests for unsupported batch-reading data formats

Which issue(s) this PR fixes:
Fixes #23