Skip to content

Commit

Permalink
fix: reject mismatched decimals from avro topics (#7544)
Browse files Browse the repository at this point in the history
* fix: reject mismatched decimals from avro topics

* Address review comments
  • Loading branch information
Zara Lim authored May 19, 2021
1 parent a4b47be commit 85ba0f1
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import io.confluent.ksql.serde.connect.ConnectDataTranslator;
import io.confluent.ksql.serde.connect.DataTranslator;
import io.confluent.ksql.util.DecimalUtil;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -117,7 +119,8 @@ private static Object replaceSchema(final Schema schema, final Object object) {

case STRUCT:
return convertStruct((Struct) object, schema);

case BYTES:
return DecimalUtil.ensureFit((BigDecimal) object, schema);
default:
return object;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@

package io.confluent.ksql.serde.avro;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThrows;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.util.DecimalUtil;
import io.confluent.ksql.util.KsqlException;
import java.math.BigDecimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
Expand Down Expand Up @@ -288,4 +293,58 @@ public void shouldDropOptionalFromRootStructSchema() {
// Then:
assertThat("Root required", translator.getAvroCompatibleSchema().isOptional(), is(false));
}

@Test
public void shouldRejectUnmatchingDecimalSchema() {
// Given:
final Schema ksqlSchema = DecimalUtil.builder(4, 2).build();
final Schema topicSchema = DecimalUtil.builder(6, 3).build();

// When:
final AvroDataTranslator translator =
new AvroDataTranslator(ksqlSchema, AvroProperties.DEFAULT_AVRO_SCHEMA_FULL_NAME);

// Then:
final ArithmeticException e = assertThrows(
ArithmeticException.class,
() -> translator.toKsqlRow(topicSchema, new BigDecimal("123.456")));
assertThat(
e.getMessage(),
containsString("Numeric field overflow: A field with precision 4 and scale 2 must round to an absolute value less than 10^2"));
}

@Test
public void shouldForceUnmatchingDecimalSchemaIfPossible() {
// Given:
final Schema ksqlSchema = DecimalUtil.builder(4, 2).build();
final Schema topicSchema = DecimalUtil.builder(2, 1).build();

// When:
final AvroDataTranslator translator =
new AvroDataTranslator(ksqlSchema, AvroProperties.DEFAULT_AVRO_SCHEMA_FULL_NAME);

// Then:
assertThat(
translator.toKsqlRow(topicSchema, new BigDecimal("12.1")),
is(new BigDecimal("12.10")));
}

@Test
public void shouldRejectConversionsRequiringRounding() {
// Given:
final Schema ksqlSchema = DecimalUtil.builder(3, 0).build();
final Schema topicSchema = DecimalUtil.builder(4, 1).build();

// When:
final AvroDataTranslator translator =
new AvroDataTranslator(ksqlSchema, AvroProperties.DEFAULT_AVRO_SCHEMA_FULL_NAME);

// Then:
final KsqlException e = assertThrows(
KsqlException.class,
() -> translator.toKsqlRow(topicSchema, new BigDecimal("123.4")));
assertThat(
e.getMessage(),
containsString("Cannot fit decimal '123.4' into DECIMAL(3, 0) without rounding. (Requires 4,1)"));
}
}

0 comments on commit 85ba0f1

Please sign in to comment.