
Commit

fix types
LeonLuttenberger committed Dec 18, 2023
1 parent 4783a6a commit 77984ac
Showing 8 changed files with 21 additions and 21 deletions.
@@ -59,7 +59,7 @@ def _read_stream( # type: ignore[override] # pylint: disable=arguments-differ
     def _write_block( # type: ignore[override] # pylint: disable=arguments-differ
         self,
         f: pa.NativeFile,
-        block: BlockAccessor,
+        block: BlockAccessor, # type: ignore[type-arg]
         **writer_args: Any,
     ) -> None:
         write_options_dict = writer_args.get("write_options", {})
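Most of the suppressions in this commit follow the same pattern: Ray's `BlockAccessor`, `Block`, `Reader`, and `Dataset` are generic types, and once mypy runs with `disallow_any_generics`, any bare (unparameterized) use of them is reported as a `type-arg` error. The sketch below is not taken from the repository; it uses a stand-in generic class to show roughly how the error appears and how an inline ignore silences it.

```python
# Minimal stand-in (not Ray's BlockAccessor): with mypy's
# disallow_any_generics enabled, a bare generic annotation is rejected.
from typing import Generic, List, TypeVar

T = TypeVar("T")


class BlockAccessor(Generic[T]):
    def __init__(self, rows: List[T]) -> None:
        self.rows = rows


def write_block(block: BlockAccessor) -> None:
    # mypy: error: Missing type parameters for generic type "BlockAccessor"  [type-arg]
    print(len(block.rows))


def write_block_suppressed(block: BlockAccessor) -> None:  # type: ignore[type-arg]
    print(len(block.rows))
```

The alternative is to parameterize each annotation (for example `BlockAccessor[pa.Table]`); this commit opts for targeted ignores instead.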
@@ -45,7 +45,7 @@ def _open_input_source(
     def _write_block( # type: ignore[override]
         self,
         f: pa.NativeFile,
-        block: BlockAccessor,
+        block: BlockAccessor, # type: ignore[type-arg]
         pandas_kwargs: Optional[Dict[str, Any]],
         **writer_args: Any,
     ) -> None:
@@ -70,7 +70,7 @@ def _open_input_source(
     def _write_block( # type: ignore[override]
         self,
         f: pa.NativeFile,
-        block: BlockAccessor,
+        block: BlockAccessor, # type: ignore[type-arg]
         **writer_args: Any,
     ) -> None:
         schema: Optional[pa.schema] = writer_args.get("schema", None)
@@ -82,14 +82,14 @@ class ArrowParquetDatasource(ArrowParquetBaseDatasource): # pylint: disable=abs
     relative to the root S3 prefix.
     """

-    def create_reader(self, **kwargs: Dict[str, Any]) -> Reader:
+    def create_reader(self, **kwargs: Dict[str, Any]) -> Reader: # type: ignore[type-arg]
         """Return a Reader for the given read arguments."""
         return _ArrowParquetDatasourceReader(**kwargs) # type: ignore[arg-type]

     def _write_block( # type: ignore[override] # pylint: disable=arguments-differ, arguments-renamed, unused-argument
         self,
         f: "pyarrow.NativeFile",
-        block: BlockAccessor,
+        block: BlockAccessor, # type: ignore[type-arg]
         pandas_kwargs: Optional[Dict[str, Any]],
         **writer_args: Any,
     ) -> None:
@@ -119,7 +119,7 @@ def _get_file_suffix(self, file_format: str, compression: Optional[str]) -> str:
 # raw pyarrow file fragment causes S3 network calls.
 class _SerializedPiece:
     def __init__(self, frag: ParquetFileFragment):
-        self._data = cloudpickle.dumps( # type: ignore[attr-defined]
+        self._data = cloudpickle.dumps( # type: ignore[attr-defined,no-untyped-call]
             (frag.format, frag.path, frag.filesystem, frag.partition_expression)
         )
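`_SerializedPiece` deliberately pickles the fragment's components rather than the raw `ParquetFileFragment`, since serializing the fragment itself triggers S3 network calls; the extra `no-untyped-call` code is reported because `cloudpickle.dumps` carries no type annotations. A hedged sketch of the round trip is below — it is not copied from this file, and the `make_fragment` rebuild is only one plausible counterpart using pyarrow's public API.

```python
# Hedged sketch: store only lightweight fragment components, then rebuild
# the fragment lazily on the worker that needs it.
import pyarrow.dataset as ds
from ray import cloudpickle


def serialize_fragment(frag: ds.ParquetFileFragment) -> bytes:
    # Only the format, path, filesystem, and partition expression are pickled;
    # no parquet metadata is fetched here.
    return cloudpickle.dumps((frag.format, frag.path, frag.filesystem, frag.partition_expression))


def deserialize_fragment(data: bytes) -> ds.ParquetFileFragment:
    file_format, path, filesystem, partition_expression = cloudpickle.loads(data)
    return file_format.make_fragment(path, filesystem=filesystem, partition_expression=partition_expression)
```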

@@ -185,7 +185,7 @@ def _deserialize_pieces_with_retry(
     raise final_exception # type: ignore[misc]


-class _ArrowParquetDatasourceReader(Reader): # pylint: disable=too-many-instance-attributes
+class _ArrowParquetDatasourceReader(Reader): # type: ignore[type-arg] # pylint: disable=too-many-instance-attributes
     def __init__(
         self,
         paths: Union[str, List[str]],
@@ -194,7 +194,7 @@ def __init__(
         columns: Optional[List[str]] = None,
         schema: Optional[Schema] = None,
         meta_provider: ParquetMetadataProvider = DefaultParquetMetadataProvider(),
-        _block_udf: Optional[Callable[[Block], Block]] = None,
+        _block_udf: Optional[Callable[[Block], Block]] = None, # type: ignore[type-arg]
         **reader_args: Any,
     ):
         import pyarrow as pa
@@ -225,7 +225,7 @@ def __init__(
             # Try to infer dataset schema by passing dummy table through UDF.
             dummy_table = schema.empty_table()
             try:
-                inferred_schema = _block_udf(dummy_table).schema
+                inferred_schema = _block_udf(dummy_table).schema # type: ignore[union-attr]
                 inferred_schema = inferred_schema.with_metadata(schema.metadata)
             except Exception: # pylint: disable=broad-except
                 _logger.debug(
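The new `union-attr` ignore exists because `_block_udf` is typed `Optional[...]`, and mypy cannot tell that this branch only runs when a UDF was supplied. The inference trick itself — calling the UDF on a zero-row table built from the known schema to learn the output schema without reading any data — can be shown with plain pyarrow. A hedged, self-contained sketch (not repository code):

```python
# Hedged sketch: push an empty table (same columns, zero rows) through a
# block UDF to discover the schema it would produce on real data.
import pyarrow as pa


def add_flag_column(table: pa.Table) -> pa.Table:
    # Example UDF: appends a boolean column to every block.
    flags = pa.array([True] * table.num_rows, type=pa.bool_())
    return table.append_column("flag", flags)


schema = pa.schema([("x", pa.int64()), ("y", pa.string())])
dummy_table = schema.empty_table()               # zero rows, correct columns
inferred_schema = add_flag_column(dummy_table).schema
print(inferred_schema.names)                     # ['x', 'y', 'flag']
```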
@@ -361,7 +361,7 @@ def _estimate_files_encoding_ratio(self) -> float:
 # 1. Use _add_table_partitions to add partition columns. The behavior is controlled by Pandas SDK
 #    native `dataset` parameter. The partitions are loaded relative to the `path_root` prefix.
 def _read_pieces(
-    block_udf: Optional[Callable[[Block], Block]],
+    block_udf: Optional[Callable[[Block], Block]], # type: ignore[type-arg]
     reader_args: Any,
     columns: Optional[List[str]],
     schema: Optional[Union[type, "pyarrow.lib.Schema"]],
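The comment above refers to awswrangler's internal `_add_table_partitions` helper. The sketch below is only an illustration of the general idea — Hive-style `key=value` segments between the dataset root and the file name become columns — and is not that helper's actual implementation.

```python
# Illustrative sketch (not the library's code): recover partition columns
# from the part of the file path that sits under the dataset root prefix.
import pyarrow as pa


def add_path_partitions(table: pa.Table, path: str, path_root: str) -> pa.Table:
    relative = path[len(path_root):].lstrip("/")
    for segment in relative.split("/")[:-1]:  # every directory level, skipping the file name
        if "=" in segment:
            key, value = segment.split("=", 1)
            table = table.append_column(key, pa.array([value] * table.num_rows))
    return table


table = pa.table({"x": [1, 2]})
out = add_path_partitions(table, "s3://bucket/data/year=2023/month=12/part0.parquet", "s3://bucket/data")
print(out.column_names)  # ['x', 'year', 'month']
```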
@@ -35,7 +35,7 @@ def _get_write_path_for_block(
         *,
         filesystem: Optional["pyarrow.fs.FileSystem"] = None,
         dataset_uuid: Optional[str] = None,
-        block: Optional[Block] = None,
+        block: Optional[Block] = None, # type: ignore[type-arg]
         block_index: Optional[int] = None,
         file_format: Optional[str] = None,
     ) -> str:
@@ -64,7 +64,7 @@ def __init__(self) -> None:
     def _read_file(self, f: pyarrow.NativeFile, path: str, **reader_args: Any) -> pd.DataFrame:
         raise NotImplementedError()

-    def do_write( # pylint: disable=arguments-differ
+    def do_write( # type: ignore[override] # pylint: disable=arguments-differ
         self,
         blocks: List[ObjectRef[pd.DataFrame]],
         metadata: List[BlockMetadata],
@@ -141,9 +141,9 @@ def write_block(write_path: str, block: pd.DataFrame) -> str:

         return write_tasks

-    def write( # type: ignore[override]
+    def write(
         self,
-        blocks: Iterable[Union[Block, ObjectRef[pd.DataFrame]]],
+        blocks: Iterable[Union[Block, ObjectRef[pd.DataFrame]]], # type: ignore[type-arg]
         ctx: TaskContext,
         path: str,
         dataset_uuid: str,
@@ -188,7 +188,7 @@ def write_block(write_path: str, block: pd.DataFrame) -> str:

         file_suffix = self._get_file_suffix(self._FILE_EXTENSION, compression)

-        builder = DelegatingBlockBuilder() # type: ignore[no-untyped-call]
+        builder = DelegatingBlockBuilder() # type: ignore[no-untyped-call,var-annotated]
         for block in blocks:
             # Dereference the block if ObjectRef is passed
             builder.add_block(ray_get(block) if isinstance(block, ray.ObjectRef) else block) # type: ignore[arg-type]
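Context for the two ignores above: `DelegatingBlockBuilder` comes from Ray's internals and is untyped, so calling it trips `no-untyped-call`, and mypy also asks for a variable annotation (`var-annotated`), typically because the builder's generic parameter cannot be inferred from a bare constructor call. The loop itself just dereferences any `ObjectRef` blocks before combining them. A hedged stand-in using only public Ray and pandas APIs, with `pd.concat` playing the role of the builder:

```python
# Hedged stand-in: materialize ObjectRef blocks with ray.get before merging
# them into a single pandas block.
import pandas as pd
import ray

ray.init(ignore_reinit_error=True)

local_block = pd.DataFrame({"x": [1, 2]})
remote_block = ray.put(pd.DataFrame({"x": [3, 4]}))

blocks = [local_block, remote_block]
materialized = [ray.get(b) if isinstance(b, ray.ObjectRef) else b for b in blocks]
combined = pd.concat(materialized, ignore_index=True)  # stands in for DelegatingBlockBuilder.build()
print(len(combined))  # 4
```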
@@ -198,7 +198,7 @@ def write_block(write_path: str, block: pd.DataFrame) -> str:
             path,
             filesystem=filesystem,
             dataset_uuid=dataset_uuid,
-            block=block,
+            block=block, # type: ignore[arg-type]
             block_index=ctx.task_idx,
             file_format=file_suffix,
         )
@@ -211,7 +211,7 @@ def _get_file_suffix(self, file_format: str, compression: Optional[str]) -> str:
     def _write_block(
         self,
         f: "pyarrow.NativeFile",
-        block: BlockAccessor,
+        block: BlockAccessor, # type: ignore[type-arg]
         writer_args_fn: Callable[[], Dict[str, Any]] = lambda: {},
         **writer_args: Any,
     ) -> None:
@@ -72,7 +72,7 @@ def _read_file(self, f: pyarrow.NativeFile, path: str, **reader_args: Any) -> pd
     def _write_block( # type: ignore[override] # pylint: disable=arguments-differ, arguments-renamed
         self,
         f: io.TextIOWrapper,
-        block: BlockAccessor,
+        block: BlockAccessor, # type: ignore[type-arg]
         pandas_kwargs: Optional[Dict[str, Any]],
         **writer_args: Any,
     ) -> None:
2 changes: 1 addition & 1 deletion awswrangler/distributed/ray/modin/_core.py
@@ -27,7 +27,7 @@ def _validate_partition_shape(df: pd.DataFrame) -> bool:
     """
     # Unwrap partitions as they are currently stored (axis=None)
     partitions_shape = np.array(unwrap_partitions(df)).shape
-    return partitions_shape[1] == 1
+    return partitions_shape[1] == 1 # type: ignore[no-any-return]


 FunctionType = TypeVar("FunctionType", bound=Callable[..., Any])
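Two things are going on in this one-line change: Modin's `unwrap_partitions` is effectively untyped here, so the whole expression ends up as `Any` for mypy and returning it trips `no-any-return`; and the check relies on `unwrap_partitions` with the default `axis=None` returning a 2-D grid of partition references, so `shape[1] == 1` means the frame is split along rows only. A hedged sketch, assuming Modin on Ray is installed:

```python
# Hedged sketch: inspect Modin's partition grid as the check above does.
import modin.pandas as modin_pd
import numpy as np
from modin.distributed.dataframe.pandas import unwrap_partitions

df = modin_pd.DataFrame({"x": range(1000), "y": range(1000)})
grid = np.array(unwrap_partitions(df))  # axis=None -> (row_parts, col_parts) grid of references
print(grid.shape)                       # e.g. (N, 1) when only row-partitioned
print(grid.shape[1] == 1)
```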
4 changes: 2 additions & 2 deletions awswrangler/distributed/ray/modin/_utils.py
@@ -29,7 +29,7 @@ def _block_to_df(
     return _table_to_df(table=block._table, kwargs=to_pandas_kwargs) # pylint: disable=protected-access


-def _ray_dataset_from_df(df: Union[pd.DataFrame, modin_pd.DataFrame]) -> Dataset:
+def _ray_dataset_from_df(df: Union[pd.DataFrame, modin_pd.DataFrame]) -> Dataset: # type: ignore[type-arg]
     """Create Ray dataset from supported types of data frames."""
     if isinstance(df, modin_pd.DataFrame):
         return from_modin(df) # type: ignore[no-any-return]
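For reference, these are the two public constructors the helper dispatches between; a hedged usage sketch, not repository code (the Modin path is shown as a comment since it needs a Modin frame):

```python
# Hedged usage sketch: Ray has separate Dataset constructors for pandas and
# Modin frames; _ray_dataset_from_df only picks between them.
import pandas as pd
import ray

ds_from_pandas = ray.data.from_pandas(pd.DataFrame({"x": [1, 2, 3]}))
print(ds_from_pandas.count())  # 3

# For a Modin frame the equivalent entry point is ray.data.from_modin(modin_df).
```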
@@ -39,7 +39,7 @@ def _ray_dataset_from_df(df: Union[pd.DataFrame, modin_pd.DataFrame]) -> Dataset


 def _to_modin(
-    dataset: Dataset,
+    dataset: Dataset, # type: ignore[type-arg]
     to_pandas_kwargs: Optional[Dict[str, Any]] = None,
     ignore_index: Optional[bool] = True,
 ) -> modin_pd.DataFrame:
