From 0f745f1a12baa4dbb2494be4395c9aadaa3f58a4 Mon Sep 17 00:00:00 2001
From: Edmondo Porcu
Date: Sat, 20 Aug 2022 19:36:38 -0700
Subject: [PATCH] Dat1

---
Review notes (text between the three-dash line and the first diff is
commentary, not part of the commit):

- This patch was restored to one-line-per-patch-line form; blank context
  lines were re-derived from the hunk counts, so verify with
  `git apply --check` before use.
- The From: header carries no e-mail address; it appears to have been
  stripped and must be restored before `git am` can use this patch.
- dat/table_definitions.py: get_tables() used a `map` iterator for
  membership tests. `in` consumes an iterator, so a filter such as
  "reference_table_2,reference_table_1" returned only reference_table_1.
  It now uses a set of stripped, lower-cased names and compares against the
  lower-cased table name. The file now also ends with a newline.
- dat/model/table.py: removed a stray quote after `# noqa: E501`.
- dat/model/table.py: the validator now iterates each RowCollection
  directly. Iterating a plain pydantic model yields (field, value) pairs,
  so unless RowCollection defines __iter__ this probably should iterate
  `row_collection.data` -- TODO confirm against dat/model/row_collections.py.
- dat/main.py: the untouched guard `if __file__ == '__main__':` can never
  be true (`__file__` is a path); it should test `__name__`. It is left as
  context here because the lines following it are outside this hunk.
- dat/main.py: write plans are built and logged but never executed, and
  `output_path` is only logged -- TODO confirm this is intended at this step.

 dat/main.py              | 58 +++++++++++++++++++++++++++++++++++++++-
 dat/model/table.py       | 15 ++++++-----
 dat/table_definitions.py | 36 +++++++++++++++++++------
 poetry.lock              | 19 +++++++++++--
 pyproject.toml           |  1 +
 5 files changed, 112 insertions(+), 17 deletions(-)

diff --git a/dat/main.py b/dat/main.py
index ab29efc..26fda22 100644
--- a/dat/main.py
+++ b/dat/main.py
@@ -1,6 +1,62 @@
+import click
+from pyspark.sql import SparkSession
+from dat import table_definitions
+from dat.writers import spark_writer
+import logging
 
 
-def write_reference_tables(table_names):
+logging.basicConfig(
+    level=logging.INFO
+)
+
+@click.command()
+@click.option(
+    '--table-names',
+    default='all',
+    help='The reference table names to create. Can be a comma separated list or all'
+)
+@click.option(
+    '--output-path',
+    default='./out',
+    help='The base folder where the tables should be written'
+)
+def write_reference_tables(table_names, output_path):
+    logging.info(
+        'Writing table {table_names} to {output_path}'.format(
+            table_names=table_names,
+            output_path=output_path
+        )
+    )
+    reference_tables = table_definitions.get_tables(
+        table_names
+    )
+    spark = _create_spark_session()
+    write_plan_builder = spark_writer.WritePlanBuilder(
+        spark=spark
+    )
+    write_plans = map(
+        lambda table: write_plan_builder.build_write_plan(table),
+        reference_tables
+    )
+    for write_plan in write_plans:
+        logging.info(
+            'Writing {table_name}'.format(
+                table_name=write_plan.table.table_name
+            )
+        )
+
+
+def _create_spark_session():
+    builder = SparkSession.builder.appName(
+        "MyApp"
+    ).config(
+        "spark.sql.extensions",
+        "io.delta.sql.DeltaSparkSessionExtension"
+    ).config(
+        "spark.sql.catalog.spark_catalog",
+        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
+    )
+    return builder.getOrCreate()
 
 
 if __file__ == '__main__':
diff --git a/dat/model/table.py b/dat/model/table.py
index b6620cc..ad6ec5c 100644
--- a/dat/model/table.py
+++ b/dat/model/table.py
@@ -2,6 +2,7 @@ from pydantic import BaseModel, validator
 from dat.model.row_collections import RowCollection
 
 
+_wrong_column_name_message = 'Data {data} do not have the correct number of columns {columns}'  # noqa: E501
 
 class ReferenceTable(BaseModel):
     table_name: str
@@ -14,13 +15,15 @@ class ReferenceTable(BaseModel):
     def data_shape_coherent_with_column_names(cls, row_collections, values):
         if 'column_names' in values:
             columns = values['column_names']
-            for index, data_entry in enumerate(row_collections):
-                if len(data_entry.data) != len(columns):
-                    raise ValueError(
-                        'Data at index {index} do not have the correct number of columns {columns}'.format(  # noqa: E501
-                            index=index, columns=columns
+            for row_collection in row_collections:
+                for record in row_collection:
+                    if len(record) != len(columns):
+                        raise ValueError(
+                            _wrong_column_name_message.format(
+                                data=record,
+                                columns=columns
+                            )
                         )
-                    )
         return row_collections
 
 
diff --git a/dat/table_definitions.py b/dat/table_definitions.py
index 0cc750c..dd85ce7 100644
--- a/dat/table_definitions.py
+++ b/dat/table_definitions.py
@@ -1,15 +1,13 @@
-from abc import abstractmethod
-from typing import Optional, List, Tuple
-import pyspark
-from dat.model.table import StaticReferenceTable
+from typing import List
+from dat.model.table import ReferenceTable
 from dat.model.row_collections import RowCollection
 
-reference_table_1 = StaticReferenceTable(
+reference_table_1 = ReferenceTable(
     table_name='reference_table_1',
     table_description='My first table',
     column_names=['letter', 'number', 'a_float'],
     partition_keys=['letter'],
-    data=[
+    row_collections=[
         RowCollection(
             write_mode='overwrite',
             data=[
@@ -29,12 +27,12 @@
 )
 
 
-reference_table_2 = StaticReferenceTable(
+reference_table_2 = ReferenceTable(
     table_name='reference_table_2',
     table_description='My first table',
     column_names=['letter', 'number', 'a_float'],
     partition_keys=['letter'],
-    data=[
+    row_collections=[
         RowCollection(
             write_mode='overwrite',
             data=[
@@ -52,3 +50,25 @@
         ),
     ]
 )
+
+_all_tables = [
+    reference_table_1,
+    reference_table_2
+]
+
+
+def get_tables(filter: str) -> List[ReferenceTable]:
+    """Return the reference tables selected by ``filter``.
+
+    ``filter`` is either ``all`` or a comma separated list of table names;
+    names are matched case-insensitively and surrounding spaces are ignored.
+    """
+    if filter.lower() == 'all':
+        return _all_tables
+    # A set, not ``map``: ``in`` consumes an iterator, so any later lookup
+    # on the same ``map`` object would miss names it already skipped over.
+    names = {name.strip().lower() for name in filter.split(',')}
+    return [
+        table for table in _all_tables
+        if table.table_name.lower() in names
+    ]
diff --git a/poetry.lock b/poetry.lock
index 444cdad..0b4d40d 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -51,11 +51,22 @@ test = ["coverage (>=4.5.4)", "fixtures (>=3.0.0)", "flake8 (>=4.0.0)", "stestr
 toml = ["toml"]
 yaml = ["pyyaml"]
 
+[[package]]
+name = "click"
+version = "8.1.3"
+description = "Composable command line interface toolkit"
+category = "main"
+optional = false
+python-versions = ">=3.7"
+
+[package.dependencies]
+colorama = {version = "*", markers = "platform_system == \"Windows\""}
+
 [[package]]
 name = "colorama"
 version = "0.4.5"
 description = "Cross-platform colored terminal text."
-category = "dev"
+category = "main"
 optional = false
 python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
 
@@ -342,7 +353,7 @@ python-versions = ">=3.7"
 [metadata]
 lock-version = "1.1"
 python-versions = "^3.9"
-content-hash = "163826532243cb764e69f9451f207f889b82fdc7a4abbac7c9f836ecc9d265fe"
+content-hash = "b99c2c95e5e596503f7dba2928de63231c2a201c9e26621551b9d2462420cddd"
 
 [metadata.files]
 atomicwrites = []
@@ -352,6 +363,10 @@ bandit = [
     {file = "bandit-1.7.4-py3-none-any.whl", hash = "sha256:412d3f259dab4077d0e7f0c11f50f650cc7d10db905d98f6520a95a18049658a"},
     {file = "bandit-1.7.4.tar.gz", hash = "sha256:2d63a8c573417bae338962d4b9b06fbc6080f74ecd955a092849e1e65c717bd2"},
 ]
+click = [
+    {file = "click-8.1.3-py3-none-any.whl", hash = "sha256:bb4d8133cb15a609f44e8213d9b391b0809795062913b383c62be0ee95b1db48"},
+    {file = "click-8.1.3.tar.gz", hash = "sha256:7682dc8afb30297001674575ea00d1814d808d6a36af415a82bd481d37ba7b8e"},
+]
 colorama = []
 coverage = []
 flake8 = []
diff --git a/pyproject.toml b/pyproject.toml
index d1f6a64..c4c4930 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -8,6 +8,7 @@ authors = ["Your Name "]
 python = "^3.9"
 pydantic = "^1.9.2"
 pyspark = "^3.3.0"
+click = "^8.1.3"
 
 [tool.poetry.dev-dependencies]
 flake8 = "^5.0.4"