Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix: the Abs, Ceil and Floor methods now return proper types
Browse files Browse the repository at this point in the history
BREAKING CHANGE: abs, ceil and floor will now return types aligned with
other databases systems (i.e. the same type as the input). Previously
these udfs would always return Double.
agavra committed Nov 22, 2019
1 parent 8cc913a commit 50d9675
Showing 9 changed files with 148 additions and 89 deletions.
Original file line number Diff line number Diff line change
@@ -29,7 +29,6 @@
import io.confluent.ksql.function.udf.UdfMetadata;
import io.confluent.ksql.function.udf.json.ArrayContainsKudf;
import io.confluent.ksql.function.udf.json.JsonExtractStringKudf;
import io.confluent.ksql.function.udf.math.CeilKudf;
import io.confluent.ksql.function.udf.math.RandomKudf;
import io.confluent.ksql.function.udf.string.ConcatKudf;
import io.confluent.ksql.function.udf.string.IfNullKudf;
@@ -62,6 +61,7 @@ public InternalFunctionRegistry() {
new BuiltInInitializer(this).init();
}

@Override
public synchronized UdfFactory getUdfFactory(final String functionName) {
final UdfFactory udfFactory = udfs.get(functionName.toUpperCase());
if (udfFactory == null) {
@@ -107,6 +107,7 @@ public synchronized boolean isAggregate(final String functionName) {
return udafs.containsKey(functionName.toUpperCase());
}

@Override
public synchronized boolean isTableFunction(final String functionName) {
return udtfs.containsKey(functionName.toUpperCase());
}
@@ -308,14 +309,6 @@ private void addStringFunctions() {
}

private void addMathFunctions() {

addBuiltInFunction(KsqlScalarFunction.createLegacyBuiltIn(
SqlTypes.DOUBLE,
Collections.singletonList(ParamTypes.DOUBLE),
FunctionName.of("CEIL"),
CeilKudf.class
));

addBuiltInFunction(KsqlScalarFunction.createLegacyBuiltIn(
SqlTypes.DOUBLE,
Collections.emptyList(),
Original file line number Diff line number Diff line change
@@ -34,27 +34,27 @@ public class Abs {


@Udf
public Double abs(@UdfParameter final Integer val) {
return (val == null) ? null : (double)Math.abs(val);
public Integer abs(@UdfParameter final Integer val) {
return (val == null) ? null : Math.abs(val);
}

@Udf
public Double abs(@UdfParameter final Long val) {
return (val == null) ? null : (double)Math.abs(val);
public Long abs(@UdfParameter final Long val) {
return (val == null) ? null : Math.abs(val);
}

@Udf
public Double abs(@UdfParameter final Double val) {
return (val == null) ? null : Math.abs(val);
}

@Udf(schemaProvider = "provideSchema")
@Udf(schemaProvider = "absDecimalProvider")
public BigDecimal abs(@UdfParameter final BigDecimal val) {
return (val == null) ? null : val.abs();
}

@UdfSchemaProvider
public SqlType provideSchema(final List<SqlType> params) {
public SqlType absDecimalProvider(final List<SqlType> params) {
final SqlType s = params.get(0);
if (s.baseType() != SqlBaseType.DECIMAL) {
throw new KsqlException("The schema provider method for Abs expects a BigDecimal parameter"
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.math;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.function.udf.UdfSchemaProvider;
import io.confluent.ksql.schema.ksql.SqlBaseType;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.util.KsqlException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;

@UdfDescription(name = "Ceil", description = Ceil.DESCRIPTION)
public class Ceil {

static final String DESCRIPTION = "Returns the smallest integer greater than or equal to the "
+ "specified numeric expression.";

@Udf
public Integer ceil(@UdfParameter final Integer val) {
return val;
}

@Udf
public Long ceil(@UdfParameter final Long val) {
return val;
}

@Udf
public Double ceil(@UdfParameter final Double val) {
return (val == null) ? null : Math.ceil(val);
}

@Udf(schemaProvider = "ceilDecimalProvider")
public BigDecimal ceil(@UdfParameter final BigDecimal val) {
return val == null
? null
: val.setScale(0, RoundingMode.CEILING).setScale(val.scale(), RoundingMode.UNNECESSARY);
}

@UdfSchemaProvider
public SqlType ceilDecimalProvider(final List<SqlType> params) {
final SqlType s = params.get(0);
if (s.baseType() != SqlBaseType.DECIMAL) {
throw new KsqlException("The schema provider method for Ceil expects a BigDecimal parameter"
+ "type");
}
return s;
}

}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -18,34 +18,51 @@
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.function.udf.UdfSchemaProvider;
import io.confluent.ksql.schema.ksql.SqlBaseType;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.util.KsqlException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;

@UdfDescription(name = "Floor", description = Floor.DESCRIPTION)
public class Floor {

static final String DESCRIPTION = "Returns the largest integer less than or equal to the "
+ "specified numeric expression. NOTE: for backwards compatibility, this returns a DOUBLE "
+ "that has a mantissa of zero.";
+ "specified numeric expression.";


@Udf
public Double floor(@UdfParameter final Integer val) {
return (val == null) ? null : Math.floor(val);
public Integer floor(@UdfParameter final Integer val) {
return val;
}

@Udf
public Double floor(@UdfParameter final Long val) {
return (val == null) ? null : Math.floor(val);
public Long floor(@UdfParameter final Long val) {
return val;
}

@Udf
public Double floor(@UdfParameter final Double val) {
return (val == null) ? null : Math.floor(val);
}

@Udf
public Double floor(@UdfParameter final BigDecimal val) {
return (val == null) ? null : Math.floor(val.doubleValue());
@Udf(schemaProvider = "floorDecimalProvider")
public BigDecimal floor(@UdfParameter final BigDecimal val) {
return val == null
? null
: val.setScale(0, RoundingMode.FLOOR).setScale(val.scale(), RoundingMode.UNNECESSARY);
}

@UdfSchemaProvider
public SqlType floorDecimalProvider(final List<SqlType> params) {
final SqlType s = params.get(0);
if (s.baseType() != SqlBaseType.DECIMAL) {
throw new KsqlException("The schema provider method for Floor expects a BigDecimal parameter"
+ "type");
}
return s;
}

}
Original file line number Diff line number Diff line change
@@ -40,7 +40,7 @@
}
CONFIGS_END
CSAS_OUTPUT_0.KsqlTopic.source = STRUCT<I INT, L BIGINT, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
CSAS_OUTPUT_0.OUTPUT = STRUCT<I DOUBLE, L DOUBLE, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
CSAS_OUTPUT_0.OUTPUT = STRUCT<I INT, L BIGINT, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
SCHEMAS_END
Topologies:
Sub-topology: 0
Original file line number Diff line number Diff line change
@@ -39,12 +39,12 @@
"ksql.query.persistent.active.limit" : "2147483647"
}
CONFIGS_END
CSAS_OUTPUT_0.KsqlTopic.source = STRUCT<V DOUBLE> NOT NULL
CSAS_OUTPUT_0.OUTPUT = STRUCT<C0 DOUBLE> NOT NULL
CSAS_OUTPUT_0.KsqlTopic.source = STRUCT<I INT, L BIGINT, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
CSAS_OUTPUT_0.OUTPUT = STRUCT<I INT, L BIGINT, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
SCHEMAS_END
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic])
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> SELECT-0
Original file line number Diff line number Diff line change
@@ -40,7 +40,7 @@
}
CONFIGS_END
CSAS_OUTPUT_0.KsqlTopic.source = STRUCT<I INT, L BIGINT, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
CSAS_OUTPUT_0.OUTPUT = STRUCT<I DOUBLE, L DOUBLE, D DOUBLE, B DOUBLE> NOT NULL
CSAS_OUTPUT_0.OUTPUT = STRUCT<I INT, L BIGINT, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
SCHEMAS_END
Topologies:
Sub-topology: 0
Original file line number Diff line number Diff line change
@@ -102,43 +102,55 @@
],
"outputs": [
{"topic": "OUTPUT", "value": {"I": null, "L": null, "D": null, "B": null}},
{"topic": "OUTPUT", "value": {"I": 0.0, "L": 0.0, "D": 0.0, "B": 0.0}},
{"topic": "OUTPUT", "value": {"I": 1.0, "L": 1.0, "D": 1.0, "B": 1.0}},
{"topic": "OUTPUT", "value": {"I": 1.0, "L": 1.0, "D": 1.0, "B": 1.0}},
{"topic": "OUTPUT", "value": {"I": 1.0, "L": 1.0, "D": 1.0, "B": 1.0}},
{"topic": "OUTPUT", "value": {"I": 1.0, "L": 1.0, "D": 1.0, "B": 1.0}},
{"topic": "OUTPUT", "value": {"I": 1.0, "L": 1.0, "D": 2.0, "B": 2.0}},
{"topic": "OUTPUT", "value": {"I": 0, "L": 0, "D": 0.0, "B": 0.0}},
{"topic": "OUTPUT", "value": {"I": 1, "L": 1, "D": 1.0, "B": 1.0}},
{"topic": "OUTPUT", "value": {"I": 1, "L": 1, "D": 1.0, "B": 1.0}},
{"topic": "OUTPUT", "value": {"I": 1, "L": 1, "D": 1.0, "B": 1.0}},
{"topic": "OUTPUT", "value": {"I": 1, "L": 1, "D": 1.0, "B": 1.0}},
{"topic": "OUTPUT", "value": {"I": 1, "L": 1, "D": 2.0, "B": 2.0}},

{"topic": "OUTPUT", "value": {"I": -1.0, "L": -1.0, "D": -1.0, "B": -1.0}},
{"topic": "OUTPUT", "value": {"I": -1.0, "L": -1.0, "D": -2.0, "B": -2.0}},
{"topic": "OUTPUT", "value": {"I": -1.0, "L": -1.0, "D": -2.0, "B": -2.0}},
{"topic": "OUTPUT", "value": {"I": -1.0, "L": -1.0, "D": -2.0, "B": -2.0}},
{"topic": "OUTPUT", "value": {"I": -1.0, "L": -1.0, "D": -2.0, "B": -2.0}}
{"topic": "OUTPUT", "value": {"I": -1, "L": -1, "D": -1.0, "B": -1.0}},
{"topic": "OUTPUT", "value": {"I": -1, "L": -1, "D": -2.0, "B": -2.0}},
{"topic": "OUTPUT", "value": {"I": -1, "L": -1, "D": -2.0, "B": -2.0}},
{"topic": "OUTPUT", "value": {"I": -1, "L": -1, "D": -2.0, "B": -2.0}},
{"topic": "OUTPUT", "value": {"I": -1, "L": -1, "D": -2.0, "B": -2.0}}
]
},
{
"name": "calculate CEIL function",
"statements": [
"CREATE STREAM test (v DOUBLE) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT CEIL(v) as C0 FROM test;"
"CREATE STREAM INPUT (i INT, l BIGINT, d DOUBLE, b DECIMAL(2,1)) WITH (kafka_topic='input', value_format='AVRO');",
"CREATE STREAM OUTPUT AS SELECT ceil(i) i, ceil(l) l, ceil(d) d, ceil(b) b FROM INPUT;"
],
"inputs": [
{"topic": "test_topic", "value": {"v" : 1.2}},
{"topic": "test_topic", "value": {"v" : 1.7}},
{"topic": "test_topic", "value": {"v" : 1.5}},
{"topic": "test_topic", "value": {"v" : 3}},
{"topic": "test_topic", "value": {"v" : 1.234567}},
{"topic": "test_topic", "value": {"v" : 0}},
{"topic": "test_topic", "value": {"v" : null}}
{"topic": "input", "value": {"i": null, "l": null, "d": null}},
{"topic": "input", "value": {"i": 0, "l": 0, "d": 0.0, "b": "0.0"}},
{"topic": "input", "value": {"i": 1, "l": 1, "d": 1.0, "b": "1.0"}},
{"topic": "input", "value": {"i": 1, "l": 1, "d": 1.1, "b": "1.1"}},
{"topic": "input", "value": {"i": 1, "l": 1, "d": 1.5, "b": "1.5"}},
{"topic": "input", "value": {"i": 1, "l": 1, "d": 1.7, "b": "1.7"}},
{"topic": "input", "value": {"i": 1, "l": 1, "d": 2.0, "b": "2.0"}},

{"topic": "input", "value": {"i": -1, "l": -1, "d": -1.0, "b": "-1.0"}},
{"topic": "input", "value": {"i": -1, "l": -1, "d": -1.1, "b": "-1.1"}},
{"topic": "input", "value": {"i": -1, "l": -1, "d": -1.5, "b": "-1.5"}},
{"topic": "input", "value": {"i": -1, "l": -1, "d": -1.7, "b": "-1.7"}},
{"topic": "input", "value": {"i": -1, "l": -1, "d": -2.0, "b": "-2.0"}}
],
"outputs": [
{"topic": "OUTPUT", "value": {"C0" : 2.0}},
{"topic": "OUTPUT", "value": {"C0" : 2.0}},
{"topic": "OUTPUT", "value": {"C0" : 2.0}},
{"topic": "OUTPUT", "value": {"C0" : 3.0}},
{"topic": "OUTPUT", "value": {"C0" : 2.0}},
{"topic": "OUTPUT", "value": {"C0" : 0.0}},
{"topic": "OUTPUT", "value": {"C0" : null}}
{"topic": "OUTPUT", "value": {"I": null, "L": null, "D": null, "B": null}},
{"topic": "OUTPUT", "value": {"I": 0, "L": 0, "D": 0.0, "B": 0.0}},
{"topic": "OUTPUT", "value": {"I": 1, "L": 1, "D": 1.0, "B": 1.0}},
{"topic": "OUTPUT", "value": {"I": 1, "L": 1, "D": 2.0, "B": 2.0}},
{"topic": "OUTPUT", "value": {"I": 1, "L": 1, "D": 2.0, "B": 2.0}},
{"topic": "OUTPUT", "value": {"I": 1, "L": 1, "D": 2.0, "B": 2.0}},
{"topic": "OUTPUT", "value": {"I": 1, "L": 1, "D": 2.0, "B": 2.0}},

{"topic": "OUTPUT", "value": {"I": -1, "L": -1, "D": -1.0, "B": -1.0}},
{"topic": "OUTPUT", "value": {"I": -1, "L": -1, "D": -1.0, "B": -1.0}},
{"topic": "OUTPUT", "value": {"I": -1, "L": -1, "D": -1.0, "B": -1.0}},
{"topic": "OUTPUT", "value": {"I": -1, "L": -1, "D": -1.0, "B": -1.0}},
{"topic": "OUTPUT", "value": {"I": -1, "L": -1, "D": -2.0, "B": -2.0}}
]
},
{
@@ -155,9 +167,9 @@
],
"outputs": [
{"topic": "OUTPUT", "value": {"I": null, "L": null, "D": null, "B": null}},
{"topic": "OUTPUT", "value": {"I": 1.0, "L": 2.0, "D": 3.1, "B": "3.2"}},
{"topic": "OUTPUT", "value": {"I": 0.0, "L": 0.0, "D": 0.0, "B": "0.0"}},
{"topic": "OUTPUT", "value": {"I": 1.0, "L": 2.0, "D": 3.3, "B": "3.4"}}
{"topic": "OUTPUT", "value": {"I": 1, "L": 2, "D": 3.1, "B": "3.2"}},
{"topic": "OUTPUT", "value": {"I": 0, "L": 0, "D": 0.0, "B": "0.0"}},
{"topic": "OUTPUT", "value": {"I": 1, "L": 2, "D": 3.3, "B": "3.4"}}
]
},
{

0 comments on commit 50d9675

Please sign in to comment.