Skip to content

Commit

Permalink
Refactor lookups behavior while loading/dropping the containers (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
pranavbhole authored Nov 7, 2023
1 parent 54fa342 commit e2fde8c
Show file tree
Hide file tree
Showing 25 changed files with 446 additions and 45 deletions.
1 change: 1 addition & 0 deletions docs/development/extensions-core/lookups-cached-global.md
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ The JDBC lookups will poll a database to populate its local cache. If the `tsCol
|`tsColumn`| The column in `table` which contains when the key was updated|No|Not used|
|`pollPeriod`|How often to poll the DB|No|0 (only once)|
|`jitterSeconds`| How much jitter to add (in seconds) up to maximum as a delay (actual value will be used as random from 0 to `jitterSeconds`), used to distribute db load more evenly|No|0|
|`loadTimeoutSeconds`| How much time (in seconds) it can take to query and populate lookup values. It will be helpful in lookup updates. On lookup update, it will wait maximum of `loadTimeoutSeconds` for new lookup to come up and continue serving from old lookup until new lookup successfully loads. |No|0|
|`maxHeapPercentage`|The maximum percentage of heap size that the lookup should consume. If the lookup grows beyond this size, warning messages will be logged in the respective service logs.|No|10% of JVM heap size|

```json
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,18 @@ public LookupIntrospectHandler getIntrospectHandler()
return new KafkaLookupExtractorIntrospectionHandler(this);
}

@Override
public void awaitInitialization()
{
// Kafka lookup do not need await on initialization as it is realtime kafka lookups.
}

@Override
public boolean isInitialized()
{
return true;
}

@Override
public LookupExtractor get()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,13 @@ public void testSimpleSerDe() throws Exception
mapper.writeValueAsString(expected),
KafkaLookupExtractorFactory.class
);
result.awaitInitialization();
Assert.assertEquals(expected.getKafkaTopic(), result.getKafkaTopic());
Assert.assertEquals(expected.getKafkaProperties(), result.getKafkaProperties());
Assert.assertEquals(cacheManager, result.getCacheManager());
Assert.assertEquals(0, expected.getCompletedEventCount());
Assert.assertEquals(0, result.getCompletedEventCount());
Assert.assertTrue(result.isInitialized());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -169,6 +170,22 @@ public LookupIntrospectHandler getIntrospectHandler()
return lookupIntrospectHandler;
}

@Override
public void awaitInitialization() throws InterruptedException, TimeoutException
{
long timeout = extractionNamespace.getLoadTimeoutMills();
if (entry.getCacheState() == CacheScheduler.NoCache.CACHE_NOT_INITIALIZED) {
LOG.info("Cache not initialized yet for namespace %s waiting for %s mills", extractionNamespace, timeout);
entry.awaitTotalUpdatesWithTimeout(1, timeout);
}
}

@Override
public boolean isInitialized()
{
return entry.getCacheState() instanceof CacheScheduler.VersionedCache;
}

@JsonProperty
public ExtractionNamespace getExtractionNamespace()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,9 @@ default long getJitterMills()
{
return 0;
}

default long getLoadTimeoutMills()
{
return 60 * 1000;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
private static final Logger LOG = new Logger(JdbcExtractionNamespace.class);

long DEFAULT_MAX_HEAP_PERCENTAGE = 10L;
long DEFAULT_LOOKUP_LOAD_TIME_SECONDS = 120;

@JsonProperty
private final MetadataStorageConnectorConfig connectorConfig;
Expand All @@ -63,6 +64,8 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
@JsonProperty
private final long maxHeapPercentage;
@JsonProperty
private final long loadTimeoutSeconds;
@JsonProperty
private final int jitterSeconds;

@JsonCreator
Expand All @@ -77,6 +80,7 @@ public JdbcExtractionNamespace(
@Min(0) @JsonProperty(value = "pollPeriod") @Nullable final Period pollPeriod,
@JsonProperty(value = "maxHeapPercentage") @Nullable final Long maxHeapPercentage,
@JsonProperty(value = "jitterSeconds") @Nullable Integer jitterSeconds,
@JsonProperty(value = "loadTimeoutSeconds") @Nullable final Long loadTimeoutSeconds,
@JacksonInject JdbcAccessSecurityConfig securityConfig
)
{
Expand All @@ -101,6 +105,7 @@ public JdbcExtractionNamespace(
}
this.jitterSeconds = jitterSeconds == null ? 0 : jitterSeconds;
this.maxHeapPercentage = maxHeapPercentage == null ? DEFAULT_MAX_HEAP_PERCENTAGE : maxHeapPercentage;
this.loadTimeoutSeconds = loadTimeoutSeconds == null ? DEFAULT_LOOKUP_LOAD_TIME_SECONDS : loadTimeoutSeconds;
}

/**
Expand Down Expand Up @@ -176,6 +181,12 @@ public long getJitterMills()
return 1000L * ThreadLocalRandom.current().nextInt(jitterSeconds + 1);
}

@Override
public long getLoadTimeoutMills()
{
return 1000L * loadTimeoutSeconds;
}

@Override
public String toString()
{
Expand All @@ -187,6 +198,8 @@ public String toString()
", tsColumn='" + tsColumn + '\'' +
", filter='" + filter + '\'' +
", pollPeriod=" + pollPeriod +
", jitterSeconds=" + jitterSeconds +
", loadTimeoutSeconds=" + loadTimeoutSeconds +
", maxHeapPercentage=" + maxHeapPercentage +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ public void awaitTotalUpdates(int totalUpdates) throws InterruptedException
impl.updateCounter.awaitCount(totalUpdates);
}

@VisibleForTesting
public void awaitTotalUpdatesWithTimeout(int totalUpdates, long timeoutMills)
throws InterruptedException, TimeoutException
{
impl.updateCounter.awaitCount(totalUpdates, timeoutMills, TimeUnit.MILLISECONDS);
}
@VisibleForTesting
void awaitNextUpdates(int nextUpdates) throws InterruptedException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import static org.mockito.Mockito.atMostOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
Expand Down Expand Up @@ -227,7 +228,6 @@ public void testSimpleStartStart() throws Exception
);
Assert.assertTrue(namespaceLookupExtractorFactory.start());
Assert.assertTrue(namespaceLookupExtractorFactory.start());

verify(scheduler).scheduleAndWait(extractionNamespace, 60000L);
verifyNoMoreInteractions(scheduler, entry, versionedCache);
}
Expand Down Expand Up @@ -287,6 +287,40 @@ public long getPollMs()
verifyNoMoreInteractions(scheduler, entry, versionedCache);
}

@Test
public void testAwaitInitializationOnCacheNotInitialized() throws Exception
{
final ExtractionNamespace extractionNamespace = new ExtractionNamespace()
{
@Override
public long getPollMs()
{
return 0;
}

@Override
public long getLoadTimeoutMills()
{
return 1;
}
};
expectScheduleAndWaitOnce(extractionNamespace);
when(entry.getCacheState()).thenReturn(CacheScheduler.NoCache.CACHE_NOT_INITIALIZED);

final NamespaceLookupExtractorFactory namespaceLookupExtractorFactory = new NamespaceLookupExtractorFactory(
extractionNamespace,
scheduler
);
Assert.assertTrue(namespaceLookupExtractorFactory.start());
namespaceLookupExtractorFactory.awaitInitialization();
Assert.assertThrows(ISE.class, () -> namespaceLookupExtractorFactory.get());
verify(scheduler).scheduleAndWait(extractionNamespace, 60000L);
verify(entry, times(2)).getCacheState();
verify(entry).awaitTotalUpdatesWithTimeout(1, 1);
Thread.sleep(10);
verifyNoMoreInteractions(scheduler, entry, versionedCache);
}

private void expectScheduleAndWaitOnce(ExtractionNamespace extractionNamespace)
{
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ public String getConnectURI()
"some filter",
new Period(10),
null,
0, new JdbcAccessSecurityConfig()
0,
1000L,
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
Expand Down Expand Up @@ -102,6 +104,7 @@ public String getConnectURI()
new Period(10),
null,
0,
null,
new JdbcAccessSecurityConfig()
{
@Override
Expand Down Expand Up @@ -139,6 +142,7 @@ public String getConnectURI()
new Period(10),
null,
0,
null,
new JdbcAccessSecurityConfig()
{
@Override
Expand Down Expand Up @@ -178,6 +182,7 @@ public String getConnectURI()
new Period(10),
null,
0,
null,
new JdbcAccessSecurityConfig()
{
@Override
Expand Down Expand Up @@ -221,6 +226,7 @@ public String getConnectURI()
new Period(10),
null,
0,
null,
new JdbcAccessSecurityConfig()
{
@Override
Expand Down Expand Up @@ -260,6 +266,7 @@ public String getConnectURI()
new Period(10),
10L,
0,
null,
new JdbcAccessSecurityConfig()
{
@Override
Expand Down Expand Up @@ -296,7 +303,9 @@ public String getConnectURI()
"some filter",
new Period(10),
null,
0, new JdbcAccessSecurityConfig()
0,
null,
new JdbcAccessSecurityConfig()
{
@Override
public Set<String> getAllowedProperties()
Expand Down Expand Up @@ -335,6 +344,7 @@ public String getConnectURI()
new Period(10),
null,
0,
null,
new JdbcAccessSecurityConfig()
{
@Override
Expand Down Expand Up @@ -380,6 +390,7 @@ public String getConnectURI()
new Period(10),
null,
0,
null,
new JdbcAccessSecurityConfig()
{
@Override
Expand Down Expand Up @@ -423,6 +434,7 @@ public String getConnectURI()
new Period(10),
null,
0,
null,
new JdbcAccessSecurityConfig()
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ private static JdbcExtractionNamespace createJdbcExtractionNamespace(
Period.ZERO,
null,
0,
null,
new JdbcAccessSecurityConfig()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,24 @@ public void testSimpleSubmission() throws InterruptedException
Assert.assertEquals(VALUE, entry.getCache().get(KEY));
}

@Test(timeout = 60_000L)
public void testInitialization() throws InterruptedException, TimeoutException
{
UriExtractionNamespace namespace = new UriExtractionNamespace(
tmpFile.toURI(),
null, null,
new UriExtractionNamespace.ObjectMapperFlatDataParser(
UriExtractionNamespaceTest.registerTypes(new ObjectMapper())
),
new Period(0),
null,
null
);
CacheScheduler.Entry entry = scheduler.schedule(namespace);
entry.awaitTotalUpdatesWithTimeout(1, 2000);
Assert.assertEquals(VALUE, entry.getCache().get(KEY));
}

@Test(timeout = 60_000L)
public void testPeriodicUpdatesScheduled() throws InterruptedException
{
Expand Down Expand Up @@ -459,6 +477,7 @@ public String getConnectURI()
new Period(10_000),
null,
0,
null,
new JdbcAccessSecurityConfig()
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ public void testMappingWithoutFilter()
new Period(0),
null,
0,
null,
new JdbcAccessSecurityConfig()
);
try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace)) {
Expand Down Expand Up @@ -363,6 +364,7 @@ public void testMappingWithFilter()
new Period(0),
null,
0,
null,
new JdbcAccessSecurityConfig()
);
try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace)) {
Expand Down Expand Up @@ -414,6 +416,7 @@ public void testRandomJitter()
new Period(0),
null,
120,
null,
new JdbcAccessSecurityConfig()
);
long jitter = extractionNamespace.getJitterMills();
Expand All @@ -433,6 +436,7 @@ public void testRandomJitterNotSpecified()
FILTER_COLUMN + "='1'",
new Period(0),
null,
0,
null,
new JdbcAccessSecurityConfig()
);
Expand Down Expand Up @@ -478,6 +482,7 @@ public void testSerde() throws IOException
new Period(10),
null,
0,
null,
securityConfig
);
final ObjectMapper mapper = new DefaultObjectMapper();
Expand All @@ -504,6 +509,7 @@ private CacheScheduler.Entry ensureEntry()
new Period(10),
null,
0,
null,
new JdbcAccessSecurityConfig()
);
CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,17 @@ public LookupIntrospectHandler getIntrospectHandler()
return null;
}

@Override
public void awaitInitialization()
{
// LoadingLookupFactory does not have any initialization period as it fetches the key from loadingCache and DataFetcher as necessary.
}

@Override
public boolean isInitialized()
{
return true;
}
@Override
public LoadingLookup get()
{
Expand Down
Loading

0 comments on commit e2fde8c

Please sign in to comment.