app: Update Forall command to allow multiple concurrent processes #755

Open. Wants to merge 3 commits into base: main.
55 changes: 41 additions & 14 deletions src/west/app/project.py
@@ -1,4 +1,4 @@
# Copyright (c) 2018, 2019 Nordic Semiconductor ASA

[GitHub Actions check warning on line 1: file format check failed. Run 'ruff format src/west/app/project.py']
# Copyright 2018, 2019 Foundries.io
#
# SPDX-License-Identifier: Apache-2.0
@@ -6,6 +6,7 @@
'''West project commands'''

import argparse
import asyncio
import logging
import os
import shlex
@@ -1710,16 +1711,15 @@
parser.add_argument('projects', metavar='PROJECT', nargs='*',
help='''projects (by name or path) to operate on;
defaults to active cloned projects''')
parser.add_argument('-j', '--jobs', nargs='?', const=-1,
Collaborator:

Suggested change
parser.add_argument('-j', '--jobs', nargs='?', const=-1,
# Default to 1 when `-j` is not given because there is no way to
# know whether the user commands can be run at the same time safely.
parser.add_argument('-j', '--jobs', nargs='?', const=-1,

Eventually, west grep, west update and others could default to cpu_count() if everything goes well, but I think forall should always default to 1.

(such a comment also helps a bit with the peculiar default+const argparse idiom)
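
For illustration, here is a minimal standalone sketch of that default/const argparse idiom (not part of this PR's code): omitting the flag uses `default`, a bare `-j` uses `const`, and an explicit value overrides both.

# Standalone sketch of the argparse default/const idiom (illustrative only,
# not the actual west parser code).
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('-j', '--jobs', nargs='?', const=-1, default=1, type=int,
                    help='parallel jobs; a bare -j or -1 means "all cores"')

assert parser.parse_args([]).jobs == 1           # flag absent    -> default
assert parser.parse_args(['-j']).jobs == -1      # bare flag      -> const
assert parser.parse_args(['-j', '4']).jobs == 4  # explicit value -> 4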

default=1, type=int, action='store',
help='''Use multiple jobs to parallelize commands.
Pass no number or -1 to run commands on all cores.''')
return parser

def do_run(self, args, user_args):
failed = []
group_set = set(args.groups)
env = os.environ.copy()
for project in self._cloned_projects(args, only_active=not args.all):
if group_set and not group_set.intersection(set(project.groups)):
continue

async def run_for_project(self, project, args, semaphore):
async with semaphore:
env = os.environ.copy()
env["WEST_PROJECT_NAME"] = project.name
env["WEST_PROJECT_PATH"] = project.path
env["WEST_PROJECT_ABSPATH"] = project.abspath if project.abspath else ''
@@ -1729,12 +1729,39 @@

cwd = args.cwd if args.cwd else project.abspath

self.banner(
f'running "{args.subcommand}" in {project.name_and_path}:')
rc = subprocess.Popen(args.subcommand, shell=True, env=env,
cwd=cwd).wait()
if rc:
failed.append(project)
self.banner(f'running "{args.subcommand}" in {project.name_and_path}:',
end=('\r' if self.jobs > 1 else '\n'))
proc = await asyncio.create_subprocess_shell(
args.subcommand,
cwd=cwd, env=env, shell=True,
stdout=asyncio.subprocess.PIPE if self.jobs > 1 else None,
stderr=asyncio.subprocess.PIPE if self.jobs > 1 else None)

if self.jobs > 1:
(out, err) = await proc.communicate()

self.banner(f'finished "{args.subcommand}" in {project.name_and_path}:')
sys.stdout.write(out.decode())
sys.stderr.write(err.decode())

return proc.returncode

return await proc.wait()
marc-hb (Collaborator), Oct 17, 2024:

I think you need to capture the standard output from the different processes here so they don't interleave concurrently and randomly and become really hard to read. This could even turn into a terminal disaster if they use --color ANSI codes. I think we already touched on this question in #713 and before.

You probably tested this with relatively "quiet", easy-to-read output and a reasonable number of threads... can you try again after cranking it all up? This code must be prepared to handle not just the "common" cases but all cases.

You can either use the usual (out, err) = proc.communicate(). This avoids concurrent terminal output from different subprocesses, but it assumes the process output is already line-based:
https://docs.python.org/3/library/asyncio-subprocess.html

So it's probably better to play it safe and use some readline() variation. I found a couple of examples that seem relevant:
https://kevinmccarthy.org/2016/07/25/streaming-subprocess-stdin-and-stdout-with-asyncio-in-python/
https://stackoverflow.com/questions/2804543/read-subprocess-stdout-line-by-line

The real icing on the cake would be an option that prefixes each line of the output with the project name! BTW:

Eventually, that parallelization and output-capture code should be generic enough to be re-used by all commands, not just forall! And especially west update, where it is... awaited (pun intended) the most (#713 etc.)
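
For illustration, a rough sketch of what such a readline()-based variation could look like, with each line prefixed by a project name. The `run_prefixed` helper and its arguments are hypothetical, not part of this PR:

# Rough sketch of line-by-line output streaming with a per-project prefix
# (illustrative only; `run_prefixed` and its arguments are hypothetical).
import asyncio
import sys

async def _stream(reader, prefix, out):
    # Reading one line at a time keeps output from concurrent subprocesses
    # interleaved at line boundaries instead of mid-line.
    while True:
        line = await reader.readline()
        if not line:
            break
        out.write(f'{prefix}: {line.decode(errors="replace")}')

async def run_prefixed(cmd, prefix):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    # Drain both pipes concurrently, then collect the exit status.
    await asyncio.gather(
        _stream(proc.stdout, prefix, sys.stdout),
        _stream(proc.stderr, prefix, sys.stderr))
    return await proc.wait()

# Example: asyncio.run(run_prefixed('git status', 'net-tools'))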

Collaborator (author):

Yes, I didn't want to dive right in here, but it would be nice to get it right from the get-go.

I was dabbling with the idea of having the output behave like ninja, where the output line is replaced with the banner and maybe some counter indicating the progress. And when the subprocess is done, print its output as-is.

Not sure if this would require something like curses; I think it could be more lightweight.
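
For illustration, a carriage-return based status line can indeed be done without curses; a rough sketch (names are hypothetical, not part of this PR):

# Rough sketch of a ninja-style, single-line progress indicator using '\r'
# (illustrative only; not part of this PR).
import sys

def show_progress(done, total, name):
    # Overwrite the current terminal line; pad with spaces to erase any
    # leftover characters from a longer previous message.
    sys.stdout.write('\r' + f'[{done}/{total}] {name}'.ljust(79))
    sys.stdout.flush()

def finish_line():
    # Move to a fresh line before printing a finished subprocess's output.
    sys.stdout.write('\n')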

Collaborator:

I was dabbling with the idea of having the output behave like ninja, where the output line is replaced with the banner and maybe some counter indicating the progress. And when the subprocess is done, print its output as-is.

That would be awesome, but I think just 1) making sure the output is readable and 2) having all commands use the same output "framework" would already be a major milestone and a great stepping stone towards something better. And it would give what a lot of users have been waiting for: concurrency at last.


def do_run(self, args, unknown):
group_set = set(args.groups)
projects = [p for p in self._cloned_projects(args, only_active=not args.all)
if not group_set or group_set.intersection(set(p.groups))]

asyncio.run(self.do_run_async(args, projects))

async def do_run_async(self, args, projects):
self.jobs = args.jobs if args.jobs > 0 else os.cpu_count() or sys.maxsize
sem = asyncio.Semaphore(self.jobs)

rcs = await asyncio.gather(*[self.run_for_project(p, args, sem) for p in projects])

failed = [p for (p, rc) in zip(projects, rcs) if rc]
self._handle_failed(args, failed)

GREP_EPILOG = '''
8 changes: 4 additions & 4 deletions src/west/commands.py
@@ -1,4 +1,4 @@
# Copyright 2018 Open Source Foundries Limited.

[GitHub Actions check warning on line 1: file format check failed. Run 'ruff format src/west/commands.py']
# Copyright 2019 Foundries.io Limited.
# Copyright 2022 Nordic Semiconductor ASA
#
@@ -440,16 +440,16 @@
if colorize:
self._reset_colors(sys.stdout)

def banner(self, *args):
def banner(self, *args, end: str = '\n'):
'''Prints args as a "banner" using inf().

The args are prefixed with '=== ' and colorized by default.'''
self.inf('===', *args, colorize=True)
self.inf('===', *args, colorize=True, end=end)

def small_banner(self, *args):
def small_banner(self, *args, end: str = '\n'):
'''Prints args as a smaller banner(), i.e. prefixed with '-- ' and
not colorized.'''
self.inf('---', *args, colorize=False)
self.inf('---', *args, colorize=False, end=end)

def wrn(self, *args, end: str = '\n'):
'''Print a warning.
16 changes: 16 additions & 0 deletions tests/test_project.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020, Nordic Semiconductor ASA

[GitHub Actions check warning on line 1: file format check failed. Run 'ruff format tests/test_project.py']

import collections
import os
@@ -425,6 +425,22 @@
]


@pytest.mark.parametrize("jobs", ["-j 1", "-j 2", "-j"])
def test_forall_jobs(jobs, west_init_tmpdir):
# 'forall' with no projects cloned shouldn't fail
output = cmd(['forall', jobs, '-c', '']).splitlines()
assert '=== running "" in manifest (zephyr):' in output

cmd('update net-tools Kconfiglib')

# print order is no longer guaranteed when there are multiple projects
marc-hb (Collaborator), Oct 17, 2024:

Then the banners don't make sense anymore and should not be printed when j > 1.

It's more complicated...

output = cmd(['forall', jobs, '-c', '']).splitlines()

assert '=== running "" in manifest (zephyr):' in output
assert '=== running "" in net-tools (net-tools):' in output
assert '=== running "" in Kconfiglib (subdir/Kconfiglib):' in output


def test_grep(west_init_tmpdir):
# Make sure we don't find things we don't expect, and do find
# things we do.