Refactor lookups behavior while loading/dropping the containers #14806
@@ -256,11 +259,12 @@
).addChunk(strResult);
EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
EasyMock.replay(druidLeaderClient);
LookupExtractorFactoryContainer container = new LookupExtractorFactoryContainer("0", lookupExtractorFactory);
Code scanning / CodeQL notice (test): Possible confusion of local and field in testDestroyIsCalledAfterRemove (container).
/**
 * awaitToInitialise blocks and wait for the cache to initialize fully.
 */
void awaitToInitialise() throws InterruptedException, TimeoutException;
nit: I think "initialize" is the more standard US English spelling than "initialise"; I find hundreds of matches in the Java code for "initialize" and none for "initialise". Additionally, we have some other methods named awaitInitialization() on things like the broker server view, so maybe that is a good name.
/**
 * @return true if cache is loaded and lookup is queryable else returns false
 */
boolean isCacheLoaded();
To be more consistent with the name of the other method, which blocks waiting for this to be true, I recommend isInitialized or something similar.
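One way to pair the two methods under the suggested names is to back both with the same latch. The following is a minimal sketch, not the PR's implementation: the class `LatchBackedCache`, the `markLoaded()` hook, and the millisecond timeout parameter are all hypothetical names chosen for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration; not taken from the Druid code base.
class LatchBackedCache
{
  private final CountDownLatch loaded = new CountDownLatch(1);
  private final long timeoutMillis;

  LatchBackedCache(long timeoutMillis)
  {
    this.timeoutMillis = timeoutMillis;
  }

  // Called by the loader once the first full load has finished.
  void markLoaded()
  {
    loaded.countDown();
  }

  // Non-blocking check: true once the first load has completed.
  boolean isInitialized()
  {
    return loaded.getCount() == 0;
  }

  // Blocks until isInitialized() becomes true or the timeout elapses.
  void awaitInitialization() throws InterruptedException, TimeoutException
  {
    if (!loaded.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
      throw new TimeoutException("cache not initialized within " + timeoutMillis + " ms");
    }
  }
}
```

With this shape, `awaitInitialization()` returns normally exactly when `isInitialized()` would return true, which is the consistency the naming suggestion is after.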
RetryUtils.retry(() -> {
  lookupExtractorFactoryContainer.getLookupExtractorFactory().awaitToInitialise();
  return null;
}, e -> true,
I don't think this needs to be a blocker, but it would be nice if we could distinguish errors that are retryable from errors that are not. I imagine there is too much variety in the errors that could be thrown for us to do this, and we would probably need to standardize the error states for lookup implementations to do it effectively.
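To make the retryable versus non-retryable distinction concrete, here is a small sketch. It uses a hypothetical `SelectiveRetry` helper rather than Druid's `RetryUtils`, and it assumes (purely for illustration) that a `TimeoutException` means "still loading" while any other error is permanent. Backoff between attempts is omitted.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

// Hypothetical helper for illustration; it is not Druid's RetryUtils.
class SelectiveRetry
{
  // Assumption: only a timeout is worth retrying; anything else fails fast.
  static final Predicate<Throwable> RETRY_ON_TIMEOUT = e -> e instanceof TimeoutException;

  static <T> T retry(Callable<T> task, Predicate<Throwable> shouldRetry, int maxTries) throws Exception
  {
    for (int attempt = 1; ; attempt++) {
      try {
        return task.call();
      }
      catch (Exception e) {
        // Give up on the last attempt or when the error is not retryable.
        if (attempt >= maxTries || !shouldRetry.test(e)) {
          throw e;
        }
      }
    }
  }
}
```

Compared with a predicate of `e -> true`, a misconfigured lookup would fail on the first attempt instead of consuming every retry.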
@@ -526,6 +530,7 @@ public void testRealModeWithMainThread() throws Exception
LookupExtractorFactory lookupExtractorFactory = EasyMock.createMock(LookupExtractorFactory.class);
EasyMock.expect(lookupExtractorFactory.start()).andReturn(true).once();
EasyMock.expect(lookupExtractorFactory.destroy()).andReturn(true).once();
EasyMock.expect(lookupExtractorFactory.isCacheLoaded()).andReturn(true).anyTimes();
It would be nice to add some tests that simulate sad paths where the new lookup does not load, to ensure that the new container is destroyed and the old container is still operational. It would also be good to have tests where the new lookup doesn't initially load, goes through the retry path, then loads successfully, and the old container is destroyed.
@@ -73,6 +75,7 @@ public JdbcExtractionNamespace(
@JsonProperty(value = "filter") @Nullable final String filter,
@Min(0) @JsonProperty(value = "pollPeriod") @Nullable final Period pollPeriod,
@JsonProperty(value = "maxHeapPercentage") @Nullable final Long maxHeapPercentage,
@JsonProperty(value = "loadTimeout") @Nullable final Long loadTimeout,
we should update the docs if we are adding a new parameter
added docs
@Override
public void awaitInitialization()
{
I guess this doesn't need to await because it's continuously updated instead of swapped? If that is the case, could you leave a comment? If not, could you explain why it doesn't need to wait on anything?
fixed
@Override
public void awaitInitialization()
{
Is there a reason the lookups-cached-single implementations don't implement these methods? I don't think implementing them needs to be a blocker for this PR if they should someday be implemented, but it would be nice to leave a comment here in the code on why or why not.
fixed
default long getLoadTimeoutMills()
{
  return 60 * 1000;
QQ: Is there any reason we have kept this at 60 sec while the default for the JDBC extraction namespace is 120 sec?
Yeah, the default load timeout for lookups is 60 secs, which should be enough time to load all non-JDBC lookups such as URI, map, and other loaders. I am overriding this load timeout to 2 mins for JDBC because it can take a little longer to load entries if there are millions of them. I also documented it so that users can customize it for their use cases with big lookups.
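The default-plus-override arrangement described here can be sketched as follows. Only the method name `getLoadTimeoutMills` and the 60 s and 120 s values come from the discussion; the interface and class names are hypothetical, and the sketch assumes the configured `loadTimeout` is expressed in milliseconds, which the thread does not state.

```java
// Hypothetical types for illustration; not the Druid classes.
interface LoadTimeoutAware
{
  // Default for lookups that load quickly (URI, map, and similar loaders).
  default long getLoadTimeoutMills()
  {
    return 60 * 1000;
  }
}

// Inherits the 60 s default.
class UriLikeNamespace implements LoadTimeoutAware
{
}

class JdbcLikeNamespace implements LoadTimeoutAware
{
  private static final long DEFAULT_JDBC_LOAD_TIMEOUT_MILLIS = 120 * 1000;

  // Nullable, mirroring the optional "loadTimeout" JSON property.
  // Assumption: the value is in milliseconds.
  private final Long loadTimeout;

  JdbcLikeNamespace(Long loadTimeout)
  {
    this.loadTimeout = loadTimeout;
  }

  @Override
  public long getLoadTimeoutMills()
  {
    // Fall back to the longer JDBC default when the user did not configure a value.
    return loadTimeout == null ? DEFAULT_JDBC_LOAD_TIMEOUT_MILLIS : loadTimeout;
  }
}
```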
@Override
public void awaitInitialization()
{
// Kafka lookup do not need await on initialization as it is realtime kafka lookups. |
Would updating the topic or some other config of the Kafka lookup also result in a new cache being initialized, but because we just return true here we potentially don't wait on it? I guess I want to make sure that it isn't a problem that this one wouldn't wait until it has read the topic and populated the cache for the first time, or is it not a problem for some other reason? (I tried to remember how all this stuff works, but I'm still not certain 🙃)
The first time, KafkaLookupExtractorFactory has a lifecycle, and the process waits for the started latch in the start method.
Today, on a config update, it deletes the old lookup immediately and kicks off the start lifecycle again. I wanted to keep the same behavior, since this change was mainly meant to cover the JDBC use case, hence the overridden await methods.
But you make a great point. I think we should await for Kafka too: reading from Kafka and populating the cache is asynchronous, and ideally it should await initialization as well to avoid disrupting lookup serving requests.
The new behavior will be:
The Kafka lookup will wait indefinitely (as there is currently no definite loadTimeout for Kafka lookups) to drain all messages from the Kafka topic. Once it is fully initialized, it will drop the old container and start serving from the new one; until then, it will continue serving from the old lookup.
The question is whether we should make this change in the same PR. I would prefer to do it in a follow-up PR, as it needs a bunch of unit test cases to cover the wait scenario until the topic is drained.
I think it's fine to do as a follow-up, thanks for looking into it.
Fixes #14796.
Description
This PR is a major change to lookup container loading and dropping behavior, and it avoids lookup inconsistencies in the update container call. It also introduces a user-defined SLA for loading lookups; the loading and dropping logic uses this SLA to wait for the new container to load.
Load Container call:
Starts the new container. If the cache is loaded, drop the old container if it exists and start serving from the new container.
If the cache is not loaded, wait for the container to come up, retrying up to startRetries times; then declare the new container failed, kill it, and continue serving from the old container.
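The load flow above can be sketched roughly as follows. All type and method names here (`SketchContainer`, `LoadFlowSketch`, `current()`) are hypothetical and do not come from the Druid code base. The sketch also assumes one initial check followed by up to `startRetries` further checks, and it polls instead of blocking on a timeout to stay short.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical types for illustration only.
interface SketchContainer
{
  boolean start();          // begin loading
  boolean isInitialized();  // true once the cache is loaded
  void destroy();           // release resources
}

class LoadFlowSketch
{
  private final AtomicReference<SketchContainer> serving = new AtomicReference<>();
  private final int startRetries;

  LoadFlowSketch(int startRetries)
  {
    this.startRetries = startRetries;
  }

  // The container currently answering queries, or null if none is loaded.
  SketchContainer current()
  {
    return serving.get();
  }

  // Returns true if the new container replaced the old one.
  boolean load(SketchContainer fresh)
  {
    fresh.start();
    // One initial check plus up to startRetries retries (an assumption).
    for (int attempt = 0; attempt <= startRetries; attempt++) {
      if (fresh.isInitialized()) {
        // Swap first, then drop the old container, so queries always have a container.
        SketchContainer old = serving.getAndSet(fresh);
        if (old != null) {
          old.destroy();
        }
        return true;
      }
      // A real implementation would wait on the load timeout here.
    }
    // The new container never loaded: kill it and keep serving from the old one.
    fresh.destroy();
    return false;
  }
}
```

The key property is that the old container is destroyed only after the new one reports a loaded cache, so a failed load leaves the serving container untouched.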
Drop Container call:
We get the drop call from two flows: the update and delete calls for lookups. The drop notice also accepts an optional loadedContainer.
loadedContainer is only passed in the update flow. We make sure that loadedContainer has its cache loaded before dropping the existing container. If the cache is not loaded, wait up to the given SLA for loadedContainer to load.
If the cache is loaded in loadedContainer, go ahead and drop the existing container.
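A rough sketch of the drop flow, under stated assumptions: the names `AwaitableContainer`, `DropNoticeSketch`, `forDelete`, and `forUpdate` are hypothetical, and the description does not say what happens when the SLA expires, so this sketch assumes the existing container is kept in that case.

```java
import java.util.Objects;
import java.util.concurrent.TimeoutException;

// Hypothetical types for illustration; only the idea of an optional
// loadedContainer on the drop notice comes from the description.
interface AwaitableContainer
{
  void awaitInitialization() throws InterruptedException, TimeoutException;
  void destroy();
}

class DropNoticeSketch
{
  private final AwaitableContainer existing;
  private final AwaitableContainer loadedContainer; // null in the delete flow

  private DropNoticeSketch(AwaitableContainer existing, AwaitableContainer loadedContainer)
  {
    this.existing = existing;
    this.loadedContainer = loadedContainer;
  }

  // Delete flow: there is no replacement to wait for.
  static DropNoticeSketch forDelete(AwaitableContainer existing)
  {
    return new DropNoticeSketch(existing, null);
  }

  // Update flow: the replacement must be ready before the existing container goes away.
  static DropNoticeSketch forUpdate(AwaitableContainer existing, AwaitableContainer loadedContainer)
  {
    return new DropNoticeSketch(existing, Objects.requireNonNull(loadedContainer));
  }

  // Returns true if the existing container was dropped.
  boolean handle() throws InterruptedException
  {
    if (loadedContainer != null) {
      try {
        // Block until the replacement is loaded, bounded by its SLA.
        loadedContainer.awaitInitialization();
      }
      catch (TimeoutException e) {
        // Assumption: if the SLA expires, keep the existing container.
        return false;
      }
    }
    existing.destroy();
    return true;
  }
}
```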
Release note
This PR has: