Skip to content

Commit

Permalink
Add new method(overload) for CentralDogmaPropertySupplier#createPrope…
Browse files Browse the repository at this point in the history
…rtyFile/register (#134)

* Add new method overload for createPropertyFile, register

* Change argument type StaticPropertySupplier to PropertySupplier

* Change return type of method: defaultProperties: List

* Add tests for CentralDogmaPropertySupplier#register

* Add Javadoc for CentralDogmaPropertySupplier#register

* Delete Unused CentralDogmaPropertySupplier#createPropertyFile

* Reduce access to external systems in CentralDogmaPropertySupplier#register

* Move defaultProperties From CentralDogmaPropertySupplier To ProcessorProperties

* Use Property.ofStatic() instead of DynamicProperty

* Delete not meaningful verify method

* Use primitive types int: CentralDogmaPropertySupplierTest

* Use any(String.class) for commit message of centraldogma

* Use any(String.class) for commit message of centraldogma 2

* Reword javadoc for CentralDogmaPropertySupplier#register

* Added a short javadoc for ProcessorProperties#defaultProperties

* Fixed format in CentralDogmaPropertySupplierTest

* Fix format in CentralDogmaPropertySupplierTest

* Rename test private method: defaultPropertiesAsJsonNode

* Fix test: default properties list: not fragile: testRegisterWithCustomizedSettings in CentralDogmaPropertySupplierTest

* Provide only changed properties: testRegisterWithCustomizedSettings in public class CentralDogmaPropertySupplierTest

* make the code short listPropertiesForVerifyingMock

Co-authored-by: lazmond3 <[email protected]>
  • Loading branch information
lazmond3 and lazmond3 authored Dec 14, 2021
1 parent 129f787 commit 5b49d0f
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@

package com.linecorp.decaton.centraldogma;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -119,7 +122,7 @@ public <T> Optional<Property<T>> getProperty(PropertyDefinition<T> definition) {
try {
JsonNode node = child.initialValueFuture().join().value(); //doesn't fail since it's a child watcher
setValue(prop, node);
} catch (RuntimeException e) {
} catch (RuntimeException e) {
logger.warn("Failed to set initial value from CentralDogma for {}", definition.name(), e);
}

Expand All @@ -141,23 +144,51 @@ public void close() {
*/
public static CentralDogmaPropertySupplier register(CentralDogma centralDogma, String project,
String repository, String filename) {
createPropertyFile(centralDogma, project, repository, filename);
createPropertyFile(centralDogma, project, repository, filename, ProcessorProperties.defaultProperties());
return new CentralDogmaPropertySupplier(centralDogma, project, repository, filename);
}

/**
* Create a default property file if it doesn't exist on Central Dogma and
* return a {@link CentralDogmaPropertySupplier}.
* @param centralDogma a {@link CentralDogma} instance to use to access Central Dogma server.
* @param project the project name where the properties are placed.
* @param repository the repository name where the properties are placed.
* @param filename the name of the file containing properties as top-level fields.
* @param supplier a {@link PropertySupplier} which provides a set of properties with customized initial values.
*/
public static CentralDogmaPropertySupplier register(CentralDogma centralDogma, String project,
String repository, String filename,
PropertySupplier supplier) {
List<Property<?>> properties = ProcessorProperties.defaultProperties().stream().map(defaultProperty -> {
Optional<? extends Property<?>> prop = supplier.getProperty(defaultProperty.definition());
if (prop.isPresent()) {
return prop.get();
} else {
return defaultProperty;
}
}).collect(Collectors.toList());

createPropertyFile(centralDogma, project, repository, filename, properties);
return new CentralDogmaPropertySupplier(centralDogma, project, repository, filename);
}

private static void createPropertyFile(CentralDogma centralDogma, String project,
String repository, String fileName) {
String repository, String fileName,
List<Property<?>> properties) {
Revision baseRevision = normalizeRevision(centralDogma, project, repository, Revision.HEAD);
boolean fileExists = fileExists(centralDogma, project, repository, fileName, baseRevision);
long startedTime = System.currentTimeMillis();
long remainingTime = remainingTime(PROPERTY_CREATION_TIMEOUT_MILLIS, startedTime);

JsonNode jsonNodeProperties = convertPropertyListToJsonNode(properties);

while (!fileExists && remainingTime > 0) {
try {
centralDogma.push(project, repository, baseRevision,
String.format("[CentralDogmaPropertySupplier] Property file created: %s",
fileName),
Change.ofJsonUpsert(fileName, defaultProperties()))
Change.ofJsonUpsert(fileName, jsonNodeProperties))
.get(remainingTime, TimeUnit.MILLISECONDS);
logger.info("New property file registered on Central Dogma: {}/{}/{}",
project, repository, fileName);
Expand Down Expand Up @@ -225,13 +256,16 @@ private static long remainingTime(long totalTime, long startedTime) {
}

// visible for testing
static JsonNode defaultProperties() {
final ObjectNode properties = objectMapper.createObjectNode();
ProcessorProperties.PROPERTY_DEFINITIONS
.forEach(definition -> properties.set(definition.name(),
objectMapper.valueToTree(definition.defaultValue()))
);

return properties;
static JsonNode convertPropertyListToJsonNode(List<Property<?>> properties) {
final ObjectNode propertiesObjectNode = objectMapper.createObjectNode();
properties.forEach(
property -> {
propertiesObjectNode.set(
property.definition().name(),
objectMapper.valueToTree(property.value())
);
}
);
return propertiesObjectNode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.linecorp.centraldogma.common.Revision;
import com.linecorp.centraldogma.internal.Jackson;
import com.linecorp.centraldogma.testing.junit4.CentralDogmaRule;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.runtime.Property;

public class CentralDogmaPropertySupplierIntegrationTest {
Expand All @@ -55,6 +56,11 @@ public class CentralDogmaPropertySupplierIntegrationTest {
private static final String REPOSITORY_NAME = "repo";
private static final String FILENAME = "/subscription.json";

private JsonNode defaultProperties() {
return CentralDogmaPropertySupplier.convertPropertyListToJsonNode(
ProcessorProperties.defaultProperties());
}

@Test(timeout = 50000)
public void testCDIntegration() throws InterruptedException {
CentralDogma client = centralDogmaRule.client();
Expand Down Expand Up @@ -131,7 +137,7 @@ public void testCDRegisterSuccess() {
Entry<JsonNode> prop = client.getFile(PROJECT_NAME, REPOSITORY_NAME,
Revision.HEAD, Query.ofJson(FILENAME)).join();

assertEquals(CentralDogmaPropertySupplier.defaultProperties().asText(),
assertEquals(defaultProperties().asText(),
prop.content().asText());
}

Expand Down Expand Up @@ -174,7 +180,7 @@ public void testCDRegisterConflict() throws Exception {
return i.callRealMethod();
}).when(userA)
.push(eq(PROJECT_NAME), eq(REPOSITORY_NAME), any(), any(),
eq(Change.ofJsonUpsert(FILENAME, CentralDogmaPropertySupplier.defaultProperties())));
eq(Change.ofJsonUpsert(FILENAME, defaultProperties())));

ExecutorService service = Executors.newFixedThreadPool(2);
service.submit(() -> CentralDogmaPropertySupplier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,20 @@
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.junit.Before;
import org.junit.Rule;
Expand All @@ -45,9 +50,16 @@

import com.linecorp.centraldogma.client.CentralDogma;
import com.linecorp.centraldogma.client.Watcher;
import com.linecorp.centraldogma.common.Change;
import com.linecorp.centraldogma.common.PushResult;
import com.linecorp.centraldogma.common.Query;
import com.linecorp.centraldogma.common.Revision;
import com.linecorp.decaton.processor.runtime.DynamicProperty;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.runtime.Property;
import com.linecorp.decaton.processor.runtime.PropertyDefinition;
import com.linecorp.decaton.processor.runtime.PropertySupplier;
import com.linecorp.decaton.processor.runtime.StaticPropertySupplier;

public class CentralDogmaPropertySupplierTest {
@Rule
Expand Down Expand Up @@ -86,7 +98,8 @@ public void setUp() {
@Test
@SuppressWarnings("unchecked")
public void testWatcherSetup() {
when(rootWatcher.latestValue()).thenReturn(objectMapper.createObjectNode().put(LONG_PROPERTY.name(), 123L));
when(rootWatcher.latestValue()).thenReturn(
objectMapper.createObjectNode().put(LONG_PROPERTY.name(), 123L));

Watcher<JsonNode> longPropertyWatcher = mock(Watcher.class);
Watcher<JsonNode> listPropertyWatcher = mock(Watcher.class);
Expand Down Expand Up @@ -133,4 +146,102 @@ public void testGetPropertyAbsentName() {
PropertyDefinition<Object> missingProperty = PropertyDefinition.define("absent.value", Long.class);
assertFalse(supplier.getProperty(missingProperty).isPresent());
}

@Test
public void testRegisterWithDefaultSettings() {
when(centralDogma.normalizeRevision(PROJECT_NAME, REPOSITORY_NAME, Revision.HEAD)).thenReturn(
CompletableFuture.completedFuture(Revision.HEAD)
);
when(centralDogma.listFiles(PROJECT_NAME, REPOSITORY_NAME, Revision.HEAD, FILENAME)).thenReturn(
CompletableFuture.completedFuture(Collections.emptyMap())
);
when(centralDogma.push(
eq(PROJECT_NAME),
eq(REPOSITORY_NAME),
eq(Revision.HEAD),
any(String.class),
eq(Change.ofJsonUpsert(FILENAME, defaultPropertiesAsJsonNode())))
).thenReturn(
CompletableFuture.completedFuture(
new PushResult(Revision.HEAD, 1)
)
);

CentralDogmaPropertySupplier.register(centralDogma, PROJECT_NAME, REPOSITORY_NAME, FILENAME);
verify(centralDogma, times(1)).push(
eq(PROJECT_NAME),
eq(REPOSITORY_NAME),
eq(Revision.HEAD),
any(String.class),
eq(Change.ofJsonUpsert(FILENAME, defaultPropertiesAsJsonNode()))
);

}

@Test
public void testRegisterWithCustomizedSettings() {
final int settingForPartitionConcurrency = 188;
final int settingForMaxPendingRecords = 121212;
final int whenCentralDogmaPushed = 111111;

List<Property<?>> listPropertiesProvidedByUser = Arrays.asList(
Property.ofStatic(
ProcessorProperties.CONFIG_PARTITION_CONCURRENCY,
settingForPartitionConcurrency),
Property.ofStatic(
ProcessorProperties.CONFIG_MAX_PENDING_RECORDS,
settingForMaxPendingRecords
)
);
final PropertySupplier supplier = StaticPropertySupplier.of(listPropertiesProvidedByUser);

final List<Property<?>> listPropertiesForVerifyingMock = ProcessorProperties
.defaultProperties()
.stream()
.map(defaultProperty -> {
Optional<? extends Property<?>> prop = supplier.getProperty(defaultProperty.definition());
if (prop.isPresent()) {
return prop.get();
} else {
return defaultProperty;
}
}).collect(Collectors.toList());

final JsonNode jsonNodeProperties = CentralDogmaPropertySupplier
.convertPropertyListToJsonNode(listPropertiesForVerifyingMock);

when(centralDogma.normalizeRevision(PROJECT_NAME, REPOSITORY_NAME, Revision.HEAD)).thenReturn(
CompletableFuture.completedFuture(Revision.HEAD)
);
when(centralDogma.listFiles(PROJECT_NAME, REPOSITORY_NAME, Revision.HEAD, FILENAME)).thenReturn(
CompletableFuture.completedFuture(Collections.emptyMap())
);

when(centralDogma.push(
eq(PROJECT_NAME),
eq(REPOSITORY_NAME),
eq(Revision.HEAD),
any(String.class),
eq(Change.ofJsonUpsert(FILENAME, jsonNodeProperties)))
).thenReturn(
CompletableFuture.completedFuture(
new PushResult(Revision.HEAD, whenCentralDogmaPushed)
)
);

CentralDogmaPropertySupplier.register(centralDogma, PROJECT_NAME, REPOSITORY_NAME, FILENAME, supplier);

verify(centralDogma, times(1)).push(
eq(PROJECT_NAME),
eq(REPOSITORY_NAME),
eq(Revision.HEAD),
any(String.class),
eq(Change.ofJsonUpsert(FILENAME, jsonNodeProperties))
);
}

private static JsonNode defaultPropertiesAsJsonNode() {
return CentralDogmaPropertySupplier.convertPropertyListToJsonNode(
ProcessorProperties.defaultProperties());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.linecorp.decaton.processor.runtime;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -229,4 +230,13 @@ public static Builder<ProcessorProperties> builder() {
public ProcessorProperties(Map<PropertyDefinition<?>, Property<?>> properties) {
super(properties);
}

/**
* Returns a List of properties with default values.
*/
public static List<Property<?>> defaultProperties() {
List<Property<?>> properties = new ArrayList<>();
PROPERTY_DEFINITIONS.forEach(definition -> properties.add(Property.ofStatic(definition)));
return properties;
}
}

0 comments on commit 5b49d0f

Please sign in to comment.