Skip to content

Commit

Permalink
TEST-modin-project#2290: correct parallel mode
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Myskov <[email protected]>
  • Loading branch information
amyskov committed Dec 1, 2020
1 parent 49bb910 commit 2899ff1
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 102 deletions.
97 changes: 31 additions & 66 deletions modin/pandas/test/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
IO_OPS_DATA_DIR,
io_ops_bad_exc,
eval_io_from_str,
SharedVars,
)

from modin.config import Engine, Backend, IsExperimental
Expand Down Expand Up @@ -302,74 +301,40 @@ def make_csv_file():

@pytest.fixture(scope="class")
def TestReadCSVFixture(worker_id):
# Process shared variables are needed because `xdist` spawns
# workers in separate processes with separate namespaces
shared_vars = SharedVars(
READ_CSV_SHARED_DATA_FILE,
{
"setup_started": False,
"setup_completed": False,
"teardowned_workers": [],
"filenames": [],
"csvs_names": {},
},
filenames = []
files_ids = [
"test_read_csv_regular",
"test_read_csv_blank_lines",
"test_read_csv_yes_no",
]
# each xdist worker is spawned in a separate process with its own namespace and dataset
pytest.csvs_names = {
file_id: get_unique_filename(file_id, debug_mode=True, suffix=worker_id)
for file_id in files_ids
}
# test_read_csv_col_handling, test_read_csv_parsing
_make_csv_file(filenames)(
filename=pytest.csvs_names["test_read_csv_regular"],
)
# test_read_csv_parsing
_make_csv_file(filenames)(
filename=pytest.csvs_names["test_read_csv_yes_no"],
additional_col_values=["Yes", "true", "No", "false"],
)
# test_read_csv_col_handling
_make_csv_file(filenames)(
filename=pytest.csvs_names["test_read_csv_blank_lines"],
add_blank_lines=True,
)
if not shared_vars.get_var_value("setup_started"):
shared_vars.set_var_value("setup_started", True)
filenames = []
files_ids = [
"test_read_csv_regular",
"test_read_csv_blank_lines",
"test_read_csv_yes_no",
]
pytest.csvs_names = {
file_id: get_unique_filename(file_id, debug_mode=True)
for file_id in files_ids
}
# test_read_csv_col_handling, test_read_csv_parsing
_make_csv_file(filenames)(
filename=pytest.csvs_names["test_read_csv_regular"],
)
# test_read_csv_parsing
_make_csv_file(filenames)(
filename=pytest.csvs_names["test_read_csv_yes_no"],
additional_col_values=["Yes", "true", "No", "false"],
)
# test_read_csv_col_handling
_make_csv_file(filenames)(
filename=pytest.csvs_names["test_read_csv_blank_lines"],
add_blank_lines=True,
)
filenames.extend(
[READ_CSV_SHARED_DATA_FILE, f"{READ_CSV_SHARED_DATA_FILE}.lock"]
)
shared_vars.set_var_value("setup_completed", True)
shared_vars.set_var_value("csvs_names", pytest.csvs_names)
shared_vars.set_var_value("filenames", filenames)

else:
# wait until first spawned worker finishes fixture setup
import time

while not shared_vars.get_var_value("setup_completed"):
time.sleep(1)

pytest.csvs_names = shared_vars.get_var_value("csvs_names")

yield
shared_vars.set_var_value("teardowned_workers", worker_id, append=True)

# execute fixture teardown only if all workers finished their tasks
if len(shared_vars.get_var_value("teardowned_workers")) == int(
os.environ.get("PYTEST_XDIST_WORKER_COUNT", "1")
):
# Delete csv files that were created
for filename in shared_vars.get_var_value("filenames"):
if os.path.exists(filename):
try:
os.remove(filename)
except PermissionError:
pass
# Delete csv files that were created
for filename in filenames:
if os.path.exists(filename):
try:
os.remove(filename)
except PermissionError:
pass


def setup_json_file(row_size, force=False):
Expand Down
39 changes: 3 additions & 36 deletions modin/pandas/test/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import os
from string import ascii_letters
import csv
import json
from filelock import FileLock

random_state = np.random.RandomState(seed=42)

Expand Down Expand Up @@ -924,7 +922,9 @@ def get_unique_filename(
for value in kwargs_name.values()
]
)
return os.path.join(data_dir, parameters_values + suffix_part + f".{extension}")
return os.path.join(
data_dir, test_name + parameters_values + suffix_part + f".{extension}"
)
else:
import uuid

Expand Down Expand Up @@ -1001,36 +1001,3 @@ def insert_lines_to_csv(
**csv_reader_writer_params,
)
writer.writerows(lines)


class SharedVars:
    """Variables shared between processes via a JSON file.

    All state lives in ``shared_file_path`` as a single JSON object, and
    every access is serialized through a ``FileLock`` so that concurrent
    processes (e.g. pytest-xdist workers) observe a consistent view.
    """

    def __init__(self, shared_file_path: str, data: dict):
        self._shared_file_path = shared_file_path
        self._lock = FileLock(f"{self._shared_file_path}.lock")
        # Only the first process to arrive creates and seeds the shared file;
        # later processes reuse whatever state is already persisted.
        if not os.path.exists(self._shared_file_path):
            with self._lock:
                self._write_data(data)

    def _write_data(self, data):
        # Ensure the file exists, then overwrite it with the JSON payload.
        open(self._shared_file_path, "a").close()
        with open(self._shared_file_path, "w") as f:
            json.dump(data, f)

    def _read_data(self):
        # Load the full shared state; callers must hold the lock.
        with open(self._shared_file_path) as f:
            return json.load(f)

    def get_var_value(self, var):
        """Return the current value stored under key ``var``."""
        with self._lock:
            return self._read_data()[var]

    def set_var_value(self, var, value, append=False):
        """Store ``value`` under ``var``.

        When ``append`` is true, ``var`` must already hold a list and
        ``value`` is appended to it instead of replacing it.
        """
        with self._lock:
            state = self._read_data()
            if append:
                assert isinstance(state[var], list)
                state[var].append(value)
            else:
                state[var] = value
            self._write_data(state)

0 comments on commit 2899ff1

Please sign in to comment.