Skip to content

Commit

Permalink
feat: Update SUBSTRING function to accept BYTES types (#7861)
Browse files Browse the repository at this point in the history
  • Loading branch information
spena authored Jul 23, 2021
1 parent 7a3dc54 commit fccc56d
Show file tree
Hide file tree
Showing 49 changed files with 2,037 additions and 67 deletions.
7 changes: 4 additions & 3 deletions docs/developer-guide/ksqldb-reference/scalar-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -1047,12 +1047,13 @@ SUBSTRING(col1, 2, 5)

```sql
SUBSTRING(str, pos, [len])
SUBSTRING(bytes, pos, [len])
```

Returns a substring of `str` that starts at
`pos` (first character is at position 1) and
Returns the portion of `str` or `bytes` that starts at
`pos` (first character or byte is at position 1) and
has length `len`, or continues to the end of
the string.
the string or bytes.

For example, `SUBSTRING('stream', 1, 4)`
returns 'stre'.
Expand Down
4 changes: 2 additions & 2 deletions ksqldb-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1039,7 +1039,7 @@ public void shouldDescribeOverloadedScalarFunction() {
assertThat(output, containsString(
"Name : SUBSTRING\n"
+ "Author : Confluent\n"
+ "Overview : Returns a substring of the passed in value.\n"
+ "Overview : Returns the portion of the string or bytes passed in value.\n"
));
assertThat(output, containsString(
"Type : SCALAR\n"
Expand All @@ -1051,7 +1051,7 @@ public void shouldDescribeOverloadedScalarFunction() {
assertThat(output, containsString(
"\tVariation : SUBSTRING(str VARCHAR, pos INT)\n"
+ "\tReturns : VARCHAR\n"
+ "\tDescription : Returns a substring of str from pos to the end of str"
+ "\tDescription : Returns the portion of str from pos to the end of str"
));
assertThat(output, containsString(
"\tstr : The source string.\n"
Expand Down
15 changes: 12 additions & 3 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/BytesUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.Map;
import java.util.function.Function;
Expand Down Expand Up @@ -91,9 +92,13 @@ public static byte[] getByteArray(final ByteBuffer buffer) {
return null;
}

// ByteBuffer.array() throws an exception if it is in read-only state. Protobuf usually
// returns ByteBuffer in read-only, so this util allows us to get the internal byte array.
if (buffer.isReadOnly()) {
// ByteBuffer.array() throws an exception if it is read-only or the array is null.
// Protobuf returns ByteBuffer as read-only, so this util allows us to get the internal
// byte array.
if (!buffer.hasArray()) {
// Reset internal array position to 0, which affects read-only buffers
buffer.clear();

final byte[] internalByteArray = new byte[buffer.capacity()];
buffer.get(internalByteArray);
return internalByteArray;
Expand All @@ -102,6 +107,10 @@ public static byte[] getByteArray(final ByteBuffer buffer) {
return buffer.array();
}

/**
 * Returns a copy of a range of the bytes held by {@code buffer}.
 *
 * <p>The indexes refer to the array produced by {@link #getByteArray(ByteBuffer)}, not to the
 * buffer's current position or limit. The range is copied with
 * {@link Arrays#copyOfRange(byte[], int, int)}, so the result never aliases the buffer's
 * backing array. If {@code end} is past the end of that array the result is padded with
 * zero bytes.
 *
 * @param buffer the buffer to read from
 * @param start zero-based index of the first byte to copy (inclusive)
 * @param end zero-based index at which to stop copying (exclusive)
 * @return the copied bytes, or {@code null} if no byte array could be obtained from
 *     {@code buffer}
 * @throws ArrayIndexOutOfBoundsException if {@code start} is negative or greater than the
 *     length of the underlying array
 * @throws IllegalArgumentException if {@code start} is greater than {@code end}
 */
public static byte[] getByteArray(final ByteBuffer buffer, final int start, final int end) {
  final byte[] bytes = getByteArray(buffer);

  // The single-argument overload can return null (presumably for a null buffer; its guard is
  // outside this view). Propagate that instead of failing inside Arrays.copyOfRange with a
  // NullPointerException.
  if (bytes == null) {
    return null;
  }

  return Arrays.copyOfRange(bytes, start, end);
}

// Encodes the given bytes as text using the base-16 encoder from BaseEncoding.
private static String hexEncoding(final byte[] value) {
  final BaseEncoding base16 = BaseEncoding.base16();
  return base16.encode(value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,48 @@ public void shouldReturnByteArrayOnWritableByteBuffer() {
// Then
assertThat(bytes, is(new byte[]{5}));
}

@Test
public void shouldReturnFullByteArrayWhenByteBufferPositionIsNotZero() {
  // Given: a read-only buffer whose position has already been advanced by one byte
  final byte[] expected = {5, 10, 15};
  final ByteBuffer readOnly = ByteBuffer.wrap(new byte[]{5, 10, 15}).asReadOnlyBuffer();
  readOnly.get();

  // When
  final byte[] actual = BytesUtils.getByteArray(readOnly);

  // Then: every byte is returned, including the one before the current position
  assertThat(actual, is(expected));
}

@Test
public void shouldReturnSubArray() {
  // Given: a writable, array-backed buffer
  final byte[] expected = {2, 3};
  final ByteBuffer source = ByteBuffer.wrap(new byte[]{1, 2, 3, 4});

  // When: the range [1, 3) is requested
  final byte[] actual = BytesUtils.getByteArray(source, 1, 3);

  // Then
  assertThat(actual, is(expected));
}

@Test
public void shouldReturnSubArrayWhenByteBufferPositionIsNotZero() {
  // Given: a read-only buffer whose position has already been advanced by one byte
  final byte[] expected = {2, 3};
  final ByteBuffer readOnly = ByteBuffer.wrap(new byte[]{1, 2, 3, 4}).asReadOnlyBuffer();
  readOnly.get();

  // When: the range [1, 3) is requested
  final byte[] actual = BytesUtils.getByteArray(readOnly, 1, 3);

  // Then: the range is taken from the start of the content, not from the current position
  assertThat(actual, is(expected));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.BytesUtils;
import io.confluent.ksql.util.KsqlConstants;

Expand All @@ -32,7 +33,9 @@
public class FromBytes {
@Udf(description = "Converts a BYTES value to STRING in the specified encoding. "
+ "The accepted encoders are 'hex', 'utf8', 'ascii', and 'base64'.")
public String fromBytes(final ByteBuffer value, final String encoding) {
public String fromBytes(
@UdfParameter(description = "The bytes value to convert.") final ByteBuffer value,
@UdfParameter(description = "The encoding to use on conversion.") final String encoding) {
return (value == null) ? null : BytesUtils.encode(BytesUtils.getByteArray(value), encoding);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,33 @@
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.BytesUtils;
import io.confluent.ksql.util.KsqlConstants;

import java.nio.ByteBuffer;

@SuppressWarnings("unused") // Invoked via reflection.
@UdfDescription(
name = "substring",
category = FunctionCategory.STRING,
author = KsqlConstants.CONFLUENT_AUTHOR,
description = "Returns a substring of the passed in value."
description = "Returns the portion of the string or bytes passed in value."
)
public class Substring {

@Udf(description = "Returns a substring of str from pos to the end of str")
@Udf(description = "Returns the portion of str from pos to the end of str")
public String substring(
@UdfParameter(description = "The source string.") final String str,
@UdfParameter(description = "The base-one position to start from.") final Integer pos
) {
if (str == null || pos == null) {
return null;
}
final int start = getStartIndex(str, pos);
final int start = getStartIndex(str.length(), pos);
return str.substring(start);
}

@Udf(description = "Returns a substring of str that starts at pos and is of length len")
@Udf(description = "Returns the portion of str that starts at pos and is of length len")
public String substring(
@UdfParameter(description = "The source string.") final String str,
@UdfParameter(description = "The base-one position to start from.") final Integer pos,
Expand All @@ -51,18 +54,49 @@ public String substring(
if (str == null || pos == null || length == null) {
return null;
}
final int start = getStartIndex(str, pos);
final int end = getEndIndex(str, start, length);
final int start = getStartIndex(str.length(), pos);
final int end = getEndIndex(str.length(), start, length);
return str.substring(start, end);
}

private static int getStartIndex(final String value, final Integer pos) {
/**
 * Extracts the tail of a bytes value, starting at a one-based position.
 *
 * <p>The start index is resolved by {@code getStartIndex} against the buffer's capacity, and
 * the extracted range runs up to that capacity. The result wraps a freshly copied array.
 *
 * @param bytes the source bytes; may be {@code null}
 * @param pos the base-one start position; may be {@code null}
 * @return the extracted bytes, or {@code null} if either argument is {@code null}
 */
@Udf(description = "Returns the portion of the bytes value from pos to the end of the "
    + "bytes value")
public ByteBuffer substring(
    @UdfParameter(description = "The source bytes.") final ByteBuffer bytes,
    @UdfParameter(description = "The base-one position to start from.") final Integer pos
) {
  if (bytes != null && pos != null) {
    final int from = getStartIndex(bytes.capacity(), pos);
    final int to = bytes.capacity();
    final byte[] tail = BytesUtils.getByteArray(bytes, from, to);
    return ByteBuffer.wrap(tail);
  }

  // Any null argument yields a null result.
  return null;
}

/**
 * Extracts a fixed-length slice of a bytes value, starting at a one-based position.
 *
 * <p>Both bounds are resolved against the buffer's capacity by {@code getStartIndex} and
 * {@code getEndIndex}. The result wraps a freshly copied array.
 *
 * @param bytes the source bytes; may be {@code null}
 * @param pos the base-one start position; may be {@code null}
 * @param length the number of bytes to extract; may be {@code null}
 * @return the extracted bytes, or {@code null} if any argument is {@code null}
 */
@Udf(description = "Returns the portion of the bytes value that starts at pos and is of "
    + "length len")
public ByteBuffer substring(
    @UdfParameter(description = "The source bytes.") final ByteBuffer bytes,
    @UdfParameter(description = "The base-one position to start from.") final Integer pos,
    @UdfParameter(description = "The length to extract.") final Integer length
) {
  final boolean anyNull = bytes == null || pos == null || length == null;
  if (anyNull) {
    return null;
  }

  final int from = getStartIndex(bytes.capacity(), pos);
  final int to = getEndIndex(bytes.capacity(), from, length);
  final byte[] slice = BytesUtils.getByteArray(bytes, from, to);
  return ByteBuffer.wrap(slice);
}

private static int getStartIndex(final int valueLength, final Integer pos) {
return pos < 0
? Math.max(value.length() + pos, 0)
: Math.max(Math.min(pos - 1, value.length()), 0);
? Math.max(valueLength + pos, 0)
: Math.max(Math.min(pos - 1, valueLength), 0);
}

private static int getEndIndex(final String value, final int start, final int length) {
return Math.max(Math.min(start + length, value.length()), start);
private static int getEndIndex(final int valueLength, final int start, final int length) {
return Math.max(Math.min(start + length, valueLength), start);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.BytesUtils;
import io.confluent.ksql.util.KsqlConstants;

Expand All @@ -32,7 +33,9 @@
public class ToBytes {
@Udf(description = "Converts a STRING value in the specified encoding to BYTES. "
+ "The accepted encoders are 'hex', 'utf8', 'ascii', and 'base64'.")
public ByteBuffer toBytes(final String value, final String encoding) {
public ByteBuffer toBytes(
@UdfParameter(description = "The string to convert.") final String value,
@UdfParameter(description = "The type of encoding.") final String encoding) {
return (value == null) ? null : ByteBuffer.wrap(BytesUtils.decode(value, encoding));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.junit.Before;
import org.junit.Test;

import java.nio.ByteBuffer;

public class SubstringTest {

private Substring udf;
Expand All @@ -32,38 +34,83 @@ public void setUp() {
}

@Test
public void shouldReturnNullOnNullValue() {
assertThat(udf.substring(null, 1), is(nullValue()));
assertThat(udf.substring(null, 1, 1), is(nullValue()));
public void shouldReturnNullOnStringNullValue() {
  // A typed null selects the String overloads unambiguously.
  final String missing = null;
  final String present = "some string";

  assertThat(udf.substring(missing, 1), is(nullValue()));
  assertThat(udf.substring(missing, 1, 1), is(nullValue()));
  assertThat(udf.substring(present, null, 1), is(nullValue()));
  assertThat(udf.substring(present, 1, null), is(nullValue()));
}

@Test
public void shouldUseOneBasedIndexing() {
public void shouldReturnNullOnBytesNullValue() {
  // A typed null selects the ByteBuffer overloads unambiguously.
  final ByteBuffer missing = null;
  final byte[] oneByte = {1};

  assertThat(udf.substring(missing, 1), is(nullValue()));
  assertThat(udf.substring(missing, 1, 1), is(nullValue()));
  assertThat(udf.substring(ByteBuffer.wrap(oneByte), null, 1), is(nullValue()));
  assertThat(udf.substring(ByteBuffer.wrap(oneByte), 1, null), is(nullValue()));
}

@Test
public void shouldUseOneBasedIndexingOnString() {
  final String source = "a test string";

  // Position 1 is the first character; position -1 is the last.
  assertThat(udf.substring(source, 1, 1), is("a"));
  assertThat(udf.substring(source, -1, 1), is("g"));
}

@Test
public void shouldExtractFromStartForPositivePositions() {
public void shouldUseOneBasedIndexingOnBytes() {
  final byte[] source = {1, 2, 3, 4};

  // Position 1 is the first byte; position -1 is the last.
  final ByteBuffer first = udf.substring(ByteBuffer.wrap(source), 1, 1);
  assertThat(first, is(ByteBuffer.wrap(new byte[]{1})));

  final ByteBuffer last = udf.substring(ByteBuffer.wrap(source), -1, 1);
  assertThat(last, is(ByteBuffer.wrap(new byte[]{4})));
}

@Test
public void shouldExtractFromStartForPositivePositionsOnStrings() {
  final String source = "a test string";

  // Without a length the result runs to the end; with one it is cut to that many characters.
  assertThat(udf.substring(source, 3), is("test string"));
  assertThat(udf.substring(source, 3, 4), is("test"));
}

@Test
public void shouldExtractFromEndForNegativePositions() {
public void shouldExtractFromStartForPositivePositionsOnBytes() {
  final byte[] source = {1, 2, 3, 4};
  final ByteBuffer expected = ByteBuffer.wrap(new byte[]{3, 4});

  // A length that overshoots the input gives the same result as no length at all.
  final ByteBuffer toEnd = udf.substring(ByteBuffer.wrap(source), 3);
  assertThat(toEnd, is(expected));

  final ByteBuffer withLength = udf.substring(ByteBuffer.wrap(source), 3, 4);
  assertThat(withLength, is(expected));
}

@Test
public void shouldExtractFromEndForNegativePositionsOnStrings() {
  final String source = "a test string";

  // A negative position counts back from the end of the input.
  assertThat(udf.substring(source, -6), is("string"));
  assertThat(udf.substring(source, -6, 2), is("st"));
}

@Test
public void shouldTruncateOutOfBoundIndexes() {
public void shouldExtractFromEndForNegativePositionsOnBytes() {
  final byte[] source = {1, 2, 3, 4};
  final ByteBuffer expected = ByteBuffer.wrap(new byte[]{2, 3, 4});

  // A negative position counts back from the end of the input.
  final ByteBuffer toEnd = udf.substring(ByteBuffer.wrap(source), -3);
  assertThat(toEnd, is(expected));

  final ByteBuffer withLength = udf.substring(ByteBuffer.wrap(source), -3, 3);
  assertThat(withLength, is(expected));
}

@Test
public void shouldTruncateOutOfBoundIndexesOnStrings() {
  final String source = "a test string";

  // Out-of-range start positions
  assertThat(udf.substring(source, 0), is("a test string"));
  assertThat(udf.substring(source, 100), is(""));
  assertThat(udf.substring(source, -100), is("a test string"));

  // Out-of-range lengths
  assertThat(udf.substring(source, 3, 100), is("test string"));
  assertThat(udf.substring(source, 3, -100), is(""));
}

@Test
public void shouldTruncateOutOfBoundIndexesOnBytes() {
  final byte[] source = {1, 2, 3, 4};
  final ByteBuffer whole = ByteBuffer.wrap(new byte[]{1, 2, 3, 4});
  final ByteBuffer empty = ByteBuffer.wrap(new byte[]{});

  // Out-of-range start positions
  assertThat(udf.substring(ByteBuffer.wrap(source), 0), is(whole));
  assertThat(udf.substring(ByteBuffer.wrap(source), 100), is(empty));
  assertThat(udf.substring(ByteBuffer.wrap(source), -100), is(whole));

  // Out-of-range lengths
  assertThat(udf.substring(ByteBuffer.wrap(source), 3, 100),
      is(ByteBuffer.wrap(new byte[]{3, 4})));
  assertThat(udf.substring(ByteBuffer.wrap(source), 3, -100), is(empty));
}
}
Loading

0 comments on commit fccc56d

Please sign in to comment.