diff --git a/cmd/influx_inspect/cardinality/aggregators/aggregators.go b/cmd/influx_inspect/cardinality/aggregators/aggregators.go
new file mode 100644
index 00000000000..a40c7e2d4a7
--- /dev/null
+++ b/cmd/influx_inspect/cardinality/aggregators/aggregators.go
@@ -0,0 +1,262 @@
+// Package aggregators builds a tree of counters that rolls series, field and
+// tag cardinality up from measurement to retention policy to database to a
+// grand total.
+package aggregators
+
+import (
+	"fmt"
+	"sort"
+	"strings"
+	"sync"
+	"text/tabwriter"
+
+	"github.com/influxdata/influxdb/cmd/influx_inspect/report"
+	"github.com/influxdata/influxdb/models"
+)
+
+type rollupNodeMap map[string]RollupNode
+
+// RollupNode is one level of the rollup tree. It counts the series recorded
+// through it and, unless it is a leaf, owns a map of child nodes.
+type RollupNode interface {
+	sync.Locker
+	report.Counter
+	Children() rollupNodeMap
+	RecordSeries(db, rp, ms string, key, field []byte, tags models.Tags)
+	Print(tw *tabwriter.Writer, printTags bool, db, rp, ms string) error
+	isLeaf() bool
+	child(key string, isLeaf bool) NodeWrapper
+}
+
+// NodeWrapper adds the recursive Record method to any RollupNode.
+type NodeWrapper struct {
+	RollupNode
+}
+
+var detailedHeader = []string{"DB", "RP", "measurement", "series", "fields", "tag total", "tags"}
+var simpleHeader = []string{"DB", "RP", "measurement", "series"}
+
+// RollupNodeFactory creates the nodes and counters of one rollup tree and
+// prints the table header that matches the node type it produces.
+type RollupNodeFactory struct {
+	header   []string
+	EstTitle string
+	NewNode  func(isLeaf bool) NodeWrapper
+	counter  func() report.Counter
+}
+
+// CreateNodeFactory returns a factory for detailed (series, fields, tags) or
+// simple (series only) nodes, backed by exact or HyperLogLog counters.
+//
+// Nodes keep a reference to the factory that built them instead of reading a
+// package-level variable, so factories created by separate calls never
+// interfere with each other.
+func CreateNodeFactory(detailed, exact bool) *RollupNodeFactory {
+	estTitle := " (est.)"
+	newCounterFn := report.NewHLLCounter
+	if exact {
+		newCounterFn = report.NewExactCounter
+		estTitle = ""
+	}
+
+	if detailed {
+		return newDetailedNodeFactory(newCounterFn, estTitle)
+	}
+	return newSimpleNodeFactory(newCounterFn, estTitle)
+}
+
+// PrintHeader writes the tab-separated column titles to tw.
+func (f *RollupNodeFactory) PrintHeader(tw *tabwriter.Writer) error {
+	_, err := fmt.Fprintln(tw, strings.Join(f.header, "\t"))
+	return err
+}
+
+// PrintDivider writes a row of dashes, one run per column title, to tw.
+func (f *RollupNodeFactory) PrintDivider(tw *tabwriter.Writer) error {
+	_, err := fmt.Fprintln(tw, f.makeTabDivider())
+	return err
+}
+
+// makeTabDivider returns a dash run as long as each column title, joined by tabs.
+func (f *RollupNodeFactory) makeTabDivider() string {
+	div := make([]string, 0, len(f.header))
+	for _, s := range f.header {
+		div = append(div, strings.Repeat("-", len(s)))
+	}
+	return strings.Join(div, "\t")
+}
+
+func newSimpleNodeFactory(newCounterFn func() report.Counter, est string) *RollupNodeFactory {
+	f := &RollupNodeFactory{
+		header:   simpleHeader,
+		EstTitle: est,
+		counter:  newCounterFn,
+	}
+	f.NewNode = func(isLeaf bool) NodeWrapper { return NodeWrapper{newSimpleNode(isLeaf, f)} }
+	return f
+}
+
+func newDetailedNodeFactory(newCounterFn func() report.Counter, est string) *RollupNodeFactory {
+	f := &RollupNodeFactory{
+		header:   detailedHeader,
+		EstTitle: est,
+		counter:  newCounterFn,
+	}
+	f.NewNode = func(isLeaf bool) NodeWrapper { return NodeWrapper{newDetailedNode(isLeaf, f)} }
+	return f
+}
+
+// simpleNode counts series only.
+type simpleNode struct {
+	sync.Mutex
+	report.Counter
+	rollupNodeMap
+	// factory is the factory that created this node; children and extra
+	// counters are created through it.
+	factory *RollupNodeFactory
+}
+
+func (s *simpleNode) Children() rollupNodeMap {
+	return s.rollupNodeMap
+}
+
+// child returns the child stored under key, creating it on first use. It
+// panics when called on a leaf node.
+func (s *simpleNode) child(key string, isLeaf bool) NodeWrapper {
+	if s.isLeaf() {
+		panic("Trying to get the child to a leaf node")
+	}
+	s.Lock()
+	defer s.Unlock()
+	c, ok := s.rollupNodeMap[key]
+	if !ok {
+		c = s.factory.NewNode(isLeaf)
+		s.rollupNodeMap[key] = c
+	}
+	// Stored children are already NodeWrappers; avoid wrapping them twice.
+	if w, isWrapper := c.(NodeWrapper); isWrapper {
+		return w
+	}
+	return NodeWrapper{c}
+}
+
+func (s *simpleNode) isLeaf() bool {
+	return s.Children() == nil
+}
+
+func newSimpleNode(isLeaf bool, f *RollupNodeFactory) *simpleNode {
+	s := &simpleNode{Counter: f.counter(), factory: f}
+	if !isLeaf {
+		s.rollupNodeMap = make(rollupNodeMap)
+	}
+	return s
+}
+
+// RecordSeries adds the series identified by db, rp and key to this node's count.
+func (s *simpleNode) RecordSeries(db, rp, _ string, key, _ []byte, _ models.Tags) {
+	s.Lock()
+	defer s.Unlock()
+	s.recordSeriesNoLock(db, rp, key)
+}
+
+func (s *simpleNode) recordSeriesNoLock(db, rp string, key []byte) {
+	s.Add([]byte(fmt.Sprintf("%s.%s.%s", db, rp, key)))
+}
+
+// Print writes one table row with this node's series count.
+func (s *simpleNode) Print(tw *tabwriter.Writer, _ bool, db, rp, ms string) error {
+	_, err := fmt.Fprintf(tw, "%s\t%s\t%s\t%d\n", db, rp, ms, s.Count())
+	return err
+}
+
+// tagID identifies a tag key within one measurement. Using a struct instead of
+// a dot-joined string keeps tag keys that themselves contain dots intact.
+type tagID struct {
+	db, rp, ms, key string
+}
+
+// detailedNode counts series, fields, and the values of every tag key.
+type detailedNode struct {
+	simpleNode
+	fields report.Counter
+	tags   map[tagID]report.Counter
+}
+
+func newDetailedNode(isLeaf bool, f *RollupNodeFactory) *detailedNode {
+	d := &detailedNode{
+		simpleNode: simpleNode{Counter: f.counter(), factory: f},
+		fields:     f.counter(),
+		tags:       make(map[tagID]report.Counter),
+	}
+	if !isLeaf {
+		d.rollupNodeMap = make(rollupNodeMap)
+	}
+	return d
+}
+
+// RecordSeries adds the series, its field, and each of its tag values to this
+// node's counters.
+func (d *detailedNode) RecordSeries(db, rp, ms string, key, field []byte, tags models.Tags) {
+	d.Lock()
+	defer d.Unlock()
+	d.recordSeriesNoLock(db, rp, key)
+	d.fields.Add([]byte(fmt.Sprintf("%s.%s.%s.%s", db, rp, ms, field)))
+	for _, t := range tags {
+		// Qualify the tag key with database, retention policy, and
+		// measurement so inner (non-leaf) nodes aggregate correctly.
+		id := tagID{db: db, rp: rp, ms: ms, key: string(t.Key)}
+		tc, ok := d.tags[id]
+		if !ok {
+			tc = d.factory.counter()
+			d.tags[id] = tc
+		}
+		tc.Add(t.Value)
+	}
+}
+
+// Print writes one table row with series, field, and total tag value counts.
+// When printTags is true the row also lists each tag key with its value count,
+// sorted so that repeated runs produce identical output.
+func (d *detailedNode) Print(tw *tabwriter.Writer, printTags bool, db, rp, ms string) error {
+	var tagKeys []string
+	if printTags {
+		tagKeys = make([]string, 0, len(d.tags))
+	}
+	tagN := uint64(0)
+	for id, counter := range d.tags {
+		c := counter.Count()
+		tagN += c
+		if printTags {
+			tagKeys = append(tagKeys, fmt.Sprintf("%q: %d", id.key, c))
+		}
+	}
+	sort.Strings(tagKeys)
+	_, err := fmt.Fprintf(tw, "%s\t%s\t%s\t%d\t%d\t%d\t%s\n",
+		db, rp, ms, d.Count(), d.fields.Count(), tagN, strings.Join(tagKeys, ", "))
+	return err
+}
+
+// Record counts the series in this node and then in the child one level down,
+// until totalDepth is reached. Depth 0 is the total, 1 a database, 2 a
+// retention policy; measurement nodes (depth 3) are always leaves.
+func (r *NodeWrapper) Record(depth, totalDepth int, db, rp, measurement string, key []byte, field []byte, tags models.Tags) {
+	r.RecordSeries(db, rp, measurement, key, field, tags)
+	if depth >= totalDepth {
+		return
+	}
+
+	switch depth {
+	case 0:
+		// Create database level in tree
+		c := r.child(db, depth+1 == totalDepth)
+		c.Record(depth+1, totalDepth, db, rp, measurement, key, field, tags)
+	case 1:
+		// Create retention policy level in tree
+		c := r.child(rp, depth+1 == totalDepth)
+		c.Record(depth+1, totalDepth, db, rp, measurement, key, field, tags)
+	case 2:
+		// Create measurement level in tree
+		c := r.child(measurement, true)
+		c.RecordSeries(db, rp, measurement, key, field, tags)
+	}
+}
diff --git a/cmd/influx_inspect/cardinality/aggregators/aggregators_test.go b/cmd/influx_inspect/cardinality/aggregators/aggregators_test.go
new file mode 100644
index 00000000000..4032001d621
--- /dev/null
+++ b/cmd/influx_inspect/cardinality/aggregators/aggregators_test.go
@@ -0,0 +1,329 @@
+package aggregators
+
+import (
+	"bytes"
+	"sync"
+	"testing"
+
+	"github.com/influxdata/influxdb/models"
+	"github.com/stretchr/testify/require"
+)
+
+type result struct {
+	fields uint64
+	tags   uint64
+	series uint64
+}
+
+type test struct {
+	db  string
+	rp  string
+	key []byte
+}
+
+// Ensure that tags and fields and series which differ only in database, retention policy, or measurement
+// are correctly counted.
+func Test_canonicalize(t *testing.T) {
+	totalDepth := 3
+
+	// Key layout: measurement,tag1=tag1_value1,tag2=tag2_value1#!~#field1
+	tests := []test{
+		{
+			db:  "db1",
+			rp:  "rp1",
+			key: []byte("m1,t1=t1_v1,t2=t2_v1#!~#f1"),
+		},
+		{
+			db:  "db1",
+			rp:  "rp1",
+			key: []byte("m1,t1=t1_v2,t2=t2_v1#!~#f1"),
+		},
+		{
+			db:  "db1",
+			rp:  "rp1",
+			key: []byte("m1,t1=t1_v1,t2=t2_v2#!~#f1"),
+		},
+		{
+			db:  "db1",
+			rp:  "rp1",
+			key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f1"),
+		},
+		{
+			db:  "db1",
+			rp:  "rp1",
+			key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f2"),
+		},
+		{
+			db:  "db1",
+			rp:  "rp2",
+			key: []byte("m1,t1=t1_v1,t2=t2_v1#!~#f1"),
+		},
+		{
+			db:  "db1",
+			rp:  "rp2",
+			key: []byte("m1,t1=t1_v2,t2=t2_v1#!~#f1"),
+		},
+		{
+			db:  "db1",
+			rp:  "rp2",
+			key: []byte("m1,t1=t1_v1,t2=t2_v2#!~#f1"),
+		},
+		{
+			db:  "db1",
+			rp:  "rp2",
+			key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f3"),
+		},
+		{
+			db:  "db1",
+			rp:  "rp2",
+			key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f2"),
+		},
+		{
+			db:  "db1",
+			rp:  "rp1",
+			key: []byte("m2,t1=t1_v1,t2=t2_v1#!~#f1"),
+		},
+		{
+			db:  "db1",
+			rp:  "rp1",
+			key: []byte("m2,t1=t1_v2,t2=t2_v1#!~#f1"),
+		},
+		{
+			db:  "db1",
+			rp:  "rp1",
+			key: []byte("m2,t1=t1_v1,t2=t2_v2#!~#f1"),
+		},
+		{
+			db:  "db1",
+			rp:  "rp1",
+			key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f1"),
+		},
+		{
+			db:  "db1",
+			rp:  "rp1",
+			key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f2"),
+		},
+		{
+			db:  "db1",
+			rp:  "rp2",
+			key: []byte("m2,t1=t1_v1,t2=t2_v1#!~#f1"),
+		},
+		{
+			db:  "db1",
+			rp:  "rp2",
+			key: []byte("m2,t1=t1_v2,t2=t2_v1#!~#f1"),
+		},
+		{
+			db:  "db1",
+			rp:  "rp2",
+			key: []byte("m2,t1=t1_v1,t2=t2_v2#!~#f1"),
+		},
+		{
+			db:  "db1",
+			rp:  "rp2",
+			key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f1"),
+		},
+		{
+			db:  "db1",
+			rp:  "rp2",
+			key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f2"),
+		},
+		{
+			db:  "db2",
+			rp:  "rp1",
+			key: []byte("m1,t1=t1_v1,t2=t2_v1#!~#f1"),
+		},
+		{
+			db:  "db2",
+			rp:  "rp1",
+			key: []byte("m1,t1=t1_v2,t2=t2_v1#!~#f1"),
+		},
+		{
+			db:  "db2",
+			rp:  "rp1",
+			key: []byte("m1,t1=t1_v1,t2=t2_v2#!~#f1"),
+		},
+		{
+			db:  "db2",
+			rp:  "rp1",
+			key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f1"),
+		},
+		{
+			db:  "db2",
+			rp:  "rp1",
+			key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f2"),
+		},
+		{
+			db:  "db2",
+			rp:  "rp2",
+			key: []byte("m1,t1=t1_v1,t2=t2_v1#!~#f1"),
+		},
+		{
+			db:  "db2",
+			rp:  "rp2",
+			key: []byte("m1,t1=t1_v2,t2=t2_v1#!~#f1"),
+		},
+		{
+			db:  "db2",
+			rp:  "rp2",
+			key: []byte("m1,t1=t1_v1,t2=t2_v2#!~#f1"),
+		},
+		{
+			db:  "db2",
+			rp:  "rp2",
+			key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f1"),
+		},
+		{
+			db:  "db2",
+			rp:  "rp2",
+			key: []byte("m1,t1=t1_v2,t2=t2_v2#!~#f2"),
+		},
+		{
+			db:  "db2",
+			rp:  "rp1",
+			key: []byte("m2,t1=t1_v1,t2=t2_v1#!~#f1"),
+		},
+		{
+			db:  "db2",
+			rp:  "rp1",
+			key: []byte("m2,t1=t1_v2,t2=t2_v1#!~#f1"),
+		},
+		{
+			db:  "db2",
+			rp:  "rp1",
+			key: []byte("m2,t1=t1_v1,t2=t2_v2#!~#f1"),
+		},
+		{
+			db:  "db2",
+			rp:  "rp1",
+			key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f1"),
+		},
+		{
+			db:  "db2",
+			rp:  "rp1",
+			key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f2"),
+		},
+		{
+			db:  "db2",
+			rp:  "rp2",
+			key: []byte("m2,t1=t1_v1,t2=t2_v1#!~#f1"),
+		},
+		{
+			db:  "db2",
+			rp:  "rp2",
+			key: []byte("m2,t1=t1_v2,t2=t2_v1#!~#f1"),
+		},
+		{
+			db:  "db2",
+			rp:  "rp2",
+			key: []byte("m2,t1=t1_v1,t2=t2_v2#!~#f1"),
+		},
+		{
+			db:  "db2",
+			rp:  "rp2",
+			key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f1"),
+		},
+		{
+			db:  "db2",
+			rp:  "rp2",
+			key: []byte("m2,t1=t1_v2,t2=t2_v2#!~#f2"),
+		},
+	}
+
+	results := map[string]map[string]map[string]*result{
+		"db1": {
+			"rp1": {
+				"m1": {2, 4, 5},
+				"m2": {2, 4, 5},
+				"":   {4, 8, 10},
+			},
+			"rp2": {
+				"m1": {3, 4, 5},
+				"m2": {2, 4, 5},
+				"":   {5, 8, 10},
+			},
+			"": {
+				"": {9, 16, 20},
+			},
+		},
+		"db2": {
+			"rp1": {
+				"m1": {2, 4, 5},
+				"m2": {2, 4, 5},
+				"":   {4, 8, 10},
+			},
+			"rp2": {
+				"m1": {2, 4, 5},
+				"m2": {2, 4, 5},
+				"":   {4, 8, 10},
+			},
+			"": {
+				"": {8, 16, 20},
+			},
+		},
+		"": {
+			"": {
+				"": {17, 32, 40},
+			},
+		},
+	}
+
+	testLoop(t, false, true, totalDepth, tests, results)
+	testLoop(t, true, true, totalDepth, tests, results)
+	testLoop(t, false, false, totalDepth, tests, results)
+	testLoop(t, true, false, totalDepth, tests, results)
+}
+
+func testLoop(t *testing.T, detailed bool, exact bool, totalDepth int, tests []test, results map[string]map[string]map[string]*result) {
+	factory := CreateNodeFactory(detailed, exact)
+	tree := factory.NewNode(totalDepth == 0)
+
+	wg := sync.WaitGroup{}
+	tf := func() {
+		defer wg.Done()
+		for i := range tests {
+			seriesKey, field, _ := bytes.Cut(tests[i].key, []byte("#!~#"))
+			measurement, tags := models.ParseKey(seriesKey)
+			tree.Record(0, totalDepth, tests[i].db, tests[i].rp, measurement, tests[i].key, field, tags)
+		}
+	}
+	const concurrency = 5
+	wg.Add(concurrency)
+	for j := 0; j < concurrency; j++ {
+		go tf()
+	}
+	wg.Wait()
+
+	for d, db := range tree.Children() {
+		for r, rp := range db.Children() {
+			for m, measure := range rp.Children() {
+				checkNode(t, measure, results[d][r][m], d, r, m)
+			}
+			checkNode(t, rp, results[d][r][""], d, r, "")
+		}
+		checkNode(t, db, results[d][""][""], d, "", "")
+	}
+	checkNode(t, tree, results[""][""][""], "", "", "")
+}
+
+func checkNode(t *testing.T, measure RollupNode, results *result, d string, r string, m string) {
+	mr, ok := measure.(NodeWrapper)
+	if !ok {
+		t.Fatalf("internal error: expected a NodeWrapper type")
+	}
+
+	switch node := mr.RollupNode.(type) {
+	case *detailedNode:
+		require.Equalf(t, results.series, node.Count(), "series count wrong. db: %q, rp: %q, ms: %q", d, r, m)
+		require.Equalf(t, results.fields, node.fields.Count(), "field count wrong. db: %q, rp: %q, ms: %q", d, r, m)
+		tagSum := uint64(0)
+		for _, tc := range node.tags {
+			tagSum += tc.Count()
+		}
+		require.Equalf(t, results.tags, tagSum, "tag value count wrong. db: %q, rp: %q, ms: %q", d, r, m)
+	case *simpleNode:
+		require.Equalf(t, results.series, node.Count(), "series count wrong. db: %q, rp: %q, ms: %q", d, r, m)
+	default:
+		t.Fatalf("internal error: unknown node type")
+	}
+}
diff --git a/cmd/influx_inspect/cardinality/cardinality.go b/cmd/influx_inspect/cardinality/cardinality.go
new file mode 100644
index 00000000000..179607ca20b
--- /dev/null
+++ b/cmd/influx_inspect/cardinality/cardinality.go
@@ -0,0 +1,183 @@
+// Package cardinality implements the influx_inspect "report-db" command,
+// which reports series cardinality rolled up by database, retention policy,
+// and measurement.
+package cardinality
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"flag"
+	"fmt"
+	"io"
+	"os"
+	"sort"
+	"text/tabwriter"
+
+	"github.com/influxdata/influxdb/cmd/influx_inspect/cardinality/aggregators"
+	"github.com/influxdata/influxdb/models"
+	"github.com/influxdata/influxdb/pkg/reporthelper"
+	"github.com/influxdata/influxdb/tsdb/engine/tsm1"
+	"golang.org/x/sync/errgroup"
+)
+
+// Command represents the program execution for "influx_inspect report-db".
+type Command struct {
+	// Standard input/output, overridden for testing.
+	Stderr io.Writer
+	Stdout io.Writer
+
+	dbPath     string
+	shardPaths map[uint64]string
+	exact      bool
+	detailed   bool
+	// How many goroutines to dedicate to calculating cardinality.
+	concurrency int
+	// t, d, r, m for Total, Database, Retention Policy, Measurement
+	rollup string
+}
+
+// NewCommand returns a new instance of Command with default setting applied.
+func NewCommand() *Command {
+	return &Command{
+		Stderr:      os.Stderr,
+		Stdout:      os.Stdout,
+		shardPaths:  map[uint64]string{},
+		concurrency: 1,
+		detailed:    false,
+		rollup:      "m",
+	}
+}
+
+// Run parses args, walks every TSM file under the database path with up to
+// "-c" concurrent workers, and prints the cardinality table to Stdout.
+// Files that cannot be opened or parsed are reported on Stderr and skipped.
+func (cmd *Command) Run(args ...string) (err error) {
+	var legalRollups = map[string]int{"m": 3, "r": 2, "d": 1, "t": 0}
+	fs := flag.NewFlagSet("report-db", flag.ExitOnError)
+	fs.StringVar(&cmd.dbPath, "db-path", "", "Path to database. Required.")
+	fs.IntVar(&cmd.concurrency, "c", 1, "Set worker concurrency. Defaults to one.")
+	fs.BoolVar(&cmd.detailed, "detailed", false, "Include counts for fields and tags")
+	fs.BoolVar(&cmd.exact, "exact", false, "Report exact counts")
+	fs.StringVar(&cmd.rollup, "rollup", "m", "Rollup level - t: total, d: database, r: retention policy, m: measurement")
+	fs.SetOutput(cmd.Stdout)
+	if err := fs.Parse(args); err != nil {
+		return err
+	}
+
+	if cmd.dbPath == "" {
+		return errors.New("path to database must be provided")
+	}
+	// A limit of zero would stop errgroup from ever starting a worker, so
+	// the first g.Go call below would block forever.
+	if cmd.concurrency == 0 {
+		return errors.New("concurrency must not be zero")
+	}
+
+	totalDepth, ok := legalRollups[cmd.rollup]
+	if !ok {
+		return fmt.Errorf("invalid rollup specified: %q", cmd.rollup)
+	}
+
+	factory := aggregators.CreateNodeFactory(cmd.detailed, cmd.exact)
+	totalsTree := factory.NewNode(totalDepth == 0)
+
+	g, ctx := errgroup.WithContext(context.Background())
+	g.SetLimit(cmd.concurrency)
+	processTSM := func(db, rp, id, path string) error {
+		file, err := os.OpenFile(path, os.O_RDONLY, 0600)
+		if err != nil {
+			_, _ = fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", path, err)
+			return nil
+		}
+
+		reader, err := tsm1.NewTSMReader(file)
+		if err != nil {
+			_, _ = fmt.Fprintf(cmd.Stderr, "error: %s: %v. Skipping.\n", file.Name(), err)
+			// NewTSMReader won't close the file handle on failure, so do it here.
+			_ = file.Close()
+			return nil
+		}
+		defer func() {
+			// The TSMReader will close the underlying file handle here.
+			if err := reader.Close(); err != nil {
+				_, _ = fmt.Fprintf(cmd.Stderr, "error closing: %s: %v.\n", file.Name(), err)
+			}
+		}()
+
+		seriesCount := reader.KeyCount()
+		for i := 0; i < seriesCount; i++ {
+			key, _ := reader.KeyAt(i)
+			seriesKey, field, _ := bytes.Cut(key, []byte("#!~#"))
+			measurement, tags := models.ParseKey(seriesKey)
+			totalsTree.Record(0, totalDepth, db, rp, measurement, key, field, tags)
+		}
+		return nil
+	}
+	done := ctx.Done()
+	err = reporthelper.WalkShardDirs(cmd.dbPath, func(db, rp, id, path string) error {
+		select {
+		case <-done:
+			return nil
+		default:
+			g.Go(func() error {
+				return processTSM(db, rp, id, path)
+			})
+			return nil
+		}
+	})
+	// Wait even when the walk itself failed so that no worker is still
+	// recording into the tree when Run returns.
+	if waitErr := g.Wait(); err == nil {
+		err = waitErr
+	}
+	if err != nil {
+		_, _ = fmt.Fprintf(cmd.Stderr, "%s: %v\n", cmd.dbPath, err)
+		return err
+	}
+
+	tw := tabwriter.NewWriter(cmd.Stdout, 8, 2, 1, ' ', 0)
+
+	if err = factory.PrintHeader(tw); err != nil {
+		return err
+	}
+	if err = factory.PrintDivider(tw); err != nil {
+		return err
+	}
+	// Print in sorted order; map iteration order is random.
+	dbs := totalsTree.Children()
+	for _, d := range sortedKeys(dbs) {
+		db := dbs[d]
+		rps := db.Children()
+		for _, r := range sortedKeys(rps) {
+			rp := rps[r]
+			measurements := rp.Children()
+			for _, m := range sortedKeys(measurements) {
+				err = measurements[m].Print(tw, true, fmt.Sprintf("%q", d), fmt.Sprintf("%q", r), fmt.Sprintf("%q", m))
+				if err != nil {
+					return err
+				}
+			}
+			if err = rp.Print(tw, false, fmt.Sprintf("%q", d), fmt.Sprintf("%q", r), ""); err != nil {
+				return err
+			}
+		}
+		if err = db.Print(tw, false, fmt.Sprintf("%q", d), "", ""); err != nil {
+			return err
+		}
+	}
+	if err = totalsTree.Print(tw, false, "Total"+factory.EstTitle, "", ""); err != nil {
+		return err
+	}
+	return tw.Flush()
+}
+
+// sortedKeys returns the keys of nodes in ascending order.
+func sortedKeys(nodes map[string]aggregators.RollupNode) []string {
+	keys := make([]string, 0, len(nodes))
+	for k := range nodes {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+	return keys
+}
diff --git a/cmd/influx_inspect/help/help.go b/cmd/influx_inspect/help/help.go
index ae68298c4e9..57fd0cb37e0 100644
--- a/cmd/influx_inspect/help/help.go
+++ b/cmd/influx_inspect/help/help.go
@@ -38,6 +38,7 @@ The commands are:
 buildtsi generates tsi1 indexes from tsm1 data
 help display this help
message report displays a shard level cardinality report + report-db estimates cloud 2 cardinality for a database report-disk displays a shard level disk usage report verify verifies integrity of TSM files verify-seriesfile verifies integrity of the Series file diff --git a/cmd/influx_inspect/main.go b/cmd/influx_inspect/main.go index 74dd3d82821..4e652c0ca7f 100644 --- a/cmd/influx_inspect/main.go +++ b/cmd/influx_inspect/main.go @@ -9,6 +9,7 @@ import ( "github.com/influxdata/influxdb/cmd" "github.com/influxdata/influxdb/cmd/influx_inspect/buildtsi" + "github.com/influxdata/influxdb/cmd/influx_inspect/cardinality" "github.com/influxdata/influxdb/cmd/influx_inspect/deletetsm" "github.com/influxdata/influxdb/cmd/influx_inspect/dumptsi" "github.com/influxdata/influxdb/cmd/influx_inspect/dumptsm" @@ -61,6 +62,11 @@ func (m *Main) Run(args ...string) error { if err := help.NewCommand().Run(args...); err != nil { return fmt.Errorf("help: %s", err) } + case "report-db": + name := cardinality.NewCommand() + if err := name.Run(args...); err != nil { + return fmt.Errorf("report-db: %w", err) + } case "deletetsm": name := deletetsm.NewCommand() if err := name.Run(args...); err != nil { diff --git a/cmd/influx_inspect/report/report.go b/cmd/influx_inspect/report/report.go index e61cf642802..ddabc906406 100644 --- a/cmd/influx_inspect/report/report.go +++ b/cmd/influx_inspect/report/report.go @@ -53,11 +53,11 @@ func (cmd *Command) Run(args ...string) error { return err } - newCounterFn := newHLLCounter + newCounterFn := NewHLLCounter estTitle := " (est)" if cmd.exact { estTitle = "" - newCounterFn = newExactCounter + newCounterFn = NewExactCounter } cmd.dir = fs.Arg(0) @@ -68,11 +68,11 @@ func (cmd *Command) Run(args ...string) error { } totalSeries := newCounterFn() - tagCardinalities := map[string]counter{} - measCardinalities := map[string]counter{} - fieldCardinalities := map[string]counter{} + tagCardinalities := map[string]Counter{} + measCardinalities := 
map[string]Counter{} + fieldCardinalities := map[string]Counter{} - dbCardinalities := map[string]counter{} + dbCardinalities := map[string]Counter{} start := time.Now() @@ -210,7 +210,7 @@ func (cmd *Command) Run(args ...string) error { } // sortKeys is a quick helper to return the sorted set of a map's keys -func sortKeys(vals map[string]counter) (keys []string) { +func sortKeys(vals map[string]Counter) (keys []string) { for k := range vals { keys = append(keys, k) } @@ -238,14 +238,14 @@ Usage: influx_inspect report [flags] fmt.Fprintf(cmd.Stdout, usage) } -// counter abstracts a a method of counting keys. -type counter interface { +// Counter abstracts a a method of counting keys. +type Counter interface { Add(key []byte) Count() uint64 } -// newHLLCounter returns an approximate counter using HyperLogLogs for cardinality estimation. -func newHLLCounter() counter { +// NewHLLCounter returns an approximate Counter using HyperLogLogs for cardinality estimation. +func NewHLLCounter() Counter { return hllpp.New() } @@ -262,7 +262,7 @@ func (c *exactCounter) Count() uint64 { return uint64(len(c.m)) } -func newExactCounter() counter { +func NewExactCounter() Counter { return &exactCounter{ m: make(map[string]struct{}), } diff --git a/go.mod b/go.mod index e35c00cb093..755b13212ae 100644 --- a/go.mod +++ b/go.mod @@ -44,7 +44,7 @@ require ( go.uber.org/multierr v1.6.0 go.uber.org/zap v1.16.0 golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871 - golang.org/x/sync v0.0.0-20210220032951-036812b2e83c + golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e golang.org/x/text v0.3.7 golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba diff --git a/go.sum b/go.sum index f4dbe5be6f5..65c47ece7c9 100644 --- a/go.sum +++ b/go.sum @@ -1142,8 +1142,9 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 h1:w8s32wxx3sY+OjLlv9qltkLU5yvJzxjjgiHWLjdIcw4= +golang.org/x/sync v0.0.0-20220513210516-0976fa681c29/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=