diff --git a/python_modules/libraries/dagster-dask/dagster_dask/data_frame.py b/python_modules/libraries/dagster-dask/dagster_dask/data_frame.py index 32a43184a2a44..a68ef505690c9 100644 --- a/python_modules/libraries/dagster-dask/dagster_dask/data_frame.py +++ b/python_modules/libraries/dagster-dask/dagster_dask/data_frame.py @@ -10,15 +10,17 @@ EnumValue, EventMetadataEntry, Field, + Float, Int, Permissive, + Selector, + Shape, String, TypeCheck, check, dagster_type_loader, dagster_type_materializer, ) -from dagster.config.field_utils import Selector WriteCompressionTextOptions = Enum( 'WriteCompressionText', [EnumValue('gzip'), EnumValue('bz2'), EnumValue('xz'),], @@ -34,9 +36,9 @@ def dict_without_keys(ddict, *keys): @dagster_type_materializer( - Selector( - { - 'csv': Permissive( + Shape({ + 'to': { + 'csv': Field( { 'path': Field( Any, @@ -102,9 +104,10 @@ def dict_without_keys(ddict, *keys): is_required=False, description="Options to be passed in to the compute method", ), - } + }, + is_required=False, ), - 'parquet': Permissive( + 'parquet': Field( { 'path': Field( Any, @@ -184,9 +187,10 @@ def dict_without_keys(ddict, *keys): is_required=False, description="Options to be passed in to the compute method.", ), - } + }, + is_required=False, ), - 'hdf': Permissive( + 'hdf': Field( { 'path': Field( Any, @@ -218,9 +222,10 @@ def dict_without_keys(ddict, *keys): is_required=False, description="The scheduler to use, like 'threads' or 'processes'.", ), - } + }, + is_required=False, ), - 'json': Permissive( + 'json': Field( { 'path': Field( Any, @@ -265,8 +270,9 @@ def dict_without_keys(ddict, *keys): String, is_required=False, description="String like 'gzip' or 'xz'.", ), }, + is_required=False, ), - 'sql': Permissive( + 'sql': Field( { 'name': Field(String, is_required=True, description="Name of SQL table",), 'uri': Field( @@ -356,524 +362,613 @@ def dict_without_keys(ddict, *keys): """, ), }, + is_required=False, ), }, - ) + 'sample': Field( + Float, + is_required=False, + description='Sample a random fraction of items.', + ), + 'repartition': Field( + Selector( + { + 'npartitions': Field( + int, + description='Number of partitions of output.', + ), + 'partition_size': Field( + Any, + description='Max number of bytes of memory for each partition.', + ), + }, + ), + is_required=False, + description='Repartition dataframe along new divisions.', + ), + 'reset_index': Field( + Bool, + is_required=False, + description='Reset the index to the default index. If true, the index will be inserted into dataframe columns. Defaults to false.' 
+ ), + }) ) def dataframe_materializer(_context, config, dask_df): check.inst_param(dask_df, 'dask_df', dd.DataFrame) - file_type, file_options = list(config.items())[0] - path = file_options.get('path') - if file_type == 'csv': - dask_df.to_csv(path, **dict_without_keys(file_options, 'path')) - elif file_type == 'parquet': - dask_df.to_parquet(path, **dict_without_keys(file_options, 'path')) - elif file_type == 'hdf': - dask_df.to_hdf(path, **dict_without_keys(file_options, 'path')) - elif file_type == 'json': - dask_df.to_json(path, **dict_without_keys(file_options, 'path')) - elif file_type == 'sql': - dask_df.to_sql(**file_options) - else: - check.failed('Unsupported file_type {file_type}'.format(file_type=file_type)) + if 'sample' in config: + value = config['sample'] + dask_df = dask_df.sample(frac=value) + + if 'repartition' in config: + value = config['repartition'] + dask_df = dask_df.repartition(**value) + + if 'reset_index' in config: + value = config['reset_index'] + dask_df = dask_df.reset_index(drop=value) + + for to_type, to_options in config['to'].items(): + path = to_options.get('path') - return AssetMaterialization.file(path) + if to_type == 'csv': + dask_df.to_csv(path, **dict_without_keys(to_options, 'path')) + elif to_type == 'parquet': + dask_df.to_parquet(path, **dict_without_keys(to_options, 'path')) + elif to_type == 'hdf': + dask_df.to_hdf(path, **dict_without_keys(to_options, 'path')) + elif to_type == 'json': + dask_df.to_json(path, **dict_without_keys(to_options, 'path')) + elif to_type == 'sql': + dask_df.to_sql(**to_options) + else: + check.failed('Unsupported to_type {to_type}'.format(to_type=to_type)) + + yield AssetMaterialization.file(path) @dagster_type_loader( - Selector( - { - 'csv': Permissive( - { - 'path': Field( - Any, - is_required=True, - description=""" - str or list, Absolute or relative filepath(s). - Prefix with a protocol like `s3://` to read from alternative filesystems. - To read from multiple files you can pass a globstring or a list of paths, - with the caveat that they must all have the same protocol. - """, - ), - 'blocksize': Field( - Any, - is_required=False, - description=""" - str or int or None, Number of bytes by which to cut up larger files. - Default value is computed based on available physical memory and the number of cores, up to a maximum of 64MB. - Can be a number like 64000000` or a string like ``'64MB'. If None, a single block is used for each file. - """, - ), - 'sample': Field( - Int, - is_required=False, - description="Number of bytes to use when determining dtypes.", - ), - 'assume_missing': Field( - Bool, - is_required=False, - description=""" - If True, all integer columns that aren’t specified in `dtype` are assumed to contain missing values, - and are converted to floats. Default is False. - """, - ), - 'storage_options': Field( - Permissive(), - is_required=False, - description=""" - Extra options that make sense for a particular storage connection, - e.g. host, port, username, password, etc. - """, - ), - 'include_path_column': Field( - Any, - is_required=False, - description=""" - bool or str, Whether or not to include the path to each particular file. - If True a new column is added to the dataframe called path. - If str, sets new column name. Default is False. - """, - ), - } - ), - 'parquet': Permissive( - { - 'path': Field( - Any, - is_required=True, - description=""" - str or list, Source directory for data, or path(s) to individual parquet files. 
- Prefix with a protocol like s3:// to read from alternative filesystems. - To read from multiple files you can pass a globstring or a list of paths, - with the caveat that they must all have the same protocol. - """, - ), - 'columns': Field( - Any, - is_required=False, - description=""" - str or list or None (default), Field name(s) to read in as columns in the output. - By default all non-index fields will be read (as determined by the pandas parquet metadata, if present). - Provide a single field name instead of a list to read in the data as a Series. - """, - ), - 'index': Field( - Any, - is_required=False, - description=""" - list or False or None (default), Field name(s) to use as the output frame index. - By default will be inferred from the pandas parquet file metadata (if present). - Use False to read all fields as columns. - """, - ), - 'categories': Field( - Any, - is_required=False, - description=""" - list or dict or None, For any fields listed here, - if the parquet encoding is Dictionary, the column will be created with dtype category. - Use only if it is guaranteed that the column is encoded as dictionary in all row-groups. - If a list, assumes up to 2**16-1 labels; if a dict, specify the number of labels expected; - if None, will load categories automatically for data written by dask/fastparquet, not otherwise. - """, - ), - 'storage_options': Field( - Permissive(), - is_required=False, - description="Key/value pairs to be passed on to the file-system backend, if any.", - ), - 'engine': Field( - EngineParquetOptions, - is_required=False, - description=""" - Parquet reader library to use. - If only one library is installed, it will use that one; - if both, it will use ‘fastparquet’. - """, - ), - 'gather_statistics': Field( - Bool, - is_required=False, - description=""" - default is None, Gather the statistics for each dataset partition. - By default, this will only be done if the _metadata file is available. - Otherwise, statistics will only be gathered if True, - because the footer of every file will be parsed (which is very slow on some systems). - """, - ), - 'split_row_groups:': Field( - Bool, - is_required=False, - description=""" - If True (default) then output dataframe partitions will correspond - to parquet-file row-groups (when enough row-group metadata is available). - Otherwise, partitions correspond to distinct files. - Only the “pyarrow” engine currently supports this argument. - """, - ), - 'chunksize': Field( - Any, - is_required=False, - description=""" - int or string, The target task partition size. - If set, consecutive row-groups from the same file will be aggregated - into the same output partition until the aggregate size reaches this value. - """, - ), - } - ), - 'hdf': Permissive( - { - 'path': Field( - Any, - is_required=True, - description=""" - str or pathlib.Path or list, - File pattern (string), pathlib.Path, buffer to read from, or list of file paths. - Can contain wildcards. - """, - ), - 'Key': Field( - Any, - is_required=True, - description="group identifier in the store. 
Can contain wildcards.", - ), - 'start': Field( - Int, - is_required=False, - description="defaults to 0, row number to start at.", - ), - 'stop': Field( - Int, - is_required=False, - description="defaults to None (the last row), row number to stop at.", - ), - 'columns': Field( - list, - is_required=False, - description="A list of columns that if not None, will limit the return columns (default is None).", - ), - 'chunksize': Field( - Any, - is_required=False, - description="Maximal number of rows per partition (default is 1000000).", - ), - 'sorted_index': Field( - Bool, - is_required=False, - description="Option to specify whether or not the input hdf files have a sorted index (default is False).", - ), - 'lock': Field( - Bool, - is_required=False, - description="Option to use a lock to prevent concurrency issues (default is True).", - ), - 'mode': Field( - String, - is_required=False, - description=""" - {‘a’, ‘r’, ‘r+’}, default ‘a’. Mode to use when opening file(s). - ‘r’ - Read-only; no data can be modified. - ‘a’ - Append; an existing file is opened for reading and writing, and if the file does not exist it is created. - ‘r+’ - It is similar to ‘a’, but the file must already exist. - """, - ), - } - ), - 'json': Permissive( - { - 'path': Field( - Any, - is_required=True, - description=""" - str or list, Location to read from. - If a string, can include a glob character to find a set of file names. - Supports protocol specifications such as 's3://'. - """, - ), - 'encoding': Field( - String, - is_required=False, - description="The text encoding to implement, e.g., “utf-8”.", - ), - 'errors': Field( - String, - is_required=False, - description="how to respond to errors in the conversion (see str.encode()).", - ), - 'orient': Field( - String, is_required=False, description="The JSON string format." - ), - 'storage_option': Field( - Permissive(), - is_required=False, - description="Passed to backend file-system implementation.", - ), - 'blocksize': Field( - Int, - is_required=False, - description=""" - default is None, If None, files are not blocked, and you get one partition per input file. - If int, which can only be used for line-delimited JSON files, - each partition will be approximately this size in bytes, to the nearest newline character. - """, - ), - 'sample': Field( - Int, - is_required=False, - description=""" - Number of bytes to pre-load, - to provide an empty dataframe structure to any blocks without data. - Only relevant is using blocksize. - """, - ), - 'compression': Field( - String, - is_required=False, - description="default is None, String like ‘gzip’ or ‘xz’.", - ), - } - ), - 'sql_table': Permissive( - { - 'table': Field( - Any, - is_required=True, - description="str or sqlalchemy expression, Select columns from here.", - ), - 'uri': Field( - String, - is_required=True, - description="Full sqlalchemy URI for the database connection.", - ), - 'index_col': Field( - String, - is_required=True, - description=""" - Column which becomes the index, and defines the partitioning. - Should be a indexed column in the SQL server, and any orderable type. - If the type is number or time, then partition boundaries can be inferred from npartitions or bytes_per_chunk; - otherwide must supply explicit divisions=. - index_col could be a function to return a value, e.g., sql.func.abs(sql.column('value')).label('abs(value)'). 
- index_col=sql.func.abs(sql.column("value")).label("abs(value)"), - or index_col=cast(sql.column("id"),types.BigInteger).label("id") to convert the textfield id to BigInteger. - Note sql, cast, types methods comes frome sqlalchemy module. - Labeling columns created by functions or arithmetic operations is required - """, - ), - 'divisions': Field( - Any, - is_required=False, - description=""" - sequence, Values of the index column to split the table by. - If given, this will override npartitions and bytes_per_chunk. - The divisions are the value boundaries of the index column used to define the partitions. - For example, divisions=list('acegikmoqsuwz') could be used - to partition a string column lexographically into 12 partitions, - with the implicit assumption that each partition contains similar numbers of records. - """, - ), - 'npartitions': Field( - Int, - is_required=False, - description=""" - Number of partitions, if divisions is not given. - Will split the values of the index column linearly between limits, if given, or the column max/min. - The index column must be numeric or time for this to work. - """, - ), - 'columns': Field( - Any, - is_required=False, - description=""" - list of strings or None, Which columns to select; - if None, gets all; can include sqlalchemy functions, - e.g., sql.func.abs(sql.column('value')).label('abs(value)'). - Labeling columns created by functions or arithmetic operations is recommended. - """, - ), - 'bytes_per_chunk': Field( - Any, - is_required=False, - description=""" - str or int, If both divisions and npartitions is None, - this is the target size of each partition, in bytes. - """, - ), - 'head_rows': Field( - Int, - is_required=False, - description="How many rows to load for inferring the data-types, unless passing meta.", - ), - 'schema': Field( - String, - is_required=False, - description=""" - If using a table name, pass this to sqlalchemy to select - which DB schema to use within the URI connection. - """, - ), - } - ), - 'table': Permissive( - { - 'path': Field( - Any, - is_required=True, - description=""" - str or list, Absolute or relative filepath(s). - Prefix with a protocol like 's3://' to read from alternative filesystems. - To read from multiple files you can pass a globstring or a list of paths, - with the caveat that they must all have the same protocol. - """, - ), - 'blocksize': Field( - Any, - is_required=False, - description=""" + Shape({ + 'read': Selector( + { + 'csv': Permissive( + { + 'path': Field( + Any, + is_required=True, + description=""" + str or list, Absolute or relative filepath(s). + Prefix with a protocol like `s3://` to read from alternative filesystems. + To read from multiple files you can pass a globstring or a list of paths, + with the caveat that they must all have the same protocol. + """, + ), + 'blocksize': Field( + Any, + is_required=False, + description=""" str or int or None, Number of bytes by which to cut up larger files. - Default value is computed based on available physical memory and the number of cores, - up to a maximum of 64MB. Can be a number like 64000000` or a string like ``'64MB'. - If None, a single block is used for each file. - """, - ), - 'sample': Field( - Int, - is_required=False, - description="Number of bytes to use when determining dtypes.", - ), - 'assume_missing': Field( - Bool, - is_required=False, - description=""" - If True, all integer columns that aren’t specified in dtype are assumed to contain missing values, - and are converted to floats. Default is False. 
- """, - ), - 'storage_options': Field( - Permissive(), - is_required=False, - description=""" - Extra options that make sense for a particular storage connection, - e.g. host, port, username, password, etc. - """, - ), - 'include_path_column': Field( - Any, - is_required=False, - description=""" - bool or str, Whether or not to include the path to each particular file. - If True a new column is added to the dataframe called path. - If str, sets new column name. Default is False. - """, - ), - } - ), - 'fwf': Permissive( + Default value is computed based on available physical memory and the number of cores, up to a maximum of 64MB. + Can be a number like 64000000` or a string like ``'64MB'. If None, a single block is used for each file. + """, + ), + 'sample': Field( + Int, + is_required=False, + description="Number of bytes to use when determining dtypes.", + ), + 'assume_missing': Field( + Bool, + is_required=False, + description=""" + If True, all integer columns that aren’t specified in `dtype` are assumed to contain missing values, + and are converted to floats. Default is False. + """, + ), + 'storage_options': Field( + Permissive(), + is_required=False, + description=""" + Extra options that make sense for a particular storage connection, + e.g. host, port, username, password, etc. + """, + ), + 'include_path_column': Field( + Any, + is_required=False, + description=""" + bool or str, Whether or not to include the path to each particular file. + If True a new column is added to the dataframe called path. + If str, sets new column name. Default is False. + """, + ), + } + ), + 'parquet': Permissive( + { + 'path': Field( + Any, + is_required=True, + description=""" + str or list, Source directory for data, or path(s) to individual parquet files. + Prefix with a protocol like s3:// to read from alternative filesystems. + To read from multiple files you can pass a globstring or a list of paths, + with the caveat that they must all have the same protocol. + """, + ), + 'columns': Field( + Any, + is_required=False, + description=""" + str or list or None (default), Field name(s) to read in as columns in the output. + By default all non-index fields will be read (as determined by the pandas parquet metadata, if present). + Provide a single field name instead of a list to read in the data as a Series. + """, + ), + 'index': Field( + Any, + is_required=False, + description=""" + list or False or None (default), Field name(s) to use as the output frame index. + By default will be inferred from the pandas parquet file metadata (if present). + Use False to read all fields as columns. + """, + ), + 'categories': Field( + Any, + is_required=False, + description=""" + list or dict or None, For any fields listed here, + if the parquet encoding is Dictionary, the column will be created with dtype category. + Use only if it is guaranteed that the column is encoded as dictionary in all row-groups. + If a list, assumes up to 2**16-1 labels; if a dict, specify the number of labels expected; + if None, will load categories automatically for data written by dask/fastparquet, not otherwise. + """, + ), + 'storage_options': Field( + Permissive(), + is_required=False, + description="Key/value pairs to be passed on to the file-system backend, if any.", + ), + 'engine': Field( + EngineParquetOptions, + is_required=False, + description=""" + Parquet reader library to use. + If only one library is installed, it will use that one; + if both, it will use ‘fastparquet’. 
+ """, + ), + 'gather_statistics': Field( + Bool, + is_required=False, + description=""" + default is None, Gather the statistics for each dataset partition. + By default, this will only be done if the _metadata file is available. + Otherwise, statistics will only be gathered if True, + because the footer of every file will be parsed (which is very slow on some systems). + """, + ), + 'split_row_groups:': Field( + Bool, + is_required=False, + description=""" + If True (default) then output dataframe partitions will correspond + to parquet-file row-groups (when enough row-group metadata is available). + Otherwise, partitions correspond to distinct files. + Only the “pyarrow” engine currently supports this argument. + """, + ), + 'chunksize': Field( + Any, + is_required=False, + description=""" + int or string, The target task partition size. + If set, consecutive row-groups from the same file will be aggregated + into the same output partition until the aggregate size reaches this value. + """, + ), + } + ), + 'hdf': Permissive( + { + 'path': Field( + Any, + is_required=True, + description=""" + str or pathlib.Path or list, + File pattern (string), pathlib.Path, buffer to read from, or list of file paths. + Can contain wildcards. + """, + ), + 'Key': Field( + Any, + is_required=True, + description="group identifier in the store. Can contain wildcards.", + ), + 'start': Field( + Int, + is_required=False, + description="defaults to 0, row number to start at.", + ), + 'stop': Field( + Int, + is_required=False, + description="defaults to None (the last row), row number to stop at.", + ), + 'columns': Field( + list, + is_required=False, + description="A list of columns that if not None, will limit the return columns (default is None).", + ), + 'chunksize': Field( + Any, + is_required=False, + description="Maximal number of rows per partition (default is 1000000).", + ), + 'sorted_index': Field( + Bool, + is_required=False, + description="Option to specify whether or not the input hdf files have a sorted index (default is False).", + ), + 'lock': Field( + Bool, + is_required=False, + description="Option to use a lock to prevent concurrency issues (default is True).", + ), + 'mode': Field( + String, + is_required=False, + description=""" + {‘a’, ‘r’, ‘r+’}, default ‘a’. Mode to use when opening file(s). + ‘r’ - Read-only; no data can be modified. + ‘a’ - Append; an existing file is opened for reading and writing, and if the file does not exist it is created. + ‘r+’ - It is similar to ‘a’, but the file must already exist. + """, + ), + } + ), + 'json': Permissive( + { + 'path': Field( + Any, + is_required=True, + description=""" + str or list, Location to read from. + If a string, can include a glob character to find a set of file names. + Supports protocol specifications such as 's3://'. + """, + ), + 'encoding': Field( + String, + is_required=False, + description="The text encoding to implement, e.g., “utf-8”.", + ), + 'errors': Field( + String, + is_required=False, + description="how to respond to errors in the conversion (see str.encode()).", + ), + 'storage_option': Field( + Permissive(), + is_required=False, + description="Passed to backend file-system implementation.", + ), + 'blocksize': Field( + Int, + is_required=False, + description=""" + default is None, If None, files are not blocked, and you get one partition per input file. + If int, which can only be used for line-delimited JSON files, + each partition will be approximately this size in bytes, to the nearest newline character. 
+ """, + ), + 'sample': Field( + Int, + is_required=False, + description=""" + Number of bytes to pre-load, + to provide an empty dataframe structure to any blocks without data. + Only relevant is using blocksize. + """, + ), + 'compression': Field( + String, + is_required=False, + description="default is None, String like ‘gzip’ or ‘xz’.", + ), + } + ), + 'sql_table': Permissive( + { + 'table': Field( + Any, + is_required=True, + description="str or sqlalchemy expression, Select columns from here.", + ), + 'uri': Field( + String, + is_required=True, + description="Full sqlalchemy URI for the database connection.", + ), + 'index_col': Field( + String, + is_required=True, + description=""" + Column which becomes the index, and defines the partitioning. + Should be a indexed column in the SQL server, and any orderable type. + If the type is number or time, then partition boundaries can be inferred from npartitions or bytes_per_chunk; + otherwide must supply explicit divisions=. + index_col could be a function to return a value, e.g., sql.func.abs(sql.column('value')).label('abs(value)'). + index_col=sql.func.abs(sql.column("value")).label("abs(value)"), + or index_col=cast(sql.column("id"),types.BigInteger).label("id") to convert the textfield id to BigInteger. + Note sql, cast, types methods comes frome sqlalchemy module. + Labeling columns created by functions or arithmetic operations is required + """, + ), + 'divisions': Field( + Any, + is_required=False, + description=""" + sequence, Values of the index column to split the table by. + If given, this will override npartitions and bytes_per_chunk. + The divisions are the value boundaries of the index column used to define the partitions. + For example, divisions=list('acegikmoqsuwz') could be used + to partition a string column lexographically into 12 partitions, + with the implicit assumption that each partition contains similar numbers of records. + """, + ), + 'npartitions': Field( + Int, + is_required=False, + description=""" + Number of partitions, if divisions is not given. + Will split the values of the index column linearly between limits, if given, or the column max/min. + The index column must be numeric or time for this to work. + """, + ), + 'columns': Field( + Any, + is_required=False, + description=""" + list of strings or None, Which columns to select; + if None, gets all; can include sqlalchemy functions, + e.g., sql.func.abs(sql.column('value')).label('abs(value)'). + Labeling columns created by functions or arithmetic operations is recommended. + """, + ), + 'bytes_per_chunk': Field( + Any, + is_required=False, + description=""" + str or int, If both divisions and npartitions is None, + this is the target size of each partition, in bytes. + """, + ), + 'head_rows': Field( + Int, + is_required=False, + description="How many rows to load for inferring the data-types, unless passing meta.", + ), + 'schema': Field( + String, + is_required=False, + description=""" + If using a table name, pass this to sqlalchemy to select + which DB schema to use within the URI connection. + """, + ), + } + ), + 'table': Permissive( + { + 'path': Field( + Any, + is_required=True, + description=""" + str or list, Absolute or relative filepath(s). + Prefix with a protocol like 's3://' to read from alternative filesystems. + To read from multiple files you can pass a globstring or a list of paths, + with the caveat that they must all have the same protocol. 
+ """, + ), + 'blocksize': Field( + Any, + is_required=False, + description=""" + str or int or None, Number of bytes by which to cut up larger files. + Default value is computed based on available physical memory and the number of cores, + up to a maximum of 64MB. Can be a number like 64000000` or a string like ``'64MB'. + If None, a single block is used for each file. + """, + ), + 'sample': Field( + Int, + is_required=False, + description="Number of bytes to use when determining dtypes.", + ), + 'assume_missing': Field( + Bool, + is_required=False, + description=""" + If True, all integer columns that aren’t specified in dtype are assumed to contain missing values, + and are converted to floats. Default is False. + """, + ), + 'storage_options': Field( + Permissive(), + is_required=False, + description=""" + Extra options that make sense for a particular storage connection, + e.g. host, port, username, password, etc. + """, + ), + 'include_path_column': Field( + Any, + is_required=False, + description=""" + bool or str, Whether or not to include the path to each particular file. + If True a new column is added to the dataframe called path. + If str, sets new column name. Default is False. + """, + ), + } + ), + 'fwf': Permissive( + { + 'path': Field( + Any, + is_required=True, + description=""" + str or list, Absolute or relative filepath(s). + Prefix with a protocol like 's3://' to read from alternative filesystems. + To read from multiple files you can pass a globstring or a list of paths, + with the caveat that they must all have the same protocol. + """, + ), + 'blocksize': Field( + Any, + is_required=False, + description=""" + str or int or None, Number of bytes by which to cut up larger files. + Default value is computed based on available physical memory + and the number of cores up to a maximum of 64MB. + Can be a number like 64000000` or a string like ``'64MB'. + If None, a single block is used for each file. + """, + ), + 'sample': Field( + Int, + is_required=False, + description="Number of bytes to use when determining dtypes.", + ), + 'assume_missing': Field( + Bool, + is_required=False, + description=""" + If True, all integer columns that aren’t specified in dtype are assumed + to contain missing values, and are converted to floats. + Default is False. + """, + ), + 'storage_options': Field( + Permissive(), + is_required=False, + description=""" + Extra options that make sense for a particular storage connection, + e.g. host, port, username, password, etc. + """, + ), + 'include_path_column': Field( + Any, + is_required=False, + description=""" + bool or str, Whether or not to include the path to each particular file. + If True a new column is added to the dataframe called path. + If str, sets new column name. Default is False. + """, + ), + } + ), + 'orc': Permissive( + { + 'path': Field( + Any, + is_required=True, + description=""" + str or list, Location of file(s), + which can be a full URL with protocol specifier, + and may include glob character if a single string. + """, + ), + 'columns': Field( + list, is_required=False, description="Columns to load. 
If None, loads all.", + ), + 'storage_options': Field( + Permissive(), + is_required=False, + description="Further parameters to pass to the bytes backend.", + ), + } + ), + }, + ), + 'sample': Field( + Float, + is_required=False, + description='Sample a random fraction of items.', + ), + 'repartition': Field( + Selector( { - 'path': Field( - Any, - is_required=True, - description=""" - str or list, Absolute or relative filepath(s). - Prefix with a protocol like 's3://' to read from alternative filesystems. - To read from multiple files you can pass a globstring or a list of paths, - with the caveat that they must all have the same protocol. - """, - ), - 'blocksize': Field( - Any, - is_required=False, - description=""" - str or int or None, Number of bytes by which to cut up larger files. - Default value is computed based on available physical memory - and the number of cores up to a maximum of 64MB. - Can be a number like 64000000` or a string like ``'64MB'. - If None, a single block is used for each file. - """, - ), - 'sample': Field( - Int, - is_required=False, - description="Number of bytes to use when determining dtypes.", - ), - 'assume_missing': Field( - Bool, - is_required=False, - description=""" - If True, all integer columns that aren’t specified in dtype are assumed - to contain missing values, and are converted to floats. - Default is False. - """, - ), - 'storage_options': Field( - Permissive(), - is_required=False, - description=""" - Extra options that make sense for a particular storage connection, - e.g. host, port, username, password, etc. - """, - ), - 'include_path_column': Field( - Any, - is_required=False, - description=""" - bool or str, Whether or not to include the path to each particular file. - If True a new column is added to the dataframe called path. - If str, sets new column name. Default is False. - """, + 'npartitions': Field( + int, + description='Number of partitions of output.', ), - } - ), - 'orc': Permissive( - { - 'path': Field( + 'partition_size': Field( Any, - is_required=True, - description=""" - str or list, Location of file(s), - which can be a full URL with protocol specifier, - and may include glob character if a single string. - """, - ), - 'columns': Field( - list, is_required=False, description="Columns to load. If None, loads all.", - ), - 'storage_options': Field( - Permissive(), - is_required=False, - description="Further parameters to pass to the bytes backend.", + description='Max number of bytes of memory for each partition.', ), - } + }, ), - }, - ) + is_required=False, + description='Repartition dataframe along new divisions.', + ), + 'lower_cols': Field( + Bool, + is_required=False, + description='Lowercase column names.', + ), + 'reset_index': Field( + Bool, + is_required=False, + description='Reset the index to the default index. If true, the index will be inserted into dataframe columns. Defaults to false.' 
+ ) + }) ) def dataframe_loader(_context, config): - file_type, file_options = list(config.items())[0] - path = file_options.get('path') + read_type, read_options = next(iter(config['read'].items())) + path = read_options.get('path') - if file_type == 'csv': - return dd.read_csv(path, **dict_without_keys(file_options, 'path')) - elif file_type == 'parquet': - return dd.read_parquet(path, **dict_without_keys(file_options, 'path')) - elif file_type == 'hdf': - return dd.read_hdf(path, **dict_without_keys(file_options, 'path')) - elif file_type == 'json': - return dd.read_json(path, **dict_without_keys(file_options, 'path')) - elif file_type == 'sql_table': - return dd.read_sql_table(**file_options) - elif file_type == 'table': - return dd.read_table(path, **dict_without_keys(file_options, 'path')) - elif file_type == 'fwf': - return dd.read_fwf(path, **dict_without_keys(file_options, 'path')) - elif file_type == 'orc': - return dd.read_orc(path, **dict_without_keys(file_options, 'path')) + if read_type == 'csv': + df = dd.read_csv(path, **dict_without_keys(read_options, 'path')) + elif read_type == 'parquet': + df = dd.read_parquet(path, **dict_without_keys(read_options, 'path')) + elif read_type == 'hdf': + df = dd.read_hdf(path, **dict_without_keys(read_options, 'path')) + elif read_type == 'json': + df = dd.read_json(path, **dict_without_keys(read_options, 'path')) + elif read_type == 'sql_table': + df = dd.read_sql_table(**read_options) + elif read_type == 'table': + df = dd.read_table(path, **dict_without_keys(read_options, 'path')) + elif read_type == 'fwf': + df = dd.read_fwf(path, **dict_without_keys(read_options, 'path')) + elif read_type == 'orc': + df = dd.read_orc(path, **dict_without_keys(read_options, 'path')) else: raise DagsterInvariantViolationError( - 'Unsupported file_type {file_type}'.format(file_type=file_type) + 'Unsupported read_type {read_type}'.format(read_type=read_type) ) + + if 'sample' in config: + value = config['sample'] + df = df.sample(frac=value) + + if 'repartition' in config: + value = config['repartition'] + df = df.repartition(**value) + + if 'lower_cols' in config: + value = config['lower_cols'] + if value: + df.columns = map(str.lower, df.columns) + + if 'reset_index' in config: + value = config['reset_index'] + df = df.reset_index(drop=value) + + return df def df_type_check(_, value):
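For reference, a minimal sketch (not part of the patch) of run config that would exercise the reworked schema. The solid, input, and output names ("process_df", "df", "result") and the bucket paths are hypothetical, and the exact nesting under solids/inputs/outputs depends on how the Dask DataFrame type is wired into a pipeline; only the keys inside the input/output entries come from the schema changed here.

    # Hypothetical run_config; names and paths are invented for illustration.
    run_config = {
        'solids': {
            'process_df': {
                'inputs': {
                    'df': {
                        # 'read' keeps the one-of-N Selector that used to sit
                        # at the top level, now nested so the post-read
                        # options can live beside it.
                        'read': {
                            'csv': {'path': 's3://bucket/data/*.csv'},
                        },
                        # post-read options introduced by this patch
                        'sample': 0.5,
                        'repartition': {'npartitions': 4},
                        'lower_cols': True,
                        'reset_index': False,
                    },
                },
                'outputs': [
                    {
                        'result': {
                            # 'to' is now a Shape rather than a Selector, so
                            # several sinks may be configured at once; each
                            # one is optional.
                            'to': {
                                'parquet': {'path': 's3://bucket/out/'},
                            },
                            'repartition': {'partition_size': '100MB'},
                        },
                    },
                ],
            },
        },
    }

With this shape, the materializer iterates over every sink configured under 'to' and yields an AssetMaterialization per sink, while the loader applies sample, repartition, lower_cols, and reset_index to the dataframe after the selected read_* call.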