Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new method(overload) for CentralDogmaPropertySupplier#createPropertyFile/register #134

Merged
merged 22 commits into from
Dec 14, 2021
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
320cd69
Add new method overload for createPropertyFile, register
lazmond3 Dec 2, 2021
3efe319
Change argument type StaticPropertySupplier to PropertySupplier
lazmond3 Dec 9, 2021
c17b3c7
Change return type of method: defaultProperties: List
lazmond3 Dec 9, 2021
df825c9
Add tests for CentralDogmaPropertySupplier#register
lazmond3 Dec 9, 2021
e8c311b
Add Javadoc for CentralDogmaPropertySupplier#register
lazmond3 Dec 10, 2021
6b49b94
Delete Unused CentralDogmaPropertySupplier#createPropertyFile
lazmond3 Dec 10, 2021
8d38efe
Reduce access to external systems in CentralDogmaPropertySupplier#reg…
lazmond3 Dec 10, 2021
03ef312
Move defaultProperties From CentralDogmaPropertySupplier To Processor…
lazmond3 Dec 10, 2021
6d9e69e
Use Property.ofStatic() instead of DynamicProperty
lazmond3 Dec 10, 2021
61cea29
Delete not meaningful verify method
lazmond3 Dec 10, 2021
382f706
Use primitive types int: CentralDogmaPropertySupplierTest
lazmond3 Dec 10, 2021
f2ee11d
Use any(String.class) for commit message of centraldogma
lazmond3 Dec 10, 2021
0f2db81
Use any(String.class) for commit message of centraldogma 2
lazmond3 Dec 13, 2021
2e70198
Reword javadoc for CentralDogmaPropertySupplier#register
lazmond3 Dec 13, 2021
f82bae3
Added a short javadoc for ProcessorProperties#defaultProperties
lazmond3 Dec 13, 2021
e852943
Fixed format in CentralDogmaPropertySupplierTest
lazmond3 Dec 13, 2021
ecf4284
Fix format in CentralDogmaPropertySupplierTest
lazmond3 Dec 14, 2021
e1952c4
Rename test private method: defaultPropertiesAsJsonNode
lazmond3 Dec 14, 2021
ed02a7c
Fix test: default properties list: not fragile: testRegisterWithCusto…
lazmond3 Dec 14, 2021
85f64d7
Merge remote-tracking branch 'origin/master' into feature/add-initial…
lazmond3 Dec 14, 2021
e6ae753
Provide only changed properties: testRegisterWithCustomizedSettings i…
lazmond3 Dec 14, 2021
bea0861
make the code short listPropertiesForVerifyingMock
lazmond3 Dec 14, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@

package com.linecorp.decaton.centraldogma;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -119,7 +122,7 @@ public <T> Optional<Property<T>> getProperty(PropertyDefinition<T> definition) {
try {
JsonNode node = child.initialValueFuture().join().value(); //doesn't fail since it's a child watcher
setValue(prop, node);
} catch (RuntimeException e) {
} catch (RuntimeException e) {
logger.warn("Failed to set initial value from CentralDogma for {}", definition.name(), e);
}

Expand All @@ -141,23 +144,51 @@ public void close() {
*/
public static CentralDogmaPropertySupplier register(CentralDogma centralDogma, String project,
String repository, String filename) {
createPropertyFile(centralDogma, project, repository, filename);
createPropertyFile(centralDogma, project, repository, filename, ProcessorProperties.defaultProperties());
return new CentralDogmaPropertySupplier(centralDogma, project, repository, filename);
}

/**
* Create a default property file if it doesn't exist on Central Dogma and
* return a {@link CentralDogmaPropertySupplier}.
* @param centralDogma a {@link CentralDogma} instance to use to access Central Dogma server.
* @param project the project name where the properties are placed.
* @param repository the repository name where the properties are placed.
* @param filename the name of the file containing properties as top-level fields.
* @param supplier a {@link PropertySupplier} which provides a set of properties with customized initial values.
*/
public static CentralDogmaPropertySupplier register(CentralDogma centralDogma, String project,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As its a public interface, please give it a javadoc like another overload of register.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a javadoc for the new added register method: e8c311b

String repository, String filename,
PropertySupplier supplier) {
List<Property<?>> properties = ProcessorProperties.defaultProperties().stream().map(defaultProperty -> {
Optional<? extends Property<?>> prop = supplier.getProperty(defaultProperty.definition());
if (prop.isPresent()) {
return prop.get();
} else {
return defaultProperty;
}
}).collect(Collectors.toList());

createPropertyFile(centralDogma, project, repository, filename, properties);
return new CentralDogmaPropertySupplier(centralDogma, project, repository, filename);
}

private static void createPropertyFile(CentralDogma centralDogma, String project,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this overload no longer meaningful. Let's just use the below signature and pass default properties directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I deleted this original method createPropertyFile (no longer used), since a new method which has more arguments was added. 6b49b94

String repository, String fileName) {
String repository, String fileName,
List<Property<?>> properties) {
Revision baseRevision = normalizeRevision(centralDogma, project, repository, Revision.HEAD);
boolean fileExists = fileExists(centralDogma, project, repository, fileName, baseRevision);
long startedTime = System.currentTimeMillis();
long remainingTime = remainingTime(PROPERTY_CREATION_TIMEOUT_MILLIS, startedTime);

JsonNode jsonNodeProperties = convertPropertyListToJsonNode(properties);

while (!fileExists && remainingTime > 0) {
try {
centralDogma.push(project, repository, baseRevision,
String.format("[CentralDogmaPropertySupplier] Property file created: %s",
fileName),
Change.ofJsonUpsert(fileName, defaultProperties()))
Change.ofJsonUpsert(fileName, jsonNodeProperties))
.get(remainingTime, TimeUnit.MILLISECONDS);
logger.info("New property file registered on Central Dogma: {}/{}/{}",
project, repository, fileName);
Expand Down Expand Up @@ -225,13 +256,16 @@ private static long remainingTime(long totalTime, long startedTime) {
}

// visible for testing
static JsonNode defaultProperties() {
final ObjectNode properties = objectMapper.createObjectNode();
ProcessorProperties.PROPERTY_DEFINITIONS
.forEach(definition -> properties.set(definition.name(),
objectMapper.valueToTree(definition.defaultValue()))
);

return properties;
static JsonNode convertPropertyListToJsonNode(List<Property<?>> properties) {
final ObjectNode propertiesObjectNode = objectMapper.createObjectNode();
properties.forEach(
property -> {
propertiesObjectNode.set(
property.definition().name(),
objectMapper.valueToTree(property.value())
);
}
);
return propertiesObjectNode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.linecorp.centraldogma.common.Revision;
import com.linecorp.centraldogma.internal.Jackson;
import com.linecorp.centraldogma.testing.junit4.CentralDogmaRule;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.runtime.Property;

public class CentralDogmaPropertySupplierIntegrationTest {
Expand All @@ -55,6 +56,11 @@ public class CentralDogmaPropertySupplierIntegrationTest {
private static final String REPOSITORY_NAME = "repo";
private static final String FILENAME = "/subscription.json";

private JsonNode defaultProperties() {
return CentralDogmaPropertySupplier.convertPropertyListToJsonNode(
ProcessorProperties.defaultProperties());
}

@Test(timeout = 50000)
public void testCDIntegration() throws InterruptedException {
CentralDogma client = centralDogmaRule.client();
Expand Down Expand Up @@ -131,7 +137,7 @@ public void testCDRegisterSuccess() {
Entry<JsonNode> prop = client.getFile(PROJECT_NAME, REPOSITORY_NAME,
Revision.HEAD, Query.ofJson(FILENAME)).join();

assertEquals(CentralDogmaPropertySupplier.defaultProperties().asText(),
assertEquals(defaultProperties().asText(),
prop.content().asText());
}

Expand Down Expand Up @@ -174,7 +180,7 @@ public void testCDRegisterConflict() throws Exception {
return i.callRealMethod();
}).when(userA)
.push(eq(PROJECT_NAME), eq(REPOSITORY_NAME), any(), any(),
eq(Change.ofJsonUpsert(FILENAME, CentralDogmaPropertySupplier.defaultProperties())));
eq(Change.ofJsonUpsert(FILENAME, defaultProperties())));

ExecutorService service = Executors.newFixedThreadPool(2);
service.submit(() -> CentralDogmaPropertySupplier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,19 @@
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.junit.Before;
import org.junit.Rule;
Expand All @@ -45,9 +49,16 @@

import com.linecorp.centraldogma.client.CentralDogma;
import com.linecorp.centraldogma.client.Watcher;
import com.linecorp.centraldogma.common.Change;
import com.linecorp.centraldogma.common.PushResult;
import com.linecorp.centraldogma.common.Query;
import com.linecorp.centraldogma.common.Revision;
import com.linecorp.decaton.processor.runtime.DynamicProperty;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.runtime.Property;
import com.linecorp.decaton.processor.runtime.PropertyDefinition;
import com.linecorp.decaton.processor.runtime.PropertySupplier;
import com.linecorp.decaton.processor.runtime.StaticPropertySupplier;

public class CentralDogmaPropertySupplierTest {
@Rule
Expand Down Expand Up @@ -86,7 +97,8 @@ public void setUp() {
@Test
@SuppressWarnings("unchecked")
public void testWatcherSetup() {
when(rootWatcher.latestValue()).thenReturn(objectMapper.createObjectNode().put(LONG_PROPERTY.name(), 123L));
when(rootWatcher.latestValue()).thenReturn(
objectMapper.createObjectNode().put(LONG_PROPERTY.name(), 123L));

Watcher<JsonNode> longPropertyWatcher = mock(Watcher.class);
Watcher<JsonNode> listPropertyWatcher = mock(Watcher.class);
Expand Down Expand Up @@ -133,4 +145,110 @@ public void testGetPropertyAbsentName() {
PropertyDefinition<Object> missingProperty = PropertyDefinition.define("absent.value", Long.class);
assertFalse(supplier.getProperty(missingProperty).isPresent());
}

@Test
public void testRegisterWithDefaultSettings() {
when(centralDogma.normalizeRevision(PROJECT_NAME, REPOSITORY_NAME, Revision.HEAD)).thenReturn(
CompletableFuture.completedFuture(Revision.HEAD)
);
when(centralDogma.listFiles(PROJECT_NAME, REPOSITORY_NAME, Revision.HEAD, FILENAME)).thenReturn(
CompletableFuture.completedFuture(Collections.emptyMap())
);
when(centralDogma.push(
eq(PROJECT_NAME),
eq(REPOSITORY_NAME),
eq(Revision.HEAD),
any(String.class),
eq(Change.ofJsonUpsert(FILENAME, defaultProperties())))
).thenReturn(
CompletableFuture.completedFuture(
new PushResult(Revision.HEAD, 1)
)
);

CentralDogmaPropertySupplier.register(centralDogma, PROJECT_NAME, REPOSITORY_NAME, FILENAME);
verify(centralDogma, times(1)).push(
eq(PROJECT_NAME),
eq(REPOSITORY_NAME),
eq(Revision.HEAD),
any(String.class),
eq(Change.ofJsonUpsert(FILENAME, defaultProperties()))
);

}

@Test
public void testRegisterWithCustomizedSettings() {
final int settingForPartitionConcurrency = 188;
final int settingForMaxPendingRecords = 121212;
final int whenCentralDogmaPushed = 111111;

final PropertySupplier properties = StaticPropertySupplier.of(
Property.ofStatic(ProcessorProperties.CONFIG_IGNORE_KEYS),
Property.ofStatic(ProcessorProperties.CONFIG_PROCESSING_RATE),

// Customized User Settings
Property.ofStatic(
ProcessorProperties.CONFIG_PARTITION_CONCURRENCY,
settingForPartitionConcurrency
),
Property.ofStatic(
ProcessorProperties.CONFIG_MAX_PENDING_RECORDS,
settingForMaxPendingRecords
),

Property.ofStatic(ProcessorProperties.CONFIG_COMMIT_INTERVAL_MS),
Property.ofStatic(ProcessorProperties.CONFIG_GROUP_REBALANCE_TIMEOUT_MS),
Property.ofStatic(ProcessorProperties.CONFIG_SHUTDOWN_TIMEOUT_MS),
Property.ofStatic(ProcessorProperties.CONFIG_LOGGING_MDC_ENABLED),
Property.ofStatic(ProcessorProperties.CONFIG_DEFERRED_COMPLETE_TIMEOUT_MS)
);

final List<Property<?>> listProperties = ProcessorProperties
.defaultProperties()
.stream()
.map(
defaultProperty -> properties
.getProperty(defaultProperty.definition())
.get()
).collect(Collectors.toList());

final JsonNode jsonNodeProperties = CentralDogmaPropertySupplier
.convertPropertyListToJsonNode(listProperties);

when(centralDogma.normalizeRevision(PROJECT_NAME, REPOSITORY_NAME, Revision.HEAD)).thenReturn(
CompletableFuture.completedFuture(Revision.HEAD)
);
when(centralDogma.listFiles(PROJECT_NAME, REPOSITORY_NAME, Revision.HEAD, FILENAME)).thenReturn(
CompletableFuture.completedFuture(Collections.emptyMap())
);

when(centralDogma.push(
eq(PROJECT_NAME),
eq(REPOSITORY_NAME),
eq(Revision.HEAD),
any(String.class),
eq(Change.ofJsonUpsert(FILENAME, jsonNodeProperties)))
).thenReturn(
CompletableFuture.completedFuture(
new PushResult(Revision.HEAD, whenCentralDogmaPushed)
)
);

CentralDogmaPropertySupplier.register(centralDogma, PROJECT_NAME, REPOSITORY_NAME, FILENAME,
properties);

verify(centralDogma, times(1)).push(
eq(PROJECT_NAME),
eq(REPOSITORY_NAME),
eq(Revision.HEAD),
any(String.class),
eq(Change.ofJsonUpsert(FILENAME, jsonNodeProperties))
);
}

private static JsonNode defaultProperties() {
return CentralDogmaPropertySupplier.convertPropertyListToJsonNode(
ProcessorProperties.defaultProperties());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.linecorp.decaton.processor.runtime;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -218,4 +219,13 @@ public static Builder<ProcessorProperties> builder() {
public ProcessorProperties(Map<PropertyDefinition<?>, Property<?>> properties) {
super(properties);
}

/**
* Returns a List of properties with default values.
*/
public static List<Property<?>> defaultProperties() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please give it even a short javadoc since its also a public interface.

Copy link
Contributor Author

@lazmond3 lazmond3 Dec 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a short description for this method: f82bae3

List<Property<?>> properties = new ArrayList<>();
PROPERTY_DEFINITIONS.forEach(definition -> properties.add(Property.ofStatic(definition)));
return properties;
}
}