Skip to content

Commit

Permalink
feat: implemention of KLIP-13 (#4099)
Browse files Browse the repository at this point in the history
alex-dukhno authored and agavra committed Dec 13, 2019

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 858e4dc commit b23dae9
Showing 9 changed files with 196 additions and 65 deletions.
2 changes: 1 addition & 1 deletion design-proposals/README.md
Original file line number Diff line number Diff line change
@@ -53,7 +53,7 @@ Next KLIP number: **14**
| [KLIP-10: Suppress](klip-10-suppress.md) | Proposal | N/A |
| [KLIP-11: Redesign KSQL query language](klip-11-DQL.md) | Proposal | N/A |
| [KLIP-12: Implement High-Availability for Pull queries](klip-12-pull-high-availability.md)| Proposal | N/A |
| [KLIP-13: Introduce KSQL command to print connect worker properties to the console](klip-13-introduce-KSQL-command-to-print-connect-worker-properties-to-the-console.md) | Proposal | N/A |
| [KLIP-13: Introduce KSQL command to print connect worker properties to the console](klip-13-introduce-KSQL-command-to-print-connect-worker-properties-to-the-console.md) | Merged | 5.5 |
| [KLIP-14: ROWTIME as Pseudocolumn](klip-14-rowtime-as-pseudocolumn.md) | Approved | N/A |
| [KLIP-15: KSQLDB new API and Client(klip-15-new-api-and-client.md | Proposal | N/A |
| [KLIP-16: Introduce 'K$' dynamic views | Proposal | N/A |
Original file line number Diff line number Diff line change
@@ -19,17 +19,18 @@
import io.confluent.ksql.cli.console.table.Table;
import io.confluent.ksql.cli.console.table.Table.Builder;
import io.confluent.ksql.rest.entity.PropertiesList;
import io.confluent.ksql.rest.entity.PropertiesList.Property;

import java.util.Comparator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

public class PropertiesListTableBuilder implements TableBuilder<PropertiesList> {

private static final List<String> HEADERS =
ImmutableList.of("Property", "Default override", "Effective Value");
ImmutableList.of("Property", "Scope", "Default override", "Effective Value");

@Override
public Table buildTable(final PropertiesList entity) {
@@ -42,42 +43,47 @@ public Table buildTable(final PropertiesList entity) {
private static List<List<String>> defRowValues(final List<PropertyDef> properties) {
return properties.stream()
.sorted(Comparator.comparing(propertyDef -> propertyDef.propertyName))
.map(
def -> ImmutableList.of(def.propertyName, def.overrideType, def.effectiveValue))
.map(def -> ImmutableList.of(
def.propertyName, def.scope, def.overrideType, def.effectiveValue))
.collect(Collectors.toList());
}

private static List<PropertyDef> propertiesListWithOverrides(final PropertiesList properties) {

final Function<Entry<String, ?>, PropertyDef> toPropertyDef = e -> {
final String value = e.getValue() == null ? "NULL" : e.getValue().toString();
if (properties.getOverwrittenProperties().contains(e.getKey())) {
return new PropertyDef(e.getKey(), "SESSION", value);
final Function<Property, PropertyDef> toPropertyDef = property -> {
final String value = property.getValue() == null ? "NULL" : property.getValue();
final String name = property.getName();
final String scope = property.getScope();
if (properties.getOverwrittenProperties().contains(name)) {
return new PropertyDef(name, scope, "SESSION", value);
}

if (properties.getDefaultProperties().contains(e.getKey())) {
return new PropertyDef(e.getKey(), "", value);
if (properties.getDefaultProperties().contains(name)) {
return new PropertyDef(name, scope, "", value);
}

return new PropertyDef(e.getKey(), "SERVER", value);
return new PropertyDef(name, scope, "SERVER", value);
};

return properties.getProperties().entrySet().stream()
return properties.getProperties().stream()
.map(toPropertyDef)
.collect(Collectors.toList());
}

private static class PropertyDef {

private final String propertyName;
private final String scope;
private final String overrideType;
private final String effectiveValue;

PropertyDef(
final String propertyName,
final String scope,
final String overrideType,
final String effectiveValue) {
this.propertyName = Objects.requireNonNull(propertyName, "propertyName");
this.scope = Objects.requireNonNull(scope, "scope");
this.overrideType = Objects.requireNonNull(overrideType, "overrideType");
this.effectiveValue = Objects.requireNonNull(effectiveValue, "effectiveValue");
}
3 changes: 3 additions & 0 deletions ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
Original file line number Diff line number Diff line change
@@ -443,17 +443,20 @@ public void testPropertySetUnset() {
// SERVER OVERRIDES:
row(
KsqlConfig.KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG,
"KSQL",
SERVER_OVERRIDE,
"4"
),
row(
KsqlConfig.SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_MS_PROPERTY,
"KSQL",
SERVER_OVERRIDE,
"" + (KsqlConstants.defaultSinkWindowChangeLogAdditionalRetention + 1)
),
// SESSION OVERRIDES:
row(
KsqlConfig.KSQL_STREAMS_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"KSQL",
SESSION_OVERRIDE,
"latest"
)
Original file line number Diff line number Diff line change
@@ -56,6 +56,7 @@
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlWarning;
import io.confluent.ksql.rest.entity.PropertiesList;
import io.confluent.ksql.rest.entity.PropertiesList.Property;
import io.confluent.ksql.rest.entity.Queries;
import io.confluent.ksql.rest.entity.RunningQuery;
import io.confluent.ksql.rest.entity.SchemaInfo;
@@ -78,9 +79,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
@@ -233,10 +232,10 @@ public void testPrintCommandStatus() {
@Test
public void testPrintPropertyList() {
// Given:
final Map<String, Object> properties = new HashMap<>();
properties.put("k1", 1);
properties.put("k2", "v2");
properties.put("k3", true);
final List<Property> properties = new ArrayList<>();
properties.add(new Property("k1", "KSQL", "1"));
properties.add(new Property("k2", "KSQL", "v2"));
properties.add(new Property("k3", "KSQL", "true"));

final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of(
new PropertiesList("e", properties, Collections.emptyList(), Collections.emptyList())
@@ -251,23 +250,31 @@ public void testPrintPropertyList() {
assertThat(output, is("[ {\n"
+ " \"@type\" : \"properties\",\n"
+ " \"statementText\" : \"e\",\n"
+ " \"properties\" : {\n"
+ " \"k1\" : 1,\n"
+ " \"k2\" : \"v2\",\n"
+ " \"k3\" : true\n"
+ " },\n"
+ " \"properties\" : [ {\n"
+ " \"name\" : \"k1\",\n"
+ " \"scope\" : \"KSQL\",\n"
+ " \"value\" : \"1\"\n"
+ " }, {\n"
+ " \"name\" : \"k2\",\n"
+ " \"scope\" : \"KSQL\",\n"
+ " \"value\" : \"v2\"\n"
+ " }, {\n"
+ " \"name\" : \"k3\",\n"
+ " \"scope\" : \"KSQL\",\n"
+ " \"value\" : \"true\"\n"
+ " } ],\n"
+ " \"overwrittenProperties\" : [ ],\n"
+ " \"defaultProperties\" : [ ],\n"
+ " \"warnings\" : [ ]\n"
+ "} ]\n"));
} else {
assertThat(output, is("\n"
+ " Property | Default override | Effective Value \n"
+ "-----------------------------------------------\n"
+ " k1 | SERVER | 1 \n"
+ " k2 | SERVER | v2 \n"
+ " k3 | SERVER | true \n"
+ "-----------------------------------------------\n"));
+ " Property | Scope | Default override | Effective Value \n"
+ "-------------------------------------------------------\n"
+ " k1 | KSQL | SERVER | 1 \n"
+ " k2 | KSQL | SERVER | v2 \n"
+ " k3 | KSQL | SERVER | true \n"
+ "-------------------------------------------------------\n"));
}
}

Original file line number Diff line number Diff line change
@@ -22,10 +22,10 @@
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.cli.console.Console;
import io.confluent.ksql.cli.console.table.Table;
import io.confluent.ksql.rest.entity.PropertiesList;
import io.confluent.ksql.rest.entity.PropertiesList.Property;
import io.confluent.ksql.util.KsqlConfig;
import java.io.PrintWriter;
import java.util.Collections;
@@ -61,7 +61,7 @@ public void setUp() {
public void shouldHandleClientOverwrittenProperties() {
// Given:
final PropertiesList propList = new PropertiesList("list properties;",
ImmutableMap.of(SOME_KEY, "earliest"),
ImmutableList.of(new Property(SOME_KEY, "KSQL", "earliest")),
ImmutableList.of(SOME_KEY),
Collections.emptyList()
);
@@ -70,14 +70,14 @@ public void shouldHandleClientOverwrittenProperties() {
final Table table = builder.buildTable(propList);

// Then:
assertThat(getRows(table), contains(row(SOME_KEY, "SESSION", "earliest")));
assertThat(getRows(table), contains(row(SOME_KEY, "KSQL", "SESSION", "earliest")));
}

@Test
public void shouldHandleServerOverwrittenProperties() {
// Given:
final PropertiesList propList = new PropertiesList("list properties;",
ImmutableMap.of(SOME_KEY, "earliest"),
ImmutableList.of(new Property(SOME_KEY, "KSQL", "earliest")),
Collections.emptyList(),
Collections.emptyList()
);
@@ -86,14 +86,14 @@ public void shouldHandleServerOverwrittenProperties() {
final Table table = builder.buildTable(propList);

// Then:
assertThat(getRows(table), contains(row(SOME_KEY, "SERVER", "earliest")));
assertThat(getRows(table), contains(row(SOME_KEY, "KSQL", "SERVER", "earliest")));
}

@Test
public void shouldHandleDefaultProperties() {
// Given:
final PropertiesList propList = new PropertiesList("list properties;",
ImmutableMap.of(SOME_KEY, "earliest"),
ImmutableList.of(new Property(SOME_KEY, "KSQL", "earliest")),
Collections.emptyList(),
ImmutableList.of(SOME_KEY)
);
@@ -102,14 +102,14 @@ public void shouldHandleDefaultProperties() {
final Table table = builder.buildTable(propList);

// Then:
assertThat(getRows(table), contains(row(SOME_KEY, "", "earliest")));
assertThat(getRows(table), contains(row(SOME_KEY, "KSQL", "", "earliest")));
}

@Test
public void shouldHandlePropertiesWithNullValue() {
// Given:
final PropertiesList propList = new PropertiesList("list properties;",
Collections.singletonMap(SOME_KEY, null),
Collections.singletonList(new Property(SOME_KEY, "KSQL", null)),
Collections.emptyList(),
ImmutableList.of(SOME_KEY)
);
@@ -118,7 +118,7 @@ public void shouldHandlePropertiesWithNullValue() {
final Table table = builder.buildTable(propList);

// Then:
assertThat(getRows(table), contains(row(SOME_KEY, "", "NULL")));
assertThat(getRows(table), contains(row(SOME_KEY, "KSQL", "", "NULL")));
}

private List<List<String>> getRows(final Table table) {
@@ -130,9 +130,10 @@ private List<List<String>> getRows(final Table table) {
@SuppressWarnings("SameParameterValue")
private static List<String> row(
final String property,
final String scope,
final String defaultValue,
final String actualValue
) {
return ImmutableList.of(property, defaultValue, actualValue);
return ImmutableList.of(property, scope, defaultValue, actualValue);
}
}
Original file line number Diff line number Diff line change
@@ -20,15 +20,23 @@
import io.confluent.ksql.parser.tree.ListProperties;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.PropertiesList;
import io.confluent.ksql.rest.entity.PropertiesList.Property;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.common.utils.Utils;

public final class ListPropertiesExecutor {

private ListPropertiesExecutor() { }
@@ -44,25 +52,56 @@ public static Optional<KsqlEntity> execute(
final Map<String, String> engineProperties
= statement.getConfig().getAllConfigPropsWithSecretsObfuscated();

final Map<String, String> mergedProperties = statement.getConfig()
.cloneWithPropertyOverwrite(statement.getOverrides())
.getAllConfigPropsWithSecretsObfuscated();
final List<Property> mergedProperties = mergedProperties(statement);

final List<String> overwritten = mergedProperties.entrySet()
final List<String> overwritten = mergedProperties
.stream()
.filter(e -> !Objects.equals(engineProperties.get(e.getKey()), e.getValue()))
.map(Entry::getKey)
.filter(property -> !Objects.equals(
engineProperties.get(property.getName()), property.getValue()))
.map(Property::getName)
.collect(Collectors.toList());

final List<String> defaultProps = mergedProperties.entrySet().stream()
.filter(e -> resolver.resolve(e.getKey(), false)
.map(resolved -> resolved.isDefaultValue(e.getValue()))
final List<String> defaultProps = mergedProperties.stream()
.filter(property -> resolver.resolve(property.getName(), false)
.map(resolved -> resolved.isDefaultValue(property.getValue()))
.orElse(false))
.map(Entry::getKey)
.map(Property::getName)
.collect(Collectors.toList());

return Optional.of(new PropertiesList(
statement.getStatementText(), mergedProperties, overwritten, defaultProps));
}

private static List<Property> mergedProperties(
ConfiguredStatement<ListProperties> statement) {
final List<Property> mergedProperties = new ArrayList<>();

statement.getConfig()
.cloneWithPropertyOverwrite(statement.getOverrides())
.getAllConfigPropsWithSecretsObfuscated()
.forEach((key, value) -> mergedProperties.add(new Property(key, "KSQL", value)));

embeddedConnectWorkerProperties(statement)
.forEach((key, value) ->
mergedProperties.add(new Property(key, "EMBEDDED CONNECT WORKER", value)));

return mergedProperties;
}

private static Map<String, String> embeddedConnectWorkerProperties(
ConfiguredStatement<ListProperties> statement) {
String configFile = statement.getConfig()
.getString(KsqlConfig.CONNECT_WORKER_CONFIG_FILE_PROPERTY);
return !configFile.isEmpty()
? Utils.propsToStringMap(getWorkerProps(configFile))
: Collections.emptyMap();
}

private static Properties getWorkerProps(String configFile) {
try {
return Utils.loadProps(configFile);
} catch (IOException e) {
return new Properties();
}
}
}
Original file line number Diff line number Diff line change
@@ -19,21 +19,24 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isIn;
import static org.hamcrest.Matchers.not;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.rest.entity.PropertiesList;
import io.confluent.ksql.rest.entity.PropertiesList.Property;
import io.confluent.ksql.rest.server.TemporaryEngine;
import io.confluent.ksql.util.KsqlConfig;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.HashMap;
import java.util.Map;

@RunWith(MockitoJUnitRunner.class)
public class ListPropertiesExecutorTest {

@@ -50,11 +53,20 @@ public void shouldListProperties() {
).orElseThrow(IllegalStateException::new);

// Then:
assertThat(properties.getProperties(),
assertThat(
toMap(properties),
equalTo(engine.getKsqlConfig().getAllConfigPropsWithSecretsObfuscated()));
assertThat(properties.getOverwrittenProperties(), is(empty()));
}

private Map<String, String> toMap(PropertiesList properties) {
Map<String, String> map = new HashMap<>();
for (Property property : properties.getProperties()) {
map.put(property.getName(), property.getValue());
}
return map;
}

@Test
public void shouldListPropertiesWithOverrides() {
// When:
@@ -67,8 +79,9 @@ public void shouldListPropertiesWithOverrides() {
).orElseThrow(IllegalStateException::new);

// Then:
assertThat(properties.getProperties(),
hasEntry("ksql.streams.auto.offset.reset", "latest"));
assertThat(
properties.getProperties(),
hasItem(new Property("ksql.streams.auto.offset.reset", "KSQL", "latest")));
assertThat(properties.getOverwrittenProperties(), hasItem("ksql.streams.auto.offset.reset"));
}

@@ -83,8 +96,8 @@ public void shouldNotListSslProperties() {
).orElseThrow(IllegalStateException::new);

// Then:
assertThat(properties.getProperties(), not(hasKey(isIn(KsqlConfig.SSL_CONFIG_NAMES))));
assertThat(
toMap(properties),
not(hasKey(isIn(KsqlConfig.SSL_CONFIG_NAMES))));
}


}
Original file line number Diff line number Diff line change
@@ -98,6 +98,7 @@
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.entity.KsqlStatementErrorMessage;
import io.confluent.ksql.rest.entity.PropertiesList;
import io.confluent.ksql.rest.entity.PropertiesList.Property;
import io.confluent.ksql.rest.entity.Queries;
import io.confluent.ksql.rest.entity.QueryDescription;
import io.confluent.ksql.rest.entity.QueryDescriptionEntity;
@@ -1516,7 +1517,9 @@ public void shouldListPropertiesWithOverrides() {
new KsqlRequest("list properties;", overrides, null), PropertiesList.class);

// Then:
assertThat(props.getProperties().get("ksql.streams.auto.offset.reset"), is("latest"));
assertThat(
props.getProperties(),
hasItem(new Property("ksql.streams.auto.offset.reset", "KSQL", "latest")));
assertThat(props.getOverwrittenProperties(), hasItem("ksql.streams.auto.offset.reset"));
}

@@ -1661,7 +1664,8 @@ public void shouldNotIncludeSslPropertiesInListPropertiesOutput() {
final PropertiesList props = makeSingleRequest("list properties;", PropertiesList.class);

// Then:
assertThat(props.getProperties().keySet(),
assertThat(props.getProperties().stream().map(Property::getName).collect(
Collectors.toList()),
not(hasItems(KsqlConfig.SSL_CONFIG_NAMES.toArray(new String[0]))));
}

Original file line number Diff line number Diff line change
@@ -18,34 +18,92 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

@JsonIgnoreProperties(ignoreUnknown = true)
public class PropertiesList extends KsqlEntity {
private final Map<String, ?> properties;
@JsonIgnoreProperties(ignoreUnknown = true)
public static class Property {
private final String name;
private final String scope;
private final String value;

@JsonCreator
public Property(
@JsonProperty("name") final String name,
@JsonProperty("scope") final String scope,
@JsonProperty("value") final String value
) {
this.name = name;
this.scope = scope;
this.value = value;
}

public String getName() {
return name;
}

public String getScope() {
return scope;
}

public String getValue() {
return value;
}

@Override
public boolean equals(Object object) {
if (this == object) {
return true;
}
if (object == null || getClass() != object.getClass()) {
return false;
}
Property that = (Property) object;
return Objects.equals(name, that.name)
&& Objects.equals(scope, that.scope)
&& Objects.equals(value, that.value);
}

@Override
public int hashCode() {
return Objects.hash(name, scope, value);
}

@Override
public String toString() {
return "Property{"
+ "name='" + name + '\''
+ ", scope='" + scope + '\''
+ ", value='" + value + '\''
+ '}';
}
}

private final List<Property> properties;
private final List<String> overwrittenProperties;
private final List<String> defaultProperties;

@JsonCreator
public PropertiesList(
@JsonProperty("statementText") final String statementText,
@JsonProperty("properties") final Map<String, ?> properties,
@JsonProperty("properties") final List<Property> properties,
@JsonProperty("overwrittenProperties") final List<String> overwrittenProperties,
@JsonProperty("defaultProperties") final List<String> defaultProperties
) {
super(statementText);
this.properties = properties == null
? Collections.emptyMap() : properties;
? Collections.emptyList() : properties;
this.overwrittenProperties = overwrittenProperties == null
? Collections.emptyList() : overwrittenProperties;
this.defaultProperties = defaultProperties == null
? Collections.emptyList() : defaultProperties;
}

public Map<String, ?> getProperties() {
public List<Property> getProperties() {
return properties;
}

0 comments on commit b23dae9

Please sign in to comment.