-
Notifications
You must be signed in to change notification settings - Fork 64
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added support for no synchronization callbacks #1478
Conversation
Codecov ReportAttention:
... and 104 files with indirect coverage changes 📢 Thoughts on this report? Let us know!. |
…ient into rj/async-callbacks
…ient into rj/async-callbacks
def _get_callback(provided: Optional[NeptuneObjectCallback], env_name: str) -> Optional[NeptuneObjectCallback]: | ||
if provided is not None: | ||
return provided | ||
if env_name in os.environ and os.getenv(env_name) == "TRUE": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps we can simplify it to just
if os.getenv(env_name, "") == "TRUE":
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. Will do that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
self._op_processor: OperationProcessor = get_operation_processor( | ||
mode=mode, | ||
container_id=self._id, | ||
container_type=self.container_type, | ||
backend=self._backend, | ||
lock=self._lock, | ||
flush_period=flush_period, | ||
async_lag_callback=self._async_lag_callback_method, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
async_lag_callback=partial(async_lag_callback, self) if async_lag_callback else None,
async_no_progress_callback=partial(async_no_progress_callback, self) if async_no_progress_callback else None,
?
Then we don't need those two additional methods _async_lag_callback_method
, _async_no_progress_callback_method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I was even thinking about that 😉
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
self._lag_exceeded = True | ||
|
||
def _check_no_progress(self): | ||
if self._should_call_no_progress_callback: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we rewrite it slightly so that it's less indented?
if not self._should_call_no_progress_callback:
return
with self._lock:
if self._should_call_no_progress_callback:
self._async_no_progress_callback()
self._should_call_no_progress_callback = False
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, is it necessary to check this condition twice?
if self._should_call_no_progress_callback
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is common pattern as lock acquiring is heavy operation: https://en.wikipedia.org/wiki/Double-checked_locking
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -107,6 +129,20 @@ def wait(self): | |||
if not self._consumer.is_running(): | |||
raise NeptuneSynchronizationAlreadyStoppedException() | |||
|
|||
def _check_lag(self): | |||
if not self._lag_exceeded and self._last_ack and monotonic() - self._last_ack > self._async_lag_threshold: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also here a de-dent?
if (
self._lag_exceeded or
not self._last_ack or
monotonic() - self._last_ack <= self._async_lag_threshold
):
return
with self._lock:
if not self._lag_exceeded:
self._async_no_progress_callback()
self._lag_exceeded = True
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As you wish 😉
LGTM (just a few non-blocking styling questions) |
Before submitting checklist