From f8b6d24e9e42f7139ce787966fe52ac9a5295d57 Mon Sep 17 00:00:00 2001 From: shiki Date: Fri, 3 Aug 2018 00:12:12 -0400 Subject: [PATCH 01/15] added multiprocessing support for bm25 --- gensim/summarization/bm25.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/gensim/summarization/bm25.py b/gensim/summarization/bm25.py index 3a2bf5bbf6..9997d52c4c 100644 --- a/gensim/summarization/bm25.py +++ b/gensim/summarization/bm25.py @@ -37,6 +37,8 @@ import math from six import iteritems from six.moves import xrange +from functools import partial +from multiprocessing import Pool PARAM_K1 = 1.5 @@ -151,6 +153,13 @@ def get_scores(self, document, average_idf): scores.append(score) return scores +def _get_scores(bm25, document, average_idf): + scores = [] + for index in xrange(bm25.corpus_size): + score = bm25.get_score(document, index, average_idf) + scores.append(score) + return scores + def get_bm25_weights(corpus): """Returns BM25 scores (weights) of documents in corpus. @@ -180,9 +189,13 @@ def get_bm25_weights(corpus): bm25 = BM25(corpus) average_idf = sum(float(val) for val in bm25.idf.values()) / len(bm25.idf) - weights = [] - for doc in corpus: - scores = bm25.get_scores(doc, average_idf) - weights.append(scores) + # weights = [] + # for doc in corpus: + # scores = bm25.get_scores(doc, average_idf) + # weights.append(scores) + + get_score = partial(_get_scores, bm25, average_idf=average_idf) + pool = Pool() + weights = pool.map(get_score, corpus) return weights From c0346cc120b355ed5b8b23c09e9b2ee2d8c694eb Mon Sep 17 00:00:00 2001 From: shiki Date: Sat, 4 Aug 2018 12:23:45 -0400 Subject: [PATCH 02/15] added effective_n_job check --- gensim/summarization/bm25.py | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/gensim/summarization/bm25.py b/gensim/summarization/bm25.py index 9997d52c4c..9c2d8e58cb 100644 --- a/gensim/summarization/bm25.py +++ b/gensim/summarization/bm25.py @@ -38,7 +38,7 @@ from six import iteritems from six.moves import xrange from functools import partial -from multiprocessing import Pool +from multiprocessing import Pool, cpu_count PARAM_K1 = 1.5 @@ -153,6 +153,7 @@ def get_scores(self, document, average_idf): scores.append(score) return scores + def _get_scores(bm25, document, average_idf): scores = [] for index in xrange(bm25.corpus_size): @@ -161,7 +162,18 @@ def _get_scores(bm25, document, average_idf): return scores -def get_bm25_weights(corpus): +def _effective_n_jobs(n_jobs): + """Determine the number of jobs which are going to run in parallel""" + if n_jobs == 0: + raise ValueError('n_jobs == 0 in Parallel has no meaning') + elif n_jobs is None: + return 1 + elif n_jobs < 0: + n_jobs = max(cpu_count() + 1 + n_jobs, 1) + return n_jobs + + +def get_bm25_weights(corpus, n_jobs=1): """Returns BM25 scores (weights) of documents in corpus. Each document has to be weighted with every document in given corpus. @@ -189,13 +201,19 @@ def get_bm25_weights(corpus): bm25 = BM25(corpus) average_idf = sum(float(val) for val in bm25.idf.values()) / len(bm25.idf) + if _effective_n_jobs(n_jobs) == 1: + weights = [bm25.get_scores(doc, average_idf)] + return weights + # weights = [] # for doc in corpus: # scores = bm25.get_scores(doc, average_idf) # weights.append(scores) get_score = partial(_get_scores, bm25, average_idf=average_idf) - pool = Pool() + pool = Pool(n_jobs) weights = pool.map(get_score, corpus) + pool.close() + pool.join() return weights From a066f9ccccb65aad2111c03db03660eea3c627c1 Mon Sep 17 00:00:00 2001 From: shiki Date: Sat, 4 Aug 2018 12:25:05 -0400 Subject: [PATCH 03/15] added comment for helper function --- gensim/summarization/bm25.py | 1 + 1 file changed, 1 insertion(+) diff --git a/gensim/summarization/bm25.py b/gensim/summarization/bm25.py index 9c2d8e58cb..2594b6c5a4 100644 --- a/gensim/summarization/bm25.py +++ b/gensim/summarization/bm25.py @@ -155,6 +155,7 @@ def get_scores(self, document, average_idf): def _get_scores(bm25, document, average_idf): + """helper function for retrieve bm25 scores in parallel""" scores = [] for index in xrange(bm25.corpus_size): score = bm25.get_score(document, index, average_idf) From 567acf0a198e8a3f02fd67280cb6b5f9f6816d3b Mon Sep 17 00:00:00 2001 From: shiki Date: Sat, 4 Aug 2018 12:25:53 -0400 Subject: [PATCH 04/15] fixed minor error --- gensim/summarization/bm25.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gensim/summarization/bm25.py b/gensim/summarization/bm25.py index 2594b6c5a4..0f9409cc70 100644 --- a/gensim/summarization/bm25.py +++ b/gensim/summarization/bm25.py @@ -203,7 +203,7 @@ def get_bm25_weights(corpus, n_jobs=1): average_idf = sum(float(val) for val in bm25.idf.values()) / len(bm25.idf) if _effective_n_jobs(n_jobs) == 1: - weights = [bm25.get_scores(doc, average_idf)] + weights = [bm25.get_scores(doc, average_idf) for doc in corpus] return weights # weights = [] From bbc2efe94248de006f0740b531e750413bb433d8 Mon Sep 17 00:00:00 2001 From: shiki Date: Sat, 4 Aug 2018 12:27:54 -0400 Subject: [PATCH 05/15] deleted unwanted comments --- gensim/summarization/bm25.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/gensim/summarization/bm25.py b/gensim/summarization/bm25.py index 0f9409cc70..5eff5e5117 100644 --- a/gensim/summarization/bm25.py +++ b/gensim/summarization/bm25.py @@ -206,11 +206,6 @@ def get_bm25_weights(corpus, n_jobs=1): weights = [bm25.get_scores(doc, average_idf) for doc in corpus] return weights - # weights = [] - # for doc in corpus: - # scores = bm25.get_scores(doc, average_idf) - # weights.append(scores) - get_score = partial(_get_scores, bm25, average_idf=average_idf) pool = Pool(n_jobs) weights = pool.map(get_score, corpus) From c0f03bcb6f3bbb0ceaed7bdb8324af7b9bcaf097 Mon Sep 17 00:00:00 2001 From: shiki Date: Sat, 4 Aug 2018 15:34:00 -0400 Subject: [PATCH 06/15] updated example with new api --- gensim/summarization/bm25.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gensim/summarization/bm25.py b/gensim/summarization/bm25.py index 5eff5e5117..ce96b5aba2 100644 --- a/gensim/summarization/bm25.py +++ b/gensim/summarization/bm25.py @@ -196,7 +196,7 @@ def get_bm25_weights(corpus, n_jobs=1): ... ["cat", "outer", "space"], ... ["wag", "dog"] ... ] - >>> result = get_bm25_weights(corpus) + >>> result = get_bm25_weights(corpus, n_jobs=-1) """ bm25 = BM25(corpus) From 404c3f524a434f395de0375edb7051d7bcb083b3 Mon Sep 17 00:00:00 2001 From: shiki Date: Sat, 4 Aug 2018 15:34:46 -0400 Subject: [PATCH 07/15] updated example with new api --- gensim/summarization/bm25.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gensim/summarization/bm25.py b/gensim/summarization/bm25.py index ce96b5aba2..c162e20629 100644 --- a/gensim/summarization/bm25.py +++ b/gensim/summarization/bm25.py @@ -22,7 +22,7 @@ ... ["cat", "outer", "space"], ... ["wag", "dog"] ... ] ->>> result = get_bm25_weights(corpus) +>>> result = get_bm25_weights(corpus, n_jobs=-1) Data: From 385bcf578d745a5940688d9aac01d94ecb7f497f Mon Sep 17 00:00:00 2001 From: shiki Date: Sat, 4 Aug 2018 20:21:20 -0400 Subject: [PATCH 08/15] updated support for multiprocessing --- gensim/summarization/bm25.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/gensim/summarization/bm25.py b/gensim/summarization/bm25.py index c162e20629..e3945477c3 100644 --- a/gensim/summarization/bm25.py +++ b/gensim/summarization/bm25.py @@ -202,12 +202,13 @@ def get_bm25_weights(corpus, n_jobs=1): bm25 = BM25(corpus) average_idf = sum(float(val) for val in bm25.idf.values()) / len(bm25.idf) - if _effective_n_jobs(n_jobs) == 1: + n_processes = _effective_n_jobs(n_jobs) + if n_processes == 1: weights = [bm25.get_scores(doc, average_idf) for doc in corpus] return weights get_score = partial(_get_scores, bm25, average_idf=average_idf) - pool = Pool(n_jobs) + pool = Pool(n_processes) weights = pool.map(get_score, corpus) pool.close() pool.join() From 95846ba1c6095d3b2769e758d6807de3a898dc63 Mon Sep 17 00:00:00 2001 From: shiki Date: Sat, 4 Aug 2018 21:49:05 -0400 Subject: [PATCH 09/15] updated docstring --- gensim/summarization/bm25.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gensim/summarization/bm25.py b/gensim/summarization/bm25.py index e3945477c3..9e04fe8b56 100644 --- a/gensim/summarization/bm25.py +++ b/gensim/summarization/bm25.py @@ -182,6 +182,8 @@ def get_bm25_weights(corpus, n_jobs=1): ---------- corpus : list of list of str Corpus of documents. + n_jobs : int + The number of processes to use for computing bm25. Returns ------- From 8ca83cc087531e5f0ea055e5e06ea531dec0ed82 Mon Sep 17 00:00:00 2001 From: shiki Date: Sun, 5 Aug 2018 15:46:37 -0400 Subject: [PATCH 10/15] fixed typo --- gensim/summarization/bm25.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gensim/summarization/bm25.py b/gensim/summarization/bm25.py index 9e04fe8b56..4106607b7b 100644 --- a/gensim/summarization/bm25.py +++ b/gensim/summarization/bm25.py @@ -155,7 +155,7 @@ def get_scores(self, document, average_idf): def _get_scores(bm25, document, average_idf): - """helper function for retrieve bm25 scores in parallel""" + """helper function for retrieving bm25 scores in parallel""" scores = [] for index in xrange(bm25.corpus_size): score = bm25.get_score(document, index, average_idf) From 16df452f500121a68dc2e565e62f6d73a5d9e809 Mon Sep 17 00:00:00 2001 From: shiki Date: Sun, 5 Aug 2018 22:50:28 -0400 Subject: [PATCH 11/15] fixed formatting --- gensim/test/test_BM25.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/gensim/test/test_BM25.py b/gensim/test/test_BM25.py index a96302e8c9..0372bd8fc4 100644 --- a/gensim/test/test_BM25.py +++ b/gensim/test/test_BM25.py @@ -44,6 +44,15 @@ def test_disjoint_docs_if_weight_zero(self): self.assertAlmostEqual(weights[0][1], 0) self.assertAlmostEqual(weights[1][0], 0) + def test_multiprocessing(self): + """ Result should be the same using different processes """ + weights1 = get_bm25_weights(common_texts) + weights2 = get_bm25_weights(common_texts, n_jobs=2) + weights3 = get_bm25_weights(common_texts, n_jobs=-1) + self.assertEqual(weights1, weights2) + self.assertEqual(weights1, weights3) + self.assertEqual(weights2, weights3) + if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) From 8a74d1249be79307e3c2fd64603fcb8bf4d93729 Mon Sep 17 00:00:00 2001 From: shiki Date: Tue, 7 Aug 2018 23:41:02 -0400 Subject: [PATCH 12/15] changed assertEqual to assertAlmostEqual --- gensim/test/test_BM25.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gensim/test/test_BM25.py b/gensim/test/test_BM25.py index 0372bd8fc4..e37efc2920 100644 --- a/gensim/test/test_BM25.py +++ b/gensim/test/test_BM25.py @@ -49,9 +49,9 @@ def test_multiprocessing(self): weights1 = get_bm25_weights(common_texts) weights2 = get_bm25_weights(common_texts, n_jobs=2) weights3 = get_bm25_weights(common_texts, n_jobs=-1) - self.assertEqual(weights1, weights2) - self.assertEqual(weights1, weights3) - self.assertEqual(weights2, weights3) + self.assertAlmostEqual(weights1, weights2) + self.assertAlmostEqual(weights1, weights3) + self.assertAlmostEqual(weights2, weights3) if __name__ == '__main__': From a9cf90e6dd03bac7954484dce8db2114b2d3c48e Mon Sep 17 00:00:00 2001 From: shiki Date: Thu, 9 Aug 2018 00:18:03 -0400 Subject: [PATCH 13/15] fixed docstrings to numpy-style --- gensim/summarization/bm25.py | 36 ++++++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/gensim/summarization/bm25.py b/gensim/summarization/bm25.py index 4106607b7b..ab50b276b4 100644 --- a/gensim/summarization/bm25.py +++ b/gensim/summarization/bm25.py @@ -155,7 +155,24 @@ def get_scores(self, document, average_idf): def _get_scores(bm25, document, average_idf): - """helper function for retrieving bm25 scores in parallel""" + """Helper function for retrieving bm25 scores of given `document` in parallel + in relation to every item in corpus. + + Parameters + ---------- + bm25 : BM25 object + BM25 object fitted on the corpus where documents are retrieved. + document : list of str + Document to be scored. + average_idf : float + Average idf in corpus. + + Returns + ------- + list of float + BM25 scores. + + """ scores = [] for index in xrange(bm25.corpus_size): score = bm25.get_score(document, index, average_idf) @@ -164,7 +181,22 @@ def _get_scores(bm25, document, average_idf): def _effective_n_jobs(n_jobs): - """Determine the number of jobs which are going to run in parallel""" + """Determines the number of jobs can run in parallel. + + Just like in sklearn, passing n_jobs=-1 means using all available + CPU cores. + + Parameters + ---------- + n_jobs : int + Number of workers requested by caller. + + Returns + ------- + int + number of effective jobs + + """ if n_jobs == 0: raise ValueError('n_jobs == 0 in Parallel has no meaning') elif n_jobs is None: From f9db84947ec4d7f41495bb78218363abb0b67979 Mon Sep 17 00:00:00 2001 From: shiki Date: Thu, 9 Aug 2018 20:59:44 -0400 Subject: [PATCH 14/15] removed space from blank lines --- gensim/summarization/bm25.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/gensim/summarization/bm25.py b/gensim/summarization/bm25.py index ab50b276b4..493847adce 100644 --- a/gensim/summarization/bm25.py +++ b/gensim/summarization/bm25.py @@ -157,7 +157,7 @@ def get_scores(self, document, average_idf): def _get_scores(bm25, document, average_idf): """Helper function for retrieving bm25 scores of given `document` in parallel in relation to every item in corpus. - + Parameters ---------- bm25 : BM25 object @@ -171,7 +171,7 @@ def _get_scores(bm25, document, average_idf): ------- list of float BM25 scores. - + """ scores = [] for index in xrange(bm25.corpus_size): @@ -185,7 +185,7 @@ def _effective_n_jobs(n_jobs): Just like in sklearn, passing n_jobs=-1 means using all available CPU cores. - + Parameters ---------- n_jobs : int @@ -195,7 +195,7 @@ def _effective_n_jobs(n_jobs): ------- int number of effective jobs - + """ if n_jobs == 0: raise ValueError('n_jobs == 0 in Parallel has no meaning') From 446d6b13cbd75ad2ab594a3b939a8ab668475ca1 Mon Sep 17 00:00:00 2001 From: shiki Date: Sat, 11 Aug 2018 22:43:27 -0400 Subject: [PATCH 15/15] moved effective_n_jobs to utils --- gensim/summarization/bm25.py | 32 +++----------------------------- gensim/utils.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 29 deletions(-) diff --git a/gensim/summarization/bm25.py b/gensim/summarization/bm25.py index 493847adce..50019c32fe 100644 --- a/gensim/summarization/bm25.py +++ b/gensim/summarization/bm25.py @@ -38,8 +38,8 @@ from six import iteritems from six.moves import xrange from functools import partial -from multiprocessing import Pool, cpu_count - +from multiprocessing import Pool +from ..utils import effective_n_jobs PARAM_K1 = 1.5 PARAM_B = 0.75 @@ -180,32 +180,6 @@ def _get_scores(bm25, document, average_idf): return scores -def _effective_n_jobs(n_jobs): - """Determines the number of jobs can run in parallel. - - Just like in sklearn, passing n_jobs=-1 means using all available - CPU cores. - - Parameters - ---------- - n_jobs : int - Number of workers requested by caller. - - Returns - ------- - int - number of effective jobs - - """ - if n_jobs == 0: - raise ValueError('n_jobs == 0 in Parallel has no meaning') - elif n_jobs is None: - return 1 - elif n_jobs < 0: - n_jobs = max(cpu_count() + 1 + n_jobs, 1) - return n_jobs - - def get_bm25_weights(corpus, n_jobs=1): """Returns BM25 scores (weights) of documents in corpus. Each document has to be weighted with every document in given corpus. @@ -236,7 +210,7 @@ def get_bm25_weights(corpus, n_jobs=1): bm25 = BM25(corpus) average_idf = sum(float(val) for val in bm25.idf.values()) / len(bm25.idf) - n_processes = _effective_n_jobs(n_jobs) + n_processes = effective_n_jobs(n_jobs) if n_processes == 1: weights = [bm25.get_scores(doc, average_idf) for doc in corpus] return weights diff --git a/gensim/utils.py b/gensim/utils.py index ec02cf4bb2..35abc203d8 100644 --- a/gensim/utils.py +++ b/gensim/utils.py @@ -44,6 +44,8 @@ from smart_open import smart_open +from multiprocessing import cpu_count + if sys.version_info[0] >= 3: unicode = str @@ -2025,3 +2027,29 @@ def lazy_flatten(nested_list): yield sub else: yield el + + +def effective_n_jobs(n_jobs): + """Determines the number of jobs can run in parallel. + + Just like in sklearn, passing n_jobs=-1 means using all available + CPU cores. + + Parameters + ---------- + n_jobs : int + Number of workers requested by caller. + + Returns + ------- + int + Number of effective jobs. + + """ + if n_jobs == 0: + raise ValueError('n_jobs == 0 in Parallel has no meaning') + elif n_jobs is None: + return 1 + elif n_jobs < 0: + n_jobs = max(cpu_count() + 1 + n_jobs, 1) + return n_jobs