Skip to content

Commit

Permalink
Merge pull request #1227 from akaviaLab/refactor/flake8-flux_analysis
Browse files Browse the repository at this point in the history
flake8 of flux_analysis directory
  • Loading branch information
Midnighter authored May 29, 2022
2 parents a865ee9 + a51024a commit 7d97819
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 12 deletions.
2 changes: 2 additions & 0 deletions src/cobra/flux_analysis/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""Provide functions related to Flux Analysis."""

from .deletion import (
double_gene_deletion,
double_reaction_deletion,
Expand Down
13 changes: 7 additions & 6 deletions src/cobra/flux_analysis/deletion.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def _multi_deletion(
elif "room" in method:
add_room(model, solution=solution, linear="linear" in method, **kwargs)

args = set([frozenset(comb) for comb in product(*element_lists)])
args = {frozenset(comb) for comb in product(*element_lists)}
processes = min(processes, len(args))

def extract_knockout_results(result_iter):
Expand All @@ -223,9 +223,10 @@ def extract_knockout_results(result_iter):
return result

if processes > 1:
worker = dict(
gene=_gene_deletion_worker, reaction=_reaction_deletion_worker
)[entity]
worker = {
"gene": _gene_deletion_worker,
"reaction": _reaction_deletion_worker,
}[entity]
chunk_size = len(args) // processes

with ProcessPool(
Expand All @@ -235,7 +236,7 @@ def extract_knockout_results(result_iter):
pool.imap_unordered(worker, args, chunksize=chunk_size)
)
else:
worker = dict(gene=_gene_deletion, reaction=_reaction_deletion)[entity]
worker = {"gene": _gene_deletion, "reaction": _reaction_deletion}[entity]
results = extract_knockout_results(map(partial(worker, model), args))
return results

Expand Down Expand Up @@ -634,7 +635,7 @@ def __getitem__(
args = [{obj} for obj in args]
elif isinstance(args[0], set):
try:
args = [set(elem.id for elem in obj) for obj in args]
args = [{elem.id for elem in obj} for obj in args]
except AttributeError:
args = [set(obj) for obj in args]
else:
Expand Down
6 changes: 3 additions & 3 deletions src/cobra/flux_analysis/gapfilling.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


import logging
from typing import TYPE_CHECKING, Dict, List, Optional
from typing import TYPE_CHECKING, Dict, List, Optional, Union

from optlang.interface import OPTIMAL
from optlang.symbolics import Zero
Expand Down Expand Up @@ -107,7 +107,7 @@ def __init__(
model: Model,
universal: Optional[Model] = None,
lower_bound: float = 0.05,
penalties: Optional[Dict[str, "Reaction"]] = None,
penalties: Optional[Union[Dict[str, int], Dict["Reaction", int]]] = None,
exchange_reactions: bool = False,
demand_reactions: bool = True,
integer_threshold: float = 1e-6,
Expand Down Expand Up @@ -136,7 +136,7 @@ def __init__(
# threshold when it is not supported by the chosen solver.
self.integer_threshold = integer_threshold
self.universal = universal.copy() if universal else Model("universal")
self.penalties = dict(universal=1, exchange=100, demand=1)
self.penalties = {"universal": 1, "exchange": 100, "demand": 1}
if penalties is not None:
self.penalties.update(penalties)
self.indicators = []
Expand Down
2 changes: 1 addition & 1 deletion src/cobra/flux_analysis/phenotype_phase_plane.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def production_envelope(
"""
reactions = model.reactions.get_by_any(reactions)
objective = model.solver.objective if objective is None else objective
data = dict()
data = {}

if carbon_sources is None:
c_input = _find_carbon_sources(model)
Expand Down
4 changes: 2 additions & 2 deletions src/cobra/flux_analysis/reaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def assess(
if m.slim_optimize(error_value=0.0) >= flux_coefficient_cutoff:
return True
else:
results = dict()
results = {}
results["precursors"] = assess_component(
model, reaction, "reactants", flux_coefficient_cutoff
)
Expand Down Expand Up @@ -113,7 +113,7 @@ def assess_component(
"""
reaction = model.reactions.get_by_any(reaction)[0]
result_key = dict(reactants="produced", products="capacity")[side]
result_key = {"reactants": "produced", "products": "capacity"}[side]
get_components = attrgetter(side)
with model as m:
m.objective = reaction
Expand Down

0 comments on commit 7d97819

Please sign in to comment.