Skip to content

Commit

Permalink
Rework pairing to support more than two sequences
Browse files Browse the repository at this point in the history
martin-steinegger committed Nov 9, 2021
1 parent efacc69 commit e19df7c
Showing 1 changed file with 68 additions and 72 deletions.
140 changes: 68 additions & 72 deletions src/util/pairaln.cpp
Original file line number Diff line number Diff line change
@@ -9,110 +9,106 @@
#include "NcbiTaxonomy.h"
#include "MappingReader.h"

#define ZSTD_STATIC_LINKING_ONLY
#include <zstd.h>

#include <map>

#ifdef OPENMP
#include <omp.h>
#endif


// need for sorting the results
static bool compareByTaxId(const Matcher::result_t &first, const Matcher::result_t &second) {
return (first.dbOrfStartPos < second.dbOrfStartPos);
}

int pairaln(int argc, const char **argv, const Command& command) {
Parameters &par = Parameters::getInstance();
par.parseParameters(argc, argv, command, true, 0, 0);

const bool sameDB = par.db1.compare(par.db2) == 0 ? true : false;
const bool touch = (par.preloadMode != Parameters::PRELOAD_MODE_MMAP);

std::string db2NoIndexName = PrefilteringIndexReader::dbPathWithoutIndex(par.db2);
MappingReader* mapping = new MappingReader(db2NoIndexName);
const int dbaccessMode = (DBReader<unsigned int>::USE_INDEX | DBReader<unsigned int>::USE_DATA);
IndexReader qDbr(par.db1, par.threads, IndexReader::SRC_SEQUENCES, (touch) ? (IndexReader::PRELOAD_INDEX | IndexReader::PRELOAD_DATA) : 0, dbaccessMode);
IndexReader *tDbr;
if (sameDB) {
tDbr = &qDbr;
} else {
tDbr = new IndexReader(par.db2, par.threads, IndexReader::SRC_SEQUENCES, (touch) ? (IndexReader::PRELOAD_INDEX | IndexReader::PRELOAD_DATA) : 0, dbaccessMode);
}

DBReader<unsigned int> alnDbr(par.db3.c_str(), par.db3Index.c_str(), par.threads, DBReader<unsigned int>::USE_INDEX|DBReader<unsigned int>::USE_DATA);
alnDbr.open(DBReader<unsigned int>::NOSORT);
if(alnDbr.getSize() % 2 != 0){
Debug(Debug::ERROR) << "Alignment database does not seem to be paired\n";
EXIT(EXIT_FAILURE);
}
size_t localThreads = 1;
#ifdef OPENMP
localThreads = std::max(std::min((size_t)par.threads, alnDbr.getSize()), (size_t)1);
#endif

DBWriter resultWriter(par.db4.c_str(), par.db4Index.c_str(), localThreads, par.compressed, alnDbr.getDbtype());
DBWriter resultWriter(par.db4.c_str(), par.db4Index.c_str(), 1, par.compressed, alnDbr.getDbtype());
resultWriter.open();

Debug::Progress progress(alnDbr.getSize());
#pragma omp parallel num_threads(localThreads)
{
unsigned int thread_idx = 0;
#ifdef OPENMP
thread_idx = static_cast<unsigned int>(omp_get_thread_num());
#endif
do {
int thread_idx = 0;
char buffer[1024 + 32768*4];
std::string result;
result.reserve(1024 * 1024);
std::vector<Matcher::result_t> resultA;
resultA.reserve(100000);
std::vector<Matcher::result_t> resultB;
resultB.reserve(100000);
std::unordered_map<unsigned int, Matcher::result_t *> findPair;
std::string outputA;
outputA.reserve(100000);
std::string outputB;
outputB.reserve(100000);
#pragma omp for schedule(static, 2)
for (size_t i = 0; i < alnDbr.getSize(); i+=2) {
progress.updateProgress();
progress.updateProgress();
resultA.clear();
resultB.clear();
outputA.clear();
outputB.clear();
Matcher::readAlignmentResults(resultA, alnDbr.getData(i, thread_idx), true);
Matcher::readAlignmentResults(resultB, alnDbr.getData(i+1, thread_idx), true);
std::vector<Matcher::result_t> result;
result.reserve(100000);
std::unordered_map<unsigned int, size_t> findPair;
std::string output;
output.reserve(100000);

for (size_t resIdx = 0; resIdx < resultA.size(); ++resIdx) {
unsigned int taxon = mapping->lookup(resultA[resIdx].dbKey);
if(findPair.find(taxon) == findPair.end()){
findPair.insert({taxon, &resultA[resIdx]});
unsigned int referenceDbKey = 0;
size_t id = alnDbr.getId(referenceDbKey);
if (id == UINT_MAX) {
break;
}
char* data = alnDbr.getData(id, thread_idx);
Matcher::readAlignmentResults(result, data, true);
for (size_t resIdx = 0; resIdx < result.size(); ++resIdx) {
unsigned int taxon = mapping->lookup(result[resIdx].dbKey);
if (findPair.find(taxon) == findPair.end()) {
findPair.emplace(taxon, 1);
}
}

// find intersection between all proteins
for (size_t i = 0; i < alnDbr.getSize(); i++) {
unsigned int dbKey = alnDbr.getDbKey(i);
if (dbKey == referenceDbKey) {
continue;
}
result.clear();
Matcher::readAlignmentResults(result, alnDbr.getData(i, thread_idx), true);
// find pairs
for (size_t resIdx = 0; resIdx < result.size(); ++resIdx) {
unsigned int taxon = mapping->lookup(result[resIdx].dbKey);
if (findPair.find(taxon) != findPair.end()) {
findPair[taxon]++;
}
}
}

for (size_t i = 0; i < alnDbr.getSize(); i++) {
progress.updateProgress();
result.clear();
output.clear();
Matcher::readAlignmentResults(result, alnDbr.getData(i, thread_idx), true);
// find pairs
for (size_t resIdx = 0; resIdx < resultB.size(); ++resIdx) {
unsigned int taxon = mapping->lookup(resultB[resIdx].dbKey);
for (size_t resIdx = 0; resIdx < result.size(); ++resIdx) {
unsigned int taxon = mapping->lookup(result[resIdx].dbKey);
// we don't want to introduce a new field, reuse existing unused field here
result[resIdx].dbOrfStartPos = taxon;
}
// stable sort is required to assure that best hit is first per taxon
std::stable_sort(result.begin(), result.end(), compareByTaxId);
unsigned int prevTaxon = UINT_MAX;
for (size_t resIdx = 0; resIdx < result.size(); ++resIdx) {
unsigned int taxon = result[resIdx].dbOrfStartPos;
// found pair!
if(findPair.find(taxon) != findPair.end()) {
bool hasBacktrace = (resultB[resIdx].backtrace.size() > 0);
size_t len = Matcher::resultToBuffer(buffer, *findPair[taxon], hasBacktrace, false, false);
outputA.append(buffer, len);
len = Matcher::resultToBuffer(buffer, resultB[resIdx], hasBacktrace, false, false);
outputB.append(buffer, len);
findPair.erase (taxon);
if (taxon != prevTaxon
&& findPair.find(taxon) != findPair.end()
&& findPair[taxon] == alnDbr.getSize()) {
bool hasBacktrace = result[resIdx].backtrace.size() > 0;
size_t len = Matcher::resultToBuffer(buffer, result[resIdx], hasBacktrace, false, false);
output.append(buffer, len);
}
prevTaxon = taxon;
}
resultWriter.writeData(outputA.c_str(), outputA.length(), alnDbr.getDbKey(i), thread_idx);
resultWriter.writeData(outputB.c_str(), outputB.length(), alnDbr.getDbKey(i+1), thread_idx);
findPair.clear();
resultWriter.writeData(output.c_str(), output.length(), alnDbr.getDbKey(i), thread_idx);
}
}
} while(false);

resultWriter.close();
// clean up
delete mapping;
alnDbr.close();
if (sameDB == false) {
delete tDbr;
}

return 0;
return EXIT_SUCCESS;
}

0 comments on commit e19df7c

Please sign in to comment.