Making conda package population independent from package builds
Closes #257
costrouc committed Mar 9, 2022
1 parent cc70628 · commit adc5305
Showing 5 changed files with 134 additions and 76 deletions.
9 changes: 6 additions & 3 deletions conda-store-server/conda_store_server/app.py
@@ -24,7 +24,9 @@ def conda_store_validate_specification(
         specification.dependencies = conda_store.conda_default_packages.copy()

     # CondaStore.conda_included_packages
-    dependency_names = set(MatchSpec(_).name for _ in specification.dependencies)
+    dependency_names = set(
+        MatchSpec(_).name for _ in specification.dependencies if isinstance(_, str)
+    )
     for package in set(conda_store.conda_included_packages) - dependency_names:
         specification.dependencies.append(package)

@@ -45,7 +47,9 @@ def conda_store_validate_specification(
         )

     # validate that required conda package are in specification
-    dependency_names = set(MatchSpec(_).name for _ in specification.dependencies)
+    dependency_names = set(
+        MatchSpec(_).name for _ in specification.dependencies if isinstance(_, str)
+    )
     if not (set(conda_store.conda_required_packages) <= dependency_names):
         raise ValueError(
             f"Conda packages {conda_store.conda_required_packages - dependency_names} required and missing from specification"
@@ -395,7 +399,6 @@ def create_build(self, environment_id: int, specification_sha256: str):

         (
             tasks.task_update_storage_metrics.si()
-            | tasks.task_update_conda_channels.si()
             | tasks.task_build_conda_environment.si(build.id)
             | group(*artifact_tasks)
             | tasks.task_update_storage_metrics.si()
42 changes: 12 additions & 30 deletions conda-store-server/conda_store_server/build.py
@@ -7,7 +7,6 @@
 import tempfile
 import traceback

-from sqlalchemy import and_
 import yaml

 from conda_store_server import api, conda, orm, utils, schema
@@ -34,47 +33,30 @@ def set_build_failed(conda_store, build, logs):


 def set_build_completed(conda_store, build, logs, packages):
-    unique_channel_urls = {p["base_url"] for p in packages}
-
-    channel_url_to_id_map = {}
-    for channel in unique_channel_urls:
+    for package in packages:
+        channel = package["channel_id"]
         if channel == "https://conda.anaconda.org/pypi":
-            # ignore pypi packages
+            # ignore pypi package for now
             continue

         channel_id = api.get_conda_channel(conda_store.db, channel)
         if channel_id is None:
             raise ValueError(
                 f"channel url={channel} not recognized in conda-store channel database"
             )
-        channel_url_to_id_map[channel] = channel_id.id
+        package["channel_id"] = channel_id.id

-    def package_query(package):
-        return (
+        _package = (
             conda_store.db.query(orm.CondaPackage)
-            .filter(
-                and_(
-                    orm.CondaPackage.channel_id
-                    == channel_url_to_id_map[package["base_url"]],
-                    orm.CondaPackage.subdir == package["platform"],
-                    orm.CondaPackage.name == package["name"],
-                    orm.CondaPackage.version == package["version"],
-                    orm.CondaPackage.build == package["build_string"],
-                    orm.CondaPackage.build_number == package["build_number"],
-                )
-            )
+            .filter(orm.CondaPackage.md5 == package["md5"])
+            .filter(orm.CondaPackage.channel_id == package["channel_id"])
             .first()
         )

-    for package in packages:
-        if package["base_url"] == "https://conda.anaconda.org/pypi":
-            # ignore pypi packages
-            continue
-
-        _package = package_query(package)
-
-        if _package is not None:
-            build.packages.append(_package)
+        if _package is None:
+            _package = orm.CondaPackage(**package)
+            conda_store.db.add(_package)
+        build.packages.append(_package)

     conda_store.storage.set(
         conda_store.db,
@@ -172,7 +154,7 @@ def build_conda_environment(conda_store, build):
     with utils.timer(conda_store.log, f"chown of {conda_prefix}"):
         utils.chown(conda_prefix, uid, gid)

-    packages = conda.conda_list(conda_prefix, executable=conda_store.conda_command)
+    packages = conda.conda_prefix_packages(conda_prefix)
     build.size = utils.disk_usage(conda_prefix)

     set_build_completed(conda_store, build, output.encode("utf-8"), packages)
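Tying the pieces together: the dictionaries returned by the new conda.conda_prefix_packages (added in conda.py below) use the same keys as the CondaPackage ORM columns, which is what lets set_build_completed fall back to `orm.CondaPackage(**package)` for packages not yet in the database. A purely illustrative record with assumed values:

```python
# Hypothetical package record, shaped like the output of conda_prefix_packages()
# after set_build_completed() has swapped the channel URL for the database id.
package = {
    "build": "py39hf3d152e_0",
    "build_number": 0,
    "constrains": [],
    "depends": ["python >=3.9,<3.10.0a0"],
    "license": "BSD-3-Clause",
    "license_family": "BSD",
    "md5": "d41d8cd98f00b204e9800998ecf8427e",  # placeholder digest
    "sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",  # placeholder digest
    "name": "numpy",
    "size": 6994485,
    "subdir": "linux-64",
    "timestamp": 1644500000,
    "version": "1.22.2",
    "channel_id": 1,  # database id, substituted for the channel URL
    "summary": None,
    "description": None,
}
# _package = orm.CondaPackage(**package)  # as done in set_build_completed
```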
60 changes: 56 additions & 4 deletions conda-store-server/conda_store_server/conda.py
@@ -1,10 +1,21 @@
 """Interface between Conda-Store and conda
 This module provides all the functionality that is required for
 executing conda commands
 """

+import os
 import json
 import subprocess
 import bz2
 import datetime
+import hashlib

 import yarl
 import requests
+from conda_pack import pack
+from conda.base.context import context
+from conda.core.prefix_data import PrefixData


def normalize_channel_name(channel_alias, channel):
@@ -21,9 +32,7 @@ def conda_list(prefix, executable: str = "conda"):


 def conda_pack(prefix, output):
-    import conda_pack
-
-    conda_pack.pack(prefix=str(prefix), output=str(output))
+    pack(prefix=str(prefix), output=str(output))


 def download_repodata(
@@ -87,6 +96,49 @@ def conda_platform():
     It will be one of the values in ``conda.base.constants.KNOWN_SUBDIRS``.
     """
     from conda.base.context import context

     return context.subdir
+
+
+def conda_prefix_packages(prefix):
+    """
+    Returns a list of the packages that exist for a given prefix
+    """
+    packages = []
+
+    prefix_data = PrefixData(prefix)
+    prefix_data.load()
+
+    for record in prefix_data.iter_records():
+        package = {
+            "build": record.build,
+            "build_number": record.build_number,
+            "constrains": list(record.constrains),
+            "depends": list(record.depends),
+            "license": record.license,
+            "license_family": record.license_family,
+            "md5": hashlib.md5(
+                open(record.package_tarball_full_path, "rb").read()
+            ).hexdigest(),
+            "sha256": hashlib.sha256(
+                open(record.package_tarball_full_path, "rb").read()
+            ).hexdigest(),
+            "name": record.name,
+            "size": record.size,
+            "subdir": record.subdir,
+            "timestamp": record.timestamp,
+            "version": record.version,
+            "channel_id": record.channel.base_url,
+            "summary": None,
+            "description": None,
+        }
+
+        info_json = os.path.join(record.extracted_package_dir, "info/about.json")
+        if os.path.exists(info_json):
+            info = json.load(open(info_json))
+            package["summary"] = info.get("summary")
+            package["description"] = info.get("description")
+
+        packages.append(package)
+    return packages
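A quick usage sketch of the new helper, assuming conda-store-server is installed and a conda environment is activated (so CONDA_PREFIX is set):

```python
import os

from conda_store_server.conda import conda_prefix_packages

# Assumes an activated environment; CONDA_PREFIX points at its prefix.
packages = conda_prefix_packages(os.environ["CONDA_PREFIX"])
for p in packages[:5]:
    print(p["name"], p["version"], p["channel_id"])
```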
91 changes: 52 additions & 39 deletions conda-store-server/conda_store_server/orm.py
@@ -2,6 +2,8 @@
 import enum
 import datetime
 import shutil
+import itertools
+import math

 from sqlalchemy import (
     Table,
@@ -271,49 +273,60 @@ class CondaChannel(Base):
     name = Column(Unicode(255), unique=True, nullable=False)
     last_update = Column(DateTime)

-    def update_packages(self, db, subdirs=None):
+    def update_packages(self, db, subdirs=None, batch_size=5000):
         repodata = download_repodata(self.name, self.last_update, subdirs=subdirs)

         for architecture in repodata["architectures"]:
-            packages = list(
-                repodata["architectures"][architecture]["packages"].values()
-            )
-
-            existing_architecture_sha256 = {
-                _[0]
-                for _ in db.query(CondaPackage.sha256)
-                .filter(CondaPackage.channel_id == self.id)
-                .filter(CondaPackage.subdir == architecture)
-                .all()
-            }
-            for package in packages:
-                if package["sha256"] not in existing_architecture_sha256:
-                    db.add(
-                        CondaPackage(
-                            build=package["build"],
-                            build_number=package["build_number"],
-                            constrains=package.get("constrains"),
-                            depends=package["depends"],
-                            license=package.get("license"),
-                            license_family=package.get("liciense_family"),
-                            md5=package["md5"],
-                            sha256=package["sha256"],
-                            name=package["name"],
-                            size=package["size"],
-                            subdir=package.get("subdir"),
-                            timestamp=package.get("timestamp"),
-                            version=package["version"],
-                            channel_id=self.id,
-                            summary=repodata.get("packages", {})
-                            .get(package["name"], {})
-                            .get("summary"),
-                            description=repodata.get("packages", {})
-                            .get(package["name"], {})
-                            .get("description"),
-                        )
-                    )
-                existing_architecture_sha256.add(package["sha256"])
-            db.commit()
+            # key_depth is the number of characters to use for splitting
+            # up the packages for inserting based on the sha256
+            # the batch size is roughly equal to total_size / 16^key_depth
+            # this formula ensures that we use roughly the batch size
+            total_size = len(repodata["architectures"][architecture]["packages"])
+            key_depth = round(math.log(total_size / batch_size) / math.log(16))
+
+            for key, packages in itertools.groupby(
+                sorted(
+                    repodata["architectures"][architecture]["packages"].values(),
+                    key=lambda p: p["sha256"],
+                ),
+                key=lambda p: p["sha256"][:key_depth],
+            ):
+                existing_architecture_sha256 = {
+                    _[0]
+                    for _ in db.query(CondaPackage.sha256)
+                    .filter(CondaPackage.channel_id == self.id)
+                    .filter(CondaPackage.subdir == architecture)
+                    .filter(CondaPackage.sha256.startswith(key))
+                    .all()
+                }
+                for package in packages:
+                    if package["sha256"] not in existing_architecture_sha256:
+                        db.add(
+                            CondaPackage(
+                                build=package["build"],
+                                build_number=package["build_number"],
+                                constrains=package.get("constrains"),
+                                depends=package["depends"],
+                                license=package.get("license"),
+                                license_family=package.get("liciense_family"),
+                                md5=package["md5"],
+                                sha256=package["sha256"],
+                                name=package["name"],
+                                size=package["size"],
+                                subdir=package.get("subdir"),
+                                timestamp=package.get("timestamp"),
+                                version=package["version"],
+                                channel_id=self.id,
+                                summary=repodata.get("packages", {})
+                                .get(package["name"], {})
+                                .get("summary"),
+                                description=repodata.get("packages", {})
+                                .get(package["name"], {})
+                                .get("description"),
+                            )
+                        )
+                    existing_architecture_sha256.add(package["sha256"])
+                db.commit()

         self.last_update = datetime.datetime.utcnow()
         db.commit()
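As a worked example of the batching comment above, with assumed numbers: roughly 120,000 packages for one architecture and the default batch_size of 5,000 give key_depth = 1, so packages are grouped by the first hex character of their sha256 into 16 commits of about 7,500 rows each.

```python
import math

total_size = 120_000   # assumed package count for one architecture
batch_size = 5_000     # the new default in update_packages()

# key_depth ~= log16(total_size / batch_size)
key_depth = round(math.log(total_size / batch_size) / math.log(16))
print(key_depth)                     # 1
print(total_size / 16 ** key_depth)  # 7500.0 packages per sha256-prefix group
```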
8 changes: 8 additions & 0 deletions conda-store-server/conda_store_server/worker/tasks.py
@@ -3,6 +3,7 @@
 import random

 from celery import Task, current_app
+from celery.signals import worker_ready
 import yaml
 from sqlalchemy.exc import IntegrityError

@@ -16,6 +17,13 @@
 )


+@worker_ready.connect
+def at_start(sender, **k):
+    with sender.app.connection():
+        sender.app.send_task("task_update_conda_channels")
+        sender.app.send_task("task_watch_paths")
+
+
 class WorkerTask(Task):
     _worker = None

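This worker_ready hook is what makes channel population independent of builds: each worker enqueues task_update_conda_channels once at startup, so the build chain in app.py no longer has to. Since Celery tasks are registered by name, the same task can also be dispatched from any client; a hedged sketch with an assumed broker URL:

```python
from celery import Celery

# Assumed broker URL; conda-store configures its own Celery app in practice.
app = Celery(broker="redis://localhost:6379/0")

# Enqueue the channel update by registered task name, no import needed.
app.send_task("task_update_conda_channels")
```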
