Refactor lookups behavior while loading/dropping the containers #14806

Merged: 6 commits, Nov 7, 2023
1 change: 1 addition & 0 deletions docs/development/extensions-core/lookups-cached-global.md
@@ -353,6 +353,7 @@ The JDBC lookups will poll a database to populate its local cache. If the `tsCol
|`tsColumn`| The column in `table` which contains when the key was updated|No|Not used|
|`pollPeriod`|How often to poll the DB|No|0 (only once)|
|`jitterSeconds`| How much jitter to add (in seconds) up to maximum as a delay (actual value will be used as random from 0 to `jitterSeconds`), used to distribute db load more evenly|No|0|
|`loadTimeoutSeconds`| How much time (in seconds) querying and populating lookup values may take. This is mainly useful for lookup updates: on update, the process waits up to `loadTimeoutSeconds` for the new lookup to come up, and continues serving from the old lookup until the new one loads successfully. |No|0|
|`maxHeapPercentage`|The maximum percentage of heap size that the lookup should consume. If the lookup grows beyond this size, warning messages will be logged in the respective service logs.|No|10% of JVM heap size|
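As a sketch of where `loadTimeoutSeconds` sits in a JDBC lookup spec (the connection details, table, and column names below are illustrative only, not taken from this PR):

```json
{
  "type": "jdbc",
  "connectorConfig": {
    "connectURI": "jdbc:mysql://localhost:3306/druid",
    "user": "druid",
    "password": "diurd"
  },
  "table": "lookupTable",
  "keyColumn": "theKeyColumn",
  "valueColumn": "theValueColumn",
  "pollPeriod": 600000,
  "loadTimeoutSeconds": 120
}
```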

@@ -297,6 +297,18 @@ public LookupIntrospectHandler getIntrospectHandler()
return new KafkaLookupExtractorIntrospectionHandler(this);
}

@Override
public void awaitInitialization()
{
    // Kafka lookups do not need to await initialization, since they are realtime lookups.
Member:
Would updating the topic or some other config of the Kafka lookup also result in a new cache being initialized? Because we just return true here, we potentially don't wait on it. I want to make sure it isn't a problem that this one doesn't wait until it has read the topic and populated the cache for the first time, or that it is not a problem for some other reason (I tried to remember how all this stuff works, but I'm still not certain 🙃).

pranavbhole (Contributor, author), Nov 7, 2023:

For the first load, KafkaLookupExtractorFactory has a lifecycle, and the process waits on the started latch in the start method.
Today, on updating the config, the old lookup is deleted immediately and the start lifecycle kicks in again. I wanted to keep the same behavior, since this change was mainly to cover the JDBC use case; that is why the await methods are overridden this way.

But you have a great point. I think we should await for Kafka too: reading from Kafka and populating the cache is asynchronous, and ideally it should await initialization as well to avoid disruption in serving lookup requests.
The new behavior will be: the Kafka lookup will wait forever (as there is currently no definite loadTimeout for Kafka lookups) to drain all messages from the Kafka topic; once it is fully initialized it will drop the old container and start serving from the new one, else it will continue serving from the old lookup.

Question: should we make this change in the same PR or not? I would prefer to do it in a follow-up PR, as it needs a bunch of unit test cases to cover the wait-until-the-topic-is-drained scenario.

Member:
I think it's fine to do as a follow-up; thanks for looking into it.

}

@Override
public boolean isInitialized()
{
return true;
}

@Override
public LookupExtractor get()
{
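The load/drop behavior discussed in the thread above (wait up to a timeout for the new container to initialize, and keep serving from the old one if it does not) can be sketched independently of Druid. This is a hypothetical illustration using `CountDownLatch`, not Druid's actual classes:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the container-swap behavior: the new container must
// finish initializing within a timeout before the old one is dropped;
// otherwise the old container keeps serving.
public class ContainerSwap {
    static final class Container {
        final CountDownLatch initialized = new CountDownLatch(1);
        final String name;

        Container(String name) { this.name = name; }

        void finishLoading() { initialized.countDown(); }

        // Returns true if initialization completed within the timeout.
        boolean awaitInitialization(long timeoutMillis) throws InterruptedException {
            return initialized.await(timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    private Container active = new Container("old");

    // Returns whichever container is serving requests after the swap attempt.
    Container swap(Container incoming, long timeoutMillis) throws InterruptedException {
        if (incoming.awaitInitialization(timeoutMillis)) {
            active = incoming;  // new container is ready: drop the old one
        }                       // else: keep serving from the old container
        return active;
    }

    public static void main(String[] args) throws InterruptedException {
        ContainerSwap swapper = new ContainerSwap();

        Container slow = new Container("new-slow");       // never finishes loading
        System.out.println(swapper.swap(slow, 10).name);  // prints "old"

        Container fast = new Container("new-fast");
        fast.finishLoading();
        System.out.println(swapper.swap(fast, 10).name);  // prints "new-fast"
    }
}
```

In this sketch the Kafka case discussed above corresponds to passing an effectively infinite timeout, while the JDBC case uses `loadTimeoutSeconds`.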
@@ -83,11 +83,13 @@ public void testSimpleSerDe() throws Exception
mapper.writeValueAsString(expected),
KafkaLookupExtractorFactory.class
);
result.awaitInitialization();
Assert.assertEquals(expected.getKafkaTopic(), result.getKafkaTopic());
Assert.assertEquals(expected.getKafkaProperties(), result.getKafkaProperties());
Assert.assertEquals(cacheManager, result.getCacheManager());
Assert.assertEquals(0, expected.getCompletedEventCount());
Assert.assertEquals(0, result.getCompletedEventCount());
Assert.assertTrue(result.isInitialized());
}

@Test
@@ -36,6 +36,7 @@
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -169,6 +170,22 @@ public LookupIntrospectHandler getIntrospectHandler()
return lookupIntrospectHandler;
}

@Override
public void awaitInitialization() throws InterruptedException, TimeoutException
{
long timeout = extractionNamespace.getLoadTimeoutMills();
if (entry.getCacheState() == CacheScheduler.NoCache.CACHE_NOT_INITIALIZED) {
      LOG.info("Cache not initialized yet for namespace %s, waiting for %s millis", extractionNamespace, timeout);
entry.awaitTotalUpdatesWithTimeout(1, timeout);
}
}

@Override
public boolean isInitialized()
{
return entry.getCacheState() instanceof CacheScheduler.VersionedCache;
}

@JsonProperty
public ExtractionNamespace getExtractionNamespace()
{
@@ -49,4 +49,9 @@ default long getJitterMills()
{
return 0;
}

default long getLoadTimeoutMills()
{
return 60 * 1000;
Contributor:

QQ: Is there any reason we kept this at 60 seconds while the default on the JDBC extraction namespace is 120 seconds?

Contributor (author):

Yeah, the default load time for lookups is 60 seconds, which is enough to load all non-JDBC lookups such as URI, maps, and other loaders. The load time for JDBC is overridden to 2 minutes, since it can take a little longer to load entries when they number in the millions. It is also documented, so users can customize it for their use cases with big lookups.

}
}
@@ -45,6 +45,7 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
private static final Logger LOG = new Logger(JdbcExtractionNamespace.class);

long DEFAULT_MAX_HEAP_PERCENTAGE = 10L;
long DEFAULT_LOOKUP_LOAD_TIME_SECONDS = 120;

@JsonProperty
private final MetadataStorageConnectorConfig connectorConfig;
@@ -63,6 +64,8 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
@JsonProperty
private final long maxHeapPercentage;
@JsonProperty
private final long loadTimeoutSeconds;
@JsonProperty
private final int jitterSeconds;

@JsonCreator
@@ -77,6 +80,7 @@ public JdbcExtractionNamespace(
@Min(0) @JsonProperty(value = "pollPeriod") @Nullable final Period pollPeriod,
@JsonProperty(value = "maxHeapPercentage") @Nullable final Long maxHeapPercentage,
@JsonProperty(value = "jitterSeconds") @Nullable Integer jitterSeconds,
@JsonProperty(value = "loadTimeoutSeconds") @Nullable final Long loadTimeoutSeconds,
@JacksonInject JdbcAccessSecurityConfig securityConfig
)
{
@@ -101,6 +105,7 @@ public JdbcExtractionNamespace(
}
this.jitterSeconds = jitterSeconds == null ? 0 : jitterSeconds;
this.maxHeapPercentage = maxHeapPercentage == null ? DEFAULT_MAX_HEAP_PERCENTAGE : maxHeapPercentage;
this.loadTimeoutSeconds = loadTimeoutSeconds == null ? DEFAULT_LOOKUP_LOAD_TIME_SECONDS : loadTimeoutSeconds;
}

@@ -176,6 +181,12 @@ public long getJitterMills()
return 1000L * ThreadLocalRandom.current().nextInt(jitterSeconds + 1);
}

@Override
public long getLoadTimeoutMills()
{
return 1000L * loadTimeoutSeconds;
}

@Override
public String toString()
{
@@ -187,6 +198,8 @@ public String toString()
", tsColumn='" + tsColumn + '\'' +
", filter='" + filter + '\'' +
", pollPeriod=" + pollPeriod +
", jitterSeconds=" + jitterSeconds +
", loadTimeoutSeconds=" + loadTimeoutSeconds +
", maxHeapPercentage=" + maxHeapPercentage +
'}';
}
@@ -115,6 +115,12 @@ public void awaitTotalUpdates(int totalUpdates) throws InterruptedException
impl.updateCounter.awaitCount(totalUpdates);
}

@VisibleForTesting
public void awaitTotalUpdatesWithTimeout(int totalUpdates, long timeoutMills)
throws InterruptedException, TimeoutException
{
impl.updateCounter.awaitCount(totalUpdates, timeoutMills, TimeUnit.MILLISECONDS);
  }

  @VisibleForTesting
void awaitNextUpdates(int nextUpdates) throws InterruptedException
{
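The `awaitTotalUpdatesWithTimeout` helper added above delegates to an internal update counter. Its assumed semantics ("block until the counter reaches n updates, or throw `TimeoutException` at the deadline") can be sketched with a plain monitor; this is a minimal standalone illustration, not Druid's actual `UpdateCounter`:

```java
import java.util.concurrent.TimeoutException;

// Minimal sketch of a timed "await count" primitive: update() increments the
// counter and wakes waiters; awaitCount() blocks until the counter reaches the
// target or the deadline passes, whichever comes first.
public class UpdateCounter {
    private int count;

    public synchronized void update() {
        count++;
        notifyAll();  // wake any threads blocked in awaitCount()
    }

    public synchronized void awaitCount(int target, long timeoutMillis)
            throws InterruptedException, TimeoutException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (count < target) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                throw new TimeoutException("counter at " + count + ", wanted " + target);
            }
            wait(remaining);  // re-checks the condition in a loop on wakeup
        }
    }

    public static void main(String[] args) throws Exception {
        UpdateCounter counter = new UpdateCounter();
        counter.update();            // first load completed
        counter.awaitCount(1, 100);  // returns immediately
        try {
            counter.awaitCount(2, 50);  // no second update arrives: times out
        } catch (TimeoutException e) {
            System.out.println("timed out: " + e.getMessage());
        }
    }
}
```

Calling `awaitTotalUpdatesWithTimeout(1, timeout)` then corresponds to "wait until the first cache load completes, or give up after the load timeout".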
@@ -59,6 +59,7 @@
import static org.mockito.Mockito.atMostOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -227,7 +228,6 @@ public void testSimpleStartStart() throws Exception
);
Assert.assertTrue(namespaceLookupExtractorFactory.start());
Assert.assertTrue(namespaceLookupExtractorFactory.start());

verify(scheduler).scheduleAndWait(extractionNamespace, 60000L);
verifyNoMoreInteractions(scheduler, entry, versionedCache);
}
@@ -287,6 +287,40 @@ public long getPollMs()
verifyNoMoreInteractions(scheduler, entry, versionedCache);
}

@Test
public void testAwaitInitializationOnCacheNotInitialized() throws Exception
{
final ExtractionNamespace extractionNamespace = new ExtractionNamespace()
{
@Override
public long getPollMs()
{
return 0;
}

@Override
public long getLoadTimeoutMills()
{
return 1;
}
};
expectScheduleAndWaitOnce(extractionNamespace);
when(entry.getCacheState()).thenReturn(CacheScheduler.NoCache.CACHE_NOT_INITIALIZED);

final NamespaceLookupExtractorFactory namespaceLookupExtractorFactory = new NamespaceLookupExtractorFactory(
extractionNamespace,
scheduler
);
Assert.assertTrue(namespaceLookupExtractorFactory.start());
namespaceLookupExtractorFactory.awaitInitialization();
Assert.assertThrows(ISE.class, () -> namespaceLookupExtractorFactory.get());
verify(scheduler).scheduleAndWait(extractionNamespace, 60000L);
verify(entry, times(2)).getCacheState();
verify(entry).awaitTotalUpdatesWithTimeout(1, 1);
Thread.sleep(10);
verifyNoMoreInteractions(scheduler, entry, versionedCache);
}

private void expectScheduleAndWaitOnce(ExtractionNamespace extractionNamespace)
{
try {
@@ -63,7 +63,9 @@ public String getConnectURI()
"some filter",
new Period(10),
null,
0, new JdbcAccessSecurityConfig()
0,
1000L,
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
@@ -102,6 +104,7 @@ public String getConnectURI()
new Period(10),
null,
0,
null,
new JdbcAccessSecurityConfig()
{
@Override
@@ -139,6 +142,7 @@ public String getConnectURI()
new Period(10),
null,
0,
null,
new JdbcAccessSecurityConfig()
{
@Override
@@ -178,6 +182,7 @@ public String getConnectURI()
new Period(10),
null,
0,
null,
new JdbcAccessSecurityConfig()
{
@Override
@@ -221,6 +226,7 @@ public String getConnectURI()
new Period(10),
null,
0,
null,
new JdbcAccessSecurityConfig()
{
@Override
@@ -260,6 +266,7 @@ public String getConnectURI()
new Period(10),
10L,
0,
null,
new JdbcAccessSecurityConfig()
{
@Override
@@ -296,7 +303,9 @@ public String getConnectURI()
"some filter",
new Period(10),
null,
0, new JdbcAccessSecurityConfig()
0,
null,
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
@@ -335,6 +344,7 @@ public String getConnectURI()
new Period(10),
null,
0,
null,
new JdbcAccessSecurityConfig()
{
@Override
@@ -380,6 +390,7 @@ public String getConnectURI()
new Period(10),
null,
0,
null,
new JdbcAccessSecurityConfig()
{
@Override
@@ -423,6 +434,7 @@ public String getConnectURI()
new Period(10),
null,
0,
null,
new JdbcAccessSecurityConfig()
{
@Override
@@ -138,6 +138,7 @@ private static JdbcExtractionNamespace createJdbcExtractionNamespace(
Period.ZERO,
null,
0,
null,
new JdbcAccessSecurityConfig()
);
}
@@ -191,6 +191,24 @@ public void testSimpleSubmission() throws InterruptedException
Assert.assertEquals(VALUE, entry.getCache().get(KEY));
}

@Test(timeout = 60_000L)
public void testInitialization() throws InterruptedException, TimeoutException
{
UriExtractionNamespace namespace = new UriExtractionNamespace(
tmpFile.toURI(),
null, null,
new UriExtractionNamespace.ObjectMapperFlatDataParser(
UriExtractionNamespaceTest.registerTypes(new ObjectMapper())
),
new Period(0),
null,
null
);
CacheScheduler.Entry entry = scheduler.schedule(namespace);
entry.awaitTotalUpdatesWithTimeout(1, 2000);
Assert.assertEquals(VALUE, entry.getCache().get(KEY));
}

@Test(timeout = 60_000L)
public void testPeriodicUpdatesScheduled() throws InterruptedException
{
@@ -459,6 +477,7 @@ public String getConnectURI()
new Period(10_000),
null,
0,
null,
new JdbcAccessSecurityConfig()
{
@Override
@@ -329,6 +329,7 @@ public void testMappingWithoutFilter()
new Period(0),
null,
0,
null,
new JdbcAccessSecurityConfig()
);
try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace)) {
@@ -363,6 +364,7 @@ public void testMappingWithFilter()
new Period(0),
null,
0,
null,
new JdbcAccessSecurityConfig()
);
try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace)) {
@@ -414,6 +416,7 @@ public void testRandomJitter()
new Period(0),
null,
120,
null,
new JdbcAccessSecurityConfig()
);
long jitter = extractionNamespace.getJitterMills();
@@ -433,6 +436,7 @@ public void testRandomJitterNotSpecified()
FILTER_COLUMN + "='1'",
new Period(0),
null,
0,
null,
new JdbcAccessSecurityConfig()
);
@@ -478,6 +482,7 @@ public void testSerde() throws IOException
new Period(10),
null,
0,
null,
securityConfig
);
final ObjectMapper mapper = new DefaultObjectMapper();
@@ -504,6 +509,7 @@ private CacheScheduler.Entry ensureEntry()
new Period(10),
null,
0,
null,
new JdbcAccessSecurityConfig()
);
CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace);
@@ -111,6 +111,17 @@ public LookupIntrospectHandler getIntrospectHandler()
return null;
}

@Override
public void awaitInitialization()
{
    // LoadingLookupFactory has no initialization period: it fetches keys from the
    // loadingCache and the DataFetcher on demand.
}

@Override
public boolean isInitialized()
{
return true;
  }

  @Override
public LoadingLookup get()
{