Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: the Abs, Ceil and Floor methods now return proper types #3948

Merged
merged 1 commit into from
Nov 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.confluent.ksql.function.udf.UdfMetadata;
import io.confluent.ksql.function.udf.json.ArrayContainsKudf;
import io.confluent.ksql.function.udf.json.JsonExtractStringKudf;
import io.confluent.ksql.function.udf.math.CeilKudf;
import io.confluent.ksql.function.udf.math.RandomKudf;
import io.confluent.ksql.function.udf.string.ConcatKudf;
import io.confluent.ksql.function.udf.string.IfNullKudf;
Expand Down Expand Up @@ -62,6 +61,7 @@ public InternalFunctionRegistry() {
new BuiltInInitializer(this).init();
}

@Override
public synchronized UdfFactory getUdfFactory(final String functionName) {
final UdfFactory udfFactory = udfs.get(functionName.toUpperCase());
if (udfFactory == null) {
Expand Down Expand Up @@ -107,6 +107,7 @@ public synchronized boolean isAggregate(final String functionName) {
return udafs.containsKey(functionName.toUpperCase());
}

@Override
public synchronized boolean isTableFunction(final String functionName) {
return udtfs.containsKey(functionName.toUpperCase());
}
Expand Down Expand Up @@ -308,14 +309,6 @@ private void addStringFunctions() {
}

private void addMathFunctions() {

addBuiltInFunction(KsqlScalarFunction.createLegacyBuiltIn(
SqlTypes.DOUBLE,
Collections.singletonList(ParamTypes.DOUBLE),
FunctionName.of("CEIL"),
CeilKudf.class
));

addBuiltInFunction(KsqlScalarFunction.createLegacyBuiltIn(
SqlTypes.DOUBLE,
Collections.emptyList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,27 @@ public class Abs {


@Udf
public Double abs(@UdfParameter final Integer val) {
return (val == null) ? null : (double)Math.abs(val);
public Integer abs(@UdfParameter final Integer val) {
return (val == null) ? null : Math.abs(val);
}

@Udf
public Double abs(@UdfParameter final Long val) {
return (val == null) ? null : (double)Math.abs(val);
public Long abs(@UdfParameter final Long val) {
return (val == null) ? null : Math.abs(val);
}

@Udf
public Double abs(@UdfParameter final Double val) {
return (val == null) ? null : Math.abs(val);
}

@Udf(schemaProvider = "provideSchema")
@Udf(schemaProvider = "absDecimalProvider")
public BigDecimal abs(@UdfParameter final BigDecimal val) {
return (val == null) ? null : val.abs();
}

@UdfSchemaProvider
public SqlType provideSchema(final List<SqlType> params) {
public SqlType absDecimalProvider(final List<SqlType> params) {
final SqlType s = params.get(0);
if (s.baseType() != SqlBaseType.DECIMAL) {
throw new KsqlException("The schema provider method for Abs expects a BigDecimal parameter"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.math;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.function.udf.UdfSchemaProvider;
import io.confluent.ksql.schema.ksql.SqlBaseType;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.util.KsqlException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;

@UdfDescription(name = "Ceil", description = Ceil.DESCRIPTION)
public class Ceil {

static final String DESCRIPTION = "Returns the smallest integer greater than or equal to the "
+ "specified numeric expression.";

@Udf
public Integer ceil(@UdfParameter final Integer val) {
return val;
}

@Udf
public Long ceil(@UdfParameter final Long val) {
return val;
}

@Udf
public Double ceil(@UdfParameter final Double val) {
return (val == null) ? null : Math.ceil(val);
}

@Udf(schemaProvider = "ceilDecimalProvider")
public BigDecimal ceil(@UdfParameter final BigDecimal val) {
return val == null
? null
: val.setScale(0, RoundingMode.CEILING).setScale(val.scale(), RoundingMode.UNNECESSARY);
}

@UdfSchemaProvider
public SqlType ceilDecimalProvider(final List<SqlType> params) {
final SqlType s = params.get(0);
if (s.baseType() != SqlBaseType.DECIMAL) {
throw new KsqlException("The schema provider method for Ceil expects a BigDecimal parameter"
+ "type");
}
return s;
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,51 @@
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.function.udf.UdfSchemaProvider;
import io.confluent.ksql.schema.ksql.SqlBaseType;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.util.KsqlException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;

@UdfDescription(name = "Floor", description = Floor.DESCRIPTION)
public class Floor {

static final String DESCRIPTION = "Returns the largest integer less than or equal to the "
+ "specified numeric expression. NOTE: for backwards compatibility, this returns a DOUBLE "
+ "that has a mantissa of zero.";
+ "specified numeric expression.";


@Udf
public Double floor(@UdfParameter final Integer val) {
return (val == null) ? null : Math.floor(val);
public Integer floor(@UdfParameter final Integer val) {
return val;
}

@Udf
public Double floor(@UdfParameter final Long val) {
return (val == null) ? null : Math.floor(val);
public Long floor(@UdfParameter final Long val) {
return val;
}

@Udf
public Double floor(@UdfParameter final Double val) {
return (val == null) ? null : Math.floor(val);
}

@Udf
public Double floor(@UdfParameter final BigDecimal val) {
return (val == null) ? null : Math.floor(val.doubleValue());
@Udf(schemaProvider = "floorDecimalProvider")
public BigDecimal floor(@UdfParameter final BigDecimal val) {
return val == null
? null
: val.setScale(0, RoundingMode.FLOOR).setScale(val.scale(), RoundingMode.UNNECESSARY);
}

@UdfSchemaProvider
public SqlType floorDecimalProvider(final List<SqlType> params) {
final SqlType s = params.get(0);
if (s.baseType() != SqlBaseType.DECIMAL) {
throw new KsqlException("The schema provider method for Floor expects a BigDecimal parameter"
+ "type");
}
return s;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ public void shouldHaveBuiltInUDFRegistered() {
// String UDF
"LCASE", "UCASE", "CONCAT", "TRIM", "IFNULL", "LEN",
// Math UDF
"CEIL", "RANDOM",
"RANDOM",
// JSON UDF
"EXTRACTJSONFIELD", "ARRAYCONTAINS",
// Struct UDF
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,16 @@ public void shouldHandleNull() {

@Test
public void shouldHandleNegative() {
assertThat(udf.abs(-1), is(1.0));
assertThat(udf.abs(-1L), is(1.0));
assertThat(udf.abs(-1), is(1));
assertThat(udf.abs(-1L), is(1L));
assertThat(udf.abs(-1.0), is(1.0));
assertThat(udf.abs(new BigDecimal(-1)), is(new BigDecimal(-1).abs()));
}

@Test
public void shouldHandlePositive() {
assertThat(udf.abs(1), is(1.0));
assertThat(udf.abs(1L), is(1.0));
assertThat(udf.abs(1), is(1));
assertThat(udf.abs(1L), is(1L));
assertThat(udf.abs(1.0), is(1.0));
assertThat(udf.abs(new BigDecimal(1)), is(new BigDecimal(1).abs()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.confluent.ksql.execution.streams.SelectValueMapperFactory;
import io.confluent.ksql.execution.util.StructKeyUtil;
import io.confluent.ksql.function.InternalFunctionRegistry;
import io.confluent.ksql.function.TestFunctionRegistry;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.planner.plan.PlanNode;
Expand All @@ -47,7 +48,7 @@ public class SelectValueMapperIntegrationTest {
private static final Struct NON_WINDOWED_KEY = StructKeyUtil.asStructKey("someKey");

private final MetaStore metaStore = MetaStoreFixture
.getNewMetaStore(new InternalFunctionRegistry());
.getNewMetaStore(TestFunctionRegistry.INSTANCE.get());

private final KsqlConfig ksqlConfig = new KsqlConfig(Collections.emptyMap());

Expand Down Expand Up @@ -99,7 +100,7 @@ private KsqlValueTransformerWithKey<Struct> givenSelectMapperFor(final String qu
selectExpressions,
schema,
ksqlConfig,
new InternalFunctionRegistry()
TestFunctionRegistry.INSTANCE.get()
).getTransformer(processingLogger);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ CONFIGS_END
CTAS_AVG_0.KsqlTopic.source = STRUCT<ID BIGINT, NAME VARCHAR, VALUE BIGINT> NOT NULL
CTAS_AVG_0.Aggregate.groupby = STRUCT<KSQL_INTERNAL_COL_0 BIGINT, KSQL_INTERNAL_COL_1 BIGINT> NOT NULL
CTAS_AVG_0.Aggregate.aggregate = STRUCT<KSQL_INTERNAL_COL_0 BIGINT, KSQL_INTERNAL_COL_1 BIGINT, KSQL_AGG_VARIABLE_0 BIGINT, KSQL_AGG_VARIABLE_1 BIGINT> NOT NULL
CTAS_AVG_0.AVG = STRUCT<ID BIGINT, AVG DOUBLE> NOT NULL
CTAS_AVG_0.AVG = STRUCT<ID BIGINT, AVG BIGINT> NOT NULL
SCHEMAS_END
Topologies:
Sub-topology: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
}
CONFIGS_END
CSAS_OUTPUT_0.KsqlTopic.source = STRUCT<MY_ARR ARRAY<BIGINT>, F1 BIGINT, F2 BIGINT> NOT NULL
CSAS_OUTPUT_0.OUTPUT = STRUCT<KSQL_COL_0 DOUBLE, KSQL_COL_1 BIGINT, KSQL_COL_2 DOUBLE> NOT NULL
CSAS_OUTPUT_0.OUTPUT = STRUCT<KSQL_COL_0 BIGINT, KSQL_COL_1 BIGINT, KSQL_COL_2 BIGINT> NOT NULL
SCHEMAS_END
Topologies:
Sub-topology: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
}
CONFIGS_END
CSAS_OUTPUT_0.KsqlTopic.source = STRUCT<I INT, L BIGINT, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
CSAS_OUTPUT_0.OUTPUT = STRUCT<I DOUBLE, L DOUBLE, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
CSAS_OUTPUT_0.OUTPUT = STRUCT<I INT, L BIGINT, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
SCHEMAS_END
Topologies:
Sub-topology: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@
"ksql.query.persistent.active.limit" : "2147483647"
}
CONFIGS_END
CSAS_OUTPUT_0.KsqlTopic.source = STRUCT<V DOUBLE> NOT NULL
CSAS_OUTPUT_0.OUTPUT = STRUCT<C0 DOUBLE> NOT NULL
CSAS_OUTPUT_0.KsqlTopic.source = STRUCT<I INT, L BIGINT, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
CSAS_OUTPUT_0.OUTPUT = STRUCT<I INT, L BIGINT, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
SCHEMAS_END
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic])
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> SELECT-0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
}
CONFIGS_END
CSAS_OUTPUT_0.KsqlTopic.source = STRUCT<I INT, L BIGINT, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
CSAS_OUTPUT_0.OUTPUT = STRUCT<I DOUBLE, L DOUBLE, D DOUBLE, B DOUBLE> NOT NULL
CSAS_OUTPUT_0.OUTPUT = STRUCT<I INT, L BIGINT, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
SCHEMAS_END
Topologies:
Sub-topology: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
}
CONFIGS_END
CSAS_OUTPUT_0.KsqlTopic.source = STRUCT<SELECT INT> NOT NULL
CSAS_OUTPUT_0.OUTPUT = STRUCT<FOO DOUBLE> NOT NULL
CSAS_OUTPUT_0.OUTPUT = STRUCT<FOO INT> NOT NULL
SCHEMAS_END
Topologies:
Sub-topology: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
}
CONFIGS_END
CSAS_OUTPUT_0.KsqlTopic.source = STRUCT<F0 INT, F1 INT, F2 INT, F3 INT> NOT NULL
CSAS_OUTPUT_0.OUTPUT = STRUCT<F0 INT, KSQL_COL_1 DOUBLE> NOT NULL
CSAS_OUTPUT_0.OUTPUT = STRUCT<F0 INT, KSQL_COL_1 INT> NOT NULL
SCHEMAS_END
Topologies:
Sub-topology: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
{"topic": "test_topic", "key": 1, "value": "1,one,10"}
],
"outputs": [
{"topic": "AVG", "key": 0, "value": "0,500.0"},
{"topic": "AVG", "key": 0, "value": "0,300.0"},
{"topic": "AVG", "key": 0, "value": "0,250.0"},
{"topic": "AVG", "key": 1, "value": "1,1000.0"},
{"topic": "AVG", "key": 1, "value": "1,550.0"}
{"topic": "AVG", "key": 0, "value": "0,500"},
{"topic": "AVG", "key": 0, "value": "0,300"},
{"topic": "AVG", "key": 0, "value": "0,250"},
{"topic": "AVG", "key": 1, "value": "1,1000"},
{"topic": "AVG", "key": 1, "value": "1,550"}
]
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
{"topic": "test_topic", "key": 0, "value": {"F1": 1, "F2": 2, "MY_ARR": [1, 2]}}
],
"outputs": [
{"topic": "OUTPUT", "key": "0", "value": {"KSQL_COL_0": 1.0, "KSQL_COL_1": 1, "KSQL_COL_2": 2.0}},
{"topic": "OUTPUT", "key": "0", "value": {"KSQL_COL_0": 1.0, "KSQL_COL_1": 2, "KSQL_COL_2": 2.0}}
{"topic": "OUTPUT", "key": "0", "value": {"KSQL_COL_0": 1, "KSQL_COL_1": 1, "KSQL_COL_2": 2}},
{"topic": "OUTPUT", "key": "0", "value": {"KSQL_COL_0": 1, "KSQL_COL_1": 2, "KSQL_COL_2": 2}}
]
},
{
Expand Down
Loading