Skip to content

Commit

Permalink
fix lint
Browse files Browse the repository at this point in the history
Signed-off-by: okJiang <[email protected]>
  • Loading branch information
okJiang committed Nov 26, 2024
1 parent 8b9c654 commit d862594
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 50 deletions.
2 changes: 1 addition & 1 deletion pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (gta *GlobalTSOAllocator) UpdateTSO() error {

// SetTSO sets the physical part with given TSO.
func (gta *GlobalTSOAllocator) SetTSO(tso uint64, ignoreSmaller, skipUpperBoundCheck bool) error {
return gta.timestampOracle.resetUserTimestampInner(gta.member.GetLeadership(), tso, ignoreSmaller, skipUpperBoundCheck)
return gta.timestampOracle.resetUserTimestamp(gta.member.GetLeadership(), tso, ignoreSmaller, skipUpperBoundCheck)
}

// GenerateTSO is used to generate the given number of TSOs.
Expand Down
9 changes: 2 additions & 7 deletions pkg/tso/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,7 @@ func (t *timestampOracle) isInitialized() bool {
// When ignoreSmaller is true, resetUserTimestamp will ignore the smaller tso resetting error and do nothing.
// It's used to write MaxTS during the Global TSO synchronization without failing the writing as much as possible.
// cannot set timestamp to one which >= current + maxResetTSGap
func (t *timestampOracle) resetUserTimestamp(ctx context.Context, leadership *election.Leadership, tso uint64, ignoreSmaller bool) error {
defer trace.StartRegion(ctx, "timestampOracle.resetUserTimestamp").End()
return t.resetUserTimestampInner(leadership, tso, ignoreSmaller, false)
}

func (t *timestampOracle) resetUserTimestampInner(leadership *election.Leadership, tso uint64, ignoreSmaller, skipUpperBoundCheck bool) error {
func (t *timestampOracle) resetUserTimestamp(leadership *election.Leadership, tso uint64, ignoreSmaller, skipUpperBoundCheck bool) error {
t.tsoMux.Lock()
defer t.tsoMux.Unlock()
if !leadership.Check() {
Expand Down Expand Up @@ -425,7 +420,7 @@ func (t *timestampOracle) getTS(ctx context.Context, leadership *election.Leader
return resp, nil
}
t.metrics.exceededMaxRetryEvent.Inc()
return resp, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("generate global tso maximum number of retries exceeded"))
return resp, errs.ErrGenerateTimestamp.FastGenByArgs("generate global tso maximum number of retries exceeded")
}

// ResetTimestamp is used to reset the timestamp in memory.
Expand Down
38 changes: 16 additions & 22 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,10 +418,7 @@ func (c *RaftCluster) checkTSOService() {
c.UnsetServiceIndependent(constant.TSOServiceName)
}
} else {
if err := c.stopTSOJobsIfNeeded(); err != nil {
log.Error("failed to stop TSO jobs", errs.ZapError(err))
return
}
c.stopTSOJobsIfNeeded()
if !c.IsServiceIndependent(constant.TSOServiceName) {
log.Info("TSO is provided by TSO server")
c.SetServiceIndependent(constant.TSOServiceName)
Expand Down Expand Up @@ -489,23 +486,22 @@ func (c *RaftCluster) startTSOJobsIfNeeded() error {
return nil
}

func (c *RaftCluster) stopTSOJobsIfNeeded() error {
func (c *RaftCluster) stopTSOJobsIfNeeded() {
allocator := c.tsoAllocator.GetAllocator()
if allocator.IsInitialize() {
log.Info("closing the global TSO allocator")
c.tsoAllocator.ResetAllocatorGroup(true)
failpoint.Inject("updateAfterResetTSO", func() {
allocator := c.tsoAllocator.GetAllocator()
if err := allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) {
log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err))
}
if allocator.IsInitialize() {
log.Panic("the allocator should be uninitialized after reset")
}
})
if !allocator.IsInitialize() {
return
}

return nil
log.Info("closing the global TSO allocator")
c.tsoAllocator.ResetAllocatorGroup(true)
failpoint.Inject("updateAfterResetTSO", func() {
allocator := c.tsoAllocator.GetAllocator()
if err := allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) {
log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err))
}
if allocator.IsInitialize() {
log.Panic("the allocator should be uninitialized after reset")
}
})
}

// startGCTuner
Expand Down Expand Up @@ -851,9 +847,7 @@ func (c *RaftCluster) Stop() {
// For example, the cluster meets an error when starting, such as cluster is not bootstrapped.
// In this case, the `running` in `RaftCluster` is false, but the tso job has been started.
// Ref: https://github.com/tikv/pd/issues/8836
if err := c.stopTSOJobsIfNeeded(); err != nil {
log.Error("failed to stop tso jobs", errs.ZapError(err))
}
c.stopTSOJobsIfNeeded()
if !c.running {
c.Unlock()
return
Expand Down
21 changes: 1 addition & 20 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2505,11 +2505,8 @@ func convertAskSplitResponse(resp *schedulingpb.AskBatchSplitResponse) *pdpb.Ask
}
}

// Only used for the TestLocalAllocatorLeaderChange.
var mockLocalAllocatorLeaderChangeFlag = false

// Deprecated.
func (s *GrpcServer) SyncMaxTS(_ context.Context, request *pdpb.SyncMaxTSRequest) (*pdpb.SyncMaxTSResponse, error) {
func (*GrpcServer) SyncMaxTS(_ context.Context, request *pdpb.SyncMaxTSRequest) (*pdpb.SyncMaxTSResponse, error) {

Check failure on line 2509 in server/grpc_service.go

View workflow job for this annotation

GitHub Actions / statics

unused-parameter: parameter 'request' seems to be unused, consider removing or renaming it to match ^_ (revive)
return &pdpb.SyncMaxTSResponse{
Header: wrapHeader(),
}, nil
Expand Down Expand Up @@ -2646,22 +2643,6 @@ func (*GrpcServer) GetDCLocationInfo(_ context.Context, _ *pdpb.GetDCLocationInf
}, nil
}

// validateInternalRequest checks if server is closed, which is used to validate
// the gRPC communication between PD servers internally.
func (s *GrpcServer) validateInternalRequest(header *pdpb.RequestHeader, onlyAllowLeader bool) error {
if s.IsClosed() {
return ErrNotStarted
}
// If onlyAllowLeader is true, check whether the sender is PD leader.
if onlyAllowLeader {
leaderID := s.GetLeader().GetMemberId()
if leaderID != header.GetSenderId() {
return status.Errorf(codes.FailedPrecondition, "%s, need %d but got %d", errs.MismatchLeaderErr, leaderID, header.GetSenderId())
}
}
return nil
}

// for CDC compatibility, we need to initialize config path to `globalConfigPath`
const globalConfigPath = "/global/config/"

Expand Down

0 comments on commit d862594

Please sign in to comment.