Skip to content

Commit

Permalink
sql/catalog/lease: switch lease manager to use KVs
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
ajwerner committed Nov 4, 2022
1 parent 3e6dca9 commit 1c11f2c
Show file tree
Hide file tree
Showing 5 changed files with 378 additions and 13 deletions.
9 changes: 8 additions & 1 deletion pkg/sql/catalog/lease/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_library(
"descriptor_set.go",
"descriptor_state.go",
"descriptor_version_state.go",
"ie_writer.go",
"kv_writer.go",
"lease.go",
"lease_test_utils.go",
"name_cache.go",
Expand All @@ -28,11 +28,14 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/catalog",
"//pkg/sql/catalog/bootstrap",
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/descbuilder",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/internal/catkv",
"//pkg/sql/catalog/nstree",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/sem/tree",
Expand Down Expand Up @@ -61,6 +64,8 @@ go_test(
size = "large",
srcs = [
"helpers_test.go",
"ie_writer_test.go",
"kv_writer_test.go",
"lease_internal_test.go",
"lease_test.go",
"main_test.go",
Expand Down Expand Up @@ -88,6 +93,7 @@ go_test(
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/rowenc/keyside",
Expand All @@ -97,6 +103,7 @@ go_test(
"//pkg/sql/sqlutil",
"//pkg/sql/tests",
"//pkg/sql/types",
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,44 +12,57 @@ package lease

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/errors"
)

type ieWriter struct {
ie sqlutil.InternalExecutor
insertQuery string
deleteQuery string
ie sqlutil.InternalExecutor
}

func (w *ieWriter) deleteLease(ctx context.Context, txn *kv.Txn, l leaseFields) error {
const deleteLease = `
DELETE FROM system.public.lease
func newInternalExecutorWriter(ie sqlutil.InternalExecutor, tableName string) *ieWriter {
const (
deleteLease = `
DELETE FROM %s
WHERE ("descID", version, "nodeID", expiration)
= ($1, $2, $3, $4);`
insertLease = `
INSERT
INTO %s ("descID", version, "nodeID", expiration)
VALUES ($1, $2, $3, $4)`
)
return &ieWriter{
ie: ie,
insertQuery: fmt.Sprintf(insertLease, tableName),
deleteQuery: fmt.Sprintf(deleteLease, tableName),
}
}

func (w *ieWriter) deleteLease(ctx context.Context, txn *kv.Txn, l leaseFields) error {
_, err := w.ie.Exec(
ctx,
"lease-release",
nil, /* txn */
deleteLease,
w.deleteQuery,
l.descID, l.version, l.instanceID, &l.expiration,
)
return err
}

func (w *ieWriter) insertLease(ctx context.Context, txn *kv.Txn, l leaseFields) error {
const insertLease = `
INSERT
INTO system.public.lease ("descID", version, "nodeID", expiration)
VALUES ($1, $2, $3, $4)`
count, err := w.ie.Exec(ctx, "lease-insert", txn, insertLease,
count, err := w.ie.Exec(ctx, "lease-insert", txn, w.insertQuery,
l.descID, l.version, l.instanceID, &l.expiration,
)
if err != nil {
return err
}
if count != 1 {
return errors.Errorf("%s: expected 1 result, found %d", insertLease, count)
return errors.Errorf("%s: expected 1 result, found %d", w.insertQuery, count)
}
return nil
}
Expand Down
89 changes: 89 additions & 0 deletions pkg/sql/catalog/lease/kv_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package lease

import (
"context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/errors"
)

// kvWriter implements writer using the raw KV API.
type kvWriter struct {
db *kv.DB
w bootstrap.KVWriter
}

func newKVWriter(codec keys.SQLCodec, db *kv.DB, id descpb.ID) *kvWriter {
return &kvWriter{
db: db,
w: bootstrap.MakeKVWriter(codec, leaseTableWithID(id)),
}
}

func leaseTableWithID(id descpb.ID) catalog.TableDescriptor {
if id == keys.LeaseTableID {
return systemschema.LeaseTable
}
// Custom IDs are only used for testing.
mut := systemschema.LeaseTable.NewBuilder().
BuildExistingMutable().(*tabledesc.Mutable)
mut.ID = id
return mut.ImmutableCopy().(catalog.TableDescriptor)
}

func (w *kvWriter) insertLease(ctx context.Context, txn *kv.Txn, l leaseFields) error {
return w.do(ctx, txn, l, w.w.Insert)
}

func (w *kvWriter) deleteLease(ctx context.Context, txn *kv.Txn, l leaseFields) error {
return w.do(ctx, txn, l, w.w.Delete)
}

type addToBatchFunc = func(*kv.Batch, ...tree.Datum) error

func (w *kvWriter) do(ctx context.Context, txn *kv.Txn, l leaseFields, f addToBatchFunc) error {
run := (*kv.Txn).Run
do := func(ctx context.Context, txn *kv.Txn) error {
b, err := newBatch(txn, l, f)
if err != nil {
return err
}
return run(txn, ctx, b)
}
if txn != nil {
return do(ctx, txn)
}
run = (*kv.Txn).CommitInBatch
return w.db.Txn(ctx, do)
}

func newBatch(txn *kv.Txn, l leaseFields, f addToBatchFunc) (*kv.Batch, error) {
entries := [...]tree.Datum{
tree.NewDInt(tree.DInt(l.descID)),
tree.NewDInt(tree.DInt(l.version)),
tree.NewDInt(tree.DInt(l.instanceID)),
&l.expiration,
}
b := txn.NewBatch()
if err := f(b, entries[:]...); err != nil {
return nil, errors.NewAssertionErrorWithWrappedErrf(err, "failed to encode lease entry")
}
return b, nil
}
Loading

0 comments on commit 1c11f2c

Please sign in to comment.