-
Notifications
You must be signed in to change notification settings - Fork 85
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Returning success count from the .populate()
call
#1050
Changes from 16 commits
29357fe
b737003
d1011fb
6f7a0c0
1f358a9
eb827e6
1b4806e
0abd3c0
6bf2afc
37801d6
7a258d4
9480435
0f84560
02127a0
c061f8a
9ef2046
c66ff04
ff6b81c
45938aa
e143ce8
291a468
008a723
18fd619
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -160,6 +160,7 @@ def populate( | |||||||||||||||||||||||||||
max_calls=None, | ||||||||||||||||||||||||||||
display_progress=False, | ||||||||||||||||||||||||||||
processes=1, | ||||||||||||||||||||||||||||
return_success_count=False, | ||||||||||||||||||||||||||||
make_kwargs=None, | ||||||||||||||||||||||||||||
): | ||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||
|
@@ -176,6 +177,8 @@ def populate( | |||||||||||||||||||||||||||
:param max_calls: if not None, populate at most this many keys | ||||||||||||||||||||||||||||
:param display_progress: if True, report progress_bar | ||||||||||||||||||||||||||||
:param processes: number of processes to use. Set to None to use all cores | ||||||||||||||||||||||||||||
:param return_success_count: if True, return the count of successful `make()` calls. | ||||||||||||||||||||||||||||
If suppress_errors is also True, returns a tuple: (success_count, errors) | ||||||||||||||||||||||||||||
:param make_kwargs: Keyword arguments which do not affect the result of computation | ||||||||||||||||||||||||||||
to be passed down to each ``make()`` call. Computation arguments should be | ||||||||||||||||||||||||||||
specified within the pipeline e.g. using a `dj.Lookup` table. | ||||||||||||||||||||||||||||
|
@@ -222,49 +225,62 @@ def handler(signum, frame): | |||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
keys = keys[:max_calls] | ||||||||||||||||||||||||||||
nkeys = len(keys) | ||||||||||||||||||||||||||||
if not nkeys: | ||||||||||||||||||||||||||||
return | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
processes = min(_ for _ in (processes, nkeys, mp.cpu_count()) if _) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
error_list = [] | ||||||||||||||||||||||||||||
populate_kwargs = dict( | ||||||||||||||||||||||||||||
suppress_errors=suppress_errors, | ||||||||||||||||||||||||||||
return_exception_objects=return_exception_objects, | ||||||||||||||||||||||||||||
make_kwargs=make_kwargs, | ||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||
success_list = [] | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
if nkeys: | ||||||||||||||||||||||||||||
processes = min(_ for _ in (processes, nkeys, mp.cpu_count()) if _) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
if processes == 1: | ||||||||||||||||||||||||||||
for key in ( | ||||||||||||||||||||||||||||
tqdm(keys, desc=self.__class__.__name__) if display_progress else keys | ||||||||||||||||||||||||||||
): | ||||||||||||||||||||||||||||
error = self._populate1(key, jobs, **populate_kwargs) | ||||||||||||||||||||||||||||
if error is not None: | ||||||||||||||||||||||||||||
error_list.append(error) | ||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||
# spawn multiple processes | ||||||||||||||||||||||||||||
self.connection.close() # disconnect parent process from MySQL server | ||||||||||||||||||||||||||||
del self.connection._conn.ctx # SSLContext is not pickleable | ||||||||||||||||||||||||||||
with mp.Pool( | ||||||||||||||||||||||||||||
processes, _initialize_populate, (self, jobs, populate_kwargs) | ||||||||||||||||||||||||||||
) as pool, ( | ||||||||||||||||||||||||||||
tqdm(desc="Processes: ", total=nkeys) | ||||||||||||||||||||||||||||
if display_progress | ||||||||||||||||||||||||||||
else contextlib.nullcontext() | ||||||||||||||||||||||||||||
) as progress_bar: | ||||||||||||||||||||||||||||
for error in pool.imap(_call_populate1, keys, chunksize=1): | ||||||||||||||||||||||||||||
if error is not None: | ||||||||||||||||||||||||||||
error_list.append(error) | ||||||||||||||||||||||||||||
if display_progress: | ||||||||||||||||||||||||||||
progress_bar.update() | ||||||||||||||||||||||||||||
self.connection.connect() # reconnect parent process to MySQL server | ||||||||||||||||||||||||||||
populate_kwargs = dict( | ||||||||||||||||||||||||||||
suppress_errors=suppress_errors, | ||||||||||||||||||||||||||||
return_exception_objects=return_exception_objects, | ||||||||||||||||||||||||||||
make_kwargs=make_kwargs, | ||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
if processes == 1: | ||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is if not processes:
return {
"success_count": 0,
"error_list": [],
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is handled in the |
||||||||||||||||||||||||||||
for key in ( | ||||||||||||||||||||||||||||
tqdm(keys, desc=self.__class__.__name__) | ||||||||||||||||||||||||||||
if display_progress | ||||||||||||||||||||||||||||
else keys | ||||||||||||||||||||||||||||
): | ||||||||||||||||||||||||||||
status = self._populate1(key, jobs, **populate_kwargs) | ||||||||||||||||||||||||||||
if status is not None: | ||||||||||||||||||||||||||||
if isinstance(status, tuple): | ||||||||||||||||||||||||||||
error_list.append(status) | ||||||||||||||||||||||||||||
elif status: | ||||||||||||||||||||||||||||
success_list.append(1) | ||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we change
Suggested change
|
||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||
# spawn multiple processes | ||||||||||||||||||||||||||||
self.connection.close() # disconnect parent process from MySQL server | ||||||||||||||||||||||||||||
del self.connection._conn.ctx # SSLContext is not pickleable | ||||||||||||||||||||||||||||
with mp.Pool( | ||||||||||||||||||||||||||||
processes, _initialize_populate, (self, jobs, populate_kwargs) | ||||||||||||||||||||||||||||
) as pool, ( | ||||||||||||||||||||||||||||
tqdm(desc="Processes: ", total=nkeys) | ||||||||||||||||||||||||||||
if display_progress | ||||||||||||||||||||||||||||
else contextlib.nullcontext() | ||||||||||||||||||||||||||||
) as progress_bar: | ||||||||||||||||||||||||||||
for status in pool.imap(_call_populate1, keys, chunksize=1): | ||||||||||||||||||||||||||||
if status is not None: | ||||||||||||||||||||||||||||
if isinstance(status, tuple): | ||||||||||||||||||||||||||||
error_list.append(status) | ||||||||||||||||||||||||||||
elif status: | ||||||||||||||||||||||||||||
success_list.append(1) | ||||||||||||||||||||||||||||
if display_progress: | ||||||||||||||||||||||||||||
progress_bar.update() | ||||||||||||||||||||||||||||
self.connection.connect() # reconnect parent process to MySQL server | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
# restore original signal handler: | ||||||||||||||||||||||||||||
if reserve_jobs: | ||||||||||||||||||||||||||||
signal.signal(signal.SIGTERM, old_handler) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
if suppress_errors and return_success_count: | ||||||||||||||||||||||||||||
return sum(success_list), error_list | ||||||||||||||||||||||||||||
if suppress_errors: | ||||||||||||||||||||||||||||
return error_list | ||||||||||||||||||||||||||||
if return_success_count: | ||||||||||||||||||||||||||||
return sum(success_list) | ||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If success_list is only used to report the sum, why not use a counter instead? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I went with this approach because of the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. make sense. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These tuple returns are tough because the order matters. How about we return this as a dict instead?
Suggested change
This would be a backward-incompatible change, so we need to update users. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we use this dictionary return, then we can always return the success count and the user does not need to request it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed, but as you said, the biggest thing is breaking backward compatibility and if we can avoid it, we should. Tuple returns are not as clean, but we can compensate with clear documentation of this feature in the docstring There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's okay to break backward compatibility for better design decisions long-term. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. At this point, the docstring did not specify the return argument. So it's okay to introduce a modification now with proper documentation. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @dimitri-yatsenko , yes, docstring did not specify the return argument. But we would still break backward compatibility if we make the changes you suggested. I'm okay with it if you're okay with it |
||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
def _populate1( | ||||||||||||||||||||||||||||
self, key, jobs, suppress_errors, return_exception_objects, make_kwargs=None | ||||||||||||||||||||||||||||
|
@@ -322,6 +338,7 @@ def _populate1( | |||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||
if jobs is not None: | ||||||||||||||||||||||||||||
jobs.complete(self.target.table_name, self._job_key(key)) | ||||||||||||||||||||||||||||
return True | ||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure about this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about returning There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When does it ever return There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Then you should add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does not populating anything constitute a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The current function produces There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed, I'll update accordingly |
||||||||||||||||||||||||||||
finally: | ||||||||||||||||||||||||||||
self.__class__._allow_insert = False | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,6 +53,23 @@ def test_populate(self): | |
assert_true(self.ephys) | ||
assert_true(self.channel) | ||
|
||
def test_populate_with_success_count(self): | ||
# test simple populate | ||
assert_true(self.subject, "root tables are empty") | ||
assert_false(self.experiment, "table already filled?") | ||
success_count = self.experiment.populate(return_success_count=True) | ||
assert_equal(len(self.experiment.key_source & self.experiment), success_count) | ||
|
||
# test restricted populate | ||
assert_false(self.trial, "table already filled?") | ||
restriction = self.subject.proj(animal="subject_id").fetch("KEY")[0] | ||
d = self.trial.connection.dependencies | ||
d.load() | ||
success_count, _ = self.trial.populate( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will populate return a dict now? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the test should reflect that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
restriction, return_success_count=True, suppress_errors=True | ||
) | ||
assert_equal(len(self.trial.key_source & self.trial), success_count) | ||
|
||
def test_populate_exclude_error_and_ignore_jobs(self): | ||
# test simple populate | ||
assert_true(self.subject, "root tables are empty") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the docstring appears to be missing a description of the return argument.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we omit this input and always include the success count as part of the return dict?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a bit unclear what you mean here. The docstring is included there.
Do you mean it is not descriptive enough?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I see what you mean
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The docstring should include
:return: ...