diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 1942f875d7c..579bf2dac04 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -129,4 +129,15 @@ seatunnel.source.ObsFile = connector-file-obs seatunnel.sink.ObsFile = connector-file-obs seatunnel.source.Milvus = connector-milvus seatunnel.sink.Milvus = connector-milvus -seatunnel.sink.ActiveMQ = connector-activemq \ No newline at end of file +seatunnel.sink.ActiveMQ = connector-activemq + +seatunnel.transform.Sql = seatunnel-transforms-v2 +seatunnel.transform.FieldMapper = seatunnel-transforms-v2 +seatunnel.transform.Filter = seatunnel-transforms-v2 +seatunnel.transform.FilterRowKind = seatunnel-transforms-v2 +seatunnel.transform.JsonPath = seatunnel-transforms-v2 +seatunnel.transform.Replace = seatunnel-transforms-v2 +seatunnel.transform.Split = seatunnel-transforms-v2 +seatunnel.transform.Copy = seatunnel-transforms-v2 +seatunnel.transform.DynamicCompile = seatunnel-transforms-v2 +seatunnel.transform.LLM = seatunnel-transforms-v2 \ No newline at end of file diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java index 0dc4209a8b6..166e581e2d9 100644 --- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java +++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java @@ -31,7 +31,6 @@ import org.apache.seatunnel.api.table.factory.FactoryException; import org.apache.seatunnel.api.table.factory.TableSourceFactory; import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; -import org.apache.seatunnel.api.table.factory.TableTransformFactory; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.common.constants.JobMode; import org.apache.seatunnel.common.utils.SeaTunnelException; @@ -49,7 +48,6 @@ import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; import static org.apache.seatunnel.api.table.factory.FactoryUtil.DEFAULT_ID; -import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory; /** The util used for Spark/Flink to create to SeaTunnelSource etc. */ public class PluginUtil { @@ -130,21 +128,21 @@ private static SeaTunnelSource fallbackCreate( return source; } - public static TableTransformFactory createTransformFactory( + public static Optional createTransformFactory( + SeaTunnelFactoryDiscovery factoryDiscovery, SeaTunnelTransformPluginDiscovery transformPluginDiscovery, Config transformConfig, List pluginJars) { PluginIdentifier pluginIdentifier = PluginIdentifier.of( ENGINE_TYPE, "transform", transformConfig.getString(PLUGIN_NAME.key())); - final ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(transformConfig); - final String factoryId = readonlyConfig.get(PLUGIN_NAME); - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - final TableTransformFactory factory = - discoverFactory(classLoader, TableTransformFactory.class, factoryId); pluginJars.addAll( transformPluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier))); - return factory; + try { + return factoryDiscovery.createOptionalPluginInstance(pluginIdentifier); + } catch (FactoryException e) { + return Optional.empty(); + } } public static Optional createSinkFactory( diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java index d91bb9d3da7..1ff2cf64372 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java @@ -29,6 +29,7 @@ import org.apache.seatunnel.api.transform.SeaTunnelTransform; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; import org.apache.seatunnel.core.starter.execution.PluginUtil; +import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery; import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter; import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils; @@ -41,6 +42,7 @@ import java.net.URL; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME; @@ -59,15 +61,23 @@ protected TransformExecuteProcessor( @Override protected List initializePlugins( List jarPaths, List pluginConfigs) { + + SeaTunnelFactoryDiscovery factoryDiscovery = + new SeaTunnelFactoryDiscovery(TableTransformFactory.class, ADD_URL_TO_CLASSLOADER); SeaTunnelTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelTransformPluginDiscovery(); - return pluginConfigs.stream() .map( transformConfig -> PluginUtil.createTransformFactory( - transformPluginDiscovery, transformConfig, jarPaths)) + factoryDiscovery, + transformPluginDiscovery, + transformConfig, + jarPaths)) .distinct() + .filter(Optional::isPresent) + .map(Optional::get) + .map(e -> (TableTransformFactory) e) .collect(Collectors.toList()); } diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java index bc7cd5cdbed..fc4a9e00d0d 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java @@ -29,6 +29,7 @@ import org.apache.seatunnel.api.transform.SeaTunnelTransform; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; import org.apache.seatunnel.core.starter.execution.PluginUtil; +import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery; import org.apache.seatunnel.translation.spark.serialization.SeaTunnelRowConverter; import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils; @@ -50,6 +51,7 @@ import java.util.Iterator; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.stream.Collectors; import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME; @@ -69,16 +71,23 @@ protected TransformExecuteProcessor( protected List initializePlugins(List pluginConfigs) { SeaTunnelTransformPluginDiscovery transformPluginDiscovery = new SeaTunnelTransformPluginDiscovery(); + + SeaTunnelFactoryDiscovery factoryDiscovery = + new SeaTunnelFactoryDiscovery(TableTransformFactory.class); List pluginJars = new ArrayList<>(); List transforms = pluginConfigs.stream() .map( transformConfig -> PluginUtil.createTransformFactory( + factoryDiscovery, transformPluginDiscovery, transformConfig, - pluginJars)) + new ArrayList<>())) .distinct() + .filter(Optional::isPresent) + .map(Optional::get) + .map(e -> (TableTransformFactory) e) .collect(Collectors.toList()); sparkRuntimeEnvironment.registerPlugin(pluginJars); return transforms; diff --git a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml index cc48ac86a2c..4510579d811 100644 --- a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml +++ b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml @@ -140,7 +140,7 @@ provided - + false @@ -148,6 +148,7 @@ false org.apache.seatunnel:connector-*:jar + org.apache.seatunnel:seatunnel-transforms-v2:jar org.apache.seatunnel:connector-common @@ -160,36 +161,7 @@ provided - - - false - true - false - - org.apache.seatunnel:seatunnel-transforms-v2:jar - org.apache.hadoop:hadoop-aws:jar - com.amazonaws:aws-java-sdk-bundle:jar - org.apache.seatunnel:seatunnel-hadoop3-3.1.4-uber:jar:*:optional - - org.apache.hadoop:hadoop-aliyun:jar - com.aliyun.oss:aliyun-sdk-oss:jar - org.jdom:jdom:jar - - - io.netty:netty-buffer:jar - io.netty:netty-common:jar - - - org.apache.hive:hive-exec:jar - org.apache.hive:hive-service:jar - org.apache.thrift:libfb303:jar - - ${artifact.file.name} - /lib - provided - - - + false true @@ -209,6 +181,20 @@ com.amazon.redshift:redshift-jdbc42:jar net.snowflake.snowflake-jdbc:jar com.xugudb:xugu-jdbc:jar + org.apache.hadoop:hadoop-aws:jar + com.amazonaws:aws-java-sdk-bundle:jar + org.apache.seatunnel:seatunnel-hadoop3-3.1.4-uber:jar:*:optional + + org.apache.hadoop:hadoop-aliyun:jar + com.aliyun.oss:aliyun-sdk-oss:jar + org.jdom:jdom:jar + + io.netty:netty-buffer:jar + io.netty:netty-common:jar + + org.apache.hive:hive-exec:jar + org.apache.hive:hive-service:jar + org.apache.thrift:libfb303:jar ${artifact.file.name} /lib diff --git a/seatunnel-dist/src/main/assembly/assembly-bin.xml b/seatunnel-dist/src/main/assembly/assembly-bin.xml index 30fc5a6336a..f16841f7a95 100644 --- a/seatunnel-dist/src/main/assembly/assembly-bin.xml +++ b/seatunnel-dist/src/main/assembly/assembly-bin.xml @@ -161,13 +161,12 @@ provided - + false true false - org.apache.seatunnel:seatunnel-transforms-v2:jar org.apache.seatunnel:seatunnel-hadoop3-3.1.4-uber:jar:*:optional ${artifact.file.name} @@ -175,7 +174,7 @@ provided - + false @@ -184,6 +183,7 @@ org.apache.seatunnel:connector-fake:jar org.apache.seatunnel:connector-console:jar + org.apache.seatunnel:seatunnel-transforms-v2:jar /connectors provided diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java index 1c590bb69ab..6c6a8e5cddd 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java @@ -195,13 +195,13 @@ public static void copySeaTunnelStarterToContainer( MountableFile.forHostPath(startJarPath), Paths.get(seatunnelHomeInContainer, "starter", startJarName).toString()); - // copy lib + // copy transform String transformJar = "seatunnel-transforms-v2.jar"; Path transformJarPath = Paths.get(PROJECT_ROOT_PATH, "seatunnel-transforms-v2", "target", transformJar); container.withCopyFileToContainer( MountableFile.forHostPath(transformJarPath), - Paths.get(seatunnelHomeInContainer, "lib", transformJar).toString()); + Paths.get(seatunnelHomeInContainer, "connectors", transformJar).toString()); // copy bin final String startBinPath = startModulePath + File.separator + "src/main/bin/"; diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index 40a6640c358..d02a76a4c51 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -182,7 +182,7 @@ public ImmutablePair, Set> parse(ClassLoaderService classLoade TypesafeConfigUtils.getConfigList( seaTunnelJobConfig, "sink", Collections.emptyList()); - List connectorJars = getConnectorJarList(sourceConfigs, sinkConfigs); + List connectorJars = getConnectorJarList(sourceConfigs, transformConfigs, sinkConfigs); if (!commonPluginJars.isEmpty()) { connectorJars.addAll(commonPluginJars); } @@ -238,18 +238,32 @@ public Set getUsedFactoryUrls(List sinkActions) { } private List getConnectorJarList( - List sourceConfigs, List sinkConfigs) { + List sourceConfigs, + List transformConfigs, + List sinkConfigs) { List factoryIds = Stream.concat( - sourceConfigs.stream() - .map(ConfigParserUtil::getFactoryId) - .map( - factory -> - PluginIdentifier.of( - CollectionConstants - .SEATUNNEL_PLUGIN, - CollectionConstants.SOURCE_PLUGIN, - factory)), + Stream.concat( + sourceConfigs.stream() + .map(ConfigParserUtil::getFactoryId) + .map( + factory -> + PluginIdentifier.of( + CollectionConstants + .SEATUNNEL_PLUGIN, + CollectionConstants + .SOURCE_PLUGIN, + factory)), + transformConfigs.stream() + .map(ConfigParserUtil::getFactoryId) + .map( + factory -> + PluginIdentifier.of( + CollectionConstants + .SEATUNNEL_PLUGIN, + CollectionConstants + .TRANSFORM_PLUGIN, + factory))), sinkConfigs.stream() .map(ConfigParserUtil::getFactoryId) .map( diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java index 445bf14628d..606cd0d7cae 100644 --- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java +++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelTransformPluginDiscovery.java @@ -34,7 +34,7 @@ public class SeaTunnelTransformPluginDiscovery extends AbstractPluginDiscovery { public SeaTunnelTransformPluginDiscovery() { - super(Common.libDir()); + super(Common.connectorDir()); } @Override diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java index 5670bcc1296..632d3af1e41 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java @@ -20,10 +20,12 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.transform.SeaTunnelTransform; import lombok.NonNull; -public abstract class AbstractCatalogSupportTransform extends AbstractSeaTunnelTransform { +public abstract class AbstractCatalogSupportTransform implements SeaTunnelTransform { protected CatalogTable inputCatalogTable; protected volatile CatalogTable outputCatalogTable; @@ -32,6 +34,18 @@ public AbstractCatalogSupportTransform(@NonNull CatalogTable inputCatalogTable) this.inputCatalogTable = inputCatalogTable; } + @Override + public SeaTunnelRow map(SeaTunnelRow row) { + return transformRow(row); + } + + /** + * Outputs transformed row data. + * + * @param inputRow upstream input row data + */ + protected abstract SeaTunnelRow transformRow(SeaTunnelRow inputRow); + @Override public CatalogTable getProducedCatalogTable() { if (outputCatalogTable == null) { diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java deleted file mode 100644 index 1892881c277..00000000000 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.transform.common; - -import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.api.transform.SeaTunnelTransform; - -public abstract class AbstractSeaTunnelTransform implements SeaTunnelTransform { - - protected String inputTableName; - protected SeaTunnelRowType inputRowType; - - protected SeaTunnelRowType outputRowType; - - @Override - public SeaTunnelRow map(SeaTunnelRow row) { - return transformRow(row); - } - - /** - * Outputs transformed row data. - * - * @param inputRow upstream input row data - */ - protected abstract SeaTunnelRow transformRow(SeaTunnelRow inputRow); - - @Override - public CatalogTable getProducedCatalogTable() { - throw new UnsupportedOperationException( - String.format( - "Connector %s must implement TableTransformFactory.createTransform method", - getPluginName())); - } -} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java index a9d04b07396..00316bba8e7 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java @@ -62,6 +62,8 @@ public class SQLTransform extends AbstractCatalogSupportTransform { private transient SQLEngine sqlEngine; + private final String inputTableName; + public SQLTransform(@NonNull ReadonlyConfig config, @NonNull CatalogTable catalogTable) { super(catalogTable); this.query = config.get(KEY_QUERY); @@ -77,15 +79,6 @@ public SQLTransform(@NonNull ReadonlyConfig config, @NonNull CatalogTable catalo } else { this.inputTableName = catalogTable.getTableId().getTableName(); } - List columns = catalogTable.getTableSchema().getColumns(); - String[] fieldNames = new String[columns.size()]; - SeaTunnelDataType[] fieldTypes = new SeaTunnelDataType[columns.size()]; - for (int i = 0; i < columns.size(); i++) { - Column column = columns.get(i); - fieldNames[i] = column.getName(); - fieldTypes[i] = column.getDataType(); - } - this.inputRowType = new SeaTunnelRowType(fieldNames, fieldTypes); } @Override @@ -98,8 +91,8 @@ public void open() { sqlEngine = SQLEngineFactory.getSQLEngine(engineType); sqlEngine.init( inputTableName, - inputCatalogTable != null ? inputCatalogTable.getTableId().getTableName() : null, - inputRowType, + inputCatalogTable.getTableId().getTableName(), + inputCatalogTable.getSeaTunnelRowType(), query); }