Skip to content

Commit

Permalink
fix: earliest/latest_by_offset should accept nulls (#5729)
Browse files Browse the repository at this point in the history
* fix: earliest/latest_by_offset should accept nulls

fixes: #5727

The `EARLIEST_BY_OFFSET` and `LATEST_BY_OFFSET` functions should treat a `NULL` value the same as any other value.

If the first value seen is `null`, then `EARLIEST_BY_OFFSET` should return `null`.
If the last value seen is `null`, then `LATEST_BY_OFFSET` should return `null`.


Co-authored-by: Andy Coates <[email protected]>
  • Loading branch information
big-andy-coates and big-andy-coates authored Sep 11, 2020
1 parent 2d1697a commit 6eb5a41
Show file tree
Hide file tree
Showing 94 changed files with 10,974 additions and 275 deletions.
2 changes: 2 additions & 0 deletions docs/concepts/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,8 @@ enables you to define UDAFs like `average`, as shown in the following example.
When you create a UDAF, use the `map` method to provide the logic that
transforms an intermediate aggregate value to the returned value.

The `merge` method is called only when sessions are merged, which happens with session windowing.

##### Example UDAF class

The following class creates a UDAF named `my_average`. The name of the UDAF
Expand Down
50 changes: 38 additions & 12 deletions docs/developer-guide/ksqldb-reference/aggregate-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,25 +102,38 @@ to estimate cardinalities of 10^9 with a typical standard error of 2%.
Since: 0.10.0

```sql
EARLIEST_BY_OFFSET(col1)
EARLIEST_BY_OFFSET(col1, [ignoreNulls])
```

Stream

Return the earliest value for a given column. Earliest here is defined as the value in the partition
with the lowest offset. Rows that have `col1` set to null are ignored.
Return the earliest value for the specified column. The earliest value in the partition
has the lowest offset.


The optional `ignoreNulls` parameter, available since version 0.13.0, controls whether nulls are ignored. The default
is to ignore null values.



Since: 0.13.0

```sql
EARLIEST_BY_OFFSET(col1,earliestN)
EARLIEST_BY_OFFSET(col1, earliestN, [ignoreNulls])
```

Stream

Return the earliest N values for a given column. Earliest here is defined as the values in the partition
with the lowest offsets. Rows that have `col1` set to null are ignored.
Return the earliest _N_ values for the specified column as an `ARRAY`. The earliest values
in the partition have the lowest offsets.


The optional `ignoreNulls` parameter controls whether nulls are ignored. The default
is to ignore null values.


## `HISTOGRAM`
Expand Down Expand Up @@ -150,24 +163,37 @@ the order they were originally processed.
Since: 0.8.0

```sql
LATEST_BY_OFFSET(col1)
LATEST_BY_OFFSET(col1, [ignoreNulls])
```

Stream

Return the latest value for a given column. Latest here is defined as the value in the partition
with the greatest offset. Rows that have `col1` set to null are ignored.
Return the latest value for the specified column. The latest value in the partition
has the largest offset.


The optional `ignoreNulls` parameter, available since version 0.13.0, controls whether nulls are ignored. The default
is to ignore null values.


Since: 0.13.0

```sql
LATEST_BY_OFFSET(col1,latestN)
LATEST_BY_OFFSET(col1, latestN, [ignoreNulls])
```

Stream

Returns the latest N values for a given column as an array of values. Latest here is also defined
with the greatest offset. Rows that have `col1` set to null are ignored.
Return the latest _N_ values for the specified column as an `ARRAY`. The latest values
in the partition have the largest offsets.


The optional `ignoreNulls` parameter controls whether nulls are ignored. The default is to ignore
null values.

## `MAX`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@

package io.confluent.ksql.function.udaf.offset;

import static io.confluent.ksql.function.udaf.KudafByOffsetUtils.INTERMEDIATE_STRUCT_COMPARATOR;
import static io.confluent.ksql.function.udaf.KudafByOffsetUtils.SEQ_FIELD;
import static io.confluent.ksql.function.udaf.KudafByOffsetUtils.STRUCT_BOOLEAN;
import static io.confluent.ksql.function.udaf.KudafByOffsetUtils.STRUCT_DOUBLE;
import static io.confluent.ksql.function.udaf.KudafByOffsetUtils.STRUCT_INTEGER;
import static io.confluent.ksql.function.udaf.KudafByOffsetUtils.STRUCT_LONG;
import static io.confluent.ksql.function.udaf.KudafByOffsetUtils.STRUCT_STRING;
import static io.confluent.ksql.function.udaf.KudafByOffsetUtils.VAL_FIELD;
import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.INTERMEDIATE_STRUCT_COMPARATOR;
import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.STRUCT_BOOLEAN;
import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.STRUCT_DOUBLE;
import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.STRUCT_INTEGER;
import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.STRUCT_LONG;
import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.STRUCT_STRING;
import static io.confluent.ksql.function.udaf.offset.KudafByOffsetUtils.VAL_FIELD;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.function.KsqlFunctionException;
import io.confluent.ksql.function.udaf.Udaf;
import io.confluent.ksql.function.udaf.UdafDescription;
Expand Down Expand Up @@ -54,93 +54,182 @@ private EarliestByOffset() {
@UdafFactory(description = "return the earliest value of an integer column",
aggregateSchema = "STRUCT<SEQ BIGINT, VAL INT>")
public static Udaf<Integer, Struct, Integer> earliestInteger() {
return earliest(STRUCT_INTEGER);
return earliestInteger(true);
}

@UdafFactory(description = "return the earliest value of an integer column",
aggregateSchema = "STRUCT<SEQ BIGINT, VAL INT>")
public static Udaf<Integer, Struct, Integer> earliestInteger(final boolean ignoreNulls) {
return earliest(STRUCT_INTEGER, ignoreNulls);
}

@UdafFactory(description = "return the earliest N values of an integer column",
aggregateSchema = "ARRAY<STRUCT<SEQ BIGINT, VAL INT>>")
public static Udaf<Integer, List<Struct>, List<Integer>> earliestIntegers(final int earliestN) {
return earliestN(STRUCT_INTEGER, earliestN);
return earliestIntegers(earliestN, true);
}

@UdafFactory(description = "return the earliest N values of an integer column",
aggregateSchema = "ARRAY<STRUCT<SEQ BIGINT, VAL INT>>")
public static Udaf<Integer, List<Struct>, List<Integer>> earliestIntegers(
final int earliestN,
final boolean ignoreNulls
) {
return earliestN(STRUCT_INTEGER, earliestN, ignoreNulls);
}

@UdafFactory(description = "return the earliest value of an big integer column",
aggregateSchema = "STRUCT<SEQ BIGINT, VAL BIGINT>")
public static Udaf<Long, Struct, Long> earliestLong() {
return earliest(STRUCT_LONG);
return earliestLong(true);
}

@UdafFactory(description = "return the earliest value of an big integer column",
aggregateSchema = "STRUCT<SEQ BIGINT, VAL BIGINT>")
public static Udaf<Long, Struct, Long> earliestLong(final boolean ignoreNulls) {
return earliest(STRUCT_LONG, ignoreNulls);
}

@UdafFactory(description = "return the earliest N values of an long column",
aggregateSchema = "ARRAY<STRUCT<SEQ BIGINT, VAL BIGINT>>")
public static Udaf<Long, List<Struct>, List<Long>> earliestLongs(final int earliestN) {
return earliestN(STRUCT_LONG, earliestN);
return earliestLongs(earliestN, true);
}

@UdafFactory(description = "return the earliest N values of an long column",
aggregateSchema = "ARRAY<STRUCT<SEQ BIGINT, VAL BIGINT>>")
public static Udaf<Long, List<Struct>, List<Long>> earliestLongs(
final int earliestN,
final boolean ignoreNulls
) {
return earliestN(STRUCT_LONG, earliestN, ignoreNulls);
}

@UdafFactory(description = "return the earliest value of a double column",
aggregateSchema = "STRUCT<SEQ BIGINT, VAL DOUBLE>")
public static Udaf<Double, Struct, Double> earliestDouble() {
return earliest(STRUCT_DOUBLE);
return earliestDouble(true);
}

@UdafFactory(description = "return the earliest value of a double column",
aggregateSchema = "STRUCT<SEQ BIGINT, VAL DOUBLE>")
public static Udaf<Double, Struct, Double> earliestDouble(final boolean ignoreNulls) {
return earliest(STRUCT_DOUBLE, ignoreNulls);
}

@UdafFactory(description = "return the earliest N values of a double column",
aggregateSchema = "ARRAY<STRUCT<SEQ BIGINT, VAL DOUBLE>>")
public static Udaf<Double, List<Struct>, List<Double>> earliestDoubles(final int earliestN) {
return earliestN(STRUCT_DOUBLE, earliestN);
return earliestDoubles(earliestN, true);
}

@UdafFactory(description = "return the earliest N values of a double column",
aggregateSchema = "ARRAY<STRUCT<SEQ BIGINT, VAL DOUBLE>>")
public static Udaf<Double, List<Struct>, List<Double>> earliestDoubles(
final int earliestN,
final boolean ignoreNulls
) {
return earliestN(STRUCT_DOUBLE, earliestN, ignoreNulls);
}

@UdafFactory(description = "return the earliest value of a boolean column",
aggregateSchema = "STRUCT<SEQ BIGINT, VAL BOOLEAN>")
public static Udaf<Boolean, Struct, Boolean> earliestBoolean() {
return earliest(STRUCT_BOOLEAN);
return earliestBoolean(true);
}

@UdafFactory(description = "return the earliest value of a boolean column",
aggregateSchema = "STRUCT<SEQ BIGINT, VAL BOOLEAN>")
public static Udaf<Boolean, Struct, Boolean> earliestBoolean(final boolean ignoreNulls) {
return earliest(STRUCT_BOOLEAN, ignoreNulls);
}

@UdafFactory(description = "return the earliest N values of a boolean column",
aggregateSchema = "ARRAY<STRUCT<SEQ BIGINT, VAL BOOLEAN>>")
public static Udaf<Boolean, List<Struct>, List<Boolean>> earliestBooleans(final int earliestN) {
return earliestN(STRUCT_BOOLEAN, earliestN);
return earliestBooleans(earliestN, true);
}

@UdafFactory(description = "return the earliest N values of a boolean column",
aggregateSchema = "ARRAY<STRUCT<SEQ BIGINT, VAL BOOLEAN>>")
public static Udaf<Boolean, List<Struct>, List<Boolean>> earliestBooleans(
final int earliestN,
final boolean ignoreNulls
) {
return earliestN(STRUCT_BOOLEAN, earliestN, ignoreNulls);
}

@UdafFactory(description = "return the earliest value of a string column",
aggregateSchema = "STRUCT<SEQ BIGINT, VAL STRING>")
public static Udaf<String, Struct, String> earliestString() {
return earliest(STRUCT_STRING);
return earliestString(true);
}

@UdafFactory(description = "return the earliest value of a string column",
aggregateSchema = "STRUCT<SEQ BIGINT, VAL STRING>")
public static Udaf<String, Struct, String> earliestString(final boolean ignoreNulls) {
return earliest(STRUCT_STRING, ignoreNulls);
}

@UdafFactory(description = "return the earliest N values of a string column",
aggregateSchema = "ARRAY<STRUCT<SEQ BIGINT, VAL STRING>>")
public static Udaf<String, List<Struct>, List<String>> earliestStrings(final int earliestN) {
return earliestN(STRUCT_STRING, earliestN);
return earliestStrings(earliestN, true);
}

@UdafFactory(description = "return the earliest N values of a string column",
aggregateSchema = "ARRAY<STRUCT<SEQ BIGINT, VAL STRING>>")
public static Udaf<String, List<Struct>, List<String>> earliestStrings(
final int earliestN,
final boolean ignoreNulls
) {
return earliestN(STRUCT_STRING, earliestN, ignoreNulls);
}


@VisibleForTesting
static <T> Struct createStruct(final Schema schema, final T val) {
final Struct struct = new Struct(schema);
struct.put(SEQ_FIELD, generateSequence());
struct.put(VAL_FIELD, val);
return struct;
return KudafByOffsetUtils.createStruct(schema, generateSequence(), val);
}

private static long generateSequence() {
return sequence.getAndIncrement();
}

static <T> Udaf<T, Struct, T> earliest(final Schema structSchema) {
@VisibleForTesting
static <T> Udaf<T, Struct, T> earliest(
final Schema structSchema,
final boolean ignoreNulls
) {
return new Udaf<T, Struct, T>() {

@Override
public Struct initialize() {
return createStruct(structSchema, null);
return null;
}

@Override
public Struct aggregate(final T current, final Struct aggregate) {
if (current == null || aggregate.get(VAL_FIELD) != null) {
if (aggregate != null) {
return aggregate;
} else {
return createStruct(structSchema, current);
}

if (current == null && ignoreNulls) {
return null;
}

return createStruct(structSchema, current);
}

@Override
public Struct merge(final Struct aggOne, final Struct aggTwo) {
if (aggOne == null) {
return aggTwo;
}

if (aggTwo == null) {
return aggOne;
}

// When merging we need some way of evaluating the "earliest' one.
// We do this by keeping track of the sequence of when it was originally processed
if (INTERMEDIATE_STRUCT_COMPARATOR.compare(aggOne, aggTwo) < 0) {
Expand All @@ -153,29 +242,34 @@ public Struct merge(final Struct aggOne, final Struct aggTwo) {
@Override
@SuppressWarnings("unchecked")
public T map(final Struct agg) {
if (agg == null) {
return null;
}

return (T) agg.get(VAL_FIELD);
}
};
}

@VisibleForTesting
static <T> Udaf<T, List<Struct>, List<T>> earliestN(
final Schema structSchema,
final int earliestN
final Schema structSchema,
final int earliestN,
final boolean ignoreNulls
) {

if (earliestN <= 0) {
throw new KsqlFunctionException("earliestN must be 1 or greater");
}

return new Udaf<T, List<Struct>, List<T>>() {
@Override
public List<Struct> initialize() {
return new ArrayList<Struct>(earliestN);
return new ArrayList<>(earliestN);
}

@Override
public List<Struct> aggregate(final T current, final List<Struct> aggregate) {
if (current == null) {
if (current == null && ignoreNulls) {
return aggregate;
}

Expand Down
Loading

0 comments on commit 6eb5a41

Please sign in to comment.