Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
55794: backupccl: optimize spans selected for backup and ts protection r=pbardea a=adityamaru

Previously, to account for interleaved tables, backup would generate
spans for every table index, which would then be used by the protected
ts record. Recently we saw a couple of instances where this was
resulting in a very large number of spans being generated during backup.
A direct consequence of this is that the record can exceed the default
size limits of the protected timestamp subsystem.

In this new scheme we attempt to merge spans using the following
rules:

- Contiguous index spans are merged.
- Two non-contiguous index spans are merged if a scan request for the index
IDs between them does not return any results.

E.g.: {/Table/51/1 - /Table/51/2}, {/Table/51/3 - /Table/51/4} => {/Table/51/1 - /Table/51/4}
provided the dropped index represented by the span
{/Table/51/2 - /Table/51/3} has been gc'ed.

The resultant merged spans are what we BACKUP and what we protect from
gc using a protected ts record.

Informs: #54747

Release note: None

56298: pgwire: rework DecodeOidDatum to DecodeDatum to parse OidFamily types r=rafiss a=otan

Resolves #56193 

pgwire: rework DecodeOidDatum to DecodeDatum to parse OidFamily types

Reworked DecodeOidDatum to DecodeDatum to take in a type, which
encodes additional useful information necessary for ENUMs and oid
family types.

Release note (bug fix): Fixed a bug where reg* types were not parsed
properly over pgwire, COPY or prepared statements.

tree: create ParseDOid method

* Move ParseDOid and associated methods to new function.
* Move datum_test.go to datum_integration_test.go, as it does not import
  datum.go tests.
* Move some of datum_invariants_test.go out into datum_test.go.
* Created new datum_test.go with pure unit tests.

Release note: None



56920: importccl: Add DROP TABLE [IF EXISTS] support for import pgdump. r=adityamaru a=mokaixu

Previously, whenever a DROP TABLE statement was parsed, an
error was thrown and the import would fail since DROP TABLE
statements were not supported.

Now, when we encounter a DROP TABLE statement for a target table
foo, if foo exists, then we throw an error indicating to the user
to drop the table foo. Otherwise, if foo does not exist, we silently
ignore the DROP statement and proceed with the pgdump import.

Resolves: #53112

Release note: None

57004: geos: link to docs if GEOS is not installed r=rytaft,sumeerbhola a=otan

Release note (sql change): Introduce a hint, shown when GEOS is improperly
installed, that links to the docs instructions on installing CockroachDB.

Co-authored-by: Aditya Maru <[email protected]>
Co-authored-by: Oliver Tan <[email protected]>
Co-authored-by: Monica Xu <[email protected]>
  • Loading branch information
4 people committed Nov 23, 2020
5 parents 22da098 + 5156cf0 + 6f55d8b + ea381b0 + 0ab6c8a commit c2b4fff
Show file tree
Hide file tree
Showing 24 changed files with 2,278 additions and 1,603 deletions.
205 changes: 190 additions & 15 deletions pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
cryptorand "crypto/rand"
"fmt"
"net/url"
"sort"
"strconv"

"github.com/cockroachdb/cockroach/pkg/base"
Expand Down Expand Up @@ -146,20 +147,176 @@ func (e *encryptedDataKeyMap) rangeOverMap(fn func(masterKeyID hashedMasterKeyID
}
}

// sortedIndexIDs is a slice of index IDs that satisfies sort.Interface,
// ordering the IDs from smallest to largest.
type sortedIndexIDs []descpb.IndexID

// Len reports how many index IDs are in the slice.
func (ids sortedIndexIDs) Len() int {
	count := len(ids)
	return count
}

// Less reports whether the ID at position a orders strictly before the ID at
// position b.
func (ids sortedIndexIDs) Less(a, b int) bool {
	return ids[b] > ids[a]
}

// Swap exchanges the IDs at positions a and b.
func (ids sortedIndexIDs) Swap(a, b int) {
	tmp := ids[a]
	ids[a] = ids[b]
	ids[b] = tmp
}

// getLogicallyMergedTableSpans returns all the non-drop index spans of the
// provided table but after merging them so as to minimize the number of spans
// generated. The following rules are used to logically merge the sorted set of
// non-drop index spans:
// - Contiguous index spans are merged.
// - Two non-contiguous index spans are merged if a scan request for the index
// IDs between them does not return any results.
//
// E.g.: {/Table/51/1 - /Table/51/2}, {/Table/51/3 - /Table/51/4} => {/Table/51/1 - /Table/51/4}
// provided the dropped index represented by the span
// {/Table/51/2 - /Table/51/3} has been gc'ed.
//
// Parameters:
//   - table: the descriptor whose non-drop indexes are considered.
//   - added: the set of (table, index) pairs already accounted for. Indexes
//     found in it are skipped, and every index included by this call is
//     recorded in it, so the map is mutated.
//   - codec: used to build the key span of each index.
//   - endTime: the timestamp passed through to checkForKVInBounds.
//   - checkForKVInBounds: reports whether any KV exists between start and end
//     as of endTime. It is invoked at most once per pair of adjacent non-drop
//     indexes, and only when there is a gap of index IDs between them.
//
// It returns the merged spans in ascending index ID order, or the first error
// returned by the descriptor lookups or by checkForKVInBounds.
func getLogicallyMergedTableSpans(
	table catalog.TableDescriptor,
	added map[tableAndIndex]bool,
	codec keys.SQLCodec,
	endTime hlc.Timestamp,
	checkForKVInBounds func(start, end roachpb.Key, endTime hlc.Timestamp) (bool, error),
) ([]roachpb.Span, error) {
	// Collect the IDs of all non-drop indexes that have not been handled by a
	// previous call, marking them as handled as we go.
	var nonDropIndexIDs []descpb.IndexID
	if err := table.ForeachNonDropIndex(func(idxDesc *descpb.IndexDescriptor) error {
		key := tableAndIndex{tableID: table.GetID(), indexID: idxDesc.ID}
		if added[key] {
			return nil
		}
		added[key] = true
		nonDropIndexIDs = append(nonDropIndexIDs, idxDesc.ID)
		return nil
	}); err != nil {
		return nil, err
	}

	if len(nonDropIndexIDs) == 0 {
		return nil, nil
	}

	// There is no merging possible with only a single index, short circuit.
	if len(nonDropIndexIDs) == 1 {
		return []roachpb.Span{table.IndexSpan(codec, nonDropIndexIDs[0])}, nil
	}

	sort.Sort(sortedIndexIDs(nonDropIndexIDs))

	var mergedIndexSpans []roachpb.Span

	// mergedSpan starts off as the first span in the set of spans being
	// considered for a logical merge.
	// The logical span merge algorithm walks over the table's non drop indexes
	// using an lhsSpan and rhsSpan (always offset by 1). It checks the key range
	// between lhsSpan and rhsSpan to look for dropped but non-gced KVs. The
	// existence of such a KV indicates that the rhsSpan cannot be included in the
	// current set of spans being logically merged, and so we update the
	// mergedSpan to encompass the lhsSpan as that is the furthest we can go.
	// After recording the new "merged" span, we update mergedSpan to be the
	// rhsSpan, and start processing the next logically mergeable span set.
	mergedSpan := table.IndexSpan(codec, nonDropIndexIDs[0])
	for curIndex := 0; curIndex < len(nonDropIndexIDs)-1; curIndex++ {
		lhsIndexID := nonDropIndexIDs[curIndex]
		rhsIndexID := nonDropIndexIDs[curIndex+1]

		lhsSpan := table.IndexSpan(codec, lhsIndexID)
		rhsSpan := table.IndexSpan(codec, rhsIndexID)

		lhsIndex, err := table.FindIndexByID(lhsIndexID)
		if err != nil {
			return nil, err
		}
		rhsIndex, err := table.FindIndexByID(rhsIndexID)
		if err != nil {
			return nil, err
		}

		// If either the lhs or rhs is an interleaved index, we do not attempt to
		// perform a logical merge of the spans because the index span for
		// interleaved contains the tableID/indexID of the furthest ancestor in
		// the interleaved chain.
		if lhsIndex.IsInterleaved() || rhsIndex.IsInterleaved() {
			// The current merged run ends at lhs. When lhs is not interleaved the
			// run may have started at an earlier index, so it must be extended to
			// cover lhs before it is recorded; otherwise every index merged into
			// the run after its first one would be dropped from the output. When
			// lhs is interleaved, mergedSpan is already exactly lhsSpan.
			if !lhsIndex.IsInterleaved() {
				mergedSpan.EndKey = lhsSpan.EndKey
			}
			mergedIndexSpans = append(mergedIndexSpans, mergedSpan)
			mergedSpan = rhsSpan
		} else if lhsIndexID+1 < rhsIndexID {
			// There is at least one index ID between the two candidates (lhs and
			// rhs), representing dropped indexes between the two non-drop index
			// spans. A single scan of the whole gap is sufficient: the scanned
			// bounds do not depend on which intermediate ID is being considered.
			foundDroppedKV, scanErr := checkForKVInBounds(lhsSpan.EndKey, rhsSpan.Key, endTime)
			if scanErr != nil {
				return nil, scanErr
			}
			// If we find an index which has been dropped but not gc'ed, we cannot
			// merge the lhs and rhs spans.
			if foundDroppedKV {
				mergedSpan.EndKey = lhsSpan.EndKey
				mergedIndexSpans = append(mergedIndexSpans, mergedSpan)
				mergedSpan = rhsSpan
			}
		}

		// The loop will terminate after this iteration and so we must update the
		// current mergedSpan to encompass the last element in the nonDropIndexIDs
		// slice as well.
		if curIndex == len(nonDropIndexIDs)-2 {
			mergedSpan.EndKey = rhsSpan.EndKey
			mergedIndexSpans = append(mergedIndexSpans, mergedSpan)
		}
	}

	return mergedIndexSpans, nil
}

// spansForAllTableIndexes returns non-overlapping spans for every index and
// table passed in. They would normally overlap if any of them are interleaved.
// The outputted spans are merged as described by the method
// getLogicallyMergedTableSpans, so as to optimize the size/number of the spans
// we BACKUP and lay protected ts records for.
func spansForAllTableIndexes(
codec keys.SQLCodec, tables []catalog.TableDescriptor, revs []BackupManifest_DescriptorRevision,
) []roachpb.Span {
ctx context.Context,
execCfg *sql.ExecutorConfig,
endTime hlc.Timestamp,
tables []catalog.TableDescriptor,
revs []BackupManifest_DescriptorRevision,
) ([]roachpb.Span, error) {

added := make(map[tableAndIndex]bool, len(tables))
sstIntervalTree := interval.NewTree(interval.ExclusiveOverlapper)
var mergedIndexSpans []roachpb.Span
var err error

// checkForKVInBounds issues a scan request between start and end at endTime,
// and returns true if a non-nil result is returned.
checkForKVInBounds := func(start, end roachpb.Key, endTime hlc.Timestamp) (bool, error) {
var foundKV bool
err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetFixedTimestamp(ctx, endTime)
res, err := txn.Scan(ctx, start, end, 1 /* maxRows */)
if err != nil {
return err
}
foundKV = len(res) != 0
return nil
})
return foundKV, err
}

for _, table := range tables {
for _, index := range table.AllNonDropIndexes() {
if err := sstIntervalTree.Insert(intervalSpan(table.IndexSpan(codec, index.ID)), false); err != nil {
mergedIndexSpans, err = getLogicallyMergedTableSpans(table, added, execCfg.Codec, endTime,
checkForKVInBounds)
if err != nil {
return nil, err
}

for _, indexSpan := range mergedIndexSpans {
if err := sstIntervalTree.Insert(intervalSpan(indexSpan), false); err != nil {
panic(errors.NewAssertionErrorWithWrappedErrf(err, "IndexSpan"))
}
added[tableAndIndex{tableID: table.GetID(), indexID: index.ID}] = true
}
}
// If there are desc revisions, ensure that we also add any index spans
Expand All @@ -175,13 +332,16 @@ func spansForAllTableIndexes(
rawTbl := descpb.TableFromDescriptor(rev.Desc, hlc.Timestamp{})
if rawTbl != nil && rawTbl.State != descpb.DescriptorState_DROP {
tbl := tabledesc.NewImmutable(*rawTbl)
for _, idx := range tbl.AllNonDropIndexes() {
key := tableAndIndex{tableID: tbl.ID, indexID: idx.ID}
if !added[key] {
if err := sstIntervalTree.Insert(intervalSpan(tbl.IndexSpan(codec, idx.ID)), false); err != nil {
panic(errors.NewAssertionErrorWithWrappedErrf(err, "IndexSpan"))
}
added[key] = true
revSpans, err := getLogicallyMergedTableSpans(tbl, added, execCfg.Codec, rev.Time,
checkForKVInBounds)
if err != nil {
return nil, err
}

mergedIndexSpans = append(mergedIndexSpans, revSpans...)
for _, indexSpan := range mergedIndexSpans {
if err := sstIntervalTree.Insert(intervalSpan(indexSpan), false); err != nil {
panic(errors.NewAssertionErrorWithWrappedErrf(err, "IndexSpan"))
}
}
}
Expand All @@ -195,7 +355,19 @@ func spansForAllTableIndexes(
})
return false
})
return spans

// Attempt to merge any contiguous spans generated from the tables and revs.
mergedSpans, distinct := roachpb.MergeSpans(spans)
if !distinct {
return nil, errors.NewAssertionErrorWithWrappedErrf(errors.New("expected all resolved spans for the BACKUP to be distinct"), "IndexSpan")
}

knobs := execCfg.BackupRestoreTestingKnobs
if knobs != nil && knobs.CaptureResolvedTableDescSpans != nil {
knobs.CaptureResolvedTableDescSpans(mergedSpans)
}

return mergedSpans, nil
}

func getLocalityAndBaseURI(uri, appendPath string) (string, string, error) {
Expand Down Expand Up @@ -769,11 +941,14 @@ func backupPlanHook(

tenantRows = append(tenantRows, ds)
} else {
spans = append(spans, spansForAllTableIndexes(p.ExecCfg().Codec, tables, revs)...)
tableSpans, err := spansForAllTableIndexes(ctx, p.ExecCfg(), endTime, tables, revs)
if err != nil {
return err
}
spans = append(spans, tableSpans...)

// Include all tenants.
// TODO(tbg): make conditional on cluster setting.
var err error
tenantRows, err = p.ExecCfg().InternalExecutor.Query(
ctx, "backup-lookup-tenant", p.ExtendedEvalContext().Txn,
`SELECT id, active, info FROM system.tenants`,
Expand Down
Loading

0 comments on commit c2b4fff

Please sign in to comment.