-
Notifications
You must be signed in to change notification settings - Fork 40
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ECO-5033] fix: race condition when callingAblyRealtime#connect()
on terminated state
#1042
Conversation
WalkthroughThe changes in the Changes
Assessment against linked issues
📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
🧰 Additional context used📓 Learnings (1)
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
- lib/src/main/java/io/ably/lib/transport/ConnectionManager.java (3 hunks)
🧰 Additional context used
🔇 Additional comments (2)
lib/src/main/java/io/ably/lib/transport/ConnectionManager.java (2)
698-698
: Proper synchronization for thread terminationThe use of
ConnectionManager.this.notifyAll();
after settinghandlerThread
tonull
correctly notifies any threads waiting on theConnectionManager
instance. This ensures that threads blocked in thestartup()
method'swait()
call are properly awakened when the handler thread terminates.
1347-1354
: Proper synchronization and interruption handling instartup()
methodThe synchronized
startup()
method now correctly waits for thehandlerThread
to becomenull
before starting a newActionHandler
thread. By using awhile
loop withwait()
, you ensure that only one handler thread is active at any time, preventing potential concurrency issues.Declaring
throws InterruptedException
allows the method to propagate the interruption to the caller, which aligns with best practices for handling thread interruptions.
375817f
to
d1637a3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (4)
lib/src/main/java/io/ably/lib/transport/ConnectionManager.java (4)
1348-1355
: Approve changes instartup()
with a minor suggestionThe addition of the while loop to wait for the termination of the handler thread is a good improvement. It addresses potential race conditions by ensuring proper thread cleanup before starting a new one, which aligns with the PR objective of fixing race conditions.
Consider adding a timeout to the wait operation to prevent potential deadlocks. Here's a suggested improvement:
private synchronized void startup() throws InterruptedException { + long startTime = System.currentTimeMillis(); + long timeout = 5000; // 5 seconds timeout while (handlerThread != null) { Log.v(TAG, "Waiting for termination action to clean up handler thread"); - wait(); + wait(timeout); + if (System.currentTimeMillis() - startTime > timeout) { + Log.w(TAG, "Timeout waiting for handler thread termination"); + break; + } } (handlerThread = new Thread(new ActionHandler())).start(); startConnectivityListener(); }
Line range hint
1099-1138
: Approve changes inonAuthUpdatedAsync()
with a minor suggestionThe use of a single-threaded executor to handle the state transition waiting is a good improvement. It prevents potential blocking of the main thread and aligns with the PR objective of improving thread handling in connection management.
Consider adding error handling for the executor's execution. Here's a suggested improvement:
singleThreadExecutor.execute(() -> { + try { boolean waitingForConnected = true; while (waitingForConnected) { final ErrorInfo reason = waiter.waitForChange(); final ConnectionState connectionState = currentState.state; switch (connectionState) { case connected: authUpdateResult.onUpdate(true, null); Log.v(TAG, "onAuthUpdated: got connected"); waitingForConnected = false; break; case connecting: case disconnected: Log.v(TAG, "onAuthUpdated: " + connectionState); break; default: /* suspended/closed/error: throw the error. */ Log.v(TAG, "onAuthUpdated: throwing exception"); authUpdateResult.onUpdate(false, reason); waitingForConnected = false; } } waiter.close(); + } catch (Exception e) { + Log.e(TAG, "Error in onAuthUpdatedAsync execution", e); + authUpdateResult.onUpdate(false, new ErrorInfo("Internal error in auth update", 50000)); + } });
Line range hint
1165-1186
: Approve changes inonConnected()
with a minor suggestion for consistencyThe additional logging for connection resume scenarios and handling of failed resumes without errors are good improvements. They enhance visibility into the connection process and improve the robustness of the code.
For consistency, consider using a constant for the log tag instead of hardcoding "TAG". Here's a suggested improvement:
if(message.connectionId.equals(connection.id)) { // RTN15c6 - resume success if(message.error == null) { - Log.d(TAG, "connection has reconnected and resumed successfully"); + Log.d(TAG, "Connection has reconnected and resumed successfully"); } else { - Log.d(TAG, "connection resume success with non-fatal error: " + message.error.message); + Log.d(TAG, "Connection resume success with non-fatal error: " + message.error.message); } addPendingMessagesToQueuedMessages(false); } else { // RTN15c7, RTN16d - resume failure if (message.error != null) { - Log.d(TAG, "connection resume failed with error: " + message.error.message); + Log.d(TAG, "Connection resume failed with error: " + message.error.message); } else { // This shouldn't happen but, putting it here for safety - Log.d(TAG, "connection resume failed without error" ); + Log.d(TAG, "Connection resume failed without error" ); } addPendingMessagesToQueuedMessages(true); channels.transferToChannelQueue(extractConnectionQueuePresenceMessages()); }
Line range hint
1203-1222
: Approve changes inaddPendingMessagesToQueuedMessages()
with a minor suggestionThe changes in this method improve the handling of message serials and ensure proper retrying of messages after a failed resume. This aligns well with the PR objective of improving connection resume handling.
Consider extracting the logic for resetting the message serial into a separate method for improved readability. Here's a suggested improvement:
private void addPendingMessagesToQueuedMessages(boolean resetMessageSerial) { synchronized (this) { List<QueuedMessage> allPendingMessages = pendingMessages.popAll(); - if (resetMessageSerial){ // failed resume, so all new published messages start with msgSerial = 0 - msgSerial = 0; //msgSerial will increase in sendImpl when messages are sent, RTN15c7 - } else if (!allPendingMessages.isEmpty()) { // pendingMessages needs to expect next msgSerial to be the earliest previously unacknowledged message - msgSerial = allPendingMessages.get(0).msg.msgSerial; - } + resetMessageSerialIfNeeded(resetMessageSerial, allPendingMessages); // Add messages from pending messages to front of queuedMessages in order to retry them queuedMessages.addAll(0, allPendingMessages); } } +private void resetMessageSerialIfNeeded(boolean resetMessageSerial, List<QueuedMessage> allPendingMessages) { + if (resetMessageSerial) { + // Failed resume, so all new published messages start with msgSerial = 0 + msgSerial = 0; // msgSerial will increase in sendImpl when messages are sent, RTN15c7 + } else if (!allPendingMessages.isEmpty()) { + // pendingMessages needs to expect next msgSerial to be the earliest previously unacknowledged message + msgSerial = allPendingMessages.get(0).msg.msgSerial; + } +}
d1637a3
to
1d28882
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (3)
lib/src/main/java/io/ably/lib/transport/ConnectionManager.java (3)
1348-1355
: Improved connection resume handling and logging inonConnected()
methodThe changes in the
onConnected()
method significantly improve the handling of connection resumes. The additional logging provides better visibility into the connection process, which is crucial for debugging. The method now properly handles both successful and failed connection resumes, aligning with the specifications mentioned in the comments (RTN15c, RTN15c6, RTN15c7, RTN16d).However, there's a minor improvement that could be made:
Consider using
Log.w()
instead ofLog.d()
for logging connection resume failures. This would make it easier to identify issues in production environments where debug logging might be disabled.- Log.d(TAG, "connection resume failed with error: " + message.error.message); + Log.w(TAG, "Connection resume failed with error: " + message.error.message);
1348-1355
: New method for extracting presence messagesThe addition of the
extractConnectionQueuePresenceMessages()
method is a good improvement. It safely extracts presence messages from the queued messages without risking concurrent modification exceptions. The use of an iterator is appropriate for this purpose.The method is well-implemented and the comment about Android compatibility is helpful for future maintainers.
Consider using a
LinkedList
instead of anArrayList
forqueuedPresenceMessages
. This would make theadd
operation more efficient, especially if there are many presence messages:- final List<QueuedMessage> queuedPresenceMessages = new ArrayList<>(); + final List<QueuedMessage> queuedPresenceMessages = new LinkedList<>();This change would improve performance when adding elements, as
LinkedList
has O(1) complexity for add operations at the end, whileArrayList
might need to resize.
1348-1355
: Improved handling of pending messages inaddPendingMessagesToQueuedMessages()
The changes in the
addPendingMessagesToQueuedMessages()
method significantly improve the handling of pending messages during connection state changes. The method now correctly handles the resetting of message serials based on whether a connection resume failed, aligning with the specifications mentioned (RTN19a, RTN19a1, RTN19a2).The addition of pending messages to the front of the queued messages list ensures they are sent first, which is the correct behavior for maintaining message order.
Consider adding a brief comment explaining the significance of resetting
msgSerial
to 0 in the case of a failed resume. This would improve code readability:if (resetMessageSerial){ // failed resume, so all new published messages start with msgSerial = 0 + // Reset msgSerial to 0 for a failed resume to ensure proper message ordering (RTN15c7) msgSerial = 0; //msgSerial will increase in sendImpl when messages are sent, RTN15c7 }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
- lib/src/main/java/io/ably/lib/transport/ConnectionManager.java (3 hunks)
🧰 Additional context used
🔇 Additional comments (3)
lib/src/main/java/io/ably/lib/transport/ConnectionManager.java (3)
794-800
: Improved error handling inconnect()
methodThe addition of the try-catch block for
InterruptedException
is a good improvement. It properly handles thread interruptions during connection startup by logging the error, restoring the interrupt status, and returning early. This change prevents further execution after an interruption, which is a good practice for maintaining thread safety and consistency.
1348-1355
: Enhanced thread management instartup()
methodThe addition of the while loop to wait for the termination of the handler thread is a significant improvement. This change ensures that only one handler thread is active at any given time, preventing potential concurrency issues. The method now properly handles thread interruptions by throwing an InterruptedException, allowing for better interrupt handling up the call stack. The use of wait() inside the loop allows for responsive termination of the thread.
Line range hint
1-2230
: Overall assessment of changes in ConnectionManager.javaThe modifications in this file significantly improve the robustness and reliability of the connection management system. Key improvements include:
- Enhanced error handling, particularly for thread interruptions.
- Improved thread management in the
startup()
method.- Better handling of connection resumes, including proper logging.
- New method for safely extracting presence messages.
- Improved handling of pending messages during connection state changes.
These changes effectively address the race condition mentioned in the PR summary and align well with the specified objectives. The code is now more resilient to edge cases in connection handling, which should lead to a more stable connection management system.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add test around this covering happy and failing path?
1d28882
to
29c1395
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (1)
lib/src/main/java/io/ably/lib/transport/ConnectionManager.java (1)
1355-1365
: LGTM: Enhanced startup process with improved synchronizationThe changes in the
startup()
method significantly improve the synchronization between startup and termination processes. The while loop ensures that startup doesn't proceed until any ongoing termination is complete, preventing potential race conditions. ThrowingInterruptedException
allows for better handling of thread interruptions.One minor suggestion:
Consider using a more descriptive log message for the case when
connect()
is called twice. For example:- Log.v(TAG, "`connect()` has been called twice on uninitialized or terminal state"); + Log.v(TAG, "`connect()` called on already initialized or non-terminal state. Ignoring redundant call.");This provides more context about why the second call is being ignored.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
- lib/src/main/java/io/ably/lib/transport/ConnectionManager.java (5 hunks)
🧰 Additional context used
🔇 Additional comments (4)
lib/src/main/java/io/ably/lib/transport/ConnectionManager.java (4)
74-77
: LGTM: New termination flag improves state managementThe addition of the
terminating
flag is a good improvement. It will help in managing the connection state more effectively, especially during the shutdown process. This can prevent potential race conditions and improve the overall reliability of the connection management.
800-806
: LGTM: Improved error handling inconnect()
methodThe changes in the
connect()
method enhance error handling by properly catching and managingInterruptedException
. The early return after logging the error and restoring the thread's interrupt status is a good practice. This improvement helps maintain the consistency of the connection state and aids in debugging.
704-705
: LGTM: Improved cleanup process in ActionHandlerThe changes in the
ActionHandler
class enhance the cleanup process when the handler thread is about to terminate. Settingterminating
to false allows for a potential restart of the connection manager, while thenotifyAll()
call ensures that any waiting threads are properly notified of this state change. These improvements contribute to a more robust and reliable connection management system.
Line range hint
1-1839
: Overall, these changes significantly improve connection managementThe modifications made to the
ConnectionManager
class effectively address the race condition mentioned in the PR objectives. The introduction of theterminating
flag, along with the enhancements in error handling and thread synchronization, contribute to a more robust and reliable connection management system.Key improvements include:
- Better handling of connection termination states
- Enhanced error management in the
connect()
method- Improved synchronization between startup and termination processes
- More efficient cleanup in the
ActionHandler
classThese changes align well with the goal of enhancing the stability and reliability of the connection process in the Ably Java library.
@sacOO7 Unfortunately, we don’t have the test infrastructure for unit testing thread synchronization, and introducing it is beyond the scope of this PR. It’s also impossible to reliably reproduce this in e2e tests. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
…n terminated state
29c1395
to
a70c827
Compare
Resolves #1041
Summary by CodeRabbit
New Features
Bug Fixes
Logging Enhancements