Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Update SUBSTRING function to accept BYTES types #7861

Merged
merged 2 commits into from
Jul 23, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions docs/developer-guide/ksqldb-reference/scalar-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -1047,12 +1047,13 @@ SUBSTRING(col1, 2, 5)

```sql
SUBSTRING(str, pos, [len])
SUBSTRING(bytes, pos, [len])
```

Returns a substring of `str` that starts at
`pos` (first character is at position 1) and
Returns the portion of `str` or `bytes` that starts at
`pos` (first character or byte is at position 1) and
has length `len`, or continues to the end of
the string.
the string or bytes.

For example, `SUBSTRING("stream", 1, 4)`
returns "stre".
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.Map;
import java.util.function.Function;
Expand Down Expand Up @@ -91,9 +92,13 @@ public static byte[] getByteArray(final ByteBuffer buffer) {
return null;
}

// ByteBuffer.array() throws an exception if it is in read-only state. Protobuf usually
// returns ByteBuffer in read-only, so this util allows us to get the internal byte array.
if (buffer.isReadOnly()) {
// ByteBuffer.array() throws an exception if it is read-only or the array is null.
// Protobuf returns ByteBuffer as read-only, so this util allows us to get the internal
// byte array.
if (!buffer.hasArray()) {
// Reset internal array position to 0, which affects read-only buffers
buffer.clear();

final byte[] internalByteArray = new byte[buffer.capacity()];
buffer.get(internalByteArray);
return internalByteArray;
Expand All @@ -102,6 +107,10 @@ public static byte[] getByteArray(final ByteBuffer buffer) {
return buffer.array();
}

public static byte[] getByteArray(final ByteBuffer buffer, final int start, final int end) {
return Arrays.copyOfRange(getByteArray(buffer), start, end);
}

private static String hexEncoding(final byte[] value) {
return BaseEncoding.base16().encode(value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,48 @@ public void shouldReturnByteArrayOnWritableByteBuffer() {
// Then
assertThat(bytes, is(new byte[]{5}));
}

@Test
public void shouldReturnFullByteArrayWhenByteBufferPositionIsNotZero() {
// Given
final ByteBuffer buffer = ByteBuffer.wrap(new byte[]{5, 10, 15}).asReadOnlyBuffer();

// This moves the internal array position to the next element and affects when we get
// bytes from read-only buffers
buffer.get();

// When
final byte[] bytes = BytesUtils.getByteArray(buffer);

// Then
assertThat(bytes, is(new byte[]{5, 10, 15}));
}

@Test
public void shouldReturnSubArray() {
// Given
final ByteBuffer buffer = ByteBuffer.wrap(new byte[]{1, 2, 3, 4});

// When
final byte[] bytes = BytesUtils.getByteArray(buffer, 1, 3);

// Then
assertThat(bytes, is(new byte[]{2, 3}));
}

@Test
public void shouldReturnSubArrayWhenByteBufferPositionIsNotZero() {
// Given
final ByteBuffer buffer = ByteBuffer.wrap(new byte[]{1, 2, 3, 4}).asReadOnlyBuffer();

// This moves the internal array position to the next element and affects when we get
// bytes from read-only buffers
buffer.get();

// When
final byte[] bytes = BytesUtils.getByteArray(buffer, 1, 3);

// Then
assertThat(bytes, is(new byte[]{2, 3}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.BytesUtils;
import io.confluent.ksql.util.KsqlConstants;

Expand All @@ -32,7 +33,9 @@
public class FromBytes {
@Udf(description = "Converts a BYTES value to STRING in the specified encoding. "
+ "The accepted encoders are 'hex', 'utf8', 'ascii', and 'base64'.")
public String fromBytes(final ByteBuffer value, final String encoding) {
public String fromBytes(
@UdfParameter(description = "The bytes value to convert.") final ByteBuffer value,
@UdfParameter(description = "The encoding to use on conversion.") final String encoding) {
return (value == null) ? null : BytesUtils.encode(BytesUtils.getByteArray(value), encoding);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,33 @@
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.BytesUtils;
import io.confluent.ksql.util.KsqlConstants;

import java.nio.ByteBuffer;

@SuppressWarnings("unused") // Invoked via reflection.
@UdfDescription(
name = "substring",
category = FunctionCategory.STRING,
author = KsqlConstants.CONFLUENT_AUTHOR,
description = "Returns a substring of the passed in value."
description = "Returns the portion of the string or bytes passed in value."
)
public class Substring {

@Udf(description = "Returns a substring of str from pos to the end of str")
@Udf(description = "Returns the portion of str from pos to the end of str")
public String substring(
@UdfParameter(description = "The source string.") final String str,
@UdfParameter(description = "The base-one position to start from.") final Integer pos
) {
if (str == null || pos == null) {
return null;
}
final int start = getStartIndex(str, pos);
final int start = getStartIndex(str.length(), pos);
return str.substring(start);
}

@Udf(description = "Returns a substring of str that starts at pos and is of length len")
@Udf(description = "Returns the portion of str that starts at pos and is of length len")
public String substring(
@UdfParameter(description = "The source string.") final String str,
@UdfParameter(description = "The base-one position to start from.") final Integer pos,
Expand All @@ -51,18 +54,49 @@ public String substring(
if (str == null || pos == null || length == null) {
return null;
}
final int start = getStartIndex(str, pos);
final int end = getEndIndex(str, start, length);
final int start = getStartIndex(str.length(), pos);
final int end = getEndIndex(str.length(), start, length);
return str.substring(start, end);
}

private static int getStartIndex(final String value, final Integer pos) {
@Udf(description = "Returns the portion of the bytes value from pos to the end of the "
+ "bytes value")
public ByteBuffer substring(
@UdfParameter(description = "The source bytes.") final ByteBuffer bytes,
@UdfParameter(description = "The base-one position to start from.") final Integer pos
) {
if (bytes == null || pos == null) {
return null;
}

final int start = getStartIndex(bytes.capacity(), pos);
final int end = bytes.capacity();
return ByteBuffer.wrap(BytesUtils.getByteArray(bytes, start, end));
}

@Udf(description = "Returns the portion of the bytes value that starts at pos and is of "
+ "length len")
public ByteBuffer substring(
@UdfParameter(description = "The source bytes.") final ByteBuffer bytes,
@UdfParameter(description = "The base-one position to start from.") final Integer pos,
@UdfParameter(description = "The length to extract.") final Integer length
) {
if (bytes == null || pos == null) {
return null;
}

final int start = getStartIndex(bytes.capacity(), pos);
final int end = getEndIndex(bytes.capacity(), start, length);
return ByteBuffer.wrap(BytesUtils.getByteArray(bytes, start, end));
}

private static int getStartIndex(final int valueLength, final Integer pos) {
return pos < 0
? Math.max(value.length() + pos, 0)
: Math.max(Math.min(pos - 1, value.length()), 0);
? Math.max(valueLength + pos, 0)
: Math.max(Math.min(pos - 1, valueLength), 0);
}

private static int getEndIndex(final String value, final int start, final int length) {
return Math.max(Math.min(start + length, value.length()), start);
private static int getEndIndex(final int valueLength, final int start, final int length) {
return Math.max(Math.min(start + length, valueLength), start);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.confluent.ksql.function.FunctionCategory;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.BytesUtils;
import io.confluent.ksql.util.KsqlConstants;

Expand All @@ -32,7 +33,9 @@
public class ToBytes {
@Udf(description = "Converts a STRING value in the specified encoding to BYTES. "
+ "The accepted encoders are 'hex', 'utf8', 'ascii', and 'base64'.")
public ByteBuffer toBytes(final String value, final String encoding) {
public ByteBuffer toBytes(
@UdfParameter(description = "The string to convert.") final String value,
@UdfParameter(description = "The type of encoding.") final String encoding) {
return (value == null) ? null : ByteBuffer.wrap(BytesUtils.decode(value, encoding));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.junit.Before;
import org.junit.Test;

import java.nio.ByteBuffer;

public class SubstringTest {

private Substring udf;
Expand All @@ -32,38 +34,83 @@ public void setUp() {
}

@Test
public void shouldReturnNullOnNullValue() {
assertThat(udf.substring(null, 1), is(nullValue()));
assertThat(udf.substring(null, 1, 1), is(nullValue()));
public void shouldReturnNullOnStringNullValue() {
assertThat(udf.substring((String) null, 1), is(nullValue()));
assertThat(udf.substring((String) null, 1, 1), is(nullValue()));
assertThat(udf.substring("some string", null, 1), is(nullValue()));
assertThat(udf.substring("some string", 1, null), is(nullValue()));
}

@Test
public void shouldUseOneBasedIndexing() {
public void shouldReturnNullOnBytesNullValue() {
assertThat(udf.substring((ByteBuffer) null, 1), is(nullValue()));
assertThat(udf.substring((ByteBuffer) null, 1, 1), is(nullValue()));
assertThat(udf.substring("some string", null, 1), is(nullValue()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first argument should be a ByteBuffer

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

assertThat(udf.substring("some string", 1, null), is(nullValue()));
}

@Test
public void shouldUseOneBasedIndexingOnString() {
assertThat(udf.substring("a test string", 1, 1), is("a"));
assertThat(udf.substring("a test string", -1, 1), is("g"));
}

@Test
public void shouldExtractFromStartForPositivePositions() {
public void shouldUseOneBasedIndexingOnBytes() {
assertThat(udf.substring(ByteBuffer.wrap(new byte[]{1,2,3,4}), 1, 1),
is(ByteBuffer.wrap(new byte[]{1})));
assertThat(udf.substring(ByteBuffer.wrap(new byte[]{1,2,3,4}), -1, 1),
is(ByteBuffer.wrap(new byte[]{4})));
}

@Test
public void shouldExtractFromStartForPositivePositionsOnStrings() {
assertThat(udf.substring("a test string", 3), is("test string"));
assertThat(udf.substring("a test string", 3, 4), is("test"));
}

@Test
public void shouldExtractFromEndForNegativePositions() {
public void shouldExtractFromStartForPositivePositionsOnBytes() {
assertThat(udf.substring(ByteBuffer.wrap(new byte[]{1,2,3,4}), 3),
is(ByteBuffer.wrap(new byte[]{3,4})));
assertThat(udf.substring(ByteBuffer.wrap(new byte[]{1,2,3,4}), 3, 4),
is(ByteBuffer.wrap(new byte[]{3,4})));
}

@Test
public void shouldExtractFromEndForNegativePositionsOnStrings() {
assertThat(udf.substring("a test string", -6), is("string"));
assertThat(udf.substring("a test string", -6, 2), is("st"));
}

@Test
public void shouldTruncateOutOfBoundIndexes() {
public void shouldExtractFromEndForNegativePositionsOnBytes() {
assertThat(udf.substring(ByteBuffer.wrap(new byte[]{1,2,3,4}), -3),
is(ByteBuffer.wrap(new byte[]{2,3,4})));
assertThat(udf.substring(ByteBuffer.wrap(new byte[]{1,2,3,4}), -3, 3),
is(ByteBuffer.wrap(new byte[]{2,3,4})));
}

@Test
public void shouldTruncateOutOfBoundIndexesOnStrings() {
assertThat(udf.substring("a test string", 0), is("a test string"));
assertThat(udf.substring("a test string", 100), is(""));
assertThat(udf.substring("a test string", -100), is("a test string"));
assertThat(udf.substring("a test string", 3, 100), is("test string"));
assertThat(udf.substring("a test string", 3, -100), is(""));
}

@Test
public void shouldTruncateOutOfBoundIndexesOnBytes() {
assertThat(udf.substring(ByteBuffer.wrap(new byte[]{1,2,3,4}), 0),
is(ByteBuffer.wrap(new byte[]{1,2,3,4})));
assertThat(udf.substring(ByteBuffer.wrap(new byte[]{1,2,3,4}), 100),
is(ByteBuffer.wrap(new byte[]{})));
assertThat(udf.substring(ByteBuffer.wrap(new byte[]{1,2,3,4}), -100),
is(ByteBuffer.wrap(new byte[]{1,2,3,4})));
assertThat(udf.substring(ByteBuffer.wrap(new byte[]{1,2,3,4}), 3, 100),
is(ByteBuffer.wrap(new byte[]{3,4})));
assertThat(udf.substring(ByteBuffer.wrap(new byte[]{1,2,3,4}), 3, -100),
is(ByteBuffer.wrap(new byte[]{})));
}
}
Loading