diff --git a/signing-manifests/bug1639199.0.yml b/signing-manifests/bug1639199.0.yml index 7e4173d..778b871 100644 --- a/signing-manifests/bug1639199.0.yml +++ b/signing-manifests/bug1639199.0.yml @@ -1,10 +1,12 @@ --- -url: https://bugzilla.mozilla.org/attachment.cgi?id=9153960#/esr78_switch_test.mar -artifact-name: esr78_switch_test.mar bug: 1639199 -private-artifact: false -signing-formats: ["autograph_hash_only_mar384"] sha256: b43c2d8fec3bc98f25a28cabf05147c627c13b7657683a500309f11b393446fb filesize: 2207 +private-artifact: false +signing-formats: ["autograph_hash_only_mar384"] requestor: Justin Wood reason: test mar signing for esr switch +artifact-name: esr78_switch_test.mar +fetch: + type: bmo-attachment + attachment-id: 9153960 diff --git a/signing-manifests/bug1642701.yml b/signing-manifests/bug1642701.yml index 31d626f..4ef7820 100644 --- a/signing-manifests/bug1642701.yml +++ b/signing-manifests/bug1642701.yml @@ -1,10 +1,11 @@ --- -url: https://github.com/mozilla-releng/staging-adhoc-signing/raw/99e70937e8ea2aac48168e473ae5ba39b47978d1/artifacts/SignableFile.bin -artifact-name: SignableFile.bin bug: 1642701 -private-artifact: false -signing-formats: ["autograph_authenticode_stub"] sha256: 5de3f160913fe1764ea4d572762ad3119969648cd6e80ac40862321b6943a063 filesize: 4096 +private-artifact: false +signing-formats: ["autograph_authenticode_stub"] requestor: tjr reason: new cert +artifact-name: SignableFile.bin +fetch: + url: https://github.com/mozilla-releng/staging-adhoc-signing/raw/99e70937e8ea2aac48168e473ae5ba39b47978d1/artifacts/SignableFile.bin diff --git a/signing-manifests/example.yml.tmpl b/signing-manifests/example.yml.tmpl index 3e0a7a2..cc48ed0 100644 --- a/signing-manifests/example.yml.tmpl +++ b/signing-manifests/example.yml.tmpl @@ -1,10 +1,12 @@ --- -url: https://evil.com/foo/bar.exe -artifact-name: bar.exe bug: 12345 -private-artifact: false -signing-formats: ["autograph_gpg"] sha256: abcd12345 filesize: 12345 +private-artifact: false +signing-formats: ["autograph_gpg"] requestor: Dr. Pepper reason: sign my evil file!!! 
+artifact-name: bar.exe +fetch: + type: static-url + url: https://evil.com/foo/bar.exe diff --git a/signing-manifests/mar.yml b/signing-manifests/mar.yml index bd26309..9023ffb 100644 --- a/signing-manifests/mar.yml +++ b/signing-manifests/mar.yml @@ -1,10 +1,11 @@ --- -url: https://firefox-ci-tc.services.mozilla.com/api/queue/v1/task/Sejw462STvKGjIAQ5dsm2g/runs/0/artifacts/public/build/target.complete.mar -artifact-name: target.complete.mar bug: 12345 -private-artifact: false -signing-formats: ["autograph_hash_only_mar384"] sha256: df3463a5f3f84c9d4a572d0ebbb6dce6b4bb21ebc8ba86400268e9546441ec03 filesize: 64260074 +private-artifact: false +signing-formats: ["autograph_hash_only_mar384"] requestor: Aki Sasaki reason: test mar signing +artifact-name: target.complete.mar +fetch: + url: https://firefox-ci-tc.services.mozilla.com/api/queue/v1/task/Sejw462STvKGjIAQ5dsm2g/runs/0/artifacts/public/build/target.complete.mar diff --git a/signing-manifests/stub.yml b/signing-manifests/stub.yml index 5f07b5f..b723026 100644 --- a/signing-manifests/stub.yml +++ b/signing-manifests/stub.yml @@ -1,10 +1,11 @@ --- -url: https://firefox-ci-tc.services.mozilla.com/api/queue/v1/task/V6HrQPOuT5mjeaGu1bV9iA/runs/0/artifacts/public/build/setup-stub.exe -artifact-name: setup-stub.exe bug: 12345 -private-artifact: false -signing-formats: ["autograph_authenticode_stub"] sha256: aa78e7e049789fb49ab0128f1195fce7f90258ef46615ba90f94af5784f101a3 filesize: 451035 +private-artifact: false +signing-formats: ["autograph_authenticode_stub"] requestor: Aki Sasaki reason: test stub signing +artifact-name: setup-stub.exe +fetch: + url: https://firefox-ci-tc.services.mozilla.com/api/queue/v1/task/V6HrQPOuT5mjeaGu1bV9iA/runs/0/artifacts/public/build/setup-stub.exe diff --git a/taskcluster/adhoc_taskgraph/__init__.py b/taskcluster/adhoc_taskgraph/__init__.py index d75d68b..9472b5f 100644 --- a/taskcluster/adhoc_taskgraph/__init__.py +++ b/taskcluster/adhoc_taskgraph/__init__.py @@ -19,6 +19,7 @@ def register(graph_config): "signing_manifest", "target", "worker_types", + "fetches", ]) diff --git a/taskcluster/adhoc_taskgraph/fetches.py b/taskcluster/adhoc_taskgraph/fetches.py new file mode 100644 index 0000000..2dbd622 --- /dev/null +++ b/taskcluster/adhoc_taskgraph/fetches.py @@ -0,0 +1,62 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +from __future__ import absolute_import, print_function, unicode_literals + +from six import text_type + +from voluptuous import Required + +from taskgraph.util.schema import taskref_or_string +from taskgraph.util import path as mozpath +from taskgraph.transforms.fetch import fetch_builder + + +@fetch_builder('bmo-attachment', schema={ + # The Bugzilla attachment ID to download. + Required('attachment-id'): text_type, + + # The SHA-256 of the downloaded content. + Required('sha256'): text_type, + + # Size of the downloaded entity, in bytes. + Required('size'): int, + + # The name to give to the generated artifact.
+ Required('artifact-name'): text_type, + +}) +def create_fetch_url_task(config, name, fetch): + + artifact_name = fetch['artifact-name'] + + workdir = '/builds/worker' + + # Arguments that matter to the cache digest + args = ( + 'bmo-attachment ' + '--sha256 {} ' + '--size {} ' + '--name {} ' + '{} ' + '/builds/worker/artifacts/{}'.format( + fetch['sha256'], + fetch['size'], + artifact_name, + fetch['attachment-id'], + artifact_name + ) + ) + + cmd = [ + 'bash', + '-c', + '/usr/local/bin/fetch-bmo.py {}'.format(args) + ] + + return { + 'command': cmd, + 'artifact_name': artifact_name, + 'digest_data': args, + } diff --git a/taskcluster/adhoc_taskgraph/signing_manifest.py b/taskcluster/adhoc_taskgraph/signing_manifest.py index 4398002..96e6a86 100644 --- a/taskcluster/adhoc_taskgraph/signing_manifest.py +++ b/taskcluster/adhoc_taskgraph/signing_manifest.py @@ -37,16 +37,25 @@ base_schema = Schema( { - Required("url"): text_type, Required("bug"): int, - Required("private-artifact"): bool, - Required("signing-formats"): [Any(*SUPPORTED_SIGNING_FORMATS)], Required("sha256"): text_type, Required("filesize"): int, + Required("private-artifact"): bool, + Required("signing-formats"): [Any(*SUPPORTED_SIGNING_FORMATS)], Required("requestor"): basestring, Required("reason"): basestring, - Optional("gpg-signature"): basestring, Required("artifact-name"): basestring, + Required("fetch"): Any( + { + Optional("gpg-signature"): basestring, + Optional('type'): 'static-url', + Required('url'): basestring, + }, + { + Required('type'): 'bmo-attachment', + Required('attachment-id'): Any(basestring, int) + } + ), Required("manifest_name"): basestring, } ) diff --git a/taskcluster/adhoc_taskgraph/transforms/fetch.py b/taskcluster/adhoc_taskgraph/transforms/fetch.py index d864dcf..05414e2 100644 --- a/taskcluster/adhoc_taskgraph/transforms/fetch.py +++ b/taskcluster/adhoc_taskgraph/transforms/fetch.py @@ -19,11 +19,17 @@ def from_manifests(config, jobs): manifest = job.pop('manifest') job['name'] = manifest['manifest_name'] fetch = job.setdefault("fetch", {}) - fetch['type'] = 'static-url' - fetch["url"] = manifest["url"] + fetch['type'] = manifest["fetch"].get('type', 'static-url') + if fetch['type'] == 'static-url': + fetch["url"] = manifest["fetch"]["url"] + if manifest['fetch'].get('gpg-signature'): + fetch['gpg-signature'] = manifest['fetch'].get('gpg-signature') + elif fetch['type'] == 'bmo-attachment': + fetch['attachment-id'] = unicode(manifest["fetch"]['attachment-id']) fetch["sha256"] = manifest["sha256"] fetch["size"] = manifest["filesize"] - for k in ("gpg-signature", "artifact-name"): + + for k in ("artifact-name", ): if manifest.get(k): fetch[k] = manifest[k] job.setdefault('attributes', {})['manifest'] = manifest diff --git a/taskcluster/ci/dep-signing/kind.yml b/taskcluster/ci/dep-signing/kind.yml index 40d8464..3d0a2b1 100644 --- a/taskcluster/ci/dep-signing/kind.yml +++ b/taskcluster/ci/dep-signing/kind.yml @@ -14,10 +14,12 @@ transforms: job-template: description: Ad-hoc signing + attributes: + code-review: true index: type: dep-signing worker-type: dep-signing worker: signing-type: dep-signing max-run-time: 3600 - run-on-tasks-for: ['action'] + run-on-tasks-for: ['action', 'github-pull-request'] diff --git a/taskcluster/ci/fetch/kind.yml b/taskcluster/ci/fetch/kind.yml index 734be61..0fbeb6c 100644 --- a/taskcluster/ci/fetch/kind.yml +++ b/taskcluster/ci/fetch/kind.yml @@ -12,3 +12,5 @@ transforms: job-template: description: 'Unsigned artifact' + attributes: + code-review: true diff --git 
a/taskcluster/ci/pr/kind.yml b/taskcluster/ci/pr/kind.yml index 6fb365b..2afb4d6 100644 --- a/taskcluster/ci/pr/kind.yml +++ b/taskcluster/ci/pr/kind.yml @@ -6,6 +6,7 @@ loader: taskgraph.loader.transform:loader kind-dependencies: - fetch + - dep-signing transforms: - taskgraph.transforms.code_review:transforms diff --git a/taskcluster/docker/fetch/Dockerfile b/taskcluster/docker/fetch/Dockerfile index 9cde9e9..7961188 100644 --- a/taskcluster/docker/fetch/Dockerfile +++ b/taskcluster/docker/fetch/Dockerfile @@ -24,6 +24,9 @@ RUN apt-get update && \ # %include-run-task +COPY fetch-bmo.py /usr/local/bin/fetch-bmo.py +RUN chmod a+x /usr/local/bin/fetch-bmo.py + ENV SHELL=/bin/bash \ HOME=/builds/worker \ PATH=/builds/worker/.local/bin:$PATH diff --git a/taskcluster/docker/fetch/fetch-bmo.py b/taskcluster/docker/fetch/fetch-bmo.py new file mode 100644 index 0000000..72d7589 --- /dev/null +++ b/taskcluster/docker/fetch/fetch-bmo.py @@ -0,0 +1,253 @@ +#!/usr/bin/python3 -u +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +# XXX This file borrows much from `fetch-content` + +import argparse +import contextlib +import gzip +import hashlib +import pathlib +import random +import sys +import time +import urllib.parse +import urllib.request + + +def log(msg): + print(msg, file=sys.stderr) + sys.stderr.flush() + + +class IntegrityError(Exception): + """Represents an integrity error when downloading a URL.""" + + +@contextlib.contextmanager +def rename_after_close(fname, *args, **kwargs): + """ + Context manager that opens a temporary file to use as a writer, + and closes the file on context exit, renaming it to the expected + file name in case of success, or removing it in case of failure. + + Takes the same options as open(), but must be used as a context + manager. + """ + path = pathlib.Path(fname) + tmp = path.with_name('%s.tmp' % path.name) + try: + with tmp.open(*args, **kwargs) as fh: + yield fh + except Exception: + tmp.unlink() + raise + else: + tmp.rename(fname) + + +# The following is copied from +# https://github.com/mozilla-releng/redo/blob/6d07678a014e0c525e54a860381a165d34db10ff/redo/__init__.py#L15-L85 +def retrier(attempts=5, sleeptime=10, max_sleeptime=300, sleepscale=1.5, jitter=1): + """ + A generator function that sleeps between retries, handles exponential + backoff and jitter. The action you are retrying is meant to run after + retrier yields. + + At each iteration, we sleep for sleeptime + random.randint(-jitter, jitter). + Afterwards sleeptime is multiplied by sleepscale for the next iteration. + + Args: + attempts (int): maximum number of times to try; defaults to 5 + sleeptime (float): how many seconds to sleep between tries; defaults to + 60s (one minute) + max_sleeptime (float): the longest we'll sleep, in seconds; defaults to + 300s (five minutes) + sleepscale (float): how much to multiply the sleep time by each + iteration; defaults to 1.5 + jitter (int): random jitter to introduce to sleep time each iteration. + the amount is chosen at random between [-jitter, +jitter] + defaults to 1 + + Yields: + None, a maximum of `attempts` number of times + + Example: + >>> n = 0 + >>> for _ in retrier(sleeptime=0, jitter=0): + ... if n == 3: + ... # We did the thing! + ... break + ... n += 1 + >>> n + 3 + + >>> n = 0 + >>> for _ in retrier(sleeptime=0, jitter=0): + ... if n == 6: + ... # We did the thing! + ... 
break + ... n += 1 + ... else: + ... print("max tries hit") + max tries hit + """ + jitter = jitter or 0 # py35 barfs on the next line if jitter is None + if jitter > sleeptime: + # To prevent negative sleep times + raise Exception('jitter ({}) must be less than sleep time ({})'.format(jitter, sleeptime)) + + sleeptime_real = sleeptime + for _ in range(attempts): + log("attempt %i/%i" % (_ + 1, attempts)) + + yield sleeptime_real + + if jitter: + sleeptime_real = sleeptime + random.randint(-jitter, jitter) + # our jitter should scale along with the sleeptime + jitter = int(jitter * sleepscale) + else: + sleeptime_real = sleeptime + + sleeptime *= sleepscale + + if sleeptime_real > max_sleeptime: + sleeptime_real = max_sleeptime + + # Don't need to sleep the last time + if _ < attempts - 1: + log("sleeping for %.2fs (attempt %i/%i)" % (sleeptime_real, _ + 1, attempts)) + time.sleep(sleeptime_real) + + +def stream_download(url, sha256=None, size=None): + """Download a URL to a generator, optionally with content verification. + + If ``sha256`` or ``size`` are defined, the downloaded URL will be + validated against those requirements and ``IntegrityError`` will be + raised if expectations do not match. + + Because verification cannot occur until the file is completely downloaded + it is recommended for consumers to not do anything meaningful with the + data if content verification is being used. To securely handle retrieved + content, it should be streamed to a file or memory and only operated + on after the generator is exhausted without raising. + """ + log('Downloading %s' % url) + + h = hashlib.sha256() + length = 0 + + t0 = time.time() + with urllib.request.urlopen(url) as fh: + if not url.endswith('.gz') and fh.info().get('Content-Encoding') == 'gzip': + fh = gzip.GzipFile(fileobj=fh) + + while True: + chunk = fh.read(65536) + if not chunk: + break + + h.update(chunk) + length += len(chunk) + + yield chunk + + duration = time.time() - t0 + digest = h.hexdigest() + + log('%s resolved to %d bytes with sha256 %s in %.3fs' % ( + url, length, digest, duration)) + + if size: + if size == length: + log('Verified size of %s' % url) + else: + raise IntegrityError('size mismatch on %s: wanted %d; got %d' % ( + url, size, length)) + + if sha256: + if digest == sha256: + log('Verified sha256 integrity of %s' % url) + else: + raise IntegrityError('sha256 mismatch on %s: wanted %s; got %s' % ( + url, sha256, digest)) + + +def download_to_path(url, path, sha256=None, size=None): + """Download a URL to a filesystem path, possibly with verification.""" + + # We download to a temporary file and rename at the end so there's + # no chance of the final file being partially written or containing + # bad data. 
+ try: + path.unlink() + except FileNotFoundError: + pass + + for _ in retrier(attempts=5, sleeptime=60): + try: + log('Downloading %s to %s' % (url, path)) + + with rename_after_close(path, 'wb') as fh: + for chunk in stream_download(url, sha256=sha256, size=size): + fh.write(chunk) + + return + except IntegrityError: + raise + except Exception as e: + log("Download failed: {}".format(e)) + continue + + raise Exception("Download failed, no more retries!") + + +def command_bmo_fetch(args): + + dest = pathlib.Path(args.dest) + dest.parent.mkdir(parents=True, exist_ok=True) + + url = "https://bugzilla.mozilla.org/attachment.cgi?id={}".format(args.attachment_id) + + try: + download_to_path(url, dest, sha256=args.sha256, size=args.size) + + except Exception: + try: + dest.unlink() + except FileNotFoundError: + pass + + raise + + +def main(): + parser = argparse.ArgumentParser() + subparsers = parser.add_subparsers(title='sub commands') + + url = subparsers.add_parser('bmo-attachment', help='Download a Bugzilla attachment') + url.set_defaults(func=command_bmo_fetch) + url.add_argument('--sha256', required=True, + help='SHA-256 of downloaded content') + url.add_argument('--size', required=True, type=int, + help='Size of downloaded content, in bytes') + url.add_argument('--name', required=True, + help='The base filename of what we are fetching') + url.add_argument('attachment_id', help='Attachment ID to fetch') + url.add_argument('dest', help='Destination path') + + args = parser.parse_args() + + if not args.dest: + parser.error('no destination directory specified, either pass in --dest ' + 'or set $MOZ_FETCHES_DIR') + + return args.func(args) + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/taskcluster/run-task/fetch-content b/taskcluster/run-task/fetch-content deleted file mode 100755 index 5ea6616..0000000 --- a/taskcluster/run-task/fetch-content +++ /dev/null @@ -1,443 +0,0 @@ -#!/usr/bin/python3 -u -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. If a copy of the MPL was not distributed with this -# file, You can obtain one at http://mozilla.org/MPL/2.0/. - -import argparse -import bz2 -import concurrent.futures -import gzip -import hashlib -import json -import lzma -import multiprocessing -import os -import pathlib -import random -import subprocess -import sys -import tempfile -import time -import urllib.request - -try: - import zstandard -except ImportError: - zstandard = None - - -CONCURRENCY = multiprocessing.cpu_count() - - -def log(msg): - print(msg, file=sys.stderr) - sys.stderr.flush() - - -class IntegrityError(Exception): - """Represents an integrity error when downloading a URL.""" - - -# The following is copied from -# https://github.com/mozilla-releng/redo/blob/6d07678a014e0c525e54a860381a165d34db10ff/redo/__init__.py#L15-L85 -def retrier(attempts=5, sleeptime=10, max_sleeptime=300, sleepscale=1.5, jitter=1): - """ - A generator function that sleeps between retries, handles exponential - backoff and jitter. The action you are retrying is meant to run after - retrier yields. - - At each iteration, we sleep for sleeptime + random.randint(-jitter, jitter). - Afterwards sleeptime is multiplied by sleepscale for the next iteration.
- - Args: - attempts (int): maximum number of times to try; defaults to 5 - sleeptime (float): how many seconds to sleep between tries; defaults to - 60s (one minute) - max_sleeptime (float): the longest we'll sleep, in seconds; defaults to - 300s (five minutes) - sleepscale (float): how much to multiply the sleep time by each - iteration; defaults to 1.5 - jitter (int): random jitter to introduce to sleep time each iteration. - the amount is chosen at random between [-jitter, +jitter] - defaults to 1 - - Yields: - None, a maximum of `attempts` number of times - - Example: - >>> n = 0 - >>> for _ in retrier(sleeptime=0, jitter=0): - ... if n == 3: - ... # We did the thing! - ... break - ... n += 1 - >>> n - 3 - - >>> n = 0 - >>> for _ in retrier(sleeptime=0, jitter=0): - ... if n == 6: - ... # We did the thing! - ... break - ... n += 1 - ... else: - ... print("max tries hit") - max tries hit - """ - jitter = jitter or 0 # py35 barfs on the next line if jitter is None - if jitter > sleeptime: - # To prevent negative sleep times - raise Exception('jitter ({}) must be less than sleep time ({})'.format(jitter, sleeptime)) - - sleeptime_real = sleeptime - for _ in range(attempts): - log("attempt %i/%i" % (_ + 1, attempts)) - - yield sleeptime_real - - if jitter: - sleeptime_real = sleeptime + random.randint(-jitter, jitter) - # our jitter should scale along with the sleeptime - jitter = int(jitter * sleepscale) - else: - sleeptime_real = sleeptime - - sleeptime *= sleepscale - - if sleeptime_real > max_sleeptime: - sleeptime_real = max_sleeptime - - # Don't need to sleep the last time - if _ < attempts - 1: - log("sleeping for %.2fs (attempt %i/%i)" % (sleeptime_real, _ + 1, attempts)) - time.sleep(sleeptime_real) - - -def stream_download(url, sha256=None, size=None): - """Download a URL to a generator, optionally with content verification. - - If ``sha256`` or ``size`` are defined, the downloaded URL will be - validated against those requirements and ``IntegrityError`` will be - raised if expectations do not match. - - Because verification cannot occur until the file is completely downloaded - it is recommended for consumers to not do anything meaningful with the - data if content verification is being used. To securely handle retrieved - content, it should be streamed to a file or memory and only operated - on after the generator is exhausted without raising. 
- """ - log('Downloading %s' % url) - - h = hashlib.sha256() - length = 0 - - t0 = time.time() - with urllib.request.urlopen(url) as fh: - if not url.endswith('.gz') and fh.info().get('Content-Encoding') == 'gzip': - fh = gzip.GzipFile(fileobj=fh) - - while True: - chunk = fh.read(65536) - if not chunk: - break - - h.update(chunk) - length += len(chunk) - - yield chunk - - duration = time.time() - t0 - digest = h.hexdigest() - - log('%s resolved to %d bytes with sha256 %s in %.3fs' % ( - url, length, digest, duration)) - - if size: - if size == length: - log('Verified size of %s' % url) - else: - raise IntegrityError('size mismatch on %s: wanted %d; got %d' % ( - url, size, length)) - - if sha256: - if digest == sha256: - log('Verified sha256 integrity of %s' % url) - else: - raise IntegrityError('sha256 mismatch on %s: wanted %s; got %s' % ( - url, sha256, digest)) - - -def download_to_path(url, path, sha256=None, size=None): - """Download a URL to a filesystem path, possibly with verification.""" - - # We download to a temporary file and rename at the end so there's - # no chance of the final file being partially written or containing - # bad data. - try: - path.unlink() - except FileNotFoundError: - pass - - for _ in retrier(attempts=5, sleeptime=60): - try: - tmp = path.with_name('%s.tmp' % path.name) - - log('Downloading %s to %s' % (url, tmp)) - - try: - with tmp.open('wb') as fh: - for chunk in stream_download(url, sha256=sha256, size=size): - fh.write(chunk) - except IntegrityError: - tmp.unlink() - raise - - log('Renaming to %s' % path) - tmp.rename(path) - return - except IntegrityError: - raise - except Exception as e: - log("Download failed: {}".format(e)) - continue - - raise Exception("Download failed, no more retries!") - - -def gpg_verify_path(path: pathlib.Path, public_key_data: bytes, - signature_data: bytes): - """Verify that a filesystem path verifies using GPG. - - Takes a Path defining a file to verify. ``public_key_data`` contains - bytes with GPG public key data. ``signature_data`` contains a signed - GPG document to use with ``gpg --verify``. - """ - log('Validating GPG signature of %s' % path) - log('GPG key data:\n%s' % public_key_data.decode('ascii')) - - with tempfile.TemporaryDirectory() as td: - try: - # --batch since we're running unattended. - gpg_args = ['gpg', '--homedir', td, '--batch'] - - log('Importing GPG key...') - subprocess.run(gpg_args + ['--import'], - input=public_key_data, - check=True) - - log('Verifying GPG signature...') - subprocess.run(gpg_args + ['--verify', '-', '%s' % path], - input=signature_data, - check=True) - - log('GPG signature verified!') - finally: - # There is a race between the agent self-terminating and - # shutil.rmtree() from the temporary directory cleanup that can - # lead to exceptions. Kill the agent before cleanup to prevent this. - env = dict(os.environ) - env['GNUPGHOME'] = td - subprocess.run(['gpgconf', '--kill', 'gpg-agent'], env=env) - - -def archive_type(path: pathlib.Path): - """Attempt to identify a path as an extractable archive.""" - if path.suffixes[-2:-1] == ['.tar']: - return 'tar' - elif path.suffix == '.zip': - return 'zip' - else: - return None - - -def extract_archive(path, dest_dir, typ): - """Extract an archive to a destination directory.""" - - # Resolve paths to absolute variants. - path = path.resolve() - dest_dir = dest_dir.resolve() - - # We pipe input to the decompressor program so that we can apply - # custom decompressors that the program may not know about. 
- if typ == 'tar': - if path.suffix == '.bz2': - ifh = bz2.open(str(path), 'rb') - elif path.suffix == '.gz': - ifh = gzip.open(str(path), 'rb') - elif path.suffix == '.xz': - ifh = lzma.open(str(path), 'rb') - elif path.suffix == '.zst': - if not zstandard: - raise ValueError('zstandard Python package not available') - dctx = zstandard.ZstdDecompressor() - ifh = dctx.stream_reader(path.open('rb')) - elif path.suffix == '.tar': - ifh = path.open('rb') - else: - raise ValueError('unknown archive format for tar file: %s' % path) - - args = ['tar', 'xf', '-'] - pipe_stdin = True - elif typ == 'zip': - # unzip from stdin has wonky behavior. We don't use a pipe for it. - ifh = open(os.devnull, 'rb') - args = ['unzip', '-o', str(path)] - pipe_stdin = False - else: - raise ValueError('unknown archive format: %s' % path) - - log('Extracting %s to %s using %r' % (path, dest_dir, args)) - t0 = time.time() - - with ifh, subprocess.Popen(args, cwd=str(dest_dir), bufsize=0, - stdin=subprocess.PIPE) as p: - while True: - if not pipe_stdin: - break - - chunk = ifh.read(131072) - if not chunk: - break - - p.stdin.write(chunk) - - if p.returncode: - raise Exception('%r exited %d' % (args, p.returncode)) - - log('%s extracted in %.3fs' % (path, time.time() - t0)) - - -def fetch_and_extract(url, dest_dir, extract=True, sha256=None, size=None): - """Fetch a URL and extract it to a destination path. - - If the downloaded URL is an archive, it is extracted automatically - and the archive is deleted. Otherwise the file remains in place in - the destination directory. - """ - - basename = url.split('/')[-1] - dest_path = dest_dir / basename - - download_to_path(url, dest_path, sha256=sha256, size=size) - - if not extract: - return - - typ = archive_type(dest_path) - if typ: - extract_archive(dest_path, dest_dir, typ) - log('Removing %s' % dest_path) - dest_path.unlink() - - -def fetch_urls(downloads): - """Fetch URLs pairs to a pathlib.Path.""" - with concurrent.futures.ThreadPoolExecutor(CONCURRENCY) as e: - fs = [] - - for download in downloads: - fs.append(e.submit(fetch_and_extract, *download)) - - for f in fs: - f.result() - - -def command_static_url(args): - gpg_sig_url = args.gpg_sig_url - gpg_env_key = args.gpg_key_env - - if bool(gpg_sig_url) != bool(gpg_env_key): - print('--gpg-sig-url and --gpg-key-env must both be defined') - return 1 - - if gpg_sig_url: - gpg_signature = b''.join(stream_download(gpg_sig_url)) - gpg_key = os.environb[gpg_env_key.encode('ascii')] - - dest = pathlib.Path(args.dest) - dest.parent.mkdir(parents=True, exist_ok=True) - - try: - download_to_path(args.url, dest, sha256=args.sha256, size=args.size) - - if gpg_sig_url: - gpg_verify_path(dest, gpg_key, gpg_signature) - - except Exception: - try: - dest.unlink() - except FileNotFoundError: - pass - - raise - - -def api(root_url, service, version, path): - # taskcluster-lib-urls is not available when this script runs, so - # simulate its behavior: - if root_url == 'https://taskcluster.net': - return 'https://{service}.taskcluster.net/{version}/{path}'.format( - service=service, version=version, path=path) - return '{root_url}/api/{service}/{version}/{path}'.format( - root_url=root_url, service=service, version=version, path=path) - - -def command_task_artifacts(args): - fetches = json.loads(os.environ['MOZ_FETCHES']) - downloads = [] - for fetch in fetches: - extdir = pathlib.Path(args.dest) - if 'dest' in fetch: - extdir = extdir.joinpath(fetch['dest']) - extdir.mkdir(parents=True, exist_ok=True) - root_url = 
os.environ['TASKCLUSTER_ROOT_URL'] - if fetch['artifact'].startswith('public/'): - path = 'task/{task}/artifacts/{artifact}'.format( - task=fetch['task'], artifact=fetch['artifact']) - url = api(root_url, 'queue', 'v1', path) - else: - url = ('{proxy_url}/api/queue/v1/task/{task}/artifacts/{artifact}').format( - proxy_url=os.environ['TASKCLUSTER_PROXY_URL'], - task=fetch['task'], - artifact=fetch['artifact']) - downloads.append((url, extdir, fetch['extract'])) - - fetch_urls(downloads) - - -def main(): - parser = argparse.ArgumentParser() - subparsers = parser.add_subparsers(title='sub commands') - - url = subparsers.add_parser('static-url', help='Download a static URL') - url.set_defaults(func=command_static_url) - url.add_argument('--sha256', required=True, - help='SHA-256 of downloaded content') - url.add_argument('--size', required=True, type=int, - help='Size of downloaded content, in bytes') - url.add_argument('--gpg-sig-url', - help='URL containing signed GPG document validating ' - 'URL to fetch') - url.add_argument('--gpg-key-env', - help='Environment variable containing GPG key to validate') - url.add_argument('url', help='URL to fetch') - url.add_argument('dest', help='Destination path') - - artifacts = subparsers.add_parser('task-artifacts', - help='Fetch task artifacts') - artifacts.set_defaults(func=command_task_artifacts) - artifacts.add_argument('-d', '--dest', default=os.environ.get('MOZ_FETCHES_DIR'), - help='Destination directory which will contain all ' - 'artifacts (defaults to $MOZ_FETCHES_DIR)') - - args = parser.parse_args() - - if not args.dest: - parser.error('no destination directory specified, either pass in --dest ' - 'or set $MOZ_FETCHES_DIR') - - return args.func(args) - - -if __name__ == '__main__': - sys.exit(main()) diff --git a/taskcluster/run-task/hgrc b/taskcluster/run-task/hgrc deleted file mode 100755 index f6a2f66..0000000 --- a/taskcluster/run-task/hgrc +++ /dev/null @@ -1,33 +0,0 @@ -# By default the progress bar starts after 3s and updates every 0.1s. We -# change this so it shows and updates every 1.0s. -# We also tell progress to assume a TTY is present so updates are printed -# even if there is no known TTY. -[progress] -delay = 1.0 -refresh = 1.0 -assume-tty = true - -[extensions] -share = -sparse = -robustcheckout = /usr/local/mercurial/robustcheckout.py - -[hostsecurity] -# When running a modern Python, Mercurial will default to TLS 1.1+. -# When running on a legacy Python, Mercurial will default to TLS 1.0+. -# There is no good reason we shouldn't be running a modern Python -# capable of speaking TLS 1.2. And the only Mercurial servers we care -# about should be running TLS 1.2. So make TLS 1.2 the minimum. -minimumprotocol = tls1.2 - -# Settings to make 1-click loaners more useful. -[extensions] -histedit = -rebase = - -[diff] -git = 1 -showfunc = 1 - -[pager] -pager = LESS=FRSXQ less diff --git a/taskcluster/run-task/robustcheckout.py b/taskcluster/run-task/robustcheckout.py deleted file mode 100644 index 6c47c9e..0000000 --- a/taskcluster/run-task/robustcheckout.py +++ /dev/null @@ -1,813 +0,0 @@ -# This software may be used and distributed according to the terms of the -# GNU General Public License version 2 or any later version. - -"""Robustly perform a checkout. - -This extension provides the ``hg robustcheckout`` command for -ensuring a working directory is updated to the specified revision -from a source repo using best practices to ensure optimal clone -times and storage efficiency. 
-""" - -from __future__ import absolute_import - -import contextlib -import errno -import functools -import json -import os -import random -import re -import socket -import ssl -import time -import urllib2 -import urlparse - -from mercurial.i18n import _ -from mercurial.node import hex, nullid -from mercurial import ( - commands, - error, - exchange, - extensions, - cmdutil, - hg, - match as matchmod, - phases, - registrar, - scmutil, - util, -) - -# TRACKING hg43 -try: - from mercurial import configitems - configitems.dynamicdefault -except ImportError: - configitems = None - -# Causes worker to purge caches on process exit and for task to retry. -EXIT_PURGE_CACHE = 72 - -testedwith = '4.3 4.4 4.5 4.6 4.7 4.8 4.9 5.0' -minimumhgversion = '4.3' - -cmdtable = {} - -# TRACKING hg43 Mercurial 4.3 introduced registrar.command as a replacement for -# cmdutil.command. -if util.safehasattr(registrar, 'command'): - command = registrar.command(cmdtable) -else: - command = cmdutil.command(cmdtable) - -# TRACKING hg43 Mercurial 4.3 introduced the config registrar. 4.4 requires -# config items to be registered to avoid a devel warning -if util.safehasattr(registrar, 'configitem'): - configtable = {} - configitem = registrar.configitem(configtable) - - configitem('robustcheckout', 'retryjittermin', default=configitems.dynamicdefault) - configitem('robustcheckout', 'retryjittermax', default=configitems.dynamicdefault) - - -# Mercurial 4.2 introduced the vfs module and deprecated the symbol in -# scmutil. -def getvfs(): - try: - from mercurial.vfs import vfs - return vfs - except ImportError: - return scmutil.vfs - - -def getsparse(): - from mercurial import sparse - return sparse - - -def supported_hg(): - '''Returns True if the Mercurial version is supported for robustcheckout''' - return '.'.join( - str(v) for v in util.versiontuple(n=2) - ) in testedwith.split() - - -if os.name == 'nt': - import ctypes - - # Get a reference to the DeleteFileW function - # DeleteFileW accepts filenames encoded as a null terminated sequence of - # wide chars (UTF-16). Python's ctypes.c_wchar_p correctly encodes unicode - # strings to null terminated UTF-16 strings. - # However, we receive (byte) strings from mercurial. When these are passed - # to DeleteFileW via the c_wchar_p type, they are implicitly decoded via - # the 'mbcs' encoding on windows. - kernel32 = ctypes.windll.kernel32 - DeleteFile = kernel32.DeleteFileW - DeleteFile.argtypes = [ctypes.c_wchar_p] - DeleteFile.restype = ctypes.c_bool - - def unlinklong(fn): - normalized_path = '\\\\?\\' + os.path.normpath(fn) - if not DeleteFile(normalized_path): - raise OSError(errno.EPERM, "couldn't remove long path", fn) - -# Not needed on other platforms, but is handy for testing -else: - def unlinklong(fn): - os.unlink(fn) - - -def unlinkwrapper(unlinkorig, fn, ui): - '''Calls unlink_long if original unlink function fails.''' - try: - ui.debug('calling unlink_orig %s\n' % fn) - return unlinkorig(fn) - except WindowsError as e: - # Windows error 3 corresponds to ERROR_PATH_NOT_FOUND - # only handle this case; re-raise the exception for other kinds of - # failures. 
- if e.winerror != 3: - raise - ui.debug('caught WindowsError ERROR_PATH_NOT_FOUND; ' - 'calling unlink_long %s\n' % fn) - return unlinklong(fn) - - -@contextlib.contextmanager -def wrapunlink(ui): - '''Context manager that temporarily monkeypatches unlink functions.''' - purgemod = extensions.find('purge') - to_wrap = [(purgemod.util, 'unlink')] - - # Pass along the ui object to the unlink_wrapper so we can get logging out - # of it. - wrapped = functools.partial(unlinkwrapper, ui=ui) - - # Wrap the original function(s) with our unlink wrapper. - originals = {} - for mod, func in to_wrap: - ui.debug('wrapping %s %s\n' % (mod, func)) - originals[mod, func] = extensions.wrapfunction(mod, func, wrapped) - - try: - yield - finally: - # Restore the originals. - for mod, func in to_wrap: - ui.debug('restoring %s %s\n' % (mod, func)) - setattr(mod, func, originals[mod, func]) - - -def purgewrapper(orig, ui, *args, **kwargs): - '''Runs original purge() command with unlink monkeypatched.''' - with wrapunlink(ui): - return orig(ui, *args, **kwargs) - - -def peerlookup(remote, v): - # TRACKING hg46 4.6 added commandexecutor API. - if util.safehasattr(remote, 'commandexecutor'): - with remote.commandexecutor() as e: - return e.callcommand('lookup', {'key': v}).result() - else: - return remote.lookup(v) - - -@command('robustcheckout', [ - ('', 'upstream', '', 'URL of upstream repo to clone from'), - ('r', 'revision', '', 'Revision to check out'), - ('b', 'branch', '', 'Branch to check out'), - ('', 'purge', False, 'Whether to purge the working directory'), - ('', 'sharebase', '', 'Directory where shared repos should be placed'), - ('', 'networkattempts', 3, 'Maximum number of attempts for network ' - 'operations'), - ('', 'sparseprofile', '', 'Sparse checkout profile to use (path in repo)'), - ], - '[OPTION]... URL DEST', - norepo=True) -def robustcheckout(ui, url, dest, upstream=None, revision=None, branch=None, - purge=False, sharebase=None, networkattempts=None, - sparseprofile=None): - """Ensure a working copy has the specified revision checked out. - - Repository data is automatically pooled into the common directory - specified by ``--sharebase``, which is a required argument. It is required - because pooling storage prevents excessive cloning, which makes operations - complete faster. - - One of ``--revision`` or ``--branch`` must be specified. ``--revision`` - is preferred, as it is deterministic and there is no ambiguity as to which - revision will actually be checked out. - - If ``--upstream`` is used, the repo at that URL is used to perform the - initial clone instead of cloning from the repo where the desired revision - is located. - - ``--purge`` controls whether to removed untracked and ignored files from - the working directory. If used, the end state of the working directory - should only contain files explicitly under version control for the requested - revision. - - ``--sparseprofile`` can be used to specify a sparse checkout profile to use. - The sparse checkout profile corresponds to a file in the revision to be - checked out. If a previous sparse profile or config is present, it will be - replaced by this sparse profile. We choose not to "widen" the sparse config - so operations are as deterministic as possible. If an existing checkout - is present and it isn't using a sparse checkout, we error. This is to - prevent accidentally enabling sparse on a repository that may have - clients that aren't sparse aware. 
Sparse checkout support requires Mercurial - 4.3 or newer and the ``sparse`` extension must be enabled. - """ - if not revision and not branch: - raise error.Abort('must specify one of --revision or --branch') - - if revision and branch: - raise error.Abort('cannot specify both --revision and --branch') - - # Require revision to look like a SHA-1. - if revision: - if len(revision) < 12 or len(revision) > 40 or not re.match('^[a-f0-9]+$', revision): - raise error.Abort('--revision must be a SHA-1 fragment 12-40 ' - 'characters long') - - sharebase = sharebase or ui.config('share', 'pool') - if not sharebase: - raise error.Abort('share base directory not defined; refusing to operate', - hint='define share.pool config option or pass --sharebase') - - # Sparse profile support was added in Mercurial 4.3, where it was highly - # experimental. Because of the fragility of it, we only support sparse - # profiles on 4.3. When 4.4 is released, we'll need to opt in to sparse - # support. We /could/ silently fall back to non-sparse when not supported. - # However, given that sparse has performance implications, we want to fail - # fast if we can't satisfy the desired checkout request. - if sparseprofile: - if not supported_hg(): - raise error.Abort('sparse profile support only available for ' - 'Mercurial versions greater than 4.3 (using %s)' % util.version()) - - try: - extensions.find('sparse') - except KeyError: - raise error.Abort('sparse extension must be enabled to use ' - '--sparseprofile') - - ui.warn('(using Mercurial %s)\n' % util.version()) - - # worker.backgroundclose only makes things faster if running anti-virus, - # which our automation doesn't. Disable it. - ui.setconfig('worker', 'backgroundclose', False) - - # By default the progress bar starts after 3s and updates every 0.1s. We - # change this so it shows and updates every 1.0s. - # We also tell progress to assume a TTY is present so updates are printed - # even if there is no known TTY. - # We make the config change here instead of in a config file because - # otherwise we're at the whim of whatever configs are used in automation. - ui.setconfig('progress', 'delay', 1.0) - ui.setconfig('progress', 'refresh', 1.0) - ui.setconfig('progress', 'assume-tty', True) - - sharebase = os.path.realpath(sharebase) - - optimes = [] - behaviors = set() - start = time.time() - - try: - return _docheckout(ui, url, dest, upstream, revision, branch, purge, - sharebase, optimes, behaviors, networkattempts, - sparse_profile=sparseprofile) - finally: - overall = time.time() - start - - # We store the overall time multiple ways in order to help differentiate - # the various "flavors" of operations. - - # ``overall`` is always the total operation time. - optimes.append(('overall', overall)) - - def record_op(name): - # If special behaviors due to "corrupt" storage occur, we vary the - # name to convey that. - if 'remove-store' in behaviors: - name += '_rmstore' - if 'remove-wdir' in behaviors: - name += '_rmwdir' - - optimes.append((name, overall)) - - # We break out overall operations primarily by their network interaction - # We have variants within for working directory operations. 
- if 'clone' in behaviors and 'create-store' in behaviors: - record_op('overall_clone') - - if 'sparse-update' in behaviors: - record_op('overall_clone_sparsecheckout') - else: - record_op('overall_clone_fullcheckout') - - elif 'pull' in behaviors or 'clone' in behaviors: - record_op('overall_pull') - - if 'sparse-update' in behaviors: - record_op('overall_pull_sparsecheckout') - else: - record_op('overall_pull_fullcheckout') - - if 'empty-wdir' in behaviors: - record_op('overall_pull_emptywdir') - else: - record_op('overall_pull_populatedwdir') - - else: - record_op('overall_nopull') - - if 'sparse-update' in behaviors: - record_op('overall_nopull_sparsecheckout') - else: - record_op('overall_nopull_fullcheckout') - - if 'empty-wdir' in behaviors: - record_op('overall_nopull_emptywdir') - else: - record_op('overall_nopull_populatedwdir') - - server_url = urlparse.urlparse(url).netloc - - if 'TASKCLUSTER_INSTANCE_TYPE' in os.environ: - perfherder = { - 'framework': { - 'name': 'vcs', - }, - 'suites': [], - } - for op, duration in optimes: - perfherder['suites'].append({ - 'name': op, - 'value': duration, - 'lowerIsBetter': True, - 'shouldAlert': False, - 'serverUrl': server_url, - 'extraOptions': [os.environ['TASKCLUSTER_INSTANCE_TYPE']], - 'subtests': [], - }) - - ui.write('PERFHERDER_DATA: %s\n' % json.dumps(perfherder, - sort_keys=True)) - -def _docheckout(ui, url, dest, upstream, revision, branch, purge, sharebase, - optimes, behaviors, networkattemptlimit, networkattempts=None, - sparse_profile=None): - if not networkattempts: - networkattempts = [1] - - def callself(): - return _docheckout(ui, url, dest, upstream, revision, branch, purge, - sharebase, optimes, behaviors, networkattemptlimit, - networkattempts=networkattempts, - sparse_profile=sparse_profile) - - @contextlib.contextmanager - def timeit(op, behavior): - behaviors.add(behavior) - errored = False - try: - start = time.time() - yield - except Exception: - errored = True - raise - finally: - elapsed = time.time() - start - - if errored: - op += '_errored' - - optimes.append((op, elapsed)) - - ui.write('ensuring %s@%s is available at %s\n' % (url, revision or branch, - dest)) - - # We assume that we're the only process on the machine touching the - # repository paths that we were told to use. This means our recovery - # scenario when things aren't "right" is to just nuke things and start - # from scratch. This is easier to implement than verifying the state - # of the data and attempting recovery. And in some scenarios (such as - # potential repo corruption), it is probably faster, since verifying - # repos can take a while. - - destvfs = getvfs()(dest, audit=False, realpath=True) - - def deletesharedstore(path=None): - storepath = path or destvfs.read('.hg/sharedpath').strip() - if storepath.endswith('.hg'): - storepath = os.path.dirname(storepath) - - storevfs = getvfs()(storepath, audit=False) - storevfs.rmtree(forcibly=True) - - if destvfs.exists() and not destvfs.exists('.hg'): - raise error.Abort('destination exists but no .hg directory') - - # Refuse to enable sparse checkouts on existing checkouts. The reasoning - # here is that another consumer of this repo may not be sparse aware. If we - # enabled sparse, we would lock them out. - if destvfs.exists() and sparse_profile and not destvfs.exists('.hg/sparse'): - raise error.Abort('cannot enable sparse profile on existing ' - 'non-sparse checkout', - hint='use a separate working directory to use sparse') - - # And the other direction for symmetry. 
- if not sparse_profile and destvfs.exists('.hg/sparse'): - raise error.Abort('cannot use non-sparse checkout on existing sparse ' - 'checkout', - hint='use a separate working directory to use sparse') - - # Require checkouts to be tied to shared storage because efficiency. - if destvfs.exists('.hg') and not destvfs.exists('.hg/sharedpath'): - ui.warn('(destination is not shared; deleting)\n') - with timeit('remove_unshared_dest', 'remove-wdir'): - destvfs.rmtree(forcibly=True) - - # Verify the shared path exists and is using modern pooled storage. - if destvfs.exists('.hg/sharedpath'): - storepath = destvfs.read('.hg/sharedpath').strip() - - ui.write('(existing repository shared store: %s)\n' % storepath) - - if not os.path.exists(storepath): - ui.warn('(shared store does not exist; deleting destination)\n') - with timeit('removed_missing_shared_store', 'remove-wdir'): - destvfs.rmtree(forcibly=True) - elif not re.search('[a-f0-9]{40}/\.hg$', storepath.replace('\\', '/')): - ui.warn('(shared store does not belong to pooled storage; ' - 'deleting destination to improve efficiency)\n') - with timeit('remove_unpooled_store', 'remove-wdir'): - destvfs.rmtree(forcibly=True) - - if destvfs.isfileorlink('.hg/wlock'): - ui.warn('(dest has an active working directory lock; assuming it is ' - 'left over from a previous process and that the destination ' - 'is corrupt; deleting it just to be sure)\n') - with timeit('remove_locked_wdir', 'remove-wdir'): - destvfs.rmtree(forcibly=True) - - def handlerepoerror(e): - if e.message == _('abandoned transaction found'): - ui.warn('(abandoned transaction found; trying to recover)\n') - repo = hg.repository(ui, dest) - if not repo.recover(): - ui.warn('(could not recover repo state; ' - 'deleting shared store)\n') - with timeit('remove_unrecovered_shared_store', 'remove-store'): - deletesharedstore() - - ui.warn('(attempting checkout from beginning)\n') - return callself() - - raise - - # At this point we either have an existing working directory using - # shared, pooled storage or we have nothing. - - def handlenetworkfailure(): - if networkattempts[0] >= networkattemptlimit: - raise error.Abort('reached maximum number of network attempts; ' - 'giving up\n') - - ui.warn('(retrying after network failure on attempt %d of %d)\n' % - (networkattempts[0], networkattemptlimit)) - - # Do a backoff on retries to mitigate the thundering herd - # problem. This is an exponential backoff with a multipler - # plus random jitter thrown in for good measure. - # With the default settings, backoffs will be: - # 1) 2.5 - 6.5 - # 2) 5.5 - 9.5 - # 3) 11.5 - 15.5 - backoff = (2 ** networkattempts[0] - 1) * 1.5 - jittermin = ui.configint('robustcheckout', 'retryjittermin', 1000) - jittermax = ui.configint('robustcheckout', 'retryjittermax', 5000) - backoff += float(random.randint(jittermin, jittermax)) / 1000.0 - ui.warn('(waiting %.2fs before retry)\n' % backoff) - time.sleep(backoff) - - networkattempts[0] += 1 - - def handlepullerror(e): - """Handle an exception raised during a pull. - - Returns True if caller should call ``callself()`` to retry. - """ - if isinstance(e, error.Abort): - if e.args[0] == _('repository is unrelated'): - ui.warn('(repository is unrelated; deleting)\n') - destvfs.rmtree(forcibly=True) - return True - elif e.args[0].startswith(_('stream ended unexpectedly')): - ui.warn('%s\n' % e.args[0]) - # Will raise if failure limit reached. 
- handlenetworkfailure() - return True - elif isinstance(e, ssl.SSLError): - # Assume all SSL errors are due to the network, as Mercurial - # should convert non-transport errors like cert validation failures - # to error.Abort. - ui.warn('ssl error: %s\n' % e) - handlenetworkfailure() - return True - elif isinstance(e, urllib2.URLError): - if isinstance(e.reason, socket.error): - ui.warn('socket error: %s\n' % e.reason) - handlenetworkfailure() - return True - else: - ui.warn('unhandled URLError; reason type: %s; value: %s' % ( - e.reason.__class__.__name__, e.reason)) - else: - ui.warn('unhandled exception during network operation; type: %s; ' - 'value: %s' % (e.__class__.__name__, e)) - - return False - - # Perform sanity checking of store. We may or may not know the path to the - # local store. It depends if we have an existing destvfs pointing to a - # share. To ensure we always find a local store, perform the same logic - # that Mercurial's pooled storage does to resolve the local store path. - cloneurl = upstream or url - - try: - clonepeer = hg.peer(ui, {}, cloneurl) - rootnode = peerlookup(clonepeer, '0') - except error.RepoLookupError: - raise error.Abort('unable to resolve root revision from clone ' - 'source') - except (error.Abort, ssl.SSLError, urllib2.URLError) as e: - if handlepullerror(e): - return callself() - raise - - if rootnode == nullid: - raise error.Abort('source repo appears to be empty') - - storepath = os.path.join(sharebase, hex(rootnode)) - storevfs = getvfs()(storepath, audit=False) - - if storevfs.isfileorlink('.hg/store/lock'): - ui.warn('(shared store has an active lock; assuming it is left ' - 'over from a previous process and that the store is ' - 'corrupt; deleting store and destination just to be ' - 'sure)\n') - if destvfs.exists(): - with timeit('remove_dest_active_lock', 'remove-wdir'): - destvfs.rmtree(forcibly=True) - - with timeit('remove_shared_store_active_lock', 'remove-store'): - storevfs.rmtree(forcibly=True) - - if storevfs.exists() and not storevfs.exists('.hg/requires'): - ui.warn('(shared store missing requires file; this is a really ' - 'odd failure; deleting store and destination)\n') - if destvfs.exists(): - with timeit('remove_dest_no_requires', 'remove-wdir'): - destvfs.rmtree(forcibly=True) - - with timeit('remove_shared_store_no_requires', 'remove-store'): - storevfs.rmtree(forcibly=True) - - if storevfs.exists('.hg/requires'): - requires = set(storevfs.read('.hg/requires').splitlines()) - # FUTURE when we require generaldelta, this is where we can check - # for that. - required = {'dotencode', 'fncache'} - - missing = required - requires - if missing: - ui.warn('(shared store missing requirements: %s; deleting ' - 'store and destination to ensure optimal behavior)\n' % - ', '.join(sorted(missing))) - if destvfs.exists(): - with timeit('remove_dest_missing_requires', 'remove-wdir'): - destvfs.rmtree(forcibly=True) - - with timeit('remove_shared_store_missing_requires', 'remove-store'): - storevfs.rmtree(forcibly=True) - - created = False - - if not destvfs.exists(): - # Ensure parent directories of destination exist. - # Mercurial 3.8 removed ensuredirs and made makedirs race safe. 
- if util.safehasattr(util, 'ensuredirs'): - makedirs = util.ensuredirs - else: - makedirs = util.makedirs - - makedirs(os.path.dirname(destvfs.base), notindexed=True) - makedirs(sharebase, notindexed=True) - - if upstream: - ui.write('(cloning from upstream repo %s)\n' % upstream) - - if not storevfs.exists(): - behaviors.add('create-store') - - try: - with timeit('clone', 'clone'): - shareopts = {'pool': sharebase, 'mode': 'identity'} - res = hg.clone(ui, {}, clonepeer, dest=dest, update=False, - shareopts=shareopts) - except (error.Abort, ssl.SSLError, urllib2.URLError) as e: - if handlepullerror(e): - return callself() - raise - except error.RepoError as e: - return handlerepoerror(e) - except error.RevlogError as e: - ui.warn('(repo corruption: %s; deleting shared store)\n' % e.message) - with timeit('remove_shared_store_revlogerror', 'remote-store'): - deletesharedstore() - return callself() - - # TODO retry here. - if res is None: - raise error.Abort('clone failed') - - # Verify it is using shared pool storage. - if not destvfs.exists('.hg/sharedpath'): - raise error.Abort('clone did not create a shared repo') - - created = True - - # The destination .hg directory should exist. Now make sure we have the - # wanted revision. - - repo = hg.repository(ui, dest) - - # We only pull if we are using symbolic names or the requested revision - # doesn't exist. - havewantedrev = False - - if revision: - try: - ctx = scmutil.revsingle(repo, revision) - except error.RepoLookupError: - ctx = None - - if ctx: - if not ctx.hex().startswith(revision): - raise error.Abort('--revision argument is ambiguous', - hint='must be the first 12+ characters of a ' - 'SHA-1 fragment') - - checkoutrevision = ctx.hex() - havewantedrev = True - - if not havewantedrev: - ui.write('(pulling to obtain %s)\n' % (revision or branch,)) - - remote = None - try: - remote = hg.peer(repo, {}, url) - pullrevs = [peerlookup(remote, revision or branch)] - checkoutrevision = hex(pullrevs[0]) - if branch: - ui.warn('(remote resolved %s to %s; ' - 'result is not deterministic)\n' % - (branch, checkoutrevision)) - - if checkoutrevision in repo: - ui.warn('(revision already present locally; not pulling)\n') - else: - with timeit('pull', 'pull'): - pullop = exchange.pull(repo, remote, heads=pullrevs) - if not pullop.rheads: - raise error.Abort('unable to pull requested revision') - except (error.Abort, ssl.SSLError, urllib2.URLError) as e: - if handlepullerror(e): - return callself() - raise - except error.RepoError as e: - return handlerepoerror(e) - except error.RevlogError as e: - ui.warn('(repo corruption: %s; deleting shared store)\n' % e.message) - deletesharedstore() - return callself() - finally: - if remote: - remote.close() - - # Now we should have the wanted revision in the store. Perform - # working directory manipulation. - - # Purge if requested. We purge before update because this way we're - # guaranteed to not have conflicts on `hg update`. - if purge and not created: - ui.write('(purging working directory)\n') - purgeext = extensions.find('purge') - - # Mercurial 4.3 doesn't purge files outside the sparse checkout. - # See https://bz.mercurial-scm.org/show_bug.cgi?id=5626. Force - # purging by monkeypatching the sparse matcher. 
- try: - old_sparse_fn = getattr(repo.dirstate, '_sparsematchfn', None) - if old_sparse_fn is not None: - assert supported_hg(), 'Mercurial version not supported (must be 4.3+)' - # TRACKING hg50 - # Arguments passed to `matchmod.always` were unused and have been removed - if util.versiontuple(n=2) >= (5, 0): - repo.dirstate._sparsematchfn = lambda: matchmod.always() - else: - repo.dirstate._sparsematchfn = lambda: matchmod.always(repo.root, '') - - with timeit('purge', 'purge'): - if purgeext.purge(ui, repo, all=True, abort_on_err=True, - # The function expects all arguments to be - # defined. - **{'print': None, - 'print0': None, - 'dirs': None, - 'files': None}): - raise error.Abort('error purging') - finally: - if old_sparse_fn is not None: - repo.dirstate._sparsematchfn = old_sparse_fn - - # Update the working directory. - - if repo[b'.'].node() == nullid: - behaviors.add('empty-wdir') - else: - behaviors.add('populated-wdir') - - if sparse_profile: - sparsemod = getsparse() - - # By default, Mercurial will ignore unknown sparse profiles. This could - # lead to a full checkout. Be more strict. - try: - repo.filectx(sparse_profile, changeid=checkoutrevision).data() - except error.ManifestLookupError: - raise error.Abort('sparse profile %s does not exist at revision ' - '%s' % (sparse_profile, checkoutrevision)) - - # TRACKING hg48 - parseconfig takes `action` param - if util.versiontuple(n=2) >= (4, 8): - old_config = sparsemod.parseconfig(repo.ui, repo.vfs.tryread('sparse'), 'sparse') - else: - old_config = sparsemod.parseconfig(repo.ui, repo.vfs.tryread('sparse')) - - old_includes, old_excludes, old_profiles = old_config - - if old_profiles == {sparse_profile} and not old_includes and not \ - old_excludes: - ui.write('(sparse profile %s already set; no need to update ' - 'sparse config)\n' % sparse_profile) - else: - if old_includes or old_excludes or old_profiles: - ui.write('(replacing existing sparse config with profile ' - '%s)\n' % sparse_profile) - else: - ui.write('(setting sparse config to profile %s)\n' % - sparse_profile) - - # If doing an incremental update, this will perform two updates: - # one to change the sparse profile and another to update to the new - # revision. This is not desired. But there's not a good API in - # Mercurial to do this as one operation. - with repo.wlock(), timeit('sparse_update_config', - 'sparse-update-config'): - fcounts = map(len, sparsemod._updateconfigandrefreshwdir( - repo, [], [], [sparse_profile], force=True)) - - repo.ui.status('%d files added, %d files dropped, ' - '%d files conflicting\n' % tuple(fcounts)) - - ui.write('(sparse refresh complete)\n') - - op = 'update_sparse' if sparse_profile else 'update' - behavior = 'update-sparse' if sparse_profile else 'update' - - with timeit(op, behavior): - if commands.update(ui, repo, rev=checkoutrevision, clean=True): - raise error.Abort('error updating') - - ui.write('updated to %s\n' % checkoutrevision) - - return None - - -def extsetup(ui): - # Ensure required extensions are loaded. 
- for ext in ('purge', 'share'): - try: - extensions.find(ext) - except KeyError: - extensions.load(ui, ext, None) - - purgemod = extensions.find('purge') - extensions.wrapcommand(purgemod.cmdtable, 'purge', purgewrapper) diff --git a/taskcluster/run-task/run-task b/taskcluster/run-task/run-task deleted file mode 100755 index d60bd71..0000000 --- a/taskcluster/run-task/run-task +++ /dev/null @@ -1,1001 +0,0 @@ -#!/usr/bin/python3 -u -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. If a copy of the MPL was not distributed with this -# file, You can obtain one at http://mozilla.org/MPL/2.0/. - -"""Run a task after performing common actions. - -This script is meant to be the "driver" for TaskCluster based tasks. -It receives some common arguments to control the run-time environment. - -It performs actions as requested from the arguments. Then it executes -the requested process and prints its output, prefixing it with the -current time to improve log usefulness. -""" - -import sys -from typing import Optional - -if sys.version_info[0:2] < (3, 5): - print('run-task requires Python 3.5+') - sys.exit(1) - - -import argparse -import datetime -import errno -import io -import json -import os -from pathlib import Path -import re -import shutil -import socket -import stat -import subprocess - -import urllib.error -import urllib.request - - -SECRET_BASEURL_TPL = 'http://taskcluster/secrets/v1/secret/{}' -FINGERPRINT_URL = SECRET_BASEURL_TPL.format('project/taskcluster/gecko/hgfingerprint') -FALLBACK_FINGERPRINT = { - 'fingerprints': - "sha256:17:38:aa:92:0b:84:3e:aa:8e:52:52:e9:4c:2f:98:a9:0e:bf:6c:3e:e9" - ":15:ff:0a:29:80:f7:06:02:5b:e8:48"} - -GITHUB_SSH_FINGERPRINT = ( - b"github.com ssh-rsa " - b"AAAAB3NzaC1yc2EAAAABIwAAAQEAq2A7hRGmdnm9tUDbO9IDSwBK6TbQa+PXYPCPy6rbTrTtw7PHkcc" - b"Krpp0yVhp5HdEIcKr6pLlVDBfOLX9QUsyCOV0wzfjIJNlGEYsdlLJizHhbn2mUjvSAHQqZETYP81eFz" - b"LQNnPHt4EVVUh7VfDESU84KezmD5QlWpXLmvU31/yMf+Se8xhHTvKSCZIFImWwoG6mbUoWf9nzpIoaS" - b"jB+weqqUUmpaaasXVal72J+UX2B+2RPW3RcT0eOzQgqlJL3RKrTJvdsjE3JEAvGq3lGHSZXy28G3sku" - b"a2SmVi/w4yCE6gbODqnTWlg7+wC604ydGXA8VJiS5ap43JXiUFFAaQ==\n" -) - - -CACHE_UID_GID_MISMATCH = ''' -There is a UID/GID mismatch on the cache. This likely means: - -a) different tasks are running as a different user/group -b) different Docker images have different UID/GID for the same user/group - -Our cache policy is that the UID/GID for ALL tasks must be consistent -for the lifetime of the cache. This eliminates permissions problems due -to file/directory user/group ownership. - -To make this error go away, ensure that all Docker images are use -a consistent UID/GID and that all tasks using this cache are running as -the same user/group. -''' - - -NON_EMPTY_VOLUME = ''' -error: volume %s is not empty - -Our Docker image policy requires volumes to be empty. - -The volume was likely populated as part of building the Docker image. -Change the Dockerfile and anything run from it to not create files in -any VOLUME. - -A lesser possibility is that you stumbled upon a TaskCluster platform bug -where it fails to use new volumes for tasks. -''' - - -FETCH_CONTENT_NOT_FOUND = ''' -error: fetch-content script not found - -The script at `taskcluster/scripts/misc/fetch-content` could not be -detected in the current environment. -''' - -# The exit code to use when caches should be purged and the task retried. 
-# This is EX_OSFILE (from sysexits.h): -# Some system file does not exist, cannot be opened, or has some -# sort of error (e.g., syntax error). -EXIT_PURGE_CACHE = 72 - - -IS_MACOSX = sys.platform == 'darwin' -IS_POSIX = os.name == 'posix' -IS_WINDOWS = os.name == 'nt' - - -def print_line(prefix, m): - now = datetime.datetime.utcnow().isoformat().encode('utf-8') - # slice microseconds to 3 decimals. - now = now[:-3] if now[-7:-6] == b'.' else now - sys.stdout.buffer.write(b'[%s %sZ] %s' % (prefix, now, m)) - sys.stdout.buffer.flush() - - -def run_required_command(prefix, args, *, extra_env=None, cwd=None): - res = run_command(prefix, args, extra_env=extra_env, cwd=cwd) - if res: - sys.exit(res) - - -def run_command(prefix, args, *, extra_env=None, cwd=None): - """Runs a process and prefixes its output with the time. - - Returns the process exit code. - """ - print_line(prefix, b'executing %r\n' % args) - - env = dict(os.environ) - env.update(extra_env or {}) - - # Note: TaskCluster's stdin is a TTY. This attribute is lost - # when we pass sys.stdin to the invoked process. If we cared - # to preserve stdin as a TTY, we could make this work. But until - # someone needs it, don't bother. - - # We want stdout to be bytes on Python 3. That means we can't use - # universal_newlines=True (because it implies text mode). But - # p.stdout.readline() won't work for bytes text streams. So, on Python 3, - # we manually install a latin1 stream wrapper. This allows us to readline() - # and preserves bytes, without losing any data. - - p = subprocess.Popen(args, - # Disable buffering because we want to receive output - # as it is generated so timestamps in logs are - # accurate. - bufsize=0, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - stdin=sys.stdin.fileno(), - cwd=cwd, - env=env) - - stdout = io.TextIOWrapper(p.stdout, encoding='latin1') - - while True: - data = stdout.readline().encode('latin1') - - if data == b'': - break - - print_line(prefix, data) - - return p.wait() - - -def get_posix_user_group(user, group): - import grp - import pwd - - try: - user_record = pwd.getpwnam(user) - except KeyError: - print('could not find user %s; specify a valid user with --user' % user) - sys.exit(1) - - try: - group_record = grp.getgrnam(group) - except KeyError: - print('could not find group %s; specify a valid group with --group' % - group) - sys.exit(1) - - # Most tasks use worker:worker. We require they have a specific numeric ID - # because otherwise it is too easy for files written to caches to have - # mismatched numeric IDs, which results in permissions errors. - if user_record.pw_name == 'worker' and user_record.pw_uid != 1000: - print('user `worker` must have uid=1000; got %d' % user_record.pw_uid) - sys.exit(1) - - if group_record.gr_name == 'worker' and group_record.gr_gid != 1000: - print('group `worker` must have gid=1000; got %d' % group_record.gr_gid) - sys.exit(1) - - # Find all groups to which this user is a member. 
- gids = [g.gr_gid for g in grp.getgrall() if group in g.gr_mem] - - return user_record, group_record, gids - - -def write_audit_entry(path, msg): - now = datetime.datetime.utcnow().isoformat().encode('utf-8') - with open(path, 'ab') as fh: - fh.write(b'[%sZ %s] %s\n' % ( - now, os.environb.get(b'TASK_ID', b'UNKNOWN'), msg)) - - -WANTED_DIR_MODE = stat.S_IXUSR | stat.S_IRUSR | stat.S_IWUSR - - -def set_dir_permissions(path, uid, gid): - st = os.lstat(path) - - if st.st_uid != uid or st.st_gid != gid: - os.chown(path, uid, gid) - - # Also make sure dirs are writable in case we need to delete - # them. - if st.st_mode & WANTED_DIR_MODE != WANTED_DIR_MODE: - os.chmod(path, st.st_mode | WANTED_DIR_MODE) - - -def chown_recursive(path, user, group, uid, gid): - print_line(b'chown', - b'recursively changing ownership of %s to %s:%s\n' % - (path.encode('utf-8'), user.encode('utf-8'), group.encode( - 'utf-8'))) - - set_dir_permissions(path, uid, gid) - - for root, dirs, files in os.walk(path): - for d in dirs: - set_dir_permissions(os.path.join(root, d), uid, gid) - - for f in files: - # File may be a symlink that points to nowhere. In which case - # os.chown() would fail because it attempts to follow the - # symlink. We only care about directory entries, not what - # they point to. So setting the owner of the symlink should - # be sufficient. - os.lchown(os.path.join(root, f), uid, gid) - - -def configure_cache_posix(cache, user, group, - untrusted_caches, running_as_root): - """Configure a cache path on POSIX platforms. - - For each cache, we write out a special file denoting attributes and - capabilities of run-task and the task being executed. These attributes - are used by subsequent run-task invocations to validate that use of - the cache is acceptable. - - We /could/ blow away the cache data on requirements mismatch. - While this would be convenient, this could result in "competing" tasks - effectively undoing the other's work. This would slow down task - execution in aggregate. Without monitoring for this, people may not notice - the problem and tasks would be slower than they could be. We follow the - principle of "fail fast" to ensure optimal task execution. - - We also write an audit log of who used the caches. This log is printed - during failures to help aid debugging. - """ - - our_requirements = { - # Include a version string that we can bump whenever to trigger - # fresh caches. The actual value is not relevant and doesn't need - # to follow any explicit order. Since taskgraph bakes this file's - # hash into cache names, any change to this file/version is sufficient - # to force the use of a new cache. - b'version=1', - # Include the UID and GID the task will run as to ensure that tasks - # with different UID and GID don't share the same cache. - b'uid=%d' % user.pw_uid, - b'gid=%d' % group.gr_gid, - } - - requires_path = os.path.join(cache, '.cacherequires') - audit_path = os.path.join(cache, '.cachelog') - - # The cache is empty. Configure it. - if not os.listdir(cache): - print_line(b'cache', b'cache %s is empty; writing requirements: ' - b'%s\n' % ( - cache.encode('utf-8'), b' '.join(sorted(our_requirements)))) - - # We write a requirements file so future invocations know what the - # requirements are. - with open(requires_path, 'wb') as fh: - fh.write(b'\n'.join(sorted(our_requirements))) - - # And make it read-only as a precaution against deletion. 
- os.chmod(requires_path, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) - - write_audit_entry(audit_path, - b'created; requirements: %s' % - b', '.join(sorted(our_requirements))) - - set_dir_permissions(cache, user.pw_uid, group.gr_gid) - return - - # The cache has content and we have a requirements file. Validate - # requirements alignment. - if os.path.exists(requires_path): - with open(requires_path, 'rb') as fh: - wanted_requirements = set(fh.read().splitlines()) - - print_line(b'cache', b'cache %s exists; requirements: %s\n' % ( - cache.encode('utf-8'), b' '.join(sorted(wanted_requirements)))) - - missing = wanted_requirements - our_requirements - - # Allow requirements mismatch for uid/gid if and only if caches - # are untrusted. This allows cache behavior on Try to be - # reasonable. Otherwise, random tasks could "poison" cache - # usability by introducing uid/gid mismatches. For untrusted - # environments like Try, this is a perfectly reasonable thing to - # allow. - if missing and untrusted_caches and running_as_root and \ - all(s.startswith((b'uid=', b'gid=')) for s in missing): - print_line(b'cache', - b'cache %s uid/gid mismatch; this is acceptable ' - b'because caches for this task are untrusted; ' - b'changing ownership to facilitate cache use\n' % - cache.encode('utf-8')) - chown_recursive(cache, user.pw_name, group.gr_name, user.pw_uid, - group.gr_gid) - - # And write out the updated reality. - with open(requires_path, 'wb') as fh: - fh.write(b'\n'.join(sorted(our_requirements))) - - write_audit_entry(audit_path, - b'chown; requirements: %s' % - b', '.join(sorted(our_requirements))) - - elif missing: - print('error: requirements for populated cache %s differ from ' - 'this task' % cache) - print('cache requirements: %s' % ' '.join(sorted( - s.decode('utf-8') for s in wanted_requirements))) - print('our requirements: %s' % ' '.join(sorted( - s.decode('utf-8') for s in our_requirements))) - if any(s.startswith((b'uid=', b'gid=')) for s in missing): - print(CACHE_UID_GID_MISMATCH) - - write_audit_entry(audit_path, - b'requirements mismatch; wanted: %s' % - b', '.join(sorted(our_requirements))) - - print('') - print('audit log:') - with open(audit_path, 'r') as fh: - print(fh.read()) - - return True - else: - write_audit_entry(audit_path, b'used') - - # We don't need to adjust permissions here because the cache is - # associated with a uid/gid and the first task should have set - # a proper owner/group. - - return - - # The cache has content and no requirements file. This shouldn't - # happen because run-task should be the first thing that touches a - # cache. - print('error: cache %s is not empty and is missing a ' - '.cacherequires file; the cache names for this task are ' - 'likely mis-configured or TASKCLUSTER_CACHES is not set ' - 'properly' % cache) - - write_audit_entry(audit_path, b'missing .cacherequires') - return True - - -def configure_volume_posix(volume, user, group, running_as_root): - # The only time we should see files in the volume is if the Docker - # image build put files there. - # - # For the sake of simplicity, our policy is that volumes should be - # empty. This also has the advantage that an empty volume looks - # a lot like an empty cache. Tasks can rely on caches being - # swapped in and out on any volume without any noticeable change - # of behavior. 
- volume_files = os.listdir(volume) - if volume_files: - print(NON_EMPTY_VOLUME % volume) - print('entries in root directory: %s' % - ' '.join(sorted(volume_files))) - sys.exit(1) - - # The volume is almost certainly owned by root:root. Chown it so it - # is writable. - - if running_as_root: - print_line(b'volume', b'changing ownership of volume %s ' - b'to %d:%d\n' % (volume.encode('utf-8'), - user.pw_uid, - group.gr_gid)) - set_dir_permissions(volume, user.pw_uid, group.gr_gid) - - -def git_checkout( - destination_path: str, - head_repo: str, - base_repo: Optional[str], - ref: Optional[str], - commit: Optional[str], - ssh_key_file: Optional[Path], - ssh_known_hosts_file: Optional[Path] -): - env = {'PYTHONUNBUFFERED': '1'} - - if ssh_key_file and ssh_known_hosts_file: - if not ssh_key_file.exists(): - raise RuntimeError("Can't find specified ssh_key file.") - if not ssh_known_hosts_file.exists(): - raise RuntimeError("Can't find specified known_hosts file.") - env['GIT_SSH_COMMAND'] = " ".join([ - "ssh", - "-oIdentityFile={}".format(ssh_key_file.as_posix()), - "-oStrictHostKeyChecking=yes", - "-oUserKnownHostsFile={}".format(ssh_known_hosts_file.as_posix()), - ]) - elif ssh_key_file or ssh_known_hosts_file: - raise RuntimeError( - 'Must specify both ssh_key_file and ssh_known_hosts_file, if either are specified', - ) - - if not os.path.exists(destination_path): - # Repository doesn't already exist, needs to be cloned - args = [ - 'git', - 'clone', - base_repo if base_repo else head_repo, - destination_path, - ] - - run_required_command(b'vcs', args, extra_env=env) - - # If a ref isn't provided, we fetch all refs from head_repo, which may be slow - args = [ - 'git', - 'fetch', - head_repo, - ref if ref else '+refs/heads/*:refs/remotes/work/*' - ] - - run_required_command(b'vcs', args, cwd=destination_path, extra_env=env) - - args = [ - 'git', - 'checkout', - '-f', - commit if commit else ref - ] - - run_required_command(b'vcs', args, cwd=destination_path) - - args = [ - 'git', - 'clean', - # Two -f`s causes subdirectories with `.git` directories to be removed - # as well. 
- '-xdff', - ] - - run_required_command(b'vcs', args, cwd=destination_path) - - args = [ - 'git', - 'rev-parse', - '--verify', - 'HEAD' - ] - - commit_hash = subprocess.check_output( - args, cwd=destination_path, universal_newlines=True - ).strip() - assert re.match('^[a-f0-9]{40}$', commit_hash) - - if head_repo.startswith('https://github.com'): - if head_repo.endswith('/'): - head_repo = head_repo[:-1] - - tinderbox_link = '{}/commit/{}'.format(head_repo, commit_hash) - repo_name = head_repo.split('/')[-1] - else: - tinderbox_link = head_repo - repo_name = head_repo - - msg = ("TinderboxPrint:" - "{commit_hash}\n".format(commit_hash=commit_hash, - link=tinderbox_link, - name=repo_name)) - - print_line(b'vcs', msg.encode('utf-8')) - - return commit_hash - - -def fetch_hgfingerprint(): - try: - print_line(b'vcs', b'fetching hg.mozilla.org fingerprint from %s\n' % - FINGERPRINT_URL.encode('utf-8')) - res = urllib.request.urlopen(FINGERPRINT_URL, timeout=10) - secret = res.read() - try: - secret = json.loads(secret.decode('utf-8')) - except ValueError: - print_line(b'vcs', b'invalid JSON in hg fingerprint secret') - sys.exit(1) - except (urllib.error.URLError, socket.timeout): - print_line(b'vcs', b'Unable to retrieve current hg.mozilla.org fingerprint' - b'using the secret service, using fallback instead.') - # XXX This fingerprint will not be accurate if running on an old - # revision after the server fingerprint has changed. - secret = {'secret': FALLBACK_FINGERPRINT} - - return secret['secret']['fingerprints'] - - -def fetch_ssh_secret(secret_name): - """ Retrieves the private ssh key, and returns it as a StringIO object - """ - secret_url = SECRET_BASEURL_TPL.format(secret_name) - try: - print_line(b'vcs', b'fetching secret %s from %s\n' % - (secret_name.encode('utf-8'), secret_url.encode('utf-8'))) - res = urllib.request.urlopen(secret_url, timeout=10) - secret = res.read() - try: - secret = json.loads(secret.decode('utf-8')) - except ValueError: - print_line(b'vcs', b'invalid JSON in secret') - sys.exit(1) - except (urllib.error.URLError, socket.timeout): - print_line(b'vcs', b'Unable to retrieve ssh secret. aborting...') - sys.exit(1) - - return secret['secret']['ssh_privkey'] - - -def hg_checkout( - destination_path: str, - head_repo: str, - base_repo: Optional[str], - store_path: str, - hgmo_fingerprint: str, - sparse_profile: Optional[str], - branch: Optional[str], - revision: Optional[str] -): - if IS_MACOSX: - hg_bin = '/tools/python27-mercurial/bin/hg' - elif IS_POSIX: - hg_bin = 'hg' - elif IS_WINDOWS: - # This is where OCC installs it in the AMIs. - hg_bin = r'C:\Program Files\Mercurial\hg.exe' - if not os.path.exists(hg_bin): - print('could not find Mercurial executable: %s' % hg_bin) - sys.exit(1) - else: - raise RuntimeError('Must be running on mac, posix or windows') - - args = [ - hg_bin, - 'robustcheckout', - '--sharebase', store_path, - '--purge', - ] - - # Obtain certificate fingerprints. 
Without this, the checkout will use the fingerprint - # on the system, which is managed some other way (such as puppet) - if hgmo_fingerprint: - try: - print_line(b'vcs', b'fetching hg.mozilla.org fingerprint from %s\n' % - FINGERPRINT_URL.encode('utf-8')) - res = urllib.request.urlopen(FINGERPRINT_URL, timeout=10) - secret = res.read() - try: - secret = json.loads(secret.decode('utf-8')) - except ValueError: - print_line(b'vcs', b'invalid JSON in hg fingerprint secret') - sys.exit(1) - except (urllib.error.URLError, socket.timeout): - print_line(b'vcs', b'Unable to retrieve current hg.mozilla.org fingerprint' - b'using the secret service, using fallback instead.') - # XXX This fingerprint will not be accurate if running on an old - # revision after the server fingerprint has changed. - secret = {'secret': FALLBACK_FINGERPRINT} - - hgmo_fingerprint = secret['secret']['fingerprints'] - args.extend([ - '--config', 'hostsecurity.hg.mozilla.org:fingerprints=%s' % hgmo_fingerprint, - ]) - - if base_repo: - args.extend(['--upstream', base_repo]) - if sparse_profile: - args.extend(['--sparseprofile', sparse_profile]) - - # Specify method to checkout a revision. This defaults to revisions as - # SHA-1 strings, but also supports symbolic revisions like `tip` via the - # branch flag. - args.extend([ - '--branch' if branch else '--revision', - branch or revision, - head_repo, destination_path, - ]) - - run_required_command(b'vcs', args, extra_env={'PYTHONUNBUFFERED': '1'}) - - # Update the current revision hash and ensure that it is well formed. - revision = subprocess.check_output( - [hg_bin, 'log', - '--rev', '.', - '--template', '{node}'], - cwd=destination_path, - # Triggers text mode on Python 3. - universal_newlines=True) - - assert re.match('^[a-f0-9]{40}$', revision) - - msg = ("TinderboxPrint:" - "{revision}\n".format(revision=revision, - head_repo=head_repo, - repo_name=head_repo.split('/')[-1])) - - print_line(b'vcs', msg.encode('utf-8')) - - return revision - - -def fetch_artifacts(): - print_line(b'fetches', b'fetching artifacts\n') - - fetch_content = shutil.which('fetch-content') - - if not fetch_content or not os.path.isfile(fetch_content): - print(FETCH_CONTENT_NOT_FOUND) - sys.exit(1) - - cmd = [fetch_content, 'task-artifacts'] - subprocess.run(cmd, check=True, env=os.environ) - print_line(b'fetches', b'finished fetching artifacts\n') - - -def add_vcs_arguments(parser, project, name): - """Adds arguments to ArgumentParser to control VCS options for a project.""" - - parser.add_argument('--%s-checkout' % project, - help='Directory where %s checkout should be created' % - name) - parser.add_argument('--%s-sparse-profile' % project, - help='Path to sparse profile for %s checkout' % name) - - -def collect_vcs_options(args, project, name): - checkout = getattr(args, '%s_checkout' % project) - sparse_profile = getattr(args, '%s_sparse_profile' % project) - - env_prefix = project.upper() - - repo_type = os.environ.get('%s_REPOSITORY_TYPE' % env_prefix) - base_repo = os.environ.get('%s_BASE_REPOSITORY' % env_prefix) - head_repo = os.environ.get('%s_HEAD_REPOSITORY' % env_prefix) - revision = os.environ.get('%s_HEAD_REV' % env_prefix) - ref = os.environ.get('%s_HEAD_REF' % env_prefix) - private_key_secret = os.environ.get('%s_SSH_SECRET_NAME' % env_prefix) - - store_path = os.environ.get('HG_STORE_PATH') - - # Expand ~ in some paths. 
- if checkout: - checkout = os.path.abspath(os.path.expanduser(checkout)) - if store_path: - store_path = os.path.abspath(os.path.expanduser(store_path)) - - # Some callers set the base repository to mozilla-central for historical - # reasons. Switch to mozilla-unified because robustcheckout works best - # with it. - if base_repo == 'https://hg.mozilla.org/mozilla-central': - base_repo = 'https://hg.mozilla.org/mozilla-unified' - - return { - 'store-path': store_path, - 'project': project, - 'name': name, - 'env-prefix': env_prefix, - 'checkout': checkout, - 'sparse-profile': sparse_profile, - 'base-repo': base_repo, - 'head-repo': head_repo, - 'revision': revision, - 'ref': ref, - 'repo-type': repo_type, - 'ssh-secret-name': private_key_secret, - } - - -def vcs_checkout_from_args(options, *, hgmo_fingerprint): - - if not options['checkout']: - if options['ref'] and not options['revision']: - print('task should be defined in terms of non-symbolic revision') - sys.exit(1) - return - - revision = options['revision'] - ref = options['ref'] - ssh_key_file = None - ssh_known_hosts_file = None - ssh_dir = None - - try: - if options.get('ssh-secret-name'): - ssh_dir = Path("~/.ssh-run-task").expanduser() - os.makedirs(ssh_dir, 0o700) - ssh_key_file = ssh_dir.joinpath('private_ssh_key') - ssh_key = fetch_ssh_secret(options['ssh-secret-name']) - # We don't use write_text here, to avoid \n -> \r\n on windows - ssh_key_file.write_bytes(ssh_key.encode("ascii")) - ssh_key_file.chmod(0o600) - # TODO: We should pull this from a secret, so it can be updated on old trees - ssh_known_hosts_file = ssh_dir.joinpath("known_hosts") - ssh_known_hosts_file.write_bytes(GITHUB_SSH_FINGERPRINT) - - if options['repo-type'] == 'git': - if not revision and not ref: - raise RuntimeError('Git requires that either a ref, a revision, or both are provided') - - if not ref: - print('Providing a ref will improve the performance of this checkout') - - revision = git_checkout( - options['checkout'], - options['head-repo'], - options['base-repo'], - ref, - revision, - ssh_key_file, - ssh_known_hosts_file, - ) - elif options['repo-type'] == 'hg': - if not revision and not ref: - raise RuntimeError('Hg requires that at least one of a ref or revision ' - 'is provided') - - revision = hg_checkout( - options['checkout'], - options['head-repo'], - options['base-repo'], - options['store-path'], - hgmo_fingerprint, - options['sparse-profile'], - ref, - revision - ) - else: - raise RuntimeError('Type of VCS must be either "git" or "hg"') - finally: - if ssh_dir: - shutil.rmtree(ssh_dir, ignore_errors=True) - pass - - os.environ['%s_HEAD_REV' % options['env-prefix']] = revision - - -def main(args): - print_line(b'setup', b'run-task started in %s\n' % os.getcwd().encode('utf-8')) - running_as_root = IS_POSIX and os.getuid() == 0 - - # Arguments up to '--' are ours. After are for the main task - # to be executed. 
- try: - i = args.index('--') - our_args = args[0:i] - task_args = args[i + 1:] - except ValueError: - our_args = args - task_args = [] - - parser = argparse.ArgumentParser() - parser.add_argument('--user', default='worker', help='user to run as') - parser.add_argument('--group', default='worker', help='group to run as') - parser.add_argument('--task-cwd', help='directory to run the provided command in') - - repositories = os.environ.get('REPOSITORIES') - if repositories: - repositories = json.loads(repositories) - else: - repositories = {'vcs': "repository"} - - for repository, name in repositories.items(): - add_vcs_arguments(parser, repository, name) - - parser.add_argument('--fetch-hgfingerprint', action='store_true', - help='Fetch the latest hgfingerprint from the secrets store, ' - 'using the taskclusterProxy') - - args = parser.parse_args(our_args) - - repositories = [ - collect_vcs_options(args, repository, name) for (repository, name) in repositories.items() - ] - # Sort repositories so that parent checkout paths come before children - repositories.sort(key=lambda repo: Path(repo['checkout'] or "/").parts) - - uid = gid = gids = None - if IS_POSIX and running_as_root: - user, group, gids = get_posix_user_group(args.user, args.group) - uid = user.pw_uid - gid = group.gr_gid - - if running_as_root and os.path.exists("/dev/kvm"): - # Ensure kvm permissions for worker, required for Android x86 - st = os.stat("/dev/kvm") - os.chmod("/dev/kvm", st.st_mode | 0o666) - - # Validate caches. - # - # Taskgraph should pass in a list of paths that are caches via an - # environment variable (which we don't want to pass down to child - # processes). - - if 'TASKCLUSTER_CACHES' in os.environ: - caches = os.environ['TASKCLUSTER_CACHES'].split(';') - del os.environ['TASKCLUSTER_CACHES'] - else: - caches = [] - - if 'TASKCLUSTER_UNTRUSTED_CACHES' in os.environ: - untrusted_caches = True - del os.environ['TASKCLUSTER_UNTRUSTED_CACHES'] - else: - untrusted_caches = False - - for cache in caches: - if not os.path.isdir(cache): - print('error: cache %s is not a directory; this should never ' - 'happen' % cache) - return 1 - - purge = configure_cache_posix(cache, user, group, untrusted_caches, - running_as_root) - - if purge: - return EXIT_PURGE_CACHE - - if 'TASKCLUSTER_VOLUMES' in os.environ: - volumes = os.environ['TASKCLUSTER_VOLUMES'].split(';') - del os.environ['TASKCLUSTER_VOLUMES'] - else: - volumes = [] - - if volumes and not IS_POSIX: - print('assertion failed: volumes not expected on Windows') - return 1 - - # Sanitize volumes. - for volume in volumes: - # If a volume is a cache, it was dealt with above. - if volume in caches: - print_line(b'volume', b'volume %s is a cache\n' % - volume.encode('utf-8')) - continue - - configure_volume_posix(volume, user, group, running_as_root) - - all_caches_and_volumes = set(map(os.path.normpath, caches)) - all_caches_and_volumes |= set(map(os.path.normpath, volumes)) - - def path_in_cache_or_volume(path): - path = os.path.normpath(path) - - while path: - if path in all_caches_and_volumes: - return True - - path, child = os.path.split(path) - if not child: - break - - return False - - def prepare_checkout_dir(checkout): - if not checkout: - return - - # The checkout path becomes the working directory. Since there are - # special cache files in the cache's root directory and working - # directory purging could blow them away, disallow this scenario. 
- if os.path.exists(os.path.join(checkout, '.cacherequires')): - print('error: cannot perform vcs checkout into cache root: %s' % - checkout) - sys.exit(1) - - # TODO given the performance implications, consider making this a fatal - # error. - if not path_in_cache_or_volume(checkout): - print_line(b'vcs', b'WARNING: vcs checkout path (%s) not in cache ' - b'or volume; performance will likely suffer\n' % - checkout.encode('utf-8')) - - # Ensure the directory for the source checkout exists. - try: - os.makedirs(os.path.dirname(checkout)) - except OSError as e: - if e.errno != errno.EEXIST: - raise - - # And that it is owned by the appropriate user/group. - if running_as_root: - os.chown(os.path.dirname(checkout), uid, gid) - - def prepare_hg_store_path(): - # And ensure the shared store path exists and has proper permissions. - if 'HG_STORE_PATH' not in os.environ: - print('error: HG_STORE_PATH environment variable not set') - sys.exit(1) - - store_path = os.environ['HG_STORE_PATH'] - - if not path_in_cache_or_volume(store_path): - print_line(b'vcs', b'WARNING: HG_STORE_PATH (%s) not in cache or ' - b'volume; performance will likely suffer\n' % - store_path.encode('utf-8')) - - try: - os.makedirs(store_path) - except OSError as e: - if e.errno != errno.EEXIST: - raise - - if running_as_root: - os.chown(store_path, uid, gid) - - repository_paths = [ - Path(repo["checkout"]) for repo in repositories if repo["checkout"] - ] - for repo in repositories: - if not repo['checkout']: - continue - parents = Path(repo['checkout']).parents - if any((path in repository_paths) for path in parents): - # Skip creating any checkouts that are inside other checokuts - continue - prepare_checkout_dir(repo['checkout']) - - hgmo_fingerprint = None - if any( - repo['checkout'] and repo['repo-type'] == 'hg' for repo in repositories - ): - prepare_hg_store_path() - if args.fetch_hgfingerprint: - hgmo_fingerprint = fetch_hgfingerprint() - - if IS_POSIX and running_as_root: - # Drop permissions to requested user. - # This code is modeled after what `sudo` was observed to do in a Docker - # container. We do not bother calling setrlimit() because containers have - # their own limits. - print_line(b'setup', b'running as %s:%s\n' % ( - args.user.encode('utf-8'), args.group.encode('utf-8'))) - - os.setgroups(gids) - os.umask(0o22) - os.setresgid(gid, gid, gid) - os.setresuid(uid, uid, uid) - - for repo in repositories: - vcs_checkout_from_args(repo, hgmo_fingerprint=hgmo_fingerprint) - - try: - for k in ['MOZ_FETCHES_DIR', 'UPLOAD_DIR'] + [ - '%_PATH'.format(repository['project'].upper()) for repository in repositories - ]: - if k in os.environ: - os.environ[k] = os.path.abspath(os.environ[k]) - print_line(b'setup', b'%s is %s\n' % ( - k.encode('utf-8'), - os.environ[k].encode('utf-8'))) - - if 'MOZ_FETCHES' in os.environ: - fetch_artifacts() - - return run_command(b'task', task_args, cwd=args.task_cwd) - finally: - fetches_dir = os.environ.get('MOZ_FETCHES_DIR') - if fetches_dir and os.path.isdir(fetches_dir): - print_line(b'fetches', b'removing %s\n' % fetches_dir.encode('utf-8')) - shutil.rmtree(fetches_dir) - print_line(b'fetches', b'finished\n') - - -if __name__ == '__main__': - sys.exit(main(sys.argv[1:]))
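A minimal sketch of the cache-requirements policy that the run-task script above enforces, for readers of this patch: it writes a `.cacherequires` sentinel into each cache recording a version marker plus the numeric uid/gid the task runs as, and later invocations compare their own requirements against that file and ask for a purge on mismatch. This assumes POSIX `pwd`/`grp` lookups, and the helper names (`cache_requirements`, `check_cache`) are illustrative, not functions from the script:

import grp
import os
import pwd


def cache_requirements(user, group):
    # The sentinel records a version marker plus the numeric uid/gid the
    # task runs as, so tasks running under different ids never share a cache.
    return {
        b'version=1',
        b'uid=%d' % pwd.getpwnam(user).pw_uid,
        b'gid=%d' % grp.getgrnam(group).gr_gid,
    }


def check_cache(cache, requirements):
    # Returns True if the cache may be used, False if it should be purged.
    requires_path = os.path.join(cache, '.cacherequires')

    if not os.listdir(cache):
        # Empty cache: record our requirements so later tasks can validate.
        with open(requires_path, 'wb') as fh:
            fh.write(b'\n'.join(sorted(requirements)))
        return True

    if not os.path.exists(requires_path):
        # Populated cache with no sentinel: treat as misconfigured.
        return False

    with open(requires_path, 'rb') as fh:
        wanted = set(fh.read().splitlines())

    # Any requirement recorded in the cache that we do not share is a mismatch.
    return not (wanted - requirements)

In the script itself, a uid/gid mismatch is tolerated for untrusted caches (the cache is chowned instead of rejected), and a rejected cache is signalled by exiting with EXIT_PURGE_CACHE (72) so the worker purges the cache and retries the task.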