diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/InternalFunctionRegistry.java b/ksql-engine/src/main/java/io/confluent/ksql/function/InternalFunctionRegistry.java index 54fa8a34ccb9..1f7cac08fdb4 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/InternalFunctionRegistry.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/InternalFunctionRegistry.java @@ -29,7 +29,6 @@ import io.confluent.ksql.function.udf.UdfMetadata; import io.confluent.ksql.function.udf.json.ArrayContainsKudf; import io.confluent.ksql.function.udf.json.JsonExtractStringKudf; -import io.confluent.ksql.function.udf.math.CeilKudf; import io.confluent.ksql.function.udf.math.RandomKudf; import io.confluent.ksql.function.udf.string.ConcatKudf; import io.confluent.ksql.function.udf.string.IfNullKudf; @@ -62,6 +61,7 @@ public InternalFunctionRegistry() { new BuiltInInitializer(this).init(); } + @Override public synchronized UdfFactory getUdfFactory(final String functionName) { final UdfFactory udfFactory = udfs.get(functionName.toUpperCase()); if (udfFactory == null) { @@ -107,6 +107,7 @@ public synchronized boolean isAggregate(final String functionName) { return udafs.containsKey(functionName.toUpperCase()); } + @Override public synchronized boolean isTableFunction(final String functionName) { return udtfs.containsKey(functionName.toUpperCase()); } @@ -308,14 +309,6 @@ private void addStringFunctions() { } private void addMathFunctions() { - - addBuiltInFunction(KsqlScalarFunction.createLegacyBuiltIn( - SqlTypes.DOUBLE, - Collections.singletonList(ParamTypes.DOUBLE), - FunctionName.of("CEIL"), - CeilKudf.class - )); - addBuiltInFunction(KsqlScalarFunction.createLegacyBuiltIn( SqlTypes.DOUBLE, Collections.emptyList(), diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Abs.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Abs.java index bf4566a66a2d..893329543ece 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Abs.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Abs.java @@ -34,13 +34,13 @@ public class Abs { @Udf - public Double abs(@UdfParameter final Integer val) { - return (val == null) ? null : (double)Math.abs(val); + public Integer abs(@UdfParameter final Integer val) { + return (val == null) ? null : Math.abs(val); } @Udf - public Double abs(@UdfParameter final Long val) { - return (val == null) ? null : (double)Math.abs(val); + public Long abs(@UdfParameter final Long val) { + return (val == null) ? null : Math.abs(val); } @Udf @@ -48,13 +48,13 @@ public Double abs(@UdfParameter final Double val) { return (val == null) ? null : Math.abs(val); } - @Udf(schemaProvider = "provideSchema") + @Udf(schemaProvider = "absDecimalProvider") public BigDecimal abs(@UdfParameter final BigDecimal val) { return (val == null) ? null : val.abs(); } @UdfSchemaProvider - public SqlType provideSchema(final List params) { + public SqlType absDecimalProvider(final List params) { final SqlType s = params.get(0); if (s.baseType() != SqlBaseType.DECIMAL) { throw new KsqlException("The schema provider method for Abs expects a BigDecimal parameter" diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Ceil.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Ceil.java new file mode 100644 index 000000000000..422d3a8a474e --- /dev/null +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Ceil.java @@ -0,0 +1,67 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.function.udf.math; + +import io.confluent.ksql.function.udf.Udf; +import io.confluent.ksql.function.udf.UdfDescription; +import io.confluent.ksql.function.udf.UdfParameter; +import io.confluent.ksql.function.udf.UdfSchemaProvider; +import io.confluent.ksql.schema.ksql.SqlBaseType; +import io.confluent.ksql.schema.ksql.types.SqlType; +import io.confluent.ksql.util.KsqlException; +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.List; + +@UdfDescription(name = "Ceil", description = Ceil.DESCRIPTION) +public class Ceil { + + static final String DESCRIPTION = "Returns the smallest integer greater than or equal to the " + + "specified numeric expression."; + + @Udf + public Integer ceil(@UdfParameter final Integer val) { + return val; + } + + @Udf + public Long ceil(@UdfParameter final Long val) { + return val; + } + + @Udf + public Double ceil(@UdfParameter final Double val) { + return (val == null) ? null : Math.ceil(val); + } + + @Udf(schemaProvider = "ceilDecimalProvider") + public BigDecimal ceil(@UdfParameter final BigDecimal val) { + return val == null + ? null + : val.setScale(0, RoundingMode.CEILING).setScale(val.scale(), RoundingMode.UNNECESSARY); + } + + @UdfSchemaProvider + public SqlType ceilDecimalProvider(final List params) { + final SqlType s = params.get(0); + if (s.baseType() != SqlBaseType.DECIMAL) { + throw new KsqlException("The schema provider method for Ceil expects a BigDecimal parameter" + + "type"); + } + return s; + } + +} diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/CeilKudf.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/CeilKudf.java deleted file mode 100644 index 14a4b3efd657..000000000000 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/CeilKudf.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2018 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.function.udf.math; - -import io.confluent.ksql.function.KsqlFunctionException; -import io.confluent.ksql.function.udf.Kudf; - -public class CeilKudf implements Kudf { - - @Override - public Object evaluate(final Object... args) { - if (args.length != 1) { - throw new KsqlFunctionException("Ceil udf should have one input argument."); - } - return Math.ceil((Double) args[0]); - } -} diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Floor.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Floor.java index 34623d52e1a5..99f5a26c269c 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Floor.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Floor.java @@ -18,24 +18,29 @@ import io.confluent.ksql.function.udf.Udf; import io.confluent.ksql.function.udf.UdfDescription; import io.confluent.ksql.function.udf.UdfParameter; +import io.confluent.ksql.function.udf.UdfSchemaProvider; +import io.confluent.ksql.schema.ksql.SqlBaseType; +import io.confluent.ksql.schema.ksql.types.SqlType; +import io.confluent.ksql.util.KsqlException; import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.List; @UdfDescription(name = "Floor", description = Floor.DESCRIPTION) public class Floor { static final String DESCRIPTION = "Returns the largest integer less than or equal to the " - + "specified numeric expression. NOTE: for backwards compatibility, this returns a DOUBLE " - + "that has a mantissa of zero."; + + "specified numeric expression."; @Udf - public Double floor(@UdfParameter final Integer val) { - return (val == null) ? null : Math.floor(val); + public Integer floor(@UdfParameter final Integer val) { + return val; } @Udf - public Double floor(@UdfParameter final Long val) { - return (val == null) ? null : Math.floor(val); + public Long floor(@UdfParameter final Long val) { + return val; } @Udf @@ -43,9 +48,21 @@ public Double floor(@UdfParameter final Double val) { return (val == null) ? null : Math.floor(val); } - @Udf - public Double floor(@UdfParameter final BigDecimal val) { - return (val == null) ? null : Math.floor(val.doubleValue()); + @Udf(schemaProvider = "floorDecimalProvider") + public BigDecimal floor(@UdfParameter final BigDecimal val) { + return val == null + ? null + : val.setScale(0, RoundingMode.FLOOR).setScale(val.scale(), RoundingMode.UNNECESSARY); + } + + @UdfSchemaProvider + public SqlType floorDecimalProvider(final List params) { + final SqlType s = params.get(0); + if (s.baseType() != SqlBaseType.DECIMAL) { + throw new KsqlException("The schema provider method for Floor expects a BigDecimal parameter" + + "type"); + } + return s; } } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/InternalFunctionRegistryTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/InternalFunctionRegistryTest.java index 9492bf78f5ef..ce722f073c77 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/function/InternalFunctionRegistryTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/InternalFunctionRegistryTest.java @@ -319,7 +319,7 @@ public void shouldHaveBuiltInUDFRegistered() { // String UDF "LCASE", "UCASE", "CONCAT", "TRIM", "IFNULL", "LEN", // Math UDF - "CEIL", "RANDOM", + "RANDOM", // JSON UDF "EXTRACTJSONFIELD", "ARRAYCONTAINS", // Struct UDF diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/udf/math/AbsTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/math/AbsTest.java index 671ff00748e6..767a4ef46718 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/function/udf/math/AbsTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/math/AbsTest.java @@ -42,16 +42,16 @@ public void shouldHandleNull() { @Test public void shouldHandleNegative() { - assertThat(udf.abs(-1), is(1.0)); - assertThat(udf.abs(-1L), is(1.0)); + assertThat(udf.abs(-1), is(1)); + assertThat(udf.abs(-1L), is(1L)); assertThat(udf.abs(-1.0), is(1.0)); assertThat(udf.abs(new BigDecimal(-1)), is(new BigDecimal(-1).abs())); } @Test public void shouldHandlePositive() { - assertThat(udf.abs(1), is(1.0)); - assertThat(udf.abs(1L), is(1.0)); + assertThat(udf.abs(1), is(1)); + assertThat(udf.abs(1L), is(1L)); assertThat(udf.abs(1.0), is(1.0)); assertThat(udf.abs(new BigDecimal(1)), is(new BigDecimal(1).abs())); } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/SelectValueMapperIntegrationTest.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/SelectValueMapperIntegrationTest.java index 0dceb0db03f9..3072e040eb85 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/integration/SelectValueMapperIntegrationTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/SelectValueMapperIntegrationTest.java @@ -24,6 +24,7 @@ import io.confluent.ksql.execution.streams.SelectValueMapperFactory; import io.confluent.ksql.execution.util.StructKeyUtil; import io.confluent.ksql.function.InternalFunctionRegistry; +import io.confluent.ksql.function.TestFunctionRegistry; import io.confluent.ksql.logging.processing.ProcessingLogger; import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.planner.plan.PlanNode; @@ -47,7 +48,7 @@ public class SelectValueMapperIntegrationTest { private static final Struct NON_WINDOWED_KEY = StructKeyUtil.asStructKey("someKey"); private final MetaStore metaStore = MetaStoreFixture - .getNewMetaStore(new InternalFunctionRegistry()); + .getNewMetaStore(TestFunctionRegistry.INSTANCE.get()); private final KsqlConfig ksqlConfig = new KsqlConfig(Collections.emptyMap()); @@ -99,7 +100,7 @@ private KsqlValueTransformerWithKey givenSelectMapperFor(final String qu selectExpressions, schema, ksqlConfig, - new InternalFunctionRegistry() + TestFunctionRegistry.INSTANCE.get() ).getTransformer(processingLogger); } diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/average_-_calculate_average_in_select b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/average_-_calculate_average_in_select index d2e01ad85b1a..e62b2476b806 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/average_-_calculate_average_in_select +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/average_-_calculate_average_in_select @@ -42,7 +42,7 @@ CONFIGS_END CTAS_AVG_0.KsqlTopic.source = STRUCT NOT NULL CTAS_AVG_0.Aggregate.groupby = STRUCT NOT NULL CTAS_AVG_0.Aggregate.aggregate = STRUCT NOT NULL -CTAS_AVG_0.AVG = STRUCT NOT NULL +CTAS_AVG_0.AVG = STRUCT NOT NULL SCHEMAS_END Topologies: Sub-topology: 0 diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/explode_-_udfs_with_table_functions_and_no_aliases,_verifies_intermediate_generated_column_names_don't_clash_with_aliases b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/explode_-_udfs_with_table_functions_and_no_aliases,_verifies_intermediate_generated_column_names_don't_clash_with_aliases index 5d8dafee5e28..9bb5954e61f0 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/explode_-_udfs_with_table_functions_and_no_aliases,_verifies_intermediate_generated_column_names_don't_clash_with_aliases +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/explode_-_udfs_with_table_functions_and_no_aliases,_verifies_intermediate_generated_column_names_don't_clash_with_aliases @@ -40,7 +40,7 @@ } CONFIGS_END CSAS_OUTPUT_0.KsqlTopic.source = STRUCT, F1 BIGINT, F2 BIGINT> NOT NULL -CSAS_OUTPUT_0.OUTPUT = STRUCT NOT NULL +CSAS_OUTPUT_0.OUTPUT = STRUCT NOT NULL SCHEMAS_END Topologies: Sub-topology: 0 diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/math_-_abs b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/math_-_abs index 58c0987863de..91e1a4a634ac 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/math_-_abs +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/math_-_abs @@ -40,7 +40,7 @@ } CONFIGS_END CSAS_OUTPUT_0.KsqlTopic.source = STRUCT NOT NULL -CSAS_OUTPUT_0.OUTPUT = STRUCT NOT NULL +CSAS_OUTPUT_0.OUTPUT = STRUCT NOT NULL SCHEMAS_END Topologies: Sub-topology: 0 diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/math_-_calculate_CEIL_function b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/math_-_calculate_CEIL_function index 71922c5f6122..91e1a4a634ac 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/math_-_calculate_CEIL_function +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/math_-_calculate_CEIL_function @@ -39,12 +39,12 @@ "ksql.query.persistent.active.limit" : "2147483647" } CONFIGS_END -CSAS_OUTPUT_0.KsqlTopic.source = STRUCT NOT NULL -CSAS_OUTPUT_0.OUTPUT = STRUCT NOT NULL +CSAS_OUTPUT_0.KsqlTopic.source = STRUCT NOT NULL +CSAS_OUTPUT_0.OUTPUT = STRUCT NOT NULL SCHEMAS_END Topologies: Sub-topology: 0 - Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) --> KSTREAM-TRANSFORMVALUES-0000000001 Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) --> SELECT-0 diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/math_-_floor b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/math_-_floor index a4638305e39a..91e1a4a634ac 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/math_-_floor +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/math_-_floor @@ -40,7 +40,7 @@ } CONFIGS_END CSAS_OUTPUT_0.KsqlTopic.source = STRUCT NOT NULL -CSAS_OUTPUT_0.OUTPUT = STRUCT NOT NULL +CSAS_OUTPUT_0.OUTPUT = STRUCT NOT NULL SCHEMAS_END Topologies: Sub-topology: 0 diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/quoted-identifiers_-_udf_using_fields_that_require_quotes b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/quoted-identifiers_-_udf_using_fields_that_require_quotes index 173ad49d8989..93c86a5c5923 100644 --- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/quoted-identifiers_-_udf_using_fields_that_require_quotes +++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/quoted-identifiers_-_udf_using_fields_that_require_quotes @@ -40,7 +40,7 @@ } CONFIGS_END CSAS_OUTPUT_0.KsqlTopic.source = STRUCT