fix: the Abs, Ceil and Floor methods now return proper types (#3948)
BREAKING CHANGE: abs, ceil and floor now return types aligned with
other database systems (i.e. the same type as the input). Previously
these UDFs always returned Double.
agavra authored Nov 22, 2019
1 parent 60a4275 commit 3d6e119
Showing 20 changed files with 171 additions and 111 deletions.
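A quick sketch of the behavioural change (illustrative only: Abs, Ceil and Floor are the UDF classes touched in this commit, but calling them directly like this, outside ksqlDB's function framework, is purely for demonstration):

    import io.confluent.ksql.function.udf.math.Abs;
    import io.confluent.ksql.function.udf.math.Ceil;
    import io.confluent.ksql.function.udf.math.Floor;
    import java.math.BigDecimal;

    public final class ReturnTypeSketch {
      public static void main(String[] args) {
        // Before this commit these UDFs always returned Double; now each overload preserves its input type.
        Integer i = new Abs().abs(-3);                           // 3    (was 3.0)
        Long lng = new Floor().floor(7L);                        // 7    (was 7.0)
        Double d = new Ceil().ceil(1.2);                         // 2.0  (unchanged)
        BigDecimal b = new Ceil().ceil(new BigDecimal("1.25"));  // 2.00 (scale of the input preserved)
        System.out.println(i + " " + lng + " " + d + " " + b);
      }
    }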
@@ -29,7 +29,6 @@
import io.confluent.ksql.function.udf.UdfMetadata;
import io.confluent.ksql.function.udf.json.ArrayContainsKudf;
import io.confluent.ksql.function.udf.json.JsonExtractStringKudf;
- import io.confluent.ksql.function.udf.math.CeilKudf;
import io.confluent.ksql.function.udf.math.RandomKudf;
import io.confluent.ksql.function.udf.string.ConcatKudf;
import io.confluent.ksql.function.udf.string.IfNullKudf;
@@ -62,6 +61,7 @@ public InternalFunctionRegistry() {
new BuiltInInitializer(this).init();
}

+ @Override
public synchronized UdfFactory getUdfFactory(final String functionName) {
final UdfFactory udfFactory = udfs.get(functionName.toUpperCase());
if (udfFactory == null) {
@@ -107,6 +107,7 @@ public synchronized boolean isAggregate(final String functionName) {
return udafs.containsKey(functionName.toUpperCase());
}

+ @Override
public synchronized boolean isTableFunction(final String functionName) {
return udtfs.containsKey(functionName.toUpperCase());
}
@@ -308,14 +309,6 @@ private void addStringFunctions() {
}

private void addMathFunctions() {

- addBuiltInFunction(KsqlScalarFunction.createLegacyBuiltIn(
-     SqlTypes.DOUBLE,
-     Collections.singletonList(ParamTypes.DOUBLE),
-     FunctionName.of("CEIL"),
-     CeilKudf.class
- ));

addBuiltInFunction(KsqlScalarFunction.createLegacyBuiltIn(
SqlTypes.DOUBLE,
Collections.emptyList(),
@@ -34,27 +34,27 @@ public class Abs {


@Udf
- public Double abs(@UdfParameter final Integer val) {
-   return (val == null) ? null : (double)Math.abs(val);
+ public Integer abs(@UdfParameter final Integer val) {
+   return (val == null) ? null : Math.abs(val);
}

@Udf
- public Double abs(@UdfParameter final Long val) {
-   return (val == null) ? null : (double)Math.abs(val);
+ public Long abs(@UdfParameter final Long val) {
+   return (val == null) ? null : Math.abs(val);
}

@Udf
public Double abs(@UdfParameter final Double val) {
return (val == null) ? null : Math.abs(val);
}

- @Udf(schemaProvider = "provideSchema")
+ @Udf(schemaProvider = "absDecimalProvider")
public BigDecimal abs(@UdfParameter final BigDecimal val) {
return (val == null) ? null : val.abs();
}

@UdfSchemaProvider
- public SqlType provideSchema(final List<SqlType> params) {
+ public SqlType absDecimalProvider(final List<SqlType> params) {
final SqlType s = params.get(0);
if (s.baseType() != SqlBaseType.DECIMAL) {
throw new KsqlException("The schema provider method for Abs expects a BigDecimal parameter"
@@ -0,0 +1,67 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.math;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.function.udf.UdfSchemaProvider;
import io.confluent.ksql.schema.ksql.SqlBaseType;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.util.KsqlException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;

@UdfDescription(name = "Ceil", description = Ceil.DESCRIPTION)
public class Ceil {

static final String DESCRIPTION = "Returns the smallest integer greater than or equal to the "
+ "specified numeric expression.";

@Udf
public Integer ceil(@UdfParameter final Integer val) {
return val;
}

@Udf
public Long ceil(@UdfParameter final Long val) {
return val;
}

@Udf
public Double ceil(@UdfParameter final Double val) {
return (val == null) ? null : Math.ceil(val);
}

@Udf(schemaProvider = "ceilDecimalProvider")
public BigDecimal ceil(@UdfParameter final BigDecimal val) {
return val == null
? null
: val.setScale(0, RoundingMode.CEILING).setScale(val.scale(), RoundingMode.UNNECESSARY);
}

@UdfSchemaProvider
public SqlType ceilDecimalProvider(final List<SqlType> params) {
final SqlType s = params.get(0);
if (s.baseType() != SqlBaseType.DECIMAL) {
throw new KsqlException("The schema provider method for Ceil expects a BigDecimal parameter "
    + "type");
}
return s;
}

}
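The decimal overload above rounds in two steps: the first setScale lifts the value to the next integer, the second restores the input's scale so the result still fits the DECIMAL schema returned by ceilDecimalProvider. A minimal standalone check of that expression (not part of the commit):

    import java.math.BigDecimal;
    import java.math.RoundingMode;

    public final class CeilDecimalCheck {
      public static void main(String[] args) {
        BigDecimal val = new BigDecimal("1.25");
        // Same expression as Ceil.ceil(BigDecimal): round up to an integer,
        // then pad back to the original scale (scale only grows, so UNNECESSARY cannot throw).
        BigDecimal result = val
            .setScale(0, RoundingMode.CEILING)
            .setScale(val.scale(), RoundingMode.UNNECESSARY);
        System.out.println(result); // prints 2.00
      }
    }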

This file was deleted.

@@ -18,34 +18,51 @@
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
+ import io.confluent.ksql.function.udf.UdfSchemaProvider;
+ import io.confluent.ksql.schema.ksql.SqlBaseType;
+ import io.confluent.ksql.schema.ksql.types.SqlType;
+ import io.confluent.ksql.util.KsqlException;
import java.math.BigDecimal;
+ import java.math.RoundingMode;
+ import java.util.List;

@UdfDescription(name = "Floor", description = Floor.DESCRIPTION)
public class Floor {

static final String DESCRIPTION = "Returns the largest integer less than or equal to the "
-     + "specified numeric expression. NOTE: for backwards compatibility, this returns a DOUBLE "
-     + "that has a mantissa of zero.";
+     + "specified numeric expression.";


@Udf
- public Double floor(@UdfParameter final Integer val) {
-   return (val == null) ? null : Math.floor(val);
+ public Integer floor(@UdfParameter final Integer val) {
+   return val;
}

@Udf
- public Double floor(@UdfParameter final Long val) {
-   return (val == null) ? null : Math.floor(val);
+ public Long floor(@UdfParameter final Long val) {
+   return val;
}

@Udf
public Double floor(@UdfParameter final Double val) {
return (val == null) ? null : Math.floor(val);
}

- @Udf
- public Double floor(@UdfParameter final BigDecimal val) {
-   return (val == null) ? null : Math.floor(val.doubleValue());
+ @Udf(schemaProvider = "floorDecimalProvider")
+ public BigDecimal floor(@UdfParameter final BigDecimal val) {
+   return val == null
+       ? null
+       : val.setScale(0, RoundingMode.FLOOR).setScale(val.scale(), RoundingMode.UNNECESSARY);
}

+ @UdfSchemaProvider
+ public SqlType floorDecimalProvider(final List<SqlType> params) {
+   final SqlType s = params.get(0);
+   if (s.baseType() != SqlBaseType.DECIMAL) {
+     throw new KsqlException("The schema provider method for Floor expects a BigDecimal parameter "
+         + "type");
+   }
+   return s;
+ }

}
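The floorDecimalProvider contract mirrors Abs and Ceil: for a DECIMAL argument it returns the argument's own SqlType, which lines up with the expected-topology change further down where column B now comes out as DECIMAL(2, 1) instead of DOUBLE. A small hypothetical check, assuming the SqlTypes.decimal(precision, scale) factory:

    import io.confluent.ksql.function.udf.math.Floor;
    import io.confluent.ksql.schema.ksql.types.SqlType;
    import io.confluent.ksql.schema.ksql.types.SqlTypes;
    import java.util.Collections;

    public final class FloorSchemaCheck {
      public static void main(String[] args) {
        SqlType decimalType = SqlTypes.decimal(2, 1);
        // The provider hands back exactly the parameter type it was given.
        SqlType returnType = new Floor().floorDecimalProvider(Collections.singletonList(decimalType));
        System.out.println(returnType.equals(decimalType)); // true
      }
    }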
@@ -319,7 +319,7 @@ public void shouldHaveBuiltInUDFRegistered() {
// String UDF
"LCASE", "UCASE", "CONCAT", "TRIM", "IFNULL", "LEN",
// Math UDF
- "CEIL", "RANDOM",
+ "RANDOM",
// JSON UDF
"EXTRACTJSONFIELD", "ARRAYCONTAINS",
// Struct UDF
@@ -42,16 +42,16 @@ public void shouldHandleNull() {

@Test
public void shouldHandleNegative() {
- assertThat(udf.abs(-1), is(1.0));
- assertThat(udf.abs(-1L), is(1.0));
+ assertThat(udf.abs(-1), is(1));
+ assertThat(udf.abs(-1L), is(1L));
assertThat(udf.abs(-1.0), is(1.0));
assertThat(udf.abs(new BigDecimal(-1)), is(new BigDecimal(-1).abs()));
}

@Test
public void shouldHandlePositive() {
- assertThat(udf.abs(1), is(1.0));
- assertThat(udf.abs(1L), is(1.0));
+ assertThat(udf.abs(1), is(1));
+ assertThat(udf.abs(1L), is(1L));
assertThat(udf.abs(1.0), is(1.0));
assertThat(udf.abs(new BigDecimal(1)), is(new BigDecimal(1).abs()));
}
@@ -24,6 +24,7 @@
import io.confluent.ksql.execution.streams.SelectValueMapperFactory;
import io.confluent.ksql.execution.util.StructKeyUtil;
import io.confluent.ksql.function.InternalFunctionRegistry;
+ import io.confluent.ksql.function.TestFunctionRegistry;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.planner.plan.PlanNode;
@@ -47,7 +48,7 @@ public class SelectValueMapperIntegrationTest {
private static final Struct NON_WINDOWED_KEY = StructKeyUtil.asStructKey("someKey");

private final MetaStore metaStore = MetaStoreFixture
- .getNewMetaStore(new InternalFunctionRegistry());
+ .getNewMetaStore(TestFunctionRegistry.INSTANCE.get());

private final KsqlConfig ksqlConfig = new KsqlConfig(Collections.emptyMap());

@@ -99,7 +100,7 @@ private KsqlValueTransformerWithKey<Struct> givenSelectMapperFor(final String qu
selectExpressions,
schema,
ksqlConfig,
- new InternalFunctionRegistry()
+ TestFunctionRegistry.INSTANCE.get()
).getTransformer(processingLogger);
}

@@ -42,7 +42,7 @@ CONFIGS_END
CTAS_AVG_0.KsqlTopic.source = STRUCT<ID BIGINT, NAME VARCHAR, VALUE BIGINT> NOT NULL
CTAS_AVG_0.Aggregate.groupby = STRUCT<KSQL_INTERNAL_COL_0 BIGINT, KSQL_INTERNAL_COL_1 BIGINT> NOT NULL
CTAS_AVG_0.Aggregate.aggregate = STRUCT<KSQL_INTERNAL_COL_0 BIGINT, KSQL_INTERNAL_COL_1 BIGINT, KSQL_AGG_VARIABLE_0 BIGINT, KSQL_AGG_VARIABLE_1 BIGINT> NOT NULL
- CTAS_AVG_0.AVG = STRUCT<ID BIGINT, AVG DOUBLE> NOT NULL
+ CTAS_AVG_0.AVG = STRUCT<ID BIGINT, AVG BIGINT> NOT NULL
SCHEMAS_END
Topologies:
Sub-topology: 0
@@ -40,7 +40,7 @@
}
CONFIGS_END
CSAS_OUTPUT_0.KsqlTopic.source = STRUCT<MY_ARR ARRAY<BIGINT>, F1 BIGINT, F2 BIGINT> NOT NULL
- CSAS_OUTPUT_0.OUTPUT = STRUCT<KSQL_COL_0 DOUBLE, KSQL_COL_1 BIGINT, KSQL_COL_2 DOUBLE> NOT NULL
+ CSAS_OUTPUT_0.OUTPUT = STRUCT<KSQL_COL_0 BIGINT, KSQL_COL_1 BIGINT, KSQL_COL_2 BIGINT> NOT NULL
SCHEMAS_END
Topologies:
Sub-topology: 0
@@ -40,7 +40,7 @@
}
CONFIGS_END
CSAS_OUTPUT_0.KsqlTopic.source = STRUCT<I INT, L BIGINT, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
- CSAS_OUTPUT_0.OUTPUT = STRUCT<I DOUBLE, L DOUBLE, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
+ CSAS_OUTPUT_0.OUTPUT = STRUCT<I INT, L BIGINT, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
SCHEMAS_END
Topologies:
Sub-topology: 0
@@ -39,12 +39,12 @@
"ksql.query.persistent.active.limit" : "2147483647"
}
CONFIGS_END
- CSAS_OUTPUT_0.KsqlTopic.source = STRUCT<V DOUBLE> NOT NULL
- CSAS_OUTPUT_0.OUTPUT = STRUCT<C0 DOUBLE> NOT NULL
+ CSAS_OUTPUT_0.KsqlTopic.source = STRUCT<I INT, L BIGINT, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
+ CSAS_OUTPUT_0.OUTPUT = STRUCT<I INT, L BIGINT, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
SCHEMAS_END
Topologies:
Sub-topology: 0
- Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic])
+ Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> SELECT-0
@@ -40,7 +40,7 @@
}
CONFIGS_END
CSAS_OUTPUT_0.KsqlTopic.source = STRUCT<I INT, L BIGINT, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
- CSAS_OUTPUT_0.OUTPUT = STRUCT<I DOUBLE, L DOUBLE, D DOUBLE, B DOUBLE> NOT NULL
+ CSAS_OUTPUT_0.OUTPUT = STRUCT<I INT, L BIGINT, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
SCHEMAS_END
Topologies:
Sub-topology: 0
@@ -40,7 +40,7 @@
}
CONFIGS_END
CSAS_OUTPUT_0.KsqlTopic.source = STRUCT<SELECT INT> NOT NULL
- CSAS_OUTPUT_0.OUTPUT = STRUCT<FOO DOUBLE> NOT NULL
+ CSAS_OUTPUT_0.OUTPUT = STRUCT<FOO INT> NOT NULL
SCHEMAS_END
Topologies:
Sub-topology: 0
@@ -40,7 +40,7 @@
}
CONFIGS_END
CSAS_OUTPUT_0.KsqlTopic.source = STRUCT<F0 INT, F1 INT, F2 INT, F3 INT> NOT NULL
- CSAS_OUTPUT_0.OUTPUT = STRUCT<F0 INT, KSQL_COL_1 DOUBLE> NOT NULL
+ CSAS_OUTPUT_0.OUTPUT = STRUCT<F0 INT, KSQL_COL_1 INT> NOT NULL
SCHEMAS_END
Topologies:
Sub-topology: 0
@@ -22,11 +22,11 @@
{"topic": "test_topic", "key": 1, "value": "1,one,10"}
],
"outputs": [
- {"topic": "AVG", "key": 0, "value": "0,500.0"},
- {"topic": "AVG", "key": 0, "value": "0,300.0"},
- {"topic": "AVG", "key": 0, "value": "0,250.0"},
- {"topic": "AVG", "key": 1, "value": "1,1000.0"},
- {"topic": "AVG", "key": 1, "value": "1,550.0"}
+ {"topic": "AVG", "key": 0, "value": "0,500"},
+ {"topic": "AVG", "key": 0, "value": "0,300"},
+ {"topic": "AVG", "key": 0, "value": "0,250"},
+ {"topic": "AVG", "key": 1, "value": "1,1000"},
+ {"topic": "AVG", "key": 1, "value": "1,550"}
]
}
]
@@ -38,8 +38,8 @@
{"topic": "test_topic", "key": 0, "value": {"F1": 1, "F2": 2, "MY_ARR": [1, 2]}}
],
"outputs": [
- {"topic": "OUTPUT", "key": "0", "value": {"KSQL_COL_0": 1.0, "KSQL_COL_1": 1, "KSQL_COL_2": 2.0}},
- {"topic": "OUTPUT", "key": "0", "value": {"KSQL_COL_0": 1.0, "KSQL_COL_1": 2, "KSQL_COL_2": 2.0}}
+ {"topic": "OUTPUT", "key": "0", "value": {"KSQL_COL_0": 1, "KSQL_COL_1": 1, "KSQL_COL_2": 2}},
+ {"topic": "OUTPUT", "key": "0", "value": {"KSQL_COL_0": 1, "KSQL_COL_1": 2, "KSQL_COL_2": 2}}
]
},
{