Skip to content

Commit

Permalink
Refactor lookups behavior while loading/dropping the containers
Browse files Browse the repository at this point in the history
  • Loading branch information
pranavbhole committed Aug 14, 2023
1 parent 2d8e0f2 commit 4b42abb
Show file tree
Hide file tree
Showing 20 changed files with 258 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,18 @@ public LookupIntrospectHandler getIntrospectHandler()
return new KafkaLookupExtractorIntrospectionHandler(this);
}

@Override
public void awaitToInitialise() throws InterruptedException, TimeoutException
{

}

@Override
public boolean isCacheLoaded()
{
return true;
}

@Override
public LookupExtractor get()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -169,6 +170,22 @@ public LookupIntrospectHandler getIntrospectHandler()
return lookupIntrospectHandler;
}

@Override
public void awaitToInitialise() throws InterruptedException, TimeoutException
{
long timeout = extractionNamespace.getLoadTimeout();
if (entry.getCacheState() == CacheScheduler.NoCache.CACHE_NOT_INITIALIZED) {
LOG.info("Cache not initialized yet for namespace %s waiting for %s secs", extractionNamespace, timeout);
entry.awaitTotalUpdatesWithTimeout(1, timeout);
}
}

@Override
public boolean isCacheLoaded()
{
return entry.getCacheState() instanceof CacheScheduler.VersionedCache;
}

@JsonProperty
public ExtractionNamespace getExtractionNamespace()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,9 @@ default long getMaxHeapPercentage()
{
return -1L;
}

default long getLoadTimeout()
{
return 60 * 1000;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
private final Period pollPeriod;
@JsonProperty
private final long maxHeapPercentage;
@JsonProperty
private final long loadTimeout;

@JsonCreator
public JdbcExtractionNamespace(
Expand All @@ -73,6 +75,7 @@ public JdbcExtractionNamespace(
@JsonProperty(value = "filter") @Nullable final String filter,
@Min(0) @JsonProperty(value = "pollPeriod") @Nullable final Period pollPeriod,
@JsonProperty(value = "maxHeapPercentage") @Nullable final Long maxHeapPercentage,
@JsonProperty(value = "loadTimeout") @Nullable final Long loadTimeout,
@JacksonInject JdbcAccessSecurityConfig securityConfig
)
{
Expand All @@ -96,6 +99,7 @@ public JdbcExtractionNamespace(
this.pollPeriod = pollPeriod;
}
this.maxHeapPercentage = maxHeapPercentage == null ? DEFAULT_MAX_HEAP_PERCENTAGE : maxHeapPercentage;
this.loadTimeout = loadTimeout == null ? getLoadTimeout() : loadTimeout;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ public void awaitTotalUpdates(int totalUpdates) throws InterruptedException
impl.updateCounter.awaitCount(totalUpdates);
}

@VisibleForTesting
public void awaitTotalUpdatesWithTimeout(int totalUpdates, long timeoutMills)
throws InterruptedException, TimeoutException
{
impl.updateCounter.awaitCount(totalUpdates, timeoutMills, TimeUnit.MILLISECONDS);
}
@VisibleForTesting
void awaitNextUpdates(int nextUpdates) throws InterruptedException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public String getConnectURI()
"some filter",
new Period(10),
null,
null,
new JdbcAccessSecurityConfig()
{
@Override
Expand Down Expand Up @@ -101,6 +102,7 @@ public String getConnectURI()
"some filter",
new Period(10),
null,
null,
new JdbcAccessSecurityConfig()
{
@Override
Expand Down Expand Up @@ -137,6 +139,7 @@ public String getConnectURI()
"some filter",
new Period(10),
null,
null,
new JdbcAccessSecurityConfig()
{
@Override
Expand Down Expand Up @@ -175,6 +178,7 @@ public String getConnectURI()
"some filter",
new Period(10),
null,
null,
new JdbcAccessSecurityConfig()
{
@Override
Expand Down Expand Up @@ -217,6 +221,7 @@ public String getConnectURI()
"some filter",
new Period(10),
null,
null,
new JdbcAccessSecurityConfig()
{
@Override
Expand Down Expand Up @@ -255,6 +260,7 @@ public String getConnectURI()
"some filter",
new Period(10),
10L,
null,
new JdbcAccessSecurityConfig()
{
@Override
Expand Down Expand Up @@ -291,6 +297,7 @@ public String getConnectURI()
"some filter",
new Period(10),
null,
null,
new JdbcAccessSecurityConfig()
{
@Override
Expand Down Expand Up @@ -329,6 +336,7 @@ public String getConnectURI()
"some filter",
new Period(10),
null,
null,
new JdbcAccessSecurityConfig()
{
@Override
Expand Down Expand Up @@ -373,6 +381,7 @@ public String getConnectURI()
"some filter",
new Period(10),
null,
null,
new JdbcAccessSecurityConfig()
{
@Override
Expand Down Expand Up @@ -415,6 +424,7 @@ public String getConnectURI()
"some filter",
new Period(10),
null,
null,
new JdbcAccessSecurityConfig()
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ private static JdbcExtractionNamespace createJdbcExtractionNamespace(
"filter",
Period.ZERO,
null,
null,
new JdbcAccessSecurityConfig()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ public String getConnectURI()
"some filter",
new Period(10_000),
null,
null,
new JdbcAccessSecurityConfig()
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ public void testMappingWithoutFilter()
null,
new Period(0),
null,
null,
new JdbcAccessSecurityConfig()
);
try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace)) {
Expand Down Expand Up @@ -361,6 +362,7 @@ public void testMappingWithFilter()
FILTER_COLUMN + "='1'",
new Period(0),
null,
null,
new JdbcAccessSecurityConfig()
);
try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace)) {
Expand Down Expand Up @@ -436,6 +438,7 @@ public void testSerde() throws IOException
"some filter",
new Period(10),
null,
null,
securityConfig
);
final ObjectMapper mapper = new DefaultObjectMapper();
Expand All @@ -461,6 +464,7 @@ private CacheScheduler.Entry ensureEntry()
null,
new Period(10),
null,
null,
new JdbcAccessSecurityConfig()
);
CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

@JsonTypeName("loadingLookup")
Expand Down Expand Up @@ -111,6 +112,17 @@ public LookupIntrospectHandler getIntrospectHandler()
return null;
}

@Override
public void awaitToInitialise() throws InterruptedException, TimeoutException
{

}

@Override
public boolean isCacheLoaded()
{
return true;
}
@Override
public LoadingLookup get()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,17 @@ public LookupIntrospectHandler getIntrospectHandler()
return null;
}

@Override
public void awaitToInitialise()
{
}

@Override
public boolean isCacheLoaded()
{
return true;
}

@Override
public PollingLookup get()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.base.Supplier;

import javax.annotation.Nullable;
import java.util.concurrent.TimeoutException;

/**
* Users of Lookup Extraction need to implement a {@link LookupExtractorFactory} supplier of type {@link LookupExtractor}.
Expand Down Expand Up @@ -79,4 +80,14 @@ default boolean destroy()
*/
@Nullable
LookupIntrospectHandler getIntrospectHandler();

/**
* awaitToInitialise blocks and wait for the cache to initialize fully.
*/
void awaitToInitialise() throws InterruptedException, TimeoutException;

/**
* @return true if cache is loaded and lookup is queryable else returns false
*/
boolean isCacheLoaded();
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,17 @@ public LookupIntrospectHandler getIntrospectHandler()
return null;
}

@Override
public void awaitToInitialise()
{
}

@Override
public boolean isCacheLoaded()
{
return true;
}

@Override
public LookupExtractor get()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,17 @@ public LookupIntrospectHandler getIntrospectHandler()
throw new UnsupportedOperationException("not needed for this test");
}

@Override
public void awaitToInitialise()
{
}

@Override
public boolean isCacheLoaded()
{
return true;
}

@Override
public LookupExtractor get()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ public Response handleUpdates(InputStream inputStream, ObjectMapper mapper)

try {
state.getToLoad().forEach(manager::add);
state.getToDrop().forEach(manager::remove);
state.getToDrop().forEach(lookName-> {
manager.remove(lookName, state.getToLoad().getOrDefault(lookName, null));
});

return Response.status(Response.Status.ACCEPTED).entity(manager.getAllLookupsState()).build();
}
Expand Down Expand Up @@ -135,7 +137,7 @@ public LookupsState<LookupExtractorFactoryContainer> getAll()
@Override
public Object delete(String id)
{
manager.remove(id);
manager.remove(id, null);
return id;
}
}
Expand Down
Loading

0 comments on commit 4b42abb

Please sign in to comment.