Skip to content

Commit

Permalink
refactor(snakemake): upgrade snakemake 7 to snakemake 8 (reanahub#471)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alputer committed Oct 23, 2024
1 parent 564c027 commit 4d98bf3
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 25 deletions.
1 change: 1 addition & 0 deletions AUTHORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The list of contributors in alphabetical order:
- [Adelina Lintuluoto](https://orcid.org/0000-0002-0726-1452)
- [Agisilaos Kounelis](https://orcid.org/0000-0001-9312-3189)
- [Alastair Lyall](https://orcid.org/0009-0000-4955-8935)
- [Alp Tuna](https://orcid.org/0009-0001-1915-3993)
- [Audrius Mecionis](https://orcid.org/0000-0002-3759-1663)
- [Bruno Rosendo](https://orcid.org/0000-0002-0923-3148)
- [Burt Holzman](https://orcid.org/0000-0001-5235-6314)
Expand Down
3 changes: 3 additions & 0 deletions reana_commons/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,3 +499,6 @@ def default_workspace():
"REANA_KRB5_CONFIGMAP_NAME", f"{REANA_COMPONENT_PREFIX}-krb5-conf"
)
"""Kerberos configMap name."""

SNAKEMAKE_MAX_PARALLEL_JOBS = int(os.getenv("SNAKEMAKE_MAX_PARALLEL_JOBS", "300"))
"""Snakemake maximum number of jobs that can run in parallel."""
119 changes: 107 additions & 12 deletions reana_commons/snakemake.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,64 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2021, 2022 CERN.
# Copyright (C) 2021, 2022, 2024 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""REANA Snakemake Workflow utils."""

import os
import sys
from itertools import filterfalse, chain
from typing import Any, Dict, List, Optional

from snakemake import snakemake
from snakemake.dag import DAG
from snakemake.io import load_configfile
from snakemake.jobs import Job
from snakemake.persistence import Persistence
from snakemake.rules import Rule
from snakemake.workflow import Workflow
from pathlib import Path

if sys.version_info >= (3, 11):
from snakemake.api import SnakemakeApi
from snakemake.settings.types import (
ResourceSettings,
WorkflowSettings,
ConfigSettings,
OutputSettings,
StorageSettings,
DeploymentSettings,
)
else:
from snakemake import snakemake
from snakemake.dag import DAG
from snakemake.io import load_configfile
from snakemake.jobs import Job
from snakemake.persistence import Persistence
from snakemake.rules import Rule
from snakemake.workflow import Workflow

from reana_commons.errors import REANAValidationError
from reana_commons.config import SNAKEMAKE_MAX_PARALLEL_JOBS


def snakemake_validate(
workflow_file: str, configfiles: List[str], workdir: Optional[str] = None
):
"""Validate Snakemake workflow specification.
"""Validate snakemake workflow."""
if sys.version_info >= (3, 11):
snakemake_validate_v8(workflow_file, configfiles, workdir)
else:
snakemake_validate_v7(workflow_file, configfiles, workdir)


def snakemake_load(workflow_file: str, **kwargs: Any):
"""Load snakemake specification."""
if sys.version_info >= (3, 11):
return snakemake_load_v8(workflow_file, **kwargs)
else:
return snakemake_load_v7(workflow_file, **kwargs)


def snakemake_validate_v7(
workflow_file: str, configfiles: List[str], workdir: Optional[str] = None
):
"""Old snakemake validate version for python versions < 3.11, it is needed since snakemake8 dropped support for python < 3.11.
:param workflow_file: A specification file compliant with
`snakemake` workflow specification.
Expand All @@ -47,8 +79,45 @@ def snakemake_validate(
raise REANAValidationError("Snakemake specification is invalid.")


def snakemake_load(workflow_file: str, **kwargs: Any) -> Dict:
"""Load Snakemake workflow specification into an internal representation.
def snakemake_validate_v8(
workflow_file: str, configfiles: List[str], workdir: Optional[str] = None
):
"""Seems to be enough for the first validation. We may move to snakemake --dry-run when the validation process will be fully moved to the server-side.
:param workflow_file: A specification file compliant with
`snakemake` workflow specification.
:type workflow_file: string
:param configfiles: List of config files paths.
:type configfiles: List
:param workdir: Path to working directory.
:type workdir: string or None
"""
with SnakemakeApi(
OutputSettings(
quiet=True,
)
) as snakemake_api:
try:
workflow_api = snakemake_api.workflow(
resource_settings=ResourceSettings(nodes=SNAKEMAKE_MAX_PARALLEL_JOBS),
config_settings=ConfigSettings(configfiles=configfiles),
storage_settings=StorageSettings(),
storage_provider_settings=dict(),
workflow_settings=WorkflowSettings(),
deployment_settings=DeploymentSettings(),
snakefile=workflow_file,
workdir=workdir,
)

workflow_api.dag()

except Exception as e:
snakemake_api.print_exception(e)
raise REANAValidationError("Snakemake specification is invalid.")


def snakemake_load_v7(workflow_file: str, **kwargs: Any):
"""Load Snakemake workflow specification into an internal representation. Used for python <3.11 and it is needed since snakemake8 dropped support for python 3.11.
:param workflow_file: A specification file compliant with
`snakemake` workflow specification.
Expand Down Expand Up @@ -216,3 +285,29 @@ def relpath(f):
if not rule.norun
],
}


def snakemake_load_v8(workflow_file: str, **kwargs: Any):
"""Load Snakemake workflow specification into an internal representation.
:param workflow_file: A specification file compliant with
`snakemake` workflow specification.
:type workflow_file: string
:returns: Dictonary containing relevant workflow metadata.
"""
workdir = kwargs.get("workdir")
if workdir:
workflow_file = os.path.join(workdir, workflow_file)

workflow_file = Path(workflow_file) # convert str to Path
configfiles = [Path(kwargs.get("input"))] if kwargs.get("input") else []

snakemake_validate(
workflow_file=workflow_file, configfiles=configfiles, workdir=workdir
)

return {
"job_dependencies": {},
"steps": [],
}
21 changes: 9 additions & 12 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,19 @@
],
"cwl": ["cwltool==3.1.20210628163208"],
"snakemake": [
# install patched version of snakemake v7 that works with Python 3.12
# see https://github.com/snakemake/snakemake/issues/2480
# see https://github.com/snakemake/snakemake/issues/2648
# see https://github.com/snakemake/snakemake/issues/2657
"snakemake @ git+https://github.com/mdonadoni/snakemake.git@cea31624976989ad0645eb2e1751260d32259506", # branch `7.32.4-python3.12`
"pulp>=2.7.0,<2.8.0",
"snakemake==7.32.4 ; python_version<'3.11'",
"pulp>=2.7.0,<2.8.0 ; python_version<'3.11'",
"snakemake==8.24.1 ; python_version>='3.11'",
"snakemake-interface-common==1.17.4 ; python_version>='3.11'",
"snakemake-interface-executor-plugins==9.3.2 ; python_version>='3.11'",
"snakemake-interface-storage-plugins==3.3.0 ; python_version>='3.11'",
"snakemake-interface-report-plugins==1.1.0 ; python_version>='3.11'",
],
"snakemake-reports": [
"snakemake[reports] @ git+https://github.com/mdonadoni/snakemake.git@cea31624976989ad0645eb2e1751260d32259506", # branch `7.32.4-python3.12`
"pulp>=2.7.0,<2.8.0",
"snakemake-xrootd": [
"snakemake-storage-plugin-xrootd==0.1.4 ; python_version>='3.11'",
],
}

# backwards compatibility with extras before PEP 685
extras_require["snakemake_reports"] = extras_require["snakemake-reports"]

extras_require["all"] = []
for key, reqs in extras_require.items():
if ":" == key[0]:
Expand Down
7 changes: 6 additions & 1 deletion tests/test_snakemake.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,15 @@
import os
import pytest
import sys
from pathlib import Path

from reana_commons.snakemake import snakemake_load


@pytest.mark.xfail(
sys.version_info >= (3, 11),
reason="Test expted to fail for python versions 3.11 and above as we currently return only empty dictionary in snakemake_load function for python >= 3.11. Development is blocked until we move validation to the server side.",
)
def test_snakemake_load(tmpdir, dummy_snakefile):
"""Test that Snakemake metadata is loaded properly."""
workdir = tmpdir.mkdir("sub")
Expand All @@ -28,7 +33,7 @@ def test_snakemake_load(tmpdir, dummy_snakefile):
assert len(workdir.listdir()) == 2

os.chdir(tmpdir)
metadata = snakemake_load(p.strpath, workdir=workdir.strpath)
metadata = snakemake_load(Path(p.strpath), workdir=Path(workdir.strpath))
# check that the cwd is preserved
assert os.getcwd() == tmpdir

Expand Down

0 comments on commit 4d98bf3

Please sign in to comment.