From b9ede87508462a24979c10cd83c29ce212b1bc4b Mon Sep 17 00:00:00 2001 From: Max U Date: Thu, 18 Jul 2019 11:22:10 -0700 Subject: [PATCH 01/18] fix(tsi): error trace for engine failure, not working --- pkg/mmap/mmap_unix.go | 6 +++++- tsdb/series_file.go | 2 ++ tsdb/series_partition.go | 5 +++++ tsdb/series_segment.go | 2 ++ tsdb/tsi1/index_file.go | 1 + 5 files changed, 15 insertions(+), 1 deletion(-) diff --git a/pkg/mmap/mmap_unix.go b/pkg/mmap/mmap_unix.go index 13629c1a4e5..71ca021598b 100644 --- a/pkg/mmap/mmap_unix.go +++ b/pkg/mmap/mmap_unix.go @@ -8,6 +8,8 @@ package mmap import ( + "errors" + "fmt" "os" "syscall" ) @@ -32,8 +34,10 @@ func Map(path string, sz int64) ([]byte, error) { sz = fi.Size() } - data, err := syscall.Mmap(int(f.Fd()), 0, int(sz), syscall.PROT_READ, syscall.MAP_SHARED) + data, err := syscall.Mmap(int(f.Fd()), 0, int(sz), syscall.PROT_READ, syscall.MAP_PRIVATE) + //data, err := syscall.Mmap(int(f.Fd()), 0, int(sz), syscall.PROT_READ, syscall.MAP_SHARED) if err != nil { + return nil, errors.New("map err: " + fmt.Sprintf("%v", int(sz))) return nil, err } diff --git a/tsdb/series_file.go b/tsdb/series_file.go index d7cd31f4f1c..842b3671990 100644 --- a/tsdb/series_file.go +++ b/tsdb/series_file.go @@ -89,6 +89,7 @@ func (f *SeriesFile) DisableMetrics() { func (f *SeriesFile) Open(ctx context.Context) error { f.mu.Lock() defer f.mu.Unlock() + f.Logger.Error("starting open") if f.res.Opened() { return errors.New("series file already opened") @@ -102,6 +103,7 @@ func (f *SeriesFile) Open(ctx context.Context) error { // Create path if it doesn't exist. if err := os.MkdirAll(filepath.Join(f.path), 0777); err != nil { + f.Logger.Error("creating path") return err } diff --git a/tsdb/series_partition.go b/tsdb/series_partition.go index b7fbab2ad41..f03898de1d3 100644 --- a/tsdb/series_partition.go +++ b/tsdb/series_partition.go @@ -8,6 +8,7 @@ import ( "io/ioutil" "os" "path/filepath" + "strconv" "sync" "time" @@ -83,6 +84,7 @@ func (p *SeriesPartition) Open() error { // Open components. if err := func() (err error) { if err := p.openSegments(); err != nil { + p.Logger.Error("open seg") return err } // Init last segment for writes. @@ -114,12 +116,15 @@ func (p *SeriesPartition) openSegments() error { for _, fi := range fis { segmentID, err := ParseSeriesSegmentFilename(fi.Name()) + p.Logger.Error("got segID: " + strconv.Itoa(int(segmentID)) + " from name: " + fi.Name()) if err != nil { + p.Logger.Error("got err" + fmt.Sprintf("%v", err)) continue } segment := NewSeriesSegment(segmentID, filepath.Join(p.path, fi.Name())) if err := segment.Open(); err != nil { + p.Logger.Error("segment err: " + fmt.Sprintf("%v", err)) return err } p.segments = append(p.segments, segment) diff --git a/tsdb/series_segment.go b/tsdb/series_segment.go index 16b32c5aa8c..3bbe5d94a73 100644 --- a/tsdb/series_segment.go +++ b/tsdb/series_segment.go @@ -94,6 +94,7 @@ func (s *SeriesSegment) Open() error { if err := func() (err error) { // Memory map file data. if s.data, err = mmap.Map(s.path, int64(SeriesSegmentSize(s.id))); err != nil { + //return errors.New("map err: " + fmt.Sprintf("%v, size: %v", s.path, int64(SeriesSegmentSize(s.id)))) return err } @@ -312,6 +313,7 @@ func IsValidSeriesSegmentFilename(filename string) bool { // ParseSeriesSegmentFilename returns the id represented by the hexadecimal filename. func ParseSeriesSegmentFilename(filename string) (uint16, error) { i, err := strconv.ParseUint(filename, 16, 32) + //return uint16(i), errors.New("err- i:" + string(i) + ", to: " + string(uint16(i))) return uint16(i), err } diff --git a/tsdb/tsi1/index_file.go b/tsdb/tsi1/index_file.go index 7881ec5d565..c959f61d4b7 100644 --- a/tsdb/tsi1/index_file.go +++ b/tsdb/tsi1/index_file.go @@ -118,6 +118,7 @@ func (f *IndexFile) Open() (err error) { // Try to acquire a reference to the series file. f.sfileref, err = f.sfile.Acquire() + f.sfile.Logger.Error("err before1") if err != nil { return err } From 41cc23cc358a8a6d19a1376baa60364a0b42b9de Mon Sep 17 00:00:00 2001 From: Max U Date: Fri, 19 Jul 2019 13:45:46 -0700 Subject: [PATCH 02/18] fix(tsi1): clean up some error checking --- pkg/mmap/mmap_unix.go | 6 +- tsdb/series_file.go | 2 - tsdb/series_partition.go | 5 - tsdb/series_segment.go | 1 - tsdb/tsi1/index_file.go | 4 +- tsdb/tsi1/tsi1_report.go | 555 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 559 insertions(+), 14 deletions(-) create mode 100644 tsdb/tsi1/tsi1_report.go diff --git a/pkg/mmap/mmap_unix.go b/pkg/mmap/mmap_unix.go index 71ca021598b..13629c1a4e5 100644 --- a/pkg/mmap/mmap_unix.go +++ b/pkg/mmap/mmap_unix.go @@ -8,8 +8,6 @@ package mmap import ( - "errors" - "fmt" "os" "syscall" ) @@ -34,10 +32,8 @@ func Map(path string, sz int64) ([]byte, error) { sz = fi.Size() } - data, err := syscall.Mmap(int(f.Fd()), 0, int(sz), syscall.PROT_READ, syscall.MAP_PRIVATE) - //data, err := syscall.Mmap(int(f.Fd()), 0, int(sz), syscall.PROT_READ, syscall.MAP_SHARED) + data, err := syscall.Mmap(int(f.Fd()), 0, int(sz), syscall.PROT_READ, syscall.MAP_SHARED) if err != nil { - return nil, errors.New("map err: " + fmt.Sprintf("%v", int(sz))) return nil, err } diff --git a/tsdb/series_file.go b/tsdb/series_file.go index 842b3671990..d7cd31f4f1c 100644 --- a/tsdb/series_file.go +++ b/tsdb/series_file.go @@ -89,7 +89,6 @@ func (f *SeriesFile) DisableMetrics() { func (f *SeriesFile) Open(ctx context.Context) error { f.mu.Lock() defer f.mu.Unlock() - f.Logger.Error("starting open") if f.res.Opened() { return errors.New("series file already opened") @@ -103,7 +102,6 @@ func (f *SeriesFile) Open(ctx context.Context) error { // Create path if it doesn't exist. if err := os.MkdirAll(filepath.Join(f.path), 0777); err != nil { - f.Logger.Error("creating path") return err } diff --git a/tsdb/series_partition.go b/tsdb/series_partition.go index f03898de1d3..b7fbab2ad41 100644 --- a/tsdb/series_partition.go +++ b/tsdb/series_partition.go @@ -8,7 +8,6 @@ import ( "io/ioutil" "os" "path/filepath" - "strconv" "sync" "time" @@ -84,7 +83,6 @@ func (p *SeriesPartition) Open() error { // Open components. if err := func() (err error) { if err := p.openSegments(); err != nil { - p.Logger.Error("open seg") return err } // Init last segment for writes. @@ -116,15 +114,12 @@ func (p *SeriesPartition) openSegments() error { for _, fi := range fis { segmentID, err := ParseSeriesSegmentFilename(fi.Name()) - p.Logger.Error("got segID: " + strconv.Itoa(int(segmentID)) + " from name: " + fi.Name()) if err != nil { - p.Logger.Error("got err" + fmt.Sprintf("%v", err)) continue } segment := NewSeriesSegment(segmentID, filepath.Join(p.path, fi.Name())) if err := segment.Open(); err != nil { - p.Logger.Error("segment err: " + fmt.Sprintf("%v", err)) return err } p.segments = append(p.segments, segment) diff --git a/tsdb/series_segment.go b/tsdb/series_segment.go index 3bbe5d94a73..c9571c22534 100644 --- a/tsdb/series_segment.go +++ b/tsdb/series_segment.go @@ -94,7 +94,6 @@ func (s *SeriesSegment) Open() error { if err := func() (err error) { // Memory map file data. if s.data, err = mmap.Map(s.path, int64(SeriesSegmentSize(s.id))); err != nil { - //return errors.New("map err: " + fmt.Sprintf("%v, size: %v", s.path, int64(SeriesSegmentSize(s.id)))) return err } diff --git a/tsdb/tsi1/index_file.go b/tsdb/tsi1/index_file.go index c959f61d4b7..445a991215e 100644 --- a/tsdb/tsi1/index_file.go +++ b/tsdb/tsi1/index_file.go @@ -118,7 +118,6 @@ func (f *IndexFile) Open() (err error) { // Try to acquire a reference to the series file. f.sfileref, err = f.sfile.Acquire() - f.sfile.Logger.Error("err before1") if err != nil { return err } @@ -128,11 +127,14 @@ func (f *IndexFile) Open() (err error) { data, err := mmap.Map(f.Path(), 0) if err != nil { + f.sfile.Logger.Error("err here1") f.sfileref.Release() return err } if err := f.UnmarshalBinary(data); err != nil { + f.sfile.Logger.Error("err here2") + f.sfileref.Release() f.Close() return err diff --git a/tsdb/tsi1/tsi1_report.go b/tsdb/tsi1/tsi1_report.go new file mode 100644 index 00000000000..3b9667f6dbc --- /dev/null +++ b/tsdb/tsi1/tsi1_report.go @@ -0,0 +1,555 @@ +package tsi1 + +import ( + "context" + "errors" + "fmt" + "io" + "io/ioutil" + "math" + "os" + "path/filepath" + "runtime" + "sort" + "strconv" + "sync/atomic" + "text/tabwriter" + + "github.com/influxdata/influxdb/tsdb" + "go.uber.org/zap" +) + +const ( + // Number of series IDs to stored in slice before we convert to a roaring + // bitmap. Roaring bitmaps have a non-trivial initial cost to construct. + useBitmapN = 25 +) + +// TsiReport represents the program execution for "influxd reporttsi". +type ReportTsi struct { + // Standard input/output, overridden for testing. + Stderr io.Writer + Stdout io.Writer + Logger *zap.Logger + + Path string + shardPaths map[uint64]string + shardIdxs map[uint64]*Index + cardinalities map[uint64]map[string]*cardinality + + seriesFilePath string // optional. Defaults to dbPath/_series + sfile *tsdb.SeriesFile + + topN int + byMeasurement bool + byTagKey bool + + // How many goroutines to dedicate to calculating cardinality. + Concurrency int +} + +// NewCommand returns a new instance of Command with default setting applied. +func NewReportTsi() *ReportTsi { + return &ReportTsi{ + Logger: zap.NewNop(), + shardIdxs: map[uint64]*Index{}, + shardPaths: map[uint64]string{}, + cardinalities: map[uint64]map[string]*cardinality{}, + topN: 0, + byMeasurement: true, + byTagKey: false, + Concurrency: runtime.GOMAXPROCS(0), + } +} + +func (report *ReportTsi) RunTsiReport() error { + // Open all the indexes. + // Walk engine to find first each series file, then each index file + //seriesFiles := make([]*tsdb.SeriesFile, 0) + seriesDir := filepath.Join(report.Path, "_series") + //seriesFileInfos, err := ioutil.ReadDir(seriesDir) + // if err != nil { + // return err + // } + report.Logger.Error("searching series: " + seriesDir) + sFile := tsdb.NewSeriesFile(seriesDir) + sFile.WithLogger(report.Logger) + if err := sFile.Open(context.Background()); err != nil { + report.Logger.Error("failed to open series") + return err + } + defer sFile.Close() + + // for _, seriesFolder := range seriesFileInfos { + // folder := filepath.Join(seriesDir, seriesFolder.Name()) + // //folderInfos, err := ioutil.ReadDir(folder) + // // if err != nil { + // // return err + // // // } + + // file, err := os.Open(folder) + // if err != nil { + // return err + // } + // fStat, err := file.Stat() + // if err != nil { + // return err + // } + + // if !fStat.IsDir() { + // report.Logger.Error("not a dir: " + folder) + // continue + // } + // // report.Logger.Error("appending seriesfile: " + folder) + + // sFile := tsdb.NewSeriesFile(folder) + // sFile.WithLogger(report.Logger) + // if err := sFile.Open(context.Background()); err != nil { + // report.Logger.Error("failed to open") + // return err + // } + // defer sFile.Close() + // seriesFiles = append(seriesFiles, sFile) + + // for _, seriesFile := range folderInfos { + // path := filepath.Join(folder, seriesFile.Name()) + // report.Logger.Error("adding seriesFile: " + path) + // sFile := tsdb.NewSeriesFile(path) + // sFile.WithLogger(report.Logger) + // if err := sFile.Open(context.Background()); err != nil { + // report.Logger.Error("failed") + // return err + // } + // defer sFile.Close() + // seriesFiles = append(seriesFiles, sFile) + // } + //} + + indexFiles := make([]*Index, 0) + indexDir := filepath.Join(report.Path, "index") + indexFileInfos, _ := ioutil.ReadDir(indexDir) + report.Logger.Error("searching index: " + indexDir) + for _, indexFile := range indexFileInfos { + path := filepath.Join(indexDir, indexFile.Name()) + + file, err := os.Open(path) + if err != nil { + return err + } + fStat, err := file.Stat() + if err != nil { + return err + } + + if !fStat.IsDir() { + report.Logger.Error("not a dir: " + path) + continue + } + report.Logger.Error("adding: " + path) + if ok, err := IsIndexDir(path); err != nil { + return err + } else if !ok { + return fmt.Errorf("not a TSI index directory: %q", path) + } + + id, err := strconv.Atoi(fStat.Name()) + if err != nil { + return err + } + + //indexFile := NewIndexFile(seriesFiles[len(indexFiles)]) + indexFile := NewIndex(sFile, NewConfig(), WithPath(path), DisableCompactions()) + report.Logger.Error("created new index") + if err := indexFile.Open(context.Background()); err != nil { + return err + } + defer indexFile.Close() + report.Logger.Error("finished opening") + indexFiles = append(indexFiles, indexFile) + report.shardIdxs[uint64(id)] = indexFile + report.shardPaths[uint64(id)] = path + report.cardinalities[uint64(id)] = map[string]*cardinality{} + report.Logger.Error("finished mapping") + } + + // Open all the indexes. + // Walk engine to find first each series file, then each index file + // TODO (me) we do not want manual entry of paths. We should be able to find all indexes + // start path at: ./influxdbv2/engine + // for id, pth := range report.shardPaths { + // pth = path.Join(pth, "index") + // // Verify directory is an index before opening it. + // if ok, err := IsIndexDir(pth); err != nil { + // return err + // } else if !ok { + // return fmt.Errorf("not a TSI index directory: %q", pth) + // } + + // report.shardIdxs[id] = NewIndex(report.sfile, + // NewConfig(), + // WithPath(pth), + // DisableCompactions(), + // ) + // if err := report.shardIdxs[id].Open(context.Background()); err != nil { + // return err + // } + // defer report.shardIdxs[id].Close() + + // // Initialise cardinality set to store cardinalities for this shard. + // report.cardinalities[id] = map[string]*cardinality{} + // } + + // Calculate cardinalities of shards. + fn := report.cardinalityByMeasurement + // if cmd.byTagKey { + // TODO(edd) + // } + + // Blocks until all work done. + report.calculateCardinalities(fn) + report.Logger.Error("calculateCard") + + // Print summary. + if err := report.printSummaryByMeasurement(); err != nil { + return err + } + report.Logger.Error("printed") + + allIDs := make([]uint64, 0, len(report.shardIdxs)) + for id := range report.shardIdxs { + allIDs = append(allIDs, id) + } + sort.Slice(allIDs, func(i int, j int) bool { return allIDs[i] < allIDs[j] }) + + for _, id := range allIDs { + if err := report.printShardByMeasurement(id); err != nil { + return err + } + } + return nil +} + +// calculateCardinalities calculates the cardinalities of the set of shard being +// worked on concurrently. The provided function determines how cardinality is +// calculated and broken down. +func (report *ReportTsi) calculateCardinalities(fn func(id uint64) error) error { + // Get list of shards to work on. + shardIDs := make([]uint64, 0, len(report.shardIdxs)) + for id := range report.shardIdxs { + shardIDs = append(shardIDs, id) + } + + errC := make(chan error, len(shardIDs)) + var maxi uint32 // index of maximumm shard being worked on. + for k := 0; k < report.Concurrency; k++ { + go func() { + for { + i := int(atomic.AddUint32(&maxi, 1) - 1) // Get next partition to work on. + if i >= len(shardIDs) { + return // No more work. + } + errC <- fn(shardIDs[i]) + } + }() + } + + // Check for error + for i := 0; i < cap(errC); i++ { + if err := <-errC; err != nil { + return err + } + } + return nil +} + +type cardinality struct { + name []byte + short []uint32 + set *tsdb.SeriesIDSet +} + +func (c *cardinality) add(x uint64) { + if c.set != nil { + c.set.AddNoLock(tsdb.NewSeriesID(x)) + return + } + + c.short = append(c.short, uint32(x)) // Series IDs never get beyond 2^32 + + // Cheaper to store in bitmap. + if len(c.short) > useBitmapN { + c.set = tsdb.NewSeriesIDSet() + for i := 0; i < len(c.short); i++ { + c.set.AddNoLock(tsdb.NewSeriesID(uint64(c.short[i]))) + } + c.short = nil + return + } +} + +func (c *cardinality) cardinality() int64 { + if c == nil || (c.short == nil && c.set == nil) { + return 0 + } + + if c.short != nil { + return int64(len(c.short)) + } + return int64(c.set.Cardinality()) +} + +type cardinalities []*cardinality + +func (a cardinalities) Len() int { return len(a) } +func (a cardinalities) Less(i, j int) bool { return a[i].cardinality() < a[j].cardinality() } +func (a cardinalities) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +func (report *ReportTsi) cardinalityByMeasurement(shardID uint64) error { + idx := report.shardIdxs[shardID] + itr, err := idx.MeasurementIterator() + if err != nil { + return err + } else if itr == nil { + return nil + } + defer itr.Close() + +OUTER: + for { + name, err := itr.Next() + if err != nil { + return err + } else if name == nil { + break OUTER + } + + // Get series ID set to track cardinality under measurement. + c, ok := report.cardinalities[shardID][string(name)] + if !ok { + c = &cardinality{name: name} + report.cardinalities[shardID][string(name)] = c + } + + sitr, err := idx.MeasurementSeriesIDIterator(name) + if err != nil { + return err + } else if sitr == nil { + continue + } + + var e tsdb.SeriesIDElem + for e, err = sitr.Next(); err == nil && e.SeriesID.ID != 0; e, err = sitr.Next() { + if e.SeriesID.ID > math.MaxUint32 { + panic(fmt.Sprintf("series ID is too large: %d (max %d). Corrupted series file?", e.SeriesID, uint32(math.MaxUint32))) + } + c.add(e.SeriesID.ID) + } + sitr.Close() + + if err != nil { + return err + } + } + return nil +} + +type result struct { + name []byte + count int64 + + // For low cardinality measurements just track series using map + lowCardinality map[uint32]struct{} + + // For higher cardinality measurements track using bitmap. + set *tsdb.SeriesIDSet +} + +func (r *result) addShort(ids []uint32) { + // There is already a bitset of this result. + if r.set != nil { + for _, id := range ids { + r.set.AddNoLock(tsdb.NewSeriesID(uint64(id))) + } + return + } + + // Still tracking low cardinality sets + if r.lowCardinality == nil { + r.lowCardinality = map[uint32]struct{}{} + } + + for _, id := range ids { + r.lowCardinality[id] = struct{}{} + } + + // Cardinality is large enough that we will benefit from using a bitmap + if len(r.lowCardinality) > useBitmapN { + r.set = tsdb.NewSeriesIDSet() + for id := range r.lowCardinality { + r.set.AddNoLock(tsdb.NewSeriesID(uint64(id))) + } + r.lowCardinality = nil + } +} + +func (r *result) merge(other *tsdb.SeriesIDSet) { + if r.set == nil { + r.set = tsdb.NewSeriesIDSet() + for id := range r.lowCardinality { + r.set.AddNoLock(tsdb.NewSeriesID(uint64(id))) + } + r.lowCardinality = nil + } + r.set.Merge(other) +} + +type results []*result + +func (a results) Len() int { return len(a) } +func (a results) Less(i, j int) bool { return a[i].count < a[j].count } +func (a results) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +func (report *ReportTsi) printSummaryByMeasurement() error { + //Get global set of measurement names across shards. + //idxs := &tsdb.IndexSet{SeriesFile: report.sfile} + // for _, idx := range report.shardIdxs { + // idxs.Indexes = append(idxs.Indexes, idx) + // } + + // we are going to get a measurement iterator for each index + count := 0 + var mitr tsdb.MeasurementIterator + for _, index := range report.shardIdxs { + small, err := index.MeasurementIterator() + name, _ := small.Next() + report.Logger.Error("called small.Next1 " + strconv.Itoa(len(name))) + name, _ = small.Next() + report.Logger.Error("called small.Next2 " + strconv.Itoa(len(name))) + name, _ = small.Next() + report.Logger.Error("called small.Next3 " + strconv.Itoa(len(name))) + if err != nil { + return err + } else if small == nil { + return errors.New("got nil measurement iterator for index set") + } + //defer small.Close() + // name, _ := small.Next() + // report.Logger.Error("called small.Next " + string(name)) + mitr = tsdb.MergeMeasurementIterators(mitr, small) + count++ + } + //defer mitr.Close() + report.Logger.Error("alright we got " + strconv.Itoa(count)) + report.Logger.Error("calling mitr next") + name, _ := mitr.Next() + report.Logger.Error("mitr next: " + string(name)) + + //var name []byte + var totalCardinality int64 + measurements := results{} + for name, err := mitr.Next(); err == nil && name != nil; name, err = mitr.Next() { + res := &result{name: name} + report.Logger.Error("name: " + string(name)) + for _, shardCards := range report.cardinalities { + report.Logger.Error("cards: " + strconv.Itoa(len(shardCards))) + other, ok := shardCards[string(name)] + if !ok { + continue // this shard doesn't have anything for this measurement. + } + + if other.short != nil && other.set != nil { + panic("cardinality stored incorrectly") + } + + if other.short != nil { // low cardinality case + res.addShort(other.short) + } else if other.set != nil { // High cardinality case + res.merge(other.set) + } + + // Shard does not have any series for this measurement. + } + + // Determine final cardinality and allow intermediate structures to be + // GCd. + if res.lowCardinality != nil { + res.count = int64(len(res.lowCardinality)) + } else { + res.count = int64(res.set.Cardinality()) + } + totalCardinality += res.count + res.set = nil + res.lowCardinality = nil + measurements = append(measurements, res) + } + + // if err != nil { + // return err + // } + + // sort measurements by cardinality. + sort.Sort(sort.Reverse(measurements)) + + if report.topN > 0 { + // There may not be "topN" measurement cardinality to sub-slice. + n := int(math.Min(float64(report.topN), float64(len(measurements)))) + measurements = measurements[:n] + } + + tw := tabwriter.NewWriter(report.Stdout, 4, 4, 1, '\t', 0) + fmt.Fprintf(tw, "Summary\nDatabase Path: %s\nCardinality (exact): %d\n\n", report.Path, totalCardinality) + fmt.Fprint(tw, "Measurement\tCardinality (exact)\n\n") + for _, res := range measurements { + fmt.Fprintf(tw, "%q\t\t%d\n", res.name, res.count) + } + + if err := tw.Flush(); err != nil { + return err + } + fmt.Fprint(report.Stdout, "\n\n") + return nil +} + +func (report *ReportTsi) printShardByMeasurement(id uint64) error { + allMap, ok := report.cardinalities[id] + if !ok { + return nil + } + + var totalCardinality int64 + all := make(cardinalities, 0, len(allMap)) + for _, card := range allMap { + n := card.cardinality() + if n == 0 { + continue + } + + totalCardinality += n + all = append(all, card) + report.Logger.Error("appended to all") + } + + sort.Sort(sort.Reverse(all)) + + // Trim to top-n + if report.topN > 0 { + // There may not be "topN" measurement cardinality to sub-slice. + n := int(math.Min(float64(report.topN), float64(len(all)))) + all = all[:n] + } + report.Logger.Error("shard: " + strconv.Itoa(int(id)) + ", len " + strconv.Itoa(len(allMap))) + report.Logger.Error("shard: " + strconv.Itoa(int(id)) + ", path " + report.shardPaths[id]) + + tw := tabwriter.NewWriter(report.Stdout, 4, 4, 1, '\t', 0) + fmt.Fprintf(tw, "===============\nShard ID: %d\nPath: %s\nCardinality (exact): %d\n\n", id, report.shardPaths[id], totalCardinality) + fmt.Fprint(tw, "Measurement\tCardinality (exact)\n\n") + for _, card := range all { + fmt.Fprintf(tw, "%q\t\t%d\n", card.name, card.cardinality()) + } + fmt.Fprint(tw, "===============\n\n") + if err := tw.Flush(); err != nil { + return err + } + fmt.Fprint(report.Stdout, "\n\n") + return nil +} From 36e578122e40aafba9446a014f6e9355a0c97482 Mon Sep 17 00:00:00 2001 From: Max U Date: Mon, 22 Jul 2019 16:30:55 -0700 Subject: [PATCH 03/18] feat(tsi): placeholder --- tsdb/tsi1/tsi1_report.go | 149 ++++++++++++++++++++++----------------- 1 file changed, 84 insertions(+), 65 deletions(-) diff --git a/tsdb/tsi1/tsi1_report.go b/tsdb/tsi1/tsi1_report.go index 3b9667f6dbc..b3118ced1e4 100644 --- a/tsdb/tsi1/tsi1_report.go +++ b/tsdb/tsi1/tsi1_report.go @@ -2,12 +2,9 @@ package tsi1 import ( "context" - "errors" "fmt" "io" - "io/ioutil" "math" - "os" "path/filepath" "runtime" "sort" @@ -39,6 +36,7 @@ type ReportTsi struct { seriesFilePath string // optional. Defaults to dbPath/_series sfile *tsdb.SeriesFile + indexFile *Index topN int byMeasurement bool @@ -124,53 +122,69 @@ func (report *ReportTsi) RunTsiReport() error { // seriesFiles = append(seriesFiles, sFile) // } //} + report.indexFile = NewIndex(sFile, NewConfig(), WithPath(filepath.Join(report.Path, "index")), DisableCompactions()) + report.Logger.Error("created new index") + if err := report.indexFile.Open(context.Background()); err != nil { + return err + } + defer report.indexFile.Close() + report.Logger.Error("finished opening") + mitr, err := report.indexFile.MeasurementIterator() + if err != nil { + return err + } + n, err := mitr.Next() + if err != nil { + report.Logger.Error("err on next") + } + report.Logger.Error("mitr: " + string(n)) - indexFiles := make([]*Index, 0) - indexDir := filepath.Join(report.Path, "index") - indexFileInfos, _ := ioutil.ReadDir(indexDir) - report.Logger.Error("searching index: " + indexDir) - for _, indexFile := range indexFileInfos { - path := filepath.Join(indexDir, indexFile.Name()) + // indexFiles := make([]*Index, 0) + // indexDir := filepath.Join(report.Path, "index") + // indexFileInfos, _ := ioutil.ReadDir(indexDir) + // report.Logger.Error("searching index: " + indexDir) + // for _, indexFile := range indexFileInfos { + // path := filepath.Join(indexDir, indexFile.Name()) - file, err := os.Open(path) - if err != nil { - return err - } - fStat, err := file.Stat() - if err != nil { - return err - } + // file, err := os.Open(path) + // if err != nil { + // return err + // } + // fStat, err := file.Stat() + // if err != nil { + // return err + // } - if !fStat.IsDir() { - report.Logger.Error("not a dir: " + path) - continue - } - report.Logger.Error("adding: " + path) - if ok, err := IsIndexDir(path); err != nil { - return err - } else if !ok { - return fmt.Errorf("not a TSI index directory: %q", path) - } + // if !fStat.IsDir() { + // report.Logger.Error("not a dir: " + path) + // continue + // } + // report.Logger.Error("adding: " + path) + // if ok, err := IsIndexDir(path); err != nil { + // return err + // } else if !ok { + // return fmt.Errorf("not a TSI index directory: %q", path) + // } - id, err := strconv.Atoi(fStat.Name()) - if err != nil { - return err - } + // id, err := strconv.Atoi(fStat.Name()) + // if err != nil { + // return err + // } - //indexFile := NewIndexFile(seriesFiles[len(indexFiles)]) - indexFile := NewIndex(sFile, NewConfig(), WithPath(path), DisableCompactions()) - report.Logger.Error("created new index") - if err := indexFile.Open(context.Background()); err != nil { - return err - } - defer indexFile.Close() - report.Logger.Error("finished opening") - indexFiles = append(indexFiles, indexFile) - report.shardIdxs[uint64(id)] = indexFile - report.shardPaths[uint64(id)] = path - report.cardinalities[uint64(id)] = map[string]*cardinality{} - report.Logger.Error("finished mapping") - } + // //indexFile := NewIndexFile(seriesFiles[len(indexFiles)]) + // indexFile := NewIndex(sFile, NewConfig(), WithPath(path), DisableCompactions()) + // report.Logger.Error("created new index") + // if err := indexFile.Open(context.Background()); err != nil { + // return err + // } + // defer indexFile.Close() + // report.Logger.Error("finished opening") + // indexFiles = append(indexFiles, indexFile) + // report.shardIdxs[uint64(id)] = indexFile + // report.shardPaths[uint64(id)] = path + // report.cardinalities[uint64(id)] = map[string]*cardinality{} + // report.Logger.Error("finished mapping") + // } // Open all the indexes. // Walk engine to find first each series file, then each index file @@ -417,29 +431,34 @@ func (report *ReportTsi) printSummaryByMeasurement() error { // } // we are going to get a measurement iterator for each index - count := 0 var mitr tsdb.MeasurementIterator - for _, index := range report.shardIdxs { - small, err := index.MeasurementIterator() - name, _ := small.Next() - report.Logger.Error("called small.Next1 " + strconv.Itoa(len(name))) - name, _ = small.Next() - report.Logger.Error("called small.Next2 " + strconv.Itoa(len(name))) - name, _ = small.Next() - report.Logger.Error("called small.Next3 " + strconv.Itoa(len(name))) - if err != nil { - return err - } else if small == nil { - return errors.New("got nil measurement iterator for index set") - } - //defer small.Close() - // name, _ := small.Next() - // report.Logger.Error("called small.Next " + string(name)) - mitr = tsdb.MergeMeasurementIterators(mitr, small) - count++ + mitr, err := report.indexFile.MeasurementIterator() + if err != nil { + report.Logger.Error("got err") + return err } + defer mitr.Close() + + // for _, index := range report.shardIdxs { + // small, err := index.MeasurementIterator() + // name, _ := small.Next() + // report.Logger.Error("called small.Next1 " + strconv.Itoa(len(name))) + // name, _ = small.Next() + // report.Logger.Error("called small.Next2 " + strconv.Itoa(len(name))) + // name, _ = small.Next() + // report.Logger.Error("called small.Next3 " + strconv.Itoa(len(name))) + // if err != nil { + // return err + // } else if small == nil { + // return errors.New("got nil measurement iterator for index set") + // } + // //defer small.Close() + // // name, _ := small.Next() + // // report.Logger.Error("called small.Next " + string(name)) + // mitr = tsdb.MergeMeasurementIterators(mitr, small) + // count++ + // } //defer mitr.Close() - report.Logger.Error("alright we got " + strconv.Itoa(count)) report.Logger.Error("calling mitr next") name, _ := mitr.Next() report.Logger.Error("mitr next: " + string(name)) From eb6d0f447885011c5790faa0be04465be4d96cba Mon Sep 17 00:00:00 2001 From: Max U Date: Tue, 23 Jul 2019 11:58:07 -0700 Subject: [PATCH 04/18] feat(tsi): report cardinality for all indexes, still needs to be cleaned Fix iteration logic and clean up --- cmd/influxd/inspect/report_tsi1.go | 105 ++++++ tsdb/tsi1/index_file.go | 3 - tsdb/tsi1/tsi1_report.go | 556 +++++++++++++---------------- 3 files changed, 347 insertions(+), 317 deletions(-) create mode 100644 cmd/influxd/inspect/report_tsi1.go diff --git a/cmd/influxd/inspect/report_tsi1.go b/cmd/influxd/inspect/report_tsi1.go new file mode 100644 index 00000000000..7f70bc0a817 --- /dev/null +++ b/cmd/influxd/inspect/report_tsi1.go @@ -0,0 +1,105 @@ +package inspect + +import ( + "errors" + "io" + "os" + "runtime" + + "github.com/influxdata/influxdb" + + "github.com/influxdata/influxdb/logger" + "github.com/influxdata/influxdb/tsdb" + "github.com/influxdata/influxdb/tsdb/tsi1" + "github.com/spf13/cobra" + "go.uber.org/zap/zapcore" +) + +// Command represents the program execution for "influxd reporttsi". +var tsiFlags = struct { + // Standard input/output, overridden for testing. + Stderr io.Writer + Stdout io.Writer + + Path string + org string + bucket string + + seriesFilePath string // optional. Defaults to dbPath/_series + sfile *tsdb.SeriesFile + + topN int + byMeasurement bool + byTagKey bool + + // How many goroutines to dedicate to calculating cardinality. + concurrency int +}{} + +// NewReportTsiCommand returns a new instance of Command with default setting applied. +func NewReportTsiCommand() *cobra.Command { + reportTsiCommand := &cobra.Command{ + Use: "report-tsi", + Short: "Reports the cardinality of tsi files short", + Long: `Reports the cardinality of tsi files long.`, + RunE: RunReportTsi, + } + reportTsiCommand.Flags().StringVar(&tsiFlags.Path, "path", os.Getenv("HOME")+"/.influxdbv2/engine", "Path to data engine. Defaults $HOME/.influxdbv2/engine") + reportTsiCommand.Flags().StringVar(&tsiFlags.seriesFilePath, "series-file", "", "Optional path to series file. Defaults /path/to/db-path/_series") + reportTsiCommand.Flags().BoolVar(&tsiFlags.byMeasurement, "measurements", true, "Segment cardinality by measurements") + // TODO(edd): Not yet implemented. + // fs.BoolVar(&cmd.byTagKey, "tag-key", false, "Segment cardinality by tag keys (overrides `measurements`") + reportTsiCommand.Flags().IntVar(&tsiFlags.topN, "top", 0, "Limit results to top n") + reportTsiCommand.Flags().IntVar(&tsiFlags.concurrency, "c", runtime.GOMAXPROCS(0), "Set worker concurrency. Defaults to GOMAXPROCS setting.") + reportTsiCommand.Flags().StringVar(&tsiFlags.bucket, "bucket", "", "If bucket is specified, org must be specified") + reportTsiCommand.Flags().StringVar(&tsiFlags.org, "org", "", "org to be searched") + + reportTsiCommand.SetOutput(tsiFlags.Stdout) + + return reportTsiCommand +} + +// RunReportTsi executes the run command for ReportTsi. +func RunReportTsi(cmd *cobra.Command, args []string) error { + // set up log + config := logger.NewConfig() + config.Level = zapcore.InfoLevel + log, err := config.New(os.Stderr) + // do some filepath walking, we are looking for index files + //dir := os.Getenv("HOME") + "/.influxdbv2/engine/index" + + // if path is unset, set to os.Getenv("HOME") + "/.influxdbv2/engine" + if tsiFlags.Path == "" { + tsiFlags.Path = os.Getenv("HOME") + "/.influxdbv2/engine" + } + + report := tsi1.NewReportCommand() + report.Concurrency = tsiFlags.concurrency + report.DataPath = tsiFlags.Path + report.Logger = log + + if tsiFlags.org != "" { + if orgID, err := influxdb.IDFromString(tsiFlags.org); err != nil { + return err + } else { + report.OrgID = orgID + } + } + + if tsiFlags.bucket != "" { + if bucketID, err := influxdb.IDFromString(tsiFlags.bucket); err != nil { + return err + } else if report.OrgID == nil { + return errors.New("org must be provided if filtering by bucket ") + } else { + report.BucketID = bucketID + } + } + + report.Logger.Error("running report") + err = report.Run() + if err != nil { + return err + } + return nil +} diff --git a/tsdb/tsi1/index_file.go b/tsdb/tsi1/index_file.go index 445a991215e..7881ec5d565 100644 --- a/tsdb/tsi1/index_file.go +++ b/tsdb/tsi1/index_file.go @@ -127,14 +127,11 @@ func (f *IndexFile) Open() (err error) { data, err := mmap.Map(f.Path(), 0) if err != nil { - f.sfile.Logger.Error("err here1") f.sfileref.Release() return err } if err := f.UnmarshalBinary(data); err != nil { - f.sfile.Logger.Error("err here2") - f.sfileref.Release() f.Close() return err diff --git a/tsdb/tsi1/tsi1_report.go b/tsdb/tsi1/tsi1_report.go index b3118ced1e4..3a96a4992c7 100644 --- a/tsdb/tsi1/tsi1_report.go +++ b/tsdb/tsi1/tsi1_report.go @@ -5,13 +5,12 @@ import ( "fmt" "io" "math" + "os" "path/filepath" "runtime" - "sort" - "strconv" - "sync/atomic" "text/tabwriter" + "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/tsdb" "go.uber.org/zap" ) @@ -22,21 +21,23 @@ const ( useBitmapN = 25 ) -// TsiReport represents the program execution for "influxd reporttsi". -type ReportTsi struct { +// ReportCommand represents the program execution for "influxd reporttsi". +type ReportCommand struct { // Standard input/output, overridden for testing. Stderr io.Writer Stdout io.Writer Logger *zap.Logger - Path string - shardPaths map[uint64]string - shardIdxs map[uint64]*Index - cardinalities map[uint64]map[string]*cardinality + // Filters + DataPath string + OrgID, BucketID *influxdb.ID - seriesFilePath string // optional. Defaults to dbPath/_series - sfile *tsdb.SeriesFile - indexFile *Index + // Maps org and bucket IDs with measurement name + orgBucketCardinality map[influxdb.ID]map[influxdb.ID]*cardinality + + SeriesDirPath string // optional. Defaults to dbPath/_series + sfile *tsdb.SeriesFile + indexFile *Index topN int byMeasurement bool @@ -47,231 +48,97 @@ type ReportTsi struct { } // NewCommand returns a new instance of Command with default setting applied. -func NewReportTsi() *ReportTsi { - return &ReportTsi{ - Logger: zap.NewNop(), - shardIdxs: map[uint64]*Index{}, - shardPaths: map[uint64]string{}, - cardinalities: map[uint64]map[string]*cardinality{}, - topN: 0, - byMeasurement: true, - byTagKey: false, - Concurrency: runtime.GOMAXPROCS(0), +func NewReportCommand() *ReportCommand { + return &ReportCommand{ + Logger: zap.NewNop(), + orgBucketCardinality: make(map[influxdb.ID]map[influxdb.ID]*cardinality), + topN: 0, + byMeasurement: true, + byTagKey: false, + Concurrency: runtime.GOMAXPROCS(0), } } -func (report *ReportTsi) RunTsiReport() error { - // Open all the indexes. - // Walk engine to find first each series file, then each index file - //seriesFiles := make([]*tsdb.SeriesFile, 0) - seriesDir := filepath.Join(report.Path, "_series") - //seriesFileInfos, err := ioutil.ReadDir(seriesDir) - // if err != nil { - // return err - // } - report.Logger.Error("searching series: " + seriesDir) - sFile := tsdb.NewSeriesFile(seriesDir) +func (report *ReportCommand) Run() error { + report.Stdout = os.Stdout + + if report.SeriesDirPath == "" { + report.SeriesDirPath = filepath.Join(report.DataPath, "_series") + } + + sFile := tsdb.NewSeriesFile(report.SeriesDirPath) sFile.WithLogger(report.Logger) if err := sFile.Open(context.Background()); err != nil { report.Logger.Error("failed to open series") return err } defer sFile.Close() + report.sfile = sFile - // for _, seriesFolder := range seriesFileInfos { - // folder := filepath.Join(seriesDir, seriesFolder.Name()) - // //folderInfos, err := ioutil.ReadDir(folder) - // // if err != nil { - // // return err - // // // } - - // file, err := os.Open(folder) - // if err != nil { - // return err - // } - // fStat, err := file.Stat() - // if err != nil { - // return err - // } - - // if !fStat.IsDir() { - // report.Logger.Error("not a dir: " + folder) - // continue - // } - // // report.Logger.Error("appending seriesfile: " + folder) - - // sFile := tsdb.NewSeriesFile(folder) - // sFile.WithLogger(report.Logger) - // if err := sFile.Open(context.Background()); err != nil { - // report.Logger.Error("failed to open") - // return err - // } - // defer sFile.Close() - // seriesFiles = append(seriesFiles, sFile) - - // for _, seriesFile := range folderInfos { - // path := filepath.Join(folder, seriesFile.Name()) - // report.Logger.Error("adding seriesFile: " + path) - // sFile := tsdb.NewSeriesFile(path) - // sFile.WithLogger(report.Logger) - // if err := sFile.Open(context.Background()); err != nil { - // report.Logger.Error("failed") - // return err - // } - // defer sFile.Close() - // seriesFiles = append(seriesFiles, sFile) - // } - //} - report.indexFile = NewIndex(sFile, NewConfig(), WithPath(filepath.Join(report.Path, "index")), DisableCompactions()) - report.Logger.Error("created new index") + path := filepath.Join(report.DataPath, "index") + report.indexFile = NewIndex(sFile, NewConfig(), WithPath(path)) if err := report.indexFile.Open(context.Background()); err != nil { return err } defer report.indexFile.Close() - report.Logger.Error("finished opening") - mitr, err := report.indexFile.MeasurementIterator() - if err != nil { - return err - } - n, err := mitr.Next() - if err != nil { - report.Logger.Error("err on next") - } - report.Logger.Error("mitr: " + string(n)) - // indexFiles := make([]*Index, 0) - // indexDir := filepath.Join(report.Path, "index") - // indexFileInfos, _ := ioutil.ReadDir(indexDir) - // report.Logger.Error("searching index: " + indexDir) - // for _, indexFile := range indexFileInfos { - // path := filepath.Join(indexDir, indexFile.Name()) - - // file, err := os.Open(path) - // if err != nil { - // return err - // } - // fStat, err := file.Stat() - // if err != nil { - // return err - // } - - // if !fStat.IsDir() { - // report.Logger.Error("not a dir: " + path) - // continue - // } - // report.Logger.Error("adding: " + path) - // if ok, err := IsIndexDir(path); err != nil { - // return err - // } else if !ok { - // return fmt.Errorf("not a TSI index directory: %q", path) - // } - - // id, err := strconv.Atoi(fStat.Name()) - // if err != nil { - // return err - // } - - // //indexFile := NewIndexFile(seriesFiles[len(indexFiles)]) - // indexFile := NewIndex(sFile, NewConfig(), WithPath(path), DisableCompactions()) - // report.Logger.Error("created new index") - // if err := indexFile.Open(context.Background()); err != nil { - // return err - // } - // defer indexFile.Close() - // report.Logger.Error("finished opening") - // indexFiles = append(indexFiles, indexFile) - // report.shardIdxs[uint64(id)] = indexFile - // report.shardPaths[uint64(id)] = path - // report.cardinalities[uint64(id)] = map[string]*cardinality{} - // report.Logger.Error("finished mapping") - // } - - // Open all the indexes. - // Walk engine to find first each series file, then each index file - // TODO (me) we do not want manual entry of paths. We should be able to find all indexes - // start path at: ./influxdbv2/engine - // for id, pth := range report.shardPaths { - // pth = path.Join(pth, "index") - // // Verify directory is an index before opening it. - // if ok, err := IsIndexDir(pth); err != nil { - // return err - // } else if !ok { - // return fmt.Errorf("not a TSI index directory: %q", pth) - // } - - // report.shardIdxs[id] = NewIndex(report.sfile, - // NewConfig(), - // WithPath(pth), - // DisableCompactions(), - // ) - // if err := report.shardIdxs[id].Open(context.Background()); err != nil { - // return err - // } - // defer report.shardIdxs[id].Close() - - // // Initialise cardinality set to store cardinalities for this shard. - // report.cardinalities[id] = map[string]*cardinality{} - // } - - // Calculate cardinalities of shards. + // Calculate cardinalities for every org and bucket fn := report.cardinalityByMeasurement - // if cmd.byTagKey { - // TODO(edd) - // } // Blocks until all work done. report.calculateCardinalities(fn) - report.Logger.Error("calculateCard") // Print summary. if err := report.printSummaryByMeasurement(); err != nil { return err } - report.Logger.Error("printed") - - allIDs := make([]uint64, 0, len(report.shardIdxs)) - for id := range report.shardIdxs { - allIDs = append(allIDs, id) - } - sort.Slice(allIDs, func(i int, j int) bool { return allIDs[i] < allIDs[j] }) - for _, id := range allIDs { - if err := report.printShardByMeasurement(id); err != nil { - return err - } - } + //allIDs := make([]uint64, 0, len(report.shardIdxs)) + //for id := range report.shardIdxs { + // allIDs = append(allIDs, id) + //} + //sort.Slice(allIDs, func(i int, j int) bool { return allIDs[i] < allIDs[j] }) + // + //for _, id := range allIDs { + // if err := report.printShardByMeasurement(id); err != nil { + // return err + // } + //} return nil } // calculateCardinalities calculates the cardinalities of the set of shard being // worked on concurrently. The provided function determines how cardinality is // calculated and broken down. -func (report *ReportTsi) calculateCardinalities(fn func(id uint64) error) error { +func (report *ReportCommand) calculateCardinalities(fn func() error) error { // Get list of shards to work on. - shardIDs := make([]uint64, 0, len(report.shardIdxs)) - for id := range report.shardIdxs { - shardIDs = append(shardIDs, id) - } - - errC := make(chan error, len(shardIDs)) - var maxi uint32 // index of maximumm shard being worked on. - for k := 0; k < report.Concurrency; k++ { - go func() { - for { - i := int(atomic.AddUint32(&maxi, 1) - 1) // Get next partition to work on. - if i >= len(shardIDs) { - return // No more work. - } - errC <- fn(shardIDs[i]) - } - }() - } - - // Check for error - for i := 0; i < cap(errC); i++ { - if err := <-errC; err != nil { - return err - } + //shardIDs := make([]uint64, 0, len(report.shardIdxs)) + //for id := range report.shardIdxs { + // shardIDs = append(shardIDs, id) + //} + // + //errC := make(chan error, len(shardIDs)) + //var maxi uint32 // index of maximumm shard being worked on. + //for k := 0; k < report.Concurrency; k++ { + // go func() { + // for { + // i := int(atomic.AddUint32(&maxi, 1) - 1) // Get next partition to work on. + // if i >= len(shardIDs) { + // return // No more work. + // } + // errC <- fn(shardIDs[i]) + // } + // }() + //} + // + //// Check for error + //for i := 0; i < cap(errC); i++ { + // if err := <-errC; err != nil { + // return err + // } + //} + if err := fn(); err != nil { + return err } return nil } @@ -318,8 +185,8 @@ func (a cardinalities) Len() int { return len(a) } func (a cardinalities) Less(i, j int) bool { return a[i].cardinality() < a[j].cardinality() } func (a cardinalities) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (report *ReportTsi) cardinalityByMeasurement(shardID uint64) error { - idx := report.shardIdxs[shardID] +func (report *ReportCommand) cardinalityByMeasurement() error { + idx := report.indexFile itr, err := idx.MeasurementIterator() if err != nil { return err @@ -337,11 +204,14 @@ OUTER: break OUTER } - // Get series ID set to track cardinality under measurement. - c, ok := report.cardinalities[shardID][string(name)] - if !ok { - c = &cardinality{name: name} - report.cardinalities[shardID][string(name)] = c + var a [16]byte // TODO(edd) if this shows up we can use a different API to DecodeName. + copy(a[:], name[:16]) + org, bucket := tsdb.DecodeName(a) + + if report.OrgID != nil && *report.OrgID != org { + continue + } else if report.BucketID != nil && *report.BucketID != bucket { + continue } sitr, err := idx.MeasurementSeriesIDIterator(name) @@ -351,13 +221,30 @@ OUTER: continue } + // initialize map of bucket to cardinality + if _, ok := report.orgBucketCardinality[org]; !ok { + report.orgBucketCardinality[org] = make(map[influxdb.ID]*cardinality) + } + + var card *cardinality + if c, ok := report.orgBucketCardinality[org][bucket]; !ok { + card = &cardinality{name: []byte(bucket.String())} + report.orgBucketCardinality[org][bucket] = card + } else { + card = c + } + var e tsdb.SeriesIDElem for e, err = sitr.Next(); err == nil && e.SeriesID.ID != 0; e, err = sitr.Next() { - if e.SeriesID.ID > math.MaxUint32 { + id := e.SeriesID.ID + if id > math.MaxUint32 { panic(fmt.Sprintf("series ID is too large: %d (max %d). Corrupted series file?", e.SeriesID, uint32(math.MaxUint32))) } - c.add(e.SeriesID.ID) + // note: first tag in array (from sfile.Series(id) is measurement + + card.add(id) } + sitr.Close() if err != nil { @@ -423,7 +310,7 @@ func (a results) Len() int { return len(a) } func (a results) Less(i, j int) bool { return a[i].count < a[j].count } func (a results) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (report *ReportTsi) printSummaryByMeasurement() error { +func (report *ReportCommand) printSummaryByMeasurement() error { //Get global set of measurement names across shards. //idxs := &tsdb.IndexSet{SeriesFile: report.sfile} // for _, idx := range report.shardIdxs { @@ -431,13 +318,13 @@ func (report *ReportTsi) printSummaryByMeasurement() error { // } // we are going to get a measurement iterator for each index - var mitr tsdb.MeasurementIterator - mitr, err := report.indexFile.MeasurementIterator() - if err != nil { - report.Logger.Error("got err") - return err - } - defer mitr.Close() + //var mitr tsdb.MeasurementIterator + //mitr, err := report.indexFile.MeasurementIterator() + //if err != nil { + // report.Logger.Error("got err") + // return err + //} + //defer mitr.Close() // for _, index := range report.shardIdxs { // small, err := index.MeasurementIterator() @@ -459,116 +346,157 @@ func (report *ReportTsi) printSummaryByMeasurement() error { // count++ // } //defer mitr.Close() - report.Logger.Error("calling mitr next") - name, _ := mitr.Next() - report.Logger.Error("mitr next: " + string(name)) + // report.Logger.Error("calling mitr next") + // name, _ := mitr.Next() + // report.Logger.Error("mitr next: " + string(name)) //var name []byte - var totalCardinality int64 - measurements := results{} - for name, err := mitr.Next(); err == nil && name != nil; name, err = mitr.Next() { - res := &result{name: name} - report.Logger.Error("name: " + string(name)) - for _, shardCards := range report.cardinalities { - report.Logger.Error("cards: " + strconv.Itoa(len(shardCards))) - other, ok := shardCards[string(name)] - if !ok { - continue // this shard doesn't have anything for this measurement. - } - - if other.short != nil && other.set != nil { - panic("cardinality stored incorrectly") - } - - if other.short != nil { // low cardinality case - res.addShort(other.short) - } else if other.set != nil { // High cardinality case - res.merge(other.set) - } - - // Shard does not have any series for this measurement. - } - - // Determine final cardinality and allow intermediate structures to be - // GCd. - if res.lowCardinality != nil { - res.count = int64(len(res.lowCardinality)) - } else { - res.count = int64(res.set.Cardinality()) - } - totalCardinality += res.count - res.set = nil - res.lowCardinality = nil - measurements = append(measurements, res) - } + //var totalCardinality int64 + //measurements := results{} + //for name, err := mitr.Next(); err == nil && name != nil; name, err = mitr.Next() { + // res := &result{name: name} + // for _, shardCards := range report.cardinalities { + // other, ok := shardCards[string(name)] + // if !ok { + // continue // this shard doesn't have anything for this measurement. + // } + // + // if other.short != nil && other.set != nil { + // panic("cardinality stored incorrectly") + // } + // + // if other.short != nil { // low cardinality case + // res.addShort(other.short) + // } else if other.set != nil { // High cardinality case + // res.merge(other.set) + // } + // + // // Shard does not have any series for this measurement. + // } + // + // // Determine final cardinality and allow intermediate structures to be + // // GCd. + // if res.lowCardinality != nil { + // res.count = int64(len(res.lowCardinality)) + // } else { + // res.count = int64(res.set.Cardinality()) + // } + // totalCardinality += res.count + // res.set = nil + // res.lowCardinality = nil + // measurements = append(measurements, res) + //} // if err != nil { // return err // } // sort measurements by cardinality. - sort.Sort(sort.Reverse(measurements)) - - if report.topN > 0 { - // There may not be "topN" measurement cardinality to sub-slice. - n := int(math.Min(float64(report.topN), float64(len(measurements)))) - measurements = measurements[:n] - } + //sort.Sort(sort.Reverse(measurements)) + // + //if report.topN > 0 { + // // There may not be "topN" measurement cardinality to sub-slice. + // n := int(math.Min(float64(report.topN), float64(len(measurements)))) + // measurements = measurements[:n] + //} - tw := tabwriter.NewWriter(report.Stdout, 4, 4, 1, '\t', 0) - fmt.Fprintf(tw, "Summary\nDatabase Path: %s\nCardinality (exact): %d\n\n", report.Path, totalCardinality) - fmt.Fprint(tw, "Measurement\tCardinality (exact)\n\n") - for _, res := range measurements { - fmt.Fprintf(tw, "%q\t\t%d\n", res.name, res.count) - } + // tw := tabwriter.NewWriter(report.Stdout, 4, 4, 1, '\t', 0) + // fmt.Fprintf(tw, "Summary\nDatabase Path: %s\nCardinality (exact): %d\n\n", report.Path, totalCardinality) + // fmt.Fprint(tw, "Measurement\tCardinality (exact)\n\n") + // for _, res := range measurements { + // fmt.Fprintf(tw, "%q\t\t%d\n", res.name, res.count) + // } - if err := tw.Flush(); err != nil { - return err - } - fmt.Fprint(report.Stdout, "\n\n") + // if err := tw.Flush(); err != nil { + // return err + // } + // fmt.Fprint(report.Stdout, "\n\n") + report.printOrgBucketCardinality() return nil } -func (report *ReportTsi) printShardByMeasurement(id uint64) error { - allMap, ok := report.cardinalities[id] - if !ok { - return nil - } +//func (report *ReportCommand) printShardByMeasurement(id uint64) error { +// allMap, ok := report.cardinalities[id] +// if !ok { +// return nil +// } +// +// var totalCardinality int64 +// all := make(cardinalities, 0, len(allMap)) +// for _, card := range allMap { +// n := card.cardinality() +// if n == 0 { +// continue +// } +// +// totalCardinality += n +// all = append(all, card) +// } +// +// sort.Sort(sort.Reverse(all)) +// +// // Trim to top-n +// if report.topN > 0 { +// // There may not be "topN" measurement cardinality to sub-slice. +// n := int(math.Min(float64(report.topN), float64(len(all)))) +// all = all[:n] +// } +// +// tw := tabwriter.NewWriter(report.Stdout, 4, 4, 1, '\t', 0) +// fmt.Fprintf(tw, "===============\nShard ID: %d\nPath: %s\nCardinality (exact): %d\n\n", id, report.shardPaths[id], totalCardinality) +// fmt.Fprint(tw, "Measurement\tCardinality (exact)\n\n") +// for _, card := range all { +// fmt.Fprintf(tw, "%q\t\t%d\n", card.name, card.cardinality()) +// } +// fmt.Fprint(tw, "===============\n\n") +// if err := tw.Flush(); err != nil { +// return err +// } +// fmt.Fprint(report.Stdout, "\n\n") +// return nil +//} + +func (report *ReportCommand) printOrgBucketCardinality() { + tw := tabwriter.NewWriter(report.Stdout, 4, 4, 1, '\t', 0) + //for k, v := range report.orgs { + // report.Logger.Error("calculating org") + // for _, j := range v { + // cCard := j.cardinality() + // totalCount = totalCount + j.cardinality() + // if influxdb.ID(k).String() == report.Org { + // orgCount += cCard + // } + // } + //} + // + //for k, v := range report.buckets { + // for _, j := range v { + // cCard := j.cardinality() + // if influxdb.ID(k).String() == report.Bucket { + // bucketCount += cCard + // } + // } + //} - var totalCardinality int64 - all := make(cardinalities, 0, len(allMap)) - for _, card := range allMap { - n := card.cardinality() - if n == 0 { - continue + totalCard := int64(0) + orgTotals := make(map[influxdb.ID]int64) + for org, orgToBucket := range report.orgBucketCardinality { + orgTotal := int64(0) + for _, bucketCard := range orgToBucket { + totalCard += bucketCard.cardinality() + orgTotal += bucketCard.cardinality() } - - totalCardinality += n - all = append(all, card) - report.Logger.Error("appended to all") + orgTotals[org] = orgTotal } - sort.Sort(sort.Reverse(all)) + fmt.Fprintf(tw, "Summary (total): %v \n\n", totalCard) - // Trim to top-n - if report.topN > 0 { - // There may not be "topN" measurement cardinality to sub-slice. - n := int(math.Min(float64(report.topN), float64(len(all)))) - all = all[:n] - } - report.Logger.Error("shard: " + strconv.Itoa(int(id)) + ", len " + strconv.Itoa(len(allMap))) - report.Logger.Error("shard: " + strconv.Itoa(int(id)) + ", path " + report.shardPaths[id]) + fmt.Println(report.orgBucketCardinality) - tw := tabwriter.NewWriter(report.Stdout, 4, 4, 1, '\t', 0) - fmt.Fprintf(tw, "===============\nShard ID: %d\nPath: %s\nCardinality (exact): %d\n\n", id, report.shardPaths[id], totalCardinality) - fmt.Fprint(tw, "Measurement\tCardinality (exact)\n\n") - for _, card := range all { - fmt.Fprintf(tw, "%q\t\t%d\n", card.name, card.cardinality()) - } - fmt.Fprint(tw, "===============\n\n") - if err := tw.Flush(); err != nil { - return err + for orgName, orgToBucket := range report.orgBucketCardinality { + fmt.Fprintf(tw, "Org %s total: %d \n\n", orgName.String(), orgTotals[orgName]) + for bucketName, bucketCard := range orgToBucket { + fmt.Fprintf(tw, " Bucket %s %d\n", bucketName.String(), bucketCard.cardinality()) + } } - fmt.Fprint(report.Stdout, "\n\n") - return nil } From 8f99d20debc163d7cc4bf0208a008fbb898aae9c Mon Sep 17 00:00:00 2001 From: Max U Date: Wed, 24 Jul 2019 11:07:50 -0700 Subject: [PATCH 05/18] feat(tsi1): port report-tsi tool to influxdb 2.x --- cmd/influxd/inspect/report_tsi1.go | 5 +- tsdb/tsi1/tsi1_report.go | 220 +++-------------------------- 2 files changed, 20 insertions(+), 205 deletions(-) diff --git a/cmd/influxd/inspect/report_tsi1.go b/cmd/influxd/inspect/report_tsi1.go index 7f70bc0a817..54fdb4eec09 100644 --- a/cmd/influxd/inspect/report_tsi1.go +++ b/cmd/influxd/inspect/report_tsi1.go @@ -52,7 +52,7 @@ func NewReportTsiCommand() *cobra.Command { reportTsiCommand.Flags().IntVar(&tsiFlags.topN, "top", 0, "Limit results to top n") reportTsiCommand.Flags().IntVar(&tsiFlags.concurrency, "c", runtime.GOMAXPROCS(0), "Set worker concurrency. Defaults to GOMAXPROCS setting.") reportTsiCommand.Flags().StringVar(&tsiFlags.bucket, "bucket", "", "If bucket is specified, org must be specified") - reportTsiCommand.Flags().StringVar(&tsiFlags.org, "org", "", "org to be searched") + reportTsiCommand.Flags().StringVar(&tsiFlags.org, "org", "", "Org to be reported") reportTsiCommand.SetOutput(tsiFlags.Stdout) @@ -65,8 +65,6 @@ func RunReportTsi(cmd *cobra.Command, args []string) error { config := logger.NewConfig() config.Level = zapcore.InfoLevel log, err := config.New(os.Stderr) - // do some filepath walking, we are looking for index files - //dir := os.Getenv("HOME") + "/.influxdbv2/engine/index" // if path is unset, set to os.Getenv("HOME") + "/.influxdbv2/engine" if tsiFlags.Path == "" { @@ -96,7 +94,6 @@ func RunReportTsi(cmd *cobra.Command, args []string) error { } } - report.Logger.Error("running report") err = report.Run() if err != nil { return err diff --git a/tsdb/tsi1/tsi1_report.go b/tsdb/tsi1/tsi1_report.go index 3a96a4992c7..6c4458194a8 100644 --- a/tsdb/tsi1/tsi1_report.go +++ b/tsdb/tsi1/tsi1_report.go @@ -47,7 +47,7 @@ type ReportCommand struct { Concurrency int } -// NewCommand returns a new instance of Command with default setting applied. +// NewReportCommand returns a new instance of ReportCommand with default setting applied. func NewReportCommand() *ReportCommand { return &ReportCommand{ Logger: zap.NewNop(), @@ -59,6 +59,8 @@ func NewReportCommand() *ReportCommand { } } +// Run initializes the orgBucketCardinality map which can be used to find the cardinality +// any org or bucket. Run() must be called before GetOrgCardinality() or GetOrgBucketCardinality() func (report *ReportCommand) Run() error { report.Stdout = os.Stdout @@ -89,21 +91,7 @@ func (report *ReportCommand) Run() error { report.calculateCardinalities(fn) // Print summary. - if err := report.printSummaryByMeasurement(); err != nil { - return err - } - - //allIDs := make([]uint64, 0, len(report.shardIdxs)) - //for id := range report.shardIdxs { - // allIDs = append(allIDs, id) - //} - //sort.Slice(allIDs, func(i int, j int) bool { return allIDs[i] < allIDs[j] }) - // - //for _, id := range allIDs { - // if err := report.printShardByMeasurement(id); err != nil { - // return err - // } - //} + report.printOrgBucketCardinality() return nil } @@ -111,32 +99,6 @@ func (report *ReportCommand) Run() error { // worked on concurrently. The provided function determines how cardinality is // calculated and broken down. func (report *ReportCommand) calculateCardinalities(fn func() error) error { - // Get list of shards to work on. - //shardIDs := make([]uint64, 0, len(report.shardIdxs)) - //for id := range report.shardIdxs { - // shardIDs = append(shardIDs, id) - //} - // - //errC := make(chan error, len(shardIDs)) - //var maxi uint32 // index of maximumm shard being worked on. - //for k := 0; k < report.Concurrency; k++ { - // go func() { - // for { - // i := int(atomic.AddUint32(&maxi, 1) - 1) // Get next partition to work on. - // if i >= len(shardIDs) { - // return // No more work. - // } - // errC <- fn(shardIDs[i]) - // } - // }() - //} - // - //// Check for error - //for i := 0; i < cap(errC); i++ { - // if err := <-errC; err != nil { - // return err - // } - //} if err := fn(); err != nil { return err } @@ -311,172 +273,28 @@ func (a results) Less(i, j int) bool { return a[i].count < a[j].count } func (a results) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (report *ReportCommand) printSummaryByMeasurement() error { - //Get global set of measurement names across shards. - //idxs := &tsdb.IndexSet{SeriesFile: report.sfile} - // for _, idx := range report.shardIdxs { - // idxs.Indexes = append(idxs.Indexes, idx) - // } - - // we are going to get a measurement iterator for each index - //var mitr tsdb.MeasurementIterator - //mitr, err := report.indexFile.MeasurementIterator() - //if err != nil { - // report.Logger.Error("got err") - // return err - //} - //defer mitr.Close() - - // for _, index := range report.shardIdxs { - // small, err := index.MeasurementIterator() - // name, _ := small.Next() - // report.Logger.Error("called small.Next1 " + strconv.Itoa(len(name))) - // name, _ = small.Next() - // report.Logger.Error("called small.Next2 " + strconv.Itoa(len(name))) - // name, _ = small.Next() - // report.Logger.Error("called small.Next3 " + strconv.Itoa(len(name))) - // if err != nil { - // return err - // } else if small == nil { - // return errors.New("got nil measurement iterator for index set") - // } - // //defer small.Close() - // // name, _ := small.Next() - // // report.Logger.Error("called small.Next " + string(name)) - // mitr = tsdb.MergeMeasurementIterators(mitr, small) - // count++ - // } - //defer mitr.Close() - // report.Logger.Error("calling mitr next") - // name, _ := mitr.Next() - // report.Logger.Error("mitr next: " + string(name)) - - //var name []byte - //var totalCardinality int64 - //measurements := results{} - //for name, err := mitr.Next(); err == nil && name != nil; name, err = mitr.Next() { - // res := &result{name: name} - // for _, shardCards := range report.cardinalities { - // other, ok := shardCards[string(name)] - // if !ok { - // continue // this shard doesn't have anything for this measurement. - // } - // - // if other.short != nil && other.set != nil { - // panic("cardinality stored incorrectly") - // } - // - // if other.short != nil { // low cardinality case - // res.addShort(other.short) - // } else if other.set != nil { // High cardinality case - // res.merge(other.set) - // } - // - // // Shard does not have any series for this measurement. - // } - // - // // Determine final cardinality and allow intermediate structures to be - // // GCd. - // if res.lowCardinality != nil { - // res.count = int64(len(res.lowCardinality)) - // } else { - // res.count = int64(res.set.Cardinality()) - // } - // totalCardinality += res.count - // res.set = nil - // res.lowCardinality = nil - // measurements = append(measurements, res) - //} - - // if err != nil { - // return err - // } - - // sort measurements by cardinality. - //sort.Sort(sort.Reverse(measurements)) - // - //if report.topN > 0 { - // // There may not be "topN" measurement cardinality to sub-slice. - // n := int(math.Min(float64(report.topN), float64(len(measurements)))) - // measurements = measurements[:n] - //} - - // tw := tabwriter.NewWriter(report.Stdout, 4, 4, 1, '\t', 0) - // fmt.Fprintf(tw, "Summary\nDatabase Path: %s\nCardinality (exact): %d\n\n", report.Path, totalCardinality) - // fmt.Fprint(tw, "Measurement\tCardinality (exact)\n\n") - // for _, res := range measurements { - // fmt.Fprintf(tw, "%q\t\t%d\n", res.name, res.count) - // } - - // if err := tw.Flush(); err != nil { - // return err - // } - // fmt.Fprint(report.Stdout, "\n\n") report.printOrgBucketCardinality() return nil } -//func (report *ReportCommand) printShardByMeasurement(id uint64) error { -// allMap, ok := report.cardinalities[id] -// if !ok { -// return nil -// } -// -// var totalCardinality int64 -// all := make(cardinalities, 0, len(allMap)) -// for _, card := range allMap { -// n := card.cardinality() -// if n == 0 { -// continue -// } -// -// totalCardinality += n -// all = append(all, card) -// } -// -// sort.Sort(sort.Reverse(all)) -// -// // Trim to top-n -// if report.topN > 0 { -// // There may not be "topN" measurement cardinality to sub-slice. -// n := int(math.Min(float64(report.topN), float64(len(all)))) -// all = all[:n] -// } -// -// tw := tabwriter.NewWriter(report.Stdout, 4, 4, 1, '\t', 0) -// fmt.Fprintf(tw, "===============\nShard ID: %d\nPath: %s\nCardinality (exact): %d\n\n", id, report.shardPaths[id], totalCardinality) -// fmt.Fprint(tw, "Measurement\tCardinality (exact)\n\n") -// for _, card := range all { -// fmt.Fprintf(tw, "%q\t\t%d\n", card.name, card.cardinality()) -// } -// fmt.Fprint(tw, "===============\n\n") -// if err := tw.Flush(); err != nil { -// return err -// } -// fmt.Fprint(report.Stdout, "\n\n") -// return nil -//} +// GetOrgCardinality returns the total cardinality of the org provided. +// Can only be called after Run(). +func (report *ReportCommand) GetOrgCardinality(orgID influxdb.ID) int64 { + orgTotal := int64(0) + for _, bucket := range report.orgBucketCardinality[orgID] { + orgTotal += bucket.cardinality() + } + return orgTotal +} + +// GetBucketCardinality returns the total cardinality of the bucket in the org provided +// Can only be called after Run() +func (report *ReportCommand) GetBucketCardinality(orgID, bucketID influxdb.ID) int64 { + return report.orgBucketCardinality[orgID][bucketID].cardinality() +} func (report *ReportCommand) printOrgBucketCardinality() { tw := tabwriter.NewWriter(report.Stdout, 4, 4, 1, '\t', 0) - //for k, v := range report.orgs { - // report.Logger.Error("calculating org") - // for _, j := range v { - // cCard := j.cardinality() - // totalCount = totalCount + j.cardinality() - // if influxdb.ID(k).String() == report.Org { - // orgCount += cCard - // } - // } - //} - // - //for k, v := range report.buckets { - // for _, j := range v { - // cCard := j.cardinality() - // if influxdb.ID(k).String() == report.Bucket { - // bucketCount += cCard - // } - // } - //} totalCard := int64(0) orgTotals := make(map[influxdb.ID]int64) From bfd38d93d87cc6a7685296658e96ed8c05643418 Mon Sep 17 00:00:00 2001 From: Max U Date: Wed, 24 Jul 2019 11:47:37 -0700 Subject: [PATCH 06/18] feat(tsi1): provide API tooling for use in testing --- cmd/influxd/inspect/report_tsi1.go | 6 ++-- tsdb/tsi1/tsi1_report.go | 54 +++++++++++++++++++++--------- 2 files changed, 41 insertions(+), 19 deletions(-) diff --git a/cmd/influxd/inspect/report_tsi1.go b/cmd/influxd/inspect/report_tsi1.go index 54fdb4eec09..4b8890666ce 100644 --- a/cmd/influxd/inspect/report_tsi1.go +++ b/cmd/influxd/inspect/report_tsi1.go @@ -15,7 +15,7 @@ import ( "go.uber.org/zap/zapcore" ) -// Command represents the program execution for "influxd reporttsi". +// Command represents the program execution for "influxd inspect report-tsi". var tsiFlags = struct { // Standard input/output, overridden for testing. Stderr io.Writer @@ -88,13 +88,13 @@ func RunReportTsi(cmd *cobra.Command, args []string) error { if bucketID, err := influxdb.IDFromString(tsiFlags.bucket); err != nil { return err } else if report.OrgID == nil { - return errors.New("org must be provided if filtering by bucket ") + return errors.New("org must be provided if filtering by bucket") } else { report.BucketID = bucketID } } - err = report.Run() + _, err = report.Run() if err != nil { return err } diff --git a/tsdb/tsi1/tsi1_report.go b/tsdb/tsi1/tsi1_report.go index 6c4458194a8..24638178bf3 100644 --- a/tsdb/tsi1/tsi1_report.go +++ b/tsdb/tsi1/tsi1_report.go @@ -59,9 +59,23 @@ func NewReportCommand() *ReportCommand { } } -// Run initializes the orgBucketCardinality map which can be used to find the cardinality -// any org or bucket. Run() must be called before GetOrgCardinality() or GetOrgBucketCardinality() -func (report *ReportCommand) Run() error { +// ReportTsiSummary is returned by a report-tsi Run() command and is used to access cardinality information +type ReportTsiSummary struct { + OrgCardinality map[influxdb.ID]int64 + BucketCardinality map[influxdb.ID]int64 +} + +func newTsiSummary() *ReportTsiSummary { + return &ReportTsiSummary{ + OrgCardinality: map[influxdb.ID]int64{}, + BucketCardinality: map[influxdb.ID]int64{}, + } +} + +// Run runs the report-tsi tool which can be used to find the cardinality +// any org or bucket. Run returns a *ReportTsiSummary, which contains maps for finding +// the cardinality of a bucket or org based on it's influxdb.ID +func (report *ReportCommand) Run() (*ReportTsiSummary, error) { report.Stdout = os.Stdout if report.SeriesDirPath == "" { @@ -72,7 +86,7 @@ func (report *ReportCommand) Run() error { sFile.WithLogger(report.Logger) if err := sFile.Open(context.Background()); err != nil { report.Logger.Error("failed to open series") - return err + return nil, err } defer sFile.Close() report.sfile = sFile @@ -80,7 +94,7 @@ func (report *ReportCommand) Run() error { path := filepath.Join(report.DataPath, "index") report.indexFile = NewIndex(sFile, NewConfig(), WithPath(path)) if err := report.indexFile.Open(context.Background()); err != nil { - return err + return nil, err } defer report.indexFile.Close() @@ -90,9 +104,9 @@ func (report *ReportCommand) Run() error { // Blocks until all work done. report.calculateCardinalities(fn) - // Print summary. - report.printOrgBucketCardinality() - return nil + // Generate and print summary. + summary := report.printOrgBucketCardinality() + return summary, nil } // calculateCardinalities calculates the cardinalities of the set of shard being @@ -272,10 +286,10 @@ func (a results) Len() int { return len(a) } func (a results) Less(i, j int) bool { return a[i].count < a[j].count } func (a results) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (report *ReportCommand) printSummaryByMeasurement() error { - report.printOrgBucketCardinality() - return nil -} +// func (report *ReportCommand) printSummaryByMeasurement() error { +// report.printOrgBucketCardinality() +// return nil +// } // GetOrgCardinality returns the total cardinality of the org provided. // Can only be called after Run(). @@ -293,18 +307,24 @@ func (report *ReportCommand) GetBucketCardinality(orgID, bucketID influxdb.ID) i return report.orgBucketCardinality[orgID][bucketID].cardinality() } -func (report *ReportCommand) printOrgBucketCardinality() { +func (report *ReportCommand) printOrgBucketCardinality() *ReportTsiSummary { tw := tabwriter.NewWriter(report.Stdout, 4, 4, 1, '\t', 0) + // Generate a new summary + summary := newTsiSummary() + totalCard := int64(0) orgTotals := make(map[influxdb.ID]int64) for org, orgToBucket := range report.orgBucketCardinality { orgTotal := int64(0) - for _, bucketCard := range orgToBucket { - totalCard += bucketCard.cardinality() - orgTotal += bucketCard.cardinality() + for bucketID, bucketCard := range orgToBucket { + c := bucketCard.cardinality() + totalCard += c + orgTotal += c + summary.BucketCardinality[bucketID] = c } orgTotals[org] = orgTotal + summary.OrgCardinality[org] = orgTotal } fmt.Fprintf(tw, "Summary (total): %v \n\n", totalCard) @@ -317,4 +337,6 @@ func (report *ReportCommand) printOrgBucketCardinality() { fmt.Fprintf(tw, " Bucket %s %d\n", bucketName.String(), bucketCard.cardinality()) } } + + return summary } From 5e5fa96c5b2f0cdee4b6b2d4dfc0c18562e75f1e Mon Sep 17 00:00:00 2001 From: Max U Date: Wed, 24 Jul 2019 14:34:47 -0700 Subject: [PATCH 07/18] feat(tsi1): add flags for --org-id and --bucket-id --- cmd/influxd/inspect/report_tsi1.go | 4 +-- tsdb/tsi1/tsi1_report.go | 50 ++++++++++++++++++++---------- 2 files changed, 35 insertions(+), 19 deletions(-) diff --git a/cmd/influxd/inspect/report_tsi1.go b/cmd/influxd/inspect/report_tsi1.go index 4b8890666ce..80e3dcda356 100644 --- a/cmd/influxd/inspect/report_tsi1.go +++ b/cmd/influxd/inspect/report_tsi1.go @@ -51,8 +51,8 @@ func NewReportTsiCommand() *cobra.Command { // fs.BoolVar(&cmd.byTagKey, "tag-key", false, "Segment cardinality by tag keys (overrides `measurements`") reportTsiCommand.Flags().IntVar(&tsiFlags.topN, "top", 0, "Limit results to top n") reportTsiCommand.Flags().IntVar(&tsiFlags.concurrency, "c", runtime.GOMAXPROCS(0), "Set worker concurrency. Defaults to GOMAXPROCS setting.") - reportTsiCommand.Flags().StringVar(&tsiFlags.bucket, "bucket", "", "If bucket is specified, org must be specified") - reportTsiCommand.Flags().StringVar(&tsiFlags.org, "org", "", "Org to be reported") + reportTsiCommand.Flags().StringVarP(&tsiFlags.bucket, "bucket", "b", "", "If bucket is specified, org must be specified") + reportTsiCommand.Flags().StringVarP(&tsiFlags.org, "org", "o", "", "Org to be reported") reportTsiCommand.SetOutput(tsiFlags.Stdout) diff --git a/tsdb/tsi1/tsi1_report.go b/tsdb/tsi1/tsi1_report.go index 24638178bf3..45bbec01809 100644 --- a/tsdb/tsi1/tsi1_report.go +++ b/tsdb/tsi1/tsi1_report.go @@ -75,6 +75,7 @@ func newTsiSummary() *ReportTsiSummary { // Run runs the report-tsi tool which can be used to find the cardinality // any org or bucket. Run returns a *ReportTsiSummary, which contains maps for finding // the cardinality of a bucket or org based on it's influxdb.ID +// The *ReportTsiSummary will be nil if there is a failure func (report *ReportCommand) Run() (*ReportTsiSummary, error) { report.Stdout = os.Stdout @@ -104,8 +105,26 @@ func (report *ReportCommand) Run() (*ReportTsiSummary, error) { // Blocks until all work done. report.calculateCardinalities(fn) - // Generate and print summary. - summary := report.printOrgBucketCardinality() + // Generate and print summary + var summary *ReportTsiSummary + tw := tabwriter.NewWriter(report.Stdout, 4, 4, 1, '\t', 0) + + // if no org or bucket flags have been specified, print everything + // if not, only print the specified org/bucket + if report.OrgID == nil { + summary = report.printOrgBucketCardinality(true) + } else { + // still need to generate a summary, just without printing + summary = report.printOrgBucketCardinality(false) + + // if we do not have a bucket, print the cardinality of OrgID + if report.BucketID == nil { + fmt.Fprintf(tw, "Org (%v) Cardinality: %v \n\n", report.OrgID, summary.OrgCardinality[*report.OrgID]) + } else { + fmt.Fprintf(tw, "Bucket (%v) Cardinality: %v \n\n", report.BucketID, summary.BucketCardinality[*report.BucketID]) + } + tw.Flush() + } return summary, nil } @@ -286,14 +305,9 @@ func (a results) Len() int { return len(a) } func (a results) Less(i, j int) bool { return a[i].count < a[j].count } func (a results) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -// func (report *ReportCommand) printSummaryByMeasurement() error { -// report.printOrgBucketCardinality() -// return nil -// } - // GetOrgCardinality returns the total cardinality of the org provided. -// Can only be called after Run(). -func (report *ReportCommand) GetOrgCardinality(orgID influxdb.ID) int64 { +// Can only be called after Run() +func (report *ReportCommand) printOrgCardinality(orgID influxdb.ID) int64 { orgTotal := int64(0) for _, bucket := range report.orgBucketCardinality[orgID] { orgTotal += bucket.cardinality() @@ -303,11 +317,11 @@ func (report *ReportCommand) GetOrgCardinality(orgID influxdb.ID) int64 { // GetBucketCardinality returns the total cardinality of the bucket in the org provided // Can only be called after Run() -func (report *ReportCommand) GetBucketCardinality(orgID, bucketID influxdb.ID) int64 { +func (report *ReportCommand) printBucketCardinality(orgID, bucketID influxdb.ID) int64 { return report.orgBucketCardinality[orgID][bucketID].cardinality() } -func (report *ReportCommand) printOrgBucketCardinality() *ReportTsiSummary { +func (report *ReportCommand) printOrgBucketCardinality(print bool) *ReportTsiSummary { tw := tabwriter.NewWriter(report.Stdout, 4, 4, 1, '\t', 0) // Generate a new summary @@ -327,14 +341,16 @@ func (report *ReportCommand) printOrgBucketCardinality() *ReportTsiSummary { summary.OrgCardinality[org] = orgTotal } - fmt.Fprintf(tw, "Summary (total): %v \n\n", totalCard) + if print { + fmt.Fprintf(tw, "Summary (total): %v \n\n", totalCard) - fmt.Println(report.orgBucketCardinality) + fmt.Println(report.orgBucketCardinality) - for orgName, orgToBucket := range report.orgBucketCardinality { - fmt.Fprintf(tw, "Org %s total: %d \n\n", orgName.String(), orgTotals[orgName]) - for bucketName, bucketCard := range orgToBucket { - fmt.Fprintf(tw, " Bucket %s %d\n", bucketName.String(), bucketCard.cardinality()) + for orgName, orgToBucket := range report.orgBucketCardinality { + fmt.Fprintf(tw, "Org %s total: %d \n\n", orgName.String(), summary.OrgCardinality[orgName]) + for bucketName := range orgToBucket { + fmt.Fprintf(tw, " Bucket %s %d\n", bucketName.String(), summary.BucketCardinality[bucketName]) + } } } From 32b283d25ae7384d38e7f2e64685532d2156257d Mon Sep 17 00:00:00 2001 From: Adam Perlin Date: Thu, 25 Jul 2019 10:38:14 -0700 Subject: [PATCH 08/18] feat(tsi1/report): Add ability to filter by measurement; add additional maps for efficient retrieval of total org/bucket cardinalities --- cmd/influxd/inspect/report_tsi1.go | 9 +- tsdb/tsi1/tsi1_report.go | 235 ++++++++++++++++------------- 2 files changed, 136 insertions(+), 108 deletions(-) diff --git a/cmd/influxd/inspect/report_tsi1.go b/cmd/influxd/inspect/report_tsi1.go index 80e3dcda356..b4d668bfde7 100644 --- a/cmd/influxd/inspect/report_tsi1.go +++ b/cmd/influxd/inspect/report_tsi1.go @@ -4,6 +4,7 @@ import ( "errors" "io" "os" + "path" "runtime" "github.com/influxdata/influxdb" @@ -47,7 +48,6 @@ func NewReportTsiCommand() *cobra.Command { reportTsiCommand.Flags().StringVar(&tsiFlags.Path, "path", os.Getenv("HOME")+"/.influxdbv2/engine", "Path to data engine. Defaults $HOME/.influxdbv2/engine") reportTsiCommand.Flags().StringVar(&tsiFlags.seriesFilePath, "series-file", "", "Optional path to series file. Defaults /path/to/db-path/_series") reportTsiCommand.Flags().BoolVar(&tsiFlags.byMeasurement, "measurements", true, "Segment cardinality by measurements") - // TODO(edd): Not yet implemented. // fs.BoolVar(&cmd.byTagKey, "tag-key", false, "Segment cardinality by tag keys (overrides `measurements`") reportTsiCommand.Flags().IntVar(&tsiFlags.topN, "top", 0, "Limit results to top n") reportTsiCommand.Flags().IntVar(&tsiFlags.concurrency, "c", runtime.GOMAXPROCS(0), "Set worker concurrency. Defaults to GOMAXPROCS setting.") @@ -66,15 +66,16 @@ func RunReportTsi(cmd *cobra.Command, args []string) error { config.Level = zapcore.InfoLevel log, err := config.New(os.Stderr) - // if path is unset, set to os.Getenv("HOME") + "/.influxdbv2/engine" + // if path is unset, set to $HOME/.influxdbv2/engine" if tsiFlags.Path == "" { - tsiFlags.Path = os.Getenv("HOME") + "/.influxdbv2/engine" + tsiFlags.Path = path.Join(os.Getenv("HOME"), ".influxdbv2/engine") } report := tsi1.NewReportCommand() report.Concurrency = tsiFlags.concurrency report.DataPath = tsiFlags.Path report.Logger = log + report.ByMeasurement = tsiFlags.byMeasurement if tsiFlags.org != "" { if orgID, err := influxdb.IDFromString(tsiFlags.org); err != nil { @@ -94,7 +95,7 @@ func RunReportTsi(cmd *cobra.Command, args []string) error { } } - _, err = report.Run() + _, err = report.Run(true) if err != nil { return err } diff --git a/tsdb/tsi1/tsi1_report.go b/tsdb/tsi1/tsi1_report.go index 45bbec01809..bc3a49e71dc 100644 --- a/tsdb/tsi1/tsi1_report.go +++ b/tsdb/tsi1/tsi1_report.go @@ -32,15 +32,17 @@ type ReportCommand struct { DataPath string OrgID, BucketID *influxdb.ID - // Maps org and bucket IDs with measurement name - orgBucketCardinality map[influxdb.ID]map[influxdb.ID]*cardinality + byOrg map[influxdb.ID]*cardinality + byBucket map[influxdb.ID]*cardinality + byBucketMeasurement map[influxdb.ID]map[string]*cardinality + orgToBucket map[influxdb.ID][]influxdb.ID SeriesDirPath string // optional. Defaults to dbPath/_series sfile *tsdb.SeriesFile indexFile *Index topN int - byMeasurement bool + ByMeasurement bool byTagKey bool // How many goroutines to dedicate to calculating cardinality. @@ -50,33 +52,37 @@ type ReportCommand struct { // NewReportCommand returns a new instance of ReportCommand with default setting applied. func NewReportCommand() *ReportCommand { return &ReportCommand{ - Logger: zap.NewNop(), - orgBucketCardinality: make(map[influxdb.ID]map[influxdb.ID]*cardinality), - topN: 0, - byMeasurement: true, - byTagKey: false, - Concurrency: runtime.GOMAXPROCS(0), + Logger: zap.NewNop(), + byOrg: make(map[influxdb.ID]*cardinality), + byBucket: make(map[influxdb.ID]*cardinality), + byBucketMeasurement: make(map[influxdb.ID]map[string]*cardinality), + orgToBucket: make(map[influxdb.ID][]influxdb.ID), + topN: 0, + byTagKey: false, + Concurrency: runtime.GOMAXPROCS(0), } } -// ReportTsiSummary is returned by a report-tsi Run() command and is used to access cardinality information -type ReportTsiSummary struct { - OrgCardinality map[influxdb.ID]int64 - BucketCardinality map[influxdb.ID]int64 +// ReportTSISummary is returned by a report-tsi Run() command and is used to access cardinality information +type Summary struct { + OrgCardinality map[influxdb.ID]int64 + BucketCardinality map[influxdb.ID]int64 + BucketMeasurementCardinality map[influxdb.ID]map[string]int64 } -func newTsiSummary() *ReportTsiSummary { - return &ReportTsiSummary{ - OrgCardinality: map[influxdb.ID]int64{}, - BucketCardinality: map[influxdb.ID]int64{}, +func newSummary() *Summary { + return &Summary{ + OrgCardinality: make(map[influxdb.ID]int64), + BucketCardinality: make(map[influxdb.ID]int64), + BucketMeasurementCardinality: make(map[influxdb.ID]map[string]int64), } } // Run runs the report-tsi tool which can be used to find the cardinality -// any org or bucket. Run returns a *ReportTsiSummary, which contains maps for finding +// any org or bucket. Run returns a *ReportTSISummary, which contains maps for finding // the cardinality of a bucket or org based on it's influxdb.ID -// The *ReportTsiSummary will be nil if there is a failure -func (report *ReportCommand) Run() (*ReportTsiSummary, error) { +// The *ReportTSISummary will be nil if there is a failure +func (report *ReportCommand) Run(print bool) (*Summary, error) { report.Stdout = os.Stdout if report.SeriesDirPath == "" { @@ -92,50 +98,23 @@ func (report *ReportCommand) Run() (*ReportTsiSummary, error) { defer sFile.Close() report.sfile = sFile - path := filepath.Join(report.DataPath, "index") - report.indexFile = NewIndex(sFile, NewConfig(), WithPath(path)) + indexPath := filepath.Join(report.DataPath, "index") + report.indexFile = NewIndex(sFile, NewConfig(), WithPath(indexPath)) if err := report.indexFile.Open(context.Background()); err != nil { return nil, err } defer report.indexFile.Close() - // Calculate cardinalities for every org and bucket - fn := report.cardinalityByMeasurement - - // Blocks until all work done. - report.calculateCardinalities(fn) - - // Generate and print summary - var summary *ReportTsiSummary - tw := tabwriter.NewWriter(report.Stdout, 4, 4, 1, '\t', 0) - - // if no org or bucket flags have been specified, print everything - // if not, only print the specified org/bucket - if report.OrgID == nil { - summary = report.printOrgBucketCardinality(true) - } else { - // still need to generate a summary, just without printing - summary = report.printOrgBucketCardinality(false) - - // if we do not have a bucket, print the cardinality of OrgID - if report.BucketID == nil { - fmt.Fprintf(tw, "Org (%v) Cardinality: %v \n\n", report.OrgID, summary.OrgCardinality[*report.OrgID]) - } else { - fmt.Fprintf(tw, "Bucket (%v) Cardinality: %v \n\n", report.BucketID, summary.BucketCardinality[*report.BucketID]) - } - tw.Flush() + summary, err := report.calculateOrgBucketCardinality() + if err != nil { + return nil, err } - return summary, nil -} -// calculateCardinalities calculates the cardinalities of the set of shard being -// worked on concurrently. The provided function determines how cardinality is -// calculated and broken down. -func (report *ReportCommand) calculateCardinalities(fn func() error) error { - if err := fn(); err != nil { - return err + if print { + report.printCardinalitySummary(summary) } - return nil + + return summary, nil } type cardinality struct { @@ -180,7 +159,7 @@ func (a cardinalities) Len() int { return len(a) } func (a cardinalities) Less(i, j int) bool { return a[i].cardinality() < a[j].cardinality() } func (a cardinalities) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (report *ReportCommand) cardinalityByMeasurement() error { +func (report *ReportCommand) calculateCardinalities() error { idx := report.indexFile itr, err := idx.MeasurementIterator() if err != nil { @@ -190,6 +169,7 @@ func (report *ReportCommand) cardinalityByMeasurement() error { } defer itr.Close() + var totalCard = &cardinality{name: []byte("total")} OUTER: for { name, err := itr.Next() @@ -199,7 +179,7 @@ OUTER: break OUTER } - var a [16]byte // TODO(edd) if this shows up we can use a different API to DecodeName. + var a [16]byte copy(a[:], name[:16]) org, bucket := tsdb.DecodeName(a) @@ -216,28 +196,66 @@ OUTER: continue } - // initialize map of bucket to cardinality - if _, ok := report.orgBucketCardinality[org]; !ok { - report.orgBucketCardinality[org] = make(map[influxdb.ID]*cardinality) + var orgCard, bucketCard *cardinality + + // initialize map of bucket to measurements + if _, ok := report.byBucketMeasurement[bucket]; !ok { + report.byBucketMeasurement[bucket] = make(map[string]*cardinality) + } + + if c, ok := report.byBucket[bucket]; !ok { + bucketCard = &cardinality{name: []byte(bucket.String())} + report.byBucket[bucket] = bucketCard + } else { + bucketCard = c } - var card *cardinality - if c, ok := report.orgBucketCardinality[org][bucket]; !ok { - card = &cardinality{name: []byte(bucket.String())} - report.orgBucketCardinality[org][bucket] = card + if c, ok := report.byOrg[org]; !ok { + orgCard = &cardinality{name: []byte(bucket.String())} + report.byOrg[org] = orgCard } else { - card = c + orgCard = c + } + + if _, ok := report.orgToBucket[org]; !ok { + report.orgToBucket[org] = []influxdb.ID{} } + report.orgToBucket[org] = append(report.orgToBucket[org], bucket) + var e tsdb.SeriesIDElem for e, err = sitr.Next(); err == nil && e.SeriesID.ID != 0; e, err = sitr.Next() { id := e.SeriesID.ID + if id > math.MaxUint32 { panic(fmt.Sprintf("series ID is too large: %d (max %d). Corrupted series file?", e.SeriesID, uint32(math.MaxUint32))) } - // note: first tag in array (from sfile.Series(id) is measurement - card.add(id) + totalCard.add(id) + + // add cardinalities to org and bucket maps + orgCard.add(id) + bucketCard.add(id) + + _, tags := report.sfile.Series(e.SeriesID) + if len(tags) == 0 { + panic(fmt.Sprintf("series key too short")) + } + + mName := string(tags[0].Value) // measurement name should be first tag. + fmt.Println("") + + if report.ByMeasurement { + var mCard *cardinality + if cardForM, ok := report.byBucketMeasurement[bucket][mName]; !ok { + mCard = &cardinality{name: []byte(mName)} + report.byBucketMeasurement[bucket][mName] = mCard + } else { + mCard = cardForM + } + + mCard.add(id) + } } sitr.Close() @@ -245,6 +263,7 @@ OUTER: if err != nil { return err } + } return nil } @@ -299,6 +318,7 @@ func (r *result) merge(other *tsdb.SeriesIDSet) { r.set.Merge(other) } +// TODO: remove... not needed (though possibly if we add concurrency) type results []*result func (a results) Len() int { return len(a) } @@ -306,53 +326,60 @@ func (a results) Less(i, j int) bool { return a[i].count < a[j].count } func (a results) Swap(i, j int) { a[i], a[j] = a[j], a[i] } // GetOrgCardinality returns the total cardinality of the org provided. -// Can only be called after Run() -func (report *ReportCommand) printOrgCardinality(orgID influxdb.ID) int64 { - orgTotal := int64(0) - for _, bucket := range report.orgBucketCardinality[orgID] { - orgTotal += bucket.cardinality() - } - return orgTotal -} +//// Can only be called after Run() +//func (report *ReportCommand) printOrgCardinality(orgID influxdb.ID) int64 { +// orgTotal := int64(0) +// for _, bucket := range report.orgBucketCardinality[orgID] { +// orgTotal += bucket.cardinality() +// } +// return orgTotal +//} // GetBucketCardinality returns the total cardinality of the bucket in the org provided -// Can only be called after Run() -func (report *ReportCommand) printBucketCardinality(orgID, bucketID influxdb.ID) int64 { - return report.orgBucketCardinality[orgID][bucketID].cardinality() -} - -func (report *ReportCommand) printOrgBucketCardinality(print bool) *ReportTsiSummary { - tw := tabwriter.NewWriter(report.Stdout, 4, 4, 1, '\t', 0) +//// Can only be called after Run() +//func (report *ReportCommand) printBucketCardinality(orgID, bucketID influxdb.ID) int64 { +// return report.orgBucketCardinality[orgID][bucketID].cardinality() +//} +func (report *ReportCommand) calculateOrgBucketCardinality() (*Summary, error) { + if err := report.calculateCardinalities(); err != nil { + return nil, err + } // Generate a new summary - summary := newTsiSummary() - - totalCard := int64(0) - orgTotals := make(map[influxdb.ID]int64) - for org, orgToBucket := range report.orgBucketCardinality { - orgTotal := int64(0) - for bucketID, bucketCard := range orgToBucket { - c := bucketCard.cardinality() - totalCard += c - orgTotal += c - summary.BucketCardinality[bucketID] = c + summary := newSummary() + for bucketID, bucketCard := range report.byBucket { + summary.BucketCardinality[bucketID] = bucketCard.cardinality() + summary.BucketMeasurementCardinality[bucketID] = make(map[string]int64) + } + + for orgID, orgCard := range report.byOrg { + summary.OrgCardinality[orgID] = orgCard.cardinality() + } + + for bucketID, bucketMeasurement := range report.byBucketMeasurement { + for mName, mCard := range bucketMeasurement { + summary.BucketMeasurementCardinality[bucketID][mName] = mCard.cardinality() } - orgTotals[org] = orgTotal - summary.OrgCardinality[org] = orgTotal } - if print { - fmt.Fprintf(tw, "Summary (total): %v \n\n", totalCard) + return summary, nil +} + +func (report *ReportCommand) printCardinalitySummary(summary *Summary) { + tw := tabwriter.NewWriter(report.Stdout, 4, 4, 1, '\t', 0) - fmt.Println(report.orgBucketCardinality) + for orgID, orgCard := range summary.OrgCardinality { + fmt.Fprintf(tw, "Org %s total: %d\n", orgID.String(), orgCard) - for orgName, orgToBucket := range report.orgBucketCardinality { - fmt.Fprintf(tw, "Org %s total: %d \n\n", orgName.String(), summary.OrgCardinality[orgName]) - for bucketName := range orgToBucket { - fmt.Fprintf(tw, " Bucket %s %d\n", bucketName.String(), summary.BucketCardinality[bucketName]) + for _, bucketID := range report.orgToBucket[orgID] { + fmt.Fprintf(tw, "\tBucket %s total: %d\n", bucketID.String(), summary.BucketCardinality[bucketID]) + if report.ByMeasurement { + for mName, mCard := range summary.BucketMeasurementCardinality[bucketID] { + fmt.Fprintf(tw, "\t\t_m=%s\t%d\n", mName, mCard) + } } } } - return summary + tw.Flush() } From aa2f7a8ff7ea57b29e67cf91d56970ed85f488e6 Mon Sep 17 00:00:00 2001 From: Max U Date: Thu, 25 Jul 2019 15:12:17 -0700 Subject: [PATCH 09/18] feat(tsi1): add a --top flag for limiting output, output now sorted --- cmd/influxd/inspect/report_tsi1.go | 3 +- tsdb/tsi1/tsi1_report.go | 148 +++++++++++++---------------- 2 files changed, 68 insertions(+), 83 deletions(-) diff --git a/cmd/influxd/inspect/report_tsi1.go b/cmd/influxd/inspect/report_tsi1.go index b4d668bfde7..18a265c5bea 100644 --- a/cmd/influxd/inspect/report_tsi1.go +++ b/cmd/influxd/inspect/report_tsi1.go @@ -47,7 +47,7 @@ func NewReportTsiCommand() *cobra.Command { } reportTsiCommand.Flags().StringVar(&tsiFlags.Path, "path", os.Getenv("HOME")+"/.influxdbv2/engine", "Path to data engine. Defaults $HOME/.influxdbv2/engine") reportTsiCommand.Flags().StringVar(&tsiFlags.seriesFilePath, "series-file", "", "Optional path to series file. Defaults /path/to/db-path/_series") - reportTsiCommand.Flags().BoolVar(&tsiFlags.byMeasurement, "measurements", true, "Segment cardinality by measurements") + reportTsiCommand.Flags().BoolVarP(&tsiFlags.byMeasurement, "measurements", "m", false, "Segment cardinality by measurements") // fs.BoolVar(&cmd.byTagKey, "tag-key", false, "Segment cardinality by tag keys (overrides `measurements`") reportTsiCommand.Flags().IntVar(&tsiFlags.topN, "top", 0, "Limit results to top n") reportTsiCommand.Flags().IntVar(&tsiFlags.concurrency, "c", runtime.GOMAXPROCS(0), "Set worker concurrency. Defaults to GOMAXPROCS setting.") @@ -76,6 +76,7 @@ func RunReportTsi(cmd *cobra.Command, args []string) error { report.DataPath = tsiFlags.Path report.Logger = log report.ByMeasurement = tsiFlags.byMeasurement + report.TopN = tsiFlags.topN if tsiFlags.org != "" { if orgID, err := influxdb.IDFromString(tsiFlags.org); err != nil { diff --git a/tsdb/tsi1/tsi1_report.go b/tsdb/tsi1/tsi1_report.go index bc3a49e71dc..001742ff287 100644 --- a/tsdb/tsi1/tsi1_report.go +++ b/tsdb/tsi1/tsi1_report.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "runtime" + "sort" "text/tabwriter" "github.com/influxdata/influxdb" @@ -41,7 +42,7 @@ type ReportCommand struct { sfile *tsdb.SeriesFile indexFile *Index - topN int + TopN int ByMeasurement bool byTagKey bool @@ -57,7 +58,7 @@ func NewReportCommand() *ReportCommand { byBucket: make(map[influxdb.ID]*cardinality), byBucketMeasurement: make(map[influxdb.ID]map[string]*cardinality), orgToBucket: make(map[influxdb.ID][]influxdb.ID), - topN: 0, + TopN: 0, byTagKey: false, Concurrency: runtime.GOMAXPROCS(0), } @@ -243,7 +244,6 @@ OUTER: } mName := string(tags[0].Value) // measurement name should be first tag. - fmt.Println("") if report.ByMeasurement { var mCard *cardinality @@ -268,79 +268,6 @@ OUTER: return nil } -type result struct { - name []byte - count int64 - - // For low cardinality measurements just track series using map - lowCardinality map[uint32]struct{} - - // For higher cardinality measurements track using bitmap. - set *tsdb.SeriesIDSet -} - -func (r *result) addShort(ids []uint32) { - // There is already a bitset of this result. - if r.set != nil { - for _, id := range ids { - r.set.AddNoLock(tsdb.NewSeriesID(uint64(id))) - } - return - } - - // Still tracking low cardinality sets - if r.lowCardinality == nil { - r.lowCardinality = map[uint32]struct{}{} - } - - for _, id := range ids { - r.lowCardinality[id] = struct{}{} - } - - // Cardinality is large enough that we will benefit from using a bitmap - if len(r.lowCardinality) > useBitmapN { - r.set = tsdb.NewSeriesIDSet() - for id := range r.lowCardinality { - r.set.AddNoLock(tsdb.NewSeriesID(uint64(id))) - } - r.lowCardinality = nil - } -} - -func (r *result) merge(other *tsdb.SeriesIDSet) { - if r.set == nil { - r.set = tsdb.NewSeriesIDSet() - for id := range r.lowCardinality { - r.set.AddNoLock(tsdb.NewSeriesID(uint64(id))) - } - r.lowCardinality = nil - } - r.set.Merge(other) -} - -// TODO: remove... not needed (though possibly if we add concurrency) -type results []*result - -func (a results) Len() int { return len(a) } -func (a results) Less(i, j int) bool { return a[i].count < a[j].count } -func (a results) Swap(i, j int) { a[i], a[j] = a[j], a[i] } - -// GetOrgCardinality returns the total cardinality of the org provided. -//// Can only be called after Run() -//func (report *ReportCommand) printOrgCardinality(orgID influxdb.ID) int64 { -// orgTotal := int64(0) -// for _, bucket := range report.orgBucketCardinality[orgID] { -// orgTotal += bucket.cardinality() -// } -// return orgTotal -//} - -// GetBucketCardinality returns the total cardinality of the bucket in the org provided -//// Can only be called after Run() -//func (report *ReportCommand) printBucketCardinality(orgID, bucketID influxdb.ID) int64 { -// return report.orgBucketCardinality[orgID][bucketID].cardinality() -//} - func (report *ReportCommand) calculateOrgBucketCardinality() (*Summary, error) { if err := report.calculateCardinalities(); err != nil { return nil, err @@ -367,19 +294,76 @@ func (report *ReportCommand) calculateOrgBucketCardinality() (*Summary, error) { func (report *ReportCommand) printCardinalitySummary(summary *Summary) { tw := tabwriter.NewWriter(report.Stdout, 4, 4, 1, '\t', 0) + fmt.Fprint(tw, "\n") + + sortedOrgs := sortKeys(summary.OrgCardinality, report.TopN) + sortedBuckets := sortKeys(summary.BucketCardinality, report.TopN) + + for orgIndex := range sortedOrgs { + // if we specify a bucket, we do not print the org cardinality + if report.BucketID == nil { + fmt.Fprintf(tw, "Org %s total: %d\n", sortedOrgs[orgIndex].id, sortedOrgs[orgIndex].card) + } - for orgID, orgCard := range summary.OrgCardinality { - fmt.Fprintf(tw, "Org %s total: %d\n", orgID.String(), orgCard) + for bucketIndex := range sortedBuckets { + fmt.Fprintf(tw, "\tBucket %s total: %d\n", sortedBuckets[bucketIndex].id, sortedBuckets[bucketIndex].card) - for _, bucketID := range report.orgToBucket[orgID] { - fmt.Fprintf(tw, "\tBucket %s total: %d\n", bucketID.String(), summary.BucketCardinality[bucketID]) if report.ByMeasurement { - for mName, mCard := range summary.BucketMeasurementCardinality[bucketID] { - fmt.Fprintf(tw, "\t\t_m=%s\t%d\n", mName, mCard) + bucketID, _ := influxdb.IDFromString(sortedBuckets[bucketIndex].id) + sortedMeasurements := sortMeasurements(summary.BucketMeasurementCardinality[*bucketID], report.TopN) + + for measIndex := range sortedMeasurements { + fmt.Fprintf(tw, "\t\t_m=%s\t%d\n", sortedMeasurements[measIndex].id, sortedMeasurements[measIndex].card) } } } } + fmt.Fprint(tw, "\n") tw.Flush() } + +// sortKeys is a quick helper to return the sorted set of a map's keys +// sortKeys will only return report.topN keys if the flag is set +type result struct { + id string + card int64 +} + +type resultList []result + +func (a resultList) Len() int { return len(a) } +func (a resultList) Less(i, j int) bool { return a[i].card < a[j].card } +func (a resultList) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +func sortKeys(vals map[influxdb.ID]int64, topN int) resultList { + sorted := make(resultList, 0) + for k, v := range vals { + sorted = append(sorted, result{k.String(), v}) + } + sort.Sort(sort.Reverse(sorted)) + + if topN == 0 { + return sorted + } + if topN > len(sorted) { + topN = len(sorted) + } + return sorted[:topN] +} + +func sortMeasurements(vals map[string]int64, topN int) resultList { + sorted := make(resultList, 0) + for k, v := range vals { + sorted = append(sorted, result{k, v}) + } + sort.Sort(sort.Reverse(sorted)) + + if topN == 0 { + return sorted + } + if topN > len(sorted) { + topN = len(sorted) + } + return sorted[:topN] +} From 17b1dd856222f272050c6efa6511bcb925ed8349 Mon Sep 17 00:00:00 2001 From: Max U Date: Thu, 25 Jul 2019 15:25:33 -0700 Subject: [PATCH 10/18] feat(tsi1): add shorthand for --top flag as -t, plus cleaning --- cmd/influxd/inspect/report_tsi1.go | 2 +- tsdb/series_segment.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/cmd/influxd/inspect/report_tsi1.go b/cmd/influxd/inspect/report_tsi1.go index 18a265c5bea..129baea7e49 100644 --- a/cmd/influxd/inspect/report_tsi1.go +++ b/cmd/influxd/inspect/report_tsi1.go @@ -49,7 +49,7 @@ func NewReportTsiCommand() *cobra.Command { reportTsiCommand.Flags().StringVar(&tsiFlags.seriesFilePath, "series-file", "", "Optional path to series file. Defaults /path/to/db-path/_series") reportTsiCommand.Flags().BoolVarP(&tsiFlags.byMeasurement, "measurements", "m", false, "Segment cardinality by measurements") // fs.BoolVar(&cmd.byTagKey, "tag-key", false, "Segment cardinality by tag keys (overrides `measurements`") - reportTsiCommand.Flags().IntVar(&tsiFlags.topN, "top", 0, "Limit results to top n") + reportTsiCommand.Flags().IntVarP(&tsiFlags.topN, "top", "t", 0, "Limit results to top n") reportTsiCommand.Flags().IntVar(&tsiFlags.concurrency, "c", runtime.GOMAXPROCS(0), "Set worker concurrency. Defaults to GOMAXPROCS setting.") reportTsiCommand.Flags().StringVarP(&tsiFlags.bucket, "bucket", "b", "", "If bucket is specified, org must be specified") reportTsiCommand.Flags().StringVarP(&tsiFlags.org, "org", "o", "", "Org to be reported") diff --git a/tsdb/series_segment.go b/tsdb/series_segment.go index c9571c22534..16b32c5aa8c 100644 --- a/tsdb/series_segment.go +++ b/tsdb/series_segment.go @@ -312,7 +312,6 @@ func IsValidSeriesSegmentFilename(filename string) bool { // ParseSeriesSegmentFilename returns the id represented by the hexadecimal filename. func ParseSeriesSegmentFilename(filename string) (uint16, error) { i, err := strconv.ParseUint(filename, 16, 32) - //return uint16(i), errors.New("err- i:" + string(i) + ", to: " + string(uint16(i))) return uint16(i), err } From 2c1f3e298715eef5b53b49664c71c30b87856efd Mon Sep 17 00:00:00 2001 From: Max U Date: Thu, 25 Jul 2019 16:11:50 -0700 Subject: [PATCH 11/18] fix(tsi1): remove obnoxious log messages --- tsdb/tsi1/tsi1_report.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tsdb/tsi1/tsi1_report.go b/tsdb/tsi1/tsi1_report.go index 001742ff287..661cec358f1 100644 --- a/tsdb/tsi1/tsi1_report.go +++ b/tsdb/tsi1/tsi1_report.go @@ -91,7 +91,10 @@ func (report *ReportCommand) Run(print bool) (*Summary, error) { } sFile := tsdb.NewSeriesFile(report.SeriesDirPath) - sFile.WithLogger(report.Logger) + + // TODO: do we actually want the seriesfile logging? + // sFile.WithLogger(report.Logger) + if err := sFile.Open(context.Background()); err != nil { report.Logger.Error("failed to open series") return nil, err From 9bd6200f1555991ad29afb0f948ceccc58f0adee Mon Sep 17 00:00:00 2001 From: Max U Date: Thu, 25 Jul 2019 16:31:34 -0700 Subject: [PATCH 12/18] fix(tsi1): make mergeable --- cmd/influxd/inspect/report_tsi1.go | 3 +++ tsdb/tsi1/tsi1_report.go | 6 ------ 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/cmd/influxd/inspect/report_tsi1.go b/cmd/influxd/inspect/report_tsi1.go index 129baea7e49..7a802edc22a 100644 --- a/cmd/influxd/inspect/report_tsi1.go +++ b/cmd/influxd/inspect/report_tsi1.go @@ -65,6 +65,9 @@ func RunReportTsi(cmd *cobra.Command, args []string) error { config := logger.NewConfig() config.Level = zapcore.InfoLevel log, err := config.New(os.Stderr) + if err != nil { + return err + } // if path is unset, set to $HOME/.influxdbv2/engine" if tsiFlags.Path == "" { diff --git a/tsdb/tsi1/tsi1_report.go b/tsdb/tsi1/tsi1_report.go index 661cec358f1..ed6d6dedbea 100644 --- a/tsdb/tsi1/tsi1_report.go +++ b/tsdb/tsi1/tsi1_report.go @@ -157,12 +157,6 @@ func (c *cardinality) cardinality() int64 { return int64(c.set.Cardinality()) } -type cardinalities []*cardinality - -func (a cardinalities) Len() int { return len(a) } -func (a cardinalities) Less(i, j int) bool { return a[i].cardinality() < a[j].cardinality() } -func (a cardinalities) Swap(i, j int) { a[i], a[j] = a[j], a[i] } - func (report *ReportCommand) calculateCardinalities() error { idx := report.indexFile itr, err := idx.MeasurementIterator() From 7ce1b8109fca1566bb91544c9673b78ecba41b9a Mon Sep 17 00:00:00 2001 From: Adam Perlin Date: Thu, 25 Jul 2019 16:45:42 -0700 Subject: [PATCH 13/18] chore(tsi1): Clean up flags and naming in report-tsi tool; add comments --- cmd/influxd/inspect/inspect.go | 1 + cmd/influxd/inspect/report_tsi1.go | 73 ++++++++++++++---------------- tsdb/tsi1/tsi1_report.go | 23 ++++++---- 3 files changed, 49 insertions(+), 48 deletions(-) diff --git a/cmd/influxd/inspect/inspect.go b/cmd/influxd/inspect/inspect.go index 0a53d7c889e..aa0e222d099 100644 --- a/cmd/influxd/inspect/inspect.go +++ b/cmd/influxd/inspect/inspect.go @@ -18,6 +18,7 @@ func NewCommand() *cobra.Command { NewReportTSMCommand(), NewVerifyTSMCommand(), NewVerifyWALCommand(), + NewReportTSICommand(), } for _, command := range subCommands { diff --git a/cmd/influxd/inspect/report_tsi1.go b/cmd/influxd/inspect/report_tsi1.go index 7a802edc22a..2d6dc69099e 100644 --- a/cmd/influxd/inspect/report_tsi1.go +++ b/cmd/influxd/inspect/report_tsi1.go @@ -5,62 +5,59 @@ import ( "io" "os" "path" - "runtime" "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/logger" - "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxdb/tsdb/tsi1" "github.com/spf13/cobra" "go.uber.org/zap/zapcore" ) // Command represents the program execution for "influxd inspect report-tsi". -var tsiFlags = struct { +var reportTSIFlags = struct { // Standard input/output, overridden for testing. Stderr io.Writer Stdout io.Writer - Path string - org string - bucket string + // Data path options + Path string // optional. Defaults to dbPath/engine/index + SeriesFilePath string // optional. Defaults to dbPath/_series - seriesFilePath string // optional. Defaults to dbPath/_series - sfile *tsdb.SeriesFile + // Tenant filtering options + Org string + Bucket string - topN int - byMeasurement bool - byTagKey bool - - // How many goroutines to dedicate to calculating cardinality. - concurrency int + // Reporting options + TopN int + ByMeasurement bool + byTagKey bool // currently unused }{} // NewReportTsiCommand returns a new instance of Command with default setting applied. -func NewReportTsiCommand() *cobra.Command { - reportTsiCommand := &cobra.Command{ +func NewReportTSICommand() *cobra.Command { + reportTSICommand := &cobra.Command{ Use: "report-tsi", Short: "Reports the cardinality of tsi files short", Long: `Reports the cardinality of tsi files long.`, - RunE: RunReportTsi, + RunE: RunReportTSI, } - reportTsiCommand.Flags().StringVar(&tsiFlags.Path, "path", os.Getenv("HOME")+"/.influxdbv2/engine", "Path to data engine. Defaults $HOME/.influxdbv2/engine") - reportTsiCommand.Flags().StringVar(&tsiFlags.seriesFilePath, "series-file", "", "Optional path to series file. Defaults /path/to/db-path/_series") - reportTsiCommand.Flags().BoolVarP(&tsiFlags.byMeasurement, "measurements", "m", false, "Segment cardinality by measurements") + + reportTSICommand.Flags().StringVar(&reportTSIFlags.Path, "path", os.Getenv("HOME")+"/.influxdbv2/engine", "Path to data engine. Defaults $HOME/.influxdbv2/engine") + reportTSICommand.Flags().StringVar(&reportTSIFlags.SeriesFilePath, "series-file", "", "Optional path to series file. Defaults /path/to/db-path/_series") + reportTSICommand.Flags().BoolVarP(&reportTSIFlags.ByMeasurement, "measurements", "m", false, "Segment cardinality by measurements") // fs.BoolVar(&cmd.byTagKey, "tag-key", false, "Segment cardinality by tag keys (overrides `measurements`") - reportTsiCommand.Flags().IntVarP(&tsiFlags.topN, "top", "t", 0, "Limit results to top n") - reportTsiCommand.Flags().IntVar(&tsiFlags.concurrency, "c", runtime.GOMAXPROCS(0), "Set worker concurrency. Defaults to GOMAXPROCS setting.") - reportTsiCommand.Flags().StringVarP(&tsiFlags.bucket, "bucket", "b", "", "If bucket is specified, org must be specified") - reportTsiCommand.Flags().StringVarP(&tsiFlags.org, "org", "o", "", "Org to be reported") + reportTSICommand.Flags().IntVarP(&reportTSIFlags.TopN, "top", "t", 0, "Limit results to top n") + reportTSICommand.Flags().StringVarP(&reportTSIFlags.Bucket, "bucket", "b", "", "If bucket is specified, org must be specified") + reportTSICommand.Flags().StringVarP(&reportTSIFlags.Org, "org", "o", "", "Org to be reported") - reportTsiCommand.SetOutput(tsiFlags.Stdout) + reportTSICommand.SetOutput(reportTSIFlags.Stdout) - return reportTsiCommand + return reportTSICommand } -// RunReportTsi executes the run command for ReportTsi. -func RunReportTsi(cmd *cobra.Command, args []string) error { +// RunReportTSI executes the run command for ReportTSI. +func RunReportTSI(cmd *cobra.Command, args []string) error { // set up log config := logger.NewConfig() config.Level = zapcore.InfoLevel @@ -70,27 +67,26 @@ func RunReportTsi(cmd *cobra.Command, args []string) error { } // if path is unset, set to $HOME/.influxdbv2/engine" - if tsiFlags.Path == "" { - tsiFlags.Path = path.Join(os.Getenv("HOME"), ".influxdbv2/engine") + if reportTSIFlags.Path == "" { + reportTSIFlags.Path = path.Join(os.Getenv("HOME"), ".influxdbv2/engine") } report := tsi1.NewReportCommand() - report.Concurrency = tsiFlags.concurrency - report.DataPath = tsiFlags.Path + report.DataPath = reportTSIFlags.Path report.Logger = log - report.ByMeasurement = tsiFlags.byMeasurement - report.TopN = tsiFlags.topN + report.ByMeasurement = reportTSIFlags.ByMeasurement + report.TopN = reportTSIFlags.TopN - if tsiFlags.org != "" { - if orgID, err := influxdb.IDFromString(tsiFlags.org); err != nil { + if reportTSIFlags.Org != "" { + if orgID, err := influxdb.IDFromString(reportTSIFlags.Org); err != nil { return err } else { report.OrgID = orgID } } - if tsiFlags.bucket != "" { - if bucketID, err := influxdb.IDFromString(tsiFlags.bucket); err != nil { + if reportTSIFlags.Bucket != "" { + if bucketID, err := influxdb.IDFromString(reportTSIFlags.Bucket); err != nil { return err } else if report.OrgID == nil { return errors.New("org must be provided if filtering by bucket") @@ -99,6 +95,7 @@ func RunReportTsi(cmd *cobra.Command, args []string) error { } } + // Run command with printing enabled _, err = report.Run(true) if err != nil { return err diff --git a/tsdb/tsi1/tsi1_report.go b/tsdb/tsi1/tsi1_report.go index ed6d6dedbea..93d9a0ef52b 100644 --- a/tsdb/tsi1/tsi1_report.go +++ b/tsdb/tsi1/tsi1_report.go @@ -7,7 +7,6 @@ import ( "math" "os" "path/filepath" - "runtime" "sort" "text/tabwriter" @@ -22,7 +21,7 @@ const ( useBitmapN = 25 ) -// ReportCommand represents the program execution for "influxd reporttsi". +// ReportCommand represents the program execution for "influxd inspect report-tsi". type ReportCommand struct { // Standard input/output, overridden for testing. Stderr io.Writer @@ -45,9 +44,6 @@ type ReportCommand struct { TopN int ByMeasurement bool byTagKey bool - - // How many goroutines to dedicate to calculating cardinality. - Concurrency int } // NewReportCommand returns a new instance of ReportCommand with default setting applied. @@ -60,7 +56,6 @@ func NewReportCommand() *ReportCommand { orgToBucket: make(map[influxdb.ID][]influxdb.ID), TopN: 0, byTagKey: false, - Concurrency: runtime.GOMAXPROCS(0), } } @@ -81,8 +76,7 @@ func newSummary() *Summary { // Run runs the report-tsi tool which can be used to find the cardinality // any org or bucket. Run returns a *ReportTSISummary, which contains maps for finding -// the cardinality of a bucket or org based on it's influxdb.ID -// The *ReportTSISummary will be nil if there is a failure +// the cardinality of a bucket or org based on its influxdb.ID func (report *ReportCommand) Run(print bool) (*Summary, error) { report.Stdout = os.Stdout @@ -177,6 +171,7 @@ OUTER: break OUTER } + // decode org and bucket from measurement name var a [16]byte copy(a[:], name[:16]) org, bucket := tsdb.DecodeName(a) @@ -201,6 +196,7 @@ OUTER: report.byBucketMeasurement[bucket] = make(map[string]*cardinality) } + // initialize total cardinality tracking struct for this bucket if c, ok := report.byBucket[bucket]; !ok { bucketCard = &cardinality{name: []byte(bucket.String())} report.byBucket[bucket] = bucketCard @@ -208,6 +204,7 @@ OUTER: bucketCard = c } + // initialize total cardinality tracking struct for this org if c, ok := report.byOrg[org]; !ok { orgCard = &cardinality{name: []byte(bucket.String())} report.byOrg[org] = orgCard @@ -215,6 +212,7 @@ OUTER: orgCard = c } + // track org to bucket mappings so we can associate org an bucket in the printed summary if _, ok := report.orgToBucket[org]; !ok { report.orgToBucket[org] = []influxdb.ID{} } @@ -235,13 +233,17 @@ OUTER: orgCard.add(id) bucketCard.add(id) + // retrieve tags associated with series id so we can get + // associated measurement _, tags := report.sfile.Series(e.SeriesID) if len(tags) == 0 { - panic(fmt.Sprintf("series key too short")) + panic(fmt.Sprintf("empty series key")) } - mName := string(tags[0].Value) // measurement name should be first tag. + // measurement name should be first tag. + mName := string(tags[0].Value) + // update measurement-level cardinality if tracking by measurement if report.ByMeasurement { var mCard *cardinality if cardForM, ok := report.byBucketMeasurement[bucket][mName]; !ok { @@ -293,6 +295,7 @@ func (report *ReportCommand) printCardinalitySummary(summary *Summary) { tw := tabwriter.NewWriter(report.Stdout, 4, 4, 1, '\t', 0) fmt.Fprint(tw, "\n") + // sort total org and bucket cardinalities and limit to top n values sortedOrgs := sortKeys(summary.OrgCardinality, report.TopN) sortedBuckets := sortKeys(summary.BucketCardinality, report.TopN) From d47a578258b1743c42c8dc4460fa11411b985238 Mon Sep 17 00:00:00 2001 From: Adam Perlin Date: Thu, 25 Jul 2019 17:50:37 -0700 Subject: [PATCH 14/18] fix(tsi1): map org to bucket in report-tsi tool so output is more useful --- tsdb/tsi1/tsi1_report.go | 97 +++++++++++++++++++++------------------- 1 file changed, 51 insertions(+), 46 deletions(-) diff --git a/tsdb/tsi1/tsi1_report.go b/tsdb/tsi1/tsi1_report.go index 93d9a0ef52b..1ce84530d25 100644 --- a/tsdb/tsi1/tsi1_report.go +++ b/tsdb/tsi1/tsi1_report.go @@ -9,6 +9,7 @@ import ( "path/filepath" "sort" "text/tabwriter" + "time" "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/tsdb" @@ -32,8 +33,7 @@ type ReportCommand struct { DataPath string OrgID, BucketID *influxdb.ID - byOrg map[influxdb.ID]*cardinality - byBucket map[influxdb.ID]*cardinality + byOrgBucket map[influxdb.ID]map[influxdb.ID]*cardinality byBucketMeasurement map[influxdb.ID]map[string]*cardinality orgToBucket map[influxdb.ID][]influxdb.ID @@ -44,14 +44,15 @@ type ReportCommand struct { TopN int ByMeasurement bool byTagKey bool + + start time.Time } // NewReportCommand returns a new instance of ReportCommand with default setting applied. func NewReportCommand() *ReportCommand { return &ReportCommand{ Logger: zap.NewNop(), - byOrg: make(map[influxdb.ID]*cardinality), - byBucket: make(map[influxdb.ID]*cardinality), + byOrgBucket: make(map[influxdb.ID]map[influxdb.ID]*cardinality), byBucketMeasurement: make(map[influxdb.ID]map[string]*cardinality), orgToBucket: make(map[influxdb.ID][]influxdb.ID), TopN: 0, @@ -61,15 +62,16 @@ func NewReportCommand() *ReportCommand { // ReportTSISummary is returned by a report-tsi Run() command and is used to access cardinality information type Summary struct { + TotalCardinality int64 OrgCardinality map[influxdb.ID]int64 - BucketCardinality map[influxdb.ID]int64 + BucketByOrgCardinality map[influxdb.ID]map[influxdb.ID]int64 BucketMeasurementCardinality map[influxdb.ID]map[string]int64 } func newSummary() *Summary { return &Summary{ OrgCardinality: make(map[influxdb.ID]int64), - BucketCardinality: make(map[influxdb.ID]int64), + BucketByOrgCardinality: make(map[influxdb.ID]map[influxdb.ID]int64), BucketMeasurementCardinality: make(map[influxdb.ID]map[string]int64), } } @@ -78,6 +80,7 @@ func newSummary() *Summary { // any org or bucket. Run returns a *ReportTSISummary, which contains maps for finding // the cardinality of a bucket or org based on its influxdb.ID func (report *ReportCommand) Run(print bool) (*Summary, error) { + report.start = time.Now() report.Stdout = os.Stdout if report.SeriesDirPath == "" { @@ -161,7 +164,6 @@ func (report *ReportCommand) calculateCardinalities() error { } defer itr.Close() - var totalCard = &cardinality{name: []byte("total")} OUTER: for { name, err := itr.Next() @@ -189,36 +191,25 @@ OUTER: continue } - var orgCard, bucketCard *cardinality + var bucketCard *cardinality // initialize map of bucket to measurements if _, ok := report.byBucketMeasurement[bucket]; !ok { report.byBucketMeasurement[bucket] = make(map[string]*cardinality) } + if _, ok := report.byOrgBucket[org]; !ok { + report.byOrgBucket[org] = make(map[influxdb.ID]*cardinality) + } + // initialize total cardinality tracking struct for this bucket - if c, ok := report.byBucket[bucket]; !ok { + if c, ok := report.byOrgBucket[org][bucket]; !ok { bucketCard = &cardinality{name: []byte(bucket.String())} - report.byBucket[bucket] = bucketCard + report.byOrgBucket[org][bucket] = bucketCard } else { bucketCard = c } - // initialize total cardinality tracking struct for this org - if c, ok := report.byOrg[org]; !ok { - orgCard = &cardinality{name: []byte(bucket.String())} - report.byOrg[org] = orgCard - } else { - orgCard = c - } - - // track org to bucket mappings so we can associate org an bucket in the printed summary - if _, ok := report.orgToBucket[org]; !ok { - report.orgToBucket[org] = []influxdb.ID{} - } - - report.orgToBucket[org] = append(report.orgToBucket[org], bucket) - var e tsdb.SeriesIDElem for e, err = sitr.Next(); err == nil && e.SeriesID.ID != 0; e, err = sitr.Next() { id := e.SeriesID.ID @@ -227,10 +218,7 @@ OUTER: panic(fmt.Sprintf("series ID is too large: %d (max %d). Corrupted series file?", e.SeriesID, uint32(math.MaxUint32))) } - totalCard.add(id) - - // add cardinalities to org and bucket maps - orgCard.add(id) + // add cardinality to bucket bucketCard.add(id) // retrieve tags associated with series id so we can get @@ -240,7 +228,7 @@ OUTER: panic(fmt.Sprintf("empty series key")) } - // measurement name should be first tag. + // measurement name should be first tag mName := string(tags[0].Value) // update measurement-level cardinality if tracking by measurement @@ -271,16 +259,24 @@ func (report *ReportCommand) calculateOrgBucketCardinality() (*Summary, error) { if err := report.calculateCardinalities(); err != nil { return nil, err } + + var totalCard int64 // Generate a new summary summary := newSummary() - for bucketID, bucketCard := range report.byBucket { - summary.BucketCardinality[bucketID] = bucketCard.cardinality() - summary.BucketMeasurementCardinality[bucketID] = make(map[string]int64) + for orgID, bucketMap := range report.byOrgBucket { + summary.BucketByOrgCardinality[orgID] = make(map[influxdb.ID]int64) + orgTotal := int64(0) + for bucketID, bucketCard := range bucketMap { + count := bucketCard.cardinality() + summary.BucketByOrgCardinality[orgID][bucketID] = count + summary.BucketMeasurementCardinality[bucketID] = make(map[string]int64) + orgTotal += count + totalCard += count + } + summary.OrgCardinality[orgID] = orgTotal } - for orgID, orgCard := range report.byOrg { - summary.OrgCardinality[orgID] = orgCard.cardinality() - } + summary.TotalCardinality = totalCard for bucketID, bucketMeasurement := range report.byBucketMeasurement { for mName, mCard := range bucketMeasurement { @@ -295,30 +291,39 @@ func (report *ReportCommand) printCardinalitySummary(summary *Summary) { tw := tabwriter.NewWriter(report.Stdout, 4, 4, 1, '\t', 0) fmt.Fprint(tw, "\n") - // sort total org and bucket cardinalities and limit to top n values + fmt.Fprintf(tw, "Total: %d\n", summary.TotalCardinality) + // sort total org and bucket and limit to top n values sortedOrgs := sortKeys(summary.OrgCardinality, report.TopN) - sortedBuckets := sortKeys(summary.BucketCardinality, report.TopN) - for orgIndex := range sortedOrgs { + for i, orgResult := range sortedOrgs { + orgID, _ := influxdb.IDFromString(orgResult.id) + sortedBuckets := sortKeys(summary.BucketByOrgCardinality[*orgID], report.TopN) // if we specify a bucket, we do not print the org cardinality + fmt.Fprintln(tw, "===============") if report.BucketID == nil { - fmt.Fprintf(tw, "Org %s total: %d\n", sortedOrgs[orgIndex].id, sortedOrgs[orgIndex].card) + fmt.Fprintf(tw, "Org %s total: %d\n", orgResult.id, orgResult.card) } - for bucketIndex := range sortedBuckets { - fmt.Fprintf(tw, "\tBucket %s total: %d\n", sortedBuckets[bucketIndex].id, sortedBuckets[bucketIndex].card) + for _, bucketResult := range sortedBuckets { + fmt.Fprintf(tw, "\tBucket %s total: %d\n", bucketResult.id, bucketResult.card) if report.ByMeasurement { - bucketID, _ := influxdb.IDFromString(sortedBuckets[bucketIndex].id) + bucketID, _ := influxdb.IDFromString(bucketResult.id) sortedMeasurements := sortMeasurements(summary.BucketMeasurementCardinality[*bucketID], report.TopN) - for measIndex := range sortedMeasurements { - fmt.Fprintf(tw, "\t\t_m=%s\t%d\n", sortedMeasurements[measIndex].id, sortedMeasurements[measIndex].card) + for _, measResult := range sortedMeasurements { + fmt.Fprintf(tw, "\t\t_m=%s\t%d\n", measResult.id, measResult.card) } } } + if i == len(sortedOrgs)-1 { + fmt.Fprintln(tw, "===============") + } } - fmt.Fprint(tw, "\n") + fmt.Fprint(tw, "\n\n") + + elapsed := time.Since(report.start) + fmt.Fprintf(tw, "Finished in %v\n", elapsed) tw.Flush() } From a0f4d714eaea2421276d62605451016a3a46b398 Mon Sep 17 00:00:00 2001 From: Adam Perlin Date: Thu, 25 Jul 2019 18:09:26 -0700 Subject: [PATCH 15/18] chore(tsi1): rename tsi1_report.go -> report.go --- tsdb/tsi1/{tsi1_report.go => report.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tsdb/tsi1/{tsi1_report.go => report.go} (100%) diff --git a/tsdb/tsi1/tsi1_report.go b/tsdb/tsi1/report.go similarity index 100% rename from tsdb/tsi1/tsi1_report.go rename to tsdb/tsi1/report.go From 4fef1683a0c73caa91cb295f22fb5acd68e7bdea Mon Sep 17 00:00:00 2001 From: Adam Perlin Date: Fri, 26 Jul 2019 16:20:54 -0700 Subject: [PATCH 16/18] refactor(tsi1): address review comments for report-tsi tool --- cmd/influxd/inspect/report_tsi1.go | 30 +++--- tsdb/tsi1/report.go | 149 +++++++++++++++-------------- 2 files changed, 87 insertions(+), 92 deletions(-) diff --git a/cmd/influxd/inspect/report_tsi1.go b/cmd/influxd/inspect/report_tsi1.go index 2d6dc69099e..8db26ff080d 100644 --- a/cmd/influxd/inspect/report_tsi1.go +++ b/cmd/influxd/inspect/report_tsi1.go @@ -36,24 +36,23 @@ var reportTSIFlags = struct { // NewReportTsiCommand returns a new instance of Command with default setting applied. func NewReportTSICommand() *cobra.Command { - reportTSICommand := &cobra.Command{ + cmd := &cobra.Command{ Use: "report-tsi", Short: "Reports the cardinality of tsi files short", Long: `Reports the cardinality of tsi files long.`, RunE: RunReportTSI, } - reportTSICommand.Flags().StringVar(&reportTSIFlags.Path, "path", os.Getenv("HOME")+"/.influxdbv2/engine", "Path to data engine. Defaults $HOME/.influxdbv2/engine") - reportTSICommand.Flags().StringVar(&reportTSIFlags.SeriesFilePath, "series-file", "", "Optional path to series file. Defaults /path/to/db-path/_series") - reportTSICommand.Flags().BoolVarP(&reportTSIFlags.ByMeasurement, "measurements", "m", false, "Segment cardinality by measurements") - // fs.BoolVar(&cmd.byTagKey, "tag-key", false, "Segment cardinality by tag keys (overrides `measurements`") - reportTSICommand.Flags().IntVarP(&reportTSIFlags.TopN, "top", "t", 0, "Limit results to top n") - reportTSICommand.Flags().StringVarP(&reportTSIFlags.Bucket, "bucket", "b", "", "If bucket is specified, org must be specified") - reportTSICommand.Flags().StringVarP(&reportTSIFlags.Org, "org", "o", "", "Org to be reported") + cmd.Flags().StringVar(&reportTSIFlags.Path, "path", os.Getenv("HOME")+"/.influxdbv2/engine", "Path to data engine. Defaults $HOME/.influxdbv2/engine") + cmd.Flags().StringVar(&reportTSIFlags.SeriesFilePath, "series-file", "", "Optional path to series file. Defaults /path/to/db-path/_series") + cmd.Flags().BoolVarP(&reportTSIFlags.ByMeasurement, "measurements", "m", false, "Segment cardinality by measurements") + cmd.Flags().IntVarP(&reportTSIFlags.TopN, "top", "t", 0, "Limit results to top n") + cmd.Flags().StringVarP(&reportTSIFlags.Bucket, "bucket", "b", "", "If bucket is specified, org must be specified") + cmd.Flags().StringVarP(&reportTSIFlags.Org, "org", "o", "", "Org to be reported") - reportTSICommand.SetOutput(reportTSIFlags.Stdout) + cmd.SetOutput(reportTSIFlags.Stdout) - return reportTSICommand + return cmd } // RunReportTSI executes the run command for ReportTSI. @@ -78,26 +77,21 @@ func RunReportTSI(cmd *cobra.Command, args []string) error { report.TopN = reportTSIFlags.TopN if reportTSIFlags.Org != "" { - if orgID, err := influxdb.IDFromString(reportTSIFlags.Org); err != nil { + if report.OrgID, err = influxdb.IDFromString(reportTSIFlags.Org); err != nil { return err - } else { - report.OrgID = orgID } } if reportTSIFlags.Bucket != "" { - if bucketID, err := influxdb.IDFromString(reportTSIFlags.Bucket); err != nil { + if report.BucketID, err = influxdb.IDFromString(reportTSIFlags.Bucket); err != nil { return err } else if report.OrgID == nil { return errors.New("org must be provided if filtering by bucket") - } else { - report.BucketID = bucketID } } // Run command with printing enabled - _, err = report.Run(true) - if err != nil { + if _, err = report.Run(true); err != nil { return err } return nil diff --git a/tsdb/tsi1/report.go b/tsdb/tsi1/report.go index 1ce84530d25..65977bdeff4 100644 --- a/tsdb/tsi1/report.go +++ b/tsdb/tsi1/report.go @@ -81,26 +81,24 @@ func newSummary() *Summary { // the cardinality of a bucket or org based on its influxdb.ID func (report *ReportCommand) Run(print bool) (*Summary, error) { report.start = time.Now() + report.Stdout = os.Stdout if report.SeriesDirPath == "" { report.SeriesDirPath = filepath.Join(report.DataPath, "_series") } - sFile := tsdb.NewSeriesFile(report.SeriesDirPath) - - // TODO: do we actually want the seriesfile logging? - // sFile.WithLogger(report.Logger) + sfile := tsdb.NewSeriesFile(report.SeriesDirPath) - if err := sFile.Open(context.Background()); err != nil { + if err := sfile.Open(context.Background()); err != nil { report.Logger.Error("failed to open series") return nil, err } - defer sFile.Close() - report.sfile = sFile + defer sfile.Close() + report.sfile = sfile indexPath := filepath.Join(report.DataPath, "index") - report.indexFile = NewIndex(sFile, NewConfig(), WithPath(indexPath)) + report.indexFile = NewIndex(sfile, NewConfig(), WithPath(indexPath)) if err := report.indexFile.Open(context.Background()); err != nil { return nil, err } @@ -155,8 +153,7 @@ func (c *cardinality) cardinality() int64 { } func (report *ReportCommand) calculateCardinalities() error { - idx := report.indexFile - itr, err := idx.MeasurementIterator() + itr, err := report.indexFile.MeasurementIterator() if err != nil { return err } else if itr == nil { @@ -164,94 +161,98 @@ func (report *ReportCommand) calculateCardinalities() error { } defer itr.Close() -OUTER: for { name, err := itr.Next() if err != nil { return err } else if name == nil { - break OUTER - } - - // decode org and bucket from measurement name - var a [16]byte - copy(a[:], name[:16]) - org, bucket := tsdb.DecodeName(a) - - if report.OrgID != nil && *report.OrgID != org { - continue - } else if report.BucketID != nil && *report.BucketID != bucket { - continue + return nil } - sitr, err := idx.MeasurementSeriesIDIterator(name) - if err != nil { + if err = report.calculateMeasurementCardinalities(name); err != nil { return err - } else if sitr == nil { - continue - } - - var bucketCard *cardinality - - // initialize map of bucket to measurements - if _, ok := report.byBucketMeasurement[bucket]; !ok { - report.byBucketMeasurement[bucket] = make(map[string]*cardinality) } + } +} - if _, ok := report.byOrgBucket[org]; !ok { - report.byOrgBucket[org] = make(map[influxdb.ID]*cardinality) - } +func (report *ReportCommand) calculateMeasurementCardinalities(name []byte) error { + // decode org and bucket from measurement name + var a [16]byte + copy(a[:], name[:16]) + org, bucket := tsdb.DecodeName(a) + if report.OrgID != nil && *report.OrgID != org || + report.BucketID != nil && *report.BucketID != bucket { + return nil + } - // initialize total cardinality tracking struct for this bucket - if c, ok := report.byOrgBucket[org][bucket]; !ok { - bucketCard = &cardinality{name: []byte(bucket.String())} - report.byOrgBucket[org][bucket] = bucketCard - } else { - bucketCard = c - } + idx := report.indexFile + sitr, err := idx.MeasurementSeriesIDIterator(name) + if err != nil { + return err + } else if sitr == nil { + return nil + } - var e tsdb.SeriesIDElem - for e, err = sitr.Next(); err == nil && e.SeriesID.ID != 0; e, err = sitr.Next() { - id := e.SeriesID.ID + defer sitr.Close() - if id > math.MaxUint32 { - panic(fmt.Sprintf("series ID is too large: %d (max %d). Corrupted series file?", e.SeriesID, uint32(math.MaxUint32))) - } + var bucketCard *cardinality - // add cardinality to bucket - bucketCard.add(id) + // initialize map of bucket to measurements + if _, ok := report.byBucketMeasurement[bucket]; !ok { + report.byBucketMeasurement[bucket] = make(map[string]*cardinality) + } - // retrieve tags associated with series id so we can get - // associated measurement - _, tags := report.sfile.Series(e.SeriesID) - if len(tags) == 0 { - panic(fmt.Sprintf("empty series key")) - } + if _, ok := report.byOrgBucket[org]; !ok { + report.byOrgBucket[org] = make(map[influxdb.ID]*cardinality) + } - // measurement name should be first tag - mName := string(tags[0].Value) + // initialize total cardinality tracking struct for this bucket + if c, ok := report.byOrgBucket[org][bucket]; !ok { + bucketCard = &cardinality{name: []byte(bucket.String())} + report.byOrgBucket[org][bucket] = bucketCard + } else { + bucketCard = c + } - // update measurement-level cardinality if tracking by measurement - if report.ByMeasurement { - var mCard *cardinality - if cardForM, ok := report.byBucketMeasurement[bucket][mName]; !ok { - mCard = &cardinality{name: []byte(mName)} - report.byBucketMeasurement[bucket][mName] = mCard - } else { - mCard = cardForM - } + for { + e, err := sitr.Next() + if err != nil { + return err + } else if e.SeriesID.ID == 0 { + break + } - mCard.add(id) - } + id := e.SeriesID.ID + if id > math.MaxUint32 { + return fmt.Errorf("series ID is too large: %d (max %d). Corrupted series file?", e.SeriesID, uint32(math.MaxUint32)) } - sitr.Close() + // add cardinality to bucket + bucketCard.add(id) - if err != nil { - return err + // retrieve tags associated with series id so we can get + // associated measurement + _, tags := report.sfile.Series(e.SeriesID) + if len(tags) == 0 { + return fmt.Errorf("series ID has empty key: %d", e.SeriesID) } + // measurement name should be first tag + mName := string(tags[0].Value) + + // update measurement-level cardinality if tracking by measurement + if report.ByMeasurement { + var mCard *cardinality + if cardForM, ok := report.byBucketMeasurement[bucket][mName]; !ok { + mCard = &cardinality{name: []byte(mName)} + report.byBucketMeasurement[bucket][mName] = mCard + } else { + mCard = cardForM + } + mCard.add(id) + } } + return nil } From 64747e978179a41e167f2dbf23bfe454305840bc Mon Sep 17 00:00:00 2001 From: Max U Date: Mon, 5 Aug 2019 10:03:32 -0700 Subject: [PATCH 17/18] refactor(tsi1): address config changes to report-tsi tool --- cmd/influxd/inspect/report_tsi1.go | 50 ++++++++++++++++-------------- tsdb/tsi1/report.go | 18 +++++------ 2 files changed, 33 insertions(+), 35 deletions(-) diff --git a/cmd/influxd/inspect/report_tsi1.go b/cmd/influxd/inspect/report_tsi1.go index 8db26ff080d..c210bf985b6 100644 --- a/cmd/influxd/inspect/report_tsi1.go +++ b/cmd/influxd/inspect/report_tsi1.go @@ -4,14 +4,11 @@ import ( "errors" "io" "os" - "path" "github.com/influxdata/influxdb" - "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/tsdb/tsi1" "github.com/spf13/cobra" - "go.uber.org/zap/zapcore" ) // Command represents the program execution for "influxd inspect report-tsi". @@ -38,17 +35,31 @@ var reportTSIFlags = struct { func NewReportTSICommand() *cobra.Command { cmd := &cobra.Command{ Use: "report-tsi", - Short: "Reports the cardinality of tsi files short", - Long: `Reports the cardinality of tsi files long.`, - RunE: RunReportTSI, + Short: "Reports the cardinality of TSI files", + Long: `This command will analyze TSI files within a storage engine directory, reporting + the cardinality of data within the files, divided into org and bucket cardinalities. + + For each report, the following is output: + + * All orgs and buckets in the index; + * The series cardinality within each org and each bucket; + * The time taken to read the index. + + Depending on the --measurements flag, series cardinality is segmented + in the following ways: + + * Series cardinality for each organization; + * Series cardinality for each bucket; + * Series cardinality for each measurement;`, + RunE: RunReportTSI, } - cmd.Flags().StringVar(&reportTSIFlags.Path, "path", os.Getenv("HOME")+"/.influxdbv2/engine", "Path to data engine. Defaults $HOME/.influxdbv2/engine") - cmd.Flags().StringVar(&reportTSIFlags.SeriesFilePath, "series-file", "", "Optional path to series file. Defaults /path/to/db-path/_series") + cmd.Flags().StringVar(&reportTSIFlags.Path, "path", os.Getenv("HOME")+"/.influxdbv2/engine/index", "Path to index. Defaults $HOME/.influxdbv2/engine/index") + cmd.Flags().StringVar(&reportTSIFlags.SeriesFilePath, "series-file", os.Getenv("HOME")+"/.influxdbv2/engine/_series", "Optional path to series file. Defaults $HOME/.influxdbv2/engine/_series") cmd.Flags().BoolVarP(&reportTSIFlags.ByMeasurement, "measurements", "m", false, "Segment cardinality by measurements") cmd.Flags().IntVarP(&reportTSIFlags.TopN, "top", "t", 0, "Limit results to top n") - cmd.Flags().StringVarP(&reportTSIFlags.Bucket, "bucket", "b", "", "If bucket is specified, org must be specified") - cmd.Flags().StringVarP(&reportTSIFlags.Org, "org", "o", "", "Org to be reported") + cmd.Flags().StringVarP(&reportTSIFlags.Bucket, "bucket_id", "b", "", "If bucket is specified, org must be specified. A bucket id must be a base-16 string") + cmd.Flags().StringVarP(&reportTSIFlags.Org, "org_id", "o", "", "Only specified org data will be reported. An org id must be a base-16 string") cmd.SetOutput(reportTSIFlags.Stdout) @@ -57,25 +68,16 @@ func NewReportTSICommand() *cobra.Command { // RunReportTSI executes the run command for ReportTSI. func RunReportTSI(cmd *cobra.Command, args []string) error { - // set up log - config := logger.NewConfig() - config.Level = zapcore.InfoLevel - log, err := config.New(os.Stderr) - if err != nil { - return err - } - - // if path is unset, set to $HOME/.influxdbv2/engine" - if reportTSIFlags.Path == "" { - reportTSIFlags.Path = path.Join(os.Getenv("HOME"), ".influxdbv2/engine") - } - report := tsi1.NewReportCommand() report.DataPath = reportTSIFlags.Path - report.Logger = log report.ByMeasurement = reportTSIFlags.ByMeasurement report.TopN = reportTSIFlags.TopN + report.SeriesDirPath = reportTSIFlags.SeriesFilePath + + report.Stdout = os.Stdout + report.Stderr = os.Stderr + var err error if reportTSIFlags.Org != "" { if report.OrgID, err = influxdb.IDFromString(reportTSIFlags.Org); err != nil { return err diff --git a/tsdb/tsi1/report.go b/tsdb/tsi1/report.go index 65977bdeff4..1ed7728a41e 100644 --- a/tsdb/tsi1/report.go +++ b/tsdb/tsi1/report.go @@ -1,19 +1,19 @@ package tsi1 import ( + "bytes" "context" "fmt" "io" "math" "os" - "path/filepath" "sort" "text/tabwriter" "time" "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/tsdb" - "go.uber.org/zap" ) const ( @@ -27,7 +27,6 @@ type ReportCommand struct { // Standard input/output, overridden for testing. Stderr io.Writer Stdout io.Writer - Logger *zap.Logger // Filters DataPath string @@ -51,7 +50,6 @@ type ReportCommand struct { // NewReportCommand returns a new instance of ReportCommand with default setting applied. func NewReportCommand() *ReportCommand { return &ReportCommand{ - Logger: zap.NewNop(), byOrgBucket: make(map[influxdb.ID]map[influxdb.ID]*cardinality), byBucketMeasurement: make(map[influxdb.ID]map[string]*cardinality), orgToBucket: make(map[influxdb.ID][]influxdb.ID), @@ -83,22 +81,17 @@ func (report *ReportCommand) Run(print bool) (*Summary, error) { report.start = time.Now() report.Stdout = os.Stdout - - if report.SeriesDirPath == "" { - report.SeriesDirPath = filepath.Join(report.DataPath, "_series") - } + report.Stderr = os.Stderr sfile := tsdb.NewSeriesFile(report.SeriesDirPath) if err := sfile.Open(context.Background()); err != nil { - report.Logger.Error("failed to open series") return nil, err } defer sfile.Close() report.sfile = sfile - indexPath := filepath.Join(report.DataPath, "index") - report.indexFile = NewIndex(sfile, NewConfig(), WithPath(indexPath)) + report.indexFile = NewIndex(sfile, NewConfig(), WithPath(report.DataPath)) if err := report.indexFile.Open(context.Background()); err != nil { return nil, err } @@ -238,6 +231,9 @@ func (report *ReportCommand) calculateMeasurementCardinalities(name []byte) erro } // measurement name should be first tag + if !bytes.Equal(tags[0].Key, models.MeasurementTagKeyBytes) { + return fmt.Errorf("corrupted data: first tag should be measurement name, got: %v", string(tags[0].Value)) + } mName := string(tags[0].Value) // update measurement-level cardinality if tracking by measurement From ad188d6465146d6c88c18ab6d0b364322a59d1b6 Mon Sep 17 00:00:00 2001 From: Max U Date: Mon, 5 Aug 2019 13:21:13 -0700 Subject: [PATCH 18/18] refactor(tsi1): remove extraneous logging --- tsdb/tsi1/report.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tsdb/tsi1/report.go b/tsdb/tsi1/report.go index 1ed7728a41e..9f459b805f3 100644 --- a/tsdb/tsi1/report.go +++ b/tsdb/tsi1/report.go @@ -6,7 +6,6 @@ import ( "fmt" "io" "math" - "os" "sort" "text/tabwriter" "time" @@ -80,9 +79,6 @@ func newSummary() *Summary { func (report *ReportCommand) Run(print bool) (*Summary, error) { report.start = time.Now() - report.Stdout = os.Stdout - report.Stderr = os.Stderr - sfile := tsdb.NewSeriesFile(report.SeriesDirPath) if err := sfile.Open(context.Background()); err != nil {