Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clear up job queue parameters in word2vec #2931

Merged
merged 3 commits (branch names not captured in this page extract)
Sep 8, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 15 additions & 34 deletions gensim/models/word2vec.py
Original file line number Diff line number Diff line change
Expand Up @@ -1045,10 +1045,10 @@ def _worker_loop(self, job_queue, progress_queue):

Parameters
----------
job_queue : Queue of (list of objects, (str, int))
job_queue : Queue of (list of objects, float)
A queue of jobs still to be processed. The worker will take up jobs from this queue.
Each job is represented by a tuple where the first element is the corpus chunk to be processed and
the second is the dictionary of parameters.
the second is the floating-point learning rate.
progress_queue : Queue of (int, int, int)
A queue of progress reports. Each report is represented as a tuple of these 3 elements:
* Size of data chunk processed, for example number of sentences in the corpus chunk.
Expand All @@ -1064,12 +1064,12 @@ def _worker_loop(self, job_queue, progress_queue):
if job is None:
progress_queue.put(None)
break # no more jobs => quit this worker
data_iterable, job_parameters = job
data_iterable, alpha = job

for callback in callbacks:
callback.on_batch_begin(self)

tally, raw_tally = self._do_train_job(data_iterable, job_parameters, thread_private_mem)
tally, raw_tally = self._do_train_job(data_iterable, alpha, thread_private_mem)

for callback in callbacks:
callback.on_batch_end(self)
Expand All @@ -1088,10 +1088,10 @@ def _job_producer(self, data_iterator, job_queue, cur_epoch=0, total_examples=No
----------
data_iterator : iterable of list of objects
The input dataset. This will be split in chunks and these chunks will be pushed to the queue.
job_queue : Queue of (list of object, dict of (str, int))
job_queue : Queue of (list of object, float)
A queue of jobs still to be processed. The worker will take up jobs from this queue.
Each job is represented by a tuple where the first element is the corpus chunk to be processed and
the second is the dictionary of parameters.
the second is the floating-point learning rate.
cur_epoch : int, optional
The current training epoch, needed to compute the training parameters for each job.
For example in many implementations the learning rate would be dropping with the number of epochs.
Expand All @@ -1105,7 +1105,7 @@ def _job_producer(self, data_iterator, job_queue, cur_epoch=0, total_examples=No
"""
job_batch, batch_size = [], 0
pushed_words, pushed_examples = 0, 0
next_job_params = self._get_job_params(cur_epoch)
next_alpha = self._get_next_alpha(0.0, cur_epoch)
job_no = 0

for data_idx, data in enumerate(data_iterator):
Expand All @@ -1118,7 +1118,7 @@ def _job_producer(self, data_iterator, job_queue, cur_epoch=0, total_examples=No
batch_size += data_length
else:
job_no += 1
job_queue.put((job_batch, next_job_params))
job_queue.put((job_batch, next_alpha))

# update the learning rate for the next job
if total_examples:
Expand All @@ -1129,14 +1129,14 @@ def _job_producer(self, data_iterator, job_queue, cur_epoch=0, total_examples=No
# words-based decay
pushed_words += self._raw_word_count(job_batch)
epoch_progress = 1.0 * pushed_words / total_words
next_job_params = self._update_job_params(next_job_params, epoch_progress, cur_epoch)
next_alpha = self._get_next_alpha(epoch_progress, cur_epoch)

# add the sentence that didn't fit as the first item of a new job
job_batch, batch_size = [data], data_length
# add the last job too (may be significantly smaller than batch_words)
if job_batch:
job_no += 1
job_queue.put((job_batch, next_job_params))
job_queue.put((job_batch, next_alpha))

if job_no == 0 and self.train_count == 0:
logger.warning(
Expand All @@ -1160,10 +1160,10 @@ def _log_epoch_progress(self, progress_queue=None, job_queue=None, cur_epoch=0,
* size of data chunk processed, for example number of sentences in the corpus chunk.
* Effective word count used in training (after ignoring unknown words and trimming the sentence length).
* Total word count used in training.
job_queue : Queue of (list of object, dict of (str, int))
job_queue : Queue of (list of object, float)
A queue of jobs still to be processed. The worker will take up jobs from this queue.
Each job is represented by a tuple where the first element is the corpus chunk to be processed and
the second is the dictionary of parameters.
the second is the floating-point learning rate.
cur_epoch : int, optional
The current training epoch, needed to compute the training parameters for each job.
For example in many implementations the learning rate would be dropping with the number of epochs.
Expand Down Expand Up @@ -1342,30 +1342,11 @@ def _train_epoch(self, data_iterable, cur_epoch=0, total_examples=None, total_wo

return trained_word_count, raw_word_count, job_tally

def _get_job_params(self, cur_epoch):
"""Get the learning rate used in the current epoch.

Parameters
----------
cur_epoch : int
Current iteration through the corpus

Returns
-------
float
The learning rate for this epoch (it is linearly reduced with epochs from `self.alpha` to `self.min_alpha`).

"""
alpha = self.alpha - ((self.alpha - self.min_alpha) * float(cur_epoch) / self.epochs)
return alpha

def _update_job_params(self, job_params, epoch_progress, cur_epoch):
def _get_next_alpha(self, epoch_progress, cur_epoch):
"""Get the correct learning rate for the next iteration.

Parameters
----------
job_params : dict of (str, obj)
UNUSED.
epoch_progress : float
Ratio of finished work in the current epoch.
cur_epoch : int
Expand Down Expand Up @@ -1476,9 +1457,9 @@ def _log_progress(self, job_queue, progress_queue, cur_epoch, example_count, tot

Parameters
----------
job_queue : Queue of (list of object, dict of (str, float))
job_queue : Queue of (list of object, float)
The queue of jobs still to be performed by workers. Each job is represented as a tuple containing
the batch of data to be processed and the parameters to be used for the processing as a dict.
the batch of data to be processed and the floating-point learning rate.
progress_queue : Queue of (int, int, int)
A queue of progress reports. Each report is represented as a tuple of these 3 elements:
* size of data chunk processed, for example number of sentences in the corpus chunk.
Expand Down