
Combined set of changes for discussion, focussed on a new methodology for record grouping #319

Draft
mkeskells wants to merge 7 commits into main
Conversation

@mkeskells (Contributor) commented Oct 29, 2024

This is a very rough draft, and it would eventually be structured into several PRs. It's a merge of some existing open PRs, a few minor fixes, and a more major rework of the way that records are buffered and written to files. It's structured as a subclass to make it easier to reason about, but it may well end up as a separate sink connector, or be merged into the parent.

This PR is not expected to be merged in its current form; bits may be cherry-picked from this merge. It's looking for a review of the approach: whether it would work well in this connector, or whether it's too much of a change and should be forked into a private repo (which would be sad).

In summary, 087b4b2 is focussed on this approach:

  1. When files are full, write them
  2. When records are too old, write the files
  3. Don't hold records in memory if we don't have to
  4. Request commits from Kafka Connect when we have completed the write of a file (with a bit of debouncing), rather than waiting for the commit to be forced by timeout. This makes writing less batched
  5. Avoid OOM issues and reduce memory pressure by bringing forward the writing of files and, if needed, pausing consumption of records until writes have occurred
  6. Provide different writer models (currently just eager or lazy) that give different options on writer behaviour, resource pressure and timing. There are probably more options and tweaking to do here (see the sketch after this list)
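
Items 1 to 5 roughly map onto a flush/commit loop like the minimal sketch below. Everything named here (FlushPolicySketch, BufferedFile, the field names, the one-second debounce window) is an assumption made for illustration rather than the PR's actual classes; only the SinkTaskContext calls (requestCommit, pause, assignment) are real Kafka Connect API, and the real implementation lives in 087b4b2.

```java
// Illustrative sketch only: BufferedFile, the thresholds and the one-second debounce
// are assumptions for this example, not the PR's actual classes. Only the
// SinkTaskContext calls (requestCommit, pause, assignment) are real Kafka Connect API.
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkTaskContext;

final class FlushPolicySketch {
    private final Map<String, BufferedFile> openFiles = new ConcurrentHashMap<>();
    private final SinkTaskContext context;
    private final int maxRecordsPerFile;
    private final long maxRecordAgeMs;
    private final int maxBufferedRecords;
    private long lastCommitRequestMs;

    FlushPolicySketch(final SinkTaskContext context, final int maxRecordsPerFile,
            final long maxRecordAgeMs, final int maxBufferedRecords) {
        this.context = context;
        this.maxRecordsPerFile = maxRecordsPerFile;
        this.maxRecordAgeMs = maxRecordAgeMs;
        this.maxBufferedRecords = maxBufferedRecords;
    }

    /** Called from put(): write any file that is full (1) or whose oldest record is too old (2). */
    void maybeFlush(final long nowMs) {
        int buffered = 0;
        final Iterator<Map.Entry<String, BufferedFile>> it = openFiles.entrySet().iterator();
        while (it.hasNext()) {
            final BufferedFile file = it.next().getValue();
            if (file.recordCount() >= maxRecordsPerFile
                    || nowMs - file.oldestRecordTimestampMs() >= maxRecordAgeMs) {
                file.write();                  // write it ...
                it.remove();                   // ... and stop holding its records in memory (3)
                requestCommitDebounced(nowMs); // ask Connect to commit soon, not on its timeout (4)
            } else {
                buffered += file.recordCount();
            }
        }
        if (buffered > maxBufferedRecords) {
            // Back pressure (5): pause consumption until the backlog of writes has drained;
            // a matching resume() call (not shown) would follow once it has.
            context.pause(context.assignment().toArray(new TopicPartition[0]));
        }
    }

    private void requestCommitDebounced(final long nowMs) {
        if (nowMs - lastCommitRequestMs > 1_000L) { // debounce window chosen arbitrarily here
            context.requestCommit();
            lastCommitRequestMs = nowMs;
        }
    }

    /** Stand-in for whatever per-file buffering the connector actually uses. */
    interface BufferedFile {
        int recordCount();
        long oldestRecordTimestampMs();
        void write();
    }
}
```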

These changes could also be applied to S3 - most of the logic isn't GCS-specific, it's just batching and file writing.

This does change the semantics of the existing connector, at least in the following ways:

  • overwrites of records with the same key no longer occur - there isn't a batch in the same way (I don't need this behaviour, and it could be added)
  • different file names could be produced, so a reset and rewrite could cause duplicates.
    This can already happen with the existing record grouper if there is some reconfiguration after an incomplete write and restart.
    With these changes it could also occur because files are written earlier (based on timing) and the batching produces different groups. I don't need to avoid this, as I can remove duplicates downstream

(All of the above is in 087b4b2d9681578005dc97b61e7e4e2e6229656c.)

Prior commits are from other PRs; they are here because this builds upon them, or for the ease of my development.

Mike Skells added 7 commits October 8, 2024 13:23
from a header value
from a data field
via a custom extractor

Remove a few simple classes and make a DataExtractor to read things from the `sinkRecord`
and a few tidy-ups
Introduce a new property `file.record.grouper.builder` to specify a builder for a grouper
Enable the grouper to define additional properties and associated documentation

Minor refactors of the 'File' common configuration shared between S3 and GCS
introduce some more validators

Add tests for custom record grouper factory
add tests for additional config definition
… RecordGrouper, making it more of a component that tells the caller the group, rather than one that manages the group.

Allow full files to be written in the background
Allow files to be written when we reach a timeout (i.e. a max delay for a record)
Update the Kafka commits when we have written files, rather than waiting for the commit to be forced by timeout
Avoid OOM issues by having back pressure, so that we can flush or cause earlier writes if we have too many buffered records
Provide different writer models so that we can write data before the flush (potentially removing the memory pressure, depending on the writer)
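
The `file.record.grouper.builder` commit above makes the grouper pluggable. As a rough illustration of how such a property might be used, the sketch below builds a connector configuration by hand; the builder class com.example.MyRecordGrouperBuilder and the my.grouper.* key are invented, and the GcsSinkConnector class name is only assumed to be the connector's usual entry point.

```java
// Hypothetical connector configuration using the new `file.record.grouper.builder`
// property. The builder class and the `my.grouper.*` key are invented for
// illustration; a real builder would register its own keys and documentation.
import java.util.HashMap;
import java.util.Map;

final class CustomGrouperConfigExample {
    static Map<String, String> connectorProps() {
        final Map<String, String> props = new HashMap<>();
        props.put("connector.class", "io.aiven.kafka.connect.gcs.GcsSinkConnector");
        props.put("topics", "my-topic");
        props.put("file.record.grouper.builder", "com.example.MyRecordGrouperBuilder");
        // The builder can contribute extra, self-documented settings such as:
        props.put("my.grouper.max.records.per.group", "10000");
        return props;
    }
}
```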
@mkeskells mkeskells requested review from a team as code owners October 29, 2024 17:44
@mkeskells mkeskells marked this pull request as draft October 29, 2024 17:44
@aindriu-aiven (Contributor)
Thanks @mkeskells, I'll take a look through today. I have also reached out to some other long-time members and asked them to take a look!

@mkeskells (Contributor, Author)
@aindriu-aiven happy to get on a call or some other format to discuss this. I am sure that a PR is not the most effective way to progress this.

```java
} else {
    stream = recordGrouper.records().entrySet().stream();
}
stream.forEach(entry -> flushFile(entry.getKey(), entry.getValue()));
```
@aindriu-aiven (Contributor)
Hey @mkeskells,
I have been looking through this PR this morning. At the moment I have a PR open that initiates the upload to S3 on the put() operation, and once records are successfully uploaded they are removed from the record grouping; this is an effort to reduce memory usage and improve performance.
#318

I am trying to see how we can dovetail this with the record streamer you have proposed here, because I really like the addition of this functionality, especially adding a max record age for files. I was previously looking at extending the "rotator" functionality to handle this, and I would be interested in your thoughts.
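
As a rough sketch of the upload-on-put() idea described in this comment (upload groups as soon as they are ready, then drop them from the grouper), assuming invented RecordGrouperLike and Uploader interfaces rather than the actual #318 code:

```java
// Rough sketch of uploading on put() and removing uploaded records from the grouper;
// the RecordGrouperLike and Uploader names are placeholders, not the #318 implementation.
import java.util.Collection;
import java.util.List;

import org.apache.kafka.connect.sink.SinkRecord;

final class UploadOnPutSketch {
    private final RecordGrouperLike grouper;
    private final Uploader uploader;

    UploadOnPutSketch(final RecordGrouperLike grouper, final Uploader uploader) {
        this.grouper = grouper;
        this.uploader = uploader;
    }

    void put(final Collection<SinkRecord> records) {
        records.forEach(grouper::put);
        // Upload any group that is ready, then drop it from the grouper so the
        // records are no longer held in memory.
        for (final String filename : grouper.readyGroups()) {
            final List<SinkRecord> group = grouper.takeGroup(filename);
            uploader.upload(filename, group);
        }
    }

    interface RecordGrouperLike {
        void put(SinkRecord record);
        List<String> readyGroups();
        List<SinkRecord> takeGroup(String filename);
    }

    interface Uploader {
        void upload(String filename, List<SinkRecord> records);
    }
}
```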

@mkeskells (Contributor, Author)
Hi @aindriu-aiven, happy to see how we can do this.
Can we connect and progress this together? I have production deadlines (and I am not using S3).
I did design this change so that I think it should work in a common manner, not bound to a particular storage provider.

As I read this, I think that the eager writing model I have here is roughly what you have built for S3. Is that correct? I wanted to have the non-eager model as well, as that would allow record overwriting for duplicate keys.

I was also trying to ensure that what was written would work well with virtual threads and Loom. I know that we can't use them here yet, until we support a later JVM, but using the FJ pool and CompletableFuture allows this to scale later without having to rewrite continuations, and it's much easier to debug!
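
A minimal sketch of that shape, assuming hypothetical writeFile/recordFileWritten helpers: file writes are submitted as CompletableFuture tasks on the common ForkJoinPool, so a virtual-thread executor could be swapped in later without restructuring the code.

```java
// Illustrative only: shows the background-write shape using CompletableFuture on the
// common ForkJoinPool; writeFile/recordFileWritten are placeholders, not the PR's code.
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.connect.sink.SinkRecord;

final class EagerWriterSketch {
    CompletableFuture<Void> writeInBackground(final String filename, final List<SinkRecord> records) {
        // Runs on ForkJoinPool.commonPool() by default; moving to a virtual-thread
        // executor later only means passing an Executor argument here.
        return CompletableFuture
                .runAsync(() -> writeFile(filename, records))
                .thenRun(() -> recordFileWritten(filename));
    }

    private void writeFile(final String filename, final List<SinkRecord> records) {
        // placeholder for the actual GCS/S3 upload
    }

    private void recordFileWritten(final String filename) {
        // placeholder: track completed files so offsets can be committed
    }
}
```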

@mkeskells (Contributor, Author)
It does seem that the goals and the concepts overlap.
I have connected with you on LinkedIn, so that we can discuss how to progress outside of a PR comment, which doesn't seem to be the correct forum.

```java
}

protected static int addFileConfigGroup(final ConfigDef configDef, final String groupFile, final String type,
```
@aindriu-aiven (Contributor)
I also have a PR open, #312, that will split AivenCommonConfig into a Source and a Sink common config, so that we can start building the source connectors to read back the data we have stored here. This is mostly a minor structural change, but I just wanted to give you a heads-up. This info does make sense to be moved to the Sink common config, imo.
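
A hypothetical shape for that split, assuming invented subclass names (only AivenCommonConfig itself is a real class in this repo), with sink-specific file configuration such as addFileConfigGroup moving to the sink side:

```java
// Hypothetical sketch of splitting the common config; the class names below are
// invented for illustration and do not match the actual #312 changes.
import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

abstract class AivenCommonConfigSketch extends AbstractConfig {
    AivenCommonConfigSketch(final ConfigDef definition, final Map<String, String> originals) {
        super(definition, originals);
    }
    // settings shared by every connector (format, compression, ...) stay here
}

abstract class SinkCommonConfigSketch extends AivenCommonConfigSketch {
    SinkCommonConfigSketch(final ConfigDef definition, final Map<String, String> originals) {
        super(definition, originals);
    }
    // file naming / grouping settings such as addFileConfigGroup(...) would move here
}

abstract class SourceCommonConfigSketch extends AivenCommonConfigSketch {
    SourceCommonConfigSketch(final ConfigDef definition, final Map<String, String> originals) {
        super(definition, originals);
    }
    // settings for reading stored objects back would live here
}
```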

@mkeskells (Contributor, Author)

Happy to work with what you are doing. It does seem that the config is duplicated a lot, and some bits don't seem to be used.
What do you suggest as the best way to do this, without causing conflicts that we can avoid, @aindriu-aiven?
