Skip to content

Commit

Permalink
Merge branch 'master' into fix-pd-reset-by-peer
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand authored Sep 16, 2022
2 parents be3824b + 08ac627 commit 6636c3f
Show file tree
Hide file tree
Showing 94 changed files with 1,058 additions and 1,112 deletions.
8 changes: 6 additions & 2 deletions cdc/puller/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (p *ddlJobPullerImpl) handleRenameTables(job *timodel.Job) (skip bool, err
// we skip a rename table ddl only when its old table name and new table name are both filtered.
skip := p.filter.ShouldDiscardDDL(job.Type, oldSchemaNames[i].O, table.Name.O)
if skip {
// if a table should be skip by its old name and its new name is in filter rule, return error.
// if a table should be skipped by its old name and its new name is in filter rule, return error.
if !p.filter.ShouldDiscardDDL(job.Type, schema.Name.O, newTableNames[i].O) {
return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(tableInfo.ID, job.Query)
}
Expand Down Expand Up @@ -322,6 +322,10 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {

// Do this first to fill the schema name to its origin schema name.
if err := p.schemaSnapshot.FillSchemaName(job); err != nil {
// If we can't find a job's schema, check if it's been filtered.
if p.filter.ShouldIgnoreTable(job.SchemaName, job.TableName) {
return true, nil
}
return true, errors.Trace(err)
}

Expand Down Expand Up @@ -358,7 +362,7 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
} else {
// 2. If we can find the preTableInfo, we filter it by the old table name.
skip = p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, oldTable.Name.O)
// 3. If it old table name is not in filter rule, and it new table name in filter rule, return error.
// 3. If its old table name is not in filter rule, and its new table name in filter rule, return error.
if skip && !p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O) {
return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query)
}
Expand Down
11 changes: 11 additions & 0 deletions cdc/puller/ddl_puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,11 @@ func TestHandleJob(t *testing.T) {
skip, err = ddlJobPullerImpl.handleJob(job)
require.NoError(t, err)
require.True(t, skip)

job = helper.DDL2Job("create database test3")
skip, err = ddlJobPullerImpl.handleJob(job)
require.NoError(t, err)
require.True(t, skip)
}

// test drop databases
Expand Down Expand Up @@ -378,6 +383,12 @@ func TestHandleJob(t *testing.T) {
skip, err = ddlJobPullerImpl.handleJob(job)
require.NoError(t, err)
require.True(t, skip)

// make sure no schema not found error
job = helper.DDL2Job("create table test3.t1(id int) partition by range(id) (partition p0 values less than (10))")
skip, err = ddlJobPullerImpl.handleJob(job)
require.NoError(t, err)
require.True(t, skip)
}

// test drop table
Expand Down
2 changes: 1 addition & 1 deletion cdc/redo/writer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ const (
// It should be a multiple of the minimum sector size so that log can safely
// distinguish between torn writes and ordinary data corruption.
pageBytes = 8 * common.MinSectorSize
defaultS3Timeout = 3 * time.Second
defaultS3Timeout = 15 * time.Second
)

var (
Expand Down
40 changes: 0 additions & 40 deletions cdc/sink/mq/manager/pulsar_manager.go

This file was deleted.

48 changes: 0 additions & 48 deletions cdc/sink/mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/pingcap/tiflow/cdc/sink/mq/manager"
"github.com/pingcap/tiflow/cdc/sink/mq/producer"
"github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka"
"github.com/pingcap/tiflow/cdc/sink/mq/producer/pulsar"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand Down Expand Up @@ -472,50 +471,3 @@ func NewKafkaSaramaSink(ctx context.Context, sinkURI *url.URL,
}
return sink, nil
}

// NewPulsarSink creates a new Pulsar mqSink.
func NewPulsarSink(ctx context.Context, sinkURI *url.URL,
replicaConfig *config.ReplicaConfig, errCh chan error,
) (*mqSink, error) {
log.Warn("Pulsar Sink is not recommended for production use.")
s := sinkURI.Query().Get(config.ProtocolKey)
if s != "" {
replicaConfig.Sink.Protocol = s
}

var protocol config.Protocol
if err := protocol.FromString(replicaConfig.Sink.Protocol); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}

encoderConfig := common.NewConfig(protocol)
if err := encoderConfig.Apply(sinkURI, replicaConfig); err != nil {
return nil, errors.Trace(err)
}
// todo: set by pulsar producer's `max.message.bytes`
// encoderConfig = encoderConfig.WithMaxMessageBytes()
if err := encoderConfig.Validate(); err != nil {
return nil, errors.Trace(err)
}

producer, err := pulsar.NewProducer(sinkURI, errCh)
if err != nil {
return nil, errors.Trace(err)
}
fakeTopicManager := manager.NewPulsarTopicManager(
producer.GetPartitionNum(),
)
sink, err := newMqSink(
ctx,
fakeTopicManager,
producer,
"",
replicaConfig,
encoderConfig,
errCh,
)
if err != nil {
return nil, errors.Trace(err)
}
return sink, nil
}
32 changes: 0 additions & 32 deletions cdc/sink/mq/mq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/Shopify/sarama"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/codec/open"
Expand Down Expand Up @@ -165,37 +164,6 @@ func TestKafkaSink(t *testing.T) {
}
}

func TestPulsarSinkEncoderConfig(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

err := failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/mq/producer/pulsar/MockPulsar",
"return(true)")
require.Nil(t, err)

uri := "pulsar://127.0.0.1:1234/kafka-test?" +
"max-message-bytes=4194304&max-batch-size=1&protocol=open-protocol"

sinkURI, err := url.Parse(uri)
require.Nil(t, err)
replicaConfig := config.GetDefaultReplicaConfig()
errCh := make(chan error, 1)

sink, err := NewPulsarSink(ctx, sinkURI, replicaConfig, errCh)
require.Nil(t, err)

encoder := sink.encoderBuilder.Build()
require.IsType(t, &open.BatchEncoder{}, encoder)
require.Equal(t, 1, encoder.(*open.BatchEncoder).MaxBatchSize)
require.Equal(t, 4194304, encoder.(*open.BatchEncoder).MaxMessageBytes)

// FIXME: mock pulsar client doesn't support close,
// so we can't call sink.Close() to close it.
// We will leak goroutine if we don't close it.
cancel()
sink.flushWorker.close()
sink.resolvedBuffer.Close()
}

func TestFlushRowChangedEvents(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

Expand Down
32 changes: 0 additions & 32 deletions cdc/sink/mq/producer/pulsar/doc.go

This file was deleted.

Loading

0 comments on commit 6636c3f

Please sign in to comment.