Skip to content

Commit

Permalink
pkg/pdutil(ticdc): improve pd time accuracy (#7024)
Browse files Browse the repository at this point in the history
ref #4757
  • Loading branch information
overvenus authored Sep 12, 2022
1 parent 24f7f18 commit 52a78bd
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 17 deletions.
47 changes: 30 additions & 17 deletions pkg/pdutil/clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const pdTimeUpdateInterval = 200 * time.Millisecond

// Clock is a time source of PD cluster.
type Clock interface {
// CurrentTime returns current time from PD.
// CurrentTime returns approximate current time from pd.
CurrentTime() (time.Time, error)
Run(ctx context.Context)
Stop()
Expand All @@ -41,32 +41,40 @@ type clock struct {
pdClient pd.Client
mu struct {
sync.RWMutex
timeCache time.Time
err error
// The time encoded in PD ts.
tsEventTime time.Time
// The time we receives PD ts.
tsProcessingTime time.Time
err error
}
cancel context.CancelFunc
stopCh chan struct{}
updateInterval time.Duration
cancel context.CancelFunc
stopCh chan struct{}
}

// NewClock return a new clock
func NewClock(ctx context.Context, pdClient pd.Client) (*clock, error) {
ret := &clock{
pdClient: pdClient,
stopCh: make(chan struct{}, 1),
pdClient: pdClient,
stopCh: make(chan struct{}, 1),
updateInterval: pdTimeUpdateInterval,
}
physical, _, err := pdClient.GetTS(ctx)
if err != nil {
return nil, errors.Trace(err)
}
ret.mu.timeCache = oracle.GetTimeFromTS(oracle.ComposeTS(physical, 0))
ret.mu.tsEventTime = oracle.GetTimeFromTS(oracle.ComposeTS(physical, 0))
ret.mu.tsProcessingTime = time.Now()
return ret, nil
}

// Run will get time from pd periodically to cache in timeCache
// Run gets time from pd periodically.
func (c *clock) Run(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
c.mu.Lock()
c.cancel = cancel
ticker := time.NewTicker(pdTimeUpdateInterval)
c.mu.Unlock()
ticker := time.NewTicker(c.updateInterval)
defer func() { c.stopCh <- struct{}{} }()
for {
select {
Expand All @@ -81,33 +89,38 @@ func (c *clock) Run(ctx context.Context) {
return err
}
c.mu.Lock()
c.mu.timeCache = oracle.GetTimeFromTS(oracle.ComposeTS(physical, 0))
c.mu.tsEventTime = oracle.GetTimeFromTS(oracle.ComposeTS(physical, 0))
c.mu.tsProcessingTime = time.Now()
c.mu.err = nil
c.mu.Unlock()
return nil
}, retry.WithBackoffBaseDelay(200), retry.WithMaxTries(10))
if err != nil {
log.Warn("get time from pd failed, will use local time as pd time")
c.mu.Lock()
c.mu.timeCache = time.Now()
now := time.Now()
c.mu.tsEventTime = now
c.mu.tsProcessingTime = now
c.mu.err = err
c.mu.Unlock()
}
}
}
}

// CurrentTime returns current time from timeCache
// CurrentTime returns approximate current time from pd.
func (c *clock) CurrentTime() (time.Time, error) {
c.mu.RLock()
err := c.mu.err
cacheTime := c.mu.timeCache
c.mu.RUnlock()
return cacheTime, errors.Trace(err)
defer c.mu.RUnlock()
tsEventTime := c.mu.tsEventTime
current := tsEventTime.Add(time.Since(c.mu.tsProcessingTime))
return current, errors.Trace(c.mu.err)
}

// Stop clock.
func (c *clock) Stop() {
c.mu.Lock()
defer c.mu.Unlock()
c.cancel()
<-c.stopCh
}
Expand Down
22 changes: 22 additions & 0 deletions pkg/pdutil/clock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,25 @@ func TestTimeFromPD(t *testing.T) {
// should return new time
require.NotEqual(t, t1, t2)
}

func TestEventTimeAndProcessingTime(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mockPDClient := &MockPDClient{}
clock, err := NewClock(ctx, mockPDClient)
require.NoError(t, err)

// Disable update in test by setting a very long update interval.
clock.updateInterval = time.Hour
go clock.Run(ctx)
defer clock.Stop()

sleep := time.Second
time.Sleep(sleep)
t1, err := clock.CurrentTime()
now := time.Now()
require.Nil(t, err)
require.Less(t, now.Sub(t1), sleep/2)
}

0 comments on commit 52a78bd

Please sign in to comment.