-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Postgres] Use Incremental Snapshot Framework for Postgres CDC Connector #1823
Conversation
Update:
|
@xiaom We planed to open a contributor sync meeting to discuss the 2.4 roadmap, are you interested to join? please contact me if you'd like to. |
Hey @leonardBang, thanks for the invitation! Yeah, I am interested. will DM you on Twitter. |
Does it support snapshot the new added tables? I need this function, is it works good? |
There is a separate PR for that: #1838 |
Nice, hope to merge as soon as possible. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@xiaom , thanks for your work. I left some comments.
flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/Utils.java
Outdated
Show resolved
Hide resolved
...ector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectFactory.java
Outdated
Show resolved
Hide resolved
...ector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectFactory.java
Outdated
Show resolved
Hide resolved
...dc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java
Outdated
Show resolved
Hide resolved
...dc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java
Outdated
Show resolved
Hide resolved
...tgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/PgQueryUtils.java
Outdated
Show resolved
Hide resolved
.../src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
Show resolved
Hide resolved
.../src/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java
Outdated
Show resolved
Hide resolved
...rc/main/java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresStreamFetchTask.java
Show resolved
Hide resolved
.../java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
Show resolved
Hide resolved
Hi, @xiaom. |
Hi @ruanhang1993, thanks for the review! I will find some time to update the PR either later this week or next week. |
Hi, @xiaom . Is there any update about this PR? Thanks a lot~ |
Apologize for the delay in updating the PR. I've had some unexpected personal commitments come up. |
Hey @ruanhang1993, I've addressed some comments in this commit for you to review. Let me know what you think. I have not rebased the branch yet. I will do it in the next update. Also, I'd like to point out a caveat of this feature for any potential users: its scalability with large tables is not ideal. In the snapshotting phase, backfill tasks are created to capture new data changes. However, for larger tables, since snapshotting takes longer, WAL also grows larger and backfilling tasks will take significantly more time. Contrary to MySQL, where the process can be parallelized through additional binlog readers, this isn't straightforward for Postgres. To achieve similar parallelism, we would require more replication slots, a resource that is not advisable to overuse due to its limited availability. In light of this, we implement a snapshot-only reader (with option |
Hi, @xiaom . About the problem you mentioned, the snapshot phase for the big table is actually a common pain point.
The issue #1687 for mysql aims to the usage. |
Co-Authored-By: Yaroslav Tkachenko <[email protected]>
Hey @ruanhang1993, I've rebased the PR. Also, thanks for mentioning various solutions for parallelized snapshotting. Good to know that this is a common pain point. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, @xiaom . I have reviewed the cdc-base and will review pg cdc part later.
Would you mind take a look at the failed CI ? Thanks ~
@@ -358,6 +359,8 @@ private void writeTableIds(Collection<TableId> tableIds, DataOutputSerializer ou | |||
final int size = tableIds.size(); | |||
out.writeInt(size); | |||
for (TableId tableId : tableIds) { | |||
boolean useCatalogBeforeSchema = SerializerUtils.shouldUseCatalogBeforeSchema(tableId); | |||
out.writeBoolean(useCatalogBeforeSchema); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will make the state in 2.3.0 not be able to be used in 2.4.0.
We should update the state serializer version and use a different logic.
flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/SerializerUtils.java
Show resolved
Hide resolved
...com/ververica/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
Outdated
Show resolved
Hide resolved
...com/ververica/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java
Show resolved
Hide resolved
...ain/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java
Outdated
Show resolved
Hide resolved
.../java/com/ververica/cdc/connectors/base/source/meta/offset/OffsetDeserializerSerializer.java
Outdated
Show resolved
Hide resolved
flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/meta/offset/Offset.java
Outdated
Show resolved
Hide resolved
...stgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
Show resolved
Hide resolved
...es-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/offset/PostgresOffset.java
Show resolved
Hide resolved
...es-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/offset/PostgresOffset.java
Outdated
Show resolved
Hide resolved
...ector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectFactory.java
Outdated
Show resolved
Hide resolved
...ector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectFactory.java
Outdated
Show resolved
Hide resolved
...es-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceBuilder.java
Show resolved
Hide resolved
...es-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceBuilder.java
Show resolved
Hide resolved
...src/main/java/com/ververica/cdc/connectors/postgres/source/config/PostgresSourceOptions.java
Outdated
Show resolved
Hide resolved
...-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/utils/PostgresTypeUtils.java
Outdated
Show resolved
Hide resolved
.../java/com/ververica/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
Show resolved
Hide resolved
Hi, @xiaom . |
I've addressed some review feedback ( emoji "👍" marked) as part 1 0861f46. will continue the left one and fix CI later |
Co-Authored-By: Yaroslav Tkachenko [email protected]
Hey @leonardBang,
This is our first PR resolving #1163 😄
The core functionality of DataStream is implemented under the package
com.ververica.cdc.connectors.postgres.source
with a similar layout to MySQL/Oracle's incremental snapshotting implementation.source ├── PostgresChunkSplitter.java ├── PostgresConnectionPoolFactory.java ├── PostgresDialect.java ├── PostgresSourceBuilder.java ├── config │ ├── PostgresSourceConfig.java │ ├── PostgresSourceConfigFactory.java │ └── PostgresSourceOptions.java ├── fetch │ ├── PostgresScanFetchTask.java │ ├── PostgresSourceFetchTaskContext.java │ └── PostgresStreamFetchTask.java ├── offset │ ├── PostgresOffset.java │ └── PostgresOffsetFactory.java └── utils ├── PgQueryUtils.java ├── PgSchema.java ├── PgTypeUtils.java └── TableDiscoveryUtils.java
The corresponding Table API can be enabled by setting
scan.incremental.snapshot.enabled=true
.A few notes:
package
io.debezium.connector.postgresql
The package is mostly getting around the limitation of using some Debezium classes directly.
Utils.java
: a utility class to access some package-private methods of DebeziumPostgresObjectFactory.java
: a factory to create various Debezium object constructor which needs package private access.java
: copied from Debezium 1.6.4-final and modified to support injecting connection factory with Hikari connection pool.Major changes to CDC-base
List<String> schemaList
and make it compatible with PostgreSQL and Debezium’s terminology. (see Debezium’s TableId).useCatalogBeforeSchema
flag (true by default)Other changes not related to PostgreSQL
*ScanFetchTask
Notes on Approximate Count query for Postgres
We use the following query in chunk splitter to estimate the approximate count of rows
The query requires a prior run of
VACCUM
orANALYZE
to get a good estimation. For any PostgreSQL instances with autovacuum on, you won’t need to worry about it.We are also actively working on supporting the
scan.newly-added-table.enabled
feature as @sap1ens mentioned here.Appreciate any feedback!