Skip to content

Commit

Permalink
[Feature] [Postgre CDC]support array type (#8560)
Browse files Browse the repository at this point in the history
Co-authored-by: litiliu <[email protected]>
  • Loading branch information
litiliu and litiliu authored Jan 21, 2025
1 parent c5751b0 commit 021af14
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.seatunnel.connectors.cdc.debezium.row;

import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;

import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
Expand Down Expand Up @@ -48,6 +51,7 @@
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Deserialization schema from Debezium object to {@link SeaTunnelRow} */
Expand Down Expand Up @@ -173,12 +177,49 @@ public Object convert(Object dbzObj, Schema schema) throws Exception {
return createRowConverter(
(SeaTunnelRowType) type, serverTimeZone, userDefinedConverterFactory);
case ARRAY:
return createArrayConverter(type);
case MAP:
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
}

@VisibleForTesting
protected static DebeziumDeserializationConverter createArrayConverter(
SeaTunnelDataType<?> type) {
SeaTunnelDataType elementType = ((ArrayType) type).getElementType();
switch (elementType.getSqlType()) {
case BOOLEAN:
return (dbzObj, schema) ->
convertListToArray((List<Boolean>) dbzObj, Boolean.class);
case SMALLINT:
return (dbzObj, schema) -> convertListToArray((List<Short>) dbzObj, Short.class);
case INT:
return (dbzObj, schema) ->
convertListToArray((List<Integer>) dbzObj, Integer.class);
case BIGINT:
return (dbzObj, schema) -> convertListToArray((List<Long>) dbzObj, Long.class);
case FLOAT:
return (dbzObj, schema) -> convertListToArray((List<Float>) dbzObj, Float.class);
case DOUBLE:
return (dbzObj, schema) -> convertListToArray((List<Double>) dbzObj, Double.class);
case STRING:
return (dbzObj, schema) -> convertListToArray((List<String>) dbzObj, String.class);
default:
throw new IllegalArgumentException(
"Unsupported SQL type: " + elementType.getSqlType());
}
}

@SuppressWarnings("unchecked")
private static <T> T[] convertListToArray(List<T> list, Class<T> clazz) {
T[] array = (T[]) java.lang.reflect.Array.newInstance(clazz, list.size());
for (int i = 0; i < list.size(); i++) {
array[i] = list.get(i);
}
return array;
}

private static DebeziumDeserializationConverter convertToBoolean() {
return new DebeziumDeserializationConverter() {
private static final long serialVersionUID = 1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

package org.apache.seatunnel.connectors.cdc.debezium.row;

import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationConverter;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationConverterFactory;
import org.apache.seatunnel.connectors.cdc.debezium.MetadataConverter;

Expand All @@ -34,6 +36,7 @@

import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;

public class SeaTunnelRowDebeziumDeserializationConvertersTest {
Expand Down Expand Up @@ -75,4 +78,53 @@ void testDefaultValueNotUsed() throws Exception {
Assertions.assertEquals(row.getField(0), 1);
Assertions.assertNull(row.getField(1));
}

@Test
void testArrayConverter() throws Exception {
DebeziumDeserializationConverter converter;
// bool array converter
converter =
SeaTunnelRowDebeziumDeserializationConverters.createArrayConverter(
ArrayType.BOOLEAN_ARRAY_TYPE);
Boolean[] booleans = new Boolean[] {false, true};
Assertions.assertTrue(
Arrays.equals(
booleans, (Boolean[]) (converter.convert(Arrays.asList(booleans), null))));
// smallInt array converter
converter =
SeaTunnelRowDebeziumDeserializationConverters.createArrayConverter(
ArrayType.SHORT_ARRAY_TYPE);
Short[] shorts = new Short[] {(short) 1, (short) 2};
Assertions.assertTrue(
Arrays.equals(shorts, (Short[]) (converter.convert(Arrays.asList(shorts), null))));
// int array converter
converter =
SeaTunnelRowDebeziumDeserializationConverters.createArrayConverter(
ArrayType.INT_ARRAY_TYPE);
Integer[] ints = new Integer[] {1, 2};
Assertions.assertTrue(
Arrays.equals(ints, (Integer[]) (converter.convert(Arrays.asList(ints), null))));
// long array converter
converter =
SeaTunnelRowDebeziumDeserializationConverters.createArrayConverter(
ArrayType.LONG_ARRAY_TYPE);
Long[] longs = new Long[] {1L, 2L};
Assertions.assertTrue(
Arrays.equals(longs, (Long[]) (converter.convert(Arrays.asList(longs), null))));
// float array converter
converter =
SeaTunnelRowDebeziumDeserializationConverters.createArrayConverter(
ArrayType.FLOAT_ARRAY_TYPE);
Float[] floats = new Float[] {1.0f, 2.0f};
Assertions.assertTrue(
Arrays.equals(floats, (Float[]) (converter.convert(Arrays.asList(floats), null))));
// double array converter
converter =
SeaTunnelRowDebeziumDeserializationConverters.createArrayConverter(
ArrayType.DOUBLE_ARRAY_TYPE);
Double[] doubles = new Double[] {1.0, 2.0};
Assertions.assertTrue(
Arrays.equals(
doubles, (Double[]) (converter.convert(Arrays.asList(doubles), null))));
}
}

0 comments on commit 021af14

Please sign in to comment.