Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix wait_for_publish that could hang for QoS=0 message #796

Merged
merged 3 commits into from
Jan 20, 2024

Conversation

PierreF
Copy link
Contributor

@PierreF PierreF commented Jan 7, 2024

If reconnect() is called (which happen on auto-reconnection or on first connection done by loop_forever), any pending QoS = 0 message were dropped without unblocking wait_for_publish().

We now ensure wait_for_publish() is unblocked and raise RuntimeError with MQTT_ERR_CONN_LOST

Fix #549

Copy link
Contributor

@akx akx left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there be a test for this?

if self.rc > 0:
raise RuntimeError(f'Message publish failed: {error_string(self.rc)}')


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

Comment on lines 1025 to 1209
# Before dropping all out_packet, ensure any QoS == 0 message info get
# marked as MQTT_ERR_CONN_LOST or the wait_for_publish() could hang forever
old_queue = self._out_packet
self._out_packet = collections.deque()

for pkt in old_queue:
if pkt["command"] & 0xF0 == PUBLISH and pkt["qos"] == 0:
pkt["info"].rc = MQTT_ERR_CONN_LOST
pkt["info"]._set_as_published()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there some subtlety here as to why old_queue is grabbed first and this is only done exactly after self._out_packet has been reallocated? If not, it feels like

Suggested change
# Before dropping all out_packet, ensure any QoS == 0 message info get
# marked as MQTT_ERR_CONN_LOST or the wait_for_publish() could hang forever
old_queue = self._out_packet
self._out_packet = collections.deque()
for pkt in old_queue:
if pkt["command"] & 0xF0 == PUBLISH and pkt["qos"] == 0:
pkt["info"].rc = MQTT_ERR_CONN_LOST
pkt["info"]._set_as_published()
# Mark all currently outgoing QoS = 0 packets as lost,
# or `wait_for_publish()` could hang forever
for pkt in self._out_packet:
if pkt["command"] & 0xF0 == PUBLISH and pkt["qos"] == 0:
pkt["info"].rc = MQTT_ERR_CONN_LOST
pkt["info"]._set_as_published()
self._out_packet = collections.deque()

would be the more obvious way to write this.

(self._out_packet.clear() would also save some allocations.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason way for what happen if a publish() if call during reconnect(). It would end in the new _out_packet or be marked at published... but well it's not very nice.

I'll check, but I think it better in all aspect to close the socket, then process the _set_as_published + clear the out_packet queue, because once closed we don't add message to out_packet.

@PierreF PierreF force-pushed the wait_for_publish_hang branch from 4223c51 to 81be49f Compare January 10, 2024 22:15
@PierreF PierreF force-pushed the wait_for_publish_hang branch from 81be49f to 532ba6f Compare January 20, 2024 13:34
@PierreF PierreF merged commit 4207e5e into master Jan 20, 2024
18 of 21 checks passed
@PierreF PierreF deleted the wait_for_publish_hang branch January 21, 2024 10:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

loop_start() before connect()
2 participants