Skip to content

Commit

Permalink
Make execution parallel (#9)
Browse files Browse the repository at this point in the history
* make everything run in parallel

* use block printer to have some status messages on screen

* new parameter: `-j` or `--num_threads` for controlling the level of parallelism

* new parameter: `-no_status` to disable printing status info on screen
  • Loading branch information
niosus authored Jun 12, 2017
1 parent 332c550 commit 08b712e
Show file tree
Hide file tree
Showing 11 changed files with 290 additions and 103 deletions.
65 changes: 57 additions & 8 deletions catkin_tools_fetch/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ def prepare_arguments(parser):
config_group.add_argument(
'--default_url', default="{package}",
help='Where to look for packages by default.')
config_group.add_argument('--no_status', action='store_true',
help='Do not use progress status when cloning.')
config_group.add_argument('--num_threads', '-j',
type=int,
default=4,
help='Number of threads run in parallel.')

# Behavior
behavior_group = parser.add_argument_group(
Expand Down Expand Up @@ -101,6 +107,14 @@ def prepare_arguments_deps(parser):
action='store_true',
default=False,
help='Print output from commands.')
parent_parser.add_argument('--no_status',
action='store_true',
default=False,
help='Do not use progress status when cloning.')
parent_parser.add_argument('--num_threads', '-j',
type=int,
default=4,
help='Number of threads run in parallel.')

packages_help_msg = """
Packages for which the dependencies are analyzed.
Expand Down Expand Up @@ -144,6 +158,7 @@ def prepare_arguments_deps(parser):
metavar='PKGNAME',
nargs='*',
help=packages_help_msg)

return parser


Expand All @@ -163,6 +178,15 @@ def main(opts):
else:
log.setLevel(logging.getLevelName("INFO"))

if opts.no_status:
log.info(" Not printing status messages while cloning.")
use_preprint = False
else:
log.info(" Will print status messages while cloning.")
use_preprint = True

log.info(" Using %s threads.", opts.num_threads)

context = Context.load(opts.workspace, opts.profile, opts, append=True)
default_url = Tools.prepare_default_url(opts.default_url)
if not opts.workspace:
Expand All @@ -189,23 +213,34 @@ def main(opts):
return fetch(packages=opts.packages,
workspace=opts.workspace,
context=context,
default_url=default_url)
default_url=default_url,
use_preprint=use_preprint,
num_threads=opts.num_threads)
if opts.subverb == 'update':
return update(packages=opts.packages,
workspace=opts.workspace,
context=context,
default_url=default_url,
conflict_strategy=opts.on_conflict)


def update(packages, workspace, context, default_url, conflict_strategy):
conflict_strategy=opts.on_conflict,
use_preprint=use_preprint,
num_threads=opts.num_threads)


def update(packages,
workspace,
context,
default_url,
conflict_strategy,
use_preprint,
num_threads):
"""Update packages from the available remotes.
Args:
packages (list): A list of packages provided by the user.
workspace (str): Path to a workspace (without src/ in the end).
context (Context): Current context. Needed to find current packages.
default_url (str): A default url with a {package} placeholder in it.
use_preprint (bool): Show status messages while cloning
Returns:
int: Return code. 0 if success. Git error code otherwise.
Expand All @@ -214,19 +249,29 @@ def update(packages, workspace, context, default_url, conflict_strategy):
workspace_packages = find_packages(context.source_space_abs,
exclude_subspaces=True,
warnings=[])
updater = Updater(ws_path, workspace_packages, conflict_strategy)
updater = Updater(ws_path=ws_path,
packages=workspace_packages,
conflict_strategy=conflict_strategy,
use_preprint=use_preprint,
num_threads=num_threads)
updater.update_packages(packages)
return 0


def fetch(packages, workspace, context, default_url):
def fetch(packages,
workspace,
context,
default_url,
use_preprint,
num_threads):
"""Fetch dependencies of a package.
Args:
packages (list): A list of packages provided by the user.
workspace (str): Path to a workspace (without src/ in the end).
context (Context): Current context. Needed to find current packages.
default_url (str): A default url with a {package} placeholder in it.
use_preprint (bool): Show status messages while cloning
Returns:
int: Return code. 0 if success. Git error code otherwise.
Expand Down Expand Up @@ -267,7 +312,11 @@ def fetch(packages, workspace, context, default_url):
# to download dependencies for one project only.
packages.add(new_dep_name)
try:
downloader = Downloader(ws_path, available_pkgs, ignore_pkgs)
downloader = Downloader(ws_path=ws_path,
available_pkgs=available_pkgs,
ignore_pkgs=ignore_pkgs,
use_preprint=use_preprint,
num_threads=num_threads)
except ValueError as e:
log.critical(" Encountered error. Abort.")
log.critical(" Error message: %s", e.message)
Expand Down
8 changes: 5 additions & 3 deletions catkin_tools_fetch/lib/dependency_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from xml.dom import minidom

from catkin_tools_fetch.lib.tools import Tools
from catkin_tools_fetch.lib.printer import Printer

log = logging.getLogger('deps')

Expand Down Expand Up @@ -55,6 +56,7 @@ def __init__(self, download_mask, pkg_name):
'`download_mask` must contain a "{package}" placeholder.')
self.__download_mask = download_mask
self.pkg_name = pkg_name
self.printer = Printer()

def get_dependencies(self, package_folder):
"""Find and parse package.xml file and return a dict of dependencies.
Expand All @@ -76,9 +78,9 @@ def get_dependencies(self, package_folder):
deps = Parser.__node_to_list(xmldoc, tag)
deps = Parser.__fix_dependencies(deps, self.pkg_name)
all_deps += deps
log.info(" %-21s: Found %s dependencies.",
Tools.decorate(self.pkg_name),
len(all_deps))
msg = " {}: Found {} dependencies".format(
Tools.decorate(self.pkg_name), len(all_deps))
self.printer.print_msg(msg)
log.debug(" Dependencies: %s", all_deps)
deps_with_urls = self.__init_dep_dict(all_deps)
return Parser.__update_explicit_values(xmldoc, deps_with_urls)
Expand Down
91 changes: 65 additions & 26 deletions catkin_tools_fetch/lib/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
import logging

from os import path
from termcolor import colored
from concurrent import futures

from catkin_tools_fetch.lib.tools import Tools
from catkin_tools_fetch.lib.tools import GitBridge
from catkin_tools_fetch.lib.printer import Printer

log = logging.getLogger('deps')

Expand All @@ -22,12 +25,19 @@ class Downloader(object):
ws_path (str): Workspace path. This is where packages live.
"""

IGNORE_TAG = "[IGNORED]"
NOT_FOUND_TAG = "[NOT FOUND]"
IGNORE_TAG = colored("[IGNORED]", 'yellow')
NOT_FOUND_TAG = colored("[NOT FOUND]", 'red')
CLONING_TAG = "[CLONING]"
CHECKING_TAG = "[CHECKING]"

NO_ERROR = 0

def __init__(self, ws_path, available_pkgs, ignore_pkgs):
def __init__(self,
ws_path,
available_pkgs,
ignore_pkgs,
use_preprint=True,
num_threads=4):
"""Init a downloader.
Args:
Expand All @@ -44,6 +54,9 @@ def __init__(self, ws_path, available_pkgs, ignore_pkgs):
self.ws_path = ws_path
self.available_pkgs = available_pkgs
self.ignore_pkgs = ignore_pkgs
self.thread_pool = futures.ThreadPoolExecutor(max_workers=num_threads)
self.use_preprint = use_preprint
self.printer = Printer()

def download_dependencies(self, dep_dict):
"""Check and download dependencies from a dependency dictionary.
Expand All @@ -59,6 +72,14 @@ def download_dependencies(self, dep_dict):
checked_deps = self.__check_dependencies(dep_dict)
return self.__clone_dependencies(checked_deps)

def __clone_dependency(self, pkg_name, url, dep_path, branch):
"""Clone a single dependency. Return a future to the clone process."""
if self.use_preprint:
msg = " {}: {}".format(Tools.decorate(pkg_name),
Downloader.CLONING_TAG)
self.printer.add_msg(pkg_name, msg)
return GitBridge.clone(pkg_name, url, dep_path, branch)

def __clone_dependencies(self, checked_deps):
"""Clone dependencies.
Expand All @@ -73,28 +94,39 @@ def __clone_dependencies(self, checked_deps):
return Downloader.NO_ERROR
log.info(" Cloning valid dependencies:")
error_code = Downloader.NO_ERROR
# store all tasks in a futures list
futures_list = []
for name, dependency in checked_deps.items():
url = dependency.url
branch = dependency.branch
if not branch:
branch = "master"
if name in self.available_pkgs:
log.info(" %-21s: %s",
Tools.decorate(name),
GitBridge.EXISTS_TAG)
msg = " {}: {}".format(
Tools.decorate(name), GitBridge.EXISTS_TAG)
self.printer.purge_msg(name, msg)
continue
dep_path = path.join(self.ws_path, name)
clone_result = GitBridge.clone(url, dep_path, branch)
if clone_result in [GitBridge.CLONED_TAG.format(branch=branch),
GitBridge.EXISTS_TAG]:
log.info(" %-21s: %s", Tools.decorate(name), clone_result)
elif clone_result == GitBridge.ERROR_TAG:
log.error(" %-21s: %s", Tools.decorate(name), clone_result)
future = self.thread_pool.submit(
self.__clone_dependency, name, url, dep_path, branch)
futures_list.append(future)
# we have all the futures ready. Now just wait for them to finish.
for future in futures.as_completed(futures_list):
pkg_name, clone_result = future.result()
msg = " {}: {}".format(
Tools.decorate(pkg_name), clone_result)
self.printer.purge_msg(pkg_name, msg)
if clone_result == GitBridge.ERROR_TAG:
error_code = 1
else:
log.error(" undefined result of clone.")
return error_code

def __check_dependency(self, dependency):
if self.use_preprint:
msg = " {}: {}".format(
Tools.decorate(dependency.name), Downloader.CHECKING_TAG)
self.printer.add_msg(dependency.name, msg)
return GitBridge.repository_exists(dependency)

def __check_dependencies(self, dep_dict):
"""Check dependencies for validity.
Expand All @@ -111,18 +143,25 @@ def __check_dependencies(self, dep_dict):
if not dep_dict:
# exit early if there are no new dependencies
return checked_deps
futures_list = []
log.info(" Checking merged dependencies:")
for name, dependency in dep_dict.items():
url = dependency.url
if name in self.ignore_pkgs:
log.info(" %-21s: %s",
Tools.decorate(name),
Downloader.IGNORE_TAG)
elif GitBridge.repository_exists(url):
log.info(" %-21s: %s", Tools.decorate(name), url)
checked_deps[name] = dependency
for dependency in dep_dict.values():
if dependency.name in self.ignore_pkgs:
msg = " {}: {}".format(
Tools.decorate(dependency.name), Downloader.IGNORE_TAG)
self.printer.add_msg(dependency.name, msg)
continue
futures_list.append(self.thread_pool.submit(
self.__check_dependency, dependency))
for future in futures.as_completed(futures_list):
dependency, repo_found = future.result()
if repo_found:
msg = " {}: {}".format(
Tools.decorate(dependency.name), dependency.url)
self.printer.purge_msg(dependency.name, msg)
checked_deps[dependency.name] = dependency
else:
log.info(" %-21s: %s",
Tools.decorate(name),
Downloader.NOT_FOUND_TAG)
msg = " {}: {}".format(
Tools.decorate(dependency.name), Downloader.NOT_FOUND_TAG)
self.printer.purge_msg(dependency.name, msg)
return checked_deps
47 changes: 47 additions & 0 deletions catkin_tools_fetch/lib/printer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""Module for printing a block of text overwriting previous text."""

import sys
from threading import RLock


class Printer:
"""Reprints messages wiping unneeded lines. Supports multiple threads."""

__rlock = RLock()

def __init__(self, line_length=70):
"""Initialize object."""
self.__msgs = {}
self.__line_length = line_length

def add_msg(self, key, msg):
"""Add a new message and print it on last line."""
with self.__rlock:
self.__msgs[key] = msg
print(self.__msgs[key].ljust(self.__line_length, " "))

def print_msg(self, msg):
"""Print a single message."""
print(msg.ljust(self.__line_length, " "))

def purge_msg(self, key, last_msg):
"""Print the last message on top active line and move lower."""
with self.__rlock:
self.__move_up()
if key in self.__msgs:
del self.__msgs[key]
print(last_msg.ljust(self.__line_length, " "))
self.__print_active(move_up=False)

def __print_active(self, move_up=False):
"""Print all active messages overwriting console."""
# Clear previous text by overwritig non-spaces with spaces
if move_up:
self.__move_up()
for key in self.__msgs.keys():
print(self.__msgs[key].ljust(self.__line_length, " "))

def __move_up(self):
"""Move cursor to the top active line."""
for _ in range(len(self.__msgs)):
sys.stdout.write("\033[A")
Loading

0 comments on commit 08b712e

Please sign in to comment.