From 2899ff1f7dec07fcaccf57f3e4d902b18c42ab0c Mon Sep 17 00:00:00 2001 From: Alexander Myskov Date: Mon, 30 Nov 2020 10:42:37 -0600 Subject: [PATCH] TEST-#2290: correct parallel mode Signed-off-by: Alexander Myskov --- modin/pandas/test/test_io.py | 97 ++++++++++++------------------------ modin/pandas/test/utils.py | 39 ++------------- 2 files changed, 34 insertions(+), 102 deletions(-) diff --git a/modin/pandas/test/test_io.py b/modin/pandas/test/test_io.py index 68938e4089e..99dc31a5161 100644 --- a/modin/pandas/test/test_io.py +++ b/modin/pandas/test/test_io.py @@ -40,7 +40,6 @@ IO_OPS_DATA_DIR, io_ops_bad_exc, eval_io_from_str, - SharedVars, ) from modin.config import Engine, Backend, IsExperimental @@ -302,74 +301,40 @@ def make_csv_file(): @pytest.fixture(scope="class") def TestReadCSVFixture(worker_id): - # Process shared variables are needed because `xdist` spawns - # workers in separate processes with separate namespaces - shared_vars = SharedVars( - READ_CSV_SHARED_DATA_FILE, - { - "setup_started": False, - "setup_completed": False, - "teardowned_workers": [], - "filenames": [], - "csvs_names": {}, - }, + filenames = [] + files_ids = [ + "test_read_csv_regular", + "test_read_csv_blank_lines", + "test_read_csv_yes_no", + ] + # each xdist worker spawned in separate process with separate namespace and dataset + pytest.csvs_names = { + file_id: get_unique_filename(file_id, debug_mode=True, suffix=worker_id) + for file_id in files_ids + } + # test_read_csv_col_handling, test_read_csv_parsing + _make_csv_file(filenames)( + filename=pytest.csvs_names["test_read_csv_regular"], + ) + # test_read_csv_parsing + _make_csv_file(filenames)( + filename=pytest.csvs_names["test_read_csv_yes_no"], + additional_col_values=["Yes", "true", "No", "false"], + ) + # test_read_csv_col_handling + _make_csv_file(filenames)( + filename=pytest.csvs_names["test_read_csv_blank_lines"], + add_blank_lines=True, ) - if not shared_vars.get_var_value("setup_started"): - shared_vars.set_var_value("setup_started", True) - filenames = [] - files_ids = [ - "test_read_csv_regular", - "test_read_csv_blank_lines", - "test_read_csv_yes_no", - ] - pytest.csvs_names = { - file_id: get_unique_filename(file_id, debug_mode=True) - for file_id in files_ids - } - # test_read_csv_col_handling, test_read_csv_parsing - _make_csv_file(filenames)( - filename=pytest.csvs_names["test_read_csv_regular"], - ) - # test_read_csv_parsing - _make_csv_file(filenames)( - filename=pytest.csvs_names["test_read_csv_yes_no"], - additional_col_values=["Yes", "true", "No", "false"], - ) - # test_read_csv_col_handling - _make_csv_file(filenames)( - filename=pytest.csvs_names["test_read_csv_blank_lines"], - add_blank_lines=True, - ) - filenames.extend( - [READ_CSV_SHARED_DATA_FILE, f"{READ_CSV_SHARED_DATA_FILE}.lock"] - ) - shared_vars.set_var_value("setup_completed", True) - shared_vars.set_var_value("csvs_names", pytest.csvs_names) - shared_vars.set_var_value("filenames", filenames) - - else: - # wait until first spawned worker finishes fixture setup - import time - - while not shared_vars.get_var_value("setup_completed"): - time.sleep(1) - - pytest.csvs_names = shared_vars.get_var_value("csvs_names") yield - shared_vars.set_var_value("teardowned_workers", worker_id, append=True) - - # execute fixture teardown only if all workers finished their tasks - if len(shared_vars.get_var_value("teardowned_workers")) == int( - os.environ.get("PYTEST_XDIST_WORKER_COUNT", "1") - ): - # Delete csv files that were created - for filename in shared_vars.get_var_value("filenames"): - if os.path.exists(filename): - try: - os.remove(filename) - except PermissionError: - pass + # Delete csv files that were created + for filename in filenames: + if os.path.exists(filename): + try: + os.remove(filename) + except PermissionError: + pass def setup_json_file(row_size, force=False): diff --git a/modin/pandas/test/utils.py b/modin/pandas/test/utils.py index 20ef3db73d3..0e32e406050 100644 --- a/modin/pandas/test/utils.py +++ b/modin/pandas/test/utils.py @@ -28,8 +28,6 @@ import os from string import ascii_letters import csv -import json -from filelock import FileLock random_state = np.random.RandomState(seed=42) @@ -924,7 +922,9 @@ def get_unique_filename( for value in kwargs_name.values() ] ) - return os.path.join(data_dir, parameters_values + suffix_part + f".{extension}") + return os.path.join( + data_dir, test_name + parameters_values + suffix_part + f".{extension}" + ) else: import uuid @@ -1001,36 +1001,3 @@ def insert_lines_to_csv( **csv_reader_writer_params, ) writer.writerows(lines) - - -class SharedVars: - """implements variables that can be shared among processes""" - - def __init__(self, shared_file_path: str, data: dict): - self._shared_file_path = shared_file_path - self._lock = FileLock(f"{self._shared_file_path}.lock") - if not os.path.exists(self._shared_file_path): - with self._lock: - self._write_data(data) - - def _write_data(self, data): - open(self._shared_file_path, "a").close() - with open(self._shared_file_path, "w") as f: - json.dump(data, f) - - def get_var_value(self, var): - with self._lock: - with open(self._shared_file_path) as f: - data = json.load(f) - return data[var] - - def set_var_value(self, var, value, append=False): - with self._lock: - with open(self._shared_file_path) as f: - data = json.load(f) - if append: - assert isinstance(data[var], list) - data[var].append(value) - else: - data[var] = value - self._write_data(data)