Skip to content

Commit

Permalink
feat: enables BYTES for CONCAT and CONCAT_WS (confluentinc#7876)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zara Lim authored and Sullivan-Patrick committed Jul 28, 2021
1 parent fccc56d commit e249c9c
Show file tree
Hide file tree
Showing 15 changed files with 1,046 additions and 22 deletions.
5 changes: 3 additions & 2 deletions docs/developer-guide/ksqldb-reference/scalar-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -650,9 +650,10 @@ Since: -

```sql
CONCAT(col1, col2, 'hello', ..., col-n)
CONCAT(bytes1, bytes2, ..., bytes-n)
```

Concatenate two or more string expressions. Any input strings which evaluate to NULL are replaced with empty string in the output.
Concatenate two or more string or bytes expressions. Any inputs which evaluate to NULL are replaced with an empty string or bytes in the output.

### `CONCAT_WS`

Expand All @@ -662,7 +663,7 @@ Since: 0.10.0
CONCAT_WS(separator, expr1, expr2, ...)
```

Concatenates two or more string expressions, inserting a separator string between each.
Concatenates two or more string or bytes expressions, inserting a separator string or bytes between each.

If the separator is NULL, this function returns NULL.
Any expressions which evaluate to NULL are skipped.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Collectors;

@UdfDescription(
name = "concat",
category = FunctionCategory.STRING,
description = "Concatenate an arbitrary number of string fields together")
description = "Concatenate an arbitrary number of string or bytes fields together")
public class Concat {

@Udf
Expand All @@ -41,4 +42,28 @@ public String concat(@UdfParameter(
.collect(Collectors.joining());
}

@Udf
public ByteBuffer concat(@UdfParameter(
description = "The bytes fields to concatenate") final ByteBuffer... inputs) {
if (inputs == null) {
return null;
}

int capacity = 0;

for (final ByteBuffer bytes : inputs) {
if (Objects.nonNull(bytes)) {
capacity += bytes.capacity();
}
}

final ByteBuffer concatenated = ByteBuffer.allocate(capacity);
Arrays.stream(inputs)
.filter(Objects::nonNull)
.forEachOrdered(bytes -> concatenated.put(bytes));

concatenated.rewind();
return concatenated;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,22 @@
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

@UdfDescription(
name = "concat_ws",
category = FunctionCategory.STRING,
description = "Concatenate several strings, inserting a separator string passed as the "
description = "Concatenate several strings or bytes, inserting a separator passed as the "
+ "first argument between each one.")
public class ConcatWS {

private static final Concat CONCAT = new Concat();

@Udf
public String concatWS(
@UdfParameter(description = "Separator string and values to join") final String... inputs) {
Expand All @@ -47,4 +52,30 @@ public String concatWS(
.filter(Objects::nonNull)
.collect(Collectors.joining(separator));
}

@Udf
public ByteBuffer concatWS(
@UdfParameter(description = "Separator and bytes values to join")
final ByteBuffer... inputs) {
if (inputs == null || inputs.length < 2) {
throw new KsqlFunctionException("Function Concat_WS expects at least two input arguments.");
}

final ByteBuffer separator = inputs[0];
if (separator == null) {
return null;
}

final List<ByteBuffer> concatInputs = new ArrayList<>();
for (int i = 1; i < inputs.length; i++) {
if (Objects.nonNull(inputs[i])) {
if (concatInputs.size() != 0) {
concatInputs.add(separator.duplicate());
}
concatInputs.add(inputs[i]);
}
}

return CONCAT.concat(concatInputs.toArray(new ByteBuffer[0]));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

import java.nio.ByteBuffer;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -35,25 +36,36 @@ public void shouldConcatStrings() {
assertThat(udf.concat("The", "Quick", "Brown", "Fox"), is("TheQuickBrownFox"));
}

@Test
public void shouldConcatBytes() {
assertThat(udf.concat(ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2}), ByteBuffer.wrap(new byte[] {3})),
is(ByteBuffer.wrap(new byte[] {1, 2, 3})));
}

@Test
public void shouldIgnoreNullInputs() {
assertThat(udf.concat(null, "this ", null, "should ", null, "work!", null),
is("this should work!"));
assertThat(udf.concat(null, ByteBuffer.wrap(new byte[] {1}), null, ByteBuffer.wrap(new byte[] {2}), null),
is(ByteBuffer.wrap(new byte[] {1, 2})));
}

@Test
public void shouldReturnEmptyStringIfAllInputsNull() {
assertThat(udf.concat(null, null), is(""));
public void shouldReturnEmptyIfAllInputsNull() {
assertThat(udf.concat((String) null, null), is(""));
assertThat(udf.concat((ByteBuffer) null, null), is(ByteBuffer.wrap(new byte[] {})));
}

@Test
public void shouldReturnSingleInput() {
assertThat(udf.concat("singular"), is("singular"));
assertThat(udf.concat(ByteBuffer.wrap(new byte[] {2})), is(ByteBuffer.wrap(new byte[] {2})));
}

@Test
public void shouldReturnEmptyStringForSingleNullInput() {
public void shouldReturnEmptyForSingleNullInput() {
assertThat(udf.concat((String) null), is(""));
assertThat(udf.concat((ByteBuffer) null), is(ByteBuffer.wrap(new byte[] {})));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@

import io.confluent.ksql.function.KsqlFunctionException;
import io.confluent.ksql.util.KsqlException;
import java.nio.ByteBuffer;
import org.junit.Before;
import org.junit.Test;

public class ConcatWSTest {

private ConcatWS udf;
private static final ByteBuffer EMPTY_BYTES = ByteBuffer.wrap(new byte[] {});

@Before
public void setUp() {
Expand All @@ -39,6 +41,12 @@ public void shouldConcatStrings() {
assertThat(udf.concatWS(" ", "The", "Quick", "Brown", "Fox"), is("The Quick Brown Fox"));
}

@Test
public void shouldConcatBytes() {
assertThat(udf.concatWS(ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2}), ByteBuffer.wrap(new byte[] {3})),
is(ByteBuffer.wrap(new byte[] {2, 1, 3})));
}

@Test
public void shouldConcatLongerSeparator() {
final String result = udf.concatWS("SEP", "foo", "bar", "baz");
Expand All @@ -48,51 +56,64 @@ public void shouldConcatLongerSeparator() {
@Test
public void shouldReturnSingleInputUnchanged() {
assertThat(udf.concatWS("SEP", "singular"), is("singular"));
assertThat(udf.concatWS(ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2})), is(ByteBuffer.wrap(new byte[] {2})));
}

@Test
public void shouldReturnNullForNullSeparator() {
final Object result = udf.concatWS(null, "foo", "bar");
assertThat(result, is(nullValue()));
assertThat(udf.concatWS(null, "foo", "bar"), is(nullValue()));
assertThat(udf.concatWS(null, ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2})), is(nullValue()));
}

@Test
public void shouldReturnEmptyStringIfAllInputsNull() {
final Object result = udf.concatWS("SEP", null, null);
assertThat(result, is(""));
public void shouldReturnEmptyIfAllInputsNull() {
assertThat(udf.concatWS("SEP", null, null), is(""));
assertThat(udf.concatWS(ByteBuffer.wrap(new byte[] {2}), null, null), is(EMPTY_BYTES));
}

@Test
public void shouldSkipAnyNullInputs() {
final Object result = udf.concatWS("SEP", "foo", null, "bar");
assertThat(result, is("fooSEPbar"));
assertThat(udf.concatWS("SEP", "foo", null, "bar"), is("fooSEPbar"));
assertThat(udf.concatWS(ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2}), null, ByteBuffer.wrap(new byte[] {3})),
is(ByteBuffer.wrap(new byte[] {2, 1, 3})));
}

@Test
public void shouldFailIfOnlySeparatorInput() {
public void shouldFailIfOnlySeparatorStringInput() {
// When:
final KsqlException e = assertThrows(KsqlFunctionException.class, () -> udf.concatWS("SEP"));

// Then:
assertThat(e.getMessage(), containsString("expects at least two input arguments"));
}

@Test
public void shouldFailIfOnlySeparatorBytesInput() {
// When:
final KsqlException e = assertThrows(KsqlFunctionException.class, () -> udf.concatWS(ByteBuffer.wrap(new byte[] {3})));

// Then:
assertThat(e.getMessage(), containsString("expects at least two input arguments"));
}

@Test
public void shouldWorkWithEmptySeparator() {
final Object result = udf.concatWS("", "foo", "bar");
assertThat(result, is("foobar"));
assertThat(udf.concatWS("", "foo", "bar"), is("foobar"));
assertThat(udf.concatWS(EMPTY_BYTES, ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2})),
is(ByteBuffer.wrap(new byte[] {1, 2})));
}

@Test
public void shouldHandleEmptyInputs() {
final Object result = udf.concatWS("SEP", "foo", "", "bar");
assertThat(result, is("fooSEPSEPbar"));
assertThat(udf.concatWS("SEP", "foo", "", "bar"), is("fooSEPSEPbar"));
assertThat(udf.concatWS(ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2}), EMPTY_BYTES, ByteBuffer.wrap(new byte[] {3})),
is(ByteBuffer.wrap(new byte[] {2, 1, 1, 3})));
}

@Test
public void shouldReturnEmptyIfEverythingEmpty() {
final Object result = udf.concatWS("", "", "");
assertThat(result, is(""));
assertThat(udf.concatWS("", "", ""), is(""));
assertThat(udf.concatWS(EMPTY_BYTES, EMPTY_BYTES, EMPTY_BYTES), is(EMPTY_BYTES));
}

}
Loading

0 comments on commit e249c9c

Please sign in to comment.