PERF: Rely on BIOM for upstream data manipulation #149

Open · wants to merge 11 commits into base: master
.travis.yml (1 addition & 1 deletion)

```diff
@@ -26,6 +26,6 @@ install:
 script:
   - source activate test-env
   - flake8 sourcetracker setup.py
-  - nosetests -v sourcetracker sourcetracker/* --with-coverage --cover-package=sourcetracker
+  - nosetests -v sourcetracker sourcetracker/*/tests/*.py --with-coverage --cover-package=sourcetracker
 after_success:
   - coveralls
```
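The narrowed glob presumably keeps nosetests from collecting non-test modules under the package. A minimal sketch (hypothetical repository layout) of what each pattern matches:

```python
# Minimal sketch (hypothetical layout): the old pattern matches every entry
# under sourcetracker/, test or not; the new one selects only test modules.
import glob

print(glob.glob('sourcetracker/*'))             # e.g. _cli, _gibbs.py, _q2, ...
print(glob.glob('sourcetracker/*/tests/*.py'))  # e.g. _cli/tests/test_gibbs.py
```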
setup.py (1 addition & 1 deletion)

```diff
@@ -104,7 +104,7 @@ def run(self):
           'h5py',
           'seaborn'],
       classifiers=classifiers,
-      package_data={'sourcetracker/_q2': ['citations.bib']},
+      package_data={'sourcetracker': ['citations.bib']},
       entry_points={'qiime2.plugins': q2cmds,
                     'console_scripts': standalone},
       cmdclass={'install': CustomInstallCommand,
```
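Setuptools `package_data` keys are importable (dotted) package names, so the old slash-style key `'sourcetracker/_q2'` likely never matched and `citations.bib` was not installed. This pairs with the `Citations.load` path change in `plugin_setup.py` below. A sketch of how the installed file should resolve after the fix (the printed path is illustrative, not guaranteed):

```python
# Sketch: after installation, citations.bib resolves inside the top-level
# package (requires Python 3.9+ for importlib.resources.files).
from importlib import resources

bib = resources.files('sourcetracker') / 'citations.bib'
print(bib.read_text()[:80])  # first characters of the shipped BibTeX file
```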
sourcetracker/_cli/gibbs.py (2 additions & 2 deletions)

```diff
@@ -17,7 +17,7 @@
 from sourcetracker._cli import cli
 from sourcetracker._gibbs import gibbs_helper
 from sourcetracker._plot import plot_heatmap
-from sourcetracker._util import parse_sample_metadata, biom_to_df
+from sourcetracker._util import parse_sample_metadata
 
 # import default descriptions
 from sourcetracker._gibbs_defaults import (DESC_TBL, DESC_MAP, DESC_OUT,
@@ -145,7 +145,7 @@ def gibbs(table_fp: Table,
 
     # Load the metadata file and feature table.
     sample_metadata = parse_sample_metadata(open(mapping_fp, 'U'))
-    feature_table = biom_to_df(load_table(table_fp))
+    feature_table = load_table(table_fp)
 
     # run the gibbs sampler helper function (same used for q2)
     results = gibbs_helper(feature_table, sample_metadata, loo, jobs,
```
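With `biom_to_df` gone, the CLI keeps the table as a sparse `biom.Table` end to end instead of densifying it into a pandas DataFrame up front, which is the performance point of this PR. A minimal sketch of the new loading path (the file name is a placeholder):

```python
# Minimal sketch: load a BIOM table and inspect it without converting to a
# dense DataFrame ('feature-table.biom' is an illustrative file name).
from biom import load_table

table = load_table('feature-table.biom')
print(table.shape)                    # (n_features, n_samples)
print(table.ids())                    # sample ids
print(table.ids(axis='observation'))  # feature ids
```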
sourcetracker/_gibbs.py (41 additions & 46 deletions)

```diff
@@ -12,12 +12,11 @@
 
 import pandas as pd
 from biom import Table
+from scipy.sparse import csr_matrix
 from sourcetracker._sourcetracker import (intersect_and_sort_samples,
                                           get_samples, collapse_source_data,
-                                          subsample_dataframe,
                                           validate_gibbs_input)
 from sourcetracker._sourcetracker import gibbs as _gibbs
-from sourcetracker._util import biom_to_df
 # import default values
 from sourcetracker._gibbs_defaults import (DEFAULT_ALPH1, DEFAULT_ALPH2,
                                            DEFAULT_TEN, DEFAULT_ONE,
```
```diff
@@ -40,15 +39,14 @@ def gibbs(feature_table: Table,
           draws_per_restart: int = DEFAULT_ONE,
           burnin: int = DEFAULT_HUND,
           delay: int = DEFAULT_ONE,
-          per_sink_feature_assignments: bool = DEFAULT_TRU,
+          per_sink_feature_assignments: bool = DEFAULT_FLS,
           sample_with_replacement: bool = DEFAULT_FLS,
           source_sink_column: str = DEFAULT_SNK,
           source_column_value: str = DEFAULT_SRS,
           sink_column_value: str = DEFAULT_SRS2,
           source_category_column: str = DEFAULT_CAT)\
-        -> (pd.DataFrame, pd.DataFrame, Table, pd.DataFrame):
+        -> (pd.DataFrame, pd.DataFrame):
     # convert tables
-    feature_table = biom_to_df(feature_table)
     sample_metadata = sample_metadata.to_dataframe()
     # run the gibbs sampler helper function (same used for q2)
     results = gibbs_helper(feature_table, sample_metadata, loo, jobs,
```
```diff
@@ -60,36 +58,27 @@
                            source_category_column)
     # get the results (with fas)
     # here we only return the three df (via q2)
-    mpm, mps, fas = results
+    mpm, mps = results
-    # make list filter
-
-    def filter_list(inds, factor): return [ind for ind in list(inds)
-                                           if ind not in factor]
-    # concat each sink-source (dropping sources with same name as sink)
-    fas_merged = pd.concat({sink: source.reindex(filter_list(source.index,
-                                                             sink))
-                            for sink, source in zip(mpm.columns, fas)})
-    # if loo is True then columns are source-source
-    if loo:
-        columns_ = ['Source_one', 'Source_two']
-    # if loo is False then columns as sink-source
-    else:
-        columns_ = ['Sink', 'Source']
-    # make the index map and mapping in the same step
-    ss_map = {'sample%i' % i: list(map(str, v))
-              for i, v in enumerate(fas_merged.index.tolist())}
-    ss_map = pd.DataFrame(ss_map, columns_).T
-    ss_map.index.name = 'sampleid'
-    # output for QIIME2
-    fas_merged.index = ss_map.index
-    fas_merged = Table(fas_merged.T.values,
-                       fas_merged.T.index,
-                       fas_merged.T.columns)
-    # this is because QIIME will only
-    # support these for now
-    # in the future we will work
-    # on supporting collections (i.e. fas)
-    return mpm, mps, fas_merged, ss_map
+    return mpm, mps
+
+
+def subsample(table, depth, replacement):
+    """Ensure all features are present in the resulting table"""
+    all_feat = table.ids(axis='observation')
+    ss = table.subsample(depth, with_replacement=replacement)
+    missing_feat = set(all_feat) - set(ss.ids(axis='observation'))
+    zerod_feat = Table(csr_matrix((len(missing_feat), len(table.ids()))),
+                       list(missing_feat),
+                       list(table.ids()))
+    newtab = ss.concat(zerod_feat, axis='observation')
+    return newtab.sort_order(all_feat, axis='observation')
 
 
 def gibbs_helper(feature_table: Table,
```
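The new `subsample` helper exists because `biom.Table.subsample` drops features whose counts fall to zero during rarefaction, which would silently change the feature axis; the helper pads the missing features back as explicit zero rows and restores the original ordering, so downstream source/sink alignment is unaffected. A toy usage sketch (made-up data; assumes a biom-format release new enough to expose `with_replacement`, i.e. 2.1.9+):

```python
# Toy sketch of the zero-padding problem the helper solves.
import numpy as np
from biom import Table

data = np.array([[10, 0], [1, 5], [0, 20]])
table = Table(data, ['featA', 'featB', 'featC'], ['s1', 's2'])

ss = table.subsample(5)  # rarefy each sample to depth 5, without replacement
# Features that happened to draw zero counts everywhere are gone from ss:
dropped = set(table.ids(axis='observation')) - set(ss.ids(axis='observation'))
print(dropped)  # possibly {'featB'}, depending on the random draw
```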
```diff
@@ -118,8 +107,6 @@
     This function is a helper that applies to both the click and QIIME2
     command line functionality.
     '''
-
-    # Do high level check on feature data.
     feature_table = validate_gibbs_input(feature_table)
 
     # Remove samples not shared by both feature and metadata tables and order
```
```diff
@@ -142,53 +129,61 @@
                          'the help documentation and check your mapping '
                          'file.') % (source_sink_column, source_column_value))
 
-    # Prepare the 'sources' matrix by collapsing the `source_samples` by their
-    # metadata values.
-    csources = collapse_source_data(sample_metadata, feature_table,
-                                    source_samples, source_category_column,
-                                    'mean')
+    if loo:
+        # sources are not collapsed for LOO
+        csources = feature_table.filter(set(source_samples),
+                                        inplace=False).remove_empty()
+    else:
+        # Prepare the 'sources' matrix by collapsing the `source_samples` by
+        # their metadata values.
+        csources = collapse_source_data(sample_metadata, feature_table,
+                                        source_samples, source_category_column,
+                                        'mean')
 
     # Rarify collapsed source data if requested.
     if source_rarefaction_depth > 0:
-        d = (csources.sum(1) >= source_rarefaction_depth)
+        d = (csources.sum('sample') >= source_rarefaction_depth)
         if not d.all():
             count_too_shallow = (~d).sum()
-            shallowest = csources.sum(1).min()
+            shallowest = csources.sum('sample').min()
             raise ValueError(('You requested rarefaction of source samples at '
                               '%s, but there are %s collapsed source samples '
                               'that have less sequences than that. The '
                               'shallowest of these is %s sequences.') %
                              (source_rarefaction_depth, count_too_shallow,
                               shallowest))
-        else:
-            csources = subsample_dataframe(csources, source_rarefaction_depth,
-                                           replace=sample_with_replacement)
 
+        csources = subsample(csources,
+                             source_rarefaction_depth,
+                             sample_with_replacement)
     # Prepare to rarify sink data if we are not doing LOO. If we are doing loo,
     # we skip the rarefaction, and set sinks to `None`.
     if not loo:
-        sinks = feature_table.loc[sink_samples, :]
+        sinks = feature_table.filter(set(sink_samples), inplace=False)
         if sink_rarefaction_depth > 0:
-            d = (sinks.sum(1) >= sink_rarefaction_depth)
+            d = (sinks.sum('sample') >= sink_rarefaction_depth)
             if not d.all():
                 count_too_shallow = (~d).sum()
-                shallowest = sinks.sum(1).min()
+                shallowest = sinks.sum('sample').min()
                 raise ValueError(('You requested rarefaction of sink samples '
                                   'at %s, but there are %s sink samples that '
                                   'have less sequences than that. The '
                                   'shallowest of these is %s sequences.') %
                                  (sink_rarefaction_depth, count_too_shallow,
                                   shallowest))
-            else:
-                sinks = subsample_dataframe(sinks, sink_rarefaction_depth,
-                                            replace=sample_with_replacement)
+            sinks = subsample(sinks,
+                              sink_rarefaction_depth,
+                              sample_with_replacement)  # noqa
     else:
         sinks = None
 
     # Run the computations.
     mpm, mps, fas = _gibbs(csources, sinks, alpha1, alpha2, beta, restarts,
                            draws_per_restart, burnin, delay, jobs,
-                           create_feature_tables=per_sink_feature_assignments)
+                           create_feature_tables=per_sink_feature_assignments,
+                           sample_metadata=sample_metadata,
+                           source_category_column=source_category_column)
     # number of returns chnages based on flag
     # this was refactored for QIIME2
     # transpose to follow convention
```
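Note the recurring `sum(1)` to `sum('sample')` edits above: pandas takes a numeric axis, while `biom.Table.sum` takes an axis name (`'whole'`, `'sample'`, or `'observation'`) and returns a numpy array, so the existing boolean-mask depth checks keep working unchanged. A small sketch with toy values:

```python
# Toy sketch: per-sample totals from a biom Table, as used in the depth checks.
import numpy as np
from biom import Table

t = Table(np.array([[3, 0], [4, 6]]), ['f1', 'f2'], ['s1', 's2'])
per_sample = t.sum('sample')  # array([7., 6.]): one total per sample
d = per_sample >= 5           # boolean mask, mirroring `d` in the diff
print(d.all(), (~d).sum(), per_sample.min())
```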
sourcetracker/_q2/plugin_setup.py (4 additions & 8 deletions)

```diff
@@ -48,7 +48,7 @@
     'draws_per_restart': Int,
     'burnin': Int,
     'delay': Int,
-    'per_sink_feature_assignments': Bool % Choices(True),
+    'per_sink_feature_assignments': Bool,
     'sample_with_replacement': Bool,
     'source_sink_column': Str,
     'source_column_value': Str,
```
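Dropping `% Choices(True)` matters: the old spec admitted `True` as the only legal value, so the flag could never actually be disabled through QIIME 2, and it is consistent with the default flipping to `DEFAULT_FLS` in `_gibbs.py` above. An illustrative sketch of the two specs:

```python
# Sketch of the parameter-spec change in the QIIME 2 plugin API.
from qiime2.plugin import Bool, Choices

old_spec = Bool % Choices(True)  # only True validates under the old spec
new_spec = Bool                  # True and False both validate now
```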
```diff
@@ -76,7 +76,7 @@
     'source_category_column': DESC_CAT}
 
 citations = qiime2.plugin.Citations.load(
-    '_q2/citations.bib', package='sourcetracker')
+    'citations.bib', package='sourcetracker')
 
 plugin = qiime2.plugin.Plugin(
     name='sourcetracker2',
```
```diff
@@ -92,15 +92,11 @@
     inputs={'feature_table': FeatureTable[Frequency]},
     parameters=PARAMETERS,
     outputs=[('mixing_proportions', FeatureTable[RelativeFrequency]),
-             ('mixing_proportion_stds', FeatureTable[RelativeFrequency]),
-             ('per_sink_assignments', FeatureTable[RelativeFrequency]),
-             ('per_sink_assignments_map', SampleData[SinkSourceMap])],
+             ('mixing_proportion_stds', FeatureTable[RelativeFrequency])],
     input_descriptions={'feature_table': DESC_TBL},
     parameter_descriptions=PARAMETERDESC,
     output_descriptions={'mixing_proportions': OUT_MEAN,
-                         'mixing_proportion_stds': OUT_STD,
-                         'per_sink_assignments': OUT_PFA,
-                         'per_sink_assignments_map': OUT_PFAM},
+                         'mixing_proportion_stds': OUT_STD},
     name='sourcetracker2 gibbs',
     description=('SourceTracker2 is a highly parallel version of '
                  'SourceTracker that was originally described in'
```