Skip to content

Commit

Permalink
feat: enable BYTES for LPAD and RPAD (#7909)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sullivan-Patrick authored Jul 30, 2021
1 parent 41a05d0 commit f0c23b1
Show file tree
Hide file tree
Showing 13 changed files with 1,101 additions and 31 deletions.
11 changes: 6 additions & 5 deletions docs/developer-guide/ksqldb-reference/scalar-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -809,10 +809,10 @@ Since: 0.10.0
LPAD(input, length, padding)
```

Pads the input string, beginning from the left, with the specified padding string, until the target length is reached.
If the input string is longer than the specified target length, it is truncated.
Pads the input string or bytes, starting from the left, with the specified padding of the same type until the target length is reached.
If the input is longer than the specified target length, it is truncated.

If the padding string is empty or NULL, or the target length is negative, NULL is returned.
If the padding string or byte array is empty or NULL, or the target length is negative, NULL is returned.

Examples:
```sql
Expand Down Expand Up @@ -989,9 +989,10 @@ Since: 0.10.0
RPAD(input, length, padding)
```

Pads the input string, starting from the end, with the specified padding string until the target length is reached. If the input string is longer than the specified target length it will be truncated.
Pads the input string or bytes, starting from the end, with the specified padding of the same type until the target length is reached.
If the input is longer than the specified target length, it is truncated.

If the padding string is empty or NULL, or the target length is negative, then NULL is returned.
If the padding string or byte array is empty or NULL, or the target length is negative, NULL is returned.

Examples:
```sql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.BytesUtils;
import java.nio.ByteBuffer;
import java.util.Arrays;

@UdfDescription(
name = "LPad",
category = FunctionCategory.STRING,
description = "Pads the input string, beginning from the left, with the specified padding"
+ " string until the target length is reached. If the input string is longer than the"
+ " specified target length it will be truncated. If the padding string is empty or"
+ " NULL, or the target length is negative, then NULL is returned.")
description = "Pads the input string or bytes, starting from the beginning, with the specified"
+ " padding string until the target length is reached. If the input string or bytes are"
+ " longer than the specified target length it will be truncated. If the padding string or"
+ " bytes are empty or NULL, or the target length is negative, then NULL is returned.")
public class LPad {

@Udf
Expand All @@ -50,4 +53,42 @@ public String lpad(
sb.setLength(targetLen);
return sb.toString();
}

@Udf
public ByteBuffer lpad(
@UdfParameter(description = "Bytes to be padded") final ByteBuffer input,
@UdfParameter(description = "Target length") final Integer targetLen,
@UdfParameter(description = "Padding bytes") final ByteBuffer padding) {

if (input == null) {
return null;
}
if (padding == null
|| padding.capacity() == 0
|| targetLen == null
|| targetLen < 0) {
return null;
}

final byte[] start = BytesUtils.getByteArray(input);

if (start.length > targetLen) {
return ByteBuffer.wrap(Arrays.copyOfRange(start, 0, targetLen));
}

final byte[] padded = new byte[targetLen];
final byte[] paddingArray = BytesUtils.getByteArray(padding);

for (int i = 0; i < targetLen; i++) {
final int padUpTo = targetLen - start.length;
if (i >= padUpTo) {
padded[i] = start[i - padUpTo];
} else {
final int paddingIndex = i % paddingArray.length;
padded[i] = paddingArray[paddingIndex];
}

}
return ByteBuffer.wrap(padded);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.BytesUtils;
import java.nio.ByteBuffer;
import java.util.Arrays;

@UdfDescription(
name = "RPad",
category = FunctionCategory.STRING,
description = "Pads the input string, starting from the end, with the specified padding"
+ " string until the target length is reached. If the input string is longer than the"
+ " specified target length it will be truncated. If the padding string is empty or"
+ " NULL, or the target length is negative, then NULL is returned.")
description = "Pads the input string or bytes, starting from the end, with the specified"
+ " padding string until the target length is reached. If the input string or bytes are"
+ " longer than the specified target length it will be truncated. If the padding string or"
+ " bytes are empty or NULL, or the target length is negative, then NULL is returned.")
public class RPad {

@Udf
Expand All @@ -49,4 +52,38 @@ public String rpad(
sb.setLength(targetLen);
return sb.toString();
}

@Udf
public ByteBuffer rpad(
@UdfParameter(description = "Bytes to be padded") final ByteBuffer input,
@UdfParameter(description = "Target length") final Integer targetLen,
@UdfParameter(description = "Padding bytes") final ByteBuffer padding) {

if (input == null) {
return null;
}
if (padding == null
|| padding.capacity() == 0
|| targetLen == null
|| targetLen < 0) {
return null;
}

final byte[] start = BytesUtils.getByteArray(input);

if (start.length > targetLen) {
return ByteBuffer.wrap(Arrays.copyOfRange(start, 0, targetLen));
}

final byte[] padded = new byte[targetLen];
final byte[] paddingArray = BytesUtils.getByteArray(padding);

System.arraycopy(start, 0, padded, 0, start.length);

for (int i = start.length; i < targetLen; i++) {
final int paddingIndex = (i - start.length) % paddingArray.length;
padded[i] = paddingArray[paddingIndex];
}
return ByteBuffer.wrap(padded);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,63 +18,134 @@
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;

import io.confluent.ksql.util.BytesUtils;
import java.nio.ByteBuffer;
import org.junit.Test;

public class LPadTest {
private final LPad udf = new LPad();
private static final ByteBuffer BYTES_123 = ByteBuffer.wrap(new byte[]{1,2,3});
private static final ByteBuffer BYTES_45 = ByteBuffer.wrap(new byte[]{4,5});
private static final ByteBuffer EMPTY_BYTES = ByteBuffer.wrap(new byte[]{});

@Test
public void shouldPadInput() {
public void shouldPadInputString() {
final String result = udf.lpad("foo", 7, "Bar");
assertThat(result, is("BarBfoo"));
}

@Test
public void shouldReturnNullForNullInput() {
public void shouldPadInputBytes() {
final ByteBuffer result = udf.lpad(BYTES_123, 7, BYTES_45);
assertThat(result, is(ByteBuffer.wrap(new byte[]{4,5,4,5,1,2,3})));
}

@Test
public void shouldAppendPartialPaddingString() {
final String result = udf.lpad("foo", 4, "Bar");
assertThat(result, is("Bfoo"));
}

@Test
public void shouldAppendPartialPaddingBytes() {
final ByteBuffer result = udf.lpad(BYTES_123, 4, BYTES_45);
assertThat(BytesUtils.getByteArray(result), is(new byte[]{4,1,2,3}));
}

@Test
public void shouldReturnNullForNullInputString() {
final String result = udf.lpad(null, 4, "foo");
assertThat(result, is(nullValue()));
}

@Test
public void shouldReturnNullForNullPadding() {
public void shouldReturnNullForNullInputBytes() {
final ByteBuffer result = udf.lpad(null, 4, BYTES_45);
assertThat(result, is(nullValue()));
}

@Test
public void shouldReturnNullForNullPaddingString() {
final String result = udf.lpad("foo", 4, null);
assertThat(result, is(nullValue()));
}

@Test
public void shouldReturnNullForEmptyPadding() {
public void shouldReturnNullForNullPaddingBytes() {
final ByteBuffer result = udf.lpad(BYTES_123, 4, null);
assertThat(result, is(nullValue()));
}

@Test
public void shouldReturnNullForEmptyPaddingString() {
final String result = udf.lpad("foo", 4, "");
assertThat(result, is(nullValue()));
}

@Test
public void shouldPadEmptyInput() {
public void shouldReturnNullForEmptyPaddingBytes() {
final ByteBuffer result = udf.lpad(BYTES_123, 4, EMPTY_BYTES);
assertThat(result, is(nullValue()));
}

@Test
public void shouldPadEmptyInputString() {
final String result = udf.lpad("", 4, "foo");
assertThat(result, is("foof"));
}

@Test
public void shouldTruncateInputIfTargetLengthTooSmall() {
public void shouldPadEmptyInputBytes() {
final ByteBuffer result = udf.lpad(EMPTY_BYTES, 4, BYTES_45);
assertThat(result, is(ByteBuffer.wrap(new byte[]{4,5,4,5})));
}

@Test
public void shouldTruncateInputIfTargetLengthTooSmallString() {
final String result = udf.lpad("foo", 2, "bar");
assertThat(result, is("fo"));
}

@Test
public void shouldReturnNullForNegativeLength() {
public void shouldTruncateInputIfTargetLengthTooSmallBytes() {
final ByteBuffer result = udf.lpad(BYTES_123, 2, BYTES_45);
assertThat(result, is(ByteBuffer.wrap(new byte[]{1,2})));
}

@Test
public void shouldReturnNullForNegativeLengthString() {
final String result = udf.lpad("foo", -1, "bar");
assertThat(result, is(nullValue()));
}

@Test
public void shouldReturnNullForNullLength() {
public void shouldReturnNullForNegativeLengthBytes() {
final ByteBuffer result = udf.lpad(BYTES_123, -1, BYTES_45);
assertThat(result, is(nullValue()));
}

@Test
public void shouldReturnNullForNullLengthString() {
final String result = udf.lpad("foo", null, "bar");
assertThat(result, is(nullValue()));
}

@Test
public void shouldReturnNullForNullLengthBytes() {
final ByteBuffer result = udf.lpad(BYTES_123, null, BYTES_45);
assertThat(result, is(nullValue()));
}

@Test
public void shouldReturnEmptyStringForZeroLength() {
final String result = udf.lpad("foo", 0, "bar");
assertThat(result, is(""));
}

@Test
public void shouldReturnEmptyByteBufferForZeroLength() {
final ByteBuffer result = udf.lpad(BYTES_123, 0, BYTES_45);
assertThat(result, is(EMPTY_BYTES));
}

}
Loading

0 comments on commit f0c23b1

Please sign in to comment.