Replace forking with spawn for background/concurrent operations
AjayP13 committed Dec 29, 2023
1 parent 9b5ee56 commit 71cfa80
Showing 9 changed files with 177 additions and 77 deletions.
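For context on what replacing forking with spawn means: Python's multiprocessing can start child processes either by forking the parent or by spawning a fresh interpreter, and this commit moves background/concurrent operations to the latter. Spawn avoids fork-safety problems (such as the macOS Objective-C issue previously worked around with OBJC_DISABLE_INITIALIZE_FORK_SAFETY, removed below) at the cost of re-importing modules in the child. A minimal sketch of the two start methods, for illustration only and not code from this commit:

```python
import multiprocessing
import os


def work():
    # Under "spawn" this runs in a fresh interpreter: the module is re-imported
    # and no parent memory (locks, loggers, GPU handles) is inherited.
    print(f"child pid: {os.getpid()}")


if __name__ == "__main__":  # required under spawn, since the child re-imports this module
    ctx = multiprocessing.get_context("spawn")  # instead of the default "fork" on Linux
    p = ctx.Process(target=work)
    p.start()
    p.join()
```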
3 changes: 3 additions & 0 deletions scripts/.githooks/post-checkout
@@ -15,3 +15,6 @@ if [ ! -L ./.vscode ] || [ ! -e ./.vscode ]; then
rm -rf ./.vscode 1>/dev/null 2>/dev/null || true
ln -s "$(realpath ./scripts/.vscode)" ./.vscode
fi

# Don't track local changes to project.env
git update-index --skip-worktree ./scripts/project.env
14 changes: 4 additions & 10 deletions src/datadreamer.py
@@ -1,7 +1,6 @@
import inspect
import logging
import os
import sys
from collections import UserDict, defaultdict
from multiprocessing import Process
from threading import Lock
@@ -73,7 +72,9 @@ def is_running_in_memory() -> bool:

@staticmethod
def is_background_process() -> bool:
return DataDreamer.initialized() and DataDreamer.ctx.pid != os.getpid()
return "DATADREAMER_BACKGROUND_PROCESS" in os.environ or (
DataDreamer.initialized() and DataDreamer.ctx.pid != os.getpid()
)

@staticmethod
def is_registered_thread() -> bool:
@@ -334,16 +335,9 @@ def _disable_setfit_logging():
setfit_logging.disable_progress_bar()

def __enter__(self):
from .utils.distributed_utils import is_distributed

if hasattr(DataDreamer.ctx, "steps"):
raise RuntimeError("Cannot nest DataDreamer() context managers.")

# Specifically for macOS
if sys.platform == "darwin": # pragma: no cover
# See: https://bugs.python.org/issue33725
os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"

# Initialize
_DATADREAMER_CTX_LOCK.acquire()
if self.output_folder_path:
@@ -381,7 +375,7 @@ def __enter__(self):
else:
logger.setLevel(self.log_level or logging.INFO)

if not is_distributed():
if not DataDreamer.is_background_process():
if self.output_folder_path:
logger.info(
f"Initialized. 🚀 Dreaming to folder: {self.output_folder_path}"
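Because a spawned child starts from a fresh interpreter, the old check above (comparing the PID stored in the parent's DataDreamer context) is no longer sufficient on its own, which is presumably why the DATADREAMER_BACKGROUND_PROCESS environment-variable check was added. A rough sketch of that pattern under spawn; the helper below is hypothetical and not taken from this commit:

```python
import multiprocessing
import os


def _background_entrypoint(env):  # hypothetical helper, not from this commit
    # The parent passes a marker through to the spawned child, which applies it
    # to its own environment before any code checks is_background_process().
    os.environ.update(env)
    assert "DATADREAMER_BACKGROUND_PROCESS" in os.environ


if __name__ == "__main__":
    ctx = multiprocessing.get_context("spawn")
    p = ctx.Process(
        target=_background_entrypoint,
        args=({"DATADREAMER_BACKGROUND_PROCESS": "1"},),
    )
    p.start()
    p.join()
```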
6 changes: 2 additions & 4 deletions src/llms/hf_transformers.py
@@ -9,7 +9,6 @@
from datasets.fingerprint import Hasher
from transformers import logging as transformers_logging

from .. import DataDreamer
from ..logging import logger as datadreamer_logger
from ..utils import ring_utils as ring
from ..utils.arg_utils import AUTO, Default
@@ -284,9 +283,8 @@ def model(self) -> PreTrainedModel:
model = BetterTransformer.transform(model)

# Torch compile
if not DataDreamer.is_background_process(): # .compile() fails in background
torch._dynamo.config.suppress_errors = True
model = torch.compile(model)
torch._dynamo.config.suppress_errors = True
model = torch.compile(model)

# Finished loading
log_if_timeout.stop(
10 changes: 0 additions & 10 deletions src/steps/step.py
@@ -3,7 +3,6 @@
import os
import shutil
import sys
import threading
import warnings
from collections import defaultdict
from collections.abc import Iterable
@@ -154,15 +153,6 @@ def __init__( # noqa: C901
" concurrent() utility function to run concurrently."
)

# Check main thread if macOS
background_supported = (
sys.platform != "darwin"
or threading.current_thread() is threading.main_thread()
)
assert (
not background or background_supported
), "Combining concurrent() with background=True is not supported on macOS."

# Fill in default argument values
if not isinstance(args, dict):
args = {}
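The assertion removed above used to block combining the concurrent() utility with background=True on macOS under fork; with spawn, that restriction is dropped. A hedged usage sketch of the now-allowed combination; the step class is made up, the setup()/run() pattern follows the tests in this commit, and the import paths and exact signatures of concurrent() and wait() are assumed rather than verified:

```python
from datadreamer import DataDreamer
from datadreamer.steps import Step, concurrent, wait  # assumed import paths


class MyStep(Step):  # stand-in for any user-defined step
    def setup(self):
        self.register_output("out1")  # assumed API for declaring an output column

    def run(self):
        return [1, 2, 3]


if __name__ == "__main__":
    with DataDreamer("./output"):
        # concurrent() is assumed to run the callables in threads and return their results.
        steps = concurrent(
            lambda: MyStep(name="a", background=True),
            lambda: MyStep(name="b", background=True),
        )
        wait(*steps)  # block until both background steps finish
```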
6 changes: 5 additions & 1 deletion src/steps/step_output.py
@@ -800,7 +800,11 @@ def dummy_empty_generator():
return iter(())

step._set_output(
LazyRows(dummy_empty_generator, total_num_rows=total_num_rows)
LazyRows(
dummy_empty_generator,
total_num_rows=total_num_rows,
auto_progress=False if total_num_rows is None else True,
)
)

# Send the data card (sometimes this gets updated in .run())
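A note on the LazyRows change above: when total_num_rows is None, auto_progress is now explicitly set to False, which presumably acknowledges that progress cannot be estimated without a known row count (a test further below makes the same change). A small hedged sketch of the two usages, with the import path assumed:

```python
from datadreamer.steps import LazyRows  # assumed import path


def data_generator():
    for i in range(1000):
        yield i


# Row count known up front: progress can be reported against the total.
rows = LazyRows(data_generator, total_num_rows=1000)

# Row count unknown: explicitly disable automatic progress estimation.
rows = LazyRows(data_generator, auto_progress=False)
```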
14 changes: 12 additions & 2 deletions src/tests/steps/prompt/test_prompt.py
@@ -1,4 +1,3 @@
import sys
from itertools import islice

from ....datasets import OutputIterableDataset
@@ -8,6 +7,17 @@

class TestPromptBase:
def test_prompt_base(self, create_datadreamer):
# This test seems to throw a warning from Python on macOS / M-series chips:
#
# https://github.com/apple/ml-stable-diffusion/issues/8
#
# It looks like an active bug with M-series chips / macOS / CPython.
# There is nothing we can do to fix it; the following warning can be ignored:
# """
# multiprocessing/resource_tracker.py:224: UserWarning: resource_tracker:
# There appear to be 1 leaked semaphore objects to clean up at shutdown
# warnings.warn('resource_tracker: There appear to be
# """
with create_datadreamer():
prompts = ["What color is the sky?", "Who was the first president"]
llm = HFTransformers("google/flan-t5-small")
@@ -17,7 +27,7 @@ def test_prompt_base(self, create_datadreamer):
args={"llm": llm, "max_new_tokens": 2, "batch_size": 5},
inputs={"prompts": questions.output["questions"]},
outputs={"generations": "answers"},
background=sys.platform != "darwin",
background=True,
)
wait(answers)
assert questions.fingerprint == "ec0e94db113f0bb0"
6 changes: 1 addition & 5 deletions src/tests/steps/test_step_background.py
@@ -1,6 +1,5 @@
import json
import os
import sys
from time import sleep

import pytest
@@ -135,9 +134,6 @@ def run(self):
assert isinstance(shuffle_step.output, OutputDataset)
assert shuffle_step.output["out1"][0] == 3

@pytest.mark.skipif(
sys.platform == "darwin", reason="macOS doesn't support concurrent + background"
)
def test_concurrent_step_operations(self, create_datadreamer, caplog):
class TestStep(Step):
def setup(self):
@@ -201,7 +197,7 @@ def data_generator():
for i in range(1000):
yield i

return LazyRows(data_generator)
return LazyRows(data_generator, auto_progress=False)

with create_datadreamer():
step = TestStep(name="my-step", background=True, progress_interval=0)