From 869a362306456fb95ffdf77a15f0c8186047636a Mon Sep 17 00:00:00 2001 From: zhangdonghao Date: Wed, 13 Nov 2024 18:29:54 +0800 Subject: [PATCH 1/2] [Bug][connectors-v2]fix mongodb Bson convert exception #8042 --- ...MongoDBConnectorDeserializationSchema.java | 70 ++++++- ...oDBConnectorDeserializationSchemaTest.java | 191 ++++++++++++++++-- .../serde/BsonToRowDataConverters.java | 35 +++- .../serde/BsonToRowDataConvertersTest.java | 93 ++++++++- 4 files changed, 364 insertions(+), 25 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java index edbeca4fab6..823ed5d9ec1 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java @@ -343,6 +343,15 @@ public Object apply(BsonValue bsonValue) { return convertToLocalDateTime(bsonValue).toLocalDate(); } }; + case TIME: + return new SerializableFunction() { + private static final long serialVersionUID = 1L; + + @Override + public Object apply(BsonValue bsonValue) { + return convertToLocalDateTime(bsonValue).toLocalTime(); + } + }; case TIMESTAMP: return new SerializableFunction() { private static final long serialVersionUID = 1L; @@ -382,7 +391,7 @@ public Object apply(BsonValue bsonValue) { private static LocalDateTime convertToLocalDateTime(BsonValue bsonValue) { Instant instant; if (bsonValue.isTimestamp()) { - instant = Instant.ofEpochSecond(bsonValue.asTimestamp().getTime()); + instant = Instant.ofEpochSecond(bsonValue.asTimestamp().getValue()); } else if (bsonValue.isDateTime()) { instant = Instant.ofEpochMilli(bsonValue.asDateTime().getValue()); } else { @@ -521,7 +530,7 @@ private static boolean convertToBoolean(@Nonnull BsonValue bsonValue) { } private static double convertToDouble(@Nonnull BsonValue bsonValue) { - if (bsonValue.isDouble()) { + if (bsonValue.isNumber()) { return bsonValue.asNumber().doubleValue(); } throw new MongodbConnectorException( @@ -532,9 +541,20 @@ private static double convertToDouble(@Nonnull BsonValue bsonValue) { + bsonValue.getBsonType()); } - private static int convertToInt(@Nonnull BsonValue bsonValue) { + private static int convertToInt(BsonValue bsonValue) { if (bsonValue.isInt32()) { - return bsonValue.asNumber().intValue(); + return bsonValue.asInt32().getValue(); + } else if (bsonValue.isNumber()) { + long longValue = bsonValue.asNumber().longValue(); + if (longValue > Integer.MAX_VALUE || longValue < Integer.MIN_VALUE) { + throw new MongodbConnectorException( + UNSUPPORTED_DATA_TYPE, + "Unable to convert to integer from unexpected value '" + + bsonValue + + "' of type " + + bsonValue.getBsonType()); + } + return (int) longValue; } throw new MongodbConnectorException( UNSUPPORTED_DATA_TYPE, @@ -568,8 +588,19 @@ private static byte[] convertToBinary(@Nonnull BsonValue bsonValue) { "Unsupported BYTES value type: " + bsonValue.getClass().getSimpleName()); } - private static long convertToLong(@Nonnull BsonValue bsonValue) { - if (bsonValue.isInt64()) { + private static long convertToLong(BsonValue bsonValue) { + if (bsonValue.isInt64() || bsonValue.isInt32()) { + return bsonValue.asNumber().longValue(); + } else if (bsonValue.isDouble()) { + double value = bsonValue.asNumber().doubleValue(); + if (value > Long.MAX_VALUE || value < Long.MIN_VALUE) { + throw new MongodbConnectorException( + UNSUPPORTED_DATA_TYPE, + "Unable to convert to long from unexpected value '" + + bsonValue + + "' of type " + + bsonValue.getBsonType()); + } return bsonValue.asNumber().longValue(); } throw new MongodbConnectorException( @@ -599,4 +630,31 @@ private static BigDecimal convertToBigDecimal(@Nonnull BsonValue bsonValue) { + "' of type " + bsonValue.getBsonType()); } + + @VisibleForTesting + public Object convertToObject( + @Nonnull SeaTunnelDataType dataType, @Nonnull BsonValue bsonValue) { + switch (dataType.getSqlType()) { + case INT: + return convertToInt(bsonValue); + case BIGINT: + return convertToLong(bsonValue); + case DOUBLE: + return convertToDouble(bsonValue); + case STRING: + return convertToString(bsonValue); + case DATE: + return convertToLocalDateTime(bsonValue).toLocalDate(); + case TIME: + return convertToLocalDateTime(bsonValue).toLocalTime(); + case TIMESTAMP: + return convertToLocalDateTime(bsonValue); + case DECIMAL: + DecimalType decimalType = (DecimalType) dataType; + BigDecimal decimalValue = convertToBigDecimal(bsonValue); + return fromBigDecimal( + decimalValue, decimalType.getPrecision(), decimalType.getScale()); + } + return null; + } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/sender/MongoDBConnectorDeserializationSchemaTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/sender/MongoDBConnectorDeserializationSchemaTest.java index 1b904900938..cd609d553de 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/sender/MongoDBConnectorDeserializationSchemaTest.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/sender/MongoDBConnectorDeserializationSchemaTest.java @@ -17,24 +17,40 @@ package mongodb.sender; +import org.apache.kafka.connect.source.SourceRecord; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.PhysicalColumn; import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException; import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.sender.MongoDBConnectorDeserializationSchema; import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils; - -import org.apache.kafka.connect.source.SourceRecord; - +import org.bson.BsonDateTime; +import org.bson.BsonDecimal128; import org.bson.BsonDocument; +import org.bson.BsonDouble; +import org.bson.BsonInt32; import org.bson.BsonInt64; +import org.bson.BsonObjectId; import org.bson.BsonString; +import org.bson.types.Decimal128; +import org.bson.types.ObjectId; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.temporal.ChronoUnit; import java.util.Collections; import java.util.Map; +import java.util.Objects; +import java.util.stream.IntStream; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.COLL_FIELD; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DB_FIELD; @@ -52,25 +68,65 @@ public class MongoDBConnectorDeserializationSchemaTest { - @Test - public void extractTableId() { - CatalogTable catalogTable = + private static TableSchema tableSchema; + private static CatalogTable catalogTable; + + @BeforeAll + public static void setUp() { + tableSchema = + TableSchema.builder() + .column(PhysicalColumn.of("int", BasicType.INT_TYPE, 1L, true, null, "")) + .column(PhysicalColumn.of("long", BasicType.LONG_TYPE, 1L, true, null, "")) + .column( + PhysicalColumn.of( + "double", BasicType.DOUBLE_TYPE, 1L, true, null, "")) + .column( + PhysicalColumn.of( + "decimal", new DecimalType(10, 2), 1L, true, null, "")) + .column( + PhysicalColumn.of( + "string", BasicType.STRING_TYPE, 200L, true, null, "")) + .column( + PhysicalColumn.of( + "date", + LocalTimeType.LOCAL_DATE_TYPE, + null, + null, + true, + null, + null)) + .column( + PhysicalColumn.of( + "time", + LocalTimeType.LOCAL_TIME_TYPE, + null, + null, + true, + null, + null)) + .column( + PhysicalColumn.of( + "timestamp", + LocalTimeType.LOCAL_DATE_TIME_TYPE, + null, + null, + true, + null, + null)) + .build(); + catalogTable = CatalogTable.of( TableIdentifier.of("catalog", "database", "table"), - TableSchema.builder() - .column( - PhysicalColumn.of( - "name1", BasicType.STRING_TYPE, 1L, true, null, "")) - .column( - PhysicalColumn.of( - "name1", BasicType.STRING_TYPE, 1L, true, null, "")) - .build(), + tableSchema, Collections.emptyMap(), Collections.emptyList(), "comment"); + } + + @Test + public void extractTableId() { MongoDBConnectorDeserializationSchema schema = - new MongoDBConnectorDeserializationSchema( - Collections.singletonList(catalogTable), Collections.emptyMap()); + new MongoDBConnectorDeserializationSchema(Collections.singletonList(catalogTable)); // Build SourceRecord Map partitionMap = @@ -106,4 +162,107 @@ public void extractTableId() { Object tableId = schema.extractTableIdForTest(sourceRecord); Assertions.assertEquals("inventory.products", tableId); } + + @Test + public void testBsonConvert() { + MongoDBConnectorDeserializationSchema schema = + new MongoDBConnectorDeserializationSchema(Collections.singletonList(catalogTable)); + // check int + Assertions.assertEquals( + 123456, schema.convertToObject(getDataType("int"), new BsonInt32(123456))); + Assertions.assertEquals( + Integer.MAX_VALUE, + schema.convertToObject(getDataType("int"), new BsonInt64(Integer.MAX_VALUE))); + Assertions.assertEquals( + 123456, schema.convertToObject(getDataType("int"), new BsonDouble(123456))); + Assertions.assertThrowsExactly( + MongodbConnectorException.class, + () -> + schema.convertToObject( + getDataType("int"), new BsonDouble(1234567890123456789.0d))); + Assertions.assertThrowsExactly( + MongodbConnectorException.class, + () -> schema.convertToObject(getDataType("int"), new BsonInt64(Long.MIN_VALUE))); + // check long + Assertions.assertEquals( + 123456L, schema.convertToObject(getDataType("long"), new BsonInt32(123456))); + Assertions.assertEquals( + (long) Integer.MAX_VALUE, + schema.convertToObject(getDataType("long"), new BsonInt64(Integer.MAX_VALUE))); + Assertions.assertEquals( + 123456L, schema.convertToObject(getDataType("long"), new BsonDouble(123456))); + Assertions.assertThrowsExactly( + MongodbConnectorException.class, + () -> + schema.convertToObject( + getDataType("long"), + new BsonDouble(12345678901234567891234567890123456789.0d))); + + // check double + Assertions.assertEquals( + 1.0d, schema.convertToObject(getDataType("double"), new BsonInt32(1))); + Assertions.assertEquals( + 1.0d, schema.convertToObject(getDataType("double"), new BsonInt64(1))); + Assertions.assertEquals( + 4.4d, schema.convertToObject(getDataType("double"), new BsonDouble(4.4))); + // check decimal + Assertions.assertEquals( + new BigDecimal("3.14"), + schema.convertToObject( + getDataType("decimal"), new BsonDecimal128(Decimal128.parse("3.1415926")))); + // check string + Assertions.assertEquals( + "123456", schema.convertToObject(getDataType("string"), new BsonString("123456"))); + Assertions.assertEquals( + "507f191e810c19729de860ea", + schema.convertToObject( + getDataType("string"), + new BsonObjectId(new ObjectId("507f191e810c19729de860ea")))); + BsonDocument document = + new BsonDocument() + .append("key", new BsonString("123456")) + .append("value", new BsonInt64(123456789L)); + Assertions.assertEquals( + "{\"key\": \"123456\", \"value\": 123456789}", + schema.convertToObject(getDataType("string"), document)); + + LocalDateTime now = LocalDateTime.now().truncatedTo(ChronoUnit.MILLIS); + long epochMilli = now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); + // check localDate + Assertions.assertEquals( + now.toLocalDate(), + schema.convertToObject(getDataType("date"), new BsonDateTime(epochMilli))); + Assertions.assertEquals( + now.toLocalDate(), + schema.convertToObject(getDataType("date"), new BsonDateTime(epochMilli))); + // check localTime + Assertions.assertEquals( + now.toLocalTime(), + schema.convertToObject(getDataType("time"), new BsonDateTime(epochMilli))); + Assertions.assertEquals( + now.toLocalTime(), + schema.convertToObject(getDataType("time"), new BsonDateTime(epochMilli))); + // check localDateTime + Assertions.assertEquals( + now, + schema.convertToObject(getDataType("timestamp"), new BsonDateTime(epochMilli))); + Assertions.assertEquals( + now, + schema.convertToObject(getDataType("timestamp"), new BsonDateTime(epochMilli))); + } + + private SeaTunnelDataType getDataType(String fieldName) { + String[] fieldNames = tableSchema.getFieldNames(); + return IntStream.range(0, fieldNames.length) + .mapToObj( + i -> { + if (fieldName.equals(fieldNames[i])) { + return tableSchema.getColumns().get(i).getDataType(); + } + return null; + }) + .filter(Objects::nonNull) + .findFirst() + .orElseThrow(() -> new RuntimeException("not found field")); + } } diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConverters.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConverters.java index 8eda6612c70..4993a0db46e 100644 --- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConverters.java +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConverters.java @@ -178,6 +178,15 @@ public Object apply(BsonValue bsonValue) { return convertToLocalDateTime(bsonValue).toLocalDate(); } }; + case TIME: + return new SerializableFunction() { + private static final long serialVersionUID = 1L; + + @Override + public Object apply(BsonValue bsonValue) { + return convertToLocalDateTime(bsonValue).toLocalTime(); + } + }; case TIMESTAMP: return new SerializableFunction() { private static final long serialVersionUID = 1L; @@ -217,7 +226,7 @@ public Object apply(BsonValue bsonValue) { private static LocalDateTime convertToLocalDateTime(BsonValue bsonValue) { Instant instant; if (bsonValue.isTimestamp()) { - instant = Instant.ofEpochSecond(bsonValue.asTimestamp().getTime()); + instant = Instant.ofEpochMilli(bsonValue.asTimestamp().getValue()); } else if (bsonValue.isDateTime()) { instant = Instant.ofEpochMilli(bsonValue.asDateTime().getValue()); } else { @@ -366,7 +375,18 @@ private static double convertToDouble(BsonValue bsonValue) { private static int convertToInt(BsonValue bsonValue) { if (bsonValue.isInt32()) { - return bsonValue.asNumber().intValue(); + return bsonValue.asInt32().getValue(); + } else if (bsonValue.isNumber()) { + long longValue = bsonValue.asNumber().longValue(); + if (longValue > Integer.MAX_VALUE || longValue < Integer.MIN_VALUE) { + throw new MongodbConnectorException( + UNSUPPORTED_DATA_TYPE, + "Unable to convert to integer from unexpected value '" + + bsonValue + + "' of type " + + bsonValue.getBsonType()); + } + return (int) longValue; } throw new MongodbConnectorException( UNSUPPORTED_DATA_TYPE, @@ -403,6 +423,17 @@ private static byte[] convertToBinary(BsonValue bsonValue) { private static long convertToLong(BsonValue bsonValue) { if (bsonValue.isInt64() || bsonValue.isInt32()) { return bsonValue.asNumber().longValue(); + } else if (bsonValue.isDouble()) { + double value = bsonValue.asNumber().doubleValue(); + if (value > Long.MAX_VALUE || value < Long.MIN_VALUE) { + throw new MongodbConnectorException( + UNSUPPORTED_DATA_TYPE, + "Unable to convert to long from unexpected value '" + + bsonValue + + "' of type " + + bsonValue.getBsonType()); + } + return bsonValue.asNumber().longValue(); } throw new MongodbConnectorException( UNSUPPORTED_DATA_TYPE, diff --git a/seatunnel-connectors-v2/connector-mongodb/src/test/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConvertersTest.java b/seatunnel-connectors-v2/connector-mongodb/src/test/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConvertersTest.java index b47769c0aca..26c268a4e7b 100644 --- a/seatunnel-connectors-v2/connector-mongodb/src/test/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConvertersTest.java +++ b/seatunnel-connectors-v2/connector-mongodb/src/test/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConvertersTest.java @@ -18,13 +18,29 @@ package org.apache.seatunnel.connectors.seatunnel.mongodb.serde; import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException; +import org.bson.BsonDateTime; +import org.bson.BsonDecimal128; +import org.bson.BsonDocument; import org.bson.BsonDouble; import org.bson.BsonInt32; import org.bson.BsonInt64; +import org.bson.BsonObjectId; +import org.bson.BsonString; +import org.bson.BsonTimestamp; +import org.bson.types.Decimal128; +import org.bson.types.ObjectId; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.temporal.ChronoUnit; + public class BsonToRowDataConvertersTest { private final BsonToRowDataConverters converterFactory = new BsonToRowDataConverters(); @@ -42,7 +58,7 @@ public void testConvertAnyNumberToDouble() { } @Test - public void testConvertBsonIntToBigInt() { + public void testConvertBsonNumberToLong() { // It covered #7567 BsonToRowDataConverters.BsonToRowDataConverter converter = converterFactory.createConverter(BasicType.LONG_TYPE); @@ -51,5 +67,80 @@ public void testConvertBsonIntToBigInt() { Assertions.assertEquals( (long) Integer.MAX_VALUE, converter.convert(new BsonInt64(Integer.MAX_VALUE))); + + Assertions.assertEquals(123456L, converter.convert(new BsonDouble(123456))); + + Assertions.assertThrowsExactly( + MongodbConnectorException.class, + () -> converter.convert(new BsonDouble(12345678901234567891234567890123456789.0d))); + } + + @Test + public void testConvertBsonNumberToInt() { + // It covered #8042 + BsonToRowDataConverters.BsonToRowDataConverter converter = + converterFactory.createConverter(BasicType.INT_TYPE); + Assertions.assertEquals(123456, converter.convert(new BsonInt32(123456))); + Assertions.assertEquals( + Integer.MAX_VALUE, converter.convert(new BsonInt64(Integer.MAX_VALUE))); + Assertions.assertEquals(123456, converter.convert(new BsonDouble(123456))); + Assertions.assertThrowsExactly( + MongodbConnectorException.class, + () -> converter.convert(new BsonDouble(1234567890123456789.0d))); + } + + @Test + public void testConvertBsonDecimal128ToDecimal() { + BsonToRowDataConverters.BsonToRowDataConverter converter = + converterFactory.createConverter(new DecimalType(10, 2)); + Assertions.assertEquals( + new BigDecimal("3.14"), + converter.convert(new BsonDecimal128(Decimal128.parse("3.1415926")))); + } + + @Test + public void testConvertBsonToString() { + BsonToRowDataConverters.BsonToRowDataConverter converter = + converterFactory.createConverter(BasicType.STRING_TYPE); + Assertions.assertEquals("123456", converter.convert(new BsonString("123456"))); + + Assertions.assertEquals( + "507f191e810c19729de860ea", + converter.convert(new BsonObjectId(new ObjectId("507f191e810c19729de860ea")))); + + BsonDocument document = + new BsonDocument() + .append("key", new BsonString("123456")) + .append("value", new BsonInt64(123456789L)); + Assertions.assertEquals( + "{\"key\": \"123456\", \"value\": 123456789}", converter.convert(document)); + } + + @Test + public void testConvertBsonToLocalDateTime() { + LocalDateTime now = LocalDateTime.now().truncatedTo(ChronoUnit.MILLIS); + long epochMilli = now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); + + // localDataTime converter + BsonToRowDataConverters.BsonToRowDataConverter localDataTimeConverter = + converterFactory.createConverter(LocalTimeType.LOCAL_DATE_TIME_TYPE); + Assertions.assertEquals(now, localDataTimeConverter.convert(new BsonTimestamp(epochMilli))); + Assertions.assertEquals(now, localDataTimeConverter.convert(new BsonDateTime(epochMilli))); + + // localDate converter + BsonToRowDataConverters.BsonToRowDataConverter localDataConverter = + converterFactory.createConverter(LocalTimeType.LOCAL_DATE_TYPE); + Assertions.assertEquals( + now.toLocalDate(), localDataConverter.convert(new BsonTimestamp(epochMilli))); + Assertions.assertEquals( + now.toLocalDate(), localDataConverter.convert(new BsonDateTime(epochMilli))); + + // localTime converter + BsonToRowDataConverters.BsonToRowDataConverter localTimeConverter = + converterFactory.createConverter(LocalTimeType.LOCAL_TIME_TYPE); + Assertions.assertEquals( + now.toLocalTime(), localTimeConverter.convert(new BsonTimestamp(epochMilli))); + Assertions.assertEquals( + now.toLocalTime(), localTimeConverter.convert(new BsonDateTime(epochMilli))); } } From 637d7abea4def375d88c50d6ec231658fd9b70a5 Mon Sep 17 00:00:00 2001 From: zhangdonghao Date: Thu, 14 Nov 2024 13:50:45 +0800 Subject: [PATCH 2/2] [Bug][connectors-v2]fix mongodb Bson convert exception #8042 --- .../sender/MongoDBConnectorDeserializationSchemaTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/sender/MongoDBConnectorDeserializationSchemaTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/sender/MongoDBConnectorDeserializationSchemaTest.java index cd609d553de..91c9fb47bc7 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/sender/MongoDBConnectorDeserializationSchemaTest.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/sender/MongoDBConnectorDeserializationSchemaTest.java @@ -17,7 +17,6 @@ package mongodb.sender; -import org.apache.kafka.connect.source.SourceRecord; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.PhysicalColumn; import org.apache.seatunnel.api.table.catalog.TableIdentifier; @@ -29,6 +28,9 @@ import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException; import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.sender.MongoDBConnectorDeserializationSchema; import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils; + +import org.apache.kafka.connect.source.SourceRecord; + import org.bson.BsonDateTime; import org.bson.BsonDecimal128; import org.bson.BsonDocument;