Skip to content

Commit

Permalink
feat: Adds UDF regexp_replace (#5504)
Browse files Browse the repository at this point in the history
* feat: Adds UDF regexp_replace
  • Loading branch information
AlanConfluent authored Jun 2, 2020
1 parent 61e5a1b commit 30309bf
Show file tree
Hide file tree
Showing 7 changed files with 376 additions and 0 deletions.
10 changes: 10 additions & 0 deletions docs/developer-guide/ksqldb-reference/scalar-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,16 @@ the entire substring is returned by default.
For example, `REGEXP_EXTRACT("(\\w+) (\\w+)", 'hello there nice day', 2)`
returns `['there', 'day']`.

### `REGEXP_REPLACE`

```sql
REGEXP_REPLACE(col1, 'a.b+', 'bar')
```

Replace all matches of a regex in an input string with a new string.
If either the input string, regular expression, or new string is null,
the result is null.

### `REGEXP_SPLIT_TO_ARRAY`

```sql
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.string;

import io.confluent.ksql.function.KsqlFunctionException;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.util.KsqlConstants;
import java.util.regex.PatternSyntaxException;

@UdfDescription(name = "regexp_replace",
author = KsqlConstants.CONFLUENT_AUTHOR,
description = "Replaces all matches of a regexp in a string with a new substring.")
public class RegexpReplace {

@Udf(description = "Returns a new string with all matches of regexp in str replaced with newStr")
public String regexpReplace(
@UdfParameter(
description = "The source string. If null, then function returns null.") final String str,
@UdfParameter(
description = "The regexp to match."
+ " If null, then function returns null.") final String regexp,
@UdfParameter(
description = "The string to replace the matches with."
+ " If null, then function returns null.") final String newStr) {
if (str == null || regexp == null || newStr == null) {
return null;
}

try {
return str.replaceAll(regexp, newStr);
} catch (PatternSyntaxException e) {
throw new KsqlFunctionException("Invalid regular expression pattern: " + regexp, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.string;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isEmptyOrNullString;

import io.confluent.ksql.function.KsqlFunctionException;
import org.junit.Test;

public class RegexpReplaceTest {

private RegexpReplace udf = new RegexpReplace();

@Test
public void shouldHandleNull() {
assertThat(udf.regexpReplace(null, "foo", "bar"), isEmptyOrNullString());
assertThat(udf.regexpReplace("foo", null, "bar"), isEmptyOrNullString());
assertThat(udf.regexpReplace("foo", "bar", null), isEmptyOrNullString());
}

@Test
public void shouldReplace() {
assertThat(udf.regexpReplace("foobar", "foo", "bar"), is("barbar"));
assertThat(udf.regexpReplace("foobar", "fooo", "bar"), is("foobar"));
assertThat(udf.regexpReplace("foobar", "o", ""), is("fbar"));
assertThat(udf.regexpReplace("abc", "", "n"), is("nanbncn"));

assertThat(udf.regexpReplace("foobar", "(foo|bar)", "cat"), is("catcat"));
assertThat(udf.regexpReplace("foobar", "^foo", "cat"), is("catbar"));
assertThat(udf.regexpReplace("foobar", "^bar", "cat"), is("foobar"));
assertThat(udf.regexpReplace("barbar", "bar$", "cat"), is("barcat"));
assertThat(udf.regexpReplace("aababa", "ab", "xy"), is("axyxya"));
assertThat(udf.regexpReplace("aababa", "(ab)+", "xy"), is("axya"));
}

@Test(expected = KsqlFunctionException.class)
public void shouldThrowExceptionOnBadPattern() {
udf.regexpReplace("foobar", "(()", "bar");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST (K STRING KEY, INPUT_STRING STRING, PATTERN STRING) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST",
"schema" : "`K` STRING KEY, `INPUT_STRING` STRING, `PATTERN` STRING",
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
}
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT\n TEST.K K,\n REGEXP_REPLACE(TEST.INPUT_STRING, TEST.PATTERN, 'cat') EXTRACTED\nFROM TEST TEST\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`K` STRING KEY, `EXTRACTED` STRING",
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
}
},
"queryPlan" : {
"sources" : [ "TEST" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"sourceSchema" : "`K` STRING KEY, `INPUT_STRING` STRING, `PATTERN` STRING"
},
"keyColumnNames" : [ "K" ],
"selectExpressions" : [ "REGEXP_REPLACE(INPUT_STRING, PATTERN, 'cat') AS EXTRACTED" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.query.pull.metrics.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.error.classifier.regex" : ""
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
{
"version" : "6.0.0",
"timestamp" : 1591049420147,
"path" : "query-validation-tests/replace.json",
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT<INPUT_STRING VARCHAR, PATTERN VARCHAR> NOT NULL",
"CSAS_OUTPUT_0.OUTPUT" : "STRUCT<EXTRACTED VARCHAR> NOT NULL"
},
"testCase" : {
"name" : "regexp_replace",
"inputs" : [ {
"topic" : "test_topic",
"key" : "",
"value" : {
"input_string" : "baababaa",
"pattern" : "(ab)+"
}
}, {
"topic" : "test_topic",
"key" : "",
"value" : {
"input_string" : "baabbabaa",
"pattern" : "(ab)+"
}
}, {
"topic" : "test_topic",
"key" : "",
"value" : {
"input_string" : null,
"pattern" : "(ab)+"
}
}, {
"topic" : "test_topic",
"key" : "",
"value" : {
"input_string" : "baababaa",
"pattern" : null
}
} ],
"outputs" : [ {
"topic" : "OUTPUT",
"key" : "",
"value" : {
"EXTRACTED" : "bacataa"
}
}, {
"topic" : "OUTPUT",
"key" : "",
"value" : {
"EXTRACTED" : "bacatbcataa"
}
}, {
"topic" : "OUTPUT",
"key" : "",
"value" : {
"EXTRACTED" : null
}
}, {
"topic" : "OUTPUT",
"key" : "",
"value" : {
"EXTRACTED" : null
}
} ],
"topics" : [ {
"name" : "OUTPUT",
"replicas" : 1,
"numPartitions" : 4
}, {
"name" : "test_topic",
"replicas" : 1,
"numPartitions" : 4
} ],
"statements" : [ "CREATE STREAM TEST (K STRING KEY, input_string VARCHAR, pattern VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE STREAM OUTPUT AS SELECT K, REGEXP_REPLACE(input_string, pattern, 'cat') AS EXTRACTED FROM TEST;" ],
"post" : {
"topics" : {
"topics" : [ {
"name" : "OUTPUT",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
}
},
"valueFormat" : {
"format" : "JSON"
},
"partitions" : 4
}, {
"name" : "test_topic",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
}
},
"valueFormat" : {
"format" : "JSON"
},
"partitions" : 4
} ]
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Project
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT)
<-- Project

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,25 @@
{"topic": "OUTPUT", "value": {"REPLACE":null, "REPLACE_NULL":null, "REPLACE_W_NULL":null}},
{"topic": "OUTPUT", "value": {"REPLACE":"popcorn", "REPLACE_NULL":null, "REPLACE_W_NULL":null}}
]
},
{
"name": "regexp_replace",
"statements": [
"CREATE STREAM TEST (K STRING KEY, input_string VARCHAR, pattern VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT K, REGEXP_REPLACE(input_string, pattern, 'cat') AS EXTRACTED FROM TEST;"
],
"inputs": [
{"topic": "test_topic", "value": {"input_string": "baababaa", "pattern": "(ab)+"}},
{"topic": "test_topic", "value": {"input_string": "baabbabaa", "pattern": "(ab)+"}},
{"topic": "test_topic", "value": {"input_string": null, "pattern": "(ab)+"}},
{"topic": "test_topic", "value": {"input_string": "baababaa", "pattern": null}}
],
"outputs": [
{"topic": "OUTPUT", "value": {"EXTRACTED": "bacataa"}},
{"topic": "OUTPUT", "value": {"EXTRACTED": "bacatbcataa"}},
{"topic": "OUTPUT", "value": {"EXTRACTED": null}},
{"topic": "OUTPUT", "value": {"EXTRACTED": null}}
]
}
]
}

0 comments on commit 30309bf

Please sign in to comment.