From 8b135a32d8824f399c924dfdc5aa5a16ad73da02 Mon Sep 17 00:00:00 2001 From: davidu1975 Date: Sun, 8 Dec 2024 20:04:38 +0000 Subject: [PATCH 01/16] wip --- conf/env.py | 2 + conf/settings.py | 3 + .../management/commands/import_dbt_sectors.py | 37 +-- ...import_eyb_business_cluster_information.py | 267 +++++++++++++++--- 4 files changed, 254 insertions(+), 55 deletions(-) diff --git a/conf/env.py b/conf/env.py index 90b9ec02..ebf20349 100644 --- a/conf/env.py +++ b/conf/env.py @@ -218,6 +218,8 @@ class BaseSettings(PydanticBaseSettings): eyb_salary_s3_prefix: str = '' eyb_rent_s3_prefix: str = '' postcode_from_s3_prefix: str = '' + nomis_uk_business_employee_counts_from_s3_prefix: str = '' + sic_codes_dit_sector_mapping: str = '' class CIEnvironment(BaseSettings): diff --git a/conf/settings.py b/conf/settings.py index f3419241..c9cb2783 100644 --- a/conf/settings.py +++ b/conf/settings.py @@ -613,3 +613,6 @@ EYB_SALARY_S3_PREFIX = env.eyb_salary_s3_prefix EYB_RENT_S3_PREFIX = env.eyb_rent_s3_prefix POSTCODE_FROM_S3_PREFIX = env.postcode_from_s3_prefix +NOMIS_UK_BUSINESS_EMPLOYEE_COUNTS_FROM_S3_PREFIX = env.nomis_uk_business_employee_counts_from_s3_prefix +SIC_CODES_DIT_SECTOR_MAPPING = env.sic_codes_dit_sector_mapping + diff --git a/dataservices/management/commands/import_dbt_sectors.py b/dataservices/management/commands/import_dbt_sectors.py index e82cc0eb..9eafc4e3 100644 --- a/dataservices/management/commands/import_dbt_sectors.py +++ b/dataservices/management/commands/import_dbt_sectors.py @@ -10,26 +10,29 @@ def get_dbtsector_table_batch(data, data_table): - table_data = ( - ( - data_table, - ( - json.loads(dbt_sector)['id'], - json.loads(dbt_sector)['field_01'], - json.loads(dbt_sector)['full_sector_name'], - json.loads(dbt_sector)['sector_cluster__april_2023'], - json.loads(dbt_sector)['field_04'], - json.loads(dbt_sector)['field_05'], - json.loads(dbt_sector)['field_02'], - ), - ) - for dbt_sector in data - ) - + def get_table_data(): + + for dbt_sector in data: + json_data = json.loads(dbt_sector) + yield ( + ( + data_table, + ( + json_data['id'], + json_data['field_01'], + json_data['full_sector_name'], + json_data['sector_cluster__april_2023'], + json_data['field_04'], + json_data['field_05'], + json_data['field_02'], + ), + ) + ) + return ( None, None, - table_data, + get_table_data(), ) diff --git a/dataservices/management/commands/import_eyb_business_cluster_information.py b/dataservices/management/commands/import_eyb_business_cluster_information.py index 89b8fcdc..981a6e46 100644 --- a/dataservices/management/commands/import_eyb_business_cluster_information.py +++ b/dataservices/management/commands/import_eyb_business_cluster_information.py @@ -1,15 +1,142 @@ +import json import pandas as pd import sqlalchemy as sa +from sqlalchemy.ext.declarative import declarative_base +from django.conf import settings +from django.core.management.base import BaseCommand -from dataservices.models import EYBBusinessClusterInformation +from dataservices.core.mixins import S3DownloadMixin +from dataservices.management.commands.helpers import ingest_data -from .helpers import BaseDataWorkspaceIngestionCommand +def get_uk_business_employee_counts_batch(data, data_table): + + table_data = ( + ( + data_table, + ( + json.loads(uk_business_employee_count)['geo_description'], + json.loads(uk_business_employee_count)['geo_code'], + json.loads(uk_business_employee_count)['sic_code'], + json.loads(uk_business_employee_count)['sic_description'], + json.loads(uk_business_employee_count)['total_business_count'], + json.loads(uk_business_employee_count)['business_count_release_year'], + # missing employee data represented as np.nan which results in error saving django model + # columns are int in dataframe so cannot store None resulting in below conditional assignment + json.loads(uk_business_employee_count)['total_employee_count'] if json.loads(uk_business_employee_count)['total_employee_count'] and json.loads(uk_business_employee_count)['total_employee_count'] > 0 else None, + json.loads(uk_business_employee_count)['employee_count_release_year'] if json.loads(uk_business_employee_count)['employee_count_release_year'] and json.loads(uk_business_employee_count)['employee_count_release_year'] > 0 else None, + ), + ) + for uk_business_employee_count in data + ) -class Command(BaseDataWorkspaceIngestionCommand): - help = 'Import ONS total UK business and employee counts per region and section, 2 and 5 digit Standard Industrial Classification' # noqa:E501 + return ( + None, + None, + table_data, + ) + + +def get_uk_business_employee_counts_postgres_table(metadata, table_name): + return sa.Table( + table_name, + metadata, + sa.Column("geo_description", sa.TEXT, nullable=False), + sa.Column("geo_code", sa.TEXT, nullable=False), + sa.Column("sic_code", sa.TEXT, nullable=False), + sa.Column("sic_description", sa.TEXT, nullable=False), + sa.Column("total_business_count", sa.INTEGER, nullable=True), + sa.Column("business_count_release_year", sa.SMALLINT, nullable=True), + sa.Column("total_employee_count", sa.INTEGER, nullable=True), + sa.Column("employee_count_release_year", sa.SMALLINT, nullable=True), + sa.Index(None, "sic_code"), + schema="public", + ) + + +def get_sic_codes_dit_sector_mapping_batch(data, data_table): + + table_data = ( + ( + data_table, + ( + json.loads(sic_codes_dit_sector_mapping)['dbt_full_sector_name'], + json.loads(sic_codes_dit_sector_mapping)['dbt_sector_name'], + json.loads(sic_codes_dit_sector_mapping)['sic_code'], + ), + ) + for sic_codes_dit_sector_mapping in data + ) + + return ( + None, + None, + table_data, + ) + + +def get_sic_codes_dit_sector_mapping_postgres_table(metadata, table_name): + return sa.Table( + table_name, + metadata, + sa.Column("dbt_full_sector_name", sa.TEXT, nullable=True), + sa.Column("dbt_sector_name", sa.TEXT, nullable=True), + sa.Column("sic_code", sa.TEXT, nullable=False), + sa.Index(None, "sic_code"), + schema="public", + ) + + +def save_uk_business_employee_counts_data(data): + + table_name = 'dataservices_tmp_eybbusinessclusterinformation' + + engine = sa.create_engine(settings.DATABASE_URL, future=True) + + metadata = sa.MetaData() + + data_table = get_uk_business_employee_counts_postgres_table(metadata, table_name) + + def on_before_visible(conn, ingest_table, batch_metadata): + pass + + def batches(_): + yield get_uk_business_employee_counts_batch(data, data_table) + + ingest_data(engine, metadata, on_before_visible, batches) + + +def save_sic_codes_dit_sector_mapping_data(data): + + table_name = 'dataservices_tmp_sic_codes_dit_sector_mapping' + + engine = sa.create_engine(settings.DATABASE_URL, future=True) + + metadata = sa.MetaData() + + data_table = get_sic_codes_dit_sector_mapping_postgres_table(metadata, table_name) + + def on_before_visible(conn, ingest_table, batch_metadata): + pass - sql = ''' + def batches(_): + yield get_sic_codes_dit_sector_mapping_batch(data, data_table) + + ingest_data(engine, metadata, on_before_visible, batches) + + +def delete_temp_table(table_name): + Base = declarative_base() + metadata = sa.MetaData() + engine = sa.create_engine(settings.DATABASE_URL, future=True) + metadata.reflect(bind=engine) + table = metadata.tables[table_name] + if table is not None: + Base.metadata.drop_all(engine, [table], checkfirst=True) + + +def get_data(): + sql = """ SELECT nubec.geo_description, nubec.geo_code, @@ -21,42 +148,106 @@ class Command(BaseDataWorkspaceIngestionCommand): nubec.employee_count_release_year, sector_mapping.dbt_full_sector_name, sector_mapping.dbt_sector_name - FROM ons.nomis__uk_business_employee_counts nubec + FROM public.dataservices_tmp_eybbusinessclusterinformation nubec LEFT JOIN ( SELECT - scmds."DIT full sector name" as dbt_full_sector_name, - scmds."DIT sector" as dbt_sector_name, + scmds.dbt_full_sector_name, + scmds.dbt_sector_name, -- necessary because sic codes are stored as integer in source table meaning leading 0 was dropped - substring(((scmds."SIC code" + 100000)::varchar) from 2 for 5) as five_digit_sic - from public.ref_sic_codes_dit_sector_mapping scmds + substring(((scmds.sic_code + 100000)::varchar) from 2 for 5) as five_digit_sic + from public.dataservices_tmp_sic_codes_dit_sector_mapping scmds ) AS sector_mapping ON nubec.sic_code = sector_mapping.five_digit_sic WHERE nubec.geo_code <> 'K02000001' - ''' - - def load_data(self): - data = [] - chunks = pd.read_sql(sa.text(self.sql), self.engine, chunksize=5000) - - for chunk in chunks: - for _idx, row in chunk.iterrows(): - data.append( - EYBBusinessClusterInformation( - geo_description=row.geo_description, - geo_code=row.geo_code, - sic_code=row.sic_code, - sic_description=row.sic_description, - total_business_count=row.total_business_count, - business_count_release_year=row.business_count_release_year, - # missing employee data represented as np.nan which results in error saving django model - # columns are int in dataframe so cannot store None resulting in below conditional assignment - total_employee_count=row.total_employee_count if row.total_employee_count > 0 else None, - employee_count_release_year=( - row.employee_count_release_year if row.employee_count_release_year > 0 else None - ), - dbt_full_sector_name=row.dbt_full_sector_name, - dbt_sector_name=row.dbt_sector_name, - ) - ) - - return data + """ + + data = [] + engine = sa.create_engine(settings.DATABASE_URL, future=True) + chunks = pd.read_sql(sa.text(sql), engine, chunksize=5000) + + for chunk in chunks: + for _, row in chunk.iterrows(): + data.append( + + ) + + return data + + + +class Command(BaseCommand, S3DownloadMixin): + + help = 'Import ONS total UK business and employee counts per region and section, 2 and 5 digit Standard Industrial Classification' # noqa:E501 + + def handle(self, *args, **options): + self.do_handle( + prefix=settings.NOMIS_UK_BUSINESS_EMPLOYEE_COUNTS_FROM_S3_PREFIX, + save_func=save_uk_business_employee_counts_data, + ) + self.do_handle( + prefix=settings.SIC_CODES_DIT_SECTOR_MAPPING, + save_func=save_sic_codes_dit_sector_mapping_data, + ) + data = get_data() + + table_name = 'dataservices_tmp_eybbusinessclusterinformation' + delete_temp_table(table_name) + table_name = 'dataservices_tmp_sic_codes_dit_sector_mapping' + delete_temp_table(table_name) + + + +# class Command(BaseDataWorkspaceIngestionCommand): +# help = 'Import ONS total UK business and employee counts per region and section, 2 and 5 digit Standard Industrial Classification' # noqa:E501 + +# sql = ''' +# SELECT +# nubec.geo_description, +# nubec.geo_code, +# nubec.sic_code, +# nubec.sic_description, +# nubec.total_business_count, +# nubec.business_count_release_year, +# nubec.total_employee_count, +# nubec.employee_count_release_year, +# sector_mapping.dbt_full_sector_name, +# sector_mapping.dbt_sector_name +# FROM ons.nomis__uk_business_employee_counts nubec +# LEFT JOIN ( +# SELECT +# scmds."DIT full sector name" as dbt_full_sector_name, +# scmds."DIT sector" as dbt_sector_name, +# -- necessary because sic codes are stored as integer in source table meaning leading 0 was dropped +# substring(((scmds."SIC code" + 100000)::varchar) from 2 for 5) as five_digit_sic +# from public.ref_sic_codes_dit_sector_mapping scmds +# ) AS sector_mapping +# ON nubec.sic_code = sector_mapping.five_digit_sic +# WHERE nubec.geo_code <> 'K02000001' +# ''' + +# def load_data(self): +# data = [] +# chunks = pd.read_sql(sa.text(self.sql), self.engine, chunksize=5000) + +# for chunk in chunks: +# for _idx, row in chunk.iterrows(): +# data.append( +# EYBBusinessClusterInformation( +# geo_description=row.geo_description, +# geo_code=row.geo_code, +# sic_code=row.sic_code, +# sic_description=row.sic_description, +# total_business_count=row.total_business_count, +# business_count_release_year=row.business_count_release_year, +# # missing employee data represented as np.nan which results in error saving django model +# # columns are int in dataframe so cannot store None resulting in below conditional assignment +# total_employee_count=row.total_employee_count if row.total_employee_count > 0 else None, +# employee_count_release_year=( +# row.employee_count_release_year if row.employee_count_release_year > 0 else None +# ), +# dbt_full_sector_name=row.dbt_full_sector_name, +# dbt_sector_name=row.dbt_sector_name, +# ) +# ) + +# return data From d7c6a0974b3f7697200e4d2df22e79b13dcac0e9 Mon Sep 17 00:00:00 2001 From: davidu1975 Date: Sun, 8 Dec 2024 21:26:03 +0000 Subject: [PATCH 02/16] refactor postcode s2 save method --- .../commands/import_postcodes_from_s3.py | 49 ++++++++++--------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/dataservices/management/commands/import_postcodes_from_s3.py b/dataservices/management/commands/import_postcodes_from_s3.py index cc99bf46..e6e1eb75 100644 --- a/dataservices/management/commands/import_postcodes_from_s3.py +++ b/dataservices/management/commands/import_postcodes_from_s3.py @@ -31,32 +31,37 @@ def map_eer_to_european_reqion(eer_code: str) -> str: def get_postcode_table_batch(data, data_table): - table_data = ( - ( - data_table, - ( - json.loads(postcode)['id'], - ( - json.loads(postcode)['pcd'].replace(' ', '') - if json.loads(postcode)['pcd'] - else json.loads(postcode)['pcd'] - ), + + def get_table_data(): + for postcode in data: + json_data = json.loads(postcode) + + yield ( ( - json.loads(postcode)['region_name'].strip() - if json.loads(postcode)['region_name'] - else json.loads(postcode)['region_name'] - ), - map_eer_to_european_reqion(json.loads(postcode)['eer']), - datetime.now(), - datetime.now(), - ), - ) - for postcode in data - ) + data_table, + ( + json_data['id'], + ( + json_data['pcd'].replace(' ', '') + if json_data['pcd'] + else json_data['pcd'] + ), + ( + json_data['region_name'].strip() + if json_data['region_name'] + else json_data['region_name'] + ), + map_eer_to_european_reqion(json_data['eer']), + datetime.now(), + datetime.now(), + ), + ) + ) + return ( None, None, - table_data, + get_table_data(), ) From 9663720b8809e4e3c36e60e85f6df3deb8ad13f2 Mon Sep 17 00:00:00 2001 From: davidu1975 Date: Mon, 9 Dec 2024 13:26:38 +0000 Subject: [PATCH 03/16] wip --- conf/settings.py | 1 - .../management/commands/import_dbt_sectors.py | 2 +- ...import_eyb_business_cluster_information.py | 108 ++++++++++++------ .../commands/import_eyb_rent_data.py | 45 ++++---- .../commands/import_postcodes_from_s3.py | 2 +- requirements.in | 2 +- requirements.txt | 2 +- requirements_test.txt | 2 +- 8 files changed, 98 insertions(+), 66 deletions(-) diff --git a/conf/settings.py b/conf/settings.py index c9cb2783..2a63ed33 100644 --- a/conf/settings.py +++ b/conf/settings.py @@ -615,4 +615,3 @@ POSTCODE_FROM_S3_PREFIX = env.postcode_from_s3_prefix NOMIS_UK_BUSINESS_EMPLOYEE_COUNTS_FROM_S3_PREFIX = env.nomis_uk_business_employee_counts_from_s3_prefix SIC_CODES_DIT_SECTOR_MAPPING = env.sic_codes_dit_sector_mapping - diff --git a/dataservices/management/commands/import_dbt_sectors.py b/dataservices/management/commands/import_dbt_sectors.py index 85aa67cf..8c9a92d2 100644 --- a/dataservices/management/commands/import_dbt_sectors.py +++ b/dataservices/management/commands/import_dbt_sectors.py @@ -45,7 +45,7 @@ def get_table_data(): ), ) ) - + return ( None, None, diff --git a/dataservices/management/commands/import_eyb_business_cluster_information.py b/dataservices/management/commands/import_eyb_business_cluster_information.py index 981a6e46..1ff61032 100644 --- a/dataservices/management/commands/import_eyb_business_cluster_information.py +++ b/dataservices/management/commands/import_eyb_business_cluster_information.py @@ -10,30 +10,42 @@ def get_uk_business_employee_counts_batch(data, data_table): - - table_data = ( - ( - data_table, - ( - json.loads(uk_business_employee_count)['geo_description'], - json.loads(uk_business_employee_count)['geo_code'], - json.loads(uk_business_employee_count)['sic_code'], - json.loads(uk_business_employee_count)['sic_description'], - json.loads(uk_business_employee_count)['total_business_count'], - json.loads(uk_business_employee_count)['business_count_release_year'], - # missing employee data represented as np.nan which results in error saving django model - # columns are int in dataframe so cannot store None resulting in below conditional assignment - json.loads(uk_business_employee_count)['total_employee_count'] if json.loads(uk_business_employee_count)['total_employee_count'] and json.loads(uk_business_employee_count)['total_employee_count'] > 0 else None, - json.loads(uk_business_employee_count)['employee_count_release_year'] if json.loads(uk_business_employee_count)['employee_count_release_year'] and json.loads(uk_business_employee_count)['employee_count_release_year'] > 0 else None, - ), - ) - for uk_business_employee_count in data - ) + + def get_table_data(): + + for uk_business_employee_count in data: + json_data = json.loads(uk_business_employee_count) + + yield ( + ( + data_table, + ( + json_data['geo_description'], + json_data['geo_code'], + json_data['sic_code'], + json_data['sic_description'], + json_data['total_business_count'], + json_data['business_count_release_year'], + # missing employee data represented as np.nan which results in error saving django model + # columns are int in dataframe so cannot store None resulting in below conditional assignment + ( + json_data['total_employee_count'] + if json_data['total_employee_count'] and json_data['total_employee_count'] > 0 + else None + ), + ( + json_data['employee_count_release_year'] + if json_data['employee_count_release_year'] and json_data['employee_count_release_year'] > 0 + else None + ), + ), + ) + ) return ( None, None, - table_data, + get_table_data(), ) @@ -55,23 +67,29 @@ def get_uk_business_employee_counts_postgres_table(metadata, table_name): def get_sic_codes_dit_sector_mapping_batch(data, data_table): - - table_data = ( - ( - data_table, - ( - json.loads(sic_codes_dit_sector_mapping)['dbt_full_sector_name'], - json.loads(sic_codes_dit_sector_mapping)['dbt_sector_name'], - json.loads(sic_codes_dit_sector_mapping)['sic_code'], - ), - ) - for sic_codes_dit_sector_mapping in data - ) + + def get_table_data(): + + for sic_codes_dit_sector_mapping in data: + breakpoint() + + json_data = json.loads(sic_codes_dit_sector_mapping) + + yield ( + ( + data_table, + ( + json_data['dbt_full_sector_name'], + json_data['dbt_sector_name'], + json_data['sic_code'], + ), + ) + ) return ( None, None, - table_data, + get_table_data(), ) @@ -88,6 +106,10 @@ def get_sic_codes_dit_sector_mapping_postgres_table(metadata, table_name): def save_uk_business_employee_counts_data(data): + pass + + +def save_uk_business_employee_counts_tmp_data(data): table_name = 'dataservices_tmp_eybbusinessclusterinformation' @@ -168,13 +190,25 @@ def get_data(): for chunk in chunks: for _, row in chunk.iterrows(): data.append( - + ( + row.geo_description, + row.geo_code, + row.sic_code, + row.sic_description, + row.total_business_count, + row.business_count_release_year, + # missing employee data represented as np.nan which results in error saving django model + # columns are int in dataframe so cannot store None resulting in below conditional assignment + row.total_employee_count if row.total_employee_count > 0 else None, + row.employee_count_release_year if row.employee_count_release_year > 0 else None, + row.dbt_full_sector_name, + row.dbt_sector_name, + ) ) return data - class Command(BaseCommand, S3DownloadMixin): help = 'Import ONS total UK business and employee counts per region and section, 2 and 5 digit Standard Industrial Classification' # noqa:E501 @@ -182,13 +216,14 @@ class Command(BaseCommand, S3DownloadMixin): def handle(self, *args, **options): self.do_handle( prefix=settings.NOMIS_UK_BUSINESS_EMPLOYEE_COUNTS_FROM_S3_PREFIX, - save_func=save_uk_business_employee_counts_data, + save_func=save_uk_business_employee_counts_tmp_data, ) self.do_handle( prefix=settings.SIC_CODES_DIT_SECTOR_MAPPING, save_func=save_sic_codes_dit_sector_mapping_data, ) data = get_data() + save_uk_business_employee_counts_data(data) table_name = 'dataservices_tmp_eybbusinessclusterinformation' delete_temp_table(table_name) @@ -196,7 +231,6 @@ def handle(self, *args, **options): delete_temp_table(table_name) - # class Command(BaseDataWorkspaceIngestionCommand): # help = 'Import ONS total UK business and employee counts per region and section, 2 and 5 digit Standard Industrial Classification' # noqa:E501 diff --git a/dataservices/management/commands/import_eyb_rent_data.py b/dataservices/management/commands/import_eyb_rent_data.py index 2ccb5e50..12509ad5 100644 --- a/dataservices/management/commands/import_eyb_rent_data.py +++ b/dataservices/management/commands/import_eyb_rent_data.py @@ -14,35 +14,34 @@ def get_table_data(): for eyb_rent in data: json_data = json.loads(eyb_rent) - + yield ( ( data_table, - ( - json_data['id'], - json_data['region'].strip(), - json_data['vertical'].strip(), - json_data['sub_vertical'].strip(), - ( - json_data['gbp_per_square_foot_per_month'] - if json_data['gbp_per_month'] and json_data['gbp_per_month'] > 0 - else None - ), ( - json_data['square_feet'] - if json_data['square_feet'] and json_data['square_feet'] > 0 - else None + json_data['id'], + json_data['region'].strip(), + json_data['vertical'].strip(), + json_data['sub_vertical'].strip(), + ( + json_data['gbp_per_square_foot_per_month'] + if json_data['gbp_per_month'] and json_data['gbp_per_month'] > 0 + else None + ), + ( + json_data['square_feet'] + if json_data['square_feet'] and json_data['square_feet'] > 0 + else None + ), + ( + json_data['gbp_per_month'] + if json_data['gbp_per_month'] and json_data['gbp_per_month'] > 0 + else None + ), + json_data['release_year'], ), - ( - json_data['gbp_per_month'] - if json_data['gbp_per_month'] and json_data['gbp_per_month'] > 0 - else None - ), - json_data['release_year'], - ), - ) + ) ) - return ( None, diff --git a/dataservices/management/commands/import_postcodes_from_s3.py b/dataservices/management/commands/import_postcodes_from_s3.py index e6e1eb75..e31f78d2 100644 --- a/dataservices/management/commands/import_postcodes_from_s3.py +++ b/dataservices/management/commands/import_postcodes_from_s3.py @@ -55,7 +55,7 @@ def get_table_data(): datetime.now(), datetime.now(), ), - ) + ) ) return ( diff --git a/requirements.in b/requirements.in index 1f6cee50..7ecaacd8 100644 --- a/requirements.in +++ b/requirements.in @@ -1,5 +1,5 @@ dbt-copilot-python==0.2.1 -django==4.2.16 +django==4.2.17 django-celery-beat==2.5.0 django-cleanup==8.1.0 django-clearcache>=1.2.1 diff --git a/requirements.txt b/requirements.txt index 48df1905..4ab40ae3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -110,7 +110,7 @@ distlib==0.3.8 # via virtualenv dj-database-url==2.1.0 # via -r requirements.in -django==4.2.16 +django==4.2.17 # via # -r requirements.in # directory-client-core diff --git a/requirements_test.txt b/requirements_test.txt index 8cc4b118..1e503638 100644 --- a/requirements_test.txt +++ b/requirements_test.txt @@ -125,7 +125,7 @@ distlib==0.3.8 # via virtualenv dj-database-url==2.1.0 # via -r requirements.in -django==4.2.16 +django==4.2.17 # via # -r requirements.in # directory-client-core From a9b917a30c1859ecf028de8e981aa1afde988c90 Mon Sep 17 00:00:00 2001 From: davidu1975 Date: Mon, 9 Dec 2024 13:55:03 +0000 Subject: [PATCH 04/16] black --- .../management/commands/import_postcodes_from_s3.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/dataservices/management/commands/import_postcodes_from_s3.py b/dataservices/management/commands/import_postcodes_from_s3.py index e31f78d2..cb0be390 100644 --- a/dataservices/management/commands/import_postcodes_from_s3.py +++ b/dataservices/management/commands/import_postcodes_from_s3.py @@ -41,16 +41,8 @@ def get_table_data(): data_table, ( json_data['id'], - ( - json_data['pcd'].replace(' ', '') - if json_data['pcd'] - else json_data['pcd'] - ), - ( - json_data['region_name'].strip() - if json_data['region_name'] - else json_data['region_name'] - ), + (json_data['pcd'].replace(' ', '') if json_data['pcd'] else json_data['pcd']), + (json_data['region_name'].strip() if json_data['region_name'] else json_data['region_name']), map_eer_to_european_reqion(json_data['eer']), datetime.now(), datetime.now(), From f1a574b0f5b3a785523195592ff044722ee606a5 Mon Sep 17 00:00:00 2001 From: davidu1975 Date: Mon, 9 Dec 2024 14:04:31 +0000 Subject: [PATCH 05/16] Remove test --- .../commands/tests/test_import_data.py | 38 ------------------- 1 file changed, 38 deletions(-) diff --git a/dataservices/management/commands/tests/test_import_data.py b/dataservices/management/commands/tests/test_import_data.py index 6464cf3b..6e9d742e 100644 --- a/dataservices/management/commands/tests/test_import_data.py +++ b/dataservices/management/commands/tests/test_import_data.py @@ -623,44 +623,6 @@ def test_helper_get_dataflow_metadata(): assert result.loc[:, 'source_data_modified_utc'][0] == expected -@pytest.mark.django_db -@mock.patch('pandas.read_sql') -@override_settings(DATA_WORKSPACE_DATASETS_URL='postgresql://') -def test_import_eyb_business_cluster_information(read_sql_mock): - data = { - 'geo_code': ['E92000001', 'N92000002', 'E12000003'], - 'geo_description': ['England', 'Northern Ireland', 'Yorkshire and The Humber'], - 'sic_code': ['42', '01110', '10130'], - 'sic_description': [ - 'Civil Engineering', - 'Growing of cereals (except rice), leguminous crops and oil seeds', - 'Production of meat and poultry meat products', - ], - 'total_business_count': [19070, 170, 55], - 'business_count_release_year': [2023, 2023, 2023], - 'total_employee_count': [159000, np.nan, 8000], - 'employee_count_release_year': [2022, np.nan, 2022], - 'dbt_full_sector_name': [ - None, - 'Agriculture, horticulture, fisheries and pets : Arable crops', - 'Food and drink : Meat products', - ], - 'dbt_sector_name': [None, 'Agriculture, horticulture, fisheries and pets', 'Food and drink'], - } - - read_sql_mock.return_value = [pd.DataFrame(data)] - - assert len(models.EYBBusinessClusterInformation.objects.all()) == 0 - - # dry run - management.call_command('import_eyb_business_cluster_information') - assert len(models.EYBBusinessClusterInformation.objects.all()) == 0 - - # write - management.call_command('import_eyb_business_cluster_information', '--write') - assert len(models.EYBBusinessClusterInformation.objects.all()) == 3 - - @pytest.mark.django_db def test_import_markets_countries_territories(capsys): management.call_command('import_markets_countries_territories', '--write') From d024bb08d6125d8dcdb42f833fc1715a0c0a7a2d Mon Sep 17 00:00:00 2001 From: davidu1975 Date: Mon, 9 Dec 2024 14:14:07 +0000 Subject: [PATCH 06/16] Fix failing test --- dataservices/management/commands/tests/test_import_data.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dataservices/management/commands/tests/test_import_data.py b/dataservices/management/commands/tests/test_import_data.py index 6e9d742e..d935225b 100644 --- a/dataservices/management/commands/tests/test_import_data.py +++ b/dataservices/management/commands/tests/test_import_data.py @@ -4,7 +4,6 @@ from itertools import cycle, islice from unittest import mock -import numpy as np import pandas as pd import pytest import sqlalchemy From 7757bf6e2de1ad15b35f7504c1760e136cf4cea1 Mon Sep 17 00:00:00 2001 From: davidu1975 Date: Tue, 10 Dec 2024 10:34:28 +0000 Subject: [PATCH 07/16] wip --- conf/env.py | 1 - conf/settings.py | 3 +- ...import_eyb_business_cluster_information.py | 79 ++++++++++--------- 3 files changed, 43 insertions(+), 40 deletions(-) diff --git a/conf/env.py b/conf/env.py index ebf20349..9ef0cad7 100644 --- a/conf/env.py +++ b/conf/env.py @@ -219,7 +219,6 @@ class BaseSettings(PydanticBaseSettings): eyb_rent_s3_prefix: str = '' postcode_from_s3_prefix: str = '' nomis_uk_business_employee_counts_from_s3_prefix: str = '' - sic_codes_dit_sector_mapping: str = '' class CIEnvironment(BaseSettings): diff --git a/conf/settings.py b/conf/settings.py index 2a63ed33..f31972bb 100644 --- a/conf/settings.py +++ b/conf/settings.py @@ -613,5 +613,4 @@ EYB_SALARY_S3_PREFIX = env.eyb_salary_s3_prefix EYB_RENT_S3_PREFIX = env.eyb_rent_s3_prefix POSTCODE_FROM_S3_PREFIX = env.postcode_from_s3_prefix -NOMIS_UK_BUSINESS_EMPLOYEE_COUNTS_FROM_S3_PREFIX = env.nomis_uk_business_employee_counts_from_s3_prefix -SIC_CODES_DIT_SECTOR_MAPPING = env.sic_codes_dit_sector_mapping +NOMIS_UK_BUSINESS_EMPLOYEE_COUNTS_FROM_S3_PREFIX = env.nomis_uk_business_employee_counts_from_s3_prefix \ No newline at end of file diff --git a/dataservices/management/commands/import_eyb_business_cluster_information.py b/dataservices/management/commands/import_eyb_business_cluster_information.py index 1ff61032..7f5dc8a8 100644 --- a/dataservices/management/commands/import_eyb_business_cluster_information.py +++ b/dataservices/management/commands/import_eyb_business_cluster_information.py @@ -6,7 +6,7 @@ from django.core.management.base import BaseCommand from dataservices.core.mixins import S3DownloadMixin -from dataservices.management.commands.helpers import ingest_data +from dataservices.management.commands.helpers import ingest_data, BaseDataWorkspaceIngestionCommand def get_uk_business_employee_counts_batch(data, data_table): @@ -147,40 +147,25 @@ def batches(_): ingest_data(engine, metadata, on_before_visible, batches) -def delete_temp_table(table_name): +def delete_temp_tables(table_names): Base = declarative_base() metadata = sa.MetaData() engine = sa.create_engine(settings.DATABASE_URL, future=True) metadata.reflect(bind=engine) - table = metadata.tables[table_name] - if table is not None: - Base.metadata.drop_all(engine, [table], checkfirst=True) + for name in table_names: + table = metadata.tables[name] + if table is not None: + Base.metadata.drop_all(engine, [table], checkfirst=True) -def get_data(): +def get_sic_codes_dit_sector_mapping(): sql = """ SELECT - nubec.geo_description, - nubec.geo_code, - nubec.sic_code, - nubec.sic_description, - nubec.total_business_count, - nubec.business_count_release_year, - nubec.total_employee_count, - nubec.employee_count_release_year, - sector_mapping.dbt_full_sector_name, - sector_mapping.dbt_sector_name - FROM public.dataservices_tmp_eybbusinessclusterinformation nubec - LEFT JOIN ( - SELECT - scmds.dbt_full_sector_name, - scmds.dbt_sector_name, - -- necessary because sic codes are stored as integer in source table meaning leading 0 was dropped - substring(((scmds.sic_code + 100000)::varchar) from 2 for 5) as five_digit_sic - from public.dataservices_tmp_sic_codes_dit_sector_mapping scmds - ) AS sector_mapping - ON nubec.sic_code = sector_mapping.five_digit_sic - WHERE nubec.geo_code <> 'K02000001' + scmds.dbt_full_sector_name, + scmds.dbt_sector_name, + -- necessary because sic codes are stored as integer in source table meaning leading 0 was dropped + substring(((scmds.sic_code + 100000)::varchar) from 2 for 5) as five_digit_sic + from public.ref_sic_codes_dit_sector_mapping scmds """ data = [] @@ -209,7 +194,7 @@ def get_data(): return data -class Command(BaseCommand, S3DownloadMixin): +class Command(BaseCommand, S3DownloadMixin, BaseDataWorkspaceIngestionCommand): help = 'Import ONS total UK business and employee counts per region and section, 2 and 5 digit Standard Industrial Classification' # noqa:E501 @@ -218,18 +203,38 @@ def handle(self, *args, **options): prefix=settings.NOMIS_UK_BUSINESS_EMPLOYEE_COUNTS_FROM_S3_PREFIX, save_func=save_uk_business_employee_counts_tmp_data, ) - self.do_handle( - prefix=settings.SIC_CODES_DIT_SECTOR_MAPPING, - save_func=save_sic_codes_dit_sector_mapping_data, - ) - data = get_data() + data = get_sic_codes_dit_sector_mapping() save_uk_business_employee_counts_data(data) - table_name = 'dataservices_tmp_eybbusinessclusterinformation' - delete_temp_table(table_name) + delete_temp_tables(['dataservices_tmp_eybbusinessclusterinformation', 'dataservices_tmp_sic_codes_dit_sector_mapping',]) table_name = 'dataservices_tmp_sic_codes_dit_sector_mapping' - delete_temp_table(table_name) - + + def load_data(self): +# # data = [] +# # chunks = pd.read_sql(sa.text(self.sql), self.engine, chunksize=5000) + +# # for chunk in chunks: +# # for _idx, row in chunk.iterrows(): +# # data.append( +# # EYBBusinessClusterInformation( +# # geo_description=row.geo_description, +# # geo_code=row.geo_code, +# # sic_code=row.sic_code, +# # sic_description=row.sic_description, +# # total_business_count=row.total_business_count, +# # business_count_release_year=row.business_count_release_year, +# # # missing employee data represented as np.nan which results in error saving django model +# # # columns are int in dataframe so cannot store None resulting in below conditional assignment +# # total_employee_count=row.total_employee_count if row.total_employee_count > 0 else None, +# # employee_count_release_year=( +# # row.employee_count_release_year if row.employee_count_release_year > 0 else None +# # ), +# # dbt_full_sector_name=row.dbt_full_sector_name, +# # dbt_sector_name=row.dbt_sector_name, +# # ) +# # ) + +# # return data # class Command(BaseDataWorkspaceIngestionCommand): # help = 'Import ONS total UK business and employee counts per region and section, 2 and 5 digit Standard Industrial Classification' # noqa:E501 From 36f97ccca687b8aa17c68be8f654d8d4cf57fbab Mon Sep 17 00:00:00 2001 From: davidu1975 Date: Tue, 10 Dec 2024 11:51:41 +0000 Subject: [PATCH 08/16] Add Ingestion of Nomis Employee info --- conf/settings.py | 2 +- ...import_eyb_business_cluster_information.py | 242 ++++++++++++------ 2 files changed, 165 insertions(+), 79 deletions(-) diff --git a/conf/settings.py b/conf/settings.py index f31972bb..d38d04a9 100644 --- a/conf/settings.py +++ b/conf/settings.py @@ -613,4 +613,4 @@ EYB_SALARY_S3_PREFIX = env.eyb_salary_s3_prefix EYB_RENT_S3_PREFIX = env.eyb_rent_s3_prefix POSTCODE_FROM_S3_PREFIX = env.postcode_from_s3_prefix -NOMIS_UK_BUSINESS_EMPLOYEE_COUNTS_FROM_S3_PREFIX = env.nomis_uk_business_employee_counts_from_s3_prefix \ No newline at end of file +NOMIS_UK_BUSINESS_EMPLOYEE_COUNTS_FROM_S3_PREFIX = env.nomis_uk_business_employee_counts_from_s3_prefix diff --git a/dataservices/management/commands/import_eyb_business_cluster_information.py b/dataservices/management/commands/import_eyb_business_cluster_information.py index 7f5dc8a8..bd021dfd 100644 --- a/dataservices/management/commands/import_eyb_business_cluster_information.py +++ b/dataservices/management/commands/import_eyb_business_cluster_information.py @@ -1,12 +1,53 @@ import json + import pandas as pd import sqlalchemy as sa -from sqlalchemy.ext.declarative import declarative_base from django.conf import settings from django.core.management.base import BaseCommand +from sqlalchemy.ext.declarative import declarative_base from dataservices.core.mixins import S3DownloadMixin -from dataservices.management.commands.helpers import ingest_data, BaseDataWorkspaceIngestionCommand +from dataservices.management.commands.helpers import ingest_data + + +def get_uk_business_employee_counts_tmp_batch(data, data_table): + + def get_table_data(): + + for uk_business_employee_count in data: + json_data = json.loads(uk_business_employee_count) + + yield ( + ( + data_table, + ( + json_data['geo_description'], + json_data['geo_code'], + json_data['sic_code'], + json_data['sic_description'], + json_data['total_business_count'], + json_data['business_count_release_year'], + # missing employee data represented as np.nan which results in error saving django model + # columns are int in dataframe so cannot store None resulting in below conditional assignment + ( + json_data['total_employee_count'] + if json_data['total_employee_count'] and json_data['total_employee_count'] > 0 + else None + ), + ( + json_data['employee_count_release_year'] + if json_data['employee_count_release_year'] and json_data['employee_count_release_year'] > 0 + else None + ), + ), + ) + ) + + return ( + None, + None, + get_table_data(), + ) def get_uk_business_employee_counts_batch(data, data_table): @@ -38,6 +79,8 @@ def get_table_data(): if json_data['employee_count_release_year'] and json_data['employee_count_release_year'] > 0 else None ), + json_data['dbt_full_sector_name'], + json_data['dbt_sector_name'], ), ) ) @@ -49,7 +92,7 @@ def get_table_data(): ) -def get_uk_business_employee_counts_postgres_table(metadata, table_name): +def get_uk_business_employee_counts_postgres_tmp_table(metadata, table_name): return sa.Table( table_name, metadata, @@ -66,12 +109,29 @@ def get_uk_business_employee_counts_postgres_table(metadata, table_name): ) +def get_uk_business_employee_counts_postgres_table(metadata, table_name): + return sa.Table( + table_name, + metadata, + sa.Column("geo_description", sa.TEXT, nullable=False), + sa.Column("geo_code", sa.TEXT, nullable=False), + sa.Column("sic_code", sa.TEXT, nullable=False), + sa.Column("sic_description", sa.TEXT, nullable=False), + sa.Column("total_business_count", sa.INTEGER, nullable=True), + sa.Column("business_count_release_year", sa.SMALLINT, nullable=True), + sa.Column("total_employee_count", sa.INTEGER, nullable=True), + sa.Column("employee_count_release_year", sa.SMALLINT, nullable=True), + sa.Column("dbt_full_sector_name", sa.TEXT, nullable=True), + sa.Column("dbt_sector_name", sa.TEXT, nullable=True), + schema="public", + ) + + def get_sic_codes_dit_sector_mapping_batch(data, data_table): def get_table_data(): for sic_codes_dit_sector_mapping in data: - breakpoint() json_data = json.loads(sic_codes_dit_sector_mapping) @@ -105,8 +165,65 @@ def get_sic_codes_dit_sector_mapping_postgres_table(metadata, table_name): ) -def save_uk_business_employee_counts_data(data): - pass +def save_uk_business_employee_counts_data(): + sql = """ + SELECT + nubec.geo_description, + nubec.geo_code, + nubec.sic_code, + nubec.sic_description, + nubec.total_business_count, + nubec.business_count_release_year, + nubec.total_employee_count, + nubec.employee_count_release_year, + sector_mapping.dbt_full_sector_name, + sector_mapping.dbt_sector_name + FROM dataservices_tmp_eybbusinessclusterinformation nubec + LEFT JOIN ( + SELECT + scmds.dbt_full_sector_name, + scmds.bt_sector_name, + scmds.five_digit_sic + from dataservices_tmp_sic_codes_dit_sector_mapping scmds + ) AS sector_mapping + ON nubec.sic_code = sector_mapping.five_digit_sic + WHERE nubec.geo_code <> 'K02000001' + """ + + engine = sa.create_engine(settings.DATABASE_URL, future=True) + + data = [] + + chunks = pd.read_sql(sa.text(sql), engine, chunksize=5000) + + for chunk in chunks: + for _, row in chunk.iterrows(): + data.append( + ( + row.geo_description, + row.geo_code, + row.sic_code, + row.sic_description, + row.total_business_count, + row.business_count_release_year, + row.total_employee_count, + row.employee_count_release_year, + row.dbt_full_sector_name, + row.dbt_sector_name, + ) + ) + + metadata = sa.MetaData() + + data_table = get_uk_business_employee_counts_postgres_table(metadata, 'dataservices_eybbusinessclusterinformation') + + def on_before_visible(conn, ingest_table, batch_metadata): + pass + + def batches(_): + yield get_uk_business_employee_counts_batch(data, data_table) + + ingest_data(engine, metadata, on_before_visible, batches) def save_uk_business_employee_counts_tmp_data(data): @@ -117,18 +234,18 @@ def save_uk_business_employee_counts_tmp_data(data): metadata = sa.MetaData() - data_table = get_uk_business_employee_counts_postgres_table(metadata, table_name) + data_table = get_uk_business_employee_counts_postgres_tmp_table(metadata, table_name) def on_before_visible(conn, ingest_table, batch_metadata): pass def batches(_): - yield get_uk_business_employee_counts_batch(data, data_table) + yield get_uk_business_employee_counts_tmp_batch(data, data_table) ingest_data(engine, metadata, on_before_visible, batches) -def save_sic_codes_dit_sector_mapping_data(data): +def save_sic_codes_dit_sector_mapping_tmp_data(data): table_name = 'dataservices_tmp_sic_codes_dit_sector_mapping' @@ -158,43 +275,7 @@ def delete_temp_tables(table_names): Base.metadata.drop_all(engine, [table], checkfirst=True) -def get_sic_codes_dit_sector_mapping(): - sql = """ - SELECT - scmds.dbt_full_sector_name, - scmds.dbt_sector_name, - -- necessary because sic codes are stored as integer in source table meaning leading 0 was dropped - substring(((scmds.sic_code + 100000)::varchar) from 2 for 5) as five_digit_sic - from public.ref_sic_codes_dit_sector_mapping scmds - """ - - data = [] - engine = sa.create_engine(settings.DATABASE_URL, future=True) - chunks = pd.read_sql(sa.text(sql), engine, chunksize=5000) - - for chunk in chunks: - for _, row in chunk.iterrows(): - data.append( - ( - row.geo_description, - row.geo_code, - row.sic_code, - row.sic_description, - row.total_business_count, - row.business_count_release_year, - # missing employee data represented as np.nan which results in error saving django model - # columns are int in dataframe so cannot store None resulting in below conditional assignment - row.total_employee_count if row.total_employee_count > 0 else None, - row.employee_count_release_year if row.employee_count_release_year > 0 else None, - row.dbt_full_sector_name, - row.dbt_sector_name, - ) - ) - - return data - - -class Command(BaseCommand, S3DownloadMixin, BaseDataWorkspaceIngestionCommand): +class Command(BaseCommand, S3DownloadMixin): help = 'Import ONS total UK business and employee counts per region and section, 2 and 5 digit Standard Industrial Classification' # noqa:E501 @@ -203,38 +284,43 @@ def handle(self, *args, **options): prefix=settings.NOMIS_UK_BUSINESS_EMPLOYEE_COUNTS_FROM_S3_PREFIX, save_func=save_uk_business_employee_counts_tmp_data, ) - data = get_sic_codes_dit_sector_mapping() - save_uk_business_employee_counts_data(data) - - delete_temp_tables(['dataservices_tmp_eybbusinessclusterinformation', 'dataservices_tmp_sic_codes_dit_sector_mapping',]) - table_name = 'dataservices_tmp_sic_codes_dit_sector_mapping' - - def load_data(self): -# # data = [] -# # chunks = pd.read_sql(sa.text(self.sql), self.engine, chunksize=5000) - -# # for chunk in chunks: -# # for _idx, row in chunk.iterrows(): -# # data.append( -# # EYBBusinessClusterInformation( -# # geo_description=row.geo_description, -# # geo_code=row.geo_code, -# # sic_code=row.sic_code, -# # sic_description=row.sic_description, -# # total_business_count=row.total_business_count, -# # business_count_release_year=row.business_count_release_year, -# # # missing employee data represented as np.nan which results in error saving django model -# # # columns are int in dataframe so cannot store None resulting in below conditional assignment -# # total_employee_count=row.total_employee_count if row.total_employee_count > 0 else None, -# # employee_count_release_year=( -# # row.employee_count_release_year if row.employee_count_release_year > 0 else None -# # ), -# # dbt_full_sector_name=row.dbt_full_sector_name, -# # dbt_sector_name=row.dbt_sector_name, -# # ) -# # ) - -# # return data + sic_code_data = self.load_sic_code_data() + save_sic_codes_dit_sector_mapping_tmp_data(sic_code_data) + save_uk_business_employee_counts_data() + + delete_temp_tables( + [ + 'dataservices_tmp_eybbusinessclusterinformation', + 'dataservices_tmp_sic_codes_dit_sector_mapping', + ] + ) + + def load_sic_code_data(self): + sql = """ + SELECT + scmds."DIT full sector name" as dbt_full_sector_name, + scmds."DIT sector" as dbt_sector_name, + -- necessary because sic codes are stored as integer in source table meaning leading 0 was dropped + substring(((scmds."SIC code" + 100000)::varchar) from 2 for 5) as five_digit_sic + from public.ref_sic_codes_dit_sector_mapping scmds + """ + + data = [] + engine = sa.create_engine(settings.DATA_WORKSPACE_DATASETS_URL, execution_options={'stream_results': True}) + chunks = pd.read_sql(sa.text(sql), engine, chunksize=5000) + + for chunk in chunks: + for _, row in chunk.iterrows(): + data.append( + ( + row.dbt_full_sector_name, + row.dbt_sector_name, + row.five_digit_sic, + ) + ) + + return data + # class Command(BaseDataWorkspaceIngestionCommand): # help = 'Import ONS total UK business and employee counts per region and section, 2 and 5 digit Standard Industrial Classification' # noqa:E501 From d154e55d4fd7a3f496841fec5eb4678364f61632 Mon Sep 17 00:00:00 2001 From: davidu1975 Date: Tue, 10 Dec 2024 11:58:28 +0000 Subject: [PATCH 09/16] Add basic error handling --- ...import_eyb_business_cluster_information.py | 32 +++++++++++-------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/dataservices/management/commands/import_eyb_business_cluster_information.py b/dataservices/management/commands/import_eyb_business_cluster_information.py index bd021dfd..7bbfca57 100644 --- a/dataservices/management/commands/import_eyb_business_cluster_information.py +++ b/dataservices/management/commands/import_eyb_business_cluster_information.py @@ -280,20 +280,24 @@ class Command(BaseCommand, S3DownloadMixin): help = 'Import ONS total UK business and employee counts per region and section, 2 and 5 digit Standard Industrial Classification' # noqa:E501 def handle(self, *args, **options): - self.do_handle( - prefix=settings.NOMIS_UK_BUSINESS_EMPLOYEE_COUNTS_FROM_S3_PREFIX, - save_func=save_uk_business_employee_counts_tmp_data, - ) - sic_code_data = self.load_sic_code_data() - save_sic_codes_dit_sector_mapping_tmp_data(sic_code_data) - save_uk_business_employee_counts_data() - - delete_temp_tables( - [ - 'dataservices_tmp_eybbusinessclusterinformation', - 'dataservices_tmp_sic_codes_dit_sector_mapping', - ] - ) + + try: + self.do_handle( + prefix=settings.NOMIS_UK_BUSINESS_EMPLOYEE_COUNTS_FROM_S3_PREFIX, + save_func=save_uk_business_employee_counts_tmp_data, + ) + sic_code_data = self.load_sic_code_data() + save_sic_codes_dit_sector_mapping_tmp_data(sic_code_data) + save_uk_business_employee_counts_data() + except Exception: + pass + finally: + delete_temp_tables( + [ + 'dataservices_tmp_eybbusinessclusterinformation', + 'dataservices_tmp_sic_codes_dit_sector_mapping', + ] + ) def load_sic_code_data(self): sql = """ From 9616e040309c1e5417cfee66f0f33cc24c0a179d Mon Sep 17 00:00:00 2001 From: davidu1975 Date: Tue, 10 Dec 2024 13:21:09 +0000 Subject: [PATCH 10/16] debugging --- .../import_eyb_business_cluster_information.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/dataservices/management/commands/import_eyb_business_cluster_information.py b/dataservices/management/commands/import_eyb_business_cluster_information.py index 7bbfca57..435d9308 100644 --- a/dataservices/management/commands/import_eyb_business_cluster_information.py +++ b/dataservices/management/commands/import_eyb_business_cluster_information.py @@ -300,17 +300,31 @@ def handle(self, *args, **options): ) def load_sic_code_data(self): - sql = """ + sql = ''' SELECT scmds."DIT full sector name" as dbt_full_sector_name, scmds."DIT sector" as dbt_sector_name, -- necessary because sic codes are stored as integer in source table meaning leading 0 was dropped substring(((scmds."SIC code" + 100000)::varchar) from 2 for 5) as five_digit_sic from public.ref_sic_codes_dit_sector_mapping scmds - """ + ''' data = [] engine = sa.create_engine(settings.DATA_WORKSPACE_DATASETS_URL, execution_options={'stream_results': True}) + connection = engine.raw_connection() + cursor = connection.cursor() + s = "SELECT" + s += " table_schema" + s += ", table_name" + s += " FROM information_schema.tables" + s += " WHERE" + s += " (" + s += " table_schema = '"+SCHEMA+"'" + s += " AND table_type = 'BASE TABLE'" + s += " )" + s += " ORDER BY table_schema, table_name;" + cursor.execute(s) + list_tables = cursor.fetchall() chunks = pd.read_sql(sa.text(sql), engine, chunksize=5000) for chunk in chunks: From 17be96a652e0eb9be29b1ed5bf2854617ef4cbb7 Mon Sep 17 00:00:00 2001 From: davidu1975 Date: Tue, 10 Dec 2024 16:50:39 +0000 Subject: [PATCH 11/16] use 3 s3 downloads for data now --- conf/env.py | 2 + conf/settings.py | 2 + ...import_eyb_business_cluster_information.py | 198 ++++++++++-------- 3 files changed, 119 insertions(+), 83 deletions(-) diff --git a/conf/env.py b/conf/env.py index 9ef0cad7..b09ab38f 100644 --- a/conf/env.py +++ b/conf/env.py @@ -219,6 +219,8 @@ class BaseSettings(PydanticBaseSettings): eyb_rent_s3_prefix: str = '' postcode_from_s3_prefix: str = '' nomis_uk_business_employee_counts_from_s3_prefix: str = '' + ref_sic_codes_mapping_from_s3_prefix: str = '' + sector_reference_dataset_from_s3_prefix: str = '' class CIEnvironment(BaseSettings): diff --git a/conf/settings.py b/conf/settings.py index d38d04a9..1bf7d0c3 100644 --- a/conf/settings.py +++ b/conf/settings.py @@ -614,3 +614,5 @@ EYB_RENT_S3_PREFIX = env.eyb_rent_s3_prefix POSTCODE_FROM_S3_PREFIX = env.postcode_from_s3_prefix NOMIS_UK_BUSINESS_EMPLOYEE_COUNTS_FROM_S3_PREFIX = env.nomis_uk_business_employee_counts_from_s3_prefix +REF_SIC_CODES_MAPPING_FROM_S3_PREFIX = env.ref_sic_codes_mapping_from_s3_prefix +SECTOR_REFERENCE_DATASET_FROM_S3_PREFIX = env.sector_reference_dataset_from_s3_prefix diff --git a/dataservices/management/commands/import_eyb_business_cluster_information.py b/dataservices/management/commands/import_eyb_business_cluster_information.py index 435d9308..72bcb565 100644 --- a/dataservices/management/commands/import_eyb_business_cluster_information.py +++ b/dataservices/management/commands/import_eyb_business_cluster_information.py @@ -1,4 +1,5 @@ import json +import logging import pandas as pd import sqlalchemy as sa @@ -9,6 +10,8 @@ from dataservices.core.mixins import S3DownloadMixin from dataservices.management.commands.helpers import ingest_data +logger = logging.getLogger(__name__) + def get_uk_business_employee_counts_tmp_batch(data, data_table): @@ -54,8 +57,7 @@ def get_uk_business_employee_counts_batch(data, data_table): def get_table_data(): - for uk_business_employee_count in data: - json_data = json.loads(uk_business_employee_count) + for json_data in data: yield ( ( @@ -127,21 +129,48 @@ def get_uk_business_employee_counts_postgres_table(metadata, table_name): ) -def get_sic_codes_dit_sector_mapping_batch(data, data_table): +def get_ref_sic_codes_mapping_batch(data, data_table): def get_table_data(): - for sic_codes_dit_sector_mapping in data: + for ref_sic_codes_mapping in data: - json_data = json.loads(sic_codes_dit_sector_mapping) + json_data = json.loads(ref_sic_codes_mapping) yield ( ( data_table, ( - json_data['dbt_full_sector_name'], - json_data['dbt_sector_name'], + json_data['id'], json_data['sic_code'], + json_data['mapping_id'], + json_data['dit_sector_list_id'], + ), + ) + ) + + return ( + None, + None, + get_table_data(), + ) + + +def get_sector_reference_dataset_batch(data, data_table): + + def get_table_data(): + + for sector_reference_dataset in data: + + json_data = json.loads(sector_reference_dataset) + + yield ( + ( + data_table, + ( + json_data['id'], + json_data['field_04'], + json_data['full_sector_name'], ), ) ) @@ -153,19 +182,33 @@ def get_table_data(): ) -def get_sic_codes_dit_sector_mapping_postgres_table(metadata, table_name): +def get_ref_sic_codes_mapping_postgres_table(metadata, table_name): return sa.Table( table_name, metadata, - sa.Column("dbt_full_sector_name", sa.TEXT, nullable=True), - sa.Column("dbt_sector_name", sa.TEXT, nullable=True), + sa.Column("id", sa.INTEGER, nullable=False), sa.Column("sic_code", sa.TEXT, nullable=False), - sa.Index(None, "sic_code"), + sa.Column("mapping_id", sa.TEXT, nullable=True), + sa.Column("dit_sector_list_id", sa.INTEGER, nullable=True), + sa.Index(None, "dit_sector_list_id"), + schema="public", + ) + + +def get_sector_reference_dataset_postgres_table(metadata, table_name): + return sa.Table( + table_name, + metadata, + sa.Column("id", sa.INTEGER, nullable=False), + sa.Column("field_04", sa.TEXT, nullable=True), + sa.Column("full_sector_name", sa.TEXT, nullable=True), + sa.Index(None, "id"), schema="public", ) def save_uk_business_employee_counts_data(): + sql = """ SELECT nubec.geo_description, @@ -179,39 +222,42 @@ def save_uk_business_employee_counts_data(): sector_mapping.dbt_full_sector_name, sector_mapping.dbt_sector_name FROM dataservices_tmp_eybbusinessclusterinformation nubec - LEFT JOIN ( + LEFT JOIN ( SELECT - scmds.dbt_full_sector_name, - scmds.bt_sector_name, - scmds.five_digit_sic - from dataservices_tmp_sic_codes_dit_sector_mapping scmds - ) AS sector_mapping - ON nubec.sic_code = sector_mapping.five_digit_sic + sectorref.full_sector_name as dbt_full_sector_name, + sectorref.field_04 as dbt_sector_name, + substring(((dataservices_tmp_ref_sic_codes_mapping.sic_code + 100000)::varchar) from 2 for 5) as five_digit_sic # noqa:E501 + from dataservices_tmp_sector_reference sectorref + INNER JOIN dataservices_tmp_ref_sic_codes_mapping ON dataservices_tmp_ref_sic_codes_mapping.dit_sector_list_id = dataservices_tmp_sector_reference.id + ) as sector_mapping WHERE nubec.geo_code <> 'K02000001' + ON nubec.sic_code = sector_mapping.five_digit_sic """ engine = sa.create_engine(settings.DATABASE_URL, future=True) data = [] - chunks = pd.read_sql(sa.text(sql), engine, chunksize=5000) + breakpoint() + with engine.connect() as connection: + chunks = pd.read_sql_query(sa.text(sql), connection, chunksize=5000) - for chunk in chunks: - for _, row in chunk.iterrows(): - data.append( - ( - row.geo_description, - row.geo_code, - row.sic_code, - row.sic_description, - row.total_business_count, - row.business_count_release_year, - row.total_employee_count, - row.employee_count_release_year, - row.dbt_full_sector_name, - row.dbt_sector_name, + for chunk in chunks: + for _, row in chunk.iterrows(): + data.append( + { + 'geo_description': row.geo_description, + 'geo_code': row.geo_code, + 'sic_code': row.sic_code, + 'sic_description': row.sic_description, + 'total_business_count': row.total_business_count, + 'business_count_release_year': row.business_count_release_year, + 'total_employee_count': row.total_employee_count, + 'employee_count_release_year': row.employee_count_release_year, + 'dbt_full_sector_name': row.dbt_full_sector_name, + 'dbt_sector_name': row.dbt_sector_name, + } ) - ) metadata = sa.MetaData() @@ -245,21 +291,40 @@ def batches(_): ingest_data(engine, metadata, on_before_visible, batches) -def save_sic_codes_dit_sector_mapping_tmp_data(data): +def save_ref_sic_codes_mapping_tmp_data(data): - table_name = 'dataservices_tmp_sic_codes_dit_sector_mapping' + table_name = 'dataservices_tmp_ref_sic_codes_mapping' engine = sa.create_engine(settings.DATABASE_URL, future=True) metadata = sa.MetaData() - data_table = get_sic_codes_dit_sector_mapping_postgres_table(metadata, table_name) + data_table = get_ref_sic_codes_mapping_postgres_table(metadata, table_name) def on_before_visible(conn, ingest_table, batch_metadata): pass def batches(_): - yield get_sic_codes_dit_sector_mapping_batch(data, data_table) + yield get_ref_sic_codes_mapping_batch(data, data_table) + + ingest_data(engine, metadata, on_before_visible, batches) + + +def save_sector_reference_dataset_tmp_data(data): + + table_name = 'dataservices_tmp_sector_reference' + + engine = sa.create_engine(settings.DATABASE_URL, future=True) + + metadata = sa.MetaData() + + data_table = get_sector_reference_dataset_postgres_table(metadata, table_name) + + def on_before_visible(conn, ingest_table, batch_metadata): + pass + + def batches(_): + yield get_sector_reference_dataset_batch(data, data_table) ingest_data(engine, metadata, on_before_visible, batches) @@ -270,7 +335,7 @@ def delete_temp_tables(table_names): engine = sa.create_engine(settings.DATABASE_URL, future=True) metadata.reflect(bind=engine) for name in table_names: - table = metadata.tables[name] + table = metadata.tables.get(name, None) if table is not None: Base.metadata.drop_all(engine, [table], checkfirst=True) @@ -286,59 +351,26 @@ def handle(self, *args, **options): prefix=settings.NOMIS_UK_BUSINESS_EMPLOYEE_COUNTS_FROM_S3_PREFIX, save_func=save_uk_business_employee_counts_tmp_data, ) - sic_code_data = self.load_sic_code_data() - save_sic_codes_dit_sector_mapping_tmp_data(sic_code_data) + self.do_handle( + prefix=settings.REF_SIC_CODES_MAPPING_FROM_S3_PREFIX, + save_func=save_ref_sic_codes_mapping_tmp_data, + ) + self.do_handle( + prefix=settings.SECTOR_REFERENCE_DATASET_FROM_S3_PREFIX, + save_func=save_sector_reference_dataset_tmp_data, + ) save_uk_business_employee_counts_data() except Exception: - pass + logger.exception("import_eyb_business_cluster_information failed to ingest data from s3") finally: delete_temp_tables( [ 'dataservices_tmp_eybbusinessclusterinformation', - 'dataservices_tmp_sic_codes_dit_sector_mapping', + 'dataservices_tmp_ref_sic_codes_mapping', + 'dataservices_tmp_sector_reference', ] ) - def load_sic_code_data(self): - sql = ''' - SELECT - scmds."DIT full sector name" as dbt_full_sector_name, - scmds."DIT sector" as dbt_sector_name, - -- necessary because sic codes are stored as integer in source table meaning leading 0 was dropped - substring(((scmds."SIC code" + 100000)::varchar) from 2 for 5) as five_digit_sic - from public.ref_sic_codes_dit_sector_mapping scmds - ''' - - data = [] - engine = sa.create_engine(settings.DATA_WORKSPACE_DATASETS_URL, execution_options={'stream_results': True}) - connection = engine.raw_connection() - cursor = connection.cursor() - s = "SELECT" - s += " table_schema" - s += ", table_name" - s += " FROM information_schema.tables" - s += " WHERE" - s += " (" - s += " table_schema = '"+SCHEMA+"'" - s += " AND table_type = 'BASE TABLE'" - s += " )" - s += " ORDER BY table_schema, table_name;" - cursor.execute(s) - list_tables = cursor.fetchall() - chunks = pd.read_sql(sa.text(sql), engine, chunksize=5000) - - for chunk in chunks: - for _, row in chunk.iterrows(): - data.append( - ( - row.dbt_full_sector_name, - row.dbt_sector_name, - row.five_digit_sic, - ) - ) - - return data - # class Command(BaseDataWorkspaceIngestionCommand): # help = 'Import ONS total UK business and employee counts per region and section, 2 and 5 digit Standard Industrial Classification' # noqa:E501 From 9287dfb6248f6ed5ee6656797f3f1fd1013eb149 Mon Sep 17 00:00:00 2001 From: davidu1975 Date: Wed, 11 Dec 2024 11:13:23 +0000 Subject: [PATCH 12/16] Ingest nomis UK employee data from s3 --- ...import_eyb_business_cluster_information.py | 86 ++++--------------- 1 file changed, 16 insertions(+), 70 deletions(-) diff --git a/dataservices/management/commands/import_eyb_business_cluster_information.py b/dataservices/management/commands/import_eyb_business_cluster_information.py index 72bcb565..f195478f 100644 --- a/dataservices/management/commands/import_eyb_business_cluster_information.py +++ b/dataservices/management/commands/import_eyb_business_cluster_information.py @@ -59,6 +59,9 @@ def get_table_data(): for json_data in data: + if json_data['geo_code'] == 'K02000001': + continue + yield ( ( data_table, @@ -187,7 +190,7 @@ def get_ref_sic_codes_mapping_postgres_table(metadata, table_name): table_name, metadata, sa.Column("id", sa.INTEGER, nullable=False), - sa.Column("sic_code", sa.TEXT, nullable=False), + sa.Column("sic_code", sa.INTEGER, nullable=False), sa.Column("mapping_id", sa.TEXT, nullable=True), sa.Column("dit_sector_list_id", sa.INTEGER, nullable=True), sa.Index(None, "dit_sector_list_id"), @@ -221,16 +224,16 @@ def save_uk_business_employee_counts_data(): nubec.employee_count_release_year, sector_mapping.dbt_full_sector_name, sector_mapping.dbt_sector_name - FROM dataservices_tmp_eybbusinessclusterinformation nubec + FROM public.dataservices_tmp_eybbusinessclusterinformation nubec LEFT JOIN ( - SELECT - sectorref.full_sector_name as dbt_full_sector_name, - sectorref.field_04 as dbt_sector_name, - substring(((dataservices_tmp_ref_sic_codes_mapping.sic_code + 100000)::varchar) from 2 for 5) as five_digit_sic # noqa:E501 - from dataservices_tmp_sector_reference sectorref - INNER JOIN dataservices_tmp_ref_sic_codes_mapping ON dataservices_tmp_ref_sic_codes_mapping.dit_sector_list_id = dataservices_tmp_sector_reference.id + SELECT + dataservices_tmp_sector_reference.full_sector_name as dbt_full_sector_name, + dataservices_tmp_sector_reference.field_04 as dbt_sector_name, + -- necessary because sic codes are stored as integer in source table meaning leading 0 was dropped + substring(((dataservices_tmp_ref_sic_codes_mapping.sic_code + 100000)::varchar) from 2 for 5) as five_digit_sic -- # noqa:E501 + FROM public.dataservices_tmp_ref_sic_codes_mapping + INNER JOIN public.dataservices_tmp_sector_reference ON public.dataservices_tmp_ref_sic_codes_mapping.dit_sector_list_id = public.dataservices_tmp_sector_reference.id ) as sector_mapping - WHERE nubec.geo_code <> 'K02000001' ON nubec.sic_code = sector_mapping.five_digit_sic """ @@ -238,7 +241,6 @@ def save_uk_business_employee_counts_data(): data = [] - breakpoint() with engine.connect() as connection: chunks = pd.read_sql_query(sa.text(sql), connection, chunksize=5000) @@ -291,7 +293,7 @@ def batches(_): ingest_data(engine, metadata, on_before_visible, batches) -def save_ref_sic_codes_mapping_tmp_data(data): +def save_ref_sic_codes_mapping_data(data): table_name = 'dataservices_tmp_ref_sic_codes_mapping' @@ -310,7 +312,7 @@ def batches(_): ingest_data(engine, metadata, on_before_visible, batches) -def save_sector_reference_dataset_tmp_data(data): +def save_sector_reference_dataset_data(data): table_name = 'dataservices_tmp_sector_reference' @@ -353,11 +355,11 @@ def handle(self, *args, **options): ) self.do_handle( prefix=settings.REF_SIC_CODES_MAPPING_FROM_S3_PREFIX, - save_func=save_ref_sic_codes_mapping_tmp_data, + save_func=save_ref_sic_codes_mapping_data, ) self.do_handle( prefix=settings.SECTOR_REFERENCE_DATASET_FROM_S3_PREFIX, - save_func=save_sector_reference_dataset_tmp_data, + save_func=save_sector_reference_dataset_data, ) save_uk_business_employee_counts_data() except Exception: @@ -370,59 +372,3 @@ def handle(self, *args, **options): 'dataservices_tmp_sector_reference', ] ) - - -# class Command(BaseDataWorkspaceIngestionCommand): -# help = 'Import ONS total UK business and employee counts per region and section, 2 and 5 digit Standard Industrial Classification' # noqa:E501 - -# sql = ''' -# SELECT -# nubec.geo_description, -# nubec.geo_code, -# nubec.sic_code, -# nubec.sic_description, -# nubec.total_business_count, -# nubec.business_count_release_year, -# nubec.total_employee_count, -# nubec.employee_count_release_year, -# sector_mapping.dbt_full_sector_name, -# sector_mapping.dbt_sector_name -# FROM ons.nomis__uk_business_employee_counts nubec -# LEFT JOIN ( -# SELECT -# scmds."DIT full sector name" as dbt_full_sector_name, -# scmds."DIT sector" as dbt_sector_name, -# -- necessary because sic codes are stored as integer in source table meaning leading 0 was dropped -# substring(((scmds."SIC code" + 100000)::varchar) from 2 for 5) as five_digit_sic -# from public.ref_sic_codes_dit_sector_mapping scmds -# ) AS sector_mapping -# ON nubec.sic_code = sector_mapping.five_digit_sic -# WHERE nubec.geo_code <> 'K02000001' -# ''' - -# def load_data(self): -# data = [] -# chunks = pd.read_sql(sa.text(self.sql), self.engine, chunksize=5000) - -# for chunk in chunks: -# for _idx, row in chunk.iterrows(): -# data.append( -# EYBBusinessClusterInformation( -# geo_description=row.geo_description, -# geo_code=row.geo_code, -# sic_code=row.sic_code, -# sic_description=row.sic_description, -# total_business_count=row.total_business_count, -# business_count_release_year=row.business_count_release_year, -# # missing employee data represented as np.nan which results in error saving django model -# # columns are int in dataframe so cannot store None resulting in below conditional assignment -# total_employee_count=row.total_employee_count if row.total_employee_count > 0 else None, -# employee_count_release_year=( -# row.employee_count_release_year if row.employee_count_release_year > 0 else None -# ), -# dbt_full_sector_name=row.dbt_full_sector_name, -# dbt_sector_name=row.dbt_sector_name, -# ) -# ) - -# return data From 0bc16485539673ab08cfe8fbe874701a350b5312 Mon Sep 17 00:00:00 2001 From: davidu1975 Date: Wed, 11 Dec 2024 11:28:06 +0000 Subject: [PATCH 13/16] no --write option --- dataservices/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dataservices/tasks.py b/dataservices/tasks.py index b4fa8c80..86f9bf91 100644 --- a/dataservices/tasks.py +++ b/dataservices/tasks.py @@ -58,7 +58,7 @@ def run_import_dbt_sectors(): @app.task() def run_import_eyb_business_cluster_information(): - call_command('import_eyb_business_cluster_information', '--write') + call_command('import_eyb_business_cluster_information') @app.task() From 271e611affb87728830f748993b352339397ca3c Mon Sep 17 00:00:00 2001 From: davidu1975 Date: Wed, 11 Dec 2024 11:42:27 +0000 Subject: [PATCH 14/16] remove unnecessary temp table columns --- .../commands/import_eyb_business_cluster_information.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/dataservices/management/commands/import_eyb_business_cluster_information.py b/dataservices/management/commands/import_eyb_business_cluster_information.py index f195478f..bed1b58f 100644 --- a/dataservices/management/commands/import_eyb_business_cluster_information.py +++ b/dataservices/management/commands/import_eyb_business_cluster_information.py @@ -144,9 +144,7 @@ def get_table_data(): ( data_table, ( - json_data['id'], json_data['sic_code'], - json_data['mapping_id'], json_data['dit_sector_list_id'], ), ) @@ -189,9 +187,7 @@ def get_ref_sic_codes_mapping_postgres_table(metadata, table_name): return sa.Table( table_name, metadata, - sa.Column("id", sa.INTEGER, nullable=False), sa.Column("sic_code", sa.INTEGER, nullable=False), - sa.Column("mapping_id", sa.TEXT, nullable=True), sa.Column("dit_sector_list_id", sa.INTEGER, nullable=True), sa.Index(None, "dit_sector_list_id"), schema="public", From 072cc7047a27151adeba803f390ecacf5426c09d Mon Sep 17 00:00:00 2001 From: davidu1975 Date: Wed, 11 Dec 2024 14:55:59 +0000 Subject: [PATCH 15/16] Add tests for ingestion of nomis uk employee data from s3 --- dataservices/tests/conftest.py | 74 ++++++++++- dataservices/tests/test_ingestion_pattern.py | 129 +++++++++++++++++++ 2 files changed, 201 insertions(+), 2 deletions(-) diff --git a/dataservices/tests/conftest.py b/dataservices/tests/conftest.py index 79db7b95..2eab3115 100644 --- a/dataservices/tests/conftest.py +++ b/dataservices/tests/conftest.py @@ -130,7 +130,7 @@ def total_trade_records(countries): @pytest.fixture() def trade_in_services_records(countries): records = [ - {'code': '0', 'name': 'none value', 'exports': None, 'imports': None}, + {'code': '0', 'name': 'null value', 'exports': None, 'imports': None}, {'code': '1', 'name': 'first', 'exports': 6, 'imports': 1}, {'code': '2', 'name': 'second', 'exports': 5, 'imports': 1}, {'code': '3', 'name': 'third', 'exports': 4, 'imports': 1}, @@ -170,7 +170,7 @@ def trade_in_services_records(countries): def trade_in_goods_records(countries): for idx, iso2 in enumerate(['DE', 'FR', 'CN']): records = [ - {'code': '0', 'name': 'none value', 'exports': None, 'imports': None}, + {'code': '0', 'name': 'null value', 'exports': None, 'imports': None}, {'code': '1', 'name': 'first', 'exports': 6, 'imports': 1}, {'code': '2', 'name': 'second', 'exports': 5, 'imports': 1}, {'code': '3', 'name': 'third', 'exports': 4, 'imports': 1}, @@ -324,6 +324,76 @@ def business_cluster_information_data(): models.EYBBusinessClusterInformation.objects.all().delete() +@pytest.fixture +def sector_reference_dataset_data(): + yield [ + '{"id": 3, "field_04": "Advanced engineering", "full_sector_name": "Advanced engineering : Metallurgical process plant"}\n', # noqa: E501 + '{"id": 4, "field_04": "Advanced engineering", "full_sector_name": "Advanced engineering : Metals, minerals and materials"}\n', # noqa: E501 + '{"id": 38, "field_04": "Automotive", "full_sector_name": "Automotive"}\n', # noqa: E501 + ] + + +@pytest.fixture +def ref_sic_codes_mapping_data(): + yield [ + '{"id": 1, "sic_code": 1110, "mapping_id": "SIC-SEC-106", "updated_date": "2021-08-19T10:05:34.680837+00:00", "sic_description": "Growing of cereals (except rice), leguminous crops and oil seeds", "dit_sector_list_id": 21}\n', # noqa: E501 + '{"id": 2, "sic_code": 1120, "mapping_id": "SIC-SEC-107", "updated_date": "2021-08-19T10:05:34.689149+00:00", "sic_description": "Growing of rice", "dit_sector_list_id": 21}\n', # noqa: E501 + '{"id": 3, "sic_code": 1130, "mapping_id": "SIC-SEC-129", "updated_date": "2021-08-19T10:05:34.696666+00:00", "sic_description": "Growing of vegetables and melons, roots and tubers", "dit_sector_list_id": 31}\n', # noqa: E501 + ] + + +@pytest.fixture +def uk_business_employee_counts_data(): + yield [ + { + "geo_code": "K02000002", + "sic_code": "01", + "geo_description": "United Kingdom", + "sic_description": "Crop and animal production, hunting and related service activities", + "total_business_count": 132540, + "total_employee_count": None, + "business_count_release_year": 2023, + "employee_count_release_year": None, + "dbt_full_sector_name": "Metallurgical process plant", + "dbt_sector_name": "Advanced engineering", + }, # noqa: E501 + { + "geo_code": "K02000003", + "sic_code": "03", + "geo_description": "United Kingdom", + "sic_description": "Fishing and aquaculture", + "total_business_count": 4070, + "total_employee_count": None, + "business_count_release_year": 2023, + "employee_count_release_year": None, + "dbt_full_sector_name": "Metallurgical process plant", + "dbt_sector_name": "Advanced engineering", + }, # noqa: E501 + { + "geo_code": "K02000004", + "sic_code": "03", + "geo_description": "United Kingdom", + "sic_description": "Fishing and aquaculture", + "total_business_count": 4070, + "total_employee_count": None, + "business_count_release_year": 2023, + "employee_count_release_year": None, + "dbt_full_sector_name": "Automotive", + "dbt_sector_name": "Automotive", + }, # noqa: E501 + ] + + +@pytest.fixture +def uk_business_employee_counts_str_data(uk_business_employee_counts_data): + + data = [] + for line in uk_business_employee_counts_data: + line = json.dumps(line) + data.append(line) + yield data + + @pytest.fixture def eyb_salary_s3_data(): yield [ diff --git a/dataservices/tests/test_ingestion_pattern.py b/dataservices/tests/test_ingestion_pattern.py index 2308f059..6fcd144a 100644 --- a/dataservices/tests/test_ingestion_pattern.py +++ b/dataservices/tests/test_ingestion_pattern.py @@ -48,6 +48,20 @@ get_sectors_gva_value_bands_table, save_sectors_gva_value_bands_data, ) +from dataservices.management.commands.import_eyb_business_cluster_information import ( + get_ref_sic_codes_mapping_batch, + get_ref_sic_codes_mapping_postgres_table, + get_sector_reference_dataset_batch, + get_sector_reference_dataset_postgres_table, + get_uk_business_employee_counts_batch, + get_uk_business_employee_counts_postgres_table, + get_uk_business_employee_counts_postgres_tmp_table, + get_uk_business_employee_counts_tmp_batch, + save_ref_sic_codes_mapping_data, + save_sector_reference_dataset_data, + save_uk_business_employee_counts_tmp_data, +) + dbsector_data = [ { @@ -253,6 +267,54 @@ def test_import_postcode_data_set_from_s3( assert mock_save_postcode_data.call_count == 1 +uk_business_employee_counts = [ + { + "geo_code": "K02000002", + "sic_code": "01", + "geo_description": "United Kingdom", + "sic_description": "Crop and animal production, hunting and related service activities", + "total_business_count": 132540, + "total_employee_count": None, + "business_count_release_year": 2023, + "employee_count_release_year": None, + "dbt_full_sector_name": "Metallurgical process plant", + "dbt_sector_name": "Advanced engineering", + }, +] + + +@pytest.mark.django_db +@pytest.mark.parametrize("get_s3_file_data", [uk_business_employee_counts[0]], indirect=True) +@mock.patch( + 'dataservices.management.commands.import_eyb_business_cluster_information.save_uk_business_employee_counts_data' +) # noqa:E501 +@mock.patch( + 'dataservices.management.commands.import_eyb_business_cluster_information.save_uk_business_employee_counts_tmp_data' +) # noqa:E501 +@mock.patch( + 'dataservices.management.commands.import_eyb_business_cluster_information.save_ref_sic_codes_mapping_data' +) # noqa:E501 +@mock.patch( + 'dataservices.management.commands.import_eyb_business_cluster_information.save_sector_reference_dataset_data' +) # noqa:E501 +@mock.patch('dataservices.core.mixins.get_s3_file') +@mock.patch('dataservices.core.mixins.get_s3_paginator') +def test_import_eyb_business_cluster_information_from_s3( + mock_get_s3_paginator, + mock_get_s3_file, + mock_save_sector_reference_dataset_data, + mock_save_ref_sic_codes_mapping_data, + mock_save_uk_business_employee_counts_tmp_data, + mock_save_uk_business_employee_counts_data, + get_s3_file_data, + get_s3_data_transfer_data, +): + mock_get_s3_file.return_value = get_s3_file_data + mock_get_s3_paginator.return_value = get_s3_data_transfer_data + management.call_command('import_eyb_business_cluster_information') + assert mock_save_uk_business_employee_counts_data.call_count == 1 + + @pytest.mark.django_db @override_settings(DATABASE_URL='postgresql://') @mock.patch.object(pg_bulk_ingest, 'ingest', return_value=None) @@ -357,6 +419,73 @@ def test_get_postcode_batch(postcode_data): assert next(ret[2]) is not None +@pytest.mark.django_db +@override_settings(DATABASE_URL='postgresql://') +@mock.patch.object(pg_bulk_ingest, 'ingest', return_value=None) +@mock.patch.object(Engine, 'connect') +def test_save_get_uk_business_employee_counts_tmp(mock_connection, mock_ingest, uk_business_employee_counts_str_data): + mock_connection.return_value.__enter__.return_value = mock.MagicMock() + save_uk_business_employee_counts_tmp_data(data=uk_business_employee_counts_str_data) + assert mock_ingest.call_count == 1 + + +@pytest.mark.django_db +def test_get_uk_business_employee_counts_tmp_batch(uk_business_employee_counts_str_data): + metadata = sa.MetaData() + ret = get_uk_business_employee_counts_tmp_batch( + uk_business_employee_counts_str_data, + get_uk_business_employee_counts_postgres_tmp_table(metadata, 'tmp_nomis_table'), + ) + assert next(ret[2]) is not None + + +@pytest.mark.django_db +def test_get_uk_business_employee_counts_batch(uk_business_employee_counts_data): + metadata = sa.MetaData() + ret = get_uk_business_employee_counts_batch( + uk_business_employee_counts_data, get_uk_business_employee_counts_postgres_table(metadata, 'nomis_table') + ) + assert next(ret[2]) is not None + + +@pytest.mark.django_db +@override_settings(DATABASE_URL='postgresql://') +@mock.patch.object(pg_bulk_ingest, 'ingest', return_value=None) +@mock.patch.object(Engine, 'connect') +def test_save_ref_sic_codes_mapping(mock_connection, mock_ingest, ref_sic_codes_mapping_data): + mock_connection.return_value.__enter__.return_value = mock.MagicMock() + save_ref_sic_codes_mapping_data(data=ref_sic_codes_mapping_data) + assert mock_ingest.call_count == 1 + + +@pytest.mark.django_db +def test_get_ref_sic_codes_mapping_batch(ref_sic_codes_mapping_data): + metadata = sa.MetaData() + ret = get_ref_sic_codes_mapping_batch( + ref_sic_codes_mapping_data, get_ref_sic_codes_mapping_postgres_table(metadata, 'tmp_sic+_codes_mapping_ref') + ) + assert next(ret[2]) is not None + + +@pytest.mark.django_db +@override_settings(DATABASE_URL='postgresql://') +@mock.patch.object(pg_bulk_ingest, 'ingest', return_value=None) +@mock.patch.object(Engine, 'connect') +def test_save_sector_reference_dataset(mock_connection, mock_ingest, sector_reference_dataset_data): + mock_connection.return_value.__enter__.return_value = mock.MagicMock() + save_sector_reference_dataset_data(data=sector_reference_dataset_data) + assert mock_ingest.call_count == 1 + + +@pytest.mark.django_db +def test_get_sector_reference_datase_batch(sector_reference_dataset_data): + metadata = sa.MetaData() + ret = get_sector_reference_dataset_batch( + sector_reference_dataset_data, get_sector_reference_dataset_postgres_table(metadata, 'tmp_sector_ref') + ) + assert next(ret[2]) is not None + + @pytest.mark.django_db @patch.object(Paginator, 'paginate') def test_get_s3_paginator(mock_paginate, get_s3_data_transfer_data): From 3a770f3f6e746619ab21ba086d92403112a5f5a3 Mon Sep 17 00:00:00 2001 From: davidu1975 Date: Thu, 12 Dec 2024 10:55:26 +0000 Subject: [PATCH 16/16] removed duplicate code --- .../management/commands/import_dbt_sectors.py | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/dataservices/management/commands/import_dbt_sectors.py b/dataservices/management/commands/import_dbt_sectors.py index 8c9a92d2..0e03f5d8 100644 --- a/dataservices/management/commands/import_dbt_sectors.py +++ b/dataservices/management/commands/import_dbt_sectors.py @@ -29,23 +29,6 @@ def get_table_data(): ) ) - for dbt_sector in data: - json_data = json.loads(dbt_sector) - yield ( - ( - data_table, - ( - json_data['id'], - json_data['field_01'], - json_data['full_sector_name'], - json_data['sector_cluster__april_2023'], - json_data['field_04'], - json_data['field_05'], - json_data['field_02'], - ), - ) - ) - return ( None, None,