use pd clock in storage sink
CharlesCheung96 committed Dec 23, 2023
1 parent 43a5d0f commit 4529f89
Showing 9 changed files with 59 additions and 29 deletions.
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/manager.go
@@ -351,7 +351,7 @@ func (m *SinkManager) initSinkFactory() (chan error, uint64) {
return m.sinkFactory.errors, m.sinkFactory.version
}

- m.sinkFactory.f, err = factory.New(m.managerCtx, m.changefeedID, uri, cfg, m.sinkFactory.errors)
+ m.sinkFactory.f, err = factory.New(m.managerCtx, m.changefeedID, m.up.PDClock, uri, cfg, m.sinkFactory.errors)
if err != nil {
emitError(err)
return m.sinkFactory.errors, m.sinkFactory.version
6 changes: 3 additions & 3 deletions cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go
@@ -28,10 +28,10 @@ import (
"github.com/pingcap/tiflow/cdc/sink/metrics"
"github.com/pingcap/tiflow/cdc/sink/tablesink/state"
"github.com/pingcap/tiflow/cdc/sink/util"
- "github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
+ "github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/pingcap/tiflow/pkg/sink/codec/builder"
@@ -96,6 +96,7 @@ type DMLSink struct
// NewDMLSink creates a cloud storage sink.
func NewDMLSink(ctx context.Context,
changefeedID model.ChangeFeedID,
+ pdClock pdutil.Clock,
sinkURI *url.URL,
replicaConfig *config.ReplicaConfig,
errCh chan error,
@@ -157,11 +158,10 @@ func NewDMLSink(ctx context.Context,
// create defragmenter.
s.defragmenter = newDefragmenter(encodedCh, workerChannels)
// create a group of dml workers.
- clock := clock.New()
for i := 0; i < cfg.WorkerCount; i++ {
inputCh := chann.NewAutoDrainChann[eventFragment]()
s.workers[i] = newDMLWorker(i, s.changefeedID, storage, cfg, ext,
- inputCh, clock, s.statistics)
+ inputCh, pdClock, s.statistics)
workerChannels[i] = inputCh
}

21 changes: 13 additions & 8 deletions cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go
@@ -31,13 +31,14 @@ import (
"github.com/pingcap/tiflow/cdc/sink/tablesink/state"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/config"
+ "github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
)

func setClock(s *DMLSink, clock clock.Clock) {
for _, w := range s.workers {
- w.filePathGenerator.SetClock(clock)
+ w.filePathGenerator.SetClock(pdutil.NewMonotonicClock4Test(clock))
}
}

@@ -129,6 +130,7 @@ func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) {
errCh := make(chan error, 5)
s, err := NewDMLSink(ctx,
model.DefaultChangeFeedID("test"),
+ pdutil.NewMonotonicClock4Test(clock.New()),
sinkURI, replicaConfig, errCh)
require.Nil(t, err)
var cnt uint64 = 0
@@ -197,11 +199,12 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
replicaConfig.Sink.FileIndexWidth = util.AddressOf(6)

errCh := make(chan error, 5)
+ mockClock := clock.NewMock()
s, err := NewDMLSink(ctx,
- model.DefaultChangeFeedID("test"), sinkURI, replicaConfig, errCh)
+ model.DefaultChangeFeedID("test"),
+ pdutil.NewMonotonicClock4Test(mockClock),
+ sinkURI, replicaConfig, errCh)
require.Nil(t, err)
- mockClock := clock.NewMock()
- setClock(s, mockClock)

var cnt uint64 = 0
batch := 100
@@ -272,12 +275,14 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
// test table is scheduled from one node to another
cnt = 0
ctx, cancel = context.WithCancel(context.Background())
- s, err = NewDMLSink(ctx,
- model.DefaultChangeFeedID("test"), sinkURI, replicaConfig, errCh)
- require.Nil(t, err)

mockClock = clock.NewMock()
mockClock.Set(time.Date(2023, 3, 9, 0, 1, 10, 0, time.UTC))
- setClock(s, mockClock)
+ s, err = NewDMLSink(ctx,
+ model.DefaultChangeFeedID("test"),
+ pdutil.NewMonotonicClock4Test(mockClock),
+ sinkURI, replicaConfig, errCh)
+ require.Nil(t, err)

err = s.WriteEvents(txns...)
require.Nil(t, err)
6 changes: 3 additions & 3 deletions cdc/sink/dmlsink/cloudstorage/dml_worker.go
@@ -25,9 +25,9 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/metrics"
mcloudstorage "github.com/pingcap/tiflow/cdc/sink/metrics/cloudstorage"
- "github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/errors"
+ "github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/prometheus/client_golang/prometheus"
@@ -107,7 +107,7 @@ func newDMLWorker(
config *cloudstorage.Config,
extension string,
inputCh *chann.DrainableChann[eventFragment],
- clock clock.Clock,
+ pdClock pdutil.Clock,
statistics *metrics.Statistics,
) *dmlWorker {
d := &dmlWorker{
@@ -118,7 +118,7 @@
inputCh: inputCh,
flushNotifyCh: make(chan dmlTask, 64),
statistics: statistics,
- filePathGenerator: cloudstorage.NewFilePathGenerator(config, storage, extension, clock),
+ filePathGenerator: cloudstorage.NewFilePathGenerator(config, storage, extension, pdClock),
metricWriteBytes: mcloudstorage.CloudStorageWriteBytesGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricFileCount: mcloudstorage.CloudStorageFileCountGauge.
4 changes: 3 additions & 1 deletion cdc/sink/dmlsink/cloudstorage/dml_worker_test.go
@@ -30,6 +30,7 @@ import (
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/config"
+ "github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
@@ -52,8 +53,9 @@ func testDMLWorker(ctx context.Context, t *testing.T, dir string) *dmlWorker {

statistics := metrics.NewStatistics(ctx, model.DefaultChangeFeedID("dml-worker-test"),
sink.TxnSink)
+ pdlock := pdutil.NewMonotonicClock4Test(clock.New())
d := newDMLWorker(1, model.DefaultChangeFeedID("dml-worker-test"), storage,
- cfg, ".json", chann.NewAutoDrainChann[eventFragment](), clock.New(), statistics)
+ cfg, ".json", chann.NewAutoDrainChann[eventFragment](), pdlock, statistics)
return d
}

4 changes: 3 additions & 1 deletion cdc/sink/dmlsink/factory/factory.go
@@ -30,6 +30,7 @@ import (
"github.com/pingcap/tiflow/cdc/sink/tablesink"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
+ "github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/kafka"
v2 "github.com/pingcap/tiflow/pkg/sink/kafka/v2"
@@ -67,6 +68,7 @@ type SinkFactory struct {
func New(
ctx context.Context,
changefeedID model.ChangeFeedID,
+ pdClock pdutil.Clock,
sinkURIStr string,
cfg *config.ReplicaConfig,
errCh chan error,
@@ -100,7 +102,7 @@ func New(
s.txnSink = mqs
s.category = CategoryMQ
case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme:
- storageSink, err := cloudstorage.NewDMLSink(ctx, changefeedID, sinkURI, cfg, errCh)
+ storageSink, err := cloudstorage.NewDMLSink(ctx, changefeedID, pdClock, sinkURI, cfg, errCh)
if err != nil {
return nil, err
}
17 changes: 17 additions & 0 deletions pkg/pdutil/clock.go
@@ -20,6 +20,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
+ pclock "github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
@@ -136,3 +137,19 @@ func (c *clock4Test) Run(ctx context.Context) {

func (c *clock4Test) Stop() {
}

+ type monotonicClock4Test struct {
+ Clock
+ pClock pclock.Clock
+ }
+
+ // NewMonotonicClock4Test return a new monotonic clock for test.
+ func NewMonotonicClock4Test(pClock pclock.Clock) Clock {
+ return &monotonicClock4Test{
+ pClock: pClock,
+ }
+ }
+
+ func (m *monotonicClock4Test) CurrentTime() time.Time {
+ return m.pClock.Now()
+ }
14 changes: 7 additions & 7 deletions pkg/sink/cloudstorage/path.go
@@ -29,10 +29,10 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/model"
- "github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/hash"
+ "github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/util"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
@@ -135,7 +135,7 @@ type VersionedTableName struct {
type FilePathGenerator struct {
extension string
config *Config
- clock clock.Clock
+ pdClock pdutil.Clock
storage storage.ExternalStorage
fileIndex map[VersionedTableName]*indexWithDate

@@ -148,13 +148,13 @@ func NewFilePathGenerator(
config *Config,
storage storage.ExternalStorage,
extension string,
- clock clock.Clock,
+ pdclock pdutil.Clock,
) *FilePathGenerator {
return &FilePathGenerator{
config: config,
extension: extension,
storage: storage,
- clock: clock,
+ pdClock: pdclock,
fileIndex: make(map[VersionedTableName]*indexWithDate),
hasher: hash.NewPositionInertia(),
versionMap: make(map[VersionedTableName]uint64),
@@ -247,16 +247,16 @@ func (f *FilePathGenerator) CheckOrWriteSchema(
}

// SetClock is used for unit test
- func (f *FilePathGenerator) SetClock(clock clock.Clock) {
- f.clock = clock
+ func (f *FilePathGenerator) SetClock(pdClock pdutil.Clock) {
+ f.pdClock = pdClock
}

// GenerateDateStr generates a date string base on current time
// and the date-separator configuration item.
func (f *FilePathGenerator) GenerateDateStr() string {
var dateStr string

- currTime := f.clock.Now()
+ currTime := f.pdClock.CurrentTime()
switch f.config.DateSeparator {
case config.DateSeparatorYear.String():
dateStr = currTime.Format("2006")
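
The behavioral core of this file's change is that GenerateDateStr now reads f.pdClock.CurrentTime() instead of a locally constructed wall clock, so date-separated directories follow the injected PD-backed clock. A small sketch in the spirit of the package's own tests, not part of the commit: it assumes it sits next to path_test.go (same package and imports), reuses the testFilePathGenerator helper shown in the next file, and the expected "2022" follows from the Format("2006") year case above; the test name and t.TempDir usage are illustrative.

    func TestGenerateDateStrUsesPDClockSketch(t *testing.T) {
        ctx := context.Background()
        f := testFilePathGenerator(ctx, t, t.TempDir())

        // Drive the PD-backed clock with a mock so the date component is
        // fully deterministic.
        mockClock := clock.NewMock()
        mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC))
        f.SetClock(pdutil.NewMonotonicClock4Test(mockClock))

        // With the year separator, GenerateDateStr formats the clock's current
        // time as "2006", i.e. "2022" for the timestamp set above.
        f.config.DateSeparator = config.DateSeparatorYear.String()
        require.Equal(t, "2022", f.GenerateDateStr())
    }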
14 changes: 9 additions & 5 deletions pkg/sink/cloudstorage/path_test.go
@@ -29,6 +29,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/config"
+ "github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
@@ -49,7 +50,7 @@ func testFilePathGenerator(ctx context.Context, t *testing.T, dir string) *FileP
err = cfg.Apply(ctx, sinkURI, replicaConfig)
require.NoError(t, err)

- f := NewFilePathGenerator(cfg, storage, ".json", clock.New())
+ f := NewFilePathGenerator(cfg, storage, ".json", pdutil.NewMonotonicClock4Test(clock.New()))
return f
}

@@ -84,7 +85,7 @@ func TestGenerateDataFilePath(t *testing.T) {
f = testFilePathGenerator(ctx, t, dir)
f.versionMap[table] = table.TableInfoVersion
f.config.DateSeparator = config.DateSeparatorYear.String()
- f.clock = mockClock
+ f.SetClock(pdutil.NewMonotonicClock4Test(mockClock))
mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC))
date = f.GenerateDateStr()
path, err = f.GenerateDataFilePath(ctx, table, date)
@@ -108,7 +109,8 @@
f = testFilePathGenerator(ctx, t, dir)
f.versionMap[table] = table.TableInfoVersion
f.config.DateSeparator = config.DateSeparatorMonth.String()
- f.clock = mockClock
+ f.SetClock(pdutil.NewMonotonicClock4Test(mockClock))
+
mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC))
date = f.GenerateDateStr()
path, err = f.GenerateDataFilePath(ctx, table, date)
@@ -132,7 +134,8 @@
f = testFilePathGenerator(ctx, t, dir)
f.versionMap[table] = table.TableInfoVersion
f.config.DateSeparator = config.DateSeparatorDay.String()
- f.clock = mockClock
+ f.SetClock(pdutil.NewMonotonicClock4Test(mockClock))
+
mockClock.Set(time.Date(2022, 12, 31, 23, 59, 59, 0, time.UTC))
date = f.GenerateDateStr()
path, err = f.GenerateDataFilePath(ctx, table, date)
@@ -210,7 +213,8 @@ func TestGenerateDataFilePathWithIndexFile(t *testing.T) {
f := testFilePathGenerator(ctx, t, dir)
mockClock := clock.NewMock()
f.config.DateSeparator = config.DateSeparatorDay.String()
- f.clock = mockClock
+ f.SetClock(pdutil.NewMonotonicClock4Test(mockClock))
+
mockClock.Set(time.Date(2023, 3, 9, 23, 59, 59, 0, time.UTC))
table := VersionedTableName{
TableNameWithPhysicTableID: model.TableName{
