From 43511eb3c7c772cc0402f26a3512e2227993317f Mon Sep 17 00:00:00 2001 From: Benjamin K <53038537+treee111@users.noreply.github.com> Date: Thu, 29 Sep 2022 23:17:32 +0200 Subject: [PATCH] [FEAT] Refactor a lot to decouple and make maintenance easier (focus osm_maps_functions.py) (#152) * import file_directory functions one by one - and not by importing the whole file "as" fd_fct * create OsmData object and insert into class OsmMaps * move check for -co and -xy to input.py * shuffle a lot * go throught calc_border_countries also when no border countries are wanted * break OsmData in small pieces * refactor border country calc + fix unittests * force_processing refactoring, all checks in OsmData now! - move to function process_input_of_the_tool - move corresponding stuff from check_and_download_files also - additional needed changes - mainiy in downloader.py - adjust tests & do nearly refactoring * import needed functions + pylint findings * refactor imports to "single import" and not the whole file * fix pylint finding * Bump to version v2.1.0a13 --- setup.cfg | 2 +- tests/test_downloader.py | 34 ++++ tests/test_geofabrik.py | 2 +- tests/test_osm_maps.py | 35 ++-- wahoomc/downloader.py | 111 ++++++----- wahoomc/geofabrik.py | 17 +- wahoomc/input.py | 3 + wahoomc/main.py | 14 +- wahoomc/osm_maps_functions.py | 334 ++++++++++++++++++++-------------- wahoomc/setup_functions.py | 19 +- 10 files changed, 342 insertions(+), 229 deletions(-) diff --git a/setup.cfg b/setup.cfg index 8661e31d..b66013af 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = wahoomc -version = 2.0.2 +version = 2.1.0a13 author = Benjamin Kreuscher author_email = benni.kreuscher@gmail.com description = Create maps for your Wahoo bike computer based on latest OSM maps diff --git a/tests/test_downloader.py b/tests/test_downloader.py index eca2d16b..a8332927 100644 --- a/tests/test_downloader.py +++ b/tests/test_downloader.py @@ -83,6 +83,25 @@ def test_download_geofabrik_file(self): self.assertTrue(os.path.exists(path)) + def test_download_geofabrik_file_2(self): + """ + Test the download of geofabrik file via URL. + check & download via built functions of downloader class + """ + path = os.path.join(constants.USER_DL_DIR, 'geofabrik.json') + + if os.path.exists(path): + os.remove(path) + + self.assertTrue( + self.o_downloader.should_geofabrik_file_be_downloaded()) + + self.o_downloader.download_geofabrik_file() + + self.assertTrue(os.path.exists(path)) + self.assertFalse( + self.o_downloader.should_geofabrik_file_be_downloaded()) + def test_download_malta_osm_pbf_file(self): """ Test the download of geofabrik file via URL @@ -115,6 +134,21 @@ def test_delete_not_existing_file(self): with self.assertRaises(FileNotFoundError): os.remove(path) + def test_check_dl_needed_geofabrik(self): + """ + Test if geofabrik file needs to be downloaded + """ + path = os.path.join(constants.USER_DL_DIR, 'geofabrik.json') + + if os.path.exists(path): + os.remove(path) + + self.o_downloader.max_days_old = 24 + # self.o_downloader.check_geofabrik_file() + + self.assertTrue( + self.o_downloader.should_geofabrik_file_be_downloaded()) + if __name__ == '__main__': unittest.main() diff --git a/tests/test_geofabrik.py b/tests/test_geofabrik.py index 10d06a80..e168ec44 100644 --- a/tests/test_geofabrik.py +++ b/tests/test_geofabrik.py @@ -46,7 +46,7 @@ def setUp(self): if not os.path.isfile(constants.GEOFABRIK_PATH): o_downloader = Downloader(24, False) - o_downloader.check_and_download_geofabrik_if_needed() + o_downloader.download_geofabrik_file() def test_tiles_via_geofabrik_malta(self): """ diff --git a/tests/test_osm_maps.py b/tests/test_osm_maps.py index d1da7688..b8ccb1c4 100644 --- a/tests/test_osm_maps.py +++ b/tests/test_osm_maps.py @@ -8,7 +8,7 @@ # import custom python packages # sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -from wahoomc.osm_maps_functions import OsmMaps +from wahoomc.osm_maps_functions import OsmData from wahoomc.osm_maps_functions import get_tile_by_one_xy_combination_from_jsons from wahoomc.osm_maps_functions import get_xy_coordinates_from_input from wahoomc.osm_maps_functions import TileNotFoundError @@ -23,10 +23,7 @@ class TestOsmMaps(unittest.TestCase): tests for the OSM maps file """ - def setUp(self): - o_input_data = InputData() - - self.o_osm_maps = OsmMaps(o_input_data) + # def setUp(self): def test_input_country_malta(self): """ @@ -34,10 +31,13 @@ def test_input_country_malta(self): check, if the given input-parameter is saved to the OsmMaps instance """ - self.o_osm_maps.o_input_data.country = 'malta' - self.o_osm_maps.process_input(True) + o_input_data = InputData() + o_input_data.country = 'malta' + + o_osm_data = OsmData() + o_osm_data.process_input_of_the_tool(o_input_data) - result = self.o_osm_maps.country_name + result = o_osm_data.country_name self.assertEqual(result, 'malta') def test_calc_border_countries_input_country(self): @@ -123,16 +123,27 @@ def process_and_check_border_countries(self, inp_val, calc_border_c, exp_result, helper method to process a country or json file and check the calculated border countries """ + o_input_data = InputData() if inp_mode == 'country': - self.o_osm_maps.o_input_data.country = inp_val + o_input_data.country = inp_val elif inp_mode == 'xy_coordinate': - self.o_osm_maps.o_input_data.xy_coordinates = inp_val + o_input_data.xy_coordinates = inp_val + o_input_data.process_border_countries = calc_border_c + + o_osm_data = OsmData() + o_osm_data.process_input_of_the_tool(o_input_data) - self.o_osm_maps.process_input(calc_border_c) - result = self.o_osm_maps.border_countries + result = o_osm_data.border_countries + # delete the path to the file, here, only the correct border countries are checked + for res in result: + result[res] = {} + + # self.assertEqual(result, exp_result) self.assertEqual(result, exp_result) + # self.assertDictEqual() Equal(tIn(member=exp_result, container=result) + class TestStaticJsons(unittest.TestCase): """ diff --git a/wahoomc/downloader.py b/wahoomc/downloader.py index 35319883..a50b4fab 100644 --- a/wahoomc/downloader.py +++ b/wahoomc/downloader.py @@ -12,15 +12,13 @@ import logging # import custom python packages -from wahoomc import file_directory_functions as fd_fct -from wahoomc import constants_functions as const_fct +from wahoomc.file_directory_functions import download_url_to_file, unzip +from wahoomc.constants_functions import translate_country_input_to_geofabrik, get_geofabrik_region_of_country -# User from wahoomc.constants import USER_DL_DIR from wahoomc.constants import USER_MAPS_DIR from wahoomc.constants import LAND_POLYGONS_PATH from wahoomc.constants import GEOFABRIK_PATH -from wahoomc.constants import USER_OUTPUT_DIR log = logging.getLogger('main-logger') @@ -46,14 +44,14 @@ def download_file(target_filepath, url, is_zip): last_part = url.rsplit('/', 1)[-1] dl_file_path = os.path.join(USER_DL_DIR, last_part) # download URL to file - fd_fct.download_url_to_file(url, dl_file_path) + download_url_to_file(url, dl_file_path) # unpack it - should work on macOS and Windows - fd_fct.unzip(dl_file_path, USER_DL_DIR) + unzip(dl_file_path, USER_DL_DIR) # delete .zip file os.remove(dl_file_path) else: # no zipping --> directly download to given target filepath - fd_fct.download_url_to_file(url, target_filepath) + download_url_to_file(url, target_filepath) # Check if file exists (if target file exists) if not os.path.isfile(target_filepath): log.error('! failed to find %s', target_filepath) @@ -67,8 +65,8 @@ def get_osm_pbf_filepath_url(country): build the geofabrik download url to a countries' OSM file and download filepath """ - transl_c = const_fct.translate_country_input_to_geofabrik(country) - region = const_fct.get_geofabrik_region_of_country(country) + transl_c = translate_country_input_to_geofabrik(country) + region = get_geofabrik_region_of_country(country) if region != 'no': url = 'https://download.geofabrik.de/' + region + \ '/' + transl_c + '-latest.osm.pbf' @@ -87,69 +85,61 @@ class Downloader: This is the class to check and download maps / artifacts" """ - def __init__(self, max_days_old, force_download): + def __init__(self, max_days_old, force_download, tiles_from_json=None, border_countries=None): self.max_days_old = max_days_old self.force_download = force_download - self.tiles_from_json = [] - self.border_countries = {} + self.tiles_from_json = tiles_from_json + self.border_countries = border_countries - def check_and_download_geofabrik_if_needed(self): + self.need_to_dl = [] + + def should_geofabrik_file_be_downloaded(self): """ check geofabrik file if not existing or is not up-to-date - and download if needed - """ - - force_processing = False + # if geofabrik file needs to be downloaded, force-processing is set because there might be + # a change in the geofabrik file + """ if self.check_file(GEOFABRIK_PATH) is True or \ self.force_download is True: log.info('# Need to download geofabrik file') - download_file(GEOFABRIK_PATH, - 'https://download.geofabrik.de/index-v1.json', False) - force_processing = True - log.info('+ check geofabrik.json file: OK') + return True - return force_processing + return False - def check_and_download_files_if_needed(self): + def download_geofabrik_file(self): """ - check land_polygons and OSM map files if not existing or are not up-to-date - and download if needed + download geofabrik file """ + download_file(GEOFABRIK_PATH, + 'https://download.geofabrik.de/index-v1.json', False) - force_processing = False + log.info('+ download geofabrik.json file: OK') + + def check_land_polygons_file(self): + """ + check land_polygons file if not existing or are not up-to-date + """ if self.check_file(LAND_POLYGONS_PATH) is True or \ self.force_download is True: log.info('# Need to download land polygons file') + self.need_to_dl.append('land_polygons') + + def download_files_if_needed(self): + """ + check land_polygons and OSM map files if not existing or are not up-to-date + and download if needed + """ + if 'land_polygons' in self.need_to_dl: download_file(LAND_POLYGONS_PATH, 'https://osmdata.openstreetmap.de/download/land-polygons-split-4326.zip', True) - force_processing = True - - log.info('+ check land_polygons.shp file: OK') - - if self.check_osm_pbf_file() is True or self.force_download is True: - # trigger download of relevant countries' OSM files - for country, item in self.border_countries.items(): - try: - if item['download'] is True: - map_file_path, url = get_osm_pbf_filepath_url(country) - download_file(map_file_path, url, False) - self.border_countries[country] = { - 'map_file': map_file_path} - except KeyError: - pass - force_processing = True + # log.info('+ download land_polygons.shp file: OK') - log.info('+ Check countries .osm.pbf files: OK') - - # if force_processing is True: - fd_fct.create_empty_directories( - USER_OUTPUT_DIR, self.tiles_from_json) - - return force_processing + if 'osm_pbf' in self.need_to_dl: + self.download_osm_pbf_file() def check_file(self, target_filepath): """ @@ -186,7 +176,6 @@ def check_osm_pbf_file(self): check if the relevant countries' OSM files are up-to-date """ - need_to_download = False log.info('-' * 80) log.info('# check countries .osm.pbf files') @@ -197,7 +186,7 @@ def check_osm_pbf_file(self): # get translated country (geofabrik) of country # do not download the same file for different countries # --> e.g. China, Hong Kong and Macao, see Issue #11 - transl_c = const_fct.translate_country_input_to_geofabrik(country) + transl_c = translate_country_input_to_geofabrik(country) # check for already existing .osm.pbf file map_file_path = glob.glob( @@ -212,7 +201,7 @@ def check_osm_pbf_file(self): log.info( '+ mapfile for %s: deleted. Input: %s.', transl_c, country) os.remove(map_file_path[0]) - need_to_download = True + self.need_to_dl.append('osm_pbf') else: self.border_countries[country] = { 'map_file': map_file_path[0]} @@ -223,11 +212,21 @@ def check_osm_pbf_file(self): map_file_path = self.border_countries[country].get('map_file') if map_file_path is None or (not os.path.isfile(map_file_path) or self.force_download): self.border_countries[country]['download'] = True - need_to_download = True - - # self.border_countries = border_countries + self.need_to_dl.append('osm_pbf') - return need_to_download + def download_osm_pbf_file(self): + """ + download countries' OSM files + """ + for country, item in self.border_countries.items(): + try: + if item['download'] is True: + map_file_path, url = get_osm_pbf_filepath_url(country) + download_file(map_file_path, url, False) + self.border_countries[country] = { + 'map_file': map_file_path} + except KeyError: + pass def should_file_be_downloaded(self, file_path): """ diff --git a/wahoomc/geofabrik.py b/wahoomc/geofabrik.py index c702d1ab..94c98d8f 100644 --- a/wahoomc/geofabrik.py +++ b/wahoomc/geofabrik.py @@ -12,7 +12,8 @@ from shapely.geometry import Polygon, shape # import custom python packages -from wahoomc import constants +from wahoomc.constants import GEOFABRIK_PATH +from wahoomc.constants import special_regions, geofabrik_regions, block_download log = logging.getLogger('main-logger') @@ -142,7 +143,7 @@ def geom(wanted): input parameter is the name of the desired country/region as use by Geofabric in their json file. """ - with open(constants.GEOFABRIK_PATH, encoding='utf8') as file_handle: + with open(GEOFABRIK_PATH, encoding='utf8') as file_handle: data = geojson.load(file_handle) file_handle.close() @@ -196,7 +197,7 @@ def find_needed_countries(bbox_tiles, wanted_map, wanted_region_polygon): """ output = [] - with open(constants.GEOFABRIK_PATH, encoding='utf8') as file_handle: + with open(GEOFABRIK_PATH, encoding='utf8') as file_handle: geofabrik_json_data = geojson.load(file_handle) file_handle.close() @@ -244,19 +245,19 @@ def find_needed_countries(bbox_tiles, wanted_map, wanted_region_polygon): if rshape.intersects(poly): # if so # catch special_regions like (former) colonies where the map of the region is not fysically in the map of the parent country. # example Guadeloupe, it's parent country is France but Guadeloupe is not located within the region covered by the map of France - if wanted_map not in constants.special_regions: + if wanted_map not in special_regions: # If we are proseccing a sub-region add the parent of this sub-region # to the must download list. # This to prevent downloading several small regions AND it's containing region # we are processing a sub-regiongo find the parent region: - if parent not in constants.geofabrik_regions and regionname not in constants.geofabrik_regions: + if parent not in geofabrik_regions and regionname not in geofabrik_regions: # we are processing a sub-regiongo find the parent region x_value = 0 # handle sub-sub-regions like unterfranken->bayern->germany - while parent not in constants.geofabrik_regions: + while parent not in geofabrik_regions: parent, child = find_geofbrik_parent( parent, geofabrik_json_data) - if parent in constants.geofabrik_regions: + if parent in geofabrik_regions: parent = child break if x_value > 10: # prevent endless loop @@ -288,7 +289,7 @@ def find_needed_countries(bbox_tiles, wanted_map, wanted_region_polygon): if regionname != wanted_map: # check if we are processing a country or a sub-region. # For countries only process other countries. also block special geofabrik sub regions - if parent in constants.geofabrik_regions and regionname not in constants.block_download: + if parent in geofabrik_regions and regionname not in block_download: # processing a country and no special sub-region # check if rshape is subset of desired region. If so discard it if wanted_region_polygon.contains(rshape): diff --git a/wahoomc/input.py b/wahoomc/input.py index 1f200498..5b3fa337 100644 --- a/wahoomc/input.py +++ b/wahoomc/input.py @@ -172,6 +172,9 @@ def is_required_input_given_or_exit(self, issue_message): "Or in the GUI select a country to create maps for.") else: sys.exit() + elif self.country and self.xy_coordinates: + sys.exit( + "Country and X/Y coordinates are given. Only one of both is allowed!") else: return True diff --git a/wahoomc/main.py b/wahoomc/main.py index 681a6667..b9589d55 100644 --- a/wahoomc/main.py +++ b/wahoomc/main.py @@ -11,6 +11,7 @@ from wahoomc.setup_functions import initialize_work_directories from wahoomc.setup_functions import move_old_content_into_new_dirs from wahoomc.osm_maps_functions import OsmMaps +from wahoomc.osm_maps_functions import OsmData # logging used in the terminal output: # # means top-level command @@ -35,13 +36,14 @@ def run(): initialize_work_directories() move_old_content_into_new_dirs() - o_osm_maps = OsmMaps(o_input_data) + o_osm_data = OsmData() + # Check for not existing or expired files. Mark for download, if dl is needed + o_downloader = o_osm_data.process_input_of_the_tool(o_input_data) - # Read json file - # Check for expired land polygons file and download, if too old - # Check for expired .osm.pbf files and download, if too old - o_osm_maps.process_input(o_input_data.process_border_countries) - o_osm_maps.check_and_download_files() + # Download files marked for download + o_downloader.download_files_if_needed() + + o_osm_maps = OsmMaps(o_osm_data) if o_input_data.only_merge is False: # Filter tags from country osm.pbf files' diff --git a/wahoomc/osm_maps_functions.py b/wahoomc/osm_maps_functions.py index 9d8df8c6..1b320849 100644 --- a/wahoomc/osm_maps_functions.py +++ b/wahoomc/osm_maps_functions.py @@ -15,7 +15,9 @@ import logging # import custom python packages -from wahoomc import file_directory_functions as fd_fct +from wahoomc.file_directory_functions import get_tooling_win_path, read_json_file, \ + get_folders_in_folder, get_filenames_of_jsons_in_folder, create_empty_directories, \ + get_tag_wahoo_xml_path, TagWahooXmlNotFoundError from wahoomc.constants_functions import get_path_to_static_tile_json, translate_tags_to_keep from wahoomc.constants import USER_WAHOO_MC @@ -60,11 +62,11 @@ def get_tile_by_one_xy_combination_from_jsons(xy_combination): # go through all files in all folders of the "json" directory file_path_jsons = os.path.join(RESOURCES_DIR, 'json') - for folder in fd_fct.get_folders_in_folder(file_path_jsons): - for file in fd_fct.get_filenames_of_jsons_in_folder(os.path.join(file_path_jsons, folder)): + for folder in get_folders_in_folder(file_path_jsons): + for file in get_filenames_of_jsons_in_folder(os.path.join(file_path_jsons, folder)): # get content of json in folder - content = fd_fct.read_json_file( + content = read_json_file( os.path.join(file_path_jsons, folder, file + '.json')) # check tiles values against input x/y combination @@ -100,95 +102,164 @@ def run_subprocess_and_log_output(cmd, error_message, cwd=""): sys.exit() -class OsmMaps: +class OsmData(): # pylint: disable=too-few-public-methods """ - This is a OSM data class + object with all internal parameters to process maps """ - osmosis_win_file_path = fd_fct.get_tooling_win_path( - ['Osmosis', 'bin', 'osmosis.bat']) - - # Number of workers for the Osmosis read binary fast function - workers = '1' - - def __init__(self, o_input_data): - self.force_processing = '' - + def __init__(self): + """ + xxx + """ + self.force_processing = False self.tiles = [] self.border_countries = {} self.country_name = '' - self.o_input_data = o_input_data - self.o_downloader = Downloader( - o_input_data.max_days_old, o_input_data.force_download) - - if 8 * struct.calcsize("P") == 32: - self.osmconvert_path = fd_fct.get_tooling_win_path(['osmconvert']) - else: - self.osmconvert_path = fd_fct.get_tooling_win_path( - ['osmconvert64-0.8.8p']) - - def process_input(self, calc_border_countries): + def process_input_of_the_tool(self, o_input_data): """ Process input: get relevant tiles and if border countries should be calculated The three primary inputs are giving by a separate value each and have separate processing: 1. country name 2. x/y combinations + + # Check for not existing or expired files. Mark for download, if dl is needed + # - land polygons file + # - .osm.pbf files + + steps in this function: + 1. take over input paramters (force_processing is changed in the function further down) + 2. check + download geofabrik file if geofabrik-processing + 3. calculate relevant tiles for map creation + 4. calculate border countries for map creation + 5. evaluate the country-name for folder cration during processing + 6. calculate if force_processing should be set to true """ + o_downloader = Downloader( + o_input_data.max_days_old, o_input_data.force_download, self.tiles, self.border_countries) + # takeover what is given by user first for force_processing + self.force_processing = o_input_data.force_processing + log.info('-' * 80) - if self.o_input_data.country and self.o_input_data.xy_coordinates: - log.error( - "! country and X/Y coordinates are given. Only one of both is allowed!") - sys.exit() + # geofabrik file + if o_input_data.geofabrik_tiles and \ + o_downloader.should_geofabrik_file_be_downloaded(): + self.force_processing = True + o_downloader.download_geofabrik_file() - # option 1: input a country as parameter, e.g. germany - if self.o_input_data.country: - log.info('# Input country: %s.', self.o_input_data.country) + # calc tiles + if o_input_data.country: + self.calc_tiles_country(o_input_data) + elif o_input_data.xy_coordinates: + self.calc_tiles_xy(o_input_data) - # option 1a: use Geofabrik-URL to calculate the relevant tiles - if self.o_input_data.geofabrik_tiles: - self.force_processing = self.o_downloader.check_and_download_geofabrik_if_needed() + # calc border countries + log.info('-' * 80) + if o_input_data.country: + self.calc_border_countries_country(o_input_data) + elif o_input_data.xy_coordinates: + self.calc_border_countries() + # log border countries when and when not calculated to output the processed country(s) + self.log_border_countries() + + # calc country name + if o_input_data.country: + # country name is the input argument + self.country_name = o_input_data.country + elif o_input_data.xy_coordinates: + self.calc_country_name_xy() - o_geofabrik = Geofabrik(self.o_input_data.country) - self.tiles = o_geofabrik.get_tiles_of_country() + # calc force processing + # Check for not existing or expired files. Mark for download, if dl is needed + o_downloader.check_land_polygons_file() + o_downloader.check_osm_pbf_file() - # option 1b: use static json files in the repo to calculate relevant tiles - else: - json_file_path = get_path_to_static_tile_json( - self.o_input_data.country) - self.tiles = fd_fct.read_json_file(json_file_path) + # If one of the files needs to be downloaded, reprocess all files + if o_downloader.need_to_dl: + self.force_processing = True - # country name is the input argument - self.country_name = self.o_input_data.country + return o_downloader - # option 2: input a x/y combinations as parameter, e.g. 134/88 or 133/88,130/100 - elif self.o_input_data.xy_coordinates: - log.info( - '# Input X/Y coordinates: %s.', self.o_input_data.xy_coordinates) + def calc_tiles_country(self, o_input_data): + """ + option 1: input a country as parameter, e.g. germany + """ + log.info('# Input country: %s.', o_input_data.country) - # option 2a: use Geofabrik-URL to get the relevant tiles - if self.o_input_data.geofabrik_tiles: - sys.exit("X/Y coordinated via Geofabrik not implemented now") + # option 1a: use Geofabrik-URL to calculate the relevant tiles + if o_input_data.geofabrik_tiles: + o_geofabrik = Geofabrik(o_input_data.country) + self.tiles = o_geofabrik.get_tiles_of_country() - # option 2b: use static json files in the repo to get relevant tiles - else: - xy_coordinates = get_xy_coordinates_from_input( - self.o_input_data.xy_coordinates) + # option 1b: use static json files in the repo to calculate relevant tiles + else: + json_file_path = get_path_to_static_tile_json( + o_input_data.country) + self.tiles = read_json_file(json_file_path) - # loop through x/y combinations and find each tile in the json files - self.find_tiles_for_xy_combinations(xy_coordinates) + def calc_tiles_xy(self, o_input_data): + """ + option 2: input a x/y combinations as parameter, e.g. 134/88 or 133/88,130/100 + """ + log.info( + '# Input X/Y coordinates: %s.', o_input_data.xy_coordinates) - # calc border country when input X/Y coordinates - calc_border_countries = True + # option 2a: use Geofabrik-URL to get the relevant tiles + if o_input_data.geofabrik_tiles: + sys.exit("X/Y coordinated via Geofabrik not implemented now") - # Build list of countries needed - self.border_countries = {} - if calc_border_countries: - self.calc_border_countries(calc_border_countries) + # option 2b: use static json files in the repo to get relevant tiles else: - self.border_countries[self.country_name] = {} + xy_coordinates = get_xy_coordinates_from_input( + o_input_data.xy_coordinates) + + # loop through x/y combinations and find each tile in the json files + for xy_comb in xy_coordinates: + try: + self.tiles.append(get_tile_by_one_xy_combination_from_jsons( + xy_comb)) + + except TileNotFoundError: + pass + + def calc_border_countries_country(self, o_input_data): + """ + calculate the border countries for the given tiles when input is a country + - if CLI/GUI input by user + """ + if o_input_data.process_border_countries: + self.calc_border_countries() + # set the to-be-processed country as border country + else: + self.border_countries[o_input_data.country] = {} + + def calc_border_countries(self): + """ + calculate the border countries for the given tiles. i.e. + - if CLI/GUI input by user + - if processing x/y coordinates + """ + log.info('# Determine involved/border countries') + + # Build list of countries needed + for tile in self.tiles: + for country in tile['countries']: + if country not in self.border_countries: + self.border_countries[country] = {} + + def log_border_countries(self): + """ + write calculated border countries/involved countries to log + """ + for country in self.border_countries: + log.info('+ Involved country: %s', country) + + # input can be only one country, if there are more than one, + # border countries must be selected + if len(self.border_countries) > 1: + log.info('+ Border countries will be processed') def find_tiles_for_xy_combinations(self, xy_coordinates): """ @@ -199,54 +270,43 @@ def find_tiles_for_xy_combinations(self, xy_coordinates): self.tiles.append(get_tile_by_one_xy_combination_from_jsons( xy_comb)) - # country name is the X/Y combinations separated by minus - # >1 x/y combinations are separated by underscore - if not self.country_name: - self.country_name = f'{xy_comb["x"]}-{xy_comb["y"]}' - else: - self.country_name = f'{self.country_name}_{xy_comb["x"]}-{xy_comb["y"]}' - except TileNotFoundError: pass - def check_and_download_files(self): + def calc_country_name_xy(self): """ - trigger check of land_polygons and OSM map files if not existing or are not up-to-date + country name is the X/Y combinations separated by minus + >1 x/y combinations are separated by underscore """ + for tile in self.tiles: + if not self.country_name: + self.country_name = f'{tile["x"]}-{tile["y"]}' + else: + self.country_name = f'{self.country_name}_{tile["x"]}-{tile["y"]}' - self.o_downloader.tiles_from_json = self.tiles - self.o_downloader.border_countries = self.border_countries - - force_processing = self.o_downloader.check_and_download_files_if_needed() - # if download is needed or force_processing given via input --> force_processing = True - if force_processing or self.o_input_data.force_processing or self.force_processing: - self.force_processing = True - else: - self.force_processing = False +class OsmMaps: + """ + This is a OSM data class + """ - def calc_border_countries(self, calc_border_countries): - """ - calculate relevant border countries for the given tiles - """ + osmosis_win_file_path = get_tooling_win_path( + ['Osmosis', 'bin', 'osmosis.bat']) - log.info('-' * 80) - if calc_border_countries: - log.info('# Determine involved/border countries') + # Number of workers for the Osmosis read binary fast function + workers = '1' - # Build list of countries needed - for tile in self.tiles: - for country in tile['countries']: - if country not in self.border_countries: - self.border_countries[country] = {} + def __init__(self, o_osm_data): + self.o_osm_data = o_osm_data - # log.info('+ Count of involved countries: %s', - # len(self.border_countries)) - for country in self.border_countries: - log.info('+ Involved country: %s', country) + if 8 * struct.calcsize("P") == 32: + self.osmconvert_path = get_tooling_win_path(['osmconvert']) + else: + self.osmconvert_path = get_tooling_win_path( + ['osmconvert64-0.8.8p']) - if calc_border_countries and len(self.border_countries) > 1: - log.info('+ Border countries will be processed') + create_empty_directories( + USER_OUTPUT_DIR, self.o_osm_data.tiles) def filter_tags_from_country_osm_pbf_files(self): """ @@ -258,7 +318,7 @@ def filter_tags_from_country_osm_pbf_files(self): # Windows if platform.system() == "Windows": - for key, val in self.border_countries.items(): + for key, val in self.o_osm_data.border_countries.items(): out_file_o5m = os.path.join(USER_OUTPUT_DIR, f'outFile-{key}.o5m') out_file_o5m_filtered = os.path.join(USER_OUTPUT_DIR, @@ -266,7 +326,7 @@ def filter_tags_from_country_osm_pbf_files(self): out_file_o5m_filtered_names = os.path.join(USER_OUTPUT_DIR, f'outFileFiltered-{key}-Names.o5m') - if not os.path.isfile(out_file_o5m_filtered) or self.force_processing is True: + if not os.path.isfile(out_file_o5m_filtered) or self.o_osm_data.force_processing is True: log.info('+ Converting map of %s to o5m format', key) cmd = [self.osmconvert_path] cmd.extend(['-v', '--hash-memory=2500', '--complete-ways', @@ -280,7 +340,7 @@ def filter_tags_from_country_osm_pbf_files(self): log.info( '+ Filtering unwanted map objects out of map of %s', key) - cmd = [fd_fct.get_tooling_win_path(['osmfilter'])] + cmd = [get_tooling_win_path(['osmfilter'])] cmd.append(out_file_o5m) cmd.append( '--keep="' + translate_tags_to_keep(sys_platform=platform.system()) + '"') @@ -291,7 +351,7 @@ def filter_tags_from_country_osm_pbf_files(self): run_subprocess_and_log_output( cmd, '! Error in OSMFilter with country: {key}') - cmd = [fd_fct.get_tooling_win_path(['osmfilter'])] + cmd = [get_tooling_win_path(['osmfilter'])] cmd.append(out_file_o5m) cmd.append( '--keep="' + translate_tags_to_keep( @@ -311,12 +371,12 @@ def filter_tags_from_country_osm_pbf_files(self): # Non-Windows else: - for key, val in self.border_countries.items(): + for key, val in self.o_osm_data.border_countries.items(): out_file_o5m_filtered = os.path.join(USER_OUTPUT_DIR, f'filtered-{key}.o5m.pbf') out_file_o5m_filtered_names = os.path.join(USER_OUTPUT_DIR, f'outFileFiltered-{key}-Names.o5m.pbf') - if not os.path.isfile(out_file_o5m_filtered) or self.force_processing is True: + if not os.path.isfile(out_file_o5m_filtered) or self.o_osm_data.force_processing is True: log.info('+ Create filtered country file for %s', key) # https://docs.osmcode.org/osmium/latest/osmium-tags-filter.html @@ -354,16 +414,16 @@ def generate_land(self): log.info('# Generate land') tile_count = 1 - for tile in self.tiles: + for tile in self.o_osm_data.tiles: land_file = os.path.join(USER_OUTPUT_DIR, f'{tile["x"]}', f'{tile["y"]}', 'land.shp') out_file = os.path.join(USER_OUTPUT_DIR, f'{tile["x"]}', f'{tile["y"]}', 'land') # create land.dbf, land.prj, land.shp, land.shx - if not os.path.isfile(land_file) or self.force_processing is True: + if not os.path.isfile(land_file) or self.o_osm_data.force_processing is True: log.info( - '+ Generate land %s of %s for Coordinates: %s,%s', tile_count, len(self.tiles), tile["x"], tile["y"]) + '+ Generate land %s of %s for Coordinates: %s,%s', tile_count, len(self.o_osm_data.tiles), tile["x"], tile["y"]) cmd = ['ogr2ogr', '-overwrite', '-skipfailures'] # Try to prevent getting outside of the +/-180 and +/- 90 degrees borders. Normally the +/- 0.1 are there to prevent white lines at border borders. if tile["x"] == 255 or tile["y"] == 255 or tile["x"] == 0 or tile["y"] == 0: @@ -383,7 +443,7 @@ def generate_land(self): cmd, f'! Error generating land for tile: {tile["x"]},{tile["y"]}') # create land1.osm - if not os.path.isfile(out_file+'1.osm') or self.force_processing is True: + if not os.path.isfile(out_file+'1.osm') or self.o_osm_data.force_processing is True: # Windows if platform.system() == "Windows": cmd = ['python', os.path.join(RESOURCES_DIR, @@ -409,12 +469,12 @@ def generate_sea(self): log.info('# Generate sea') tile_count = 1 - for tile in self.tiles: + for tile in self.o_osm_data.tiles: out_file = os.path.join(USER_OUTPUT_DIR, f'{tile["x"]}', f'{tile["y"]}', 'sea.osm') - if not os.path.isfile(out_file) or self.force_processing is True: + if not os.path.isfile(out_file) or self.o_osm_data.force_processing is True: log.info( - '+ Generate sea %s of %s for Coordinates: %s,%s', tile_count, len(self.tiles), tile["x"], tile["y"]) + '+ Generate sea %s of %s for Coordinates: %s,%s', tile_count, len(self.o_osm_data.tiles), tile["x"], tile["y"]) with open(os.path.join(RESOURCES_DIR, 'sea.osm'), encoding="utf-8") as sea_file: sea_data = sea_file.read() @@ -452,20 +512,20 @@ def split_filtered_country_files_to_tiles(self): log.info('-' * 80) log.info('# Split filtered country files to tiles') tile_count = 1 - for tile in self.tiles: + for tile in self.o_osm_data.tiles: - for country, val in self.border_countries.items(): + for country, val in self.o_osm_data.border_countries.items(): if country not in tile['countries']: continue log.info( - '+ Splitting tile %s of %s for Coordinates: %s,%s from map of %s', tile_count, len(self.tiles), tile["x"], tile["y"], country) + '+ Splitting tile %s of %s for Coordinates: %s,%s from map of %s', tile_count, len(self.o_osm_data.tiles), tile["x"], tile["y"], country) out_file = os.path.join(USER_OUTPUT_DIR, f'{tile["x"]}', f'{tile["y"]}', f'split-{country}.osm.pbf') out_file_names = os.path.join(USER_OUTPUT_DIR, f'{tile["x"]}', f'{tile["y"]}', f'split-{country}-names.osm.pbf') out_merged = os.path.join(USER_OUTPUT_DIR, f'{tile["x"]}', f'{tile["y"]}', 'merged.osm.pbf') - if not os.path.isfile(out_merged) or self.force_processing is True: + if not os.path.isfile(out_merged) or self.o_osm_data.force_processing is True: # Windows if platform.system() == "Windows": cmd = [self.osmconvert_path, @@ -530,9 +590,9 @@ def merge_splitted_tiles_with_land_and_sea(self, process_border_countries): log.info('-' * 80) log.info('# Merge splitted tiles with land an sea') tile_count = 1 - for tile in self.tiles: # pylint: disable=too-many-nested-blocks + for tile in self.o_osm_data.tiles: # pylint: disable=too-many-nested-blocks log.info( - '+ Merging tiles for tile %s of %s for Coordinates: %s,%s', tile_count, len(self.tiles), tile["x"], tile["y"]) + '+ Merging tiles for tile %s of %s for Coordinates: %s,%s', tile_count, len(self.o_osm_data.tiles), tile["x"], tile["y"]) out_tile_dir = os.path.join(USER_OUTPUT_DIR, f'{tile["x"]}', f'{tile["y"]}') @@ -540,7 +600,7 @@ def merge_splitted_tiles_with_land_and_sea(self, process_border_countries): land_files = glob.glob(os.path.join(out_tile_dir, 'land*.osm')) - if not os.path.isfile(out_file) or self.force_processing is True: + if not os.path.isfile(out_file) or self.o_osm_data.force_processing is True: # sort land* osm files self.sort_osm_files(tile) @@ -551,7 +611,7 @@ def merge_splitted_tiles_with_land_and_sea(self, process_border_countries): # loop through all countries of tile, if border-countries should be processed. # if border-countries should not be processed, only process the "entered" country for country in tile['countries']: - if process_border_countries or country in self.border_countries: + if process_border_countries or country in self.o_osm_data.border_countries: cmd.append('--rbf') cmd.append(os.path.join( out_tile_dir, f'split-{country}.osm.pbf')) @@ -581,7 +641,7 @@ def merge_splitted_tiles_with_land_and_sea(self, process_border_countries): # loop through all countries of tile, if border-countries should be processed. # if border-countries should not be processed, only process the "entered" country for country in tile['countries']: - if process_border_countries or country in self.border_countries: + if process_border_countries or country in self.o_osm_data.border_countries: cmd.append(os.path.join( out_tile_dir, f'split-{country}.osm.pbf')) cmd.append(os.path.join( @@ -648,12 +708,12 @@ def create_map_files(self, save_cruiser, tag_wahoo_xml): threads = 1 tile_count = 1 - for tile in self.tiles: + for tile in self.o_osm_data.tiles: log.info( - '+ Creating map file for tile %s of %s for Coordinates: %s,%s', tile_count, len(self.tiles), tile["x"], tile["y"]) + '+ Creating map file for tile %s of %s for Coordinates: %s,%s', tile_count, len(self.o_osm_data.tiles), tile["x"], tile["y"]) out_file = os.path.join(USER_OUTPUT_DIR, f'{tile["x"]}', f'{tile["y"]}.map') - if not os.path.isfile(out_file+'.lzma') or self.force_processing is True: + if not os.path.isfile(out_file+'.lzma') or self.o_osm_data.force_processing is True: merged_file = os.path.join(USER_OUTPUT_DIR, f'{tile["x"]}', f'{tile["y"]}', 'merged.osm.pbf') @@ -673,8 +733,8 @@ def create_map_files(self, save_cruiser, tag_wahoo_xml): # add path to tag-wahoo xml file try: cmd.append( - f'tag-conf-file={fd_fct.get_tag_wahoo_xml_path(tag_wahoo_xml)}') - except fd_fct.TagWahooXmlNotFoundError: + f'tag-conf-file={get_tag_wahoo_xml_path(tag_wahoo_xml)}') + except TagWahooXmlNotFoundError: log.error( 'The tag-wahoo xml file was not found: ˚%s˚. Does the file exist and is your input correct?', tag_wahoo_xml) sys.exit() @@ -684,7 +744,7 @@ def create_map_files(self, save_cruiser, tag_wahoo_xml): # Windows if platform.system() == "Windows": - cmd = [fd_fct.get_tooling_win_path(['lzma']), 'e', out_file, + cmd = [get_tooling_win_path(['lzma']), 'e', out_file, out_file+'.lzma', f'-mt{threads}', '-d27', '-fb273', '-eos'] # Non-Windows else: @@ -714,24 +774,24 @@ def make_and_zip_files(self, extension, zip_folder): """ if extension == '.map.lzma': - folder_name = self.country_name + folder_name = self.o_osm_data.country_name else: - folder_name = self.country_name + '-maps' + folder_name = self.o_osm_data.country_name + '-maps' log.info('-' * 80) log.info('# Create: %s files', extension) - log.info('+ Country: %s', self.country_name) + log.info('+ Country: %s', self.o_osm_data.country_name) # Check for us/utah etc names try: - res = self.country_name.index('/') - self.country_name = self.country_name[res+1:] + res = self.o_osm_data.country_name.index('/') + self.o_osm_data.country_name = self.o_osm_data.country_name[res+1:] except ValueError: pass # copy the needed tiles to the country folder log.info('+ Copying %s tiles to output folders', extension) - for tile in self.tiles: + for tile in self.o_osm_data.tiles: src = os.path.join(f'{USER_OUTPUT_DIR}', f'{tile["x"]}', f'{tile["y"]}') + extension dst = os.path.join( @@ -746,7 +806,7 @@ def make_and_zip_files(self, extension, zip_folder): if zip_folder: # Windows if platform.system() == "Windows": - cmd = [fd_fct.get_tooling_win_path(['7za']), 'a', '-tzip'] + cmd = [get_tooling_win_path(['7za']), 'a', '-tzip'] cmd.extend( [folder_name + '.zip', os.path.join(".", folder_name, "*")]) @@ -788,5 +848,5 @@ def copy_to_dst(self, extension, src, dst): shutil.copy2(src, dst) except Exception as exception: # pylint: disable=broad-except log.error( - '! Error copying %s files for country %s: %s', extension, self.country_name, exception) + '! Error copying %s files for country %s: %s', extension, self.o_osm_data.country_name, exception) sys.exit() diff --git a/wahoomc/setup_functions.py b/wahoomc/setup_functions.py index abf9206e..fe0f212b 100644 --- a/wahoomc/setup_functions.py +++ b/wahoomc/setup_functions.py @@ -8,8 +8,11 @@ import logging # import custom python packages -from wahoomc import file_directory_functions as fd_fct -from wahoomc import constants +from wahoomc.file_directory_functions import move_content +from wahoomc.constants import USER_WAHOO_MC +from wahoomc.constants import USER_DL_DIR +from wahoomc.constants import USER_MAPS_DIR +from wahoomc.constants import USER_OUTPUT_DIR log = logging.getLogger('main-logger') @@ -18,10 +21,10 @@ def initialize_work_directories(): """ Initialize work directories """ - os.makedirs(constants.USER_WAHOO_MC, exist_ok=True) - os.makedirs(constants.USER_DL_DIR, exist_ok=True) - os.makedirs(constants.USER_MAPS_DIR, exist_ok=True) - os.makedirs(constants.USER_OUTPUT_DIR, exist_ok=True) + os.makedirs(USER_WAHOO_MC, exist_ok=True) + os.makedirs(USER_DL_DIR, exist_ok=True) + os.makedirs(USER_MAPS_DIR, exist_ok=True) + os.makedirs(USER_OUTPUT_DIR, exist_ok=True) def move_old_content_into_new_dirs(): @@ -33,5 +36,5 @@ def move_old_content_into_new_dirs(): This coding is only valid/needed when using the cloned version or .zip version. If working with a installed version via PyPI, nothing will be done because folders to copy do not exist """ - fd_fct.move_content('wahooMapsCreator_download', constants.USER_DL_DIR) - fd_fct.move_content('wahooMapsCreator_output', constants.USER_OUTPUT_DIR) + move_content('wahooMapsCreator_download', USER_DL_DIR) + move_content('wahooMapsCreator_output', USER_OUTPUT_DIR)