diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..877c07e --- /dev/null +++ b/.coveragerc @@ -0,0 +1,19 @@ +# this file is based on the examples provided on scikit-learn's .coveragerc +# and is adapted from scikit-bio's .coveragerc + +[run] +omit = + */tests* + */__init__.py +source = sourcetracker +branch = True +include = */sourcetracker/* + +[report] +exclude_lines = + pragma: no cover + raise NotImplementedError + if __name__ == .__main__.: +omit = + */tests* + */__init__.py diff --git a/.gitignore b/.gitignore index c4a1b47..eede20e 100644 --- a/.gitignore +++ b/.gitignore @@ -69,3 +69,4 @@ target/ # example output data/tiny-test/mixing_proportions data/tiny-test/source_loo +data/tiny-test/example* diff --git a/.travis.yml b/.travis.yml index 8515ded..9c747df 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,11 +12,13 @@ before_install: install: - conda create --yes -n st2 python=$PYTHON_VERSION nose numpy scipy h5py flake8 scikit-bio=0.4.0 - source activate st2 + - pip install coveralls - pip install . script: - - nosetests + - nosetests --with-coverage - flake8 sourcetracker setup.py - sourcetracker2 gibbs --help - - cd data/tiny-test/ - - sourcetracker2 gibbs -i otu_table.biom -m map.txt -o example1/ - - sourcetracker2 gibbs -i otu_table.biom -m alt-map.txt -o example2/ --source_sink_column source-or-sink --source_column_value src --sink_column_value snk --source_category_column sample-type + - sourcetracker2 gibbs -i data/tiny-test/otu_table.biom -m data/tiny-test/map.txt -o example1/ + - sourcetracker2 gibbs -i data/tiny-test/otu_table.biom -m data/tiny-test/alt-map.txt -o example2/ --source_sink_column source-or-sink --source_column_value src --sink_column_value snk --source_category_column sample-type +after_success: + - coveralls diff --git a/ChangeLog.md b/ChangeLog.md index 8ded383..dd3cd30 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -2,6 +2,13 @@ ## 2.0.1-dev (changes since 2.0.1 go here) + * A candidate public API has been created for both normal sink/source + prediction and leave-one-out (LOO) classification. These calls are + ``_gibbs`` and ``_gibbs_loo``. + * The per-sink feature assignments are recorded for every run and written to + the output directory. They are named as X.contingency.txt where X is the + name of a sink. + ## 2.0.1 * Initial alpha release. diff --git a/README.md b/README.md index ae619e2..43f8821 100644 --- a/README.md +++ b/README.md @@ -1,20 +1,101 @@ -[![Build Status](https://travis-ci.org/biota/sourcetracker2.svg?branch=master)](https://travis-ci.org/biota/sourcetracker2) - # SourceTracker2 +[![Build Status](https://travis-ci.org/biota/sourcetracker2.svg?branch=master)](https://travis-ci.org/biota/sourcetracker2) [![Coverage Status](https://coveralls.io/repos/github/biota/sourcetracker2/badge.svg)](https://coveralls.io/github/biota/sourcetracker2) SourceTracker was originally described in [Knights et al., 2011](http://www.ncbi.nlm.nih.gov/pubmed/21765408). If you use this package, please cite the original SourceTracker paper linked above pending publication of SourceTracker 2. +# API vs. CLI + +There are two ways to access the SourceTracker 2 functionality, via the command +line (CLI) or the python API. Users seeking to replicate the functionality +of SourceTracker 1 should use the command line functionality (`sourcetracker2 gibbs`) +Programmatic users are encouraged to use the API (exposed via `gibbs` and `gibbs_loo`). 
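+
+As a rough sketch of programmatic use (not a definitive recipe), the call below
+mirrors how the CLI driver in this change invokes the sampler. The README text
+above refers to the API as `gibbs`/`gibbs_loo`; the CLI code in this change
+imports these as `_gibbs`/`_gibbs_loo` from `sourcetracker._sourcetracker`. The
+file names and parameter values are placeholders, and the inputs are assumed to
+already be collapsed, integer-count, sample X feature dataframes (see
+'Preprocessing' below).
+
+```python
+import pandas as pd
+from sourcetracker._sourcetracker import _gibbs
+
+# Placeholder inputs: collapsed sources and sinks as sample X feature
+# dataframes of integer counts. Collapsing and rarefaction are the caller's
+# responsibility (see the Preprocessing section below).
+sources = pd.read_table('collapsed_sources.txt', index_col=0)
+sinks = pd.read_table('sinks.txt', index_col=0)
+
+# Illustrative settings, passed in the same order the CLI driver uses:
+# alpha1, alpha2, beta, restarts, draws_per_restart, burnin, delay.
+mixing_proportions, proportion_stds, feature_tables = _gibbs(
+    sources, sinks, 0.001, 0.1, 10, 10, 1, 100, 1,
+    cluster=None, create_feature_tables=True)
+```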
+
+The help documentation is broken down into sections with separate subsections
+for API and CLI usage.
+
+# File Formats
+
+## Command Line
+For descriptions of all file formats and options, please see the help
+documentation, available with the command `sourcetracker2 gibbs --help`.
+
+This script requires a feature X sample contingency table (traditionally an OTU
+table) and sample metadata (traditionally a mapping file).
+
+The feature X sample table is an `nXm` table (`n` rows, `m` columns) with sample
+IDs in the first column, and feature IDs in the first row. The values in each
+'cell' of the table must be integer counts.
+
+The sample metadata file is an `sXk` table (`s` rows, `k` columns) with sample
+IDs in the first column, and metadata headers in the first row. The values in
+each 'cell' of the table can be any type of data, and represent information
+about each sample.
+
+Any feature table that can be read by the `biom-format >= 2.1.4` package will be
+acceptable input. For example file formats please look at the test mapping file
+and feature (OTU) tables we have included [here](https://github.com/biota/sourcetracker2/tree/master/data/tiny-test).
+
+## API
+For descriptions of the requirements, please see the documentation in the
+`gibbs` function. Very briefly, this function wraps the main workhorse function
+`_gibbs_sampler` and exposes all the parameters necessary to control the
+behavior of the Gibbs sampling, as well as the parallel functionality.
+
+A superficial but important difference from the CLI framework is that,
+internally, SourceTracker 2 represents all tables as sample X feature (samples
+are rows, columns are features). This reflects choices in Dan's original code,
+and eases metadata-based subsetting of tables. The API functions expect data in
+sample X feature format.
+
+# Preprocessing
+
+## Command line
+Input feature data should be counts. If non-count data (e.g. the count of
+feature i in sample j was 4.63) is passed, the 'ceiling' of the data will be
+taken. This means that each non-integer count will be rounded up to the nearest
+larger integer.
+
+Rarefaction is performed by default at 1000 seqs/sample for both sinks and
+sources. This is done to prevent samples with more counts from dominating the
+contributions. Rarefaction depth can be set (or entirely disabled) with the
+``--source_rarefaction_depth`` and ``--sink_rarefaction_depth`` parameters.
+Source samples which are collapsed are rarefied after collapse.
+
+Samples which are not present in both the input feature table and the metadata
+are excluded from the analysis.
+
+Samples which come from the same source environment are 'collapsed', meaning
+their mean value for each feature is computed and used in the analysis. See the
+'Theory' section below for a discussion of this approach.
+
+## API
+The `gibbs` and `gibbs_loo` functions do minimal preprocessing on the input
+data. The source and sink dataframes are treated as final (no collapsing is
+done on them), i.e. each sink is treated independently. The data is **not**
+rarefied; the tables are expected to have the desired row sums.
+
+
+# Output
+
+## Command line
+There are two default output files, `mixing_proportions.txt` and
+`mixing_proportions_stds.txt`. `mixing_proportions.txt` is a tab-separated
+contingency table with sinks as rows and sources as columns. The values in the
+table are the mean fractional contributions of each source to each sink.
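+
+To make the layout concrete, a hypothetical `mixing_proportions.txt` (made-up
+sink and source names and made-up values, shown only to illustrate the
+orientation of the tab-separated table) might look like:
+
+```
+              kitchen  soil   Unknown
+hand_sample1  0.25     0.55   0.20
+hand_sample2  0.10     0.70   0.20
+```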
+`mixing_proportions_stds.txt` has the same format, but contains the standard
+deviation of each fractional contribution.
+
+Optionally, you can create per-sink feature X sample tables with the
+`--per_sink_feature_assignments` flag. The per-sink feature tables are labeled
+with the name of the sink. For example, if we have a sink called 'hand_sample3'
+the output feature table would be 'hand_sample3.feature_table.txt'. These
+tables record the origin source of each sink sequence (count of a feature).
+
+## API
+The outputs of the `gibbs` and `gibbs_loo` functions are identical to the
+command line outputs, just in dataframe form.
+
+
 # Documentation
 
 This script replicates and extends the functionality of Dan Knights's
-SourceTracker R package.
-
-The `mapping file` which describes the `sources` and `sinks` must be
-formatted in the same way it was for the SourceTracker R package. Specifically,
-there must be a column `SourceSink` and a column `Env`. For an example, look
-at `sourcetracker2/data/tiny-test/`.
+SourceTracker R package.
 
 A major improvement in this version of SourceTracker is the ability to run it
 in parallel. Currently, parallelization across a single machine is
@@ -35,7 +116,7 @@ SourceTracker2 is Python 3 software. The easiest way to install it is using Anac
 To install SourceTracker 2 using Anaconda, run the following commands:
 
 ```bash
-conda create -n st2 python=3.5 numpy scipy h5py hdf5 matplotlib
+conda create -n st2 python=3.5 numpy scipy scikit-bio=0.4.3 biom-format h5py hdf5
 source activate st2
 pip install sourcetracker
 ```
@@ -141,7 +222,7 @@ These usage examples expect that you are in the directory
 
 `sourcetracker2 gibbs -i otu_table.biom -m map.txt -o example1/`
 
 **Calculate the proportion of each source in each sink using an alternate sample metadata mapping file where samples are described differently.**
-`sourcetracker2 gibbs -i otu_table.biom -m alt-map.txt -o example2/ --source_sink_column source-or-sink --source_column_value src --sink_column_value snk --source_category_column sample-type`
+`sourcetracker2 gibbs -i otu_table.biom -m alt-map.txt -o example2/ --source_sink_column source-or-sink --source_column_value src --sink_column_value sink --source_category_column sample-type`
 
 **Calculate the class label (i.e. 'Env') of each source using a leave one out
 strategy**
 
 `sourcetracker2 gibbs -i otu_table.biom -m map.txt -o example4/ --burnin 100`
 
 **Calculate the proportion of each source in each sink, using a sink
-rarefaction depth of 1500**
-`sourcetracker2 gibbs -i otu_table.biom -m map.txt -o example5/ --sink_rarefaction_depth 2500`
+rarefaction depth of 1700 and source rarefaction depth of 2000**
+`sourcetracker2 gibbs -i otu_table.biom -m map.txt -o example5/ --sink_rarefaction_depth 1700 --source_rarefaction_depth 2000`
 
 **Calculate the proportion of each source in each sink, using ipyparallel to
 run in parallel with 5 jobs**
 `sourcetracker2 gibbs -i otu_table.biom -m map.txt -o example6/ --jobs 5`
 
+**Calculate the proportion of each source in each sink, using ipyparallel to run in parallel with 5 jobs. Write the per sink feature tables (what SourceTracker 1 called
+'full output').
These are feature by sample table indicating the origin source of each sequence (each count of a feature).** +`sourcetracker2 gibbs -i otu_table.biom -m map.txt -o example7/ --jobs 5 --per_sink_feature_assignments` + + # Miscellaneous The current implementation of SourceTracker 2 does not contain functionality for @@ -164,6 +250,3 @@ visualization of results or auto-tuning of the parameters (`alpha1`, `alpha1`, etc.). For an example of how you might visualize the data, please see this [Juypter notebook](https://github.com/biota/SourceTracker2/blob/master/ipynb/Visualizing%20results.ipynb). For auto-tuning functionality, please see the original R code. - -Like the old SourceTracker, SourceTracker2 rarifies the source environments it -collapses by default. diff --git a/ipynb/.ipynb_checkpoints/Sourcetracking using a Gibbs Sampler-checkpoint.ipynb b/ipynb/.ipynb_checkpoints/Sourcetracking using a Gibbs Sampler-checkpoint.ipynb deleted file mode 100644 index 81bfb7f..0000000 --- a/ipynb/.ipynb_checkpoints/Sourcetracking using a Gibbs Sampler-checkpoint.ipynb +++ /dev/null @@ -1,828 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 90, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [ - "import pandas as pd\n", - "import numpy as np\n", - "from collections import Counter" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## This notebook explains the theory behind microbial sourcetracking using a Gibb's sampler. \n", - "\n", - "## The theory and examples are based on work detailed in [Knights et al. 2011](http://www.nature.com/nmeth/journal/v8/n9/abs/nmeth.1650.html): if you find this work helpful please consider citing Dan Knights paper.\n", - "\n", - "\n", - "#### Note:\n", - "The formula for calculating the probability that a held-out sequence will be assigned to a given environment is reported incorrectly in [Knights et al. 2011](http://www.nature.com/nmeth/journal/v8/n9/abs/nmeth.1650.html). The corrected formula is:\n", - "\n", - "$$P( z_{i} = v \\mid z^{ \\neg i}, x) \\propto P(x_{i} \\mid v) \\times P(v \\mid x^{ \\neg i}) = \\begin{cases}\n", - "\\frac{m_{x_{i}v} + \\alpha_{1}}{m_{v} + \\tau \\alpha_{1}} \\times \\frac{n_{v}^{ \\neg i} + \\beta}{n - 1 + \\beta V} & v < V\n", - "\\\\\n", - "\\\\\n", - "\\frac{m_{x_{i}v} + n \\alpha_{2}}{m_{v} + \\tau n \\alpha_{2}} \\times \\frac{n_{v}^{ \\neg i} + \\beta}{n - 1 + \\beta V} & v = V\n", - "\\end{cases}$$\n", - "\n", - "This updated formula is what is truly being calculated by both the [R sourcetracking algorithm](https://github.com/danknights/sourcetracker) and this repository (personal communication with Dan Knights).\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Problem\n", - "\n", - "Given a number of `sources`, determine the contribution of each `source` to a `sink`. " - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "The Gibb's sampler works in four basic steps:\n", - " 1. Randomly assign the sequences in a sink to source environments. These random assignments represent which source a given sink sequence came from.\n", - " 2. Select one of the sequences from step 1, calculate the *actual* probabilities of that sequence having come from any of the source environments, and update the assigned source environment of the sequence based on a random draw with the calculated probabilities. Repeat many times. \n", - " 3. 
At intervals in the repeats of step 2, take the source environment assingments of all the sequences in a sink and record them. \n", - " 4. After doing step 3 a certain number of times (i.e. recording the full assignments of source environments for each sink sequence), terminate the iteration and move to the next sink. " - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Machinery\n", - "\n", - "There are two probabilities that form the basis of the Gibb's calculation. " - ] - }, - { - "cell_type": "markdown", - "metadata": { - "collapsed": true - }, - "source": [ - "### `p_tv`\n", - "The first is `p_tv`, the probability of seeing a taxon, given any of the sources. Let's look at an example table:" - ] - }, - { - "cell_type": "code", - "execution_count": 70, - "metadata": { - "collapsed": false - }, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
OTU1OTU2OTU3
Source1010100
Source2369
\n", - "
" - ], - "text/plain": [ - " OTU1 OTU2 OTU3\n", - "Source1 0 10 100\n", - "Source2 3 6 9" - ] - }, - "execution_count": 70, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "table = pd.DataFrame([[0, 10, 100], [3, 6, 9]], columns=['OTU1', 'OTU2', 'OTU3'], index=['Source1', 'Source2'])\n", - "table" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "In the above table, we can immediately tell that if we see OTU3 in our sink sample, it is much more likely that it came from Source1. In fact, we can calculate the probability that each OTU belongs to each Source:" - ] - }, - { - "cell_type": "code", - "execution_count": 71, - "metadata": { - "collapsed": false - }, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
OTU1OTU2OTU3
Source1010100
Source2369
sum316109
\n", - "
" - ], - "text/plain": [ - " OTU1 OTU2 OTU3\n", - "Source1 0 10 100\n", - "Source2 3 6 9\n", - "sum 3 16 109" - ] - }, - "execution_count": 71, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "# sum each OTU\n", - "table.loc['sum'] = table.sum(axis=0)\n", - "table" - ] - }, - { - "cell_type": "code", - "execution_count": 72, - "metadata": { - "collapsed": false - }, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
OTU1OTU2OTU3
Source100.6250.917431
Source210.3750.082569
\n", - "
" - ], - "text/plain": [ - " OTU1 OTU2 OTU3\n", - "Source1 0 0.625 0.917431\n", - "Source2 1 0.375 0.082569" - ] - }, - "execution_count": 72, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "# Calculate the probability (p_tv) for each OTU\n", - "table_p_tv = table.loc[['Source1', 'Source2']].div(table.loc['sum'], axis=1)\n", - "table_p_tv" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "What we can see here is the p_tv for each OTU. If we see OTU1 in the sink sample, it could only have come from Source2, and therefore the p_tv (the probability of seeing OTU1) for Source2 is 100%.\n", - "\n", - "If OTU3 was seen in our sink, there would be a 91.7% chance of it coming from Source1 and a 8.3% chance of it coming from Source2. " - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "**Alpha1**\n", - "\n", - "You may note above that OTU1's abundance in Source1 is 0, and therefore no probability can be provided for that OTU to that Source. SourceTracker2 therefore allows the user to specify an `alpha1` value on the command line, which is the prior counts to be added to all of the sample counts. This would therefore provide a small amount of probability to OTU1 in Source1, and the table would look like (assuming an alpha1 of 0.01):" - ] - }, - { - "cell_type": "code", - "execution_count": 73, - "metadata": { - "collapsed": false - }, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
OTU1OTU2OTU3
Source10.110.1100.1
Source23.16.19.1
\n", - "
" - ], - "text/plain": [ - " OTU1 OTU2 OTU3\n", - "Source1 0.1 10.1 100.1\n", - "Source2 3.1 6.1 9.1" - ] - }, - "execution_count": 73, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "table = pd.DataFrame([[0.1, 10.1, 100.1], [3.1, 6.1, 9.1]], columns=['OTU1', 'OTU2', 'OTU3'], index=['Source1', 'Source2'])\n", - "table" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "** Unknown ** and **Alpha2**\n", - "\n", - "A key feature of SourceTracker2 is to create an Unknown source environment. The basic premise here is to identify sequences that don't have a high probability of being derived from the known and specified Source Environments. An Unknown community is propogated with sequences with the `alpha2` parameter that specifies what percent of the total sink sequence count to be added for each OTU in the Unknown. For example, a sink with 100 sequences and an alpha of 0.001 would add `100 * 0.001 = 0.1` counts to each OTU, and the table would look like this:" - ] - }, - { - "cell_type": "code", - "execution_count": 86, - "metadata": { - "collapsed": false - }, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
OTU1OTU2OTU3
Source10.110.1100.1
Source23.16.19.1
Unknown0.10.10.1
\n", - "
" - ], - "text/plain": [ - " OTU1 OTU2 OTU3\n", - "Source1 0.1 10.1 100.1\n", - "Source2 3.1 6.1 9.1\n", - "Unknown 0.1 0.1 0.1" - ] - }, - "execution_count": 86, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "table = pd.DataFrame([[0.1, 10.1, 100.1], [3.1, 6.1, 9.1], [0.1, 0.1, 0.1]], columns=['OTU1', 'OTU2', 'OTU3'], \n", - " index=['Source1', 'Source2', 'Unknown'])\n", - "table" - ] - }, - { - "cell_type": "code", - "execution_count": 88, - "metadata": { - "collapsed": false - }, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
OTU1OTU2OTU3
Source10.0303030.6196320.915828
Source20.9393940.3742330.083257
Unknown0.0303030.0061350.000915
\n", - "
" - ], - "text/plain": [ - " OTU1 OTU2 OTU3\n", - "Source1 0.030303 0.619632 0.915828\n", - "Source2 0.939394 0.374233 0.083257\n", - "Unknown 0.030303 0.006135 0.000915" - ] - }, - "execution_count": 88, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "table = pd.DataFrame([[0.1, 10.1, 100.1], [3.1, 6.1, 9.1], [0.1, 0.1, 0.1]], columns=['OTU1', 'OTU2', 'OTU3'], \n", - " index=['Source1', 'Source2', 'Unknown'])\n", - "table.loc['sum'] = table.sum(axis=0)\n", - "table_p_tv = table.loc[['Source1', 'Source2', 'Unknown']].div(table.loc['sum'], axis=1)\n", - "table_p_tv" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "These are the precalculated p_tv values we can use" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### 'p_v'\n", - "The next probability is 'p_v', the probability of seeing an environment, given the number of sources. Each sink sample can be thought of a collection of sequences that identify a taxa:" - ] - }, - { - "cell_type": "code", - "execution_count": 77, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [ - "sink_otus = [1, 1, 1, 2, 2, 2, 2, 2, 2, 3]" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Above our sink sample contains 10 sequences that each are assigned to one of the OTUs in our table. Let's forget for a moment that each OTU has a certain probability of being found in a given source environment (the p_tv), and instead focus on the number of possible Source environments we have. If our sink sample was completely unidentified with OTU information, it would look like:" - ] - }, - { - "cell_type": "code", - "execution_count": 79, - "metadata": { - "collapsed": false - }, - "outputs": [], - "source": [ - "sink = ['X', 'X', 'X', 'X', 'X', 'X', 'X', 'X', 'X', 'X']" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "And if we only knew that we had 3 source environments that each sink sequence must have come from, then we would say that each sink sequence had a `1/3` probability of being assigned to each source. \n", - "\n", - "Let's randomly assign each sink sequence to a source environment:" - ] - }, - { - "cell_type": "code", - "execution_count": 83, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [ - "sink_source = [3, 1, 2, 3, 2, 3, 3, 1, 2, 1]\n", - "# where 1 = Source1, 2 = Source2, and 3 = Unknown" - ] - }, - { - "cell_type": "code", - "execution_count": 84, - "metadata": { - "collapsed": false - }, - "outputs": [ - { - "data": { - "text/plain": [ - "Counter({1: 3, 2: 3, 3: 4})" - ] - }, - "execution_count": 84, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "Counter(sink_source)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Therefore, the probability of seeing Source1 in our sink is `3/10`" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "**Restarts** \n", - "\n", - "When we restart the gibbs sampler, we reshuffle the assignments of each sink sequence to a Source Environment, and begin our calculations again." 
- ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Sampling" - ] - }, - { - "cell_type": "code", - "execution_count": 85, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [ - "sink_otus = [1, 1, 1, 2, 2, 2, 2, 2, 2, 3]\n", - "sink_otus_positions_shuffled = [4, 1, 8, 2, 7, 9, 0, 3, 5, 6]" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "The next thing we do is create a shuffled order for which to walk through our sink sequences. The above says that the first sequence we want to predict the source environment contributor for is the `4` index, which relates to OTU2." - ] - }, - { - "cell_type": "code", - "execution_count": 94, - "metadata": { - "collapsed": false - }, - "outputs": [ - { - "data": { - "text/plain": [ - "array([ 0.61836744, 0.37346926, 0.00816331])" - ] - }, - "execution_count": 94, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "# For OTU2, taken from the table above\n", - "# for Source1, Source2, Unknown\n", - "p_tv = np.array([0.619632, 0.374233, 0.006135])\n", - "p_v = np.array([0.3, 0.3, 0.4])\n", - "\n", - "combined_probability = p_tv * p_v\n", - "combined_probability / combined_probability.sum()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We have now calculated the probability that this sink sequence at index 4 should belong to each Source Environment and the Unknown. We can then randomly assign that sequence to one of the environments given those probability distributions. We also update our p_v by adding 1 to the selected source community.\n", - "\n", - "**IF** the sequence gets assigned to the Unknown community, then the Unknown community gets an extra count in the table. In this way, the Unknown community gets propogated through repeated iterations. No changes are made to the table if the sequence gets assigned to an identified source (Source1 or Source2).\n", - "\n", - "Let's assume the unlikely case that our sample _did_ get assigned to the Unknown." - ] - }, - { - "cell_type": "code", - "execution_count": 95, - "metadata": { - "collapsed": false - }, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
OTU1OTU2OTU3
Source10.110.1100.1
Source23.16.19.1
Unknown0.11.10.1
\n", - "
" - ], - "text/plain": [ - " OTU1 OTU2 OTU3\n", - "Source1 0.1 10.1 100.1\n", - "Source2 3.1 6.1 9.1\n", - "Unknown 0.1 1.1 0.1" - ] - }, - "execution_count": 95, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "# Update the table with the OTU2 count\n", - "table = pd.DataFrame([[0.1, 10.1, 100.1], [3.1, 6.1, 9.1], [0.1, 1.1, 0.1]], columns=['OTU1', 'OTU2', 'OTU3'], \n", - " index=['Source1', 'Source2', 'Unknown'])\n", - "table" - ] - }, - { - "cell_type": "code", - "execution_count": 96, - "metadata": { - "collapsed": false - }, - "outputs": [ - { - "data": { - "text/html": [ - "
\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
OTU1OTU2OTU3
Source10.0303030.5838150.915828
Source20.9393940.3526010.083257
Unknown0.0303030.0635840.000915
\n", - "
" - ], - "text/plain": [ - " OTU1 OTU2 OTU3\n", - "Source1 0.030303 0.583815 0.915828\n", - "Source2 0.939394 0.352601 0.083257\n", - "Unknown 0.030303 0.063584 0.000915" - ] - }, - "execution_count": 96, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "# and recalculate the p_tv values\n", - "table.loc['sum'] = table.sum(axis=0)\n", - "table_p_tv = table.loc[['Source1', 'Source2', 'Unknown']].div(table.loc['sum'], axis=1)\n", - "table_p_tv" - ] - }, - { - "cell_type": "code", - "execution_count": 98, - "metadata": { - "collapsed": false - }, - "outputs": [ - { - "data": { - "text/plain": [ - "Counter({1: 3, 2: 2, 3: 5})" - ] - }, - "execution_count": 98, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "# Update the p_v values\n", - "# change the 4th index to community 3\n", - "sink_source = [3, 1, 2, 3, 3, 3, 3, 1, 2, 1]\n", - "Counter(sink_source)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "And now:" - ] - }, - { - "cell_type": "code", - "execution_count": 99, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [ - "sink_otus = [1, 1, 1, 2, 2, 2, 2, 2, 2, 3]\n", - "sink_otus_positions_shuffled = [4, 1, 8, 2, 7, 9, 0, 3, 5, 6]" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We'd move on to index position 1, and repeat the entire process" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "After so many iterations, or **burn ins**, we can start to record the p_v source environment counts as the percent contribution of each Source to our sink sample. The **delay** determines how many iterations to skip between draws after the burnin period. The **draws_per_restart** indicate how many draws to save from each restart. All draws are then averaged together to provide a final mixing proportion." - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 2", - "language": "python", - "name": "python2" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 2 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython2", - "version": "2.7.6" - } - }, - "nbformat": 4, - "nbformat_minor": 0 -} diff --git a/sourcetracker/_cli/gibbs.py b/sourcetracker/_cli/gibbs.py index 29fb7b8..af73e1f 100755 --- a/sourcetracker/_cli/gibbs.py +++ b/sourcetracker/_cli/gibbs.py @@ -10,20 +10,17 @@ from __future__ import division -import glob import os import click -import numpy as np -from functools import partial import subprocess import time +import pandas as pd from biom import load_table -from biom.table import Table from sourcetracker._cli import cli -from sourcetracker._sourcetracker import ( - parse_mapping_file, collapse_sources, sinks_and_sources, - _cli_sync_biom_and_sample_metadata, _cli_collate_results, _cli_loo_runner, - _cli_sink_source_prediction_runner, subsample_sources_sinks) +from sourcetracker._sourcetracker import (biom_to_df, _gibbs, _gibbs_loo, + intersect_and_sort_samples, + get_samples, collapse_source_data, + subsample_dataframe) from ipyparallel import Client @@ -66,11 +63,11 @@ help=('Count to be added to each species in each environment, ' 'including `unknown`.')) @click.option('--source_rarefaction_depth', required=False, default=1000, - type=click.INT, show_default=True, + type=click.IntRange(min=0, max=None), show_default=True, help=('Depth at which to rarify sources. 
If 0, no ' 'rarefaction performed.')) @click.option('--sink_rarefaction_depth', required=False, default=1000, - type=click.INT, show_default=True, + type=click.IntRange(min=0, max=None), show_default=True, help=('Depth at which to rarify sinks. If 0, no ' 'rarefaction performed.')) @click.option('--restarts', required=False, default=10, @@ -105,6 +102,14 @@ 'has been successfully started, the jobs will not be ' 'successfully launched. If this is happening, increase ' 'this parameter.')) +@click.option('--per_sink_feature_assignments', required=False, default=False, + is_flag=True, show_default=True, + help=('If True, this option will cause SourceTracker2 to write ' + 'out a feature table for each sink (or source if `--loo` ' + 'is passed). These feature tables contain the specific ' + 'sequences that contributed to a sink from a given ' + 'source. This option can be memory intensive if there are ' + 'a large number of features.')) @click.option('--source_sink_column', required=False, default='SourceSink', type=click.STRING, show_default=True, help=('Sample metadata column indicating which samples should be' @@ -122,10 +127,10 @@ help=('Sample metadata column indicating the type of each ' 'source sample.')) def gibbs(table_fp, mapping_fp, output_dir, loo, jobs, alpha1, alpha2, beta, - source_rarefaction_depth, sink_rarefaction_depth, - restarts, draws_per_restart, burnin, delay, cluster_start_delay, - source_sink_column, source_column_value, sink_column_value, - source_category_column): + source_rarefaction_depth, sink_rarefaction_depth, restarts, + draws_per_restart, burnin, delay, cluster_start_delay, + per_sink_feature_assignments, source_sink_column, + source_column_value, sink_column_value, source_category_column): '''Gibb's sampler for Bayesian estimation of microbial sample sources. For details, see the project README file. @@ -134,96 +139,101 @@ def gibbs(table_fp, mapping_fp, output_dir, loo, jobs, alpha1, alpha2, beta, # failed if so. os.mkdir(output_dir) - # Load the mapping file and biom table and remove samples which are not - # shared. - o = open(mapping_fp, 'U') - sample_metadata_lines = o.readlines() - o.close() - - sample_metadata, biom_table = \ - _cli_sync_biom_and_sample_metadata( - parse_mapping_file(sample_metadata_lines), - load_table(table_fp)) - - # If biom table has fractional counts, it can produce problems in indexing - # later on. - biom_table.transform(lambda data, id, metadata: np.ceil(data)) - - # If biom table has sample metadata, there will be pickling errors when - # submitting multiple jobs. We remove the metadata by making a copy of the - # table without metadata. - biom_table = Table(biom_table._data.toarray(), - biom_table.ids(axis='observation'), - biom_table.ids(axis='sample')) - - # Parse the mapping file and options to get the samples requested for - # sources and sinks. - source_samples, sink_samples = sinks_and_sources( - sample_metadata, column_header=source_sink_column, - source_value=source_column_value, sink_value=sink_column_value) + # Load the metadata file and feature table. Do a data check on the + # feature_table. + sample_metadata = pd.read_table(mapping_fp, sep='\t', dtype=object) + sample_metadata.set_index(sample_metadata.columns[0], drop=True, + append=False, inplace=True) + _ft = load_table(table_fp) + feature_table = biom_to_df(_ft, apply_fractional_value_correction=True) + + # Remove samples not shared by both feature and metadata tables and order + # rows equivalently. 
+ sample_metadata, feature_table = \ + intersect_and_sort_samples(sample_metadata, feature_table) + + # Identify source and sink samples. + source_samples = get_samples(sample_metadata, source_sink_column, + source_column_value) + sink_samples = get_samples(sample_metadata, source_sink_column, + sink_column_value) # If we have no source samples neither normal operation or loo will work. # Will also likely get strange errors. if len(source_samples) == 0: - raise ValueError('Mapping file or biom table passed contain no ' - '`source` samples.') + raise ValueError(('You passed %s as the `source_sink_column` and %s ' + 'as the `source_column_value`. There are no samples ' + 'which are sources under these values. Please see ' + 'the help documentation and check your mapping ' + 'file.') % (source_sink_column, source_column_value)) # Prepare the 'sources' matrix by collapsing the `source_samples` by their # metadata values. - sources_envs, sources_data = collapse_sources(source_samples, - sample_metadata, - source_category_column, - biom_table, sort=True) - - # Rarefiy data if requested. - sources_data, biom_table = \ - subsample_sources_sinks(sources_data, sink_samples, biom_table, - source_rarefaction_depth, - sink_rarefaction_depth) - - # Build function that require only a single parameter -- sample -- to - # enable parallel processing if requested. - if loo: - f = partial(_cli_loo_runner, source_category=source_category_column, - alpha1=alpha1, alpha2=alpha2, beta=beta, - restarts=restarts, draws_per_restart=draws_per_restart, - burnin=burnin, delay=delay, - sample_metadata=sample_metadata, - sources_data=sources_data, sources_envs=sources_envs, - biom_table=biom_table, output_dir=output_dir) - sample_iter = source_samples - else: - f = partial(_cli_sink_source_prediction_runner, alpha1=alpha1, - alpha2=alpha2, beta=beta, restarts=restarts, - draws_per_restart=draws_per_restart, burnin=burnin, - delay=delay, sources_data=sources_data, - biom_table=biom_table, output_dir=output_dir) - sample_iter = sink_samples - + csources = collapse_source_data(sample_metadata, feature_table, + source_samples, source_category_column, + 'mean') + + # Rarify collapsed source data if requested. + if source_rarefaction_depth > 0: + d = (csources.sum(1) >= source_rarefaction_depth) + if not d.all(): + print(csources.sum(1)) + too_shallow = (~d).sum() + shallowest = csources.sum(1).min() + raise ValueError(('You requested rarefaction of source samples at ' + '%s, but there are %s collapsed source samples ' + 'that have less sequences than that. The ' + 'shallowest of these is %s sequences.') % + (source_rarefaction_depth, too_shallow, + shallowest)) + else: + csources = subsample_dataframe(csources, source_rarefaction_depth) + + # Prepare sink data and rarify if requested. + sinks = feature_table.loc[sink_samples, :] + if sink_rarefaction_depth > 0: + d = (sinks.sum(1) >= sink_rarefaction_depth) + if not d.all(): + too_shallow = (~d).sum() + shallowest = sinks.sum(1).min() + raise ValueError(('You requested rarefaction of sink samples at ' + '%s, but there are %s sink samples that have ' + 'less sequences than that. The shallowest of ' + 'these is %s sequences.') % + (sink_rarefaction_depth, too_shallow, + shallowest)) + else: + sinks = subsample_dataframe(sinks, sink_rarefaction_depth) + + # If we've been asked to do multiple jobs, we need to spin up a cluster. if jobs > 1: # Launch the ipcluster and wait for it to come up. 
subprocess.Popen('ipcluster start -n %s --quiet' % jobs, shell=True) time.sleep(cluster_start_delay) - c = Client() - c[:].map(f, sample_iter, block=True) - # Shut the cluster down. Answer taken from SO: - # http://stackoverflow.com/questions/30930157/stopping-ipcluster-engines-ipython-parallel - c.shutdown(hub=True) + cluster = Client() else: - for sample in sample_iter: - f(sample) - - # Format results for output. - samples = [] - samples_data = [] - for sample_fp in glob.glob(os.path.join(output_dir, '*')): - samples.append(sample_fp.strip().split('/')[-1].split('.txt')[0]) - samples_data.append(np.loadtxt(sample_fp, delimiter='\t')) - mp, mps = _cli_collate_results(samples, samples_data, sources_envs) - - o = open(os.path.join(output_dir, 'mixing_proportions.txt'), 'w') - o.writelines(mp) - o.close() - o = open(os.path.join(output_dir, 'mixing_proportions_stds.txt'), 'w') - o.writelines(mps) - o.close() + cluster = None + + # Run the computations. + if not loo: + mpm, mps, fas = \ + _gibbs(csources, sinks, alpha1, alpha2, beta, restarts, + draws_per_restart, burnin, delay, cluster=cluster, + create_feature_tables=per_sink_feature_assignments) + else: + mpm, mps, fas = \ + _gibbs_loo(csources, alpha1, alpha2, beta, restarts, + draws_per_restart, burnin, delay, cluster=cluster, + create_feature_tables=per_sink_feature_assignments) + # If we started a cluster, shut it down. + if jobs > 1: + cluster.shutdown(hub=True) + + # Write results. + mpm.to_csv(os.path.join(output_dir, 'mixing_proportions.txt'), sep='\t') + mps.to_csv(os.path.join(output_dir, 'mixing_proportions_stds.txt'), + sep='\t') + if per_sink_feature_assignments: + for sink, fa in zip(mpm.index, fas): + fa.to_csv(os.path.join(output_dir, sink + '.feature_table.txt'), + sep='\t') diff --git a/sourcetracker/_sourcetracker.py b/sourcetracker/_sourcetracker.py index 9cf3439..2bb4140 100644 --- a/sourcetracker/_sourcetracker.py +++ b/sourcetracker/_sourcetracker.py @@ -9,278 +9,241 @@ # ---------------------------------------------------------------------------- from __future__ import division -import os -from copy import copy import numpy as np from skbio.stats import subsample_counts import pandas as pd from functools import partial -from biom.table import Table -import glob -from tempfile import TemporaryDirectory -def parse_mapping_file(mf_lines): - """Parse lines from a mapping file into a nested dictionary. - - This function copies qiime.parse_mapping_file_to_dict, but does _not_ allow - commented lines at the top of a mapping file. +def check_and_correct_data(feature_table, apply_fractional_value_correction): + '''Check and correct problems in the `feature_table`. Parameters ---------- - mf_lines : list - Each entry of the list is a tab delimited string corresponding to a row - of the mapping file. + feature_table : pd.DataFrame + Feature table where rows are features and columns are samples. + apply_fractional_value_correction : boolean + Fractional counts cause errors in indexing. Uses np.ceil to remove + them if True. Returns ------- - results : dict - """ - headers = mf_lines[0].strip().split('\t')[1:] - results = {} - for line in mf_lines[1:]: - tmp = line.strip().split('\t') - results[tmp[0]] = {k: v.strip() for k, v in zip(headers, tmp[1:])} - return results + feature_table : pd.DataFrame + Corrected feature table if changes have been made or original table if + no changes occurred. 
Will automatically be cast as dtype=np.int64 + Raises + ------ + ValueError + If data contains values that Gibb's sampling cannot handle. + ''' + if not np.isreal(feature_table.dtypes).all(): + raise TypeError('Feature table contains one or more columns which are ' + 'not numeric type. This is likely due to boolean, ' + 'string, or other data types being present. Data must ' + 'contain exclusively real-valued integers or floats.') + + if np.isnan(feature_table.values).any(): + raise ValueError('One or more values in the feature table is a `nan` ' + 'or `null` value. Data must contain exclusively real-' + 'valued integers or floats.') + + if not (np.ceil(feature_table.values) == feature_table.values).all(): + if apply_fractional_value_correction is True: + return pd.DataFrame(np.ceil(feature_table.values).astype(np.int64), + index=feature_table.index, + columns=feature_table.columns) + else: + raise ValueError('Non-integer data in the feature table is not ' + 'being corrected by `check_and_correct_data`. ' + 'Data leaving this function must be np.int64 ' + 'type. Either correct the table or pass the ' + '`apply_fractional_value_correction` to ' + '`check_and_correct_data`.') + return feature_table.astype(np.int64) -def collapse_sources(samples, sample_metadata, category, biom_table, - sort=True): - """Collapse sources based on metadata under `category`. + +def biom_to_df(biom_table, apply_fractional_value_correction=True): + '''Turn biom table into dataframe, correcting fractional counts. Parameters ---------- - samples : list - Sample id's that contain data to be summed and collapsed. - sample_metadata : dict - A mapping file that has been parsed with parse_mapping_file. The - keys are sample ids, and the values are dicts with each header in the - mapping file as a key and the values from that sample under that - header. - category : str - Header in the mapping file (and thus each second level dictionary of - `sample_metadata`) that identifies the source environment of each - sample. biom_table : biom.table.Table - A biom table containing source data. - sort : boolean, optional - If true, the set of sources will be sorted using np.argsort. + Biom table. + apply_fractional_value_correction : boolean + Fractional counts cause errors in indexing. Uses np.ceil to remove + them if True. Returns ------- - envs : np.array - Source environments in the same order as collapsed_sources. - collapsed_sources: np.array - Columns are features (OTUs), rows are collapsed samples. The [i,j] - entry is the sum of the counts of features j in all samples which were - considered part of source i. + feature_table : pd.DataFrame + Contingency table with rows, columns = samples, features. + ''' + table = pd.DataFrame(biom_table._data.toarray().T, + index=biom_table.ids(axis='sample'), + columns=biom_table.ids(axis='observation')) + return check_and_correct_data(table, apply_fractional_value_correction) - Notes - ----- - `biom_table` will have S samples = [s0, s1, ... sn]. Each of these samples - has a metadata value under `category` = [c0, c1, ... cn]. This function - will produce an array with K rows, where the ith row has the data from the - ith member of the set of category values. 
As an example: - - Input table data: - s1 s2 s3 s4 - o1 10 50 10 70 - o2 0 25 10 5 - o3 0 25 10 5 - o4 100 0 10 5 - - mf: - cat1 - s1 A - s2 A - s3 A - s4 B - - result of using 'cat1': - - Collapsed table data: - o1 o2 o3 o4 - A 70 35 35 110 - B 70 5 5 5 - """ - envs = [] - sample_groups = [] - for s in samples: - env = sample_metadata[s][category] - try: - idx = envs.index(env) - sample_groups[idx].append(s) - except ValueError: - envs.append(env) - idx = envs.index(env) - sample_groups.append([]) - sample_groups[idx].append(s) - - collapsed_sources = np.zeros((len(envs), biom_table.shape[0]), dtype=int) - for row, group in enumerate(sample_groups): - for sample in group: - collapsed_sources[row] += biom_table.data(sample, axis='sample', - dense=True).astype(int) - if sort: - indices = np.argsort(envs) - return np.array(envs)[indices], collapsed_sources[indices] + +def intersect_and_sort_samples(sample_metadata, feature_table): + '''Return input tables retaining only shared samples, row order equivalent. + + Parameters + ---------- + sample_metadata : pd.DataFrame + Contingency table with rows, columns = samples, metadata. + feature_table : pd.DataFrame + Contingency table with rows, columns = samples, features. + + Returns + ------- + sample_metadata, feature_table : pd.DataFrame, pd.DataFrame + Input tables with unshared samples removed and ordered equivalently. + + Raises + ------ + ValueError + If no shared samples are found. + ''' + shared_samples = np.intersect1d(sample_metadata.index, feature_table.index) + if shared_samples.size == 0: + raise ValueError('There are no shared samples between the feature ' + 'table and the sample metadata. Ensure that you have ' + 'passed the correct files.') + elif (shared_samples.size == sample_metadata.shape[0] == + feature_table.shape[0]): + s_metadata = sample_metadata.copy() + s_features = feature_table.copy() else: - return np.array(envs), collapsed_sources + s_metadata = sample_metadata.loc[np.in1d(sample_metadata.index, + shared_samples), :].copy() + s_features = feature_table.loc[np.in1d(feature_table.index, + shared_samples), :].copy() + return s_metadata, s_features.loc[s_metadata.index, :] -def subsample_sources_sinks(sources_data, sinks, feature_table, sources_depth, - sinks_depth): - '''Rarify data for sources and sinks. +def get_samples(sample_metadata, col, value): + '''Return samples which have `value` under `col`.''' + return sample_metadata.index[sample_metadata[col] == value].copy() - Notes - ----- - This function rarifies `sources_data` to `sources_depth`, and `sinks` in - `feature_table` to `sink_depth`. This function is neccesary because of - ipyparallel and the partial functions. + +def collapse_source_data(sample_metadata, feature_table, source_samples, + category, method): + '''Collapse each set of source samples into an aggregate source. Parameters ---------- - sources_data : np.array - Two dimensional array with collapsed source data. - sinks : np.array - One dimensional array of strings, with each string being the sample ID - of a sink in `feature_table`. - feature_table : biom.table.Table - Biom table containing data for `sinks` to be rarified. - sources_depth : int - Depth at which to subsample each source. If 0, no rarefaction will be - performed. - sinks_depth : int - Depth at which to subsample each sink. If 0, no rarefaction will be - performed. + sample_metadata : pd.DataFrame + Contingency table where rows are features and columns are metadata. 
+    feature_table : pd.DataFrame
+        Contingency table where rows are samples and columns are features.
+    source_samples : iterable
+        Samples which should be considered for collapsing (i.e. are sources).
+    category : str
+        Column in `sample_metadata` which should be used to group samples.
+    method : str
+        One of the available aggregation methods in pd.DataFrame.agg (mean,
+        median, prod, sum, std, var).
+
+    Returns
+    -------
+    pd.DataFrame
+        Collapsed sample data.
+
+    Notes
+    -----
+    This function calls `check_and_correct_data` before returning the
+    collapsed source table. This is required in case the aggregation function
+    causes nans or non-integer data to be returned.
+
+    The order of the collapsed sources is determined by the sort order of
+    their names.
For instance, in the example below, .4 comes before 3.0 so the + collapsed sources will have the 0th row as .4. - def generate_taxon_sequence(self): - """Generate vector of taxa containing each sequence in self.sink_data. + Examples + -------- + >>> samples = ['sample1', 'sample2', 'sample3', 'sample4'] + >>> category = 'pH' + >>> values = [3.0, 0.4, 3.0, 3.0] + >>> stable = pd.DataFrame(values, index=samples, columns = [category]) + >>> stable + pH + sample1 3.0 + sample2 0.4 + sample3 3.0 + sample4 3.0 + + >>> fdata = np.array([[ 10, 50, 10, 70], + [ 0, 25, 10, 5], + [ 0, 25, 10, 5], + [100, 0, 10, 5]]) + >>> ftable = pd.DataFrame(fdata, index = stable.index) + >>> ftable + 0 1 2 3 + sample1 10 50 10 70 + sample2 0 25 10 5 + sample3 0 25 10 5 + sample4 100 0 10 5 + + >>> source_samples = ['sample1', 'sample2', 'sample3'] + >>> method = 'sum' + >>> csources = collapse_source_data(stable, ftable, source_samples, + category, method) + >>> csources + 0 1 2 3 + collapse_col + 0.4 0 25 10 5 + 3.0 10 75 20 75 + ''' + sources = sample_metadata.loc[source_samples, :] + table = feature_table.loc[sources.index, :].copy() + table['collapse_col'] = sources[category] + return check_and_correct_data(table.groupby('collapse_col').agg(method), + True) - Notes - ----- - This function is used for book-keeping. The ith value of taxon_sequence - is the (integer) index of an OTU in self.sink_data. As an example, if - the sink data were [2, 0, 0, 4] (OTU_0 has an abundance of 2 and OTU_3 - has an abundance of 4), the taxon sequence would be [0, 0, 3, 3, 3, 3]. - """ - self.taxon_sequence = np.empty(self.sum, dtype=np.int32) - k = 0 - for ind, num in enumerate(self.sink_data): - self.taxon_sequence[k: k + num] = ind - k += num - def generate_environment_assignments(self): - """Randomly assign each sequence in the sink to a source environment. +def subsample_dataframe(df, depth): + '''Subsample (rarify) input dataframe without replacement. - Notes - ----- - Assignments to a source are made from a uniform distribtuion. + Parameters + ---------- + df : pd.DataFrame + Feature table where rows are features and columns are samples. + depth : int + Number of sequences to choose per sample. - """ - choices = np.arange(self.num_sources) - self.seq_env_assignments = np.random.choice(choices, size=self.sum, - replace=True) - self.envcounts = np.bincount(self.seq_env_assignments, - minlength=self.num_sources) + Returns + ------- + pd.DataFrame + Subsampled dataframe. + ''' + f = partial(subsample_counts, n=depth, replace=False) + return df.apply(f, axis=1, reduce=False, raw=False) - def seq_assignments_to_contingency_table(self): - """Return contingency table built from `self.taxon_sequence`. - Returns - ------- - ct : np.array - Two dimensional array with rows as features, sources as columns. - The [i, j] entry is the count of feature i assigned to source j. - """ - ct = np.zeros((self.num_features, self.num_sources)) - for i in range(int(self.sum)): - ct[self.taxon_sequence[i], self.seq_env_assignments[i]] += 1 - return ct +def generate_environment_assignments(n, num_sources): + '''Randomly assign `n` counts to one of `num_sources` environments. + + Parameters + ---------- + n : int + Number of environment assignments to generate. + num_sources : int + Number of possible environment states (this includes the 'Unknown'). + + Returns + ------- + seq_env_assignments : np.array + 1D vector of length `n`. The ith entry is the environment assignment of + the ith feature. 
+ envcounts : np.array + 1D vector of length `num_sources`. The ith entry is the total number of + entries in `seq_env_assignments` which are equal to i. + ''' + seq_env_assignments = np.random.choice(np.arange(num_sources), size=n, + replace=True) + envcounts = np.bincount(seq_env_assignments, minlength=num_sources) + return seq_env_assignments, envcounts class ConditionalProbability(object): @@ -290,7 +253,7 @@ def __init__(self, alpha1, alpha2, beta, source_data): Paramaters ---------- alpha1 : float - Prior counts of each species in the training environments. Higher + Prior counts of each feature in the training environments. Higher values decrease the trust in the training environments, and make the source environment distributions over taxa smoother. By default, this is set to 0.001, which indicates reasonably high @@ -299,14 +262,14 @@ def __init__(self, alpha1, alpha2, beta, source_data): samples are available from a source environment. A more conservative value would be 0.01. alpha2 : float - Prior counts of each species in the Unknown environment. Higher + Prior counts of each feature in the Unknown environment. Higher values make the Unknown environment smoother and less prone to overfitting given a training sample. beta : float - Number of prior counts of test sequences from each OTU in each + Number of prior counts of test sequences from each feature in each environment source_data : np.array - Columns are features (OTUs), rows are collapsed samples. The [i,j] + Columns are features, rows are collapsed samples. The [i,j] entry is the sum of the counts of features j in all samples which were considered part of source i. @@ -348,9 +311,9 @@ def __init__(self, alpha1, alpha2, beta, source_data): reference [1]_ (with modifications based on communications with the author). Since the calculation of the conditional probability must occur during each pass of the Gibbs sampler, reducing the number of - computations is of paramount concern. This class precomputes everything - that is static throughout a run of the sampler to reduce the innermost - for loop computations. + computations is of paramount importance. This class precomputes + everything that is static throughout a run of the sampler to reduce the + innermost for-loop computations. The formula used to calculate the conditional joint probability is described in the project readme file. @@ -372,14 +335,14 @@ def __init__(self, alpha1, alpha2, beta, source_data): >>> cp.precompute() Now we can compute the 'slice' of the conditional probability depending on the current state of the test sequences (the ones randomly assigned - and then iteratively reassigned) and which taxon (the slice) the + and then iteratively reassigned) and which feature (the slice) the sequence we have removed was from. >>> xi = 2 - Count of the training sequences (that are taxon xi) currently assigned - to the unkown environment. + Count of the training sequences (that are feature xi) currently + assigned to the unknown environment. >>> m_xiV = 38 - Sum of the training sequences currently assigned to the unkown - environment (over all taxa). + Sum of the training sequences currently assigned to the unknown + environment (over all features). >>> m_V = 158 Counts of the test sequences in each environment at the current iteration of the sampler. 
@@ -396,13 +359,14 @@ def __init__(self, alpha1, alpha2, beta, source_data): self.alpha1 = alpha1 self.alpha2 = alpha2 self.beta = beta - self.m_xivs = source_data - self.m_vs = np.expand_dims(source_data.sum(1), axis=1) + self.m_xivs = source_data.astype(np.float64) + self.m_vs = np.expand_dims(source_data.sum(1), + axis=1).astype(np.float64) self.V = source_data.shape[0] + 1 self.tau = source_data.shape[1] # Create the joint probability vector which will be overwritten each # time self.calculate_cp_slice is called. - self.joint_probability = np.zeros(self.V) + self.joint_probability = np.zeros(self.V, dtype=np.float64) def set_n(self, n): """Set the sum of the sink.""" @@ -417,11 +381,11 @@ def precalculate(self): # We are going to be accessing columns of this array in the innermost # loop of the Gibbs sampler. By forcing this array into 'F' order - - # 'Fortran-contiguous' - we've set it so that acessing column slices is - # faster. Tests indicate about 2X speed up in this operation from 'F' - # order as opposed to the default 'C' order. + # 'Fortran-contiguous' - we've set it so that accessing column slices + # is faster. Tests indicate about 2X speed up in this operation from + # 'F' order as opposed to the default 'C' order. self.known_source_cp = np.array(self.known_p_tv / self.denominator_p_v, - order='F') + order='F', dtype=np.float64) self.alpha2_n = self.alpha2 * self.n self.alpha2_n_tau = self.alpha2_n * self.tau @@ -436,9 +400,9 @@ def calculate_cp_slice(self, xi, m_xiV, m_V, n_vnoti): that should be calculated. m_xiV : float Count of the training sequences (that are taxon xi) currently - assigned to the unkown environment. + assigned to the unknown environment. m_V : float - Sum of the training sequences currently assigned to the unkown + Sum of the training sequences currently assigned to the unknown environment (over all taxa). n_vnoti : float Counts of the test sequences in each environment at the current @@ -460,16 +424,16 @@ def calculate_cp_slice(self, xi, m_xiV, m_V, n_vnoti): return self.joint_probability -def gibbs_sampler(cp, sink, restarts, draws_per_restart, burnin, delay): +def gibbs_sampler(sink, cp, restarts, draws_per_restart, burnin, delay): """Run Gibbs Sampler to estimate feature contributions from a sink sample. Parameters ---------- - cp : ConditionalProbability object - Instantiation of the class handling probability calculations. sink : np.array A one dimentional array containing counts of features whose sources are to be estimated. + cp : ConditionalProbability object + Instantiation of the class handling probability calculations. restarts : int Number of independent Markov chains to grow. `draws_per_restart` * `restarts` gives the number of samplings of the mixing proportions that @@ -492,31 +456,35 @@ def gibbs_sampler(cp, sink, restarts, draws_per_restart, burnin, delay): Returns ------- - mixing_proportions : np.array - A two dimensional array containing the estimated source proportions. - The [i,j] entry is the estimated proportion of the sink sample coming - from source environment j calculated during draw i. - calculated_assignments : np.array - A three dimensional array containing the source environment assignments - for each sequence in the sample. The first dimension of the array is - the 'draw'; this is analagous to the first dimension in - sink_proportions. The second and third dimensions are features and - sources, respectively. 
Thus the [i,j,k] entry is the number of - sequences from the sink sample that were taxon j, from environment k, - in the ith independent draw. + final_envcounts : np.array + 2D array of ints. Rows are draws, columns are sources. The [i, j] entry + is the number of sequences from draw i that where assigned to have come + from environment j. + final_env_assignments : np.array + 2D array of ints. Rows are draws, columns are conserved but arbitrary + ordering. The [i, j] entry is the index of feature j in draw i. These + orderings are identical for each draw. + final_taxon_assignments : np.array + 2D array of ints. Rows are draws, columns are conserved but arbitrary + ordering (same ordering as `final_env_assignments`). The [i, j] entry + is the environment that the taxon `final_env_assignments[i, j]` is + determined to have come from in draw i (j is the environment). """ # Basic bookkeeping information we will use throughout the function. num_sources = cp.V num_features = cp.tau source_indices = np.arange(num_sources) + sink = sink.astype(np.int32) + sink_sum = sink.sum() # Calculate the number of passes that need to be conducted. total_draws = restarts * draws_per_restart total_passes = burnin + (draws_per_restart - 1) * delay + 1 # Results containers. - mixing_proportions = np.empty((total_draws, num_sources)) - calculated_assignments = np.empty((total_draws, num_features, num_sources)) + final_envcounts = np.zeros((total_draws, num_sources), dtype=np.int32) + final_env_assignments = np.zeros((total_draws, sink_sum), dtype=np.int32) + final_taxon_assignments = np.zeros((total_draws, sink_sum), dtype=np.int32) # Sequences from the sink will be randomly assigned a source environment # and then reassigned based on an increasingly accurate set of @@ -526,14 +494,14 @@ def gibbs_sampler(cp, sink, restarts, draws_per_restart, burnin, delay): # receive more accurate reassignments by virtue of more updates to the # probability model. 'order' will be shuffled each pass, but can be # instantiated here to avoid unnecessary duplication. - sink_sum = sink.sum() - order = np.arange(sink_sum, dtype=np.int) + order = np.arange(sink_sum, dtype=np.int32) # Create a bookkeeping vector that keeps track of each sequence in the # sink. Each one will be randomly assigned an environment, and then - # reassigned based on the increasinly accurate distribution. - sampler = Sampler(sink, num_sources) - sampler.generate_taxon_sequence() + # reassigned based on the increasinly accurate distribution. sink[i] i's + # will be placed in the `taxon_sequence` vector to allow each individual + # count to be removed and reassigned. + taxon_sequence = np.repeat(np.arange(num_features), sink).astype(np.int32) # Update the conditional probability class now that we have the sink sum. cp.set_n(sink_sum) @@ -546,17 +514,18 @@ def gibbs_sampler(cp, sink, restarts, draws_per_restart, burnin, delay): for restart in range(restarts): # Generate random source assignments for each sequence in the sink # using a uniform distribution. - sampler.generate_environment_assignments() + seq_env_assignments, envcounts = \ + generate_environment_assignments(sink_sum, num_sources) # Initially, the count of each taxon in the 'unknown' source should be # 0. - unknown_vector = np.zeros(num_features) + unknown_vector = np.zeros(num_features, dtype=np.int32) unknown_sum = 0 # If a sequence's random environmental assignment is to the 'unknown' # environment we alter the training data to include those sequences # in the 'unknown' source. 
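# A small worked example of the bookkeeping described above, assuming only
# numpy. Expanding a sink vector with np.repeat yields one entry per individual
# count, so each count can be withdrawn and reassigned during a pass; the pass
# arithmetic follows the total_passes formula used above.
import numpy as np

sink = np.array([2, 0, 0, 4])                           # counts of 4 features
taxon_sequence = np.repeat(np.arange(sink.size), sink)
# taxon_sequence -> array([0, 0, 3, 3, 3, 3]); len(taxon_sequence) == sink.sum()

restarts, draws_per_restart, burnin, delay = 2, 3, 10, 2
total_passes = burnin + (draws_per_restart - 1) * delay + 1   # 15 per restart
total_draws = restarts * draws_per_restart                    # 6 draws overall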
- for e, t in zip(sampler.seq_env_assignments, sampler.taxon_sequence): + for e, t in zip(seq_env_assignments, taxon_sequence): if e == unknown_idx: unknown_vector[t] += 1 unknown_sum += 1 @@ -569,12 +538,12 @@ def gibbs_sampler(cp, sink, restarts, draws_per_restart, burnin, delay): np.random.shuffle(order) for seq_index in order: - e = sampler.seq_env_assignments[seq_index] - t = sampler.taxon_sequence[seq_index] + e = seq_env_assignments[seq_index] + t = taxon_sequence[seq_index] # Remove the ith sequence and update the probability # associated with that environment. - sampler.envcounts[e] -= 1 + envcounts[e] -= 1 if e == unknown_idx: unknown_vector[t] -= 1 unknown_sum -= 1 @@ -583,331 +552,154 @@ def gibbs_sampler(cp, sink, restarts, draws_per_restart, burnin, delay): # removal of the ith sequence. Scale this probability vector # for use by np.random.choice. jp = cp.calculate_cp_slice(t, unknown_vector[t], unknown_sum, - sampler.envcounts) + envcounts) # Reassign the sequence to a new source environment and # update counts for each environment and the unknown source # if necessary. - jp_sum = jp.sum() - new_e_idx = np.random.choice(source_indices, p=jp / jp_sum) + new_e_idx = np.random.choice(source_indices, p=jp / jp.sum()) + + seq_env_assignments[seq_index] = new_e_idx + envcounts[new_e_idx] += 1 - sampler.seq_env_assignments[seq_index] = new_e_idx - sampler.envcounts[new_e_idx] += 1 if new_e_idx == unknown_idx: unknown_vector[t] += 1 unknown_sum += 1 if rep > burnin and ((rep-burnin) % delay) == 1: - # Calculate proportion of sample from each source and record - # this as the source proportions for this sink. - prps = sampler.envcounts / sink_sum - prps_sum = prps.sum() - mixing_proportions[drawcount] = prps/prps_sum - - # Each sequence in the sink sample has been assigned to a - # source environment. Convert this information into a - # contingency table. - calculated_assignments[drawcount] = \ - sampler.seq_assignments_to_contingency_table() + # Update envcounts array with the assigned envs. + final_envcounts[drawcount] = envcounts + + # Assign vectors necessary for feature table reconstruction. + final_env_assignments[drawcount] = seq_env_assignments + final_taxon_assignments[drawcount] = taxon_sequence # We've made a draw, update this index so that the next - # iteration will be placed in the correct slice of the - # mixing_proportions and calculated_assignments arrays. + # iteration will be placed in the correct index of results. drawcount += 1 - return mixing_proportions, calculated_assignments + return (final_envcounts, final_env_assignments, final_taxon_assignments) -def sinks_and_sources(sample_metadata, column_header="SourceSink", - source_value='source', sink_value='sink'): - '''Return lists of source and sink samples. +def _gibbs_loo(sources, alpha1, alpha2, beta, restarts, draws_per_restart, + burnin, delay, cluster=None, create_feature_tables=True): + '''Gibb's LOO sampling API - see _gibbs for documentation. Notes ----- - This function assumes that source and sink samples will be delineated by - having the value `source` or `sink` under the header `SourceSink` in - `sample_metadata`. - - Parameters - ---------- - sample_metadata : dict - Dictionary containing sample metadata in QIIME 1 sample metadata - mapping file format. - column_header : str, optional - Column in the mapping file that describes where a sample is a source - or a sink. - source_value : str, optional - Value that indicates a sample is a source. 
- sink_value : str, optional - Value that indicates a sample is a sink. - - Returns - ------- - source_samples : list - Samples that will be collapsed and used as 'Sources'. - sink_samples : list - Samples that will be used as 'Sinks'. + In leave-one-out (LOO) classification, each source is individually + considered as a sink, so there is no need to pass a sinks dataframe. ''' - sink_samples = [] - source_samples = [] - for sample, md in sample_metadata.items(): - if md[column_header] == sink_value: - sink_samples.append(sample) - elif md[column_header] == source_value: - source_samples.append(sample) - else: - pass - return source_samples, sink_samples - - -def _cli_sync_biom_and_sample_metadata(sample_metadata, biom_table): - """Reduce mapping file dict and biom table to shared samples. - - Notes - ----- - This function is used by the command line interface (CLI) to allow mapping - files and biom tables which have disjoint sets of samples to be used in the - script without forcing the user to keep multiple copies of either. - - Parameters - ---------- - sample_metadata : dict - Traditional parsed QIIME mapping file, i.e. nested dictionary with - sample ids as keys to a dictionary with mapping file headers as keys - and value for that sample the entries under those headers for that - sample. - biom_table : biom.Table.table - Biom table. - - Returns - ------- - nbiom_table : biom.Table.table - Biom table with shared samples only. - nsample_metadata : dict - Sample metadata dictionary with only shared samples. - - Raises - ------ - ValueError - If there are no shared samples a ValueError is raised. - """ - mf_samples = set(sample_metadata) - biom_table_samples = set(biom_table.ids(axis='sample')) - if mf_samples == biom_table_samples: - return sample_metadata, biom_table - else: - shared_samples = mf_samples.intersection(biom_table_samples) - if len(shared_samples) == 0: - err_text = ('No samples were shared between sample metadata and ' - 'biom file.') - raise ValueError(err_text) - # Remove samples that were in the mapping file but not biom file. - nsample_metadata = {k: v for k, v in sample_metadata.items() if k in - shared_samples} - - def _f(sv, sid, smd): - '''Remove samples in biom table that are not in mapping file.''' - return sid in shared_samples - nbiom_table = biom_table.filter(_f, axis='sample', inplace=False) - return nsample_metadata, nbiom_table - - -def _cli_collate_results(samples, sample_data, env_ids): - """Collate results from individual samples. - - Notes - ----- - This function is used by the command line interface (CLI) to allow - individually written samples to be collated into two results arrays. - - Parameters - ---------- - samples : list - Names of the samples (sinks) in the same order as their data in - `sample_data`. - sample_data : list - Arrays of sample data, where each array contains the contributions from - each source for each draw. In the same order as `samples`. - env_ids : np.array - An containing the source environment identifiers, generated from - `collapse_to_known_sources`. - - Returns - ------- - mean_lines : str - String representing contingency table containing mean contribution from - each environment for each sink. - std_lines : str - String representing contingency table containing standard deviation of - contribution from each environment for each sink. 
- """ - mixing_proportions = [] - mixing_proportions_stds = [] - for sample, data in zip(samples, sample_data): - mixing_proportions.append(data.mean(axis=0)) - mixing_proportions_stds.append(data.std(axis=0)) - - header = '\t'.join(['SampleID'] + env_ids.tolist() + ['Unknown']) - mean_lines = [header] - std_lines = [header] - - for sample_id, means, stds in zip(samples, mixing_proportions, - mixing_proportions_stds): - mean_lines.append('\t'.join([sample_id] + list(map(str, means)))) - std_lines.append('\t'.join([sample_id] + list(map(str, stds)))) - - return '\n'.join(mean_lines), '\n'.join(std_lines) - - -def _cli_single_sample_formatter(proportions): - """Prepare data from a Gibb's run on a single sample for writing. - - Notes - ----- - This function is used by the CLI to prepare for writing the results of the - Gibb's sampler on a single sink sample to disk. This function serves two - purposes: - 1) By writing the results of individual samples to disk, a failed - or interrupted run has all but the sample being currently worked on - saved. - 2) Computation for individual samples can be split amongst multiple - processors because each will operate only on a single sample and write - that result to disk. - - Parameters - ---------- - proportions : np.array - Two dimensional array containing calculated source proportions - (columns) by independent draws (rows). - - Returns - ------- - lines : str - String ready to be written containing the `mixing_proportions` data. - """ - return '\n'.join(['\t'.join(list(map(str, row))) for row in proportions]) - - -def _cli_sink_source_prediction_runner(sample, alpha1, alpha2, beta, restarts, - draws_per_restart, burnin, delay, - sources_data, biom_table, output_dir): - """Run sink source prediction. - - Notes - ----- - This function is used by the CLI to allow ipyparallels to map different - samples to different processors and enable parallelization. The parameters - passed to this function not described below are described in the - ConditionalProbability class, the Sampler class, or the gibbs function. - - Parameters - ---------- - sample : str - ID for the sample whose sources should be predicted. - sources_data : np.array - Data detailing the source environments. - output_dir : str - Path to the output directory where the results will be saved. - """ - cp = ConditionalProbability(alpha1, alpha2, beta, sources_data) - if isinstance(biom_table, pd.core.frame.DataFrame): - sink_data = biom_table.loc[sample].values - elif isinstance(biom_table, Table): - sink_data = biom_table.data(sample, axis='sample', dense=True) + def f(cp_and_sink): + # The import is here to ensure that the engines of the cluster can + # access the gibbs_sampler function. + from sourcetracker._sourcetracker import gibbs_sampler + return gibbs_sampler(cp_and_sink[1], cp_and_sink[0], restarts, + draws_per_restart, burnin, delay) + cps_and_sinks = [] + for source in sources.index: + _sources = sources.select(lambda x: x != source) + cp = ConditionalProbability(alpha1, alpha2, beta, _sources.values) + sink = sources.loc[source, :].values + cps_and_sinks.append((cp, sink)) + + if cluster is not None: + results = cluster[:].map(f, cps_and_sinks, block=True) else: - raise TypeError('OTU table data is neither Pandas DataFrame nor ' - 'biom.table.Table. 
These are the only supported ' - 'formats.') - - results = gibbs_sampler(cp, sink_data, restarts, draws_per_restart, burnin, - delay) - lines = _cli_single_sample_formatter(results[0]) - o = open(os.path.join(output_dir, sample + '.txt'), 'w') - o.writelines(lines) - o.close() - # calculated_assignments.append(results[1]) + results = list(map(f, cps_and_sinks)) + mpm, mps, fas = collate_gibbs_results([i[0] for i in results], + [i[1] for i in results], + [i[2] for i in results], + sources.index, sources.index, + sources.columns, + create_feature_tables, loo=True) + return mpm, mps, fas -def _cli_loo_runner(sample, source_category, alpha1, alpha2, beta, restarts, - draws_per_restart, burnin, delay, sample_metadata, - sources_data, sources_envs, biom_table, output_dir): - """Run leave-one-out source contribution prediction. - - Notes - ----- - This function is used by the CLI to allow ipyparallels to map different - samples to different processors and enable parallelization. The parameters - passed to this function not described below are described in the - ConditionalProbability class, the Sampler class, or the gibbs function. - - Parameters - ---------- - sample : str - ID for the sample whose sources should be predicted. - source_category : str - Key for `sample_metadata[sample]` that indicates which source the - sample belongs to. - sample_metadata : dict - Nested dictionary containing samples ID's and associated metadata. - sources_envs : np.array - Array of the sources in order of the columns in `sources_data`. - output_dir : str - Path to the output directory where the results will be saved. - """ - sink_data = biom_table.data(sample, axis='sample', dense=True) - _tmp = sample_metadata[sample][source_category] - row = (sources_envs == _tmp).nonzero()[0] - _sd = copy(sources_data) - _sd[row] -= sink_data - cp = ConditionalProbability(alpha1, alpha2, beta, _sd) - results = gibbs_sampler(cp, sink_data, restarts, draws_per_restart, - burnin, delay) - lines = _cli_single_sample_formatter(results[0]) - o = open(os.path.join(output_dir, sample + '.txt'), 'w') - o.writelines(lines) - o.close() - - -def _gibbs(source_df, sink_df, alpha1, alpha2, beta, restarts, - draws_per_restart, burnin, delay, cluster=None): - '''Gibb's sampling API +def _gibbs(sources, sinks, alpha1, alpha2, beta, restarts, draws_per_restart, + burnin, delay, cluster=None, create_feature_tables=True): + '''Gibb's sampling API. Notes ----- - This function exists to allow API calls to source/sink prediction. - This function currently does not support LOO classification. It is a + This function exists to allow API calls to source/sink prediction. It is a candidate public API call. You can track progress on this via https://github.com/biota/sourcetracker2/issues/31 - Parameters that are not described in this function body are described - elsewhere in this library (e.g. alpha1, alpha2, etc.). - # TODO: document API fully - users won't be able to access this information - # without access to private functionality. - Warnings -------- This function does _not_ perform rarefaction, the user should perform - rarefaction prior to calling this function. + rarefaction prior to calling this function. This function also does not + perform checks on the data (i.e. using `check_and_correct_data`, or + `intersect_and_sort_samples`). 
Finally, this function does not collapse
+    sources or sinks; it expects each row of the `sources` dataframe to
+    represent a unique source, and each row of the `sinks` dataframe to
+    represent a unique sink.
 
    Parameters
    ----------
-    source_df : DataFrame
+    sources : DataFrame
        A dataframe containing source data (rows are sources, columns are
-        OTUs). The index must be the names of the sources.
-    sink_df : DataFrame
-        A dataframe containing sink data (rows are sinks, columns are OTUs).
-        The index must be the names of the sinks.
+        features). The index must be the names of the sources.
+    sinks : DataFrame
+        A dataframe containing sink data (rows are sinks, columns are
+        features). The index must be the names of the sinks.
+    alpha1 : float
+        Prior counts of each feature in the training environments. Higher
+        values decrease the trust in the training environments, and make
+        the source environment distributions over taxa smoother. By
+        default, this is set to 0.001, which indicates reasonably high
+        trust in all source environments, even those with few training
+        sequences. This is useful when only a small number of biological
+        samples are available from a source environment. A more
+        conservative value would be 0.01.
+    alpha2 : float
+        Prior counts of each feature in the Unknown environment. Higher
+        values make the Unknown environment smoother and less prone to
+        overfitting given a training sample.
+    beta : float
+        Number of prior counts of test sequences from each feature in each
+        environment.
+    restarts : int
+        Number of independent Markov chains to grow. `draws_per_restart` *
+        `restarts` gives the number of samplings of the mixing proportions that
+        will be generated.
+    draws_per_restart : int
+        Number of times to sample the state of the Markov chain for each
+        independent chain grown.
+    burnin : int
+        Number of passes (withdrawal and reassignment of every sequence in the
+        sink) that will be made before a sample (draw) will be taken. Higher
+        values allow more convergence towards the true distribution before
+        draws are taken.
+    delay : int > 1
+        Number of passes between each sampling (draw) of the Markov chain. Once
+        the burnin passes have been made, a sample will be taken every `delay`
+        number of passes. This is also known as 'thinning'. Thinning helps
+        reduce the impact of correlation between adjacent states of the Markov
+        chain. Delay must be greater than 1, otherwise draws will never be
+        taken. This is a legacy of the original R code.
    cluster : ipyparallel.client.client.Client or None
        An ipyparallel Client object, e.g. a started cluster.
+    create_feature_tables : boolean
+        If `True` create a feature table for each sink. The feature table
+        records the average count of each feature from each source for this
+        sink. This option can consume large amounts of memory if there are many
+        sources, sinks, and features. If `False`, feature tables are not
+        created.
 
    Returns
    -------
-    mp : DataFrame
-        A dataframe containing the mixing proportions (rows are sinks, columns
-        are sources)
+    mpm : DataFrame
+        Mixing proportion means. A dataframe containing the mixing proportions
+        (rows are sinks, columns are sources).
    mps : DataFrame
-        A dataframe containing the mixing proportions standard deviations
-        (rows are sinks, columns are sources)
+        Mixing proportion standard deviations. A dataframe containing the
+        mixing proportions standard deviations (rows are sinks, columns are
+        sources).
+ fas : list + ith item is a pd.DataFrame of the average feature assignments from each + source for the ith sink (in the same order as rows of `mpm` and `mps`). Examples -------- @@ -916,10 +708,9 @@ def _gibbs(source_df, sink_df, alpha1, alpha2, beta, restarts, >>> from ipyparallel import Client >>> import subprocess >>> import time - >>> from sourcetracker.sourcetracker import \ - (_gibbs, _cli_sink_source_prediction_runner) + >>> from sourcetracker._sourcetracker import _gibbs - Prepare some source data. + # Prepare some source data. >>> otus = np.array(['o%s' % i for i in range(50)]) >>> source1 = np.random.randint(0, 1000, size=50) >>> source2 = np.random.randint(0, 1000, size=50) @@ -928,7 +719,7 @@ def _gibbs(source_df, sink_df, alpha1, alpha2, beta, restarts, index=['source1', 'source2', 'source3'], columns=otus, dtype=np.float64) - Prepare some sink data. + # Prepare some sink data. >>> sink1 = np.ceil(.5*source1+.5*source2) >>> sink2 = np.ceil(.5*source2+.5*source3) >>> sink3 = np.ceil(.5*source1+.5*source3) @@ -940,7 +731,7 @@ def _gibbs(source_df, sink_df, alpha1, alpha2, beta, restarts, range(1,7)]), columns=otus, dtype=np.float64) - Set paramaters + # Set paramaters >>> alpha1 = .01 >>> alpha2 = .001 >>> beta = 10 @@ -949,40 +740,238 @@ def _gibbs(source_df, sink_df, alpha1, alpha2, beta, restarts, >>> burnin = 2 >>> delay = 2 - Call without a cluster - >>> mp, mps = _gibbs(source_df, sink_df, alpha1, alpha2, beta, restarts, - draws_per_restart, burnin, delay) + # Call without a cluster + >>> mpm, mps, fas = _gibbs(source_df, sink_df, alpha1, alpha2, beta, + restarts, draws_per_restart, burnin, delay, + cluster=None, create_feature_tables=True) - Start a cluster and call the function. + # Start a cluster and call the function >>> jobs = 4 >>> subprocess.Popen('ipcluster start -n %s --quiet' % jobs, shell=True) >>> time.sleep(25) >>> c = Client() - >>> mp, mps = _gibbs(source_df, sink_df, alpha1, alpha2, beta, restarts, - draws_per_restart, burnin, delay, cluster=c) + >>> mpm, mps, fas = _gibbs(source_df, sink_df, alpha1, alpha2, beta, + restarts, draws_per_restart, burnin, delay, + cluster=c, create_feature_tables=True) + ''' + cp = ConditionalProbability(alpha1, alpha2, beta, sources.values) + f = partial(gibbs_sampler, cp=cp, restarts=restarts, + draws_per_restart=draws_per_restart, burnin=burnin, + delay=delay) + if cluster is not None: + results = cluster[:].map(f, sinks.values, block=True) + else: + results = list(map(f, sinks.values)) + mpm, mps, fas = collate_gibbs_results([i[0] for i in results], + [i[1] for i in results], + [i[2] for i in results], + sinks.index, sources.index, + sources.columns, + create_feature_tables, loo=False) + return mpm, mps, fas + + +def cumulative_proportions(all_envcounts, sink_ids, source_ids): + '''Calculate contributions of each source for each sink in `sink_ids`. + + Parameters + ---------- + all_envcounts : list + Each entry is 2D array of ints. The ith entry must correspond to the + ith sink ID. The [j, k] entry of the ith table is the count of + sequences assigned to the sink from kth environment during the jth + draw. + sink_ids : np.array + ID's of the sinks. Must be in the same order as data in + `all_envcounts`. + source_ids : np.array + ID's of the sources. Must be in the same order as the columns of the + tables in `all_envcounts`. + + Returns + ------- + proportions : pd.DataFrame + A dataframe of floats, containing the mixing proportions of each source + in each sink. 
The [i, j] entry is the contribution from the jth source + to the ith sink. + proportions_std : pd.DataFrame + A dataframe of floats, identical to `proportions` except the entries + are the standard deviation of each entry in `proportions`. + + Notes + ----- + This script is designed to be used by `collate_gibbs_results` after + completion of multiple `gibbs_sampler` calls (for different sinks). This + function does _not_ check that the assumptions of ordering described above + are met. It is the user's responsibility to check these if using this + function independently. ''' - with TemporaryDirectory() as tmpdir: - f = partial(_cli_sink_source_prediction_runner, alpha1=alpha1, - alpha2=alpha2, beta=beta, restarts=restarts, - draws_per_restart=draws_per_restart, burnin=burnin, - delay=delay, sources_data=source_df.values, - biom_table=sink_df, output_dir=tmpdir) - if cluster is not None: - cluster[:].map(f, sink_df.index, block=True) + num_sinks = len(sink_ids) + num_sources = len(source_ids) + 1 + + proportions = np.zeros((num_sinks, num_sources), dtype=np.float64) + proportions_std = np.zeros((num_sinks, num_sources), dtype=np.float64) + + for i, envcounts in enumerate(all_envcounts): + proportions[i] = envcounts.sum(0) / envcounts.sum() + proportions_std[i] = (envcounts / envcounts.sum()).std(0) + + cols = list(source_ids) + ['Unknown'] + return (pd.DataFrame(proportions, index=sink_ids, columns=cols), + pd.DataFrame(proportions_std, index=sink_ids, columns=cols)) + + +def single_sink_feature_table(final_env_assignments, final_taxon_assignments, + source_ids, feature_ids): + '''Produce a feature table from the output of `gibbs_sampler`. + + Parameters + ---------- + final_env_assignments : np.array + 2D array of ints. The [i, j] entry is the environment that sequence j + was assigned in the ith draw. The ordering of the columns is determined + by `np.repeat` and the count of different features in the sink. + The shape is number of draws by sum of the sink. + final_taxon_assignments : np.array + 2D array of ints. The [i, j] entry is the index of feature j in all + features, that was assigned in the ith draw. The ordering of the + columns is determined by `np.repeat` and the count of different + features in the sink. The shape is number of draws by sum of the sink. + source_ids : np.array + ID's of the sources. + feature_ids : np.array + ID's of the features. + + Returns + ------- + pd.DataFrame + A dataframe containing counts of features contributed to the sink by + each source. + + Notes + ----- + This script is designed to be used by `collate_gibbs_results` after + completion of multiple `gibbs_sampler` calls (for different sinks). This + function does _not_ check that the assumptions of ordering described above + are met. It is the user's responsibility to check these if using this + function independently. + ''' + num_sources = len(source_ids) + 1 + num_features = len(feature_ids) + data = np.zeros((num_sources, num_features), dtype=np.int32) + for r, c in zip(final_env_assignments.ravel(), + final_taxon_assignments.ravel()): + data[r, c] += 1 + return pd.DataFrame(data, index=list(source_ids) + ['Unknown'], + columns=feature_ids) + + +def collate_gibbs_results(all_envcounts, all_env_assignments, + all_taxon_assignments, sink_ids, source_ids, + feature_ids, create_feature_tables, loo): + '''Collate `gibbs_sampler` output, optionally including feature tables. + + Parameters + ---------- + all_envcounts : list + Each entry is 2D array of ints. 
The ith entry must correspond to the + ith sink ID. The [j, k] entry of the ith table is the count of + sequences assigned to the sink from kth environment during the jth + draw. + all_env_assignments : list + Each entry is a 2D array of ints. The ith entry is the environment + assignments for the ith sink. The [j, k] cell of the ith entry is the + environment of the kth taxon from the jth draw. + all_taxon_assignments : list + Each entry is a 2D array of ints. The ith entry is the feature indices + (over all features) for the ith sink. The [j, k] cell of the ith entry + is the feature index of the kth taxon selected for removal and + reassignment in the jth draw. + sink_ids : np.array + ID's of the sinks. + source_ids : np.array + ID's of the sources. + feature_ids : np.array + ID's of the features. + create_feature_tables : boolean + If `True` create a feature table for each sink. The feature table + records the average count of each feature from each source for this + sink. This option can consume large amounts of memory if there are many + source, sinks, and features. If `False`, feature tables are not + created. + loo : boolean + If `True`, collate data based on the assumption that input data was + generated by a `gibbs_loo` call. + + Notes + ----- + This script is designed to be used by after completion of multiple + `gibbs_sampler` calls (for different sinks). This function does _not_ check + that the assumptions of ordering described below are met. It is the user's + responsibility to check these if using this function independently. + + If `loo=False`, the order of the entries in each list (first 3 inputs) must + be the same, and correspond to the order of the `sink_ids`. + + If `loo=True`, the order of the entries in each list (first 3 inputs) must + be the same, and correspond to the order of the `source_ids`. + ''' + if loo: + props, props_stds = cumulative_proportions(all_envcounts, source_ids, + source_ids[:-1]) + # The source_ids for each environment are different. Specifically, the + # ith row of `props` and `props_stds` must have a 0 value inserted at + # the ith position to reflect the fact that the ith source was held out + # (it was the sink during that iteration). To do this we can imagine + # breaking the nXn array returned by `cumulative_proportions`, and + # inserting it into an nXn+1 array, with the missing cells on the + # diagonal of the nXn+1 array. + nrows = len(source_ids) + ncols = nrows + 1 + new_source_ids = list(source_ids)+['Unknown'] + new_data = np.zeros((nrows, ncols), dtype=np.float64) + new_data_std = np.zeros((nrows, ncols), dtype=np.float64) + + new_data[np.triu_indices(ncols, 1)] = \ + props.values[np.triu_indices(ncols - 1, 0)] + new_data[np.tril_indices(ncols - 1, -1, ncols)] = \ + props.values[np.tril_indices(ncols - 1, -1)] + + new_data_std[np.triu_indices(ncols, 1)] = \ + props_stds.values[np.triu_indices(ncols - 1, 0)] + new_data_std[np.tril_indices(ncols - 1, -1, ncols)] = \ + props_stds.values[np.tril_indices(ncols - 1, -1)] + + props = pd.DataFrame(new_data, index=source_ids, + columns=new_source_ids) + props_stds = pd.DataFrame(new_data_std, index=source_ids, + columns=new_source_ids) + + if create_feature_tables: + fts = [] + for i, sink_id in enumerate(source_ids): + r_source_ids = source_ids[source_ids != sink_id] + ft = single_sink_feature_table(all_env_assignments[i], + all_taxon_assignments[i], + r_source_ids, feature_ids) + tmp = ft.T + tmp.insert(i, sink_id, 0) + fts.append(tmp.T) + else: + fts = None + + # LOO not done. 
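# A worked illustration of the triangle-index shuffling above, assuming only
# numpy and a hypothetical 3 x 3 LOO result (in the real code these values come
# from the `props` DataFrame). Shifting the upper and lower triangles into a
# 3 x 4 array leaves zeros on the diagonal, i.e. at the position of the source
# that was held out as the sink.
import numpy as np

props_vals = np.arange(1, 10, dtype=float).reshape(3, 3)  # hypothetical values
nrows, ncols = 3, 4
new_data = np.zeros((nrows, ncols))
new_data[np.triu_indices(ncols, 1)] = props_vals[np.triu_indices(ncols - 1, 0)]
new_data[np.tril_indices(ncols - 1, -1, ncols)] = \
    props_vals[np.tril_indices(ncols - 1, -1)]
# new_data -> [[0., 1., 2., 3.],
#              [4., 0., 5., 6.],
#              [7., 8., 0., 9.]]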
+ else: + props, props_stds = cumulative_proportions(all_envcounts, sink_ids, + source_ids) + if create_feature_tables: + fts = [] + for i, sink_id in enumerate(sink_ids): + ft = single_sink_feature_table(all_env_assignments[i], + all_taxon_assignments[i], + source_ids, feature_ids) + fts.append(ft) else: - for sink in sink_df.index: - f(sink) - - samples = [] - mp_means = [] - mp_stds = [] - for sample_fp in glob.glob(os.path.join(tmpdir, '*')): - samples.append(sample_fp.strip().split('/')[-1].split('.txt')[0]) - tmp_arr = np.loadtxt(sample_fp, delimiter='\t') - mp_means.append(tmp_arr.mean(0)) - mp_stds.append(tmp_arr.std(0)) - - cols = list(source_df.index) + ['Unknown'] - mp_df = pd.DataFrame(mp_means, index=samples, columns=cols) - mp_stds_df = pd.DataFrame(mp_stds, index=samples, columns=cols) - return mp_df, mp_stds_df + fts = None + + return props, props_stds, fts diff --git a/sourcetracker/tests/test_sourcetracker.py b/sourcetracker/tests/test_sourcetracker.py index 89850f4..0594e34 100644 --- a/sourcetracker/tests/test_sourcetracker.py +++ b/sourcetracker/tests/test_sourcetracker.py @@ -10,119 +10,226 @@ from unittest import TestCase, main import numpy as np +import pandas as pd from biom.table import Table -from sourcetracker._sourcetracker import ( - parse_mapping_file, collapse_sources, ConditionalProbability, - gibbs_sampler, Sampler, sinks_and_sources, - _cli_sync_biom_and_sample_metadata, _cli_single_sample_formatter, - _cli_collate_results, subsample_sources_sinks) - - -class TestPreparationFunctions(TestCase): - - def test_parse_mapping_file(self): - lines = ['#SampleID\tcat1\tcat2', - 'S1\t1\ta', - 'S2\t2\tb', - 'S3\tsdsd \tc', - 'S4\t1111\t9'] - exp = {'S1': {'cat1': '1', 'cat2': 'a'}, - 'S2': {'cat1': '2', 'cat2': 'b'}, - 'S3': {'cat1': 'sdsd', 'cat2': 'c'}, - 'S4': {'cat1': '1111', 'cat2': '9'}} - obs = parse_mapping_file(lines) - self.assertEqual(obs, exp) - - def test_collapse_sources(self): - # The order of the collapsed data is unclear. - data = np.arange(50).reshape(10, 5) - oids = ['o%s' % i for i in range(10)] - sids = ['s%s' % i for i in range(5)] - biom_table = Table(data, oids, sids) - sample_metadata = {'s4': {'cat1': '1', 'cat2': 'D'}, - 's0': {'cat1': '1', 'cat2': 'B'}, - 's1': {'cat1': '2', 'cat2': 'C'}, - 's3': {'cat1': '2', 'cat2': 'A'}, - 's2': {'cat1': '2', 'cat2': 'D'}} - - category = 'cat1' - samples = ['s0', 's1'] - sort = True - obs_envs, obs_collapsed_sources = collapse_sources(samples, - sample_metadata, - category, - biom_table, sort) - exp_envs = np.array(['1', '2']) - exp_collapsed_sources = \ - np.array([[0, 5, 10, 15, 20, 25, 30, 35, 40, 45], - [1, 6, 11, 16, 21, 26, 31, 36, 41, 46]]) - - np.testing.assert_array_equal(obs_collapsed_sources, - exp_collapsed_sources) - np.testing.assert_array_equal(obs_envs, exp_envs) - - # Change the order of the samples, sort being true should return the - # same data. 
- samples = ['s1', 's0'] - obs_envs, obs_collapsed_sources = collapse_sources(samples, - sample_metadata, - category, - biom_table, sort) - np.testing.assert_array_equal(obs_collapsed_sources, - exp_collapsed_sources) - np.testing.assert_array_equal(obs_envs, exp_envs) - - category = 'cat1' - samples = ['s2', 's0', 's1'] - sort = True - obs_envs, obs_collapsed_sources = collapse_sources(samples, - sample_metadata, - category, - biom_table, sort) - exp_envs = np.array(['1', '2']) - exp_collapsed_sources = \ - np.array([[0, 5, 10, 15, 20, 25, 30, 35, 40, 45], - [3, 13, 23, 33, 43, 53, 63, 73, 83, 93]]) - np.testing.assert_array_equal(obs_collapsed_sources, - exp_collapsed_sources) - np.testing.assert_array_equal(obs_envs, exp_envs) - - category = 'cat1' - samples = ['s4', 's2', 's0', 's1'] - sort = True - obs_envs, obs_collapsed_sources = collapse_sources(samples, - sample_metadata, - category, - biom_table, sort) - exp_envs = np.array(['1', '2']) - exp_collapsed_sources = \ - np.array([[4, 14, 24, 34, 44, 54, 64, 74, 84, 94], - [3, 13, 23, 33, 43, 53, 63, 73, 83, 93]]) - np.testing.assert_array_equal(obs_collapsed_sources, - exp_collapsed_sources) - np.testing.assert_array_equal(obs_envs, exp_envs) - - category = 'cat2' - samples = ['s4', 's2', 's0', 's1'] - sort = True - obs_envs, obs_collapsed_sources = collapse_sources(samples, - sample_metadata, - category, - biom_table, sort) - exp_envs = np.array(['B', 'C', 'D']) - exp_collapsed_sources = \ - np.array([[0, 5, 10, 15, 20, 25, 30, 35, 40, 45], - [1, 6, 11, 16, 21, 26, 31, 36, 41, 46], - [6, 16, 26, 36, 46, 56, 66, 76, 86, 96]]) - np.testing.assert_array_equal(obs_collapsed_sources, - exp_collapsed_sources) - np.testing.assert_array_equal(obs_envs, exp_envs) +from sourcetracker._sourcetracker import (biom_to_df, + intersect_and_sort_samples, + collapse_source_data, + subsample_dataframe, + check_and_correct_data, + collate_gibbs_results, + get_samples, + generate_environment_assignments, + cumulative_proportions, + single_sink_feature_table, + ConditionalProbability, + gibbs_sampler) + + +class TestCheckAndCorrectData(TestCase): - data = np.arange(200).reshape(20, 10) + def setUp(self): + data = np.random.randint(0, 100, size=18).reshape(3, 6) + self.ftable = pd.DataFrame(data) + + def test_no_errors(self): + # A table where nothing is wrong, no changes expected. + obs = check_and_correct_data(self.ftable, True) + pd.util.testing.assert_frame_equal(self.ftable, obs) + + def test_error_on_string_data(self): + # A table with a string element, expect a TypeError. + ftable = self.ftable.copy() + ftable.iloc[0, 3] = '4.5' + self.assertRaises(TypeError, check_and_correct_data, ftable, True) + self.assertRaises(TypeError, check_and_correct_data, ftable, False) + + def test_error_on_bool_data(self): + # A table with some boolean data, expect a TypeError. + ftable = self.ftable.copy() + ftable.iloc[0, 3] = True + self.assertRaises(TypeError, check_and_correct_data, ftable, True) + self.assertRaises(TypeError, check_and_correct_data, ftable, False) + + def test_error_on_nan_data(self): + # A table with nans, expect a ValueError. + ftable = self.ftable.copy() + ftable.iloc[0, 3] = np.nan + self.assertRaises(ValueError, check_and_correct_data, ftable, True) + + def test_error_on_no_correction_fractional_count(self): + # Check that fractional counts give an error if function isn't allowed + # to correct them. 
+ ftable = self.ftable.copy() + ftable.iloc[0, 3] = 4.512 + self.assertRaises(ValueError, check_and_correct_data, ftable, False) + + def test_correcting_fractional_count(self): + # Check that fractional counts are corrected. + ftable1 = self.ftable.copy() + ftable2 = self.ftable.copy() + ftable1.iloc[0, 3] = 5 + ftable2.iloc[0, 3] = 4.512 + obs1 = check_and_correct_data(ftable1, True) + obs2 = check_and_correct_data(ftable2, True) + pd.util.testing.assert_frame_equal(obs1, obs2) + + +class TestBiomToDf(TestCase): + + def setUp(self): + self.exp = \ + pd.DataFrame(np.arange(200).reshape(20, 10).astype(np.int64).T, + index=['s%s' % i for i in range(10)], + columns=['o%s' % i for i in range(20)]) + + def test_converts_floats(self): + data = np.arange(200).reshape(20, 10).astype(float) oids = ['o%s' % i for i in range(20)] sids = ['s%s' % i for i in range(10)] - biom_table = Table(data, oids, sids) + + obs = biom_to_df(Table(data, oids, sids)) + pd.util.testing.assert_frame_equal(obs, self.exp) + + # Because the biom table and dataframe are transposed with respect to + # one another, we used data[3, 0] and exp.iloc[0, 3] to refer to the + # same value in the different tables. + data[3, 0] = 4.512 + obs = biom_to_df(Table(data, oids, sids)) + exp = self.exp.copy() + exp.iloc[0, 3] = 5 + pd.util.testing.assert_frame_equal(obs, exp) + + +class TestIntersectAndSortSamples(TestCase): + + def test_partially_overlapping_tables(self): + # Test an example where there are unshared samples present in both + # feature and sample tables. Notice that order is different between + # the samples that are shared between both tables. The order of samples + # in the returned tables is set by the ordering done in np.intersect1d. + sdata_c1 = [3.1, 'red', 5] + sdata_c2 = [3.6, 'yellow', 7] + sdata_c3 = [3.9, 'yellow', -2] + sdata_c4 = [2.5, 'red', 5] + sdata_c5 = [6.7, 'blue', 10] + samples = ['s1', 's4', 's2', 's3', 'sX'] + headers = ['pH', 'color', 'day'] + stable = pd.DataFrame([sdata_c1, sdata_c4, sdata_c2, sdata_c3, + sdata_c5], index=samples, columns=headers) + + fdata = np.arange(90).reshape(9, 10) + samples = ['s%i' % i for i in range(3, 12)] + columns = ['o%i' % i for i in range(1, 11)] + ftable = pd.DataFrame(fdata, index=samples, columns=columns) + + exp_ftable = pd.DataFrame(fdata[[1, 0], :], index=['s4', 's3'], + columns=columns) + exp_stable = pd.DataFrame([sdata_c4, sdata_c3], index=['s4', 's3'], + columns=headers) + + obs_stable, obs_ftable = intersect_and_sort_samples(stable, ftable) + + pd.util.testing.assert_frame_equal(obs_stable, exp_stable) + pd.util.testing.assert_frame_equal(obs_ftable, exp_ftable) + + # No shared samples, expect a ValueError. + ftable.index = ['ss%i' % i for i in range(9)] + self.assertRaises(ValueError, intersect_and_sort_samples, stable, + ftable) + + # All samples shared, expect no changes. + fdata = np.arange(50).reshape(5, 10) + samples = ['s1', 's4', 's2', 's3', 'sX'] + columns = ['o%i' % i for i in range(10)] + ftable = pd.DataFrame(fdata, index=samples, columns=columns) + + exp_ftable = ftable.loc[stable.index, :] + exp_stable = stable + + obs_stable, obs_ftable = intersect_and_sort_samples(stable, ftable) + pd.util.testing.assert_frame_equal(obs_stable, exp_stable) + pd.util.testing.assert_frame_equal(obs_ftable, exp_ftable) + + +class TestGetSamples(TestCase): + + def tests(self): + # Make a dataframe which contains mixed data to test. 
+ col0 = ['a', 'a', 'a', 'a', 'b'] + col1 = [3, 2, 3, 1, 3] + col2 = ['red', 'red', 'blue', 255, 255] + headers = ['sample_location', 'num_reps', 'color'] + samples = ['s1', 's2', 's3', 's4', 's5'] + sample_metadata = \ + pd.DataFrame.from_dict({k: v for k, v in zip(headers, + [col0, col1, col2])}) + sample_metadata.index = samples + + obs = get_samples(sample_metadata, 'sample_location', 'b') + exp = pd.Index(['s5'], dtype='object') + pd.util.testing.assert_index_equal(obs, exp) + + obs = get_samples(sample_metadata, 'sample_location', 'a') + exp = pd.Index(['s1', 's2', 's3', 's4'], dtype='object') + pd.util.testing.assert_index_equal(obs, exp) + + obs = get_samples(sample_metadata, 'color', 255) + exp = pd.Index(['s4', 's5'], dtype='object') + pd.util.testing.assert_index_equal(obs, exp) + + obs = get_samples(sample_metadata, 'num_reps', 3) + exp = pd.Index(['s1', 's3', 's5'], dtype='object') + pd.util.testing.assert_index_equal(obs, exp) + + +class TestCollapseSourceData(TestCase): + + def test_example1(self): + # Simple example with 'sum' as collapse mode. + samples = ['sample1', 'sample2', 'sample3', 'sample4'] + category = 'pH' + values = [3.0, 0.4, 3.0, 3.0] + stable = pd.DataFrame(values, index=samples, columns=[category]) + fdata = np.array([[10, 50, 10, 70], + [0, 25, 10, 5], + [0, 25, 10, 5], + [100, 0, 10, 5]]) + ftable = pd.DataFrame(fdata, index=stable.index, + columns=map(str, np.arange(4))) + source_samples = ['sample1', 'sample2', 'sample3'] + method = 'sum' + obs = collapse_source_data(stable, ftable, source_samples, category, + method) + exp_data = np.vstack((fdata[1, :], fdata[0, :] + fdata[2, :])) + exp_index = [0.4, 3.0] + exp = pd.DataFrame(exp_data, index=exp_index, + columns=map(str, np.arange(4))) + exp.index.name = 'collapse_col' + pd.util.testing.assert_frame_equal(obs, exp) + + # Example with collapse mode 'mean'. This will cause non-integer values + # to be present, which the check_and_correct_data should catch. + source_samples = ['sample1', 'sample2', 'sample3', 'sample4'] + method = 'mean' + obs = collapse_source_data(stable, ftable, source_samples, category, + method) + exp_data = np.ceil(np.vstack((fdata[1, :], + fdata[[0, 2, 3], :].mean(0)))) + exp_index = [0.4, 3.0] + exp = pd.DataFrame(exp_data.astype(np.int64), index=exp_index, + columns=map(str, np.arange(4))) + exp.index.name = 'collapse_col' + pd.util.testing.assert_frame_equal(obs, exp) + + def test_example2(self): + # Test on another arbitrary example. 
+ data = np.arange(200).reshape(20, 10) + oids = ['o%s' % i for i in range(20)] + sids = ['s%s' % i for i in range(10)] + ftable = pd.DataFrame(data.T, index=sids, columns=oids) + _stable = \ {'s4': {'cat1': '2', 'cat2': 'x', 'cat3': 'A', 'cat4': 'D'}, 's0': {'cat1': '1', 'cat2': 'y', 'cat3': 'z', 'cat4': 'D'}, 's1': {'cat1': '1', 'cat2': 'x', 'cat3': 'A', 'cat4': 'C'}, @@ -133,305 +240,397 @@ def test_collapse_sources(self): 's7': {'cat1': '2', 'cat2': 'x', 'cat3': 'z', 'cat4': '0'}, 's9': {'cat1': '2', 'cat2': 'x', 'cat3': 'z', 'cat4': '0'}, 's8': {'cat1': '2', 'cat2': 'x', 'cat3': 'z', 'cat4': '0'}} - + stable = pd.DataFrame(_stable).T category = 'cat4' - samples = ['s4', 's9', 's0', 's2'] - sort = False - - obs_envs, obs_collapsed_sources = collapse_sources(samples, - sample_metadata, - category, - biom_table, sort) - exp_envs = np.array(['D', '0']) - exp_collapsed_sources = \ - np.array([[6, 36, 66, 96, 126, 156, 186, 216, 246, 276, 306, 336, - 366, 396, 426, 456, 486, 516, 546, 576], - [9, 19, 29, 39, 49, 59, 69, 79, 89, 99, 109, 119, 129, - 139, 149, 159, 169, 179, 189, 199]]) - np.testing.assert_array_equal(obs_collapsed_sources, - exp_collapsed_sources) - np.testing.assert_array_equal(obs_envs, exp_envs) - - def test_subsample_sources_sinks(self): - sources_data = np.array([[5, 100, 3, 0, 0, 1, 9], - [2, 20, 1, 0, 0, 0, 98], - [1000, 0, 0, 0, 0, 0, 0]]) - sinks_data = np.array([[200, 0, 11, 400, 0, 0, 0], - [0, 0, 0, 0, 0, 0, 50]]) - sinks = np.array(['sink1', 'sink2']) - sources = np.array(['source1', 'source2', 'source3']) - # The table is composed of the 3 sources and 2 sinks. We concatenate - # the sink and source data together to create this table - feature_table = Table(np.vstack((sources_data, sinks_data)).T, - ['o%s' % i for i in range(7)], - np.hstack((sources, sinks))) - - # Test that errors are thrown appropriately. - sources_depth = 1001 - sinks_depth = 0 - self.assertRaises(ValueError, subsample_sources_sinks, sources_data, - sinks, feature_table, sources_depth, sinks_depth) - sources_depth = 100 - sinks_depth = 51 - self.assertRaises(ValueError, subsample_sources_sinks, sources_data, - sinks, feature_table, sources_depth, sinks_depth) - - # Test when no rarefaction would occur. - sources_depth = 0 - sinks_depth = 0 - obs_rsd, obs_rft = subsample_sources_sinks(sources_data, sinks, - feature_table, - sources_depth, sinks_depth) - np.testing.assert_array_equal(obs_rsd, sources_data) - self.assertEqual(obs_rft, feature_table) - - # Test with only sources rarefaction. - # This won't work since cython is generating the PRNG calls, instead, - # we can settle for less - ensure that the sums are correct. - # np.random.seed(0) - # sources_depth = 100 - # sinks_depth = 0 - # obs_rsd, obs_rft = subsample_sources_sinks(sources_data, sinks, - # feature_table, - # sources_depth, - # sinks_depth) - # exp_rsd = np.array([[5., 84., 2., 0., 0., 1., 8.], - # [1., 16., 1., 0., 0., 0., 82.], - # [100., 0., 0., 0., 0., 0., 0.]]) - # np.testing.assert_array_equal(obs_rsd, sources_data) - # self.assertEqual(obs_rft, feature_table) - sources_depth = 100 - sinks_depth = 0 - obs_rsd, obs_rft = subsample_sources_sinks(sources_data, sinks, - feature_table, - sources_depth, sinks_depth) - np.testing.assert_array_equal(obs_rsd.sum(1), np.array([100]*3)) - self.assertEqual(obs_rft, feature_table) - - # Test with only sinks rarefaction. - # This won't work since cython is generating the PRNG calls, instead, - # we can settle for less - ensure that the sums are correct. 
- # np.random.seed(0) - # sources_depth = 0 - # sinks_depth = 49 - # obs_rsd, obs_rft = subsample_sources_sinks(sources_data, sinks, - # feature_table, - # sources_depth, - # sinks_depth) - # exp_rft_array = np.array([[5., 2., 1000., 11., 0.], - # [100., 20., 0., 0., 0.], - # [3., 1., 0., 3., 0.], - # [0., 0., 0., 35., 0.], - # [0., 0., 0., 0., 0.], - # [1., 0., 0., 0., 0.], - # [9., 98., 0., 0., 49.]]) - # exp_rft_oids = feature_table.ids(axis='observation') - # exp_rft_sids = feature_table.ids(axis='sample') - # exp_rft = Table(exp_rft_array, exp_rft_oids, exp_rft_sids) - # np.testing.assert_array_equal(obs_rsd, sources_data) - # self.assertEqual(obs_rft, exp_rft) - sources_depth = 0 - sinks_depth = 49 - obs_rsd, obs_rft = subsample_sources_sinks(sources_data, sinks, - feature_table, - sources_depth, sinks_depth) - fft = obs_rft.filter(sinks, inplace=False) - np.testing.assert_array_equal(fft._data.toarray().sum(0), - np.array([49]*2)) - np.testing.assert_array_equal(obs_rsd, sources_data) - - -class TestCLIFunctions(TestCase): - '''Tests for the functions which convert command line options.''' - def setUp(self): - self.sample_metadata_1 = \ - {'s1': {'source_sink': 'source1', 'cat2': 'random_nonsense'}, - 's2': {'source_sink': 'sink', 'cat2': 'sink'}, - 's5': {'source_sink': 'source1', 'cat2': 'random_nonsense'}, - 's0': {'source_sink': 'source2', 'cat2': 'random_nonsense'}, - 's100': {'source_sink': 'sink', 'cat2': 'sink'}} - # Data for testing sinks_and_sources - self.sample_metadata_2 = \ - {'s1': {'SourceSink': 'source', 'Env': 'source1'}, - 's2': {'SourceSink': 'sink', 'Env': 'e1'}, - 's5': {'SourceSink': 'source', 'Env': 'source1'}, - 's0': {'SourceSink': 'source', 'Env': 'source2'}, - 's100': {'SourceSink': 'sink', 'Env': 'e2'}} - self.sample_metadata_3 = \ - {'s1': {'SourceSink': 'source', 'Env': 'source1'}, - 's2': {'SourceSink': 'source', 'Env': 'e1'}, - 's5': {'SourceSink': 'source', 'Env': 'source1'}, - 's0': {'SourceSink': 'source', 'Env': 'source2'}, - 's100': {'SourceSink': 'source', 'Env': 'e2'}} - # Data for testing _cli_sync_biom_and_sample_metadata - oids = ['o1', 'o2', 'o3'] - # Data for an example where samples are removed from biom table only. - sids = ['Sample1', 'Sample2', 'Sample3', 'Sample4'] - bt_1_data = np.arange(12).reshape(3, 4) - self.bt_1_in = Table(bt_1_data, oids, sids) - self.bt_1_out = Table(bt_1_data[:, :-1], oids, sids[:-1]) - self.mf_1_in = \ - {'Sample1': {'cat1': 'X', 'cat2': 'Y'}, - 'Sample2': {'cat1': 'X', 'cat2': 'Y'}, - 'Sample3': {'cat1': 'X', 'cat2': 'Y'}} - self.mf_1_out = self.mf_1_in - # Data for an example where sample are removed from mapping file only. - self.bt_2_in = self.bt_1_in - self.bt_2_out = self.bt_1_in - self.mf_2_in = \ - {'Sample1': {'cat1': 'X', 'cat2': 'Y'}, - 'Sample6': {'cat1': 'X', 'cat2': 'Y'}, - 'Sample3': {'cat1': 'X', 'cat2': 'Y'}, - 'Sample4': {'cat1': 'X', 'cat2': 'Y'}, - 'Sample2': {'cat1': 'X', 'cat2': 'Y'}} - self.mf_2_out = \ - {'Sample1': {'cat1': 'X', 'cat2': 'Y'}, - 'Sample3': {'cat1': 'X', 'cat2': 'Y'}, - 'Sample4': {'cat1': 'X', 'cat2': 'Y'}, - 'Sample2': {'cat1': 'X', 'cat2': 'Y'}} - # Data for an example where samples are removed from mapping file and - # biom file. 
- sids = ['Sample1', 'sampleA', 'Sample3', 'Sample4'] - bt_3_data = np.arange(12).reshape(3, 4) - self.bt_3_in = Table(bt_3_data, oids, sids) - self.bt_3_out = Table(bt_1_data[:, [0, 2, 3]], oids, - [sids[0], sids[2], sids[3]]) - self.mf_3_in = self.mf_2_out - self.mf_3_out = \ - {'Sample1': {'cat1': 'X', 'cat2': 'Y'}, - 'Sample3': {'cat1': 'X', 'cat2': 'Y'}, - 'Sample4': {'cat1': 'X', 'cat2': 'Y'}} - - def test_sinks_and_sources(self): - # Test single category, sink, and source identifier example. - obs_source_samples, obs_sink_samples = \ - sinks_and_sources(self.sample_metadata_2) - exp_source_samples = ['s1', 's5', 's0'] - exp_sink_samples = ['s2', 's100'] - self.assertEqual(set(exp_sink_samples), set(obs_sink_samples)) - self.assertEqual(set(exp_source_samples), set(obs_source_samples)) - - obs_source_samples, obs_sink_samples = \ - sinks_and_sources(self.sample_metadata_3) - exp_source_samples = ['s1', 's5', 's0', 's2', 's100'] - exp_sink_samples = [] - self.assertEqual(set(exp_sink_samples), set(obs_sink_samples)) - self.assertEqual(set(exp_source_samples), set(obs_source_samples)) - - def test_sinks_and_sources_alt_strings(self): - - metadata = { - 's1': {'source-or-sink': '--source!'}, - 's2': {'source-or-sink': '--sink!'}, - 's5': {'source-or-sink': '--source!'}, - 's0': {'source-or-sink': '--source!'}, - 's100': {'source-or-sink': '--sink!'}} - - obs_source_samples, obs_sink_samples = sinks_and_sources( - metadata, column_header='source-or-sink', source_value='--source!', - sink_value='--sink!') - exp_source_samples = ['s1', 's5', 's0'] - exp_sink_samples = ['s2', 's100'] - self.assertEqual(set(exp_sink_samples), set(obs_sink_samples)) - self.assertEqual(set(exp_source_samples), set(obs_source_samples)) - - def test__cli_sync_biom_and_sample_metadata(self): - # Test when syncing removes from mapping file only. - mf_obs, bt_obs = _cli_sync_biom_and_sample_metadata(self.mf_1_in, - self.bt_1_in) - self.assertEqual(mf_obs, self.mf_1_out) - self.assertEqual(bt_obs, self.bt_1_out) - - # Test when syncing removes from biom table only. - mf_obs, bt_obs = _cli_sync_biom_and_sample_metadata(self.mf_2_in, - self.bt_2_in) - self.assertEqual(mf_obs, self.mf_2_out) - self.assertEqual(bt_obs, self.bt_2_out) - - # Test when syncing removes from both mapping and biom files. - mf_obs, bt_obs = _cli_sync_biom_and_sample_metadata(self.mf_3_in, - self.bt_3_in) - self.assertEqual(mf_obs, self.mf_3_out) - self.assertEqual(bt_obs, self.bt_3_out) - - # Test that a ValueError is raised when no samples are shared. 
- self.assertRaises(ValueError, _cli_sync_biom_and_sample_metadata, - self.sample_metadata_1, self.bt_1_in) - - def test__cli_single_sample_formatter(self): - proportions = np.arange(20, dtype=int).reshape(5, 4) - obs = _cli_single_sample_formatter(proportions) - exp = ('0\t1\t2\t3\n4\t5\t6\t7\n8\t9\t10\t11\n12\t13\t14\t15\n16\t17' - '\t18\t19') - self.assertEqual(obs, exp) - - def test__cli_collate_results(self): - samples = ['s1', 's2', 'sC12', 's4'] - samples_data = [np.arange(18).reshape(6, 3), - 4 * np.arange(18).reshape(6, 3), - 200 + np.arange(18).reshape(6, 3), - 5 + 1000 * np.arange(18).reshape(6, 3)] - env_ids = np.array(['e1', 'asdf']) - obs_means, obs_stds = _cli_collate_results(samples, samples_data, - env_ids) - exp_means = '\n'.join(['SampleID\te1\tasdf\tUnknown', - 's1\t7.5\t8.5\t9.5', - 's2\t30.0\t34.0\t38.0', - 'sC12\t207.5\t208.5\t209.5', - 's4\t7505.0\t8505.0\t9505.0']) - exp_stds = '\n'.join([ - 'SampleID\te1\tasdf\tUnknown', - 's1\t5.12347538298\t5.12347538298\t5.12347538298', - 's2\t20.4939015319\t20.4939015319\t20.4939015319', - 'sC12\t5.12347538298\t5.12347538298\t5.12347538298', - 's4\t5123.47538298\t5123.47538298\t5123.47538298']) - self.assertEqual(obs_means, exp_means) - self.assertEqual(obs_stds, exp_stds) - - -class TestSamplerClass(TestCase): - '''Unit tests for the Python SourceTracker `Sampler` class.''' - - def setUp(self): - self.sampler_data = np.array([4., 5., 6.]) - self.num_sources = 3 - self.sum = 15 # number of seqs in the sink - self.sampler = Sampler(self.sampler_data, self.num_sources) - - def test_generate_taxon_sequence(self): - exp_taxon_sequence = np.array([0., 0., 0., 0., 1., 1., 1., 1., 1., 2., - 2., 2., 2., 2., 2.]) - - self.sampler.generate_taxon_sequence() - obs_taxon_sequence = self.sampler.taxon_sequence - - np.testing.assert_array_equal(obs_taxon_sequence, exp_taxon_sequence) + source_samples = ['s4', 's9', 's0', 's2'] + method = 'sum' + obs = collapse_source_data(stable, ftable, source_samples, category, + method) + exp_index = np.array(['0', 'D']) + exp_data = np.array([[9, 19, 29, 39, 49, 59, 69, 79, 89, 99, 109, 119, + 129, 139, 149, 159, 169, 179, 189, 199], + [6, 36, 66, 96, 126, 156, 186, 216, 246, 276, 306, + 336, 366, 396, 426, 456, 486, 516, 546, 576]]) + + exp = pd.DataFrame(exp_data, index=exp_index, columns=oids) + exp.index.name = 'collapse_col' + pd.util.testing.assert_frame_equal(obs, exp) + + +class TestSubsampleDataframe(TestCase): + + def test_no_errors_expected(self): + # Testing this function deterministically is hard because cython is + # generating the PRNG calls. We'll settle for ensuring that the sums + # are correct. + fdata = np.array([[10, 50, 10, 70], + [0, 25, 10, 5], + [0, 25, 10, 5], + [100, 0, 10, 5]]) + ftable = pd.DataFrame(fdata, index=['s1', 's2', 's3', 's4'], + columns=map(str, np.arange(4))) + n = 30 + obs = subsample_dataframe(ftable, n) + self.assertTrue((obs.sum(axis=1) == n).all()) + + def test_shape_doesnt_change(self): + # Test that when features are removed by subsampling, the shape of the + # table does not change. Although rarifaction is stochastic, the + # probability that the below table does not lose at least one feature + # during rarefaction (and thus satisfy as the test of the condition we) + # are interested in) is nearly 0. 
+ fdata = np.array([[0, 0, 0, 1e4], + [0, 0, 1, 1e4], + [0, 1, 0, 1e4], + [1, 0, 0, 1e4]]).astype(int) + ftable = pd.DataFrame(fdata, index=['s1', 's2', 's3', 's4'], + columns=map(str, np.arange(4))) + n = 10 + obs = subsample_dataframe(ftable, n) + self.assertTrue((obs.sum(axis=1) == n).all()) + self.assertEqual(obs.shape, ftable.shape) + + +class TestDataAggregationFunctions(TestCase): + '''Test that returned data is collated and written correctly.''' + + def test_cumulative_proportions(self): + # 4 draws, 4 sources + unknown, 3 sinks + sink1_envcounts = np.array([[10, 100, 15, 0, 25], + [150, 0, 0, 0, 0], + [30, 30, 30, 30, 30], + [0, 11, 7, 35, 97]]) + sink2_envcounts = np.array([[100, 10, 15, 0, 25], + [100, 0, 50, 0, 0], + [0, 60, 30, 30, 30], + [7, 11, 0, 35, 97]]) + sink3_envcounts = np.array([[100, 10, 10, 5, 25], + [70, 20, 30, 30, 0], + [10, 30, 50, 30, 30], + [0, 27, 100, 20, 3]]) + all_envcounts = [sink1_envcounts, sink2_envcounts, sink3_envcounts] + sink_ids = np.array(['sink1', 'sink2', 'sink3']) + source_ids = np.array(['source1', 'source2', 'source3', 'source4']) + cols = list(source_ids) + ['Unknown'] + + prp_r1 = np.array([190, 141, 52, 65, 152]) / 600. + prp_r2 = np.array([207, 81, 95, 65, 152]) / 600. + prp_r3 = np.array([180, 87, 190, 85, 58]) / 600. + prp_data = np.vstack([prp_r1, prp_r2, prp_r3]) + + prp_std_data = np.zeros((3, 5), dtype=np.float64) + prp_std_data[0, 0] = (np.array([10, 150, 30, 0]) / 600.).std() + prp_std_data[0, 1] = (np.array([100, 0, 30, 11]) / 600.).std() + prp_std_data[0, 2] = (np.array([15, 0, 30, 7]) / 600.).std() + prp_std_data[0, 3] = (np.array([0, 0, 30, 35]) / 600.).std() + prp_std_data[0, 4] = (np.array([25, 0, 30, 97]) / 600.).std() + + prp_std_data[1, 0] = (np.array([100, 100, 0, 7]) / 600.).std() + prp_std_data[1, 1] = (np.array([10, 0, 60, 11]) / 600.).std() + prp_std_data[1, 2] = (np.array([15, 50, 30, 0]) / 600.).std() + prp_std_data[1, 3] = (np.array([0, 0, 30, 35]) / 600.).std() + prp_std_data[1, 4] = (np.array([25, 0, 30, 97]) / 600.).std() + + prp_std_data[2, 0] = (np.array([100, 70, 10, 0]) / 600.).std() + prp_std_data[2, 1] = (np.array([10, 20, 30, 27]) / 600.).std() + prp_std_data[2, 2] = (np.array([10, 30, 50, 100]) / 600.).std() + prp_std_data[2, 3] = (np.array([5, 30, 30, 20]) / 600.).std() + prp_std_data[2, 4] = (np.array([25, 0, 30, 3]) / 600.).std() + + exp_prp = pd.DataFrame(prp_data, index=sink_ids, columns=cols) + exp_prp_std = pd.DataFrame(prp_std_data, index=sink_ids, columns=cols) + obs_prp, obs_prp_std = cumulative_proportions(all_envcounts, sink_ids, + source_ids) + pd.util.testing.assert_frame_equal(obs_prp, exp_prp) + pd.util.testing.assert_frame_equal(obs_prp_std, exp_prp_std) + + def test_single_sink_feature_table(self): + # 4 draws, depth of sink = 10, 5 sources + Unknown. + final_env_assignments = np.array([[5, 0, 0, 0, 2, 0, 1, 0, 3, 1], + [1, 1, 3, 3, 2, 2, 1, 1, 1, 1], + [4, 1, 4, 4, 4, 4, 1, 1, 3, 2], + [2, 1, 0, 5, 5, 5, 5, 1, 0, 2]]) + # notice that each row is the same - they are determined by + # `generate_taxon_sequence` before the `gibbs_sampler` runs. + final_taxon_assignments = \ + np.array([[0, 3, 3, 227, 550, 550, 550, 999, 999, 1100], + [0, 3, 3, 227, 550, 550, 550, 999, 999, 1100], + [0, 3, 3, 227, 550, 550, 550, 999, 999, 1100], + [0, 3, 3, 227, 550, 550, 550, 999, 999, 1100], + [0, 3, 3, 227, 550, 550, 550, 999, 999, 1100]]) + # we are allowing more taxa than we have found in this sample, i.e. 
the + # largest value in `final_taxon_assignments` will be smaller than the + # largest index in the columns of the final table. + nfeatures = 1250 + nsources = 5 + data = np.zeros((nsources + 1, nfeatures), dtype=np.int32) + + # for the purpose of this test code, I'll increment data taxa by taxa. + data[np.array([5, 1, 4, 2]), 0] += 1 + data[0, 3] += 3 + data[1, 3] += 3 + data[3, 3] += 1 + data[4, 3] += 1 + data[np.array([0, 3, 4, 5]), 227] += 1 + data[0, 550] += 1 + data[1, 550] += 3 + data[2, 550] += 3 + data[4, 550] += 2 + data[5, 550] += 3 + data[0, 999] += 2 + data[1, 999] += 4 + data[3, 999] += 2 + data[1, 1100] += 2 + data[2, 1100] += 2 + + exp_sources = ['source%s' % i for i in range(nsources)] + ['Unknown'] + feature_ids = ['f%s' % i for i in range(1250)] + exp = pd.DataFrame(data, index=exp_sources, columns=feature_ids) + + source_ids = np.array(['source%s' % i for i in range(nsources)]) + obs = single_sink_feature_table(final_env_assignments, + final_taxon_assignments, source_ids, + feature_ids) + + pd.util.testing.assert_frame_equal(obs, exp) + + def test_collate_gibbs_results(self): + # We'll vary the depth of the sinks - simulating a situation where the + # user has not rarefied. + # We'll set: + # draws = 4 + # sink_depths = [10, 15, 7] + # sources = 5 (+1 unknown) + final_env_counts_sink1 = np.array([[5, 2, 1, 1, 0, 1], + [0, 6, 2, 2, 0, 0], + [0, 3, 1, 1, 5, 0], + [2, 2, 2, 0, 0, 4]]) + final_env_assignments_sink1 = \ + np.array([[5, 0, 0, 0, 2, 0, 1, 0, 3, 1], + [1, 1, 3, 3, 2, 2, 1, 1, 1, 1], + [4, 1, 4, 4, 4, 4, 1, 1, 3, 2], + [2, 1, 0, 5, 5, 5, 5, 1, 0, 2]]) + final_taxon_assignments_sink1 = \ + np.array([[0, 3, 3, 227, 550, 550, 550, 999, 999, 1100], + [0, 3, 3, 227, 550, 550, 550, 999, 999, 1100], + [0, 3, 3, 227, 550, 550, 550, 999, 999, 1100], + [0, 3, 3, 227, 550, 550, 550, 999, 999, 1100], + [0, 3, 3, 227, 550, 550, 550, 999, 999, 1100]]) + + final_env_counts_sink2 = np.array([[5, 1, 3, 2, 0, 4], + [1, 1, 4, 5, 1, 3], + [4, 1, 3, 2, 3, 2], + [2, 3, 3, 2, 1, 4]]) + final_env_assignments_sink2 = \ + np.array([[2, 5, 0, 5, 1, 5, 0, 0, 3, 0, 3, 5, 2, 2, 0], + [3, 2, 2, 3, 2, 3, 3, 5, 5, 1, 3, 4, 2, 0, 5], + [0, 2, 3, 2, 0, 0, 2, 4, 5, 4, 0, 5, 3, 1, 4], + [4, 3, 2, 1, 2, 5, 3, 5, 2, 0, 1, 0, 5, 1, 5]]) + final_taxon_assignments_sink2 = \ + np.array([[7, 7, 7, 7, 8, 8, 8, 8, 250, 250, 250, 250, 1249, 1249], + [7, 7, 7, 7, 8, 8, 8, 8, 250, 250, 250, 250, 1249, 1249], + [7, 7, 7, 7, 8, 8, 8, 8, 250, 250, 250, 250, 1249, 1249], + [7, 7, 7, 7, 8, 8, 8, 8, 250, 250, 250, 250, 1249, 1249]]) + + final_env_counts_sink3 = np.array([[4, 2, 0, 0, 1, 0], + [0, 3, 1, 0, 2, 1], + [0, 0, 1, 1, 3, 2], + [2, 1, 0, 3, 0, 1]]) + final_env_assignments_sink3 = \ + np.array([[4, 0, 0, 0, 1, 0, 1], + [1, 2, 1, 4, 5, 4, 1], + [4, 3, 5, 4, 4, 5, 2], + [3, 0, 1, 3, 3, 0, 5]]) + final_taxon_assignments_sink3 = \ + np.array([[3, 865, 865, 1100, 1100, 1100, 1249], + [3, 865, 865, 1100, 1100, 1100, 1249], + [3, 865, 865, 1100, 1100, 1100, 1249], + [3, 865, 865, 1100, 1100, 1100, 1249]]) + + # Create expected proportion data. 
+ prp_data = np.zeros((3, 6), dtype=np.float64) + prp_std_data = np.zeros((3, 6), dtype=np.float64) + + prp_data[0] = (final_env_counts_sink1.sum(0) / + final_env_counts_sink1.sum()) + prp_data[1] = (final_env_counts_sink2.sum(0) / + final_env_counts_sink2.sum()) + prp_data[2] = (final_env_counts_sink3.sum(0) / + final_env_counts_sink3.sum()) + + prp_std_data[0] = \ + (final_env_counts_sink1 / final_env_counts_sink1.sum()).std(0) + prp_std_data[1] = \ + (final_env_counts_sink2 / final_env_counts_sink2.sum()).std(0) + prp_std_data[2] = \ + (final_env_counts_sink3 / final_env_counts_sink3.sum()).std(0) + + sink_ids = ['sink1', 'sink2', 'sink3'] + exp_sources = ['source%s' % i for i in range(5)] + ['Unknown'] + feature_ids = ['f%s' % i for i in range(1250)] + + exp_prp = pd.DataFrame(prp_data, index=sink_ids, columns=exp_sources) + exp_prp_std = pd.DataFrame(prp_std_data, index=sink_ids, + columns=exp_sources) + + # Create expected feature table data. + ft1 = np.zeros((6, 1250), dtype=np.int32) + for r, c in zip(final_env_assignments_sink1.ravel(), + final_taxon_assignments_sink1.ravel()): + ft1[r, c] += 1 + exp_ft1 = pd.DataFrame(ft1, index=exp_sources, columns=feature_ids) + ft2 = np.zeros((6, 1250), dtype=np.int32) + for r, c in zip(final_env_assignments_sink2.ravel(), + final_taxon_assignments_sink2.ravel()): + ft2[r, c] += 1 + exp_ft2 = pd.DataFrame(ft2, index=exp_sources, columns=feature_ids) + ft3 = np.zeros((6, 1250), dtype=np.int32) + for r, c in zip(final_env_assignments_sink3.ravel(), + final_taxon_assignments_sink3.ravel()): + ft3[r, c] += 1 + exp_ft3 = pd.DataFrame(ft3, index=exp_sources, columns=feature_ids) + exp_fts = [exp_ft1, exp_ft2, exp_ft3] + + # Prepare the inputs for passing to collate_gibbs_results + all_envcounts = [final_env_counts_sink1, final_env_counts_sink2, + final_env_counts_sink3] + all_env_assignments = [final_env_assignments_sink1, + final_env_assignments_sink2, + final_env_assignments_sink3] + all_taxon_assignments = [final_taxon_assignments_sink1, + final_taxon_assignments_sink2, + final_taxon_assignments_sink3] + + # Test when create_feature_tables=True + obs_prp, obs_prp_std, obs_fts = \ + collate_gibbs_results(all_envcounts, all_env_assignments, + all_taxon_assignments, np.array(sink_ids), + np.array(exp_sources[:-1]), + np.array(feature_ids), + create_feature_tables=True, loo=False) + pd.util.testing.assert_frame_equal(obs_prp, exp_prp) + pd.util.testing.assert_frame_equal(obs_prp_std, exp_prp_std) + for i in range(3): + pd.util.testing.assert_frame_equal(obs_fts[i], exp_fts[i]) + + # Test when create_feature_tables=False + obs_prp, obs_prp_std, obs_fts = \ + collate_gibbs_results(all_envcounts, all_env_assignments, + all_taxon_assignments, np.array(sink_ids), + np.array(exp_sources[:-1]), + np.array(feature_ids), + create_feature_tables=False, loo=False) + self.assertTrue(obs_fts is None) + + def test_collate_gibbs_results_loo(self): + # We'll vary the depth of the sources - simulating a situation where + # the user has not rarefied. 
+ # We'll set: + # draws = 2 + # source_depths = [7, 4, 5] + # sources = 3 (+1 Unknown) + ec1 = np.array([[6, 0, 1], + [2, 2, 3]]) + ea1 = np.array([[0, 2, 0, 0, 0, 0, 0], + [0, 1, 0, 2, 1, 2, 2]]) + ta1 = np.array([[2, 2, 2, 4, 4, 4, 6], + [2, 2, 2, 4, 4, 4, 6]]) + + ec2 = np.array([[1, 2, 1], + [2, 2, 0]]) + ea2 = np.array([[0, 1, 2, 1], + [0, 1, 1, 0]]) + ta2 = np.array([[3, 3, 3, 3], + [3, 3, 3, 3]]) + + ec3 = np.array([[1, 2, 2], + [4, 0, 1]]) + ea3 = np.array([[1, 1, 0, 2, 2], + [0, 0, 0, 0, 2]]) + ta3 = np.array([[3, 3, 4, 5, 5], + [3, 3, 4, 5, 5]]) + + # Create expected proportion data. + prp_data = np.array([[0, 8/14., 2/14., 4/14.], + [3/8., 0, 4/8., 1/8.], + [5/10., 2/10., 0, 3/10.]], dtype=np.float64) + prp_std_data = np.zeros((3, 4), dtype=np.float64) + + prp_std_data[0, 1:] = (ec1 / ec1.sum()).std(0) + prp_std_data[1, np.array([0, 2, 3])] = (ec2 / ec2.sum()).std(0) + prp_std_data[2, np.array([0, 1, 3])] = (ec3 / ec3.sum()).std(0) + + exp_sources = ['source%s' % i for i in range(3)] + ['Unknown'] + feature_ids = ['f%s' % i for i in range(7)] + + exp_prp = pd.DataFrame(prp_data, index=exp_sources[:-1], + columns=exp_sources) + exp_prp_std = pd.DataFrame(prp_std_data, index=exp_sources[:-1], + columns=exp_sources) + + # Create expected feature table data. + ft1 = np.array([[0, 0, 0, 0, 0, 0, 0], + [0, 0, 4, 0, 3, 0, 1], + [0, 0, 1, 0, 1, 0, 0], + [0, 0, 1, 0, 2, 0, 1]], dtype=np.int64) + ft2 = np.array([[0, 0, 0, 3, 0, 0, 0], + [0, 0, 0, 0, 0, 0, 0], + [0, 0, 0, 4, 0, 0, 0], + [0, 0, 0, 1, 0, 0, 0]], dtype=np.int64) + ft3 = np.array([[0, 0, 0, 2, 2, 1, 0], + [0, 0, 0, 2, 0, 0, 0], + [0, 0, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 0, 3, 0]], dtype=np.int64) + exp_fts = [pd.DataFrame(ft1, index=exp_sources, columns=feature_ids), + pd.DataFrame(ft2, index=exp_sources, columns=feature_ids), + pd.DataFrame(ft3, index=exp_sources, columns=feature_ids)] + + # Prepare the inputs for passing to collate_gibbs_results + all_envcounts = [ec1, ec2, ec3] + all_env_assignments = [ea1, ea2, ea3] + all_taxon_assignments = [ta1, ta2, ta3] + + # Test when create_feature_tables=True + obs_prp, obs_prp_std, obs_fts = \ + collate_gibbs_results(all_envcounts, all_env_assignments, + all_taxon_assignments, + np.array(exp_sources[:-1]), + np.array(exp_sources[:-1]), + np.array(feature_ids), + create_feature_tables=True, loo=True) + + pd.util.testing.assert_frame_equal(obs_prp, exp_prp) + pd.util.testing.assert_frame_equal(obs_prp_std, exp_prp_std) + for i in range(3): + pd.util.testing.assert_frame_equal(obs_fts[i], exp_fts[i]) + + # Test when create_feature_tables=False + obs_prp, obs_prp_std, obs_fts = \ + collate_gibbs_results(all_envcounts, all_env_assignments, + all_taxon_assignments, + np.array(exp_sources[:-1]), + np.array(exp_sources[:-1]), + np.array(feature_ids), + create_feature_tables=False, loo=True) + self.assertTrue(obs_fts is None) + + +class TestBookkeeping(TestCase): + '''Tests for fnxs which generate bookkeeping data for `gibbs_sampler`.''' def test_generate_environment_assignment(self): - np.random.seed(0) - self.sampler.generate_environment_assignments() - - exp_seq_env_assignments = np.array([0, 1, 0, 1, 1, 2, 0, 2, 0, 0, 0, 2, - 1, 2, 2]) - obs_seq_env_assignemnts = self.sampler.seq_env_assignments - np.testing.assert_array_equal(obs_seq_env_assignemnts, - exp_seq_env_assignments) - - exp_envcounts = np.array([6, 4, 5]) - obs_envcounts = self.sampler.envcounts - np.testing.assert_array_equal(obs_envcounts, exp_envcounts) - - def test_seq_assignments_to_contingency_table(self): - 
np.random.seed(0) - self.sampler.generate_taxon_sequence() - self.sampler.generate_environment_assignments() - obs_ct = self.sampler.seq_assignments_to_contingency_table() - - exp_ct = np.array([[2., 2., 0.], - [2., 1., 2.], - [2., 1., 3.]]) - - np.testing.assert_array_equal(obs_ct, exp_ct) + np.random.seed(235234234) + obs_sea, obs_ecs = generate_environment_assignments(100, 10) + exp_sea = \ + np.array([7, 3, 4, 1, 5, 2, 6, 3, 6, 4, 4, 7, 8, 2, 7, 7, 9, 9, 4, + 7, 0, 3, 6, 5, 7, 2, 7, 1, 2, 4, 1, 7, 0, 7, 5, 2, 8, 5, + 3, 3, 1, 4, 3, 3, 8, 7, 7, 5, 2, 6, 0, 2, 4, 0, 0, 5, 9, + 8, 2, 8, 9, 9, 8, 7, 5, 8, 0, 9, 8, 6, 3, 2, 3, 7, 3, 8, + 4, 4, 9, 1, 6, 6, 0, 9, 2, 9, 9, 4, 2, 9, 0, 4, 1, 3, 4, + 0, 0, 9, 8, 3]) + exp_ecs = np.array([10, 6, 11, 12, 12, 7, 7, 13, 10, 12]) + np.testing.assert_array_equal(obs_sea, exp_sea) + np.testing.assert_array_equal(obs_ecs, exp_ecs) class ConditionalProbabilityTests(TestCase): @@ -608,8 +807,9 @@ def test_single_pass(self): # Make calculations using gibbs function. np.random.seed(0) cp = ConditionalProbability(alpha1, alpha2, beta, source_data) - obs_mps, obs_ct = gibbs_sampler(cp, sink, restarts, draws_per_restart, - burnin, delay) + obs_ec, obs_ea, obs_ta = gibbs_sampler(sink, cp, restarts, + draws_per_restart, burnin, + delay) # Make calculation using handrolled. np.random.seed(0) @@ -656,16 +856,23 @@ def test_single_pass(self): if new_e == 2: unknown_vector[t] += 1 - prps = envcounts / float(envcounts.sum()) - exp_mps = prps/prps.sum() + # prps = envcounts / float(envcounts.sum()) + # exp_mps = prps/prps.sum() # Create taxon table like Sampler class would. exp_ct = np.zeros((4, 3)) for i in range(9): exp_ct[expected_et_pairs[1, i], np.int(seq_env_assignments[i])] += 1 - np.testing.assert_array_almost_equal(obs_mps.squeeze(), exp_mps) - np.testing.assert_array_equal(obs_ct.squeeze(), exp_ct) + # np.testing.assert_array_almost_equal(obs_mps.squeeze(), exp_mps) + # np.testing.assert_array_equal(obs_ct.squeeze().T, exp_ct) + + np.testing.assert_array_equal(obs_ec.squeeze(), envcounts) + np.testing.assert_array_equal(obs_ea.squeeze()[order], + seq_env_assignments) + np.testing.assert_array_equal(obs_ta.squeeze()[order], + expected_et_pairs[1, :]) + if __name__ == '__main__': main()
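Note on the revised API exercised by the tests above: `gibbs_sampler` now returns the per-draw environment counts together with the final per-sequence environment and taxon assignments, and `collate_gibbs_results` turns those per-sink results into mixing proportions, their standard deviations, and (optionally) per-sink feature tables. The following minimal sketch shows how those pieces could be chained for a single sink; the import path, toy counts, and prior/sampler parameter values are illustrative assumptions, not part of this changeset.

```python
# Minimal sketch only -- the import path and all numeric values below are
# assumptions for illustration; they are not taken from this diff.
import numpy as np

from sourcetracker._sourcetracker import (ConditionalProbability,
                                          gibbs_sampler,
                                          collate_gibbs_results)

# Toy data: 2 source environments x 4 features, and one sink over the same
# 4 features.
sources = np.array([[5, 0, 3, 2],
                    [1, 4, 0, 5]])
sink = np.array([2, 1, 1, 3])

# Conditional probabilities used by the sampler (placeholder priors).
cp = ConditionalProbability(0.001, 0.1, 10, sources)

# restarts=2, draws_per_restart=2, burnin=5, delay=2; returns per-draw
# environment counts plus the final environment and taxon assignments.
env_counts, env_assignments, taxon_assignments = \
    gibbs_sampler(sink, cp, 2, 2, 5, 2)

# Collate the single sink's results into mixing proportions, their standard
# deviations, and (optionally) a per-sink feature table.
mps, mps_stds, fts = collate_gibbs_results(
    [env_counts], [env_assignments], [taxon_assignments],
    np.array(['sink1']), np.array(['source1', 'source2']),
    np.array(['f0', 'f1', 'f2', 'f3']),
    create_feature_tables=True, loo=False)
```

With `loo=True`, as in `test_collate_gibbs_results_loo`, the same collation treats each held-out source as the sink, which is why the expected proportion matrix in that test has zeros on its diagonal.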