feat: add ARRAY_CONCAT UDF
Patrick Stuedi committed Jul 7, 2021
1 parent 24ff537 commit e421153
Showing 4 changed files with 197 additions and 1 deletion.
18 changes: 18 additions & 0 deletions docs/developer-guide/ksqldb-reference/scalar-functions.md
@@ -426,6 +426,24 @@ ARRAY_UNION(ARRAY[1, 2, 3, 1, 2], [4, 1]) => [1, 2, 3, 4]
ARRAY_UNION(ARRAY['apple', 'apple', NULL, 'cherry'], ARRAY['cherry']) => ['apple', NULL, 'cherry']
```

### `ARRAY_CONCAT`

Since: 0.20.0

```sql
ARRAY_CONCAT(array1, array2)
```

Returns an array containing all the elements of the first array followed by all the elements of the second array.

Returns NULL if both input arrays are NULL. If only one input array is NULL, the result is the other array.

Examples:
```sql
ARRAY_CONCAT(ARRAY[1, 2, 3, 1, 2], ARRAY[4, 1]) => [1, 2, 3, 1, 2, 4, 1]
ARRAY_CONCAT(ARRAY['apple', 'apple', NULL, 'cherry'], ARRAY['cherry']) => ['apple', 'apple', NULL, 'cherry', 'cherry']
```
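
For example, `ARRAY_CONCAT` can also be applied to array columns in a persistent query. This is an illustrative sketch only; the stream and column names are hypothetical:

```sql
CREATE STREAM combined AS
  SELECT id, ARRAY_CONCAT(tags_a, tags_b) AS all_tags
  FROM input_stream;
```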

### `AS_MAP`

Since: 0.6.0
49 changes: 49 additions & 0 deletions ArrayConcat.java
@@ -0,0 +1,49 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License; you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the
* License.
*/

package io.confluent.ksql.function.udf.array;

import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import java.util.ArrayList;
import java.util.List;


@UdfDescription(
    name = "array_concat",
    category = FunctionCategory.ARRAY,
    description = "Concatenates two arrays, creating an array that contains all the elements"
        + " in the first array followed by all the elements in the second array."
        + " Returns NULL if both input arrays are NULL. "
        + "The two arrays must be of the same type.")
public class ArrayConcat {
  @Udf
  public <T> List<T> concat(
      @UdfParameter(description = "First array of values") final List<T> left,
      @UdfParameter(description = "Second array of values") final List<T> right) {
    if (left == null && right == null) {
      return null;
    }
    // Treat a single NULL input as empty so the other array is returned unchanged.
    final int leftSize = left == null ? 0 : left.size();
    final int rightSize = right == null ? 0 : right.size();
    final List<T> result = new ArrayList<>(leftSize + rightSize);
    if (left != null) {
      result.addAll(left);
    }
    if (right != null) {
      result.addAll(right);
    }
    return result;
  }
}
91 changes: 91 additions & 0 deletions ArrayConcatTest.java
@@ -0,0 +1,91 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License; you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the
* License.
*/

package io.confluent.ksql.function.udf.array;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;

import java.util.Arrays;
import java.util.List;
import org.junit.Test;

public class ArrayConcatTest {

  private final ArrayConcat udf = new ArrayConcat();

  @Test
  public void shouldConcatArraysOfLikeType() {
    final List<String> input1 = Arrays.asList("foo", " ", "bar");
    final List<String> input2 = Arrays.asList("baz");
    final List<String> result = udf.concat(input1, input2);
    assertThat(result, is(Arrays.asList("foo", " ", "bar", "baz")));
  }

  @Test
  public void shouldReturnDuplicateValues() {
    final List<String> input1 = Arrays.asList("foo", "foo", "bar");
    final List<String> input2 = Arrays.asList("baz", "foo");
    final List<String> result = udf.concat(input1, input2);
    assertThat(result, is(Arrays.asList("foo", "foo", "bar", "baz", "foo")));
  }

  @Test
  public void shouldConcatArraysContainingNulls() {
    final List<String> input1 = Arrays.asList(null, "bar");
    final List<String> input2 = Arrays.asList("foo");
    final List<String> result = udf.concat(input1, input2);
    assertThat(result, is(Arrays.asList(null, "bar", "foo")));
  }

  @Test
  public void shouldConcatArraysBothContainingNulls() {
    final List<String> input1 = Arrays.asList(null, "foo", "bar");
    final List<String> input2 = Arrays.asList("foo", null);
    final List<String> result = udf.concat(input1, input2);
    assertThat(result, is(Arrays.asList(null, "foo", "bar", "foo", null)));
  }

  @Test
  public void shouldConcatArraysOfOnlyNulls() {
    final List<String> input1 = Arrays.asList(null, null);
    final List<String> input2 = Arrays.asList(null, null, null);
    final List<String> result = udf.concat(input1, input2);
    assertThat(result, is(Arrays.asList(null, null, null, null, null)));
  }

  @Test
  public void shouldReturnNonNullForNullRightInput() {
    final List<String> input1 = Arrays.asList("foo");
    final List<String> result = udf.concat(input1, null);
    assertThat(result, is(Arrays.asList("foo")));
  }

  @Test
  public void shouldReturnNonNullForNullLeftInput() {
    final List<String> input1 = Arrays.asList("foo");
    final List<String> result = udf.concat(null, input1);
    assertThat(result, is(Arrays.asList("foo")));
  }

  @Test
  public void shouldReturnNullForAllNullInputs() {
    final List<Long> result = udf.concat((List<Long>) null, (List<Long>) null);
    assertThat(result, is(nullValue()));
  }
}
@@ -171,6 +171,44 @@
{"topic": "OUTPUT", "key": "r5", "value": {"RESULT": null }},
{"topic": "OUTPUT", "key": "r6", "value": {"RESULT": null }}
]
}
},
{
"name": "array_concat with literals",
"statements": [
"CREATE STREAM INPUT (id STRING KEY, dummy INTEGER) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT id, array_concat(array['foo', 'bar', 'foo'], array['foo', 'baz']) as a1 FROM INPUT;"
],
"inputs": [
{"topic": "test_topic", "key": "r1", "value": {"dummy": 0 }},
{"topic": "test_topic", "key": "r2", "value": {"dummy": 0 }}
],
"outputs": [
{"topic": "OUTPUT", "key": "r1", "value": {"A1": ["foo", "bar", "foo", "foo", "baz"]}},
{"topic": "OUTPUT", "key": "r2", "value": {"A1": ["foo", "bar", "foo", "foo", "baz"]}}
]
},
{
"name": "array_concat with primitive type",
"statements": [
"CREATE STREAM INPUT (id STRING KEY, arr1 ARRAY<INT>, arr2 ARRAY<INT>) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT id, array_concat(arr1, arr2) as result FROM INPUT;"
],
"inputs": [
{"topic": "test_topic", "key": "r1", "value": {"arr1": [0,0,1,0,-1], "arr2": [1,-2,0]}},
{"topic": "test_topic", "key": "r2", "value": {"arr1": [0,0,1,0,-1], "arr2": [3,4]}},
{"topic": "test_topic", "key": "r3", "value": {"arr1": [], "arr2": [1,-2]}},
{"topic": "test_topic", "key": "r4", "value": {"arr1": [0,0,1,0,-1], "arr2": []}},
{"topic": "test_topic", "key": "r5", "value": {"arr1": null, "arr2": [1,-2]}},
{"topic": "test_topic", "key": "r6", "value": {"arr1": [0,0,1,0,-1], "arr2": null}}
],
"outputs": [
{"topic": "OUTPUT", "key": "r1", "value": {"RESULT": [0,0,1,0,-1,1,-2,0] }},
{"topic": "OUTPUT", "key": "r2", "value": {"RESULT": [0,0,1,0,-1,3,4] }},
{"topic": "OUTPUT", "key": "r3", "value": {"RESULT": [1,-2] }},
{"topic": "OUTPUT", "key": "r4", "value": {"RESULT": [0,0,1,0,-1] }},
{"topic": "OUTPUT", "key": "r5", "value": {"RESULT": [1,-2] }},
{"topic": "OUTPUT", "key": "r6", "value": {"RESULT": [0,0,1,0,-1] }}
]
}
]
}
