Skip to content

Commit

Permalink
feat: estimate Cloud2 cardinality on 1.X databases
Browse files Browse the repository at this point in the history
To ease migrations to Cloud 2 installations from
1.X databases, estimate Cloud 2 cardinality for
a datanode (or OSS system).

closes #23356
  • Loading branch information
davidby-influx committed May 23, 2022
1 parent 3074e62 commit 20f1975
Show file tree
Hide file tree
Showing 8 changed files with 401 additions and 14 deletions.
170 changes: 170 additions & 0 deletions cmd/influx_inspect/cardinality/aggregators.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package cardinality

import (
	"fmt"
	"sort"
	"strings"
	"sync"
	"text/tabwriter"

	"github.com/influxdata/influxdb/cmd/influx_inspect/report"
	"github.com/influxdata/influxdb/models"
)

// Column titles for the two report layouts, listed in print order.
var (
	// simpleHeader is used when only series counts are reported.
	simpleHeader = []string{"DB", "RP", "measurement", "series"}

	// detailedHeader extends the simple layout with field and tag columns.
	detailedHeader = []string{"DB", "RP", "measurement", "series", "fields", "tag total", "tags"}
)

// measurementFactory bundles what differs between the simple and detailed
// report layouts: the column titles plus the constructors for nodes and
// counters.
type measurementFactory struct {
// header holds the column titles in print order; printHeader and
// printDivider both derive their output from it.
header []string
// estTitle is a suffix that Run appends to the "Total" row label.
estTitle string
// newNode builds a node of the layout's type; inner is forwarded to the
// node constructor, which allocates a child map only when it is true.
newNode func(inner bool) node
// counter builds a fresh report.Counter.
counter func() report.Counter
}

// printHeader writes the column titles to tw as one tab-separated line and
// returns any write error.
func (f *measurementFactory) printHeader(tw *tabwriter.Writer) error {
	titles := strings.Join(f.header, "\t")
	if _, err := fmt.Fprintln(tw, titles); err != nil {
		return err
	}
	return nil
}

// printDivider writes a line of dashes to tw, one dash run per column title,
// and returns any write error.
func (f *measurementFactory) printDivider(tw *tabwriter.Writer) error {
	if _, err := fmt.Fprintln(tw, makeTabDiv(f.header)); err != nil {
		return err
	}
	return nil
}

// makeTabDiv returns a tab-separated divider line in which each column is a
// run of '-' characters as long (in bytes) as the matching header.
func makeTabDiv(headers []string) string {
	dashes := make([]string, len(headers))
	for i := range headers {
		dashes[i] = strings.Repeat("-", len(headers[i]))
	}
	return strings.Join(dashes, "\t")
}

// newSimpleMeasurementFactory returns a factory for the series-only report:
// it uses simpleHeader and builds simpleNode values whose counters come from
// newCounterFn. est is stored as the factory's estimate-title suffix.
func newSimpleMeasurementFactory(newCounterFn func() report.Counter, est string) *measurementFactory {
	f := &measurementFactory{
		header:   simpleHeader,
		estTitle: est,
		counter:  newCounterFn,
	}
	f.newNode = func(inner bool) node {
		return newSimpleNode(inner, newCounterFn)
	}
	return f
}

// newDetailedMeasurementFactory returns a factory for the detailed report:
// it uses detailedHeader and builds detailedNode values whose counters come
// from newCounterFn. est is stored as the factory's estimate-title suffix.
func newDetailedMeasurementFactory(newCounterFn func() report.Counter, est string) *measurementFactory {
	f := &measurementFactory{
		header:   detailedHeader,
		estTitle: est,
		counter:  newCounterFn,
	}
	f.newNode = func(inner bool) node {
		return newDetailedNode(inner, newCounterFn)
	}
	return f
}

// simpleNode is one level of the report hierarchy that tracks only a series
// count.
type simpleNode struct {
// Mutex supplies the Lock/Unlock methods that initRecord holds while
// updating the node.
sync.Mutex
// Counter receives every key passed to recordMeasurement.
report.Counter
// nodeMap holds the child nodes; newSimpleNode leaves it nil unless the
// node was created as an inner node.
nodeMap
}

// nextLevel returns the node's child map, which is nil when the node was not
// created as an inner node.
func (s *simpleNode) nextLevel() nodeMap {
	children := s.nodeMap
	return children
}

// newSimpleNode builds a simpleNode with a counter obtained from fn. A child
// map is allocated only when inner is true; otherwise it stays nil.
func newSimpleNode(inner bool, fn func() report.Counter) *simpleNode {
	var children nodeMap
	if inner {
		children = make(nodeMap)
	}
	return &simpleNode{
		Counter: fn(),
		nodeMap: children,
	}
}

// recordMeasurement adds key to the node's counter. The field, tags and
// counter-constructor arguments are ignored by the simple layout.
func (s *simpleNode) recordMeasurement(key, _ []byte, _ models.Tags, _ func() report.Counter) {
	s.Counter.Add(key)
}

// print writes one tab-separated row to tw holding db, rp, ms and the node's
// current count, and returns any write error.
func (s *simpleNode) print(tw *tabwriter.Writer, db, rp, ms string) error {
	const rowFormat = "%s\t%s\t%s\t%d\n"
	if _, err := fmt.Fprintf(tw, rowFormat, db, rp, ms, s.Count()); err != nil {
		return err
	}
	return nil
}

// detailedNode extends simpleNode with per-field and per-tag counters.
type detailedNode struct {
// simpleNode supplies the lock, the series counter and the child map.
simpleNode
// fields receives every field passed to recordMeasurement.
fields report.Counter
// tags maps a tag key to a counter that receives that key's values.
tags map[string]report.Counter
}

// nextLevel returns the child map held by the embedded simpleNode, which is
// nil when the node was not created as an inner node.
func (d *detailedNode) nextLevel() nodeMap {
	return d.simpleNode.nodeMap
}

// newDetailedNode builds a detailedNode whose series and field counters both
// come from fn and whose tag map starts empty. A child map is allocated only
// when inner is true.
func newDetailedNode(inner bool, fn func() report.Counter) *detailedNode {
	d := new(detailedNode)
	d.Counter = fn()
	d.fields = fn()
	d.tags = map[string]report.Counter{}
	if inner {
		d.nodeMap = nodeMap{}
	}
	return d
}

// recordMeasurement adds key to the series counter, field to the field
// counter, and each tag's value to the counter kept for that tag's key. A
// counter is created with newCounterFn the first time a tag key is seen.
func (d *detailedNode) recordMeasurement(key, field []byte, tags models.Tags, newCounterFn func() report.Counter) {
	d.Counter.Add(key)
	d.fields.Add(field)
	for _, tag := range tags {
		name := string(tag.Key)
		values, seen := d.tags[name]
		if !seen {
			values = newCounterFn()
			d.tags[name] = values
		}
		values.Add(tag.Value)
	}
}

// print writes one tab-separated row to tw holding db, rp, ms, the series
// count, the field count, the sum of the per-tag-key value counts, and a
// `"key": count` list of the tags. It returns any write error.
//
// The tag list is emitted only when ms is non-empty; rows printed with an
// empty ms get an empty tags column but still report the tag total.
func (d *detailedNode) print(tw *tabwriter.Writer, db, rp, ms string) error {
	// Map iteration order is randomized, so visit the tag keys in sorted
	// order to keep the tags column stable from one run to the next.
	names := make([]string, 0, len(d.tags))
	for name := range d.tags {
		names = append(names, name)
	}
	sort.Strings(names)

	var (
		tagN    uint64
		tagKeys []string
	)
	if ms != "" {
		tagKeys = make([]string, 0, len(names))
	}
	for _, name := range names {
		c := d.tags[name].Count()
		tagN += c
		if ms != "" {
			tagKeys = append(tagKeys, fmt.Sprintf("%q: %d", name, c))
		}
	}

	_, err := fmt.Fprintf(tw, "%s\t%s\t%s\t%d\t%d\t%d\t%s\n",
		db,
		rp,
		ms,
		d.Count(),
		d.fields.Count(),
		tagN,
		strings.Join(tagKeys, ", "))
	return err
}

// initRecord records key, field and tags on n and then on one descendant per
// entry of levelKeys, creating missing descendants with nodeFn.
//
// Each node is locked only while it is updated and its child is looked up or
// inserted; the lock is released before moving to the child. The descent
// stops when levelKeys is exhausted or a node has no child map. A child is
// created as an inner node only when further level keys remain after it.
func initRecord(n node, key []byte, field []byte, tags models.Tags, nodeFn func(bool) node, counterFn func() report.Counter, levelKeys ...string) {
	current, remaining := n, levelKeys
	for {
		current.Lock()
		current.recordMeasurement(key, field, tags, counterFn)

		if len(remaining) == 0 {
			current.Unlock()
			return
		}
		children := current.nextLevel()
		if children == nil {
			current.Unlock()
			return
		}

		name := remaining[0]
		child, found := children[name]
		if !found {
			child = nodeFn(len(remaining) > 1)
		}
		children[name] = child
		current.Unlock()

		current, remaining = child, remaining[1:]
	}
}
199 changes: 199 additions & 0 deletions cmd/influx_inspect/cardinality/cardinality.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package cardinality

import (
"bytes"
"context"
"errors"
"flag"
"fmt"
"io"
"os"
"text/tabwriter"

"github.com/influxdata/influxdb/cmd/influx_inspect/report"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/reporthelper"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
"golang.org/x/sync/errgroup"
)

// Command represents the program execution for "influx_inspect report-db".
type Command struct {
// Standard input/output, overridden for testing.
Stderr io.Writer
Stdout io.Writer

// dbPath is the database path given with -db-path; Run rejects an empty value.
dbPath string
// shardPaths is initialized by NewCommand.
// NOTE(review): no other use is visible in this file; confirm it is still needed.
shardPaths map[uint64]string
// exact selects exact counters instead of the default HLL estimates.
exact bool
// detailed selects the detailed report layout (fields and tags columns).
detailed bool
// How many goroutines to dedicate to calculating cardinality.
concurrency int
// t, d, r, m for Total, Database, Retention Policy, Measurement
rollup string
}

// NewCommand returns a Command wired to the process's standard streams, with
// a concurrency of one, the simple (non-detailed) layout and measurement-level
// rollup.
func NewCommand() *Command {
	cmd := new(Command)
	cmd.Stderr, cmd.Stdout = os.Stderr, os.Stdout
	cmd.shardPaths = make(map[uint64]string)
	cmd.concurrency = 1
	cmd.detailed = false
	cmd.rollup = "m"
	return cmd
}

// Run executes the "report-db" command.
//
// It parses args into the command's settings, walks the shard directories
// under the database path, and feeds every key of every readable TSM file
// into a hierarchy of counters (total -> database -> retention policy ->
// measurement, truncated according to the rollup level). It then prints the
// resulting table to cmd.Stdout.
//
// TSM files that cannot be opened or read are reported on cmd.Stderr and
// skipped. Run returns an error when the database path is missing, the
// rollup level is not one of t/d/r/m, the concurrency is zero, the directory
// walk fails, or writing the report fails.
//
// NOTE: rows are emitted by ranging over maps, so their order is not stable
// between runs.
func (cmd *Command) Run(args ...string) (err error) {
	fs := flag.NewFlagSet("report-db", flag.ExitOnError)
	fs.StringVar(&cmd.dbPath, "db-path", "", "Path to database. Required.")
	fs.IntVar(&cmd.concurrency, "c", 1, "Set worker concurrency. Defaults to one.")
	fs.BoolVar(&cmd.detailed, "detailed", false, "Include counts for fields, tags, ")
	fs.BoolVar(&cmd.exact, "exact", false, "Report exact counts")
	fs.StringVar(&cmd.rollup, "rollup", "m", "Rollup level - t: total, d: database, r: retention policy, m: measurement")
	fs.SetOutput(cmd.Stdout)
	if err := fs.Parse(args); err != nil {
		return err
	}

	if cmd.dbPath == "" {
		return errors.New("path to database must be provided")
	}

	// An errgroup limit of zero forbids starting any goroutine, so the first
	// g.Go call below would block forever. Negative values mean "no limit"
	// and are left alone.
	if cmd.concurrency == 0 {
		return errors.New("worker concurrency must not be zero")
	}

	// depth is how many of the (database, retention policy, measurement)
	// levels are tracked below the grand total.
	var depth int
	switch cmd.rollup {
	case "t":
		depth = 0
	case "d":
		depth = 1
	case "r":
		depth = 2
	case "m":
		depth = 3
	default:
		return fmt.Errorf("invalid rollup specified: %q", cmd.rollup)
	}

	estTitle := " (est.)"
	newCounterFn := report.NewHLLCounter
	if cmd.exact {
		newCounterFn = report.NewExactCounter
		estTitle = ""
	}

	var factory *measurementFactory
	if cmd.detailed {
		factory = newDetailedMeasurementFactory(newCounterFn, estTitle)
	} else {
		factory = newSimpleMeasurementFactory(newCounterFn, estTitle)
	}

	dbMap := factory.newNode(true)

	// fieldSep separates the series key from the field name inside a TSM key.
	fieldSep := []byte("#!~#")

	g, ctx := errgroup.WithContext(context.Background())
	g.SetLimit(cmd.concurrency)
	processTSM := func(db, rp, id, path string) error {
		file, err := os.OpenFile(path, os.O_RDONLY, 0600)
		if err != nil {
			_, _ = fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", path, err)
			return nil
		}

		reader, err := tsm1.NewTSMReader(file)
		if err != nil {
			_, _ = fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", file.Name(), err)
			// NewTSMReader won't close the file handle on failure, so do it here.
			_ = file.Close()
			return nil
		}
		defer func() {
			// The TSMReader will close the underlying file handle here.
			if err := reader.Close(); err != nil {
				_, _ = fmt.Fprintf(cmd.Stderr, "error closing: %s: %v.\n", file.Name(), err)
			}
		}()

		// Only the measurement slot changes from key to key; the slice
		// handed to initRecord is cut down to the requested rollup depth.
		levels := [3]string{db, rp}
		for i, keyCount := 0, reader.KeyCount(); i < keyCount; i++ {
			key, _ := reader.KeyAt(i)
			seriesKey, field, _ := bytes.Cut(key, fieldSep)
			measurement, tags := models.ParseKey(seriesKey)
			levels[2] = measurement
			initRecord(dbMap, key, field, tags, factory.newNode, factory.counter, levels[:depth]...)
		}
		return nil
	}

	done := ctx.Done()
	walkErr := reporthelper.WalkShardDirs(cmd.dbPath, func(db, rp, id, path string) error {
		select {
		case <-done:
			return nil
		default:
			g.Go(func() error {
				return processTSM(db, rp, id, path)
			})
			return nil
		}
	})

	// Wait even when the walk failed, so that no worker outlives Run and
	// keeps writing to cmd.Stderr or updating the counters after we return.
	waitErr := g.Wait()
	if err = walkErr; err == nil {
		err = waitErr
	}
	if err != nil {
		_, _ = fmt.Fprintf(cmd.Stderr, "%s: %v\n", cmd.dbPath, err)
		return err
	}

	tw := tabwriter.NewWriter(cmd.Stdout, 8, 2, 1, ' ', 0)

	if err = factory.printHeader(tw); err != nil {
		return err
	}
	if err = factory.printDivider(tw); err != nil {
		return err
	}
	for d, db := range dbMap.nextLevel() {
		dbName := fmt.Sprintf("%q", d)
		for r, rp := range db.nextLevel() {
			rpName := fmt.Sprintf("%q", r)
			if depth >= 3 {
				for m, measure := range rp.nextLevel() {
					if err = measure.print(tw, dbName, rpName, fmt.Sprintf("%q", m)); err != nil {
						return err
					}
				}
			}
			if depth >= 2 {
				if err = rp.print(tw, dbName, rpName, ""); err != nil {
					return err
				}
			}
		}
		if depth >= 1 {
			if err = db.print(tw, dbName, "", ""); err != nil {
				return err
			}
		}
	}
	if err = dbMap.print(tw, "Total"+factory.estTitle, "", ""); err != nil {
		return err
	}
	return tw.Flush()
}

// nodeMap holds the children of a node, keyed by the level key passed to
// initRecord (Run supplies a database, retention policy or measurement name).
type nodeMap map[string]node
// nodeLock is the locking part of the node interface; it has the same method
// set as sync.Locker.
type nodeLock interface {
Lock()
Unlock()
}

// node is one level of the cardinality hierarchy built by initRecord and
// printed by Run.
type node interface {
// Lock and Unlock are held by initRecord while it updates the node.
nodeLock
// Counter tracks the keys recorded at this node.
report.Counter
// nextLevel returns the node's children; initRecord treats a nil map as
// "do not descend further".
nextLevel() nodeMap
// recordMeasurement folds one key, with its field and tags, into the
// node's counters; newCounterFn creates any counters needed on demand.
recordMeasurement(key, field []byte, tags models.Tags, newCounterFn func() report.Counter)
// print writes the node's row to tw, labelled with db, rp and ms.
print(tw *tabwriter.Writer, db, rp, ms string) error
}
1 change: 1 addition & 0 deletions cmd/influx_inspect/help/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ The commands are:
buildtsi generates tsi1 indexes from tsm1 data
help display this help message
report displays a shard level cardinality report
report-db estimates cloud 2 cardinality for a database
report-disk displays a shard level disk usage report
verify verifies integrity of TSM files
verify-seriesfile verifies integrity of the Series file
Expand Down
6 changes: 6 additions & 0 deletions cmd/influx_inspect/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/influxdata/influxdb/cmd"
"github.com/influxdata/influxdb/cmd/influx_inspect/buildtsi"
"github.com/influxdata/influxdb/cmd/influx_inspect/cardinality"
"github.com/influxdata/influxdb/cmd/influx_inspect/deletetsm"
"github.com/influxdata/influxdb/cmd/influx_inspect/dumptsi"
"github.com/influxdata/influxdb/cmd/influx_inspect/dumptsm"
Expand Down Expand Up @@ -61,6 +62,11 @@ func (m *Main) Run(args ...string) error {
if err := help.NewCommand().Run(args...); err != nil {
return fmt.Errorf("help: %s", err)
}
case "report-db":
name := cardinality.NewCommand()
if err := name.Run(args...); err != nil {
return fmt.Errorf("report-db: %w", err)
}
case "deletetsm":
name := deletetsm.NewCommand()
if err := name.Run(args...); err != nil {
Expand Down
Loading

0 comments on commit 20f1975

Please sign in to comment.