Fix Unclosed AIOKafkaConnection when connecting to semi-broken broker #739

Closed

Contributor

@bitphage bitphage commented Apr 23, 2021

Changes

Fixes #715

Checklist

  • I think the code is well written
  • Unit tests for the changes exist
  • Documentation reflects the changes
  • Add a new news fragment into the CHANGES folder
    • name it <issue_id>.<type> (e.g. 588.bugfix)
    • if you don't have an issue_id change it to the pr id after creating the PR
    • ensure type is one of the following:
      • .feature: Signifying a new feature.
      • .bugfix: Signifying a bug fix.
      • .doc: Signifying a documentation improvement.
      • .removal: Signifying a deprecation or removal of public API.
      • .misc: A ticket has been closed, but it is not of interest to users.
    • Make sure to use full sentences with correct case and punctuation, for example: Fix issue with non-ascii contents in doctest text files.

@bitphage bitphage changed the title Fix Unclosed AIOKafkaConnection case Fix Unclosed AIOKafkaConnection when connecting to semi-broken broker Apr 23, 2021
    await conn.connect()
except Exception:
    # Cleanup to prevent `Unclosed AIOKafkaConnection` if we failed to connect here
    conn.close()
Collaborator

Shouldn't this logic be inside AIOKafkaConnection.connect(), so that it maintains the invariant too?

Contributor Author

I'm not sure about that, because AIOKafkaConnection.connect() is not used directly, only via the create_conn factory. Still, I can move this wrapping logic if you think it is more appropriate to have it there.
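
For readers following the thread, here is a minimal sketch of the cleanup-in-factory pattern being discussed. It does not use aiokafka: `FakeConnection`, its `fail` flag, and the `conn_factory` parameter are hypothetical stand-ins, and the `raise` after `close()` is an assumption, since the quoted diff hunk ends at `conn.close()`.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a connection class; not aiokafka's API."""

    def __init__(self, fail: bool) -> None:
        self._fail = fail
        self.resource_open = False
        self.closed = False

    async def connect(self) -> None:
        # Acquire a resource first, then fail in a later step, mimicking
        # an error that happens after the socket is already open.
        self.resource_open = True
        await asyncio.sleep(0)
        if self._fail:
            raise ConnectionError("handshake failed")

    def close(self) -> None:
        self.resource_open = False
        self.closed = True


async def create_conn(conn_factory):
    """Build a connection and make sure it is closed if connect() fails."""
    conn = conn_factory()
    try:
        await conn.connect()
    except Exception:
        # Release whatever connect() managed to acquire.
        conn.close()
        # Assumption: the error is re-raised so the caller still sees it.
        raise
    return conn
```

With this shape, the caller never receives a half-open connection: either `create_conn` returns a connected object or it raises after closing it.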

close_called = True

with mock.patch.object(AIOKafkaConnection, 'connect', mock_connect):
    with mock.patch.object(AIOKafkaConnection, 'close', mock_close):
Collaborator

Is it possible to reproduce the problem without mocking?

Contributor Author

Unfortunately, I don't know how exactly a Kafka broker can be brought into this broken state. We've hit this situation multiple times on Amazon MSK when the Kafka cluster was under high load, which caused failures on some brokers plus "under replicated partitions" issues.

Collaborator

Without it we can never be sure that we actually fixed the problem. Any script that eventually fails is fine; it doesn't need to be suitable as a unit test.

Contributor Author

If you remove the exception handling from create_conn and run python -m pytest -v --log-cli-level=INFO -s --docker-image aiolibs/kafka:2.12_2.2.2 tests/test_conn.py::ConnIntegrationTest::test_semi_broken_connection, the original error is logged in the live log as:

Exception ignored in: <function AIOKafkaConnection.__del__ at 0x7f01a90fc3b0>
Traceback (most recent call last):
  File "/home/vvk/devel/aiokafka/aiokafka/conn.py", line 194, in __del__
    source=self)
ResourceWarning: Unclosed AIOKafkaConnection <AIOKafkaConnection host=127.0.0.1 port=43803>

So the unit test still mimics the issue. Basically, any exception raised from connect() after self._read_task is set is a problem.
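
The failure mode described above can be illustrated without a broker. The following is a simplified, synchronous sketch under stated assumptions: `LeakyConnection` is a hypothetical stand-in (not aiokafka's class), its `_read_task` is a plain placeholder object rather than a real task, and the `__del__` check only loosely mirrors the `ResourceWarning` shown in the log.

```python
import gc
import warnings


class LeakyConnection:
    """Hypothetical stand-in that warns in __del__ if it was never closed."""

    def __init__(self) -> None:
        self._read_task = None

    def connect(self) -> None:
        # The "read task" is set first, then a later step fails.
        self._read_task = object()
        raise RuntimeError("version lookup failed")

    def close(self) -> None:
        self._read_task = None

    def __del__(self) -> None:
        if self._read_task is not None:
            warnings.warn(
                f"Unclosed {self.__class__.__name__}", ResourceWarning
            )


def warnings_after_failed_connect(cleanup: bool) -> list:
    """Return ResourceWarning messages seen after a failed connect()."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        conn = LeakyConnection()
        try:
            conn.connect()
        except RuntimeError:
            if cleanup:
                conn.close()
        # Drop the last reference so __del__ runs inside the recording block.
        del conn
        gc.collect()
    return [
        str(w.message)
        for w in caught
        if issubclass(w.category, ResourceWarning)
    ]
```

Calling `warnings_after_failed_connect(False)` records the warning, while passing `True` records none, which is the difference the exception handler in create_conn is meant to make.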

Collaborator

@ods ods Apr 26, 2021

Are there other possible race conditions there? Like initializing self._reader, but not self._read_task?

Contributor Author

No, I don't think so. The most likely source of exceptions is the later call to await self._do_version_lookup().
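
As a rough illustration of the alternative raised earlier in the review (keeping the invariant inside connect() itself), here is a sketch in which a failing version lookup triggers cleanup. `SketchConnection` is a hypothetical stand-in; only the attribute names `_reader`, `_read_task`, and `_do_version_lookup` come from the thread, and nothing here claims to match aiokafka's actual implementation. `mock.AsyncMock` requires Python 3.8 or later.

```python
import asyncio
from unittest import mock


class SketchConnection:
    """Hypothetical stand-in; attribute names mirror those in the thread."""

    def __init__(self) -> None:
        self._reader = None
        self._read_task = None

    async def _read(self) -> None:
        # Placeholder for a long-running read loop.
        await asyncio.sleep(3600)

    async def _do_version_lookup(self) -> None:
        await asyncio.sleep(0)

    async def connect(self) -> None:
        self._reader = object()
        self._read_task = asyncio.ensure_future(self._read())
        try:
            await self._do_version_lookup()
        except Exception:
            # Undo the partial setup so no task or reader is left behind.
            self.close()
            raise

    def close(self) -> None:
        if self._read_task is not None:
            self._read_task.cancel()
            self._read_task = None
        self._reader = None


async def connect_with_failing_lookup() -> SketchConnection:
    """Force the lookup to fail and return the connection for inspection."""
    conn = SketchConnection()
    failing = mock.AsyncMock(side_effect=RuntimeError("lookup failed"))
    with mock.patch.object(SketchConnection, "_do_version_lookup", failing):
        try:
            await conn.connect()
        except RuntimeError:
            pass
    return conn
```

After `asyncio.run(connect_with_failing_lookup())`, both `_reader` and `_read_task` are reset, so the object is back in its pre-connect state.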


codecov bot commented Apr 11, 2023

Codecov Report

Merging #739 (7b69257) into master (57439a6) will decrease coverage by 0.04%.
The diff coverage is 100.00%.

@@            Coverage Diff             @@
##           master     #739      +/-   ##
==========================================
- Coverage   97.56%   97.52%   -0.04%     
==========================================
  Files          30       30              
  Lines        5451     5455       +4     
==========================================
+ Hits         5318     5320       +2     
- Misses        133      135       +2     
Flag Coverage Δ
cext 88.37% <100.00%> (-0.03%) ⬇️
integration 97.48% <100.00%> (-0.04%) ⬇️
purepy 97.06% <100.00%> (+<0.01%) ⬆️
unit 38.07% <0.00%> (-0.03%) ⬇️

Impacted Files Coverage Δ
aiokafka/conn.py 93.72% <100.00%> (+0.05%) ⬆️

... and 1 file with indirect coverage changes

Collaborator

ods commented Jul 2, 2023

Looks like it was already fixed in #810

@ods ods closed this Jul 2, 2023
Successfully merging this pull request may close these issues.

Connection should be close if there is a failure during start
2 participants