Skip to content

Commit

Permalink
debezium(ticdc): cli support debezium-disable-schema (#10372)
Browse files Browse the repository at this point in the history
close #10371
  • Loading branch information
sdojjy authored Dec 29, 2023
1 parent fdef38b commit 1931873
Show file tree
Hide file tree
Showing 9 changed files with 24 additions and 3 deletions.
7 changes: 7 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,9 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
if c.Sink.AdvanceTimeoutInSec != nil {
res.Sink.AdvanceTimeoutInSec = util.AddressOf(*c.Sink.AdvanceTimeoutInSec)
}
if c.Sink.DebeziumDisableSchema != nil {
res.Sink.DebeziumDisableSchema = util.AddressOf(*c.Sink.DebeziumDisableSchema)
}

}
if c.Mounter != nil {
Expand Down Expand Up @@ -776,6 +779,9 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
if cloned.Sink.AdvanceTimeoutInSec != nil {
res.Sink.AdvanceTimeoutInSec = util.AddressOf(*cloned.Sink.AdvanceTimeoutInSec)
}
if cloned.Sink.DebeziumDisableSchema != nil {
res.Sink.DebeziumDisableSchema = util.AddressOf(*cloned.Sink.DebeziumDisableSchema)
}
}
if cloned.Consistent != nil {
res.Consistent = &ConsistentConfig{
Expand Down Expand Up @@ -950,6 +956,7 @@ type SinkConfig struct {
MySQLConfig *MySQLConfig `json:"mysql_config,omitempty"`
CloudStorageConfig *CloudStorageConfig `json:"cloud_storage_config,omitempty"`
AdvanceTimeoutInSec *uint `json:"advance_timeout,omitempty"`
DebeziumDisableSchema *bool `json:"debezium_disable_schema,omitempty"`
}

// CSVConfig denotes the csv config
Expand Down
1 change: 1 addition & 0 deletions cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ var defaultAPIConfig = &ReplicaConfig{
DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false),
ContentCompatible: util.AddressOf(false),
AdvanceTimeoutInSec: util.AddressOf(uint(150)),
DebeziumDisableSchema: util.AddressOf(false),
},
Consistent: &ConsistentConfig{
Level: "none",
Expand Down
2 changes: 2 additions & 0 deletions pkg/cmd/util/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) {
ContentCompatible: util.AddressOf(false),
Protocol: util.AddressOf("open-protocol"),
AdvanceTimeoutInSec: util.AddressOf(uint(150)),
DebeziumDisableSchema: util.AddressOf(false),
}, cfg.Sink)
}

Expand Down Expand Up @@ -249,6 +250,7 @@ func TestAndWriteStorageSinkTOML(t *testing.T) {
DeleteOnlyOutputHandleKeyColumns: util.AddressOf(false),
ContentCompatible: util.AddressOf(false),
AdvanceTimeoutInSec: util.AddressOf(uint(150)),
DebeziumDisableSchema: util.AddressOf(false),
}, cfg.Sink)
}

Expand Down
9 changes: 6 additions & 3 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ const (
"large-message-handle-compression": "",
"claim-check-storage-uri": ""
},
"advance-timeout-in-sec": 150
"advance-timeout-in-sec": 150,
"debezium-disable-schema": false
},
"consistent": {
"level": "none",
Expand Down Expand Up @@ -318,7 +319,8 @@ const (
"file-size": 1024,
"output-column-id":false
},
"advance-timeout-in-sec": 150
"advance-timeout-in-sec": 150,
"debezium-disable-schema": false
},
"consistent": {
"level": "none",
Expand Down Expand Up @@ -479,7 +481,8 @@ const (
"file-size": 1024,
"output-column-id":false
},
"advance-timeout-in-sec": 150
"advance-timeout-in-sec": 150,
"debezium-disable-schema": false
},
"consistent": {
"level": "none",
Expand Down
1 change: 1 addition & 0 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ var defaultReplicaConfig = &ReplicaConfig{
ContentCompatible: util.AddressOf(false),
TiDBSourceID: 1,
AdvanceTimeoutInSec: util.AddressOf(DefaultAdvanceTimeoutInSec),
DebeziumDisableSchema: util.AddressOf(false),
},
Consistent: &ConsistentConfig{
Level: "none",
Expand Down
1 change: 1 addition & 0 deletions pkg/config/replica_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func TestReplicaConfigMarshal(t *testing.T) {
conf.Sink.ContentCompatible = aws.Bool(true)
conf.Sink.SafeMode = aws.Bool(true)
conf.Sink.AdvanceTimeoutInSec = util.AddressOf(uint(150))
conf.Sink.DebeziumDisableSchema = util.AddressOf(false)
conf.Sink.KafkaConfig = &KafkaConfig{
PartitionNum: aws.Int32(1),
ReplicationFactor: aws.Int16(1),
Expand Down
3 changes: 3 additions & 0 deletions pkg/orchestrator/reactor_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func TestChangefeedStateUpdate(t *testing.T) {
OnlyOutputUpdatedColumns: config.GetDefaultReplicaConfig().Sink.OnlyOutputUpdatedColumns,
DeleteOnlyOutputHandleKeyColumns: config.GetDefaultReplicaConfig().Sink.DeleteOnlyOutputHandleKeyColumns,
ContentCompatible: config.GetDefaultReplicaConfig().Sink.ContentCompatible,
DebeziumDisableSchema: config.GetDefaultReplicaConfig().Sink.DebeziumDisableSchema,
},
Consistent: config.GetDefaultReplicaConfig().Consistent,
Integrity: config.GetDefaultReplicaConfig().Integrity,
Expand Down Expand Up @@ -191,6 +192,7 @@ func TestChangefeedStateUpdate(t *testing.T) {
OnlyOutputUpdatedColumns: config.GetDefaultReplicaConfig().Sink.OnlyOutputUpdatedColumns,
DeleteOnlyOutputHandleKeyColumns: config.GetDefaultReplicaConfig().Sink.DeleteOnlyOutputHandleKeyColumns,
ContentCompatible: config.GetDefaultReplicaConfig().Sink.ContentCompatible,
DebeziumDisableSchema: config.GetDefaultReplicaConfig().Sink.DebeziumDisableSchema,
},
Scheduler: config.GetDefaultReplicaConfig().Scheduler,
Integrity: config.GetDefaultReplicaConfig().Integrity,
Expand Down Expand Up @@ -261,6 +263,7 @@ func TestChangefeedStateUpdate(t *testing.T) {
OnlyOutputUpdatedColumns: config.GetDefaultReplicaConfig().Sink.OnlyOutputUpdatedColumns,
DeleteOnlyOutputHandleKeyColumns: config.GetDefaultReplicaConfig().Sink.DeleteOnlyOutputHandleKeyColumns,
ContentCompatible: config.GetDefaultReplicaConfig().Sink.ContentCompatible,
DebeziumDisableSchema: config.GetDefaultReplicaConfig().Sink.DebeziumDisableSchema,
},
Consistent: config.GetDefaultReplicaConfig().Consistent,
Scheduler: config.GetDefaultReplicaConfig().Scheduler,
Expand Down
2 changes: 2 additions & 0 deletions tests/integration_tests/api_v2/cases.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ var customReplicaConfig = &ReplicaConfig{
EncoderConcurrency: util.AddressOf(32),
EnablePartitionSeparator: util.AddressOf(true),
ContentCompatible: util.AddressOf(true),
DebeziumDisableSchema: util.AddressOf(true),
},
Scheduler: &ChangefeedSchedulerConfig{
EnableTableAcrossNodes: false,
Expand Down Expand Up @@ -127,6 +128,7 @@ var defaultReplicaConfig = &ReplicaConfig{
EncoderConcurrency: util.AddressOf(32),
EnablePartitionSeparator: util.AddressOf(true),
ContentCompatible: util.AddressOf(false),
DebeziumDisableSchema: util.AddressOf(false),
},
Scheduler: &ChangefeedSchedulerConfig{
EnableTableAcrossNodes: false,
Expand Down
1 change: 1 addition & 0 deletions tests/integration_tests/api_v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ type SinkConfig struct {
DateSeparator string `json:"date_separator,omitempty"`
EnablePartitionSeparator *bool `json:"enable_partition_separator,omitempty"`
ContentCompatible *bool `json:"content_compatible"`
DebeziumDisableSchema *bool `json:"debezium_disable_schema,omitempty"`
}

// CSVConfig denotes the csv config
Expand Down

0 comments on commit 1931873

Please sign in to comment.