Skip to content

Commit

Permalink
[Bug][connectors-v2]fix mongodb Bson convert exception #8042
Browse files Browse the repository at this point in the history
  • Loading branch information
hawk9821 committed Nov 14, 2024
1 parent 392300e commit 869a362
Show file tree
Hide file tree
Showing 4 changed files with 364 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,15 @@ public Object apply(BsonValue bsonValue) {
return convertToLocalDateTime(bsonValue).toLocalDate();
}
};
case TIME:
return new SerializableFunction<BsonValue, Object>() {
private static final long serialVersionUID = 1L;

@Override
public Object apply(BsonValue bsonValue) {
return convertToLocalDateTime(bsonValue).toLocalTime();
}
};
case TIMESTAMP:
return new SerializableFunction<BsonValue, Object>() {
private static final long serialVersionUID = 1L;
Expand Down Expand Up @@ -382,7 +391,7 @@ public Object apply(BsonValue bsonValue) {
private static LocalDateTime convertToLocalDateTime(BsonValue bsonValue) {
Instant instant;
if (bsonValue.isTimestamp()) {
instant = Instant.ofEpochSecond(bsonValue.asTimestamp().getTime());
instant = Instant.ofEpochSecond(bsonValue.asTimestamp().getValue());
} else if (bsonValue.isDateTime()) {
instant = Instant.ofEpochMilli(bsonValue.asDateTime().getValue());
} else {
Expand Down Expand Up @@ -521,7 +530,7 @@ private static boolean convertToBoolean(@Nonnull BsonValue bsonValue) {
}

private static double convertToDouble(@Nonnull BsonValue bsonValue) {
if (bsonValue.isDouble()) {
if (bsonValue.isNumber()) {
return bsonValue.asNumber().doubleValue();
}
throw new MongodbConnectorException(
Expand All @@ -532,9 +541,20 @@ private static double convertToDouble(@Nonnull BsonValue bsonValue) {
+ bsonValue.getBsonType());
}

private static int convertToInt(@Nonnull BsonValue bsonValue) {
private static int convertToInt(BsonValue bsonValue) {
if (bsonValue.isInt32()) {
return bsonValue.asNumber().intValue();
return bsonValue.asInt32().getValue();
} else if (bsonValue.isNumber()) {
long longValue = bsonValue.asNumber().longValue();
if (longValue > Integer.MAX_VALUE || longValue < Integer.MIN_VALUE) {
throw new MongodbConnectorException(
UNSUPPORTED_DATA_TYPE,
"Unable to convert to integer from unexpected value '"
+ bsonValue
+ "' of type "
+ bsonValue.getBsonType());
}
return (int) longValue;
}
throw new MongodbConnectorException(
UNSUPPORTED_DATA_TYPE,
Expand Down Expand Up @@ -568,8 +588,19 @@ private static byte[] convertToBinary(@Nonnull BsonValue bsonValue) {
"Unsupported BYTES value type: " + bsonValue.getClass().getSimpleName());
}

private static long convertToLong(@Nonnull BsonValue bsonValue) {
if (bsonValue.isInt64()) {
private static long convertToLong(BsonValue bsonValue) {
if (bsonValue.isInt64() || bsonValue.isInt32()) {
return bsonValue.asNumber().longValue();
} else if (bsonValue.isDouble()) {
double value = bsonValue.asNumber().doubleValue();
if (value > Long.MAX_VALUE || value < Long.MIN_VALUE) {
throw new MongodbConnectorException(
UNSUPPORTED_DATA_TYPE,
"Unable to convert to long from unexpected value '"
+ bsonValue
+ "' of type "
+ bsonValue.getBsonType());
}
return bsonValue.asNumber().longValue();
}
throw new MongodbConnectorException(
Expand Down Expand Up @@ -599,4 +630,31 @@ private static BigDecimal convertToBigDecimal(@Nonnull BsonValue bsonValue) {
+ "' of type "
+ bsonValue.getBsonType());
}

@VisibleForTesting
public Object convertToObject(
@Nonnull SeaTunnelDataType<?> dataType, @Nonnull BsonValue bsonValue) {
switch (dataType.getSqlType()) {
case INT:
return convertToInt(bsonValue);
case BIGINT:
return convertToLong(bsonValue);
case DOUBLE:
return convertToDouble(bsonValue);
case STRING:
return convertToString(bsonValue);
case DATE:
return convertToLocalDateTime(bsonValue).toLocalDate();
case TIME:
return convertToLocalDateTime(bsonValue).toLocalTime();
case TIMESTAMP:
return convertToLocalDateTime(bsonValue);
case DECIMAL:
DecimalType decimalType = (DecimalType) dataType;
BigDecimal decimalValue = convertToBigDecimal(bsonValue);
return fromBigDecimal(
decimalValue, decimalType.getPrecision(), decimalType.getScale());
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,40 @@

package mongodb.sender;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.sender.MongoDBConnectorDeserializationSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils;

import org.apache.kafka.connect.source.SourceRecord;

import org.bson.BsonDateTime;
import org.bson.BsonDecimal128;
import org.bson.BsonDocument;
import org.bson.BsonDouble;
import org.bson.BsonInt32;
import org.bson.BsonInt64;
import org.bson.BsonObjectId;
import org.bson.BsonString;
import org.bson.types.Decimal128;
import org.bson.types.ObjectId;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.stream.IntStream;

import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.COLL_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DB_FIELD;
Expand All @@ -52,25 +68,65 @@

public class MongoDBConnectorDeserializationSchemaTest {

@Test
public void extractTableId() {
CatalogTable catalogTable =
private static TableSchema tableSchema;
private static CatalogTable catalogTable;

@BeforeAll
public static void setUp() {
tableSchema =
TableSchema.builder()
.column(PhysicalColumn.of("int", BasicType.INT_TYPE, 1L, true, null, ""))
.column(PhysicalColumn.of("long", BasicType.LONG_TYPE, 1L, true, null, ""))
.column(
PhysicalColumn.of(
"double", BasicType.DOUBLE_TYPE, 1L, true, null, ""))
.column(
PhysicalColumn.of(
"decimal", new DecimalType(10, 2), 1L, true, null, ""))
.column(
PhysicalColumn.of(
"string", BasicType.STRING_TYPE, 200L, true, null, ""))
.column(
PhysicalColumn.of(
"date",
LocalTimeType.LOCAL_DATE_TYPE,
null,
null,
true,
null,
null))
.column(
PhysicalColumn.of(
"time",
LocalTimeType.LOCAL_TIME_TYPE,
null,
null,
true,
null,
null))
.column(
PhysicalColumn.of(
"timestamp",
LocalTimeType.LOCAL_DATE_TIME_TYPE,
null,
null,
true,
null,
null))
.build();
catalogTable =
CatalogTable.of(
TableIdentifier.of("catalog", "database", "table"),
TableSchema.builder()
.column(
PhysicalColumn.of(
"name1", BasicType.STRING_TYPE, 1L, true, null, ""))
.column(
PhysicalColumn.of(
"name1", BasicType.STRING_TYPE, 1L, true, null, ""))
.build(),
tableSchema,
Collections.emptyMap(),
Collections.emptyList(),
"comment");
}

@Test
public void extractTableId() {
MongoDBConnectorDeserializationSchema schema =
new MongoDBConnectorDeserializationSchema(
Collections.singletonList(catalogTable), Collections.emptyMap());
new MongoDBConnectorDeserializationSchema(Collections.singletonList(catalogTable));

// Build SourceRecord
Map<String, String> partitionMap =
Expand Down Expand Up @@ -106,4 +162,107 @@ public void extractTableId() {
Object tableId = schema.extractTableIdForTest(sourceRecord);
Assertions.assertEquals("inventory.products", tableId);
}

@Test
public void testBsonConvert() {
MongoDBConnectorDeserializationSchema schema =
new MongoDBConnectorDeserializationSchema(Collections.singletonList(catalogTable));
// check int
Assertions.assertEquals(
123456, schema.convertToObject(getDataType("int"), new BsonInt32(123456)));
Assertions.assertEquals(
Integer.MAX_VALUE,
schema.convertToObject(getDataType("int"), new BsonInt64(Integer.MAX_VALUE)));
Assertions.assertEquals(
123456, schema.convertToObject(getDataType("int"), new BsonDouble(123456)));
Assertions.assertThrowsExactly(
MongodbConnectorException.class,
() ->
schema.convertToObject(
getDataType("int"), new BsonDouble(1234567890123456789.0d)));
Assertions.assertThrowsExactly(
MongodbConnectorException.class,
() -> schema.convertToObject(getDataType("int"), new BsonInt64(Long.MIN_VALUE)));
// check long
Assertions.assertEquals(
123456L, schema.convertToObject(getDataType("long"), new BsonInt32(123456)));
Assertions.assertEquals(
(long) Integer.MAX_VALUE,
schema.convertToObject(getDataType("long"), new BsonInt64(Integer.MAX_VALUE)));
Assertions.assertEquals(
123456L, schema.convertToObject(getDataType("long"), new BsonDouble(123456)));
Assertions.assertThrowsExactly(
MongodbConnectorException.class,
() ->
schema.convertToObject(
getDataType("long"),
new BsonDouble(12345678901234567891234567890123456789.0d)));

// check double
Assertions.assertEquals(
1.0d, schema.convertToObject(getDataType("double"), new BsonInt32(1)));
Assertions.assertEquals(
1.0d, schema.convertToObject(getDataType("double"), new BsonInt64(1)));
Assertions.assertEquals(
4.4d, schema.convertToObject(getDataType("double"), new BsonDouble(4.4)));
// check decimal
Assertions.assertEquals(
new BigDecimal("3.14"),
schema.convertToObject(
getDataType("decimal"), new BsonDecimal128(Decimal128.parse("3.1415926"))));
// check string
Assertions.assertEquals(
"123456", schema.convertToObject(getDataType("string"), new BsonString("123456")));
Assertions.assertEquals(
"507f191e810c19729de860ea",
schema.convertToObject(
getDataType("string"),
new BsonObjectId(new ObjectId("507f191e810c19729de860ea"))));
BsonDocument document =
new BsonDocument()
.append("key", new BsonString("123456"))
.append("value", new BsonInt64(123456789L));
Assertions.assertEquals(
"{\"key\": \"123456\", \"value\": 123456789}",
schema.convertToObject(getDataType("string"), document));

LocalDateTime now = LocalDateTime.now().truncatedTo(ChronoUnit.MILLIS);
long epochMilli = now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
// check localDate
Assertions.assertEquals(
now.toLocalDate(),
schema.convertToObject(getDataType("date"), new BsonDateTime(epochMilli)));
Assertions.assertEquals(
now.toLocalDate(),
schema.convertToObject(getDataType("date"), new BsonDateTime(epochMilli)));
// check localTime
Assertions.assertEquals(
now.toLocalTime(),
schema.convertToObject(getDataType("time"), new BsonDateTime(epochMilli)));
Assertions.assertEquals(
now.toLocalTime(),
schema.convertToObject(getDataType("time"), new BsonDateTime(epochMilli)));
// check localDateTime
Assertions.assertEquals(
now,
schema.convertToObject(getDataType("timestamp"), new BsonDateTime(epochMilli)));
Assertions.assertEquals(
now,
schema.convertToObject(getDataType("timestamp"), new BsonDateTime(epochMilli)));
}

private SeaTunnelDataType<?> getDataType(String fieldName) {
String[] fieldNames = tableSchema.getFieldNames();
return IntStream.range(0, fieldNames.length)
.mapToObj(
i -> {
if (fieldName.equals(fieldNames[i])) {
return tableSchema.getColumns().get(i).getDataType();
}
return null;
})
.filter(Objects::nonNull)
.findFirst()
.orElseThrow(() -> new RuntimeException("not found field"));
}
}
Loading

0 comments on commit 869a362

Please sign in to comment.