Skip to content

Commit

Permalink
bodhi-ci: a Job shouldn't have to know about all other jobs
Browse files Browse the repository at this point in the history
This introduces a `ProgressReporter` class which handles the status
printing.

Signed-off-by: Aurélien Bompard <[email protected]>
  • Loading branch information
abompard authored and bowlofeggs committed Jan 7, 2019
1 parent b79b7dd commit 42ef370
Showing 1 changed file with 57 additions and 40 deletions.
97 changes: 57 additions & 40 deletions devel/ci/bodhi-ci
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,7 @@ def build(concurrency, container_runtime, failfast, release, tty):
def clean(concurrency, container_runtime, init, release, tty):
"""Remove all builds pertaining to Bodhi CI."""
buffer_output = concurrency != 1 or len(release) != 1
clean_jobs = []
for r in release:
clean_jobs.append(CleanJob(r, buffer_output=buffer_output, all_jobs=clean_jobs))
clean_jobs = [CleanJob(r, buffer_output=buffer_output) for r in release]
_run_jobs(clean_jobs)


Expand Down Expand Up @@ -294,6 +292,44 @@ def integration(archive, concurrency, container_runtime, no_build, failfast, ini
archive_path=archive_path))


class ProgressReporter(object):
"""Report progress on the provided jobs.
Attributes:
jobs: A list of the Jobs to report on.
"""

def __init__(self, jobs: typing.List['Job']):
"""
Args:
jobs: A list of the Jobs to report on.
"""
self._jobs = [] # type: typing.List[Job]
for job in jobs:
self.register_job(job)

def register_job(self, job):
"""Register a job to be reported.
Args:
job (Job): A job to report the status of.
"""
self._jobs.append(job)
loop = asyncio.get_event_loop()
loop.create_task(self._print_on_complete(job))

async def _print_on_complete(self, job):
"""Print the status of a job when it's complete."""
await job.complete.wait()
self.print_status()

def print_status(self):
"""Print a status report on all the jobs."""
for job in self._jobs:
click.echo(job.summary_line)
click.echo('\n')


class Job(object):
"""
Represent a CI job.
Expand All @@ -319,16 +355,14 @@ class Job(object):
# into the {}'s.
_container_image_template = '{}/{}'

def __init__(self, release: str, all_jobs: typing.List['Job'],
def __init__(self, release: str,
depends_on: typing.Union[typing.List['Job'], 'Job'] = None,
buffer_output: bool = False):
"""
Initialize the new Job.
Args:
release: The release this Job pertains to.
all_jobs: The list of all Jobs running. Used to print out progress reports as the Job
progresses.
depends_on: A list of Job that this Job should wait to complete before
starting. It is also possible to provide a Job instance if there is a single
dependency.
Expand All @@ -347,7 +381,6 @@ class Job(object):
self.started = False
self.returncode = None
self.skipped = False
self._all_jobs = all_jobs
self._container_image = self._container_image_template.format(CONTAINER_NAME, self.release)
self._popen_kwargs = {'shell': False} # type: typing.Mapping[str, typing.Union[bool, int]]
if buffer_output:
Expand Down Expand Up @@ -444,7 +477,6 @@ class Job(object):
self._finish_time = datetime.datetime.utcnow()
# Kick off any tasks that were waiting for us to finish.
self.complete.set()
_print_status(self._all_jobs)

return self

Expand Down Expand Up @@ -957,29 +989,26 @@ def _build_jobs_list(
'unit'):
# We don't want to build non-pip releases if the command is mypy; mypy only uses pip.
if command != 'mypy' or release == 'pip':
build_job = BuildJob(release, jobs, buffer_output=buffer_output)
build_job = BuildJob(release, buffer_output=buffer_output)
jobs.append(build_job)
if command in ('all', 'docs'):
docs_job = DocsJob(release, jobs, depends_on=build_job, buffer_output=buffer_output)
docs_job = DocsJob(release, depends_on=build_job, buffer_output=buffer_output)
jobs.append(docs_job)
if command in ('all', 'flake8'):
flake8_job = Flake8Job(release, jobs, depends_on=build_job, buffer_output=buffer_output)
flake8_job = Flake8Job(release, depends_on=build_job, buffer_output=buffer_output)
jobs.append(flake8_job)
if command in ('all', 'mypy') and release == 'pip':
mypy_build_job = MyPyBuildJob(release, jobs, depends_on=build_job,
buffer_output=buffer_output)
mypy_job = MyPyJob(release, jobs, depends_on=mypy_build_job,
buffer_output=buffer_output)
mypy_build_job = MyPyBuildJob(release, depends_on=build_job, buffer_output=buffer_output)
mypy_job = MyPyJob(release, depends_on=mypy_build_job, buffer_output=buffer_output)
jobs.extend([mypy_build_job, mypy_job])
if command in ('all', 'pydocstyle'):
pydocstyle_job = PydocstyleJob(release, jobs, depends_on=build_job,
buffer_output=buffer_output)
pydocstyle_job = PydocstyleJob(release, depends_on=build_job, buffer_output=buffer_output)
jobs.append(pydocstyle_job)
if command in ('all', 'diff_cover', 'unit'):
for pyver in pyvers:
unit_job = UnitJob(
archive=archive, archive_path=archive_path, pyver=pyver, release=release,
depends_on=build_job, buffer_output=buffer_output, all_jobs=jobs)
depends_on=build_job, buffer_output=buffer_output)
jobs.append(unit_job)
if command in ('all', 'diff_cover'):
if pyver == 2 and release not in ('f27', 'f28', 'f29', 'pip'):
Expand All @@ -988,24 +1017,23 @@ def _build_jobs_list(
continue
diff_cover_job = DiffCoverJob(
archive=archive, archive_path=archive_path, pyver=pyver,
release=release, depends_on=unit_job, buffer_output=buffer_output,
all_jobs=jobs)
release=release, depends_on=unit_job, buffer_output=buffer_output)
jobs.append(diff_cover_job)
if command in ('all', 'integration') and 3 in pyvers and "f29" in releases:
build_jobs = [
IntegrationBuildJob(app_name="resultsdb", release="f29", all_jobs=jobs,
IntegrationBuildJob(app_name="resultsdb", release="f29",
buffer_output=buffer_output),
IntegrationBuildJob(app_name="waiverdb", release="f29", all_jobs=jobs,
IntegrationBuildJob(app_name="waiverdb", release="f29",
buffer_output=buffer_output),
IntegrationBuildJob(app_name="greenwave", release="f29", all_jobs=jobs,
IntegrationBuildJob(app_name="greenwave", release="f29",
buffer_output=buffer_output),
IntegrationBuildJob(app_name="bodhi", release="f29", all_jobs=jobs,
IntegrationBuildJob(app_name="bodhi", release="f29",
buffer_output=buffer_output),
]
jobs.extend(build_jobs)
integration_job = IntegrationJob(
archive=archive, archive_path=archive_path, pyver=3, release="f29",
depends_on=build_jobs, buffer_output=buffer_output, all_jobs=jobs)
depends_on=build_jobs, buffer_output=buffer_output)
jobs.append(integration_job)

return jobs
Expand All @@ -1025,18 +1053,6 @@ def _cancel_jobs(jobs):
job.cancelled = True


def _print_status(jobs: typing.List[Job]):
"""
Print a status report on all the jobs.
Args:
jobs: A list of the Jobs to report on.
"""
for job in jobs:
click.echo(job.summary_line)
click.echo('\n')


def _process_results(loop, done, pending):
"""
Process the finished and pendings tasks and return error output and an exit code.
Expand Down Expand Up @@ -1093,14 +1109,15 @@ def _run_jobs(jobs):
Args:
jobs (list): A list of Jobs to run.
"""
_print_status(jobs)

if not jobs:
click.echo("No jobs!")
sys.exit(3)

loop = asyncio.get_event_loop()

progress_reporter = ProgressReporter(jobs)
progress_reporter.print_status()

processes = [j.run() for j in jobs]

return_when = asyncio.ALL_COMPLETED
Expand Down Expand Up @@ -1135,7 +1152,7 @@ def _stop_all_jobs(loop):
"""
args = [container_runtime, 'ps', '--filter=label={}'.format(CONTAINER_LABEL), '-q']
processes = subprocess.check_output(args).decode()
stop_jobs = [StopJob(process, all_jobs=[]).run()
stop_jobs = [StopJob(process).run()
for process in processes.split('\n') if process]

# If you give run_until_complete a future with no tasks, you will haz a sad (that's the
Expand Down

0 comments on commit 42ef370

Please sign in to comment.