Skip to content

Commit

Permalink
[Improve][Connector-V2] Optimize sqlserver package structure (#7715)
Browse files Browse the repository at this point in the history
  • Loading branch information
corgy-w authored Sep 23, 2024
1 parent bd2b7c5 commit 9720f11
Show file tree
Hide file tree
Showing 21 changed files with 60 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config;
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.config;

import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.config.StartupConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config;
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.config;

import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfigFactory;
import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source;
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source;

import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
Expand All @@ -27,14 +27,15 @@
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.base.utils.CatalogTableUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfigFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.enumerator.SqlServerChunkSplitter;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.reader.fetch.SqlServerSourceFetchTaskContext;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.reader.fetch.scan.SqlServerSnapshotFetchTask;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.reader.fetch.transactionlog.SqlServerTransactionLogFetchTask;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.TableDiscoveryUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.config.SqlServerSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.config.SqlServerSourceConfigFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.enumerator.SqlServerChunkSplitter;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.reader.fetch.SqlServerSourceFetchTaskContext;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.reader.fetch.scan.SqlServerSnapshotFetchTask;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.reader.fetch.transactionlog.SqlServerTransactionLogFetchTask;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils.SqlServerConnectionUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils.SqlServerSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils.TableDiscoveryUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;

import io.debezium.jdbc.JdbcConnection;
Expand All @@ -46,8 +47,6 @@
import java.util.Map;
import java.util.Optional;

import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection;

/** The {@link JdbcDataSourceDialect} implementation for MySQL datasource. */
public class SqlServerDialect implements JdbcDataSourceDialect {

Expand Down Expand Up @@ -76,7 +75,8 @@ public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig sourceConfig) {

@Override
public JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
return createSqlServerConnection(sourceConfig.getDbzConfiguration());
return SqlServerConnectionUtils.createSqlServerConnection(
sourceConfig.getDbzConfiguration());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source;
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
Expand All @@ -36,8 +36,8 @@
import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;
import org.apache.seatunnel.connectors.cdc.debezium.row.DebeziumJsonDeserializeSchema;
import org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfigFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.offset.LsnOffsetFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.config.SqlServerSourceConfigFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.offset.LsnOffsetFactory;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver.SqlServerURLParser;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source;
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source;
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source;

import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.SingleChoiceOption;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.enumerator;
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.enumerator;

import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.AbstractJdbcSourceChunkSplitter;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerTypeUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils.SqlServerTypeUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils.SqlServerUtils;

import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.offset;
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.offset;

import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.offset;
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.offset;

import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfigFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.SqlServerDialect;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.config.SqlServerSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.config.SqlServerSourceConfigFactory;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.SqlServerDialect;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils.SqlServerUtils;

import io.debezium.connector.sqlserver.SourceInfo;
import io.debezium.connector.sqlserver.SqlServerConnection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.reader.fetch;
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.reader.fetch;

import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
Expand All @@ -26,9 +26,10 @@
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config.SqlServerSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.offset.LsnOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.config.SqlServerSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.offset.LsnOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils.SqlServerConnectionUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils.SqlServerUtils;

import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
Expand Down Expand Up @@ -66,8 +67,6 @@
import java.util.List;
import java.util.Map;

import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection;

/** The context for fetch task that fetching data of snapshot split from MySQL data source. */
@Slf4j
public class SqlServerSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
Expand All @@ -92,7 +91,9 @@ public SqlServerSourceFetchTaskContext(
SqlServerSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) {
super(sourceConfig, dataSourceDialect);

this.dataConnection = createSqlServerConnection(sourceConfig.getDbzConfiguration());
this.dataConnection =
SqlServerConnectionUtils.createSqlServerConnection(
sourceConfig.getDbzConfiguration());
this.metadataProvider = new SqlServerEventMetadataProvider();
}

Expand Down Expand Up @@ -175,7 +176,8 @@ private void initMetadataConnection() {
synchronized (this) {
if (this.metadataConnection == null) {
this.metadataConnection =
createSqlServerConnection(sourceConfig.getDbzConfiguration());
SqlServerConnectionUtils.createSqlServerConnection(
sourceConfig.getDbzConfiguration());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.reader.fetch.scan;
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.reader.fetch.scan;

import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.offset.LsnOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.offset.LsnOffset;

import io.debezium.pipeline.source.spi.ChangeEventSource;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.reader.fetch.scan;
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.reader.fetch.scan;

import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.reader.fetch.SqlServerSourceFetchTaskContext;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.reader.fetch.transactionlog.SqlServerTransactionLogFetchTask;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.reader.fetch.SqlServerSourceFetchTaskContext;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.reader.fetch.transactionlog.SqlServerTransactionLogFetchTask;

import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.reader.fetch.scan;
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.reader.fetch.scan;

import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.offset.LsnOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.offset.LsnOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils.SqlServerUtils;

import org.apache.kafka.connect.errors.ConnectException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.reader.fetch.transactionlog;
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.reader.fetch.transactionlog;

import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.offset.LsnOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.reader.fetch.SqlServerSourceFetchTaskContext;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.reader.fetch.scan.SqlServerSnapshotFetchTask;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.offset.LsnOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.reader.fetch.SqlServerSourceFetchTaskContext;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.reader.fetch.scan.SqlServerSnapshotFetchTask;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -42,8 +42,8 @@

import java.util.Map;

import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.offset.LsnOffset.NO_STOPPING_OFFSET;
import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerUtils.getLsnPosition;
import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.offset.LsnOffset.NO_STOPPING_OFFSET;
import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils.SqlServerUtils.getLsnPosition;

public class SqlServerTransactionLogFetchTask implements FetchTask<SourceSplitBase> {
private final IncrementalSplit split;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils;
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils;

import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.SqlServerConnection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils;
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils;

import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.common.utils.SeaTunnelException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils;
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils;

import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils;
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils;

import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.offset.LsnOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.offset.LsnOffset;

import org.apache.kafka.connect.source.SourceRecord;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils;
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source;
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils;
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils;

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
Expand Down

0 comments on commit 9720f11

Please sign in to comment.