Skip to content

Commit

Permalink
Merge pull request #1 from tovrstra/wait-socket
Browse files Browse the repository at this point in the history
Make testing more robust by factoring out waiting for the socket
  • Loading branch information
tovrstra authored May 15, 2024
2 parents a370ec2 + df84b55 commit bf78b62
Show file tree
Hide file tree
Showing 85 changed files with 728 additions and 145 deletions.
25 changes: 20 additions & 5 deletions docs/advanced_topics/manual_cleaning.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,34 @@ Such arguments can be one of the two things:

1. If a file is given, all outputs using this file as input will be removed.
Furthermore, if the file itself is also a build output, it will also be removed.
2. If a directory is given, all outputs will be removed from this directory.

1. If a directory is given, all outputs will be removed from this directory.
Furthermore, if the directory is created in the build, it will also be removed.

Files are removed recursively, so outputs of outputs are also cleaned up.
`cleanup` will only remove files with status `PENDING`, `BUILT` or `VOLATILE`.
Static files, i.e., files you have created, are never removed.

!!! note "`cleanup` requires `stepup` to be running."
There are a few gotchas you should be aware of:

1. The `cleanup` script sends a list of paths to be cleaned to the director process.
The director takes care of analyzing the workflow to decide which files need to be removed.
For this reason, an instance of `stepup` must be running for `cleanup` to work.

1. By default, you need to run `cleanup` in the top-level directory where you also started `stepup`.
This requirement can be lifted by defining the top-level directory in the `STEPUP_ROOT` environment variable, as explained in the [next tutorial](stepup_root.md).

If `cleanup` cannot connect to the StepUp director process, it will keep trying and print warning messages, for example:

The `cleanup` script sends a list of paths to be cleaned to the director process.
The director takes care of analyzing the workflow to decide which files need to remove.
For this reason, an instance of `stepup` must be running for `cleanup` to work.
```
Trying to contact StepUp director process.
File ./.stepup/logs/director not found. Waiting 0.1 seconds.
Socket /tmp/stepup-c9a12bau/director read from ./.stepup/logs/director does not exist. Stepup not running? Waiting 0.2 seconds.
Socket /tmp/stepup-c9a12bau/director read from ./.stepup/logs/director does not exist. Stepup not running? Waiting 0.3 seconds.
...
```

When you see this, either start `stepup` in a second terminal or interrupt `cleanup` with Ctrl-C.

## Try the Following

Expand Down
20 changes: 20 additions & 0 deletions docs/advanced_topics/stepup_root.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# The STEPUP_ROOT variable

If you are working on a large project with several subdirectories,
it may be useful to define the `STEPUP_ROOT` environment variable.
It should contain the absolute path of the top-level directory where you would normally call the `stepup` and `cleanup` commands.
(The top-level directory contains the `.stepup` subdirectory and the top-level `plan.py`.)

With `STEPUP_ROOT` set, it is no longer necessary to change to the top-level directory before running `stepup` and `cleanup`.
Also, the `cleanup` arguments will be interpreted correctly in subdirectories.

You can manually set `STEPUP_ROOT` in the top-level directory as follows:

```bash
export STEPUP_ROOT="${PWD}"
```

However, this can be tedious, as it has to be set each time you open a new terminal window.
It is much easier to set such variables using [direnv](https://direnv.net/).
Once direnv is configured on your system, you can create an `.envrc` file with the above `export` line in the top-level directory.
Each time you change to the project directory or any of its subdirectories, the `STEPUP_ROOT` directory will automatically be set correctly.
10 changes: 10 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Make `cleanup` command work in project subdirectories when `STEPUP_ROOT` is set.
- Avoid useless wait when running a `plan.py` script outside of `stepup`.

### Changed

- Documentation updates.


## [1.2.1] - 2024-05-07

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ Then open the live preview in your browser at [http://127.0.0.1:8000/](http://12
and edit Markdown files in your IDE.

Please, use [Semantic Line Breaks](https://sembr.org/)
because it results in cleaner file diffs when editing documentation.
because it facilitates reviewing documentation changes.


## Tutorial Example Outputs
Expand Down
18 changes: 16 additions & 2 deletions docs/getting_started/first_step.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,19 @@ StepUp will not know anymore that it already executed some steps and runs all of

## Try the Following

Now, change the arguments of the `echo` command in `plan.py` and run `stepup -n -w1` again.
As expected, StepUp will detect the change and repeat the `plan.py` and `echo` steps.
- Change the arguments of the `echo` command in `plan.py` and run `stepup -n -w1` again.
As expected, StepUp detects the change and repeats the `plan.py` and `echo` steps.

- Normally, you would never run `./plan.py` directly as a normal Python script, i.e.,
without running it through `stepup`.
Try it anyway, just to see what happens.
The terminal output shows the commands that would normally be sent to the StepUp director
process when `plan.py` is executed by `stepup`.
You should get the following screen output:

```
{% include 'getting_started/first_step/stdout3.txt' | indent(width=4) %}
```
This output contains internal details of StepUp,
which can be useful for debugging purposes.
1 change: 1 addition & 0 deletions docs/getting_started/first_step/main.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ git clean -qdfX .
unset STEPUP_ROOT
stepup -n -w 1 | sed -f ../../clean_stdout.sed > stdout1.txt
stepup -n -w 1 | sed -f ../../clean_stdout.sed > stdout2.txt
./plan.py > stdout3.txt

# INP: plan.py
4 changes: 2 additions & 2 deletions docs/getting_started/first_step/stdout1.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
SUCCESS │ ./plan.py
START │ echo Hello World
SUCCESS │ echo Hello World
─────────────────────────────── Standard output ────────────────────────────────
──────────────────────────────────────────────────────────────── Standard output ─────────────────────────────────────────────────────────────────
Hello World
────────────────────────────────────────────────────────────────────────────────
──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
WORKFLOW │ Dumped to .stepup/workflow.mpk.xz
PHASE │ watch
DIRECTOR │ Stopping workers.
Expand Down
1 change: 1 addition & 0 deletions docs/getting_started/first_step/stdout3.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
step('dummy:', 'echo Hello World', [], [], [], [], Path('./'), False, None, False)
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ nav:
- advanced_topics/cyclic_dependencies.md
- advanced_topics/amending_static_inputs.md
- advanced_topics/manual_cleaning.md
- advanced_topics/stepup_root.md
- Reference:
- reference/stepup.core.api.md
- reference/stepup.core.interact.md
Expand Down
34 changes: 9 additions & 25 deletions stepup/core/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import contextlib
import os
from collections.abc import Callable, Collection, Iterable, Iterator
from time import sleep

from path import Path

Expand Down Expand Up @@ -559,6 +558,8 @@ def subs(path: str | None) -> Path | None:
def translate(path: str) -> Path:
"""Normalize the path and, if relative, make it relative to `ROOT` by prepending `HERE`.
If the environment variable `HERE` is not set, it is derived from `STEPUP_ROOT` if set.
Parameters
----------
path
Expand All @@ -572,6 +573,10 @@ def translate(path: str) -> Path:
path = mynormpath(path)
if not path.isabs():
here = os.getenv("HERE")
if here is None:
stepup_root = os.getenv("STEPUP_ROOT")
if stepup_root is not None:
here = myrelpath("./", stepup_root)
if here is not None:
path = mynormpath(here / path)
return path
Expand All @@ -592,31 +597,10 @@ def check_inp_paths(inp_paths: Collection[Path]):

def _get_rpc_client():
"""Try setting up a Synchronous RPC client or fall back to the dummy client if that fails."""
STEPUP_DIRECTOR_SOCKET = os.getenv("STEPUP_DIRECTOR_SOCKET")
if STEPUP_DIRECTOR_SOCKET is None:
STEPUP_ROOT = Path(os.getenv("STEPUP_ROOT", "./"))
path_tmpsock = STEPUP_ROOT / ".stepup/tmpsock.txt"
time = 0.0
for _ in range(5):
if time > 0:
print(f"WARNING: waiting {time} seconds for {path_tmpsock}")
sleep(time)
time *= 2
else:
time = 0.1
if path_tmpsock.is_file():
with open(path_tmpsock) as fh:
dir_sockets = Path(fh.read().strip())
if dir_sockets != "":
path_socket = dir_sockets / "director"
if path_socket.exists():
STEPUP_DIRECTOR_SOCKET = path_socket
break
if STEPUP_DIRECTOR_SOCKET is None:
print("STEPUP_DIRECTOR_SOCKET not set and .stepup/tmpsock.txt not valid.")
print("RPC calls are printed and have no effect.")
stepup_director_socket = os.getenv("STEPUP_DIRECTOR_SOCKET")
if stepup_director_socket is None:
return DummySyncRPCClient()
return SocketSyncRPCClient(STEPUP_DIRECTOR_SOCKET)
return SocketSyncRPCClient(stepup_director_socket)


RPC_CLIENT = _get_rpc_client()
Expand Down
10 changes: 8 additions & 2 deletions stepup/core/cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,19 @@

import argparse

from .interact import cleanup
from .api import translate
from .director import get_socket
from .rpc import SocketSyncRPCClient


def main():
"""Main program."""
# Instead of using the interact module, a new RPC client is created after
# discovering the socket without relying on STEPUP_DIRECTOR_SOCKET.
args = parse_args()
numf, numd = cleanup(*args.paths)
rpc_client = SocketSyncRPCClient(get_socket())
tr_paths = sorted(translate(path) for path in args.paths)
numf, numd = rpc_client.call.cleanup(tr_paths)
print(f"Removed {numf} files and {numd} directories.")


Expand Down
33 changes: 32 additions & 1 deletion stepup/core/director.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import os
import shutil
import sys
import time
import traceback
from decimal import Decimal

Expand All @@ -43,7 +44,7 @@
from .watcher import Watcher
from .workflow import Workflow

__all__ = ("interpret_num_workers", "serve")
__all__ = ("interpret_num_workers", "serve", "get_socket")


def main():
Expand All @@ -52,6 +53,7 @@ def main():

async def async_main():
args = parse_args()
print(f"SOCKET {args.director_socket}", file=sys.stderr)
print(f"PID {os.getpid()}", file=sys.stderr)
async with ReporterClient.socket(args.reporter_socket) as reporter:
num_workers = interpret_num_workers(args.num_workers)
Expand Down Expand Up @@ -524,5 +526,34 @@ async def wait(self):
await self._watcher.active.wait()


def get_socket() -> str:
"""Block until the director socket is known and return it."""
stepup_root = Path(os.getenv("STEPUP_ROOT", "./"))
path_director_log = stepup_root / ".stepup/logs/director"
secs = 0
while True:
time.sleep(secs)
if os.path.isfile(path_director_log):
with open(path_director_log) as fh:
line = fh.readline()
if line.startswith("SOCKET"):
path_socket = Path(line[6:].strip())
if len(path_socket) > 2 and path_socket.exists():
return path_socket
else:
message = (
f"Socket {path_socket} read from {path_director_log} does not exist. "
"Stepup not running?"
)
else:
message = f"File {path_director_log} does not start with SOCKET line."
else:
message = f"File {path_director_log} not found."
if secs == 0.0:
print("Trying to contact StepUp director process.", file=sys.stderr)
secs += 0.1
print(f"{message} Waiting {secs:.1f} seconds.", file=sys.stderr)


if __name__ == "__main__":
main()
16 changes: 7 additions & 9 deletions stepup/core/tui.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,6 @@ async def async_main():
director_socket_path = dir_sockets / "director"
reporter_socket_path = dir_sockets / "reporter"

num_workers = interpret_num_workers(args.num_workers)
path_tmpsock = dir_stepup / "tmpsock.txt"
with open(path_tmpsock, "w") as fh:
print(dir_sockets, file=fh)

# Set up the reporter monitor
stop_event = asyncio.Event()
reporter_handler = ReporterHandler(args.show_perf > 0, stop_event)
Expand All @@ -77,6 +72,7 @@ async def async_main():
tasks = [task_reporter]

# Launch director as background process
num_workers = interpret_num_workers(args.num_workers)
argv = [
"-m",
"stepup.core.director",
Expand Down Expand Up @@ -118,15 +114,17 @@ async def async_main():
await asyncio.gather(*tasks)
except ConnectionRefusedError:
reporter_handler.report("ERROR", "Could not connect to director", [])
finally:
path_tmpsock.remove_p()


async def wait_for_path(path: Path, stop_event: asyncio.Event):
"""Wait until a path exists."""
time = 0.0
while not path.exists():
if stop_event.is_set():
return
await asyncio.sleep(0.1)
break
if time > 0:
await asyncio.sleep(time)
time += 0.1


@attrs.define
Expand Down
21 changes: 18 additions & 3 deletions tests/cases/amend/main.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ xargs rm -rvf < .gitignore
# Run the example
stepup -e -w 1 plan.py & # > current_stdout_01.txt &

# Wait for the director and get its socket.
export STEPUP_DIRECTOR_SOCKET=$(
python -c "import stepup.core.director; print(stepup.core.director.get_socket())"
)

# Get graph after normal run.
python3 - << EOD
from stepup.core.interact import *
Expand Down Expand Up @@ -41,11 +46,16 @@ grep word1 out1.txt
grep word2 out2.txt

# Wait for background processes, if any.
wait $(jobs -p)
wait

# Restart StepUp without changes
stepup -e -w 1 plan.py & # > current_stdout_02.txt &

# Wait for the director and get its socket.
export STEPUP_DIRECTOR_SOCKET=$(
python -c "import stepup.core.director; print(stepup.core.director.get_socket())"
)

# Get graph after restart without changes.
python3 - << EOD
from stepup.core.interact import *
Expand All @@ -63,12 +73,17 @@ grep word1 out1.txt
grep word2 out2.txt

# Wait for background processes, if any.
wait $(jobs -p)
wait

# Restart StepUp with changes
echo "word2 and other" > inp2.txt
stepup -e -w 1 plan.py & # > current_stdout_03.txt &

# Wait for the director and get its socket.
export STEPUP_DIRECTOR_SOCKET=$(
python -c "import stepup.core.director; print(stepup.core.director.get_socket())"
)

# Get graph after restart without changes.
python3 - << EOD
from stepup.core.interact import *
Expand All @@ -86,4 +101,4 @@ grep word1 out1.txt
grep word2 out2.txt

# Wait for background processes, if any.
wait $(jobs -p)
wait
7 changes: 6 additions & 1 deletion tests/cases/amend_delay/main.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ echo "Something old" > inp0.txt
echo "First inp1.txt" > inp1.txt
stepup -e -w 1 plan.py & # > current_stdout.txt &

# Wait for the director and get its socket.
export STEPUP_DIRECTOR_SOCKET=$(
python -c "import stepup.core.director; print(stepup.core.director.get_socket())"
)

# Initial graph
python3 - << EOD
from stepup.core.interact import *
Expand Down Expand Up @@ -64,4 +69,4 @@ EOD
[[ -f log.txt ]] || exit -1

# Wait for background processes, if any.
wait $(jobs -p)
wait
Loading

0 comments on commit bf78b62

Please sign in to comment.