- The timestamp is when the record is read by Kafka, not when it was generated.
- Everything is done in the initial database. Can logical replication span databases, or is a replication slot tied to a single database?
The PostgreSQL connector uses the Logical Decoding feature of PostgreSQL 9.x (introduced in 9.4). This allows the connector to work with Amazon RDS for PostgreSQL.
name=connector1
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.cdc.postgres.PostgreSqlSourceConnector
# Set these required values
initial.database=
server.name=
postgres.replication.slot.name=
password=
server.port=
username=
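
Before starting the connector, it can help to confirm that none of the required values above were left blank. The following is a minimal sketch of such a pre-flight check and is not part of the connector: `REQUIRED_KEYS` is copied from the list above, `parse_properties` and `missing_required` are hypothetical helper names, the sample values are made up, and the parser only understands simple `key=value` lines and `#` comments.

```python
# Hypothetical pre-flight check; not part of the connector.
REQUIRED_KEYS = [
    "initial.database",
    "server.name",
    "postgres.replication.slot.name",
    "password",
    "server.port",
    "username",
]


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def missing_required(props):
    """Return the required keys that are absent or have an empty value."""
    return [key for key in REQUIRED_KEYS if not props.get(key)]


# Made-up sample values, for illustration only.
example = """
name=connector1
initial.database=inventory
server.name=db.example.com
server.port=5432
username=replicator
password=
"""
print(missing_required(parse_properties(example)))
```

Here the slot name is missing entirely and the password is empty, so both keys are reported.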
Name | Description | Type | Default | Valid Values | Importance |
---|---|---|---|---|---|
initial.database | The initial database to connect to. | string | | | high |
password | JDBC Password to connect to the database with. | password | | | high |
postgres.replication.slot.name | The PostgreSQL replication slot name to connect to. | string | | | high |
server.name | The server to connect to. | string | | | high |
server.port | The port on the server to connect to. | int | | | high |
username | JDBC Username to connect to the database with. | string | | | high |
schema.key.name.format | Format used to generate the name for the key schema. The following template properties are available for string replacement: ${databaseName}, ${schemaName}, ${tableName}, ${namespace} | string | ${namespace}.${tableName}Key | | high |
schema.namespace.format | The namespace for the schemas generated by the connector. The following template properties are available for string replacement: ${databaseName}, ${schemaName}, ${tableName}, ${namespace} | string | com.example.data.${databaseName} | | high |
schema.value.name.format | Format used to generate the name for the value schema. The following template properties are available for string replacement: ${databaseName}, ${schemaName}, ${tableName}, ${namespace} | string | ${namespace}.${tableName}Value | | high |
topicFormat.format | Format used to generate the name of the topic the data is written to. | string | ${databaseName}.${tableName} | | high |
jdbc.pool.max.idle | The maximum number of idle connections in the connection pool. | int | 10 | | medium |
jdbc.pool.max.total | The maximum number of connections for the connection pool to open. If a number greater than this value is requested, the caller will block waiting for a connection to be returned. | int | 30 | | medium |
jdbc.pool.min.idle | The minimum number of idle connections in the connection pool. | int | 3 | | medium |
backoff.time.ms | The number of milliseconds to wait when no records are returned. | int | 1000 | [50,...] | low |
batch.size | The number of records to return in a batch. | int | 512 | [1,...] | low |
schema.cache.ms | The number of milliseconds to cache schema metadata in memory. | int | 300000 | [60000,...] | low |
schema.caseformat.column.name | This setting is used to control how the column names are cased when the resulting schemas are generated. | string | NONE | ValidEnum{enum=CaseFormat, allowed=[LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, LOWER, UPPER_CAMEL, UPPER_UNDERSCORE, UPPER, NONE]} | low |
schema.caseformat.database.name | Controls how the ${databaseName} variable is cased when it is passed to the formatters defined in the schema.namespace.format, schema.key.name.format, schema.value.name.format, and topicFormat.format settings. This allows you to control the naming applied to these properties. For example, this can be used to convert a database name of USER_TRACKING to a more Java-like case of userTracking or to all lowercase usertracking. | string | NONE | ValidEnum{enum=CaseFormat, allowed=[LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, LOWER, UPPER_CAMEL, UPPER_UNDERSCORE, UPPER, NONE]} | low |
schema.caseformat.input | The naming convention used by the database. This defines the source naming convention used by the other schema.caseformat.* properties. | string | UPPER_UNDERSCORE | ValidEnum{enum=CaseFormat, allowed=[LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, UPPER_CAMEL, UPPER_UNDERSCORE]} | low |
schema.caseformat.schema.name | Controls how the ${schemaName} variable is cased when it is passed to the formatters defined in the schema.namespace.format, schema.key.name.format, schema.value.name.format, and topicFormat.format settings. This allows you to control the naming applied to these properties. For example, this can be used to convert a schema name of SCOTT to a more Java-like case of Scott or to all lowercase scott. | string | NONE | ValidEnum{enum=CaseFormat, allowed=[LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, LOWER, UPPER_CAMEL, UPPER_UNDERSCORE, UPPER, NONE]} | low |
schema.caseformat.table.name | Controls how the ${tableName} variable is cased when it is passed to the formatters defined in the schema.namespace.format, schema.key.name.format, schema.value.name.format, and topicFormat.format settings. This allows you to control the naming applied to these properties. For example, this can be used to convert a table name of USER_SETTING to a more Java-like case of UserSetting or to all lowercase usersetting. | string | NONE | ValidEnum{enum=CaseFormat, allowed=[LOWER_HYPHEN, LOWER_UNDERSCORE, LOWER_CAMEL, LOWER, UPPER_CAMEL, UPPER_UNDERSCORE, UPPER, NONE]} | low |
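
To make the interaction between the `schema.caseformat.*` settings and the `${...}` templates more concrete, here is a rough Python sketch of how the default formats might resolve for one table. It is an illustration only, not the connector's code: `convert_case` and `resolve_names` are hypothetical helpers, only underscore- and hyphen-separated input names are handled, and the assumption that the namespace is resolved before the key, value, and topic names is mine rather than something the table states.

```python
from string import Template


def split_words(name, input_format="UPPER_UNDERSCORE"):
    """Split a name into lowercase words (underscore or hyphen inputs only)."""
    sep = "-" if input_format == "LOWER_HYPHEN" else "_"
    return [word.lower() for word in name.split(sep) if word]


def convert_case(name, target, input_format="UPPER_UNDERSCORE"):
    """Approximate the schema.caseformat.* targets; NONE leaves the name as-is."""
    if target == "NONE":
        return name
    words = split_words(name, input_format)
    if target == "LOWER_UNDERSCORE":
        return "_".join(words)
    if target == "UPPER_UNDERSCORE":
        return "_".join(words).upper()
    if target == "LOWER_HYPHEN":
        return "-".join(words)
    if target == "LOWER_CAMEL":
        return "".join(words[:1]) + "".join(w.capitalize() for w in words[1:])
    if target == "UPPER_CAMEL":
        return "".join(w.capitalize() for w in words)
    if target == "LOWER":
        return "".join(words)
    if target == "UPPER":
        return "".join(words).upper()
    raise ValueError(f"unknown case format: {target}")


def resolve_names(database, schema, table,
                  namespace_format="com.example.data.${databaseName}",
                  key_format="${namespace}.${tableName}Key",
                  value_format="${namespace}.${tableName}Value",
                  topic_format="${databaseName}.${tableName}"):
    """Fill in the templates using the documented default formats."""
    values = {"databaseName": database, "schemaName": schema, "tableName": table}
    # Assumption: the namespace is resolved first so ${namespace} is available below.
    values["namespace"] = Template(namespace_format).safe_substitute(values)
    return {
        "namespace": values["namespace"],
        "key": Template(key_format).substitute(values),
        "value": Template(value_format).substitute(values),
        "topic": Template(topic_format).substitute(values),
    }


names = resolve_names(
    convert_case("USER_TRACKING", "LOWER_CAMEL"),   # schema.caseformat.database.name
    convert_case("SCOTT", "LOWER"),                 # schema.caseformat.schema.name
    convert_case("USER_SETTING", "UPPER_CAMEL"),    # schema.caseformat.table.name
)
for kind, resolved in names.items():
    print(kind, "=", resolved)
```

With these inputs the key schema name comes out as `com.example.data.userTracking.UserSettingKey` and the topic as `userTracking.UserSetting`, which matches the USER_TRACKING and USER_SETTING examples in the table.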