Skip to content

Commit

Permalink
[feature][jdbc][TiDB] add TiDB catalog (#4438)
Browse files Browse the repository at this point in the history
  • Loading branch information
ashulin authored Mar 30, 2023
1 parent edbccca commit 9a32db6
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ public AbstractJdbcCatalog(
String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) {

checkArgument(StringUtils.isNotBlank(username));
checkArgument(StringUtils.isNotBlank(pwd));
urlInfo.getDefaultDatabase()
.orElseThrow(
() -> new IllegalArgumentException("Can't find default database in url"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@

public class MySqlCatalog extends AbstractJdbcCatalog {

private static final Set<String> SYS_DATABASES = new HashSet<>(4);
protected static final Set<String> SYS_DATABASES = new HashSet<>(4);

static {
SYS_DATABASES.add("information_schema");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb;

import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalog;

public class TiDBCatalog extends MySqlCatalog {

static {
SYS_DATABASES.clear();
SYS_DATABASES.add("information_schema");
SYS_DATABASES.add("mysql");
SYS_DATABASES.add("performance_schema");
SYS_DATABASES.add("metrics_schema");
}

public TiDBCatalog(
String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) {
super(catalogName, username, pwd, urlInfo);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.configuration.util.OptionValidationException;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;

import com.google.auto.service.AutoService;

import java.util.Optional;

@AutoService(Factory.class)
public class TiDBCatalogFactory implements CatalogFactory {

public static final String IDENTIFIER = "TiDB";

@Override
public String factoryIdentifier() {
return IDENTIFIER;
}

@Override
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
String urlWithDatabase = options.get(JdbcCatalogOptions.BASE_URL);
JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(urlWithDatabase);
Optional<String> defaultDatabase = urlInfo.getDefaultDatabase();
if (!defaultDatabase.isPresent()) {
throw new OptionValidationException(JdbcCatalogOptions.BASE_URL);
}
return new TiDBCatalog(
catalogName,
options.get(JdbcCatalogOptions.USERNAME),
options.get(JdbcCatalogOptions.PASSWORD),
urlInfo);
}

@Override
public OptionRule optionRule() {
return JdbcCatalogOptions.BASE_RULE.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,36 +19,48 @@

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportDataSaveMode;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb.TiDBCatalogFactory;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;

import org.apache.commons.lang3.StringUtils;

import com.google.auto.service.AutoService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

@AutoService(SeaTunnelSink.class)
public class JdbcSink
implements SeaTunnelSink<SeaTunnelRow, JdbcSinkState, XidInfo, JdbcAggregatedCommitInfo> {

private Config pluginConfig;
implements SeaTunnelSink<SeaTunnelRow, JdbcSinkState, XidInfo, JdbcAggregatedCommitInfo>,
SupportDataSaveMode {

private SeaTunnelRowType seaTunnelRowType;

Expand All @@ -58,17 +70,39 @@ public class JdbcSink

private JdbcDialect dialect;

private ReadonlyConfig config;

private DataSaveMode dataSaveMode;

private CatalogTable catalogTable;

public JdbcSink(
ReadonlyConfig config,
JdbcSinkConfig jdbcSinkConfig,
JdbcDialect dialect,
DataSaveMode dataSaveMode,
CatalogTable catalogTable) {
this.config = config;
this.jdbcSinkConfig = jdbcSinkConfig;
this.dialect = dialect;
this.dataSaveMode = dataSaveMode;
this.catalogTable = catalogTable;
this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
}

public JdbcSink() {}

@Override
public String getPluginName() {
return "Jdbc";
}

@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig);
this.config = ReadonlyConfig.fromConfig(pluginConfig);
this.jdbcSinkConfig = JdbcSinkConfig.of(config);
this.pluginConfig = pluginConfig;
this.dialect = JdbcDialectLoader.load(jdbcSinkConfig.getJdbcConnectionConfig().getUrl());
this.dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA;
}

@Override
Expand Down Expand Up @@ -140,4 +174,41 @@ public Optional<Serializer<XidInfo>> getCommitInfoSerializer() {
}
return Optional.empty();
}

@Override
public DataSaveMode getDataSaveMode() {
return dataSaveMode;
}

@Override
public List<DataSaveMode> supportedDataSaveModeValues() {
return Collections.singletonList(DataSaveMode.KEEP_SCHEMA_AND_DATA);
}

@Override
public void handleSaveMode(DataSaveMode saveMode) {
if (catalogTable != null) {
Map<String, String> catalogOptions = config.get(CatalogOptions.CATALOG_OPTIONS);
if (catalogOptions != null
&& TiDBCatalogFactory.IDENTIFIER.equalsIgnoreCase(
catalogOptions.get(CommonOptions.FACTORY_ID.key()))) {
if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) {
return;
}
Catalog catalog =
new TiDBCatalogFactory()
.createCatalog(
TiDBCatalogFactory.IDENTIFIER,
ReadonlyConfig.fromMap(new HashMap<>(catalogOptions)));
TablePath tablePath =
TablePath.of(jdbcSinkConfig.getDatabase(), jdbcSinkConfig.getTable());
if (!catalog.databaseExists(jdbcSinkConfig.getDatabase())) {
catalog.createDatabase(tablePath, true);
}
if (!catalog.tableExists(tablePath)) {
catalog.createTable(tablePath, catalogTable, true);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,16 @@

package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;

import com.google.auto.service.AutoService;

Expand Down Expand Up @@ -50,6 +57,20 @@ public String factoryIdentifier() {
return "Jdbc";
}

@Override
public TableSink createSink(TableFactoryContext context) {
ReadonlyConfig config = context.getOptions();
JdbcSinkConfig sinkConfig = JdbcSinkConfig.of(config);
JdbcDialect dialect = JdbcDialectLoader.load(sinkConfig.getJdbcConnectionConfig().getUrl());
return () ->
new JdbcSink(
config,
sinkConfig,
dialect,
DataSaveMode.KEEP_SCHEMA_AND_DATA,
context.getCatalogTable());
}

@Override
public OptionRule optionRule() {
return OptionRule.builder()
Expand All @@ -64,13 +85,13 @@ public OptionRule optionRule() {
GENERATE_SINK_SQL,
AUTO_COMMIT,
SUPPORT_UPSERT_BY_QUERY_PRIMARY_KEY_EXIST)
.optional(MAX_RETRIES)
.conditional(
IS_EXACTLY_ONCE,
true,
XA_DATA_SOURCE_CLASS_NAME,
MAX_COMMIT_ATTEMPTS,
TRANSACTION_TIMEOUT_SEC)
.conditional(IS_EXACTLY_ONCE, false, MAX_RETRIES)
.conditional(GENERATE_SINK_SQL, true, DATABASE, TABLE)
.conditional(GENERATE_SINK_SQL, false, QUERY)
.conditional(SUPPORT_UPSERT_BY_QUERY_PRIMARY_KEY_EXIST, true, PRIMARY_KEYS)
Expand Down

0 comments on commit 9a32db6

Please sign in to comment.