From 5cafcacafc30d9d6d43ab83d9e779317b92ed8f0 Mon Sep 17 00:00:00 2001 From: "vincent.pericart" Date: Wed, 13 Dec 2023 10:43:58 +0900 Subject: [PATCH 1/3] Cache CD dynamic properties --- .../CentralDogmaPropertySupplier.java | 47 +++++++++++++------ ...lDogmaPropertySupplierIntegrationTest.java | 6 +++ 2 files changed, 39 insertions(+), 14 deletions(-) diff --git a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java index 4051fcd1..d2ea03cc 100644 --- a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java +++ b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java @@ -18,6 +18,8 @@ import java.util.List; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -44,6 +46,9 @@ import com.linecorp.decaton.processor.runtime.PropertyDefinition; import com.linecorp.decaton.processor.runtime.PropertySupplier; +import lombok.AccessLevel; +import lombok.Getter; + /** * A {@link PropertySupplier} implementation with Central Dogma backend. * @@ -72,6 +77,9 @@ public class CentralDogmaPropertySupplier implements PropertySupplier, AutoClose private final Watcher rootWatcher; + @Getter(AccessLevel.PACKAGE)// visible for testing + private final ConcurrentMap>> cachedProperties = new ConcurrentHashMap<>(); + /** * Creates a new {@link CentralDogmaPropertySupplier}. * @param centralDogma a {@link CentralDogma} instance to use to access Central Dogma server. @@ -118,32 +126,43 @@ public Optional> getProperty(PropertyDefinition definition) { return Optional.empty(); } - DynamicProperty prop = new DynamicProperty<>(definition); - Watcher child = rootWatcher.newChild(jsonNode -> jsonNode.path(definition.name())); - child.watch(node -> { + // note: cache DynamicProperties to avoid using too many child watchers if getProperty is called repeatedly. + // for most use cases though, this cache is only filled/read once. + return cachedProperties.computeIfAbsent(definition.name(), name -> { + DynamicProperty prop = new DynamicProperty<>(definition); + Watcher child = rootWatcher.newChild(jsonNode -> jsonNode.path(definition.name())); + child.watch(node -> { + try { + setValue(prop, node); + } catch (Exception e) { + // Catching Exception instead of RuntimeException, since + // Kotlin-implemented DynamicProperty would throw checked exceptions + logger.warn("Failed to set value updated from CentralDogma for {}", definition.name(), e); + } + }); try { + JsonNode node = child.initialValueFuture().join().value(); //doesn't fail since it's a child watcher setValue(prop, node); } catch (Exception e) { // Catching Exception instead of RuntimeException, since // Kotlin-implemented DynamicProperty would throw checked exceptions - logger.warn("Failed to set value updated from CentralDogma for {}", definition.name(), e); + logger.warn("Failed to set initial value from CentralDogma for {}", definition.name(), e); } - }); - try { - JsonNode node = child.initialValueFuture().join().value(); //doesn't fail since it's a child watcher - setValue(prop, node); - } catch (Exception e) { - // Catching Exception instead of RuntimeException, since - // Kotlin-implemented DynamicProperty would throw checked exceptions - logger.warn("Failed to set initial value from CentralDogma for {}", definition.name(), e); - } - return Optional.of(prop); + return Optional.of(prop); + }).map(prop -> { + if (prop.definition().runtimeType() != definition.runtimeType()) { + throw new IllegalStateException("Several different properties have the same name: " + definition.name()); + } + //noinspection unchecked + return (Property) prop; + }); } @Override public void close() { rootWatcher.close(); + cachedProperties.clear(); } /** diff --git a/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierIntegrationTest.java b/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierIntegrationTest.java index d0fd0966..f3b4f8ff 100644 --- a/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierIntegrationTest.java +++ b/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierIntegrationTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; import org.junit.Rule; import org.junit.Test; @@ -119,6 +120,11 @@ public void testCDIntegration() throws InterruptedException { latch.await(); assertEquals(20, prop.value().intValue()); + + IntStream.range(0, 10000) + .mapToObj(i -> CONFIG_PARTITION_CONCURRENCY) + .map(supplier::getProperty); + assertEquals(1, supplier.getCachedProperties().size()); } @Test From 8f6d55f8896ef586c43e80d76d06d820a95cac0e Mon Sep 17 00:00:00 2001 From: "vincent.pericart" Date: Thu, 14 Dec 2023 22:46:57 +0900 Subject: [PATCH 2/3] Store the DynamicProperties directly instead of wrapping in Optional --- .../CentralDogmaPropertySupplier.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java index d2ea03cc..6a9f28ea 100644 --- a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java +++ b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java @@ -78,7 +78,7 @@ public class CentralDogmaPropertySupplier implements PropertySupplier, AutoClose private final Watcher rootWatcher; @Getter(AccessLevel.PACKAGE)// visible for testing - private final ConcurrentMap>> cachedProperties = new ConcurrentHashMap<>(); + private final ConcurrentMap> cachedProperties = new ConcurrentHashMap<>(); /** * Creates a new {@link CentralDogmaPropertySupplier}. @@ -128,7 +128,7 @@ public Optional> getProperty(PropertyDefinition definition) { // note: cache DynamicProperties to avoid using too many child watchers if getProperty is called repeatedly. // for most use cases though, this cache is only filled/read once. - return cachedProperties.computeIfAbsent(definition.name(), name -> { + final DynamicProperty cachedProp = cachedProperties.computeIfAbsent(definition.name(), name -> { DynamicProperty prop = new DynamicProperty<>(definition); Watcher child = rootWatcher.newChild(jsonNode -> jsonNode.path(definition.name())); child.watch(node -> { @@ -149,14 +149,14 @@ public Optional> getProperty(PropertyDefinition definition) { logger.warn("Failed to set initial value from CentralDogma for {}", definition.name(), e); } - return Optional.of(prop); - }).map(prop -> { - if (prop.definition().runtimeType() != definition.runtimeType()) { - throw new IllegalStateException("Several different properties have the same name: " + definition.name()); - } - //noinspection unchecked - return (Property) prop; + return prop; }); + + if (cachedProp.definition().runtimeType() != definition.runtimeType()) { + throw new IllegalStateException("Several different properties have the same name: " + definition.name()); + } + //noinspection unchecked + return Optional.of((Property) cachedProp); } @Override From 714dd40bc3720321605dbeaff493263cb9ba7458 Mon Sep 17 00:00:00 2001 From: "vincent.pericart" Date: Fri, 15 Dec 2023 15:08:05 +0900 Subject: [PATCH 3/3] Verify the same instance is always returned --- .../centraldogma/CentralDogmaPropertySupplier.java | 4 ---- ...CentralDogmaPropertySupplierIntegrationTest.java | 13 +++++++++---- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java index 6a9f28ea..c468d19f 100644 --- a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java +++ b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java @@ -46,9 +46,6 @@ import com.linecorp.decaton.processor.runtime.PropertyDefinition; import com.linecorp.decaton.processor.runtime.PropertySupplier; -import lombok.AccessLevel; -import lombok.Getter; - /** * A {@link PropertySupplier} implementation with Central Dogma backend. * @@ -77,7 +74,6 @@ public class CentralDogmaPropertySupplier implements PropertySupplier, AutoClose private final Watcher rootWatcher; - @Getter(AccessLevel.PACKAGE)// visible for testing private final ConcurrentMap> cachedProperties = new ConcurrentHashMap<>(); /** diff --git a/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierIntegrationTest.java b/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierIntegrationTest.java index f3b4f8ff..9cdbf9e1 100644 --- a/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierIntegrationTest.java +++ b/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierIntegrationTest.java @@ -19,6 +19,7 @@ import static com.linecorp.decaton.processor.runtime.ProcessorProperties.CONFIG_PARTITION_CONCURRENCY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -121,10 +122,14 @@ public void testCDIntegration() throws InterruptedException { latch.await(); assertEquals(20, prop.value().intValue()); - IntStream.range(0, 10000) - .mapToObj(i -> CONFIG_PARTITION_CONCURRENCY) - .map(supplier::getProperty); - assertEquals(1, supplier.getCachedProperties().size()); + assertEquals(20, IntStream + .range(0, 10000) + .mapToObj(i -> CONFIG_PARTITION_CONCURRENCY) + .map(supplier::getProperty) + .reduce((l, r) -> { + assertSame(l.get(), r.get()); + return l; + }).get().get().value().intValue()); } @Test