Skip to content

Commit

Permalink
feat: implemention of KLIP-13 (#4099)
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-dukhno authored and agavra committed Dec 13, 2019
1 parent 858e4dc commit b23dae9
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 65 deletions.
2 changes: 1 addition & 1 deletion design-proposals/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ Next KLIP number: **14**
| [KLIP-10: Suppress](klip-10-suppress.md) | Proposal | N/A |
| [KLIP-11: Redesign KSQL query language](klip-11-DQL.md) | Proposal | N/A |
| [KLIP-12: Implement High-Availability for Pull queries](klip-12-pull-high-availability.md)| Proposal | N/A |
| [KLIP-13: Introduce KSQL command to print connect worker properties to the console](klip-13-introduce-KSQL-command-to-print-connect-worker-properties-to-the-console.md) | Proposal | N/A |
| [KLIP-13: Introduce KSQL command to print connect worker properties to the console](klip-13-introduce-KSQL-command-to-print-connect-worker-properties-to-the-console.md) | Merged | 5.5 |
| [KLIP-14: ROWTIME as Pseudocolumn](klip-14-rowtime-as-pseudocolumn.md) | Approved | N/A |
| [KLIP-15: KSQLDB new API and Client(klip-15-new-api-and-client.md | Proposal | N/A |
| [KLIP-16: Introduce 'K$' dynamic views | Proposal | N/A |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@
import io.confluent.ksql.cli.console.table.Table;
import io.confluent.ksql.cli.console.table.Table.Builder;
import io.confluent.ksql.rest.entity.PropertiesList;
import io.confluent.ksql.rest.entity.PropertiesList.Property;

import java.util.Comparator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

public class PropertiesListTableBuilder implements TableBuilder<PropertiesList> {

private static final List<String> HEADERS =
ImmutableList.of("Property", "Default override", "Effective Value");
ImmutableList.of("Property", "Scope", "Default override", "Effective Value");

@Override
public Table buildTable(final PropertiesList entity) {
Expand All @@ -42,42 +43,47 @@ public Table buildTable(final PropertiesList entity) {
private static List<List<String>> defRowValues(final List<PropertyDef> properties) {
return properties.stream()
.sorted(Comparator.comparing(propertyDef -> propertyDef.propertyName))
.map(
def -> ImmutableList.of(def.propertyName, def.overrideType, def.effectiveValue))
.map(def -> ImmutableList.of(
def.propertyName, def.scope, def.overrideType, def.effectiveValue))
.collect(Collectors.toList());
}

private static List<PropertyDef> propertiesListWithOverrides(final PropertiesList properties) {

final Function<Entry<String, ?>, PropertyDef> toPropertyDef = e -> {
final String value = e.getValue() == null ? "NULL" : e.getValue().toString();
if (properties.getOverwrittenProperties().contains(e.getKey())) {
return new PropertyDef(e.getKey(), "SESSION", value);
final Function<Property, PropertyDef> toPropertyDef = property -> {
final String value = property.getValue() == null ? "NULL" : property.getValue();
final String name = property.getName();
final String scope = property.getScope();
if (properties.getOverwrittenProperties().contains(name)) {
return new PropertyDef(name, scope, "SESSION", value);
}

if (properties.getDefaultProperties().contains(e.getKey())) {
return new PropertyDef(e.getKey(), "", value);
if (properties.getDefaultProperties().contains(name)) {
return new PropertyDef(name, scope, "", value);
}

return new PropertyDef(e.getKey(), "SERVER", value);
return new PropertyDef(name, scope, "SERVER", value);
};

return properties.getProperties().entrySet().stream()
return properties.getProperties().stream()
.map(toPropertyDef)
.collect(Collectors.toList());
}

private static class PropertyDef {

private final String propertyName;
private final String scope;
private final String overrideType;
private final String effectiveValue;

PropertyDef(
final String propertyName,
final String scope,
final String overrideType,
final String effectiveValue) {
this.propertyName = Objects.requireNonNull(propertyName, "propertyName");
this.scope = Objects.requireNonNull(scope, "scope");
this.overrideType = Objects.requireNonNull(overrideType, "overrideType");
this.effectiveValue = Objects.requireNonNull(effectiveValue, "effectiveValue");
}
Expand Down
3 changes: 3 additions & 0 deletions ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -443,17 +443,20 @@ public void testPropertySetUnset() {
// SERVER OVERRIDES:
row(
KsqlConfig.KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG,
"KSQL",
SERVER_OVERRIDE,
"4"
),
row(
KsqlConfig.SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_MS_PROPERTY,
"KSQL",
SERVER_OVERRIDE,
"" + (KsqlConstants.defaultSinkWindowChangeLogAdditionalRetention + 1)
),
// SESSION OVERRIDES:
row(
KsqlConfig.KSQL_STREAMS_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"KSQL",
SESSION_OVERRIDE,
"latest"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlWarning;
import io.confluent.ksql.rest.entity.PropertiesList;
import io.confluent.ksql.rest.entity.PropertiesList.Property;
import io.confluent.ksql.rest.entity.Queries;
import io.confluent.ksql.rest.entity.RunningQuery;
import io.confluent.ksql.rest.entity.SchemaInfo;
Expand All @@ -78,9 +79,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
Expand Down Expand Up @@ -233,10 +232,10 @@ public void testPrintCommandStatus() {
@Test
public void testPrintPropertyList() {
// Given:
final Map<String, Object> properties = new HashMap<>();
properties.put("k1", 1);
properties.put("k2", "v2");
properties.put("k3", true);
final List<Property> properties = new ArrayList<>();
properties.add(new Property("k1", "KSQL", "1"));
properties.add(new Property("k2", "KSQL", "v2"));
properties.add(new Property("k3", "KSQL", "true"));

final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of(
new PropertiesList("e", properties, Collections.emptyList(), Collections.emptyList())
Expand All @@ -251,23 +250,31 @@ public void testPrintPropertyList() {
assertThat(output, is("[ {\n"
+ " \"@type\" : \"properties\",\n"
+ " \"statementText\" : \"e\",\n"
+ " \"properties\" : {\n"
+ " \"k1\" : 1,\n"
+ " \"k2\" : \"v2\",\n"
+ " \"k3\" : true\n"
+ " },\n"
+ " \"properties\" : [ {\n"
+ " \"name\" : \"k1\",\n"
+ " \"scope\" : \"KSQL\",\n"
+ " \"value\" : \"1\"\n"
+ " }, {\n"
+ " \"name\" : \"k2\",\n"
+ " \"scope\" : \"KSQL\",\n"
+ " \"value\" : \"v2\"\n"
+ " }, {\n"
+ " \"name\" : \"k3\",\n"
+ " \"scope\" : \"KSQL\",\n"
+ " \"value\" : \"true\"\n"
+ " } ],\n"
+ " \"overwrittenProperties\" : [ ],\n"
+ " \"defaultProperties\" : [ ],\n"
+ " \"warnings\" : [ ]\n"
+ "} ]\n"));
} else {
assertThat(output, is("\n"
+ " Property | Default override | Effective Value \n"
+ "-----------------------------------------------\n"
+ " k1 | SERVER | 1 \n"
+ " k2 | SERVER | v2 \n"
+ " k3 | SERVER | true \n"
+ "-----------------------------------------------\n"));
+ " Property | Scope | Default override | Effective Value \n"
+ "-------------------------------------------------------\n"
+ " k1 | KSQL | SERVER | 1 \n"
+ " k2 | KSQL | SERVER | v2 \n"
+ " k3 | KSQL | SERVER | true \n"
+ "-------------------------------------------------------\n"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.cli.console.Console;
import io.confluent.ksql.cli.console.table.Table;
import io.confluent.ksql.rest.entity.PropertiesList;
import io.confluent.ksql.rest.entity.PropertiesList.Property;
import io.confluent.ksql.util.KsqlConfig;
import java.io.PrintWriter;
import java.util.Collections;
Expand Down Expand Up @@ -61,7 +61,7 @@ public void setUp() {
public void shouldHandleClientOverwrittenProperties() {
// Given:
final PropertiesList propList = new PropertiesList("list properties;",
ImmutableMap.of(SOME_KEY, "earliest"),
ImmutableList.of(new Property(SOME_KEY, "KSQL", "earliest")),
ImmutableList.of(SOME_KEY),
Collections.emptyList()
);
Expand All @@ -70,14 +70,14 @@ public void shouldHandleClientOverwrittenProperties() {
final Table table = builder.buildTable(propList);

// Then:
assertThat(getRows(table), contains(row(SOME_KEY, "SESSION", "earliest")));
assertThat(getRows(table), contains(row(SOME_KEY, "KSQL", "SESSION", "earliest")));
}

@Test
public void shouldHandleServerOverwrittenProperties() {
// Given:
final PropertiesList propList = new PropertiesList("list properties;",
ImmutableMap.of(SOME_KEY, "earliest"),
ImmutableList.of(new Property(SOME_KEY, "KSQL", "earliest")),
Collections.emptyList(),
Collections.emptyList()
);
Expand All @@ -86,14 +86,14 @@ public void shouldHandleServerOverwrittenProperties() {
final Table table = builder.buildTable(propList);

// Then:
assertThat(getRows(table), contains(row(SOME_KEY, "SERVER", "earliest")));
assertThat(getRows(table), contains(row(SOME_KEY, "KSQL", "SERVER", "earliest")));
}

@Test
public void shouldHandleDefaultProperties() {
// Given:
final PropertiesList propList = new PropertiesList("list properties;",
ImmutableMap.of(SOME_KEY, "earliest"),
ImmutableList.of(new Property(SOME_KEY, "KSQL", "earliest")),
Collections.emptyList(),
ImmutableList.of(SOME_KEY)
);
Expand All @@ -102,14 +102,14 @@ public void shouldHandleDefaultProperties() {
final Table table = builder.buildTable(propList);

// Then:
assertThat(getRows(table), contains(row(SOME_KEY, "", "earliest")));
assertThat(getRows(table), contains(row(SOME_KEY, "KSQL", "", "earliest")));
}

@Test
public void shouldHandlePropertiesWithNullValue() {
// Given:
final PropertiesList propList = new PropertiesList("list properties;",
Collections.singletonMap(SOME_KEY, null),
Collections.singletonList(new Property(SOME_KEY, "KSQL", null)),
Collections.emptyList(),
ImmutableList.of(SOME_KEY)
);
Expand All @@ -118,7 +118,7 @@ public void shouldHandlePropertiesWithNullValue() {
final Table table = builder.buildTable(propList);

// Then:
assertThat(getRows(table), contains(row(SOME_KEY, "", "NULL")));
assertThat(getRows(table), contains(row(SOME_KEY, "KSQL", "", "NULL")));
}

private List<List<String>> getRows(final Table table) {
Expand All @@ -130,9 +130,10 @@ private List<List<String>> getRows(final Table table) {
@SuppressWarnings("SameParameterValue")
private static List<String> row(
final String property,
final String scope,
final String defaultValue,
final String actualValue
) {
return ImmutableList.of(property, defaultValue, actualValue);
return ImmutableList.of(property, scope, defaultValue, actualValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,23 @@
import io.confluent.ksql.parser.tree.ListProperties;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.PropertiesList;
import io.confluent.ksql.rest.entity.PropertiesList.Property;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.common.utils.Utils;

public final class ListPropertiesExecutor {

private ListPropertiesExecutor() { }
Expand All @@ -44,25 +52,56 @@ public static Optional<KsqlEntity> execute(
final Map<String, String> engineProperties
= statement.getConfig().getAllConfigPropsWithSecretsObfuscated();

final Map<String, String> mergedProperties = statement.getConfig()
.cloneWithPropertyOverwrite(statement.getOverrides())
.getAllConfigPropsWithSecretsObfuscated();
final List<Property> mergedProperties = mergedProperties(statement);

final List<String> overwritten = mergedProperties.entrySet()
final List<String> overwritten = mergedProperties
.stream()
.filter(e -> !Objects.equals(engineProperties.get(e.getKey()), e.getValue()))
.map(Entry::getKey)
.filter(property -> !Objects.equals(
engineProperties.get(property.getName()), property.getValue()))
.map(Property::getName)
.collect(Collectors.toList());

final List<String> defaultProps = mergedProperties.entrySet().stream()
.filter(e -> resolver.resolve(e.getKey(), false)
.map(resolved -> resolved.isDefaultValue(e.getValue()))
final List<String> defaultProps = mergedProperties.stream()
.filter(property -> resolver.resolve(property.getName(), false)
.map(resolved -> resolved.isDefaultValue(property.getValue()))
.orElse(false))
.map(Entry::getKey)
.map(Property::getName)
.collect(Collectors.toList());

return Optional.of(new PropertiesList(
statement.getStatementText(), mergedProperties, overwritten, defaultProps));
}

private static List<Property> mergedProperties(
ConfiguredStatement<ListProperties> statement) {
final List<Property> mergedProperties = new ArrayList<>();

statement.getConfig()
.cloneWithPropertyOverwrite(statement.getOverrides())
.getAllConfigPropsWithSecretsObfuscated()
.forEach((key, value) -> mergedProperties.add(new Property(key, "KSQL", value)));

embeddedConnectWorkerProperties(statement)
.forEach((key, value) ->
mergedProperties.add(new Property(key, "EMBEDDED CONNECT WORKER", value)));

return mergedProperties;
}

private static Map<String, String> embeddedConnectWorkerProperties(
ConfiguredStatement<ListProperties> statement) {
String configFile = statement.getConfig()
.getString(KsqlConfig.CONNECT_WORKER_CONFIG_FILE_PROPERTY);
return !configFile.isEmpty()
? Utils.propsToStringMap(getWorkerProps(configFile))
: Collections.emptyMap();
}

private static Properties getWorkerProps(String configFile) {
try {
return Utils.loadProps(configFile);
} catch (IOException e) {
return new Properties();
}
}
}
Loading

0 comments on commit b23dae9

Please sign in to comment.