diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/InternalFunctionRegistry.java b/ksql-engine/src/main/java/io/confluent/ksql/function/InternalFunctionRegistry.java
index 54fa8a34ccb9..1f7cac08fdb4 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/function/InternalFunctionRegistry.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/function/InternalFunctionRegistry.java
@@ -29,7 +29,6 @@
 import io.confluent.ksql.function.udf.UdfMetadata;
 import io.confluent.ksql.function.udf.json.ArrayContainsKudf;
 import io.confluent.ksql.function.udf.json.JsonExtractStringKudf;
-import io.confluent.ksql.function.udf.math.CeilKudf;
 import io.confluent.ksql.function.udf.math.RandomKudf;
 import io.confluent.ksql.function.udf.string.ConcatKudf;
 import io.confluent.ksql.function.udf.string.IfNullKudf;
@@ -62,6 +61,7 @@ public InternalFunctionRegistry() {
     new BuiltInInitializer(this).init();
   }
 
+  @Override
   public synchronized UdfFactory getUdfFactory(final String functionName) {
     final UdfFactory udfFactory = udfs.get(functionName.toUpperCase());
     if (udfFactory == null) {
@@ -107,6 +107,7 @@ public synchronized boolean isAggregate(final String functionName) {
     return udafs.containsKey(functionName.toUpperCase());
   }
 
+  @Override
   public synchronized boolean isTableFunction(final String functionName) {
     return udtfs.containsKey(functionName.toUpperCase());
   }
@@ -308,14 +309,6 @@ private void addStringFunctions() {
     }
 
     private void addMathFunctions() {
-
-      addBuiltInFunction(KsqlScalarFunction.createLegacyBuiltIn(
-          SqlTypes.DOUBLE,
-          Collections.singletonList(ParamTypes.DOUBLE),
-          FunctionName.of("CEIL"),
-          CeilKudf.class
-      ));
-
       addBuiltInFunction(KsqlScalarFunction.createLegacyBuiltIn(
           SqlTypes.DOUBLE,
           Collections.emptyList(),
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Abs.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Abs.java
index bf4566a66a2d..893329543ece 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Abs.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Abs.java
@@ -34,13 +34,13 @@ public class Abs {
 
 
   @Udf
-  public Double abs(@UdfParameter final Integer val) {
-    return (val == null) ? null : (double)Math.abs(val);
+  public Integer abs(@UdfParameter final Integer val) {
+    return (val == null) ? null : Math.abs(val);
   }
 
   @Udf
-  public Double abs(@UdfParameter final Long val) {
-    return (val == null) ? null : (double)Math.abs(val);
+  public Long abs(@UdfParameter final Long val) {
+    return (val == null) ? null : Math.abs(val);
   }
 
   @Udf
@@ -48,13 +48,13 @@ public Double abs(@UdfParameter final Double val) {
     return (val == null) ? null : Math.abs(val);
   }
 
-  @Udf(schemaProvider = "provideSchema")
+  @Udf(schemaProvider = "absDecimalProvider")
   public BigDecimal abs(@UdfParameter final BigDecimal val) {
     return (val == null) ? null : val.abs();
   }
 
   @UdfSchemaProvider
-  public SqlType provideSchema(final List<SqlType> params) {
+  public SqlType absDecimalProvider(final List<SqlType> params) {
     final SqlType s = params.get(0);
     if (s.baseType() != SqlBaseType.DECIMAL) {
       throw new KsqlException("The schema provider method for Abs expects a BigDecimal parameter"
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Ceil.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Ceil.java
new file mode 100644
index 000000000000..422d3a8a474e
--- /dev/null
+++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Ceil.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2019 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License.  You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.function.udf.math;
+
+import io.confluent.ksql.function.udf.Udf;
+import io.confluent.ksql.function.udf.UdfDescription;
+import io.confluent.ksql.function.udf.UdfParameter;
+import io.confluent.ksql.function.udf.UdfSchemaProvider;
+import io.confluent.ksql.schema.ksql.SqlBaseType;
+import io.confluent.ksql.schema.ksql.types.SqlType;
+import io.confluent.ksql.util.KsqlException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.List;
+
+@UdfDescription(name = "Ceil", description = Ceil.DESCRIPTION)
+public class Ceil {
+
+  static final String DESCRIPTION = "Returns the smallest integer greater than or equal to the "
+      + "specified numeric expression.";
+
+  @Udf
+  public Integer ceil(@UdfParameter final Integer val) {
+    return val;
+  }
+
+  @Udf
+  public Long ceil(@UdfParameter final Long val) {
+    return val;
+  }
+
+  @Udf
+  public Double ceil(@UdfParameter final Double val) {
+    return (val == null) ? null : Math.ceil(val);
+  }
+
+  @Udf(schemaProvider = "ceilDecimalProvider")
+  public BigDecimal ceil(@UdfParameter final BigDecimal val) {
+    return val == null
+        ? null
+        : val.setScale(0, RoundingMode.CEILING).setScale(val.scale(), RoundingMode.UNNECESSARY);
+  }
+
+  @UdfSchemaProvider
+  public SqlType ceilDecimalProvider(final List<SqlType> params) {
+    final SqlType s = params.get(0);
+    if (s.baseType() != SqlBaseType.DECIMAL) {
+      throw new KsqlException("The schema provider method for Ceil expects a BigDecimal parameter"
+          + "type");
+    }
+    return s;
+  }
+
+}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/CeilKudf.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/CeilKudf.java
deleted file mode 100644
index 14a4b3efd657..000000000000
--- a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/CeilKudf.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright 2018 Confluent Inc.
- *
- * Licensed under the Confluent Community License (the "License"); you may not use
- * this file except in compliance with the License.  You may obtain a copy of the
- * License at
- *
- * http://www.confluent.io/confluent-community-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OF ANY KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package io.confluent.ksql.function.udf.math;
-
-import io.confluent.ksql.function.KsqlFunctionException;
-import io.confluent.ksql.function.udf.Kudf;
-
-public class CeilKudf implements Kudf {
-
-  @Override
-  public Object evaluate(final Object... args) {
-    if (args.length != 1) {
-      throw new KsqlFunctionException("Ceil udf should have one input argument.");
-    }
-    return Math.ceil((Double) args[0]);
-  }
-}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Floor.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Floor.java
index 34623d52e1a5..99f5a26c269c 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Floor.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/math/Floor.java
@@ -18,24 +18,29 @@
 import io.confluent.ksql.function.udf.Udf;
 import io.confluent.ksql.function.udf.UdfDescription;
 import io.confluent.ksql.function.udf.UdfParameter;
+import io.confluent.ksql.function.udf.UdfSchemaProvider;
+import io.confluent.ksql.schema.ksql.SqlBaseType;
+import io.confluent.ksql.schema.ksql.types.SqlType;
+import io.confluent.ksql.util.KsqlException;
 import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.List;
 
 @UdfDescription(name = "Floor", description = Floor.DESCRIPTION)
 public class Floor {
 
   static final String DESCRIPTION = "Returns the largest integer less than or equal to the "
-      + "specified numeric expression. NOTE: for backwards compatibility, this returns a DOUBLE "
-      + "that has a mantissa of zero.";
+      + "specified numeric expression.";
 
 
   @Udf
-  public Double floor(@UdfParameter final Integer val) {
-    return (val == null) ? null : Math.floor(val);
+  public Integer floor(@UdfParameter final Integer val) {
+    return val;
   }
 
   @Udf
-  public Double floor(@UdfParameter final Long val) {
-    return (val == null) ? null : Math.floor(val);
+  public Long floor(@UdfParameter final Long val) {
+    return val;
   }
 
   @Udf
@@ -43,9 +48,21 @@ public Double floor(@UdfParameter final Double val) {
     return (val == null) ? null : Math.floor(val);
   }
 
-  @Udf
-  public Double floor(@UdfParameter final BigDecimal val) {
-    return (val == null) ? null : Math.floor(val.doubleValue());
+  @Udf(schemaProvider = "floorDecimalProvider")
+  public BigDecimal floor(@UdfParameter final BigDecimal val) {
+    return val == null
+        ? null
+        : val.setScale(0, RoundingMode.FLOOR).setScale(val.scale(), RoundingMode.UNNECESSARY);
+  }
+
+  @UdfSchemaProvider
+  public SqlType floorDecimalProvider(final List<SqlType> params) {
+    final SqlType s = params.get(0);
+    if (s.baseType() != SqlBaseType.DECIMAL) {
+      throw new KsqlException("The schema provider method for Floor expects a BigDecimal parameter"
+          + "type");
+    }
+    return s;
   }
 
 }
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/InternalFunctionRegistryTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/InternalFunctionRegistryTest.java
index 9492bf78f5ef..ce722f073c77 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/function/InternalFunctionRegistryTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/function/InternalFunctionRegistryTest.java
@@ -319,7 +319,7 @@ public void shouldHaveBuiltInUDFRegistered() {
         // String UDF
         "LCASE", "UCASE", "CONCAT", "TRIM", "IFNULL", "LEN",
         // Math UDF
-        "CEIL", "RANDOM",
+        "RANDOM",
         // JSON UDF
         "EXTRACTJSONFIELD", "ARRAYCONTAINS",
         // Struct UDF
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/udf/math/AbsTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/math/AbsTest.java
index 671ff00748e6..767a4ef46718 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/function/udf/math/AbsTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/math/AbsTest.java
@@ -42,16 +42,16 @@ public void shouldHandleNull() {
 
   @Test
   public void shouldHandleNegative() {
-    assertThat(udf.abs(-1), is(1.0));
-    assertThat(udf.abs(-1L), is(1.0));
+    assertThat(udf.abs(-1), is(1));
+    assertThat(udf.abs(-1L), is(1L));
     assertThat(udf.abs(-1.0), is(1.0));
     assertThat(udf.abs(new BigDecimal(-1)), is(new BigDecimal(-1).abs()));
   }
 
   @Test
   public void shouldHandlePositive() {
-    assertThat(udf.abs(1), is(1.0));
-    assertThat(udf.abs(1L), is(1.0));
+    assertThat(udf.abs(1), is(1));
+    assertThat(udf.abs(1L), is(1L));
     assertThat(udf.abs(1.0), is(1.0));
     assertThat(udf.abs(new BigDecimal(1)), is(new BigDecimal(1).abs()));
   }
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/SelectValueMapperIntegrationTest.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/SelectValueMapperIntegrationTest.java
index 0dceb0db03f9..3072e040eb85 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/integration/SelectValueMapperIntegrationTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/SelectValueMapperIntegrationTest.java
@@ -24,6 +24,7 @@
 import io.confluent.ksql.execution.streams.SelectValueMapperFactory;
 import io.confluent.ksql.execution.util.StructKeyUtil;
 import io.confluent.ksql.function.InternalFunctionRegistry;
+import io.confluent.ksql.function.TestFunctionRegistry;
 import io.confluent.ksql.logging.processing.ProcessingLogger;
 import io.confluent.ksql.metastore.MetaStore;
 import io.confluent.ksql.planner.plan.PlanNode;
@@ -47,7 +48,7 @@ public class SelectValueMapperIntegrationTest {
   private static final Struct NON_WINDOWED_KEY = StructKeyUtil.asStructKey("someKey");
 
   private final MetaStore metaStore = MetaStoreFixture
-      .getNewMetaStore(new InternalFunctionRegistry());
+      .getNewMetaStore(TestFunctionRegistry.INSTANCE.get());
 
   private final KsqlConfig ksqlConfig = new KsqlConfig(Collections.emptyMap());
 
@@ -99,7 +100,7 @@ private KsqlValueTransformerWithKey<Struct> givenSelectMapperFor(final String qu
         selectExpressions,
         schema,
         ksqlConfig,
-        new InternalFunctionRegistry()
+        TestFunctionRegistry.INSTANCE.get()
     ).getTransformer(processingLogger);
   }
 
diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/average_-_calculate_average_in_select b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/average_-_calculate_average_in_select
index d2e01ad85b1a..e62b2476b806 100644
--- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/average_-_calculate_average_in_select
+++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/average_-_calculate_average_in_select
@@ -42,7 +42,7 @@ CONFIGS_END
 CTAS_AVG_0.KsqlTopic.source = STRUCT<ID BIGINT, NAME VARCHAR, VALUE BIGINT> NOT NULL
 CTAS_AVG_0.Aggregate.groupby = STRUCT<KSQL_INTERNAL_COL_0 BIGINT, KSQL_INTERNAL_COL_1 BIGINT> NOT NULL
 CTAS_AVG_0.Aggregate.aggregate = STRUCT<KSQL_INTERNAL_COL_0 BIGINT, KSQL_INTERNAL_COL_1 BIGINT, KSQL_AGG_VARIABLE_0 BIGINT, KSQL_AGG_VARIABLE_1 BIGINT> NOT NULL
-CTAS_AVG_0.AVG = STRUCT<ID BIGINT, AVG DOUBLE> NOT NULL
+CTAS_AVG_0.AVG = STRUCT<ID BIGINT, AVG BIGINT> NOT NULL
 SCHEMAS_END
 Topologies:
    Sub-topology: 0
diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/explode_-_udfs_with_table_functions_and_no_aliases,_verifies_intermediate_generated_column_names_don't_clash_with_aliases b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/explode_-_udfs_with_table_functions_and_no_aliases,_verifies_intermediate_generated_column_names_don't_clash_with_aliases
index 5d8dafee5e28..9bb5954e61f0 100644
--- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/explode_-_udfs_with_table_functions_and_no_aliases,_verifies_intermediate_generated_column_names_don't_clash_with_aliases
+++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/explode_-_udfs_with_table_functions_and_no_aliases,_verifies_intermediate_generated_column_names_don't_clash_with_aliases
@@ -40,7 +40,7 @@
 }
 CONFIGS_END
 CSAS_OUTPUT_0.KsqlTopic.source = STRUCT<MY_ARR ARRAY<BIGINT>, F1 BIGINT, F2 BIGINT> NOT NULL
-CSAS_OUTPUT_0.OUTPUT = STRUCT<KSQL_COL_0 DOUBLE, KSQL_COL_1 BIGINT, KSQL_COL_2 DOUBLE> NOT NULL
+CSAS_OUTPUT_0.OUTPUT = STRUCT<KSQL_COL_0 BIGINT, KSQL_COL_1 BIGINT, KSQL_COL_2 BIGINT> NOT NULL
 SCHEMAS_END
 Topologies:
    Sub-topology: 0
diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/math_-_abs b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/math_-_abs
index 58c0987863de..91e1a4a634ac 100644
--- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/math_-_abs
+++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/math_-_abs
@@ -40,7 +40,7 @@
 }
 CONFIGS_END
 CSAS_OUTPUT_0.KsqlTopic.source = STRUCT<I INT, L BIGINT, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
-CSAS_OUTPUT_0.OUTPUT = STRUCT<I DOUBLE, L DOUBLE, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
+CSAS_OUTPUT_0.OUTPUT = STRUCT<I INT, L BIGINT, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
 SCHEMAS_END
 Topologies:
    Sub-topology: 0
diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/math_-_calculate_CEIL_function b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/math_-_calculate_CEIL_function
index 71922c5f6122..91e1a4a634ac 100644
--- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/math_-_calculate_CEIL_function
+++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/math_-_calculate_CEIL_function
@@ -39,12 +39,12 @@
   "ksql.query.persistent.active.limit" : "2147483647"
 }
 CONFIGS_END
-CSAS_OUTPUT_0.KsqlTopic.source = STRUCT<V DOUBLE> NOT NULL
-CSAS_OUTPUT_0.OUTPUT = STRUCT<C0 DOUBLE> NOT NULL
+CSAS_OUTPUT_0.KsqlTopic.source = STRUCT<I INT, L BIGINT, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
+CSAS_OUTPUT_0.OUTPUT = STRUCT<I INT, L BIGINT, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
 SCHEMAS_END
 Topologies:
    Sub-topology: 0
-    Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic])
+    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
       --> KSTREAM-TRANSFORMVALUES-0000000001
     Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
       --> SELECT-0
diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/math_-_floor b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/math_-_floor
index a4638305e39a..91e1a4a634ac 100644
--- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/math_-_floor
+++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/math_-_floor
@@ -40,7 +40,7 @@
 }
 CONFIGS_END
 CSAS_OUTPUT_0.KsqlTopic.source = STRUCT<I INT, L BIGINT, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
-CSAS_OUTPUT_0.OUTPUT = STRUCT<I DOUBLE, L DOUBLE, D DOUBLE, B DOUBLE> NOT NULL
+CSAS_OUTPUT_0.OUTPUT = STRUCT<I INT, L BIGINT, D DOUBLE, B DECIMAL(2, 1)> NOT NULL
 SCHEMAS_END
 Topologies:
    Sub-topology: 0
diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/quoted-identifiers_-_udf_using_fields_that_require_quotes b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/quoted-identifiers_-_udf_using_fields_that_require_quotes
index 173ad49d8989..93c86a5c5923 100644
--- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/quoted-identifiers_-_udf_using_fields_that_require_quotes
+++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/quoted-identifiers_-_udf_using_fields_that_require_quotes
@@ -40,7 +40,7 @@
 }
 CONFIGS_END
 CSAS_OUTPUT_0.KsqlTopic.source = STRUCT<SELECT INT> NOT NULL
-CSAS_OUTPUT_0.OUTPUT = STRUCT<FOO DOUBLE> NOT NULL
+CSAS_OUTPUT_0.OUTPUT = STRUCT<FOO INT> NOT NULL
 SCHEMAS_END
 Topologies:
    Sub-topology: 0
diff --git a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/table-functions_-_table_functions_with_complex_expressions b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/table-functions_-_table_functions_with_complex_expressions
index 7d3926b6547e..d6e04bbcb18d 100644
--- a/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/table-functions_-_table_functions_with_complex_expressions
+++ b/ksql-functional-tests/src/test/resources/expected_topology/0_6_0-pre/table-functions_-_table_functions_with_complex_expressions
@@ -40,7 +40,7 @@
 }
 CONFIGS_END
 CSAS_OUTPUT_0.KsqlTopic.source = STRUCT<F0 INT, F1 INT, F2 INT, F3 INT> NOT NULL
-CSAS_OUTPUT_0.OUTPUT = STRUCT<F0 INT, KSQL_COL_1 DOUBLE> NOT NULL
+CSAS_OUTPUT_0.OUTPUT = STRUCT<F0 INT, KSQL_COL_1 INT> NOT NULL
 SCHEMAS_END
 Topologies:
    Sub-topology: 0
diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/average.json b/ksql-functional-tests/src/test/resources/query-validation-tests/average.json
index 05dc5fb26ddb..0858f46b31b7 100644
--- a/ksql-functional-tests/src/test/resources/query-validation-tests/average.json
+++ b/ksql-functional-tests/src/test/resources/query-validation-tests/average.json
@@ -22,11 +22,11 @@
         {"topic": "test_topic", "key": 1, "value": "1,one,10"}
       ],
       "outputs": [
-        {"topic": "AVG", "key": 0, "value": "0,500.0"},
-        {"topic": "AVG", "key": 0, "value": "0,300.0"},
-        {"topic": "AVG", "key": 0, "value": "0,250.0"},
-        {"topic": "AVG", "key": 1, "value": "1,1000.0"},
-        {"topic": "AVG", "key": 1, "value": "1,550.0"}
+        {"topic": "AVG", "key": 0, "value": "0,500"},
+        {"topic": "AVG", "key": 0, "value": "0,300"},
+        {"topic": "AVG", "key": 0, "value": "0,250"},
+        {"topic": "AVG", "key": 1, "value": "1,1000"},
+        {"topic": "AVG", "key": 1, "value": "1,550"}
       ]
     }
   ]
diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/explode.json b/ksql-functional-tests/src/test/resources/query-validation-tests/explode.json
index 004fbfd2c1f1..23d5edb4d5d3 100644
--- a/ksql-functional-tests/src/test/resources/query-validation-tests/explode.json
+++ b/ksql-functional-tests/src/test/resources/query-validation-tests/explode.json
@@ -38,8 +38,8 @@
         {"topic": "test_topic", "key": 0, "value": {"F1": 1, "F2": 2, "MY_ARR": [1, 2]}}
       ],
       "outputs": [
-        {"topic": "OUTPUT", "key": "0", "value": {"KSQL_COL_0": 1.0, "KSQL_COL_1": 1, "KSQL_COL_2": 2.0}},
-        {"topic": "OUTPUT", "key": "0", "value": {"KSQL_COL_0": 1.0, "KSQL_COL_1": 2, "KSQL_COL_2": 2.0}}
+        {"topic": "OUTPUT", "key": "0", "value": {"KSQL_COL_0": 1, "KSQL_COL_1": 1, "KSQL_COL_2": 2}},
+        {"topic": "OUTPUT", "key": "0", "value": {"KSQL_COL_0": 1, "KSQL_COL_1": 2, "KSQL_COL_2": 2}}
       ]
     },
     {
diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/math.json b/ksql-functional-tests/src/test/resources/query-validation-tests/math.json
index f48ce60b5ea3..0c148d4bc697 100644
--- a/ksql-functional-tests/src/test/resources/query-validation-tests/math.json
+++ b/ksql-functional-tests/src/test/resources/query-validation-tests/math.json
@@ -102,43 +102,55 @@
       ],
       "outputs": [
         {"topic": "OUTPUT", "value": {"I": null, "L": null, "D": null, "B":  null}},
-        {"topic": "OUTPUT", "value": {"I": 0.0, "L": 0.0, "D": 0.0, "B": 0.0}},
-        {"topic": "OUTPUT", "value": {"I": 1.0, "L": 1.0, "D": 1.0, "B": 1.0}},
-        {"topic": "OUTPUT", "value": {"I": 1.0, "L": 1.0, "D": 1.0, "B": 1.0}},
-        {"topic": "OUTPUT", "value": {"I": 1.0, "L": 1.0, "D": 1.0, "B": 1.0}},
-        {"topic": "OUTPUT", "value": {"I": 1.0, "L": 1.0, "D": 1.0, "B": 1.0}},
-        {"topic": "OUTPUT", "value": {"I": 1.0, "L": 1.0, "D": 2.0, "B": 2.0}},
+        {"topic": "OUTPUT", "value": {"I": 0, "L": 0, "D": 0.0, "B": 0.0}},
+        {"topic": "OUTPUT", "value": {"I": 1, "L": 1, "D": 1.0, "B": 1.0}},
+        {"topic": "OUTPUT", "value": {"I": 1, "L": 1, "D": 1.0, "B": 1.0}},
+        {"topic": "OUTPUT", "value": {"I": 1, "L": 1, "D": 1.0, "B": 1.0}},
+        {"topic": "OUTPUT", "value": {"I": 1, "L": 1, "D": 1.0, "B": 1.0}},
+        {"topic": "OUTPUT", "value": {"I": 1, "L": 1, "D": 2.0, "B": 2.0}},
 
-        {"topic": "OUTPUT", "value": {"I": -1.0, "L": -1.0, "D": -1.0, "B": -1.0}},
-        {"topic": "OUTPUT", "value": {"I": -1.0, "L": -1.0, "D": -2.0, "B": -2.0}},
-        {"topic": "OUTPUT", "value": {"I": -1.0, "L": -1.0, "D": -2.0, "B": -2.0}},
-        {"topic": "OUTPUT", "value": {"I": -1.0, "L": -1.0, "D": -2.0, "B": -2.0}},
-        {"topic": "OUTPUT", "value": {"I": -1.0, "L": -1.0, "D": -2.0, "B": -2.0}}
+        {"topic": "OUTPUT", "value": {"I": -1, "L": -1, "D": -1.0, "B": -1.0}},
+        {"topic": "OUTPUT", "value": {"I": -1, "L": -1, "D": -2.0, "B": -2.0}},
+        {"topic": "OUTPUT", "value": {"I": -1, "L": -1, "D": -2.0, "B": -2.0}},
+        {"topic": "OUTPUT", "value": {"I": -1, "L": -1, "D": -2.0, "B": -2.0}},
+        {"topic": "OUTPUT", "value": {"I": -1, "L": -1, "D": -2.0, "B": -2.0}}
       ]
     },
     {
       "name": "calculate CEIL function",
       "statements": [
-        "CREATE STREAM test (v DOUBLE) WITH (kafka_topic='test_topic', value_format='JSON');",
-        "CREATE STREAM OUTPUT AS SELECT CEIL(v) as C0 FROM test;"
+        "CREATE STREAM INPUT (i INT, l BIGINT, d DOUBLE, b DECIMAL(2,1)) WITH (kafka_topic='input', value_format='AVRO');",
+        "CREATE STREAM OUTPUT AS SELECT ceil(i) i, ceil(l) l, ceil(d) d, ceil(b) b FROM INPUT;"
       ],
       "inputs": [
-        {"topic": "test_topic", "value": {"v" : 1.2}},
-        {"topic": "test_topic", "value": {"v" : 1.7}},
-        {"topic": "test_topic", "value": {"v" : 1.5}},
-        {"topic": "test_topic", "value": {"v" : 3}},
-        {"topic": "test_topic", "value": {"v" : 1.234567}},
-        {"topic": "test_topic", "value": {"v" : 0}},
-        {"topic": "test_topic", "value": {"v" : null}}
+        {"topic": "input", "value": {"i": null, "l": null, "d": null}},
+        {"topic": "input", "value": {"i": 0, "l": 0, "d": 0.0, "b":  "0.0"}},
+        {"topic": "input", "value": {"i": 1, "l": 1, "d": 1.0, "b": "1.0"}},
+        {"topic": "input", "value": {"i": 1, "l": 1, "d": 1.1, "b": "1.1"}},
+        {"topic": "input", "value": {"i": 1, "l": 1, "d": 1.5, "b": "1.5"}},
+        {"topic": "input", "value": {"i": 1, "l": 1, "d": 1.7, "b": "1.7"}},
+        {"topic": "input", "value": {"i": 1, "l": 1, "d": 2.0, "b": "2.0"}},
+
+        {"topic": "input", "value": {"i": -1, "l": -1, "d": -1.0, "b": "-1.0"}},
+        {"topic": "input", "value": {"i": -1, "l": -1, "d": -1.1, "b": "-1.1"}},
+        {"topic": "input", "value": {"i": -1, "l": -1, "d": -1.5, "b": "-1.5"}},
+        {"topic": "input", "value": {"i": -1, "l": -1, "d": -1.7, "b": "-1.7"}},
+        {"topic": "input", "value": {"i": -1, "l": -1, "d": -2.0, "b": "-2.0"}}
       ],
       "outputs": [
-        {"topic": "OUTPUT", "value": {"C0" : 2.0}},
-        {"topic": "OUTPUT", "value": {"C0" : 2.0}},
-        {"topic": "OUTPUT", "value": {"C0" : 2.0}},
-        {"topic": "OUTPUT", "value": {"C0" : 3.0}},
-        {"topic": "OUTPUT", "value": {"C0" : 2.0}},
-        {"topic": "OUTPUT", "value": {"C0" : 0.0}},
-        {"topic": "OUTPUT", "value": {"C0" : null}}
+        {"topic": "OUTPUT", "value": {"I": null, "L": null, "D": null, "B":  null}},
+        {"topic": "OUTPUT", "value": {"I": 0, "L": 0, "D": 0.0, "B": 0.0}},
+        {"topic": "OUTPUT", "value": {"I": 1, "L": 1, "D": 1.0, "B": 1.0}},
+        {"topic": "OUTPUT", "value": {"I": 1, "L": 1, "D": 2.0, "B": 2.0}},
+        {"topic": "OUTPUT", "value": {"I": 1, "L": 1, "D": 2.0, "B": 2.0}},
+        {"topic": "OUTPUT", "value": {"I": 1, "L": 1, "D": 2.0, "B": 2.0}},
+        {"topic": "OUTPUT", "value": {"I": 1, "L": 1, "D": 2.0, "B": 2.0}},
+
+        {"topic": "OUTPUT", "value": {"I": -1, "L": -1, "D": -1.0, "B": -1.0}},
+        {"topic": "OUTPUT", "value": {"I": -1, "L": -1, "D": -1.0, "B": -1.0}},
+        {"topic": "OUTPUT", "value": {"I": -1, "L": -1, "D": -1.0, "B": -1.0}},
+        {"topic": "OUTPUT", "value": {"I": -1, "L": -1, "D": -1.0, "B": -1.0}},
+        {"topic": "OUTPUT", "value": {"I": -1, "L": -1, "D": -2.0, "B": -2.0}}
       ]
     },
     {
@@ -155,9 +167,9 @@
       ],
       "outputs": [
         {"topic": "OUTPUT", "value": {"I": null, "L": null, "D": null, "B":  null}},
-        {"topic": "OUTPUT", "value": {"I": 1.0, "L": 2.0, "D": 3.1, "B": "3.2"}},
-        {"topic": "OUTPUT", "value": {"I": 0.0, "L": 0.0, "D": 0.0, "B": "0.0"}},
-        {"topic": "OUTPUT", "value": {"I": 1.0, "L": 2.0, "D": 3.3, "B": "3.4"}}
+        {"topic": "OUTPUT", "value": {"I": 1, "L": 2, "D": 3.1, "B": "3.2"}},
+        {"topic": "OUTPUT", "value": {"I": 0, "L": 0, "D": 0.0, "B": "0.0"}},
+        {"topic": "OUTPUT", "value": {"I": 1, "L": 2, "D": 3.3, "B": "3.4"}}
       ]
     },
     {
diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/quoted-identifiers.json b/ksql-functional-tests/src/test/resources/query-validation-tests/quoted-identifiers.json
index 09c7f111e465..137822ec4290 100644
--- a/ksql-functional-tests/src/test/resources/query-validation-tests/quoted-identifiers.json
+++ b/ksql-functional-tests/src/test/resources/query-validation-tests/quoted-identifiers.json
@@ -39,7 +39,7 @@
         {"topic": "test_topic", "value": {"SELECT":  -2}}
       ],
       "outputs": [
-        {"topic": "OUTPUT", "value": { "FOO": 2.0}}
+        {"topic": "OUTPUT", "value": { "FOO": 2}}
       ]
     },
     {
diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/table-functions.json b/ksql-functional-tests/src/test/resources/query-validation-tests/table-functions.json
index c4733a59370e..e5c1265ed678 100644
--- a/ksql-functional-tests/src/test/resources/query-validation-tests/table-functions.json
+++ b/ksql-functional-tests/src/test/resources/query-validation-tests/table-functions.json
@@ -126,9 +126,9 @@
         {"topic": "test_topic", "key": 0, "value": {"ID":  0, "F0": 1, "F1": 10, "F2": 11, "F3":  12}}
       ],
       "outputs": [
-        {"topic": "OUTPUT", "key": "0", "value": {"F0": 1, "KSQL_COL_1": 21.0}},
-        {"topic": "OUTPUT", "key": "0", "value": {"F0": 1, "KSQL_COL_1": 23.0}},
-        {"topic": "OUTPUT", "key": "0", "value": {"F0": 1, "KSQL_COL_1": 22.0}}
+        {"topic": "OUTPUT", "key": "0", "value": {"F0": 1, "KSQL_COL_1": 21}},
+        {"topic": "OUTPUT", "key": "0", "value": {"F0": 1, "KSQL_COL_1": 23}},
+        {"topic": "OUTPUT", "key": "0", "value": {"F0": 1, "KSQL_COL_1": 22}}
       ]
     },
     {