Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close ConnectorPageSource when exhausted in CheckpointEntryIterator #19120

Merged
merged 1 commit into from
Sep 22, 2023

Conversation

ebyhr
Copy link
Member

@ebyhr ebyhr commented Sep 22, 2023

Description

Relates to https://trinodb.slack.com/archives/C03E2DYAS0J/p1695295020667509

Release notes

(x) This is not user-visible or is docs only, and no release notes are required.

@ebyhr ebyhr added the no-release-notes This pull request does not require release notes entry label Sep 22, 2023
@cla-bot cla-bot bot added the cla-signed label Sep 22, 2023
@github-actions github-actions bot added the delta-lake Delta Lake connector label Sep 22, 2023
@ebyhr ebyhr self-assigned this Sep 22, 2023
@ebyhr ebyhr force-pushed the ebi/delta-resource branch from b68f374 to 6ab7762 Compare September 22, 2023 02:41
@@ -101,6 +101,7 @@
import static java.util.Objects.requireNonNull;

public class CheckpointEntryIterator
Copy link
Contributor

@findinpath findinpath Sep 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This iterator should be closeable because it opens a resource which might be left open if the iterator doesn't process all its entries.

Probably the code of the connector needs a bigger change to accustom this requirement.
Not sure whether returning a Stream in io.trino.plugin.deltalake.transactionlog.TableSnapshot#getCheckpointTransactionLogEntries is the way to go.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this PR fixes the resource leak when CheckpointEntryIterator is exhausted, which is a good change & let's ship it.

The other question is how to ensure resource is closed when CheckpointEntryIterator is not exhausted (ie due to exception). Returning Stream is not sufficient to guarantee that, but can be helpful. From the class docs

Streams have a BaseStream.close() method and implement AutoCloseable. Operating on a stream after it has been closed will throw IllegalStateException. Most stream instances do not actually need to be closed after use, as they are backed by collections, arrays, or generating functions, which require no special resource management. Generally, only streams whose source is an IO channel, such as those returned by Files.lines(Path), will require closing. If a stream does require closing, it must be opened as a resource within a try-with-resources statement or similar control structure to ensure that it is closed promptly after its operations have completed.

so yes, Stream + twr will be sufficient.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I actually tried the approach first and changed to this AbstractIterator as it required more work. I will take another look when I have the time.

@ebyhr ebyhr force-pushed the ebi/delta-resource branch from 6ab7762 to ac3b813 Compare September 22, 2023 07:57
@ebyhr ebyhr merged commit a6711fa into trinodb:master Sep 22, 2023
@ebyhr ebyhr deleted the ebi/delta-resource branch September 22, 2023 09:40
@github-actions github-actions bot added this to the 427 milestone Sep 22, 2023
@jkylling
Copy link
Contributor

Thank you for getting to this so quickly.

When looking for this bug we found it helpful to add the following to TestDeltaLakeMinioAndHmsConnectorSmokeTest

    @AfterMethod(alwaysRun = true)
    public void validateNoConnectionLeak(Method method) {
        assertThat(TrinoS3FileSystem.getFileSystemStats().getActiveConnections().getTotalCount()).isEqualTo(0);
    }

(we are using the hadoop based filesystem, as it's still the default on 423. Something similar might be possible with the new native S3 file system?)

This immediately pointed us to a test using checkpoints, which then lead us to the CheckpointEntryIterator. It might be useful to always have such an @AfterMethod on the tests.

We experimented briefly with fixing this by making the CheckpointEntryIterator, and using

 return stream(iterator).onClose(() -> {
     try {
         iterator.close();
     }
     catch (IOException e) {
         throw new UncheckedIOException(e);
     }
 });

And ensuring that the stream is always consumed in a try-with-resource statement (this is on 423, before the changes that do not always wrap the iterator in a stream). Perhaps one way to ensure safety would be to only expose CheckpointEntryIterator publicly as a stream.

@jkylling
Copy link
Contributor

We should also add a test case which we know will not trigger the Parquet reader to be closed on the first iteration. A checkpoint of the form added in

together with some special Parquet reader configuration would likely do it.

I'll see if I find time to add such a test case. We are interested in reproducing the connection pool issues we saw.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cla-signed delta-lake Delta Lake connector no-release-notes This pull request does not require release notes entry
Development

Successfully merging this pull request may close these issues.

4 participants