Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

catalog/lease: detect if synchronous lease releases are successful #118543

Merged
merged 1 commit into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions pkg/sql/catalog/lease/descriptor_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,11 +292,18 @@ func (t *descriptorState) release(ctx context.Context, s *descriptorVersionState
t.mu.Lock()
defer t.mu.Unlock()
if l := maybeMarkRemoveStoredLease(s); l != nil {
t.mu.active.remove(s)
leaseReleased := true
// For testing, we will synchronously release leases, but that
// exposes us to the danger of the context getting cancelled. To
// eliminate this risk, we are going first remove the lease from
// storage and then delete if from mqemory.
if t.m.storage.testingKnobs.RemoveOnceDereferenced {
releaseLease(ctx, l, t.m)
leaseReleased = releaseLease(ctx, l, t.m)
l = nil
}
if leaseReleased {
t.mu.active.remove(s)
}
return l
}
return nil
Expand Down
9 changes: 6 additions & 3 deletions pkg/sql/catalog/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ func acquireNodeLease(
}

// releaseLease deletes an entry from system.lease.
func releaseLease(ctx context.Context, lease *storedLease, m *Manager) {
func releaseLease(ctx context.Context, lease *storedLease, m *Manager) (released bool) {
// Force the release to happen synchronously, if we are draining or, when we
// force removals for unit tests. This didn't matter with expiration based leases
// since each renewal would have a different expiry (but the same version in
Expand All @@ -647,8 +647,9 @@ func releaseLease(ctx context.Context, lease *storedLease, m *Manager) {
// because we release only if a new version exists.
if m.IsDraining() || m.removeOnceDereferenced() {
// Release synchronously to guarantee release before exiting.
m.storage.release(ctx, m.stopper, lease)
return
// It's possible for the context to get cancelled, so return if
// it was released.
return m.storage.release(ctx, m.stopper, lease)
}

// Release to the store asynchronously, without the descriptorState lock.
Expand All @@ -663,6 +664,8 @@ func releaseLease(ctx context.Context, lease *storedLease, m *Manager) {
}); err != nil {
log.Warningf(ctx, "error: %s, not releasing lease: %q", err, lease)
}
// Asynchronous job is releasing it.
return true
}

// purgeOldVersions removes old unused descriptor versions older than
Expand Down
8 changes: 6 additions & 2 deletions pkg/sql/catalog/lease/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (s storage) acquire(
// written a value to the database, which we'd leak if we did not delete it.
// Note that the expiration is part of the primary key in the table, so we
// would not overwrite the old entry if we just were to do another insert.
//repeatIteration = desc != nil
if (!expiration.IsEmpty() || sessionID != nil) && desc != nil {
prevExpirationTS := storedLeaseExpiration(expiration)
if err := s.writer.deleteLease(ctx, txn, leaseFields{
Expand Down Expand Up @@ -191,7 +192,6 @@ func (s storage) acquire(
log.VEventf(ctx, 2, "storage attempting to acquire lease %v@%v", desc, expiration)

ts := storedLeaseExpiration(expiration)

var isLeaseRenewal bool
var lastLeaseWasWrittenWithSessionID bool
// If there was a previous lease then determine if this a renewal and
Expand Down Expand Up @@ -254,7 +254,9 @@ func (s storage) acquire(
// read a descriptor because it can be called while modifying a
// descriptor through a schema change before the schema change has committed
// that can result in a deadlock.
func (s storage) release(ctx context.Context, stopper *stop.Stopper, lease *storedLease) {
func (s storage) release(
ctx context.Context, stopper *stop.Stopper, lease *storedLease,
) (released bool) {
ctx = multitenant.WithTenantCostControlExemption(ctx)
retryOptions := base.DefaultRetryOptions()
retryOptions.Closer = stopper.ShouldQuiesce()
Expand Down Expand Up @@ -283,13 +285,15 @@ func (s storage) release(ctx context.Context, stopper *stop.Stopper, lease *stor
}
continue
}
released = true
s.outstandingLeases.Dec(1)
if s.testingKnobs.LeaseReleasedEvent != nil {
s.testingKnobs.LeaseReleasedEvent(
lease.id, descpb.DescriptorVersion(lease.version), err)
}
break
}
return released
}

// Get the descriptor valid for the expiration time from the store.
Expand Down
Loading