Skip to content

Commit

Permalink
refactor: fix GenericRow encapsulation (#4414)
Browse files Browse the repository at this point in the history
* refactor: fix GenericRow encapsulation

The `GenericRow` that is so pervasive through the code has always been a bit of a mess:
 * it's untyped: just a list of objects.
 * it has a verbose `toString` implementation.
 * it has a weird `equals`, which is also out of whack with its `hashCode`.
 * it leaks its internal state like crazy: i.e. it breaks encapsulation. It was basically a very leaky wrapper around a `List<Object>`.

Following on from #4412, which added some new functionality to `GenericRow`, I couldn't resist spending some time fixing some of these issues.

This change fixes all but the first issue. It:
 * simplifies the `toString` impl, removing array type handling (which prod code doesn't use).
   Note: as far as I can tell, `toString` is not called in prod code; it's just for debugging.
 * removes funkiness from `equals`. Looks like `equals` is also only called from tests.
 * fixes encapsulation: it no longer leaks mutable internal state via `getColumns` or via constructors that do not take defensive copies. Mutating the state now requires explicit invocation of `GenericRow` methods. Its use of `List<Object>` internally is now an implementation detail, as it should be.

 This change does not fix the fact that `GenericRow` is untyped.

 To facilitate this change, `KudafAggregator` has also been updated to accept the number of non-aggregate columns, rather than a list of non-aggregate column indexes. This is possible because the list of indexes was always sequential and zero-based, e.g. [0, 1, 2].
  • Loading branch information
big-andy-coates authored Feb 4, 2020
1 parent 29b7ef7 commit c03c9b7
Show file tree
Hide file tree
Showing 60 changed files with 588 additions and 523 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ public void printStreamedRow(final StreamedRow row) {
if (row.getRow().isPresent()) {
switch (outputFormat) {
case JSON:
printAsJson(row.getRow().get().getColumns());
printAsJson(row.getRow().get().values());
break;
case TABULAR:
printAsTable(row.getRow().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public static TabularRow createRow(
) {
return new TabularRow(
width,
value.getColumns().stream().map(Objects::toString).collect(Collectors.toList()),
value.values().stream().map(Objects::toString).collect(Collectors.toList()),
false,
config.getString(CliConfig.WRAP_CONFIG).equalsIgnoreCase(OnOff.ON.toString())
);
Expand Down
119 changes: 56 additions & 63 deletions ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql.cli;

import static io.confluent.ksql.GenericRow.genericRow;
import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually;
import static javax.ws.rs.core.Response.Status.NOT_ACCEPTABLE;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -486,52 +487,45 @@ public void testPersistentSelectStar() {
@Test
public void testSelectProject() {
final Map<Long, GenericRow> expectedResults = new HashMap<>();
expectedResults.put(1L, new GenericRow(
ImmutableList.of(
"ITEM_1",
10.0,
new Double[]{100.0, 110.99, 90.0})));
expectedResults.put(2L, new GenericRow(
ImmutableList.of(
"ITEM_2",
20.0,
new Double[]{10.0, 10.99, 9.0})));

expectedResults.put(3L, new GenericRow(
ImmutableList.of(
"ITEM_3",
30.0,
new Double[]{10.0, 10.99, 91.0})));

expectedResults.put(4L, new GenericRow(
ImmutableList.of(
"ITEM_4",
40.0,
new Double[]{10.0, 140.99, 94.0})));

expectedResults.put(5L, new GenericRow(
ImmutableList.of(
"ITEM_5",
50.0,
new Double[]{160.0, 160.99, 98.0})));

expectedResults.put(6L, new GenericRow(
ImmutableList.of(
"ITEM_6",
60.0,
new Double[]{1000.0, 1100.99, 900.0})));

expectedResults.put(7L, new GenericRow(
ImmutableList.of(
"ITEM_7",
70.0,
new Double[]{1100.0, 1110.99, 190.0})));

expectedResults.put(8L, new GenericRow(
ImmutableList.of(
"ITEM_8",
80.0,
new Double[]{1100.0, 1110.99, 970.0})));
expectedResults.put(1L, genericRow(
"ITEM_1",
10.0,
ImmutableList.of(100.0, 110.99, 90.0)));

expectedResults.put(2L, genericRow(
"ITEM_2",
20.0,
ImmutableList.of(10.0, 10.99, 9.0)));

expectedResults.put(3L, genericRow(
"ITEM_3",
30.0,
ImmutableList.of(10.0, 10.99, 91.0)));

expectedResults.put(4L, genericRow(
"ITEM_4",
40.0,
ImmutableList.of(10.0, 140.99, 94.0)));

expectedResults.put(5L, genericRow(
"ITEM_5",
50.0,
ImmutableList.of(160.0, 160.99, 98.0)));

expectedResults.put(6L, genericRow(
"ITEM_6",
60.0,
ImmutableList.of(1000.0, 1100.99, 900.0)));

expectedResults.put(7L, genericRow(
"ITEM_7",
70.0,
ImmutableList.of(1100.0, 1110.99, 190.0)));

expectedResults.put(8L, genericRow(
"ITEM_8",
80.0,
ImmutableList.of(1100.0, 1110.99, 970.0)));

final PhysicalSchema resultSchema = PhysicalSchema.from(
LogicalSchema.builder()
Expand All @@ -557,15 +551,14 @@ public void testSelectFilter() {
mapField.put("key1", 1.0);
mapField.put("key2", 2.0);
mapField.put("key3", 3.0);
expectedResults.put(8L, new GenericRow(
ImmutableList.of(
8L,
"ORDER_6",
"ITEM_8",
80.0,
"2018-01-08",
new Double[]{1100.0, 1110.99, 970.0},
mapField)));
expectedResults.put(8L, genericRow(
8L,
"ORDER_6",
"ITEM_8",
80.0,
"2018-01-08",
ImmutableList.of(1100.0, 1110.99, 970.0),
mapField));

testCreateStreamAsSelect(
"SELECT * FROM " + orderDataProvider.kstreamName() + " WHERE ORDERUNITS > 20 AND ITEMID = 'ITEM_8';",
Expand All @@ -577,9 +570,9 @@ public void testSelectFilter() {
@Test
public void testTransientSelect() {
final Map<Long, GenericRow> streamData = orderDataProvider.data();
final List<Object> row1 = streamData.get(1L).getColumns();
final List<Object> row2 = streamData.get(2L).getColumns();
final List<Object> row3 = streamData.get(3L).getColumns();
final List<Object> row1 = streamData.get(1L).values();
final List<Object> row2 = streamData.get(2L).values();
final List<Object> row3 = streamData.get(3L).values();

selectWithLimit(
"SELECT ORDERID, ITEMID FROM " + orderDataProvider.kstreamName() + " EMIT CHANGES",
Expand Down Expand Up @@ -640,9 +633,9 @@ public void shouldOutputPullQueryHeader() {
@Test
public void testTransientContinuousSelectStar() {
final Map<Long, GenericRow> streamData = orderDataProvider.data();
final List<Object> row1 = streamData.get(1L).getColumns();
final List<Object> row2 = streamData.get(2L).getColumns();
final List<Object> row3 = streamData.get(3L).getColumns();
final List<Object> row1 = streamData.get(1L).values();
final List<Object> row2 = streamData.get(2L).values();
final List<Object> row3 = streamData.get(3L).values();

selectWithLimit(
"SELECT * FROM " + orderDataProvider.kstreamName() + " EMIT CHANGES",
Expand Down Expand Up @@ -696,7 +689,7 @@ public void testSelectUDFs() {
);

final Map<Long, GenericRow> expectedResults = new HashMap<>();
expectedResults.put(8L, new GenericRow(ImmutableList.of("ITEM_8", 800.0, 1110.0, 12.0, true)));
expectedResults.put(8L, genericRow("ITEM_8", 800.0, 1110.0, 12.0, true));

testCreateStreamAsSelect(queryString, resultSchema, expectedResults);
}
Expand Down Expand Up @@ -1240,7 +1233,7 @@ public void addRows(final List<List<String>> rows) {

@Override
public void addRow(final GenericRow row) {
addRow(row.getColumns());
addRow(row.values());
}

private void addRow(final List<?> row) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql.cli.console;

import static io.confluent.ksql.GenericRow.genericRow;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
Expand All @@ -31,7 +32,6 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.FakeException;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.TestTerminal;
import io.confluent.ksql.cli.console.Console.NoOpRowCaptor;
import io.confluent.ksql.cli.console.cmd.CliSpecificCommand;
Expand Down Expand Up @@ -154,7 +154,7 @@ public void after() {
@Test
public void testPrintGenericStreamedRow() {
// Given:
final StreamedRow row = StreamedRow.row(new GenericRow(ImmutableList.of("col_1", "col_2")));
final StreamedRow row = StreamedRow.row(genericRow("col_1", "col_2"));

// When:
console.printStreamedRow(row);
Expand Down
106 changes: 59 additions & 47 deletions ksql-common/src/main/java/io/confluent/ksql/GenericRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,99 +15,111 @@

package io.confluent.ksql;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class GenericRow {

private final ArrayList<Object> columns;
private final ArrayList<Object> values;

public GenericRow() {
this(0);
}

public GenericRow(final int initialCapacity) {
this.columns = new ArrayList<>(initialCapacity);
}

public GenericRow(final List<Object> columns) {
Objects.requireNonNull(columns);
this.columns = new ArrayList<>(columns);
this.values = new ArrayList<>(initialCapacity);
}

@VisibleForTesting // Only use from tests
public static GenericRow genericRow(final Object... columns) {
return new GenericRow(Arrays.asList(columns));
return new GenericRow().appendAll(Arrays.asList(columns));
}

/**
* Ensure the row has enough capacity to hold {@code additionalCapacity} more elements
* that its current size.
* Ensure the row has enough capacity to hold {@code additionalCapacity} more elements than its
* current size.
*
* <p>Useful to avoid unnecessary array copies when adding multiple elements.
*
* @param additionalCapacity the number of additional elements
*/
public void ensureAdditionalCapacity(final int additionalCapacity) {
columns.ensureCapacity(additionalCapacity + columns.size());
values.ensureCapacity(additionalCapacity + values.size());
}

/**
 * Returns the number of values currently held by this row.
 *
 * @return the size of the backing value list
 */
public int size() {
  return this.values.size();
}

/**
 * Returns the value stored at the given position in the row.
 *
 * @param index zero-based position of the value to read
 * @return the value at {@code index}, as stored in the backing list
 * @throws IndexOutOfBoundsException if {@code index} is outside {@code [0, size())}
 */
public Object get(final int index) {
  return this.values.get(index);
}

/**
 * Replaces the value stored at the given position in the row.
 *
 * @param index zero-based position of the value to overwrite
 * @param value the new value to store at {@code index}
 * @throws IndexOutOfBoundsException if {@code index} is outside {@code [0, size())}
 */
public void set(final int index, final Object value) {
  this.values.set(index, value);
}

/**
 * Adds a single value to the end of the row.
 *
 * @param value the value to add
 * @return this row, so that calls can be chained
 */
public GenericRow append(final Object value) {
  this.values.add(value);
  return this;
}

/**
 * Adds every element of the supplied collection to the end of the row, in the order the
 * collection yields them.
 *
 * @param values the values to add
 * @return this row, so that calls can be chained
 */
public GenericRow appendAll(final Collection<?> values) {
  // The parameter shadows the field, so alias the backing list to keep the two apart.
  final List<Object> target = this.values;
  target.addAll(values);
  return this;
}

/**
 * Exposes the row's values as a read-only list.
 *
 * <p>The result is an unmodifiable view over the backing list rather than a copy: callers
 * cannot mutate the row through it. The accessor is annotated with the JSON property name
 * {@code columns}.
 *
 * @return an unmodifiable view of the values in this row
 */
@JsonProperty("columns")
public List<Object> values() {
  final List<Object> readOnlyView = Collections.unmodifiableList(this.values);
  return readOnlyView;
}

@Override
public String toString() {
final StringBuilder stringBuilder = new StringBuilder("[ ");
int currentIndex = 0;
for (int i = 0; i < columns.size(); i++) {
final Object obj = columns.get(i);
if (obj == null) {
stringBuilder.append("null");
} else if (obj.getClass().isArray()) {
stringBuilder.append(Arrays.toString((Object[]) obj));
} else if (obj instanceof String) {
stringBuilder.append("'")
.append(obj)
.append("'");
} else {
stringBuilder.append(obj);
}

currentIndex++;
if (currentIndex < columns.size()) {
stringBuilder.append(" | ");
}
}
stringBuilder.append(" ]");
return stringBuilder.toString();
return values.stream()
.map(GenericRow::formatValue)
.collect(Collectors.joining(" | ", "[ ", " ]"));
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}

if (o == null || getClass() != o.getClass()) {
return false;
}
final GenericRow that = (GenericRow) o;
if (columns.size() != that.columns.size()) {
return false;
}

// For now string matching is used to compare the rows as double comparison will cause issues
return this.toString().equals(that.toString());
final GenericRow that = (GenericRow) o;
return Objects.equals(this.values, that.values);
}

@Override
public int hashCode() {
return Objects.hash(columns);
return Objects.hash(values);
}

public List<Object> getColumns() {
return columns;
}
private static String formatValue(final Object value) {
if (value == null) {
return "null";
}

if (value instanceof String) {
return "'" + value + "'";
}

if (value instanceof Long) {
return value.toString() + "L";
}

@SuppressWarnings("unchecked")
public <T> T getColumnValue(final int columnIndex) {
return (T) columns.get(columnIndex);
return value.toString();
}
}
Loading

0 comments on commit c03c9b7

Please sign in to comment.