Skip to content

Commit

Permalink
[feature][core] Unified engine initialization connector logic (#8536)
Browse files Browse the repository at this point in the history
  • Loading branch information
liugddx authored Jan 22, 2025
1 parent 9268f5a commit a05ba93
Show file tree
Hide file tree
Showing 25 changed files with 314 additions and 623 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.plugin.discovery;
package org.apache.seatunnel.api.common;

import org.apache.commons.lang3.StringUtils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.api.table.factory;

import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.configuration.util.OptionRule;
Expand All @@ -36,11 +37,13 @@
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.common.utils.ExceptionUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import scala.Tuple2;

import java.io.Serializable;
Expand All @@ -51,12 +54,17 @@
import java.util.Optional;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;

/**
* Use SPI to create {@link TableSourceFactory}, {@link TableSinkFactory} and {@link
* CatalogFactory}.
*/
@Slf4j
public final class FactoryUtil {

private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
Expand All @@ -65,31 +73,60 @@ public final class FactoryUtil {

public static <T, SplitT extends SourceSplit, StateT extends Serializable>
Tuple2<SeaTunnelSource<T, SplitT, StateT>, List<CatalogTable>> createAndPrepareSource(
ReadonlyConfig options, ClassLoader classLoader, String factoryIdentifier) {
return restoreAndPrepareSource(options, classLoader, factoryIdentifier, null);
ReadonlyConfig options,
ClassLoader classLoader,
String factoryIdentifier,
Function<PluginIdentifier, SeaTunnelSource> fallbackCreateSource,
TableSourceFactory factory) {
return restoreAndPrepareSource(
options, classLoader, factoryIdentifier, null, fallbackCreateSource, factory);
}

public static <T, SplitT extends SourceSplit, StateT extends Serializable>
Tuple2<SeaTunnelSource<T, SplitT, StateT>, List<CatalogTable>> restoreAndPrepareSource(
ReadonlyConfig options,
ClassLoader classLoader,
String factoryIdentifier,
ChangeStreamTableSourceCheckpoint checkpoint) {
ChangeStreamTableSourceCheckpoint checkpoint,
Function<PluginIdentifier, SeaTunnelSource> fallbackCreateSource,
TableSourceFactory factory) {

try {
final TableSourceFactory factory =
discoverFactory(classLoader, TableSourceFactory.class, factoryIdentifier);

SeaTunnelSource<T, SplitT, StateT> source;
if (factory instanceof ChangeStreamTableSourceFactory && checkpoint != null) {
ChangeStreamTableSourceFactory changeStreamTableSourceFactory =
(ChangeStreamTableSourceFactory) factory;
ChangeStreamTableSourceState<Serializable, SourceSplit> state =
changeStreamTableSourceFactory.deserializeTableSourceState(checkpoint);
final String factoryId = options.get(PLUGIN_NAME);

boolean fallback =
isFallback(
classLoader,
TableSourceFactory.class,
factoryId,
(sourceFactory) -> sourceFactory.createSource(null));

if (fallback) {
source =
restoreAndPrepareSource(
changeStreamTableSourceFactory, options, classLoader, state);
fallbackCreateSource.apply(
PluginIdentifier.of("seatunnel", "source", factoryId));
source.prepare(options.toConfig());

} else {
source = createAndPrepareSource(factory, options, classLoader);
if (factory == null) {
factory =
discoverFactory(
classLoader, TableSourceFactory.class, factoryIdentifier);
}

if (factory instanceof ChangeStreamTableSourceFactory && checkpoint != null) {
ChangeStreamTableSourceFactory changeStreamTableSourceFactory =
(ChangeStreamTableSourceFactory) factory;
ChangeStreamTableSourceState<Serializable, SourceSplit> state =
changeStreamTableSourceFactory.deserializeTableSourceState(checkpoint);
source =
restoreAndPrepareSource(
changeStreamTableSourceFactory, options, classLoader, state);
} else {
source = createAndPrepareSource(factory, options, classLoader);
}
}
List<CatalogTable> catalogTables;
try {
Expand All @@ -115,6 +152,7 @@ Tuple2<SeaTunnelSource<T, SplitT, StateT>, List<CatalogTable>> restoreAndPrepare
catalogTables.add(catalogTable);
}
return new Tuple2<>(source, catalogTables);

} catch (Throwable t) {
throw new FactoryException(
String.format(
Expand Down Expand Up @@ -150,25 +188,50 @@ SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createAndPrepareSi
CatalogTable catalogTable,
ReadonlyConfig config,
ClassLoader classLoader,
String factoryIdentifier) {
String factoryIdentifier,
Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink,
TableSinkFactory<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
tableSinkFactory) {
try {
TableSinkFactory<IN, StateT, CommitInfoT, AggregatedCommitInfoT> factory =
discoverFactory(classLoader, TableSinkFactory.class, factoryIdentifier);
final String factoryId = config.get(PLUGIN_NAME);

boolean fallback =
isFallback(
classLoader,
TableSinkFactory.class,
factoryId,
(sinkFactory) -> sinkFactory.createSink(null));

if (fallback) {
SeaTunnelSink sink =
fallbackCreateSink.apply(
PluginIdentifier.of("seatunnel", "sink", factoryId));
sink.prepare(config.toConfig());
sink.setTypeInfo(catalogTable.getSeaTunnelRowType());

return sink;
}

if (tableSinkFactory == null) {
tableSinkFactory =
discoverFactory(classLoader, TableSinkFactory.class, factoryIdentifier);
}

TableSinkFactoryContext context =
TableSinkFactoryContext.replacePlaceholderAndCreate(
catalogTable,
config,
classLoader,
factory.excludeTablePlaceholderReplaceKeys());
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
tableSinkFactory.excludeTablePlaceholderReplaceKeys());
ConfigValidator.of(context.getOptions()).validate(tableSinkFactory.optionRule());

LOG.info(
"Create sink '{}' with upstream input catalog-table[database: {}, schema: {}, table: {}]",
factoryIdentifier,
catalogTable.getTablePath().getDatabaseName(),
catalogTable.getTablePath().getSchemaName(),
catalogTable.getTablePath().getTableName());
return factory.createSink(context).createSink();
return tableSinkFactory.createSink(context).createSink();
} catch (Throwable t) {
throw new FactoryException(
String.format(
Expand Down Expand Up @@ -351,4 +414,26 @@ public static SeaTunnelTransform<?> createAndPrepareMultiTableTransform(
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
return factory.createTransform(context).createTransform();
}

private static <T extends Factory> boolean isFallback(
ClassLoader classLoader,
Class<T> factoryClass,
String factoryId,
Consumer<T> virtualCreator) {
Optional<T> factory = discoverOptionalFactory(classLoader, factoryClass, factoryId);
if (!factory.isPresent()) {
return true;
}
try {
virtualCreator.accept(factory.get());
} catch (Exception e) {
if (e instanceof UnsupportedOperationException
&& "The Factory has not been implemented and the deprecated Plugin will be used."
.equals(e.getMessage())) {
return true;
}
log.debug(ExceptionUtils.getMessage(e));
}
return false;
}
}
Loading

0 comments on commit a05ba93

Please sign in to comment.