Skip to content
This repository has been archived by the owner on Mar 25, 2022. It is now read-only.

[WIP] performance improvements for high cardinality #7

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 33 additions & 5 deletions cmd/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,22 @@ func insertRun(cmd *cobra.Command, args []string) {
return
}

pts := point.NewPoints(seriesKey, fieldStr, seriesN, lineprotocol.Nanosecond)

// pts := point.NewPoints(seriesKey, fieldStr, seriesN, lineprotocol.Nanosecond)
sink := newResultSink(int(concurrency))

var wg sync.WaitGroup
wg.Add(int(concurrency))

var totalWritten uint64

var ptsChans []chan []lineprotocol.Point

start := time.Now()
for i := uint64(0); i < concurrency; i++ {
go func() {
ch := make(chan []lineprotocol.Point)
ptsChans = append(ptsChans, ch)

go func(pc chan []lineprotocol.Point) {
tick := time.Tick(time.Second)

if fast {
Expand All @@ -107,13 +111,37 @@ func insertRun(cmd *cobra.Command, args []string) {
}

// Ignore duration from a single call to Write.
pointsWritten, _ := stress.Write(pts, c, cfg)
pointsWritten, _ := stress.Write(pc, c, cfg)
atomic.AddUint64(&totalWritten, pointsWritten)

wg.Done()
}()
}(ch)
}

// This goroutine pulls batches from the generators and multiplexes them
// onto each writer, so that the correct number of series/points are
// written.
go func() {
generator := point.NewPointGenerator(seriesKey, fieldStr, seriesN, lineprotocol.Nanosecond)

for {
// Next batch of points
pts := generator.Next()
// Multiplex them out to all writers.
for _, c := range ptsChans {
if len(pts) == 0 {
close(c)
continue
}
c <- pts
}

if len(pts) == 0 {
return
}
}
}()

wg.Wait()
totalTime := time.Since(start)
if err := c.Close(); err != nil {
Expand Down
102 changes: 102 additions & 0 deletions point/point.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package point

import (
"fmt"
"sync/atomic"
"time"

Expand Down Expand Up @@ -82,6 +83,107 @@ func (p *point) Update() {
}
}

// SeriesGenerator generates series to be written with influx-stress.
type SeriesGenerator struct {
totalCardinality int
currentCardinality int

template string
tagCardinalities []int

batch []string
idx int
}

func NewSeriesGenerator(tmplt string, cardinality int) *SeriesGenerator {
fmt.Printf("Generating %d cardinality\n", cardinality)
s := &SeriesGenerator{}
s.totalCardinality = cardinality
if cardinality < 100000 {
s.batch = make([]string, 0, cardinality)
} else {
s.batch = make([]string, 0, 100000)
}

var numTags int
s.template, numTags = formatTemplate(tmplt)
s.tagCardinalities = tagCardinalityPartition(numTags, primeFactorization(cardinality))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this call not problematic with 100M series?

return s
}

// Next returns the next series to be generated.
func (s *SeriesGenerator) Next() (string, bool) {
if s.currentCardinality == s.totalCardinality && s.idx == len(s.batch) {
// We have generated all series and written the last batch.
return "", false
}

if s.idx < len(s.batch) {
next := s.batch[s.idx]
s.idx++
return next, true
}

// Need to generate a new batch.
s.batch, s.idx = s.batch[:0], 1
for i := 0; i < cap(s.batch) && s.currentCardinality < s.totalCardinality; i++ {
s.batch = append(s.batch, fmt.Sprintf(s.template, sliceMod(s.currentCardinality, s.tagCardinalities)...))
s.currentCardinality++
}

// Return the first series in the batch.
if len(s.batch) == 0 {
return "", false
}
return s.batch[0], true
}

type PointGenerator struct {
SeriesGenerator *SeriesGenerator

// TODO(edd) make generator.
IntFields []string
// TODO(edd) make generator.
FloatFields []string

Precision lineprotocol.Precision
batch []lineprotocol.Point
}

// NewPointGenerator creates a new generator for emitting points. Points are
// created in batches of 20x the number of series created in a SeriesGenerator's
// batch, up to a maximum batch of 500K points.
func NewPointGenerator(seriesKey, fields string, seriesN int, pc lineprotocol.Precision) *PointGenerator {
p := PointGenerator{
SeriesGenerator: NewSeriesGenerator(seriesKey, seriesN),
}
p.IntFields, p.FloatFields = generateFieldSet(fields)

sz := cap(p.SeriesGenerator.batch) * 20
if sz > 500000 {
sz = 500000
}
p.batch = make([]lineprotocol.Point, 0, sz)

return &p
}

// Next returns a batch of points. A nil batch indicates there are no more
// points in the generator.
func (p *PointGenerator) Next() []lineprotocol.Point {
// Generate a new batch. Fill the batch with points from the series
// generator.
p.batch = p.batch[:0]
for i := 0; i < cap(p.batch); i++ {
series, ok := p.SeriesGenerator.Next()
if !ok {
break
}
p.batch = append(p.batch, New([]byte(series), p.IntFields, p.FloatFields, p.Precision))
}
return p.batch
}

// NewPoints returns a slice of Points of length seriesN shaped like the given seriesKey.
func NewPoints(seriesKey, fields string, seriesN int, pc lineprotocol.Precision) []lineprotocol.Point {
pts := []lineprotocol.Point{}
Expand Down
4 changes: 3 additions & 1 deletion stress/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type WriteConfig struct {
// to write data to the target until one of the following conditions is met.
// 1. We reach that MaxPoints specified in the WriteConfig.
// 2. We've passed the Deadline specified in the WriteConfig.
func Write(pts []lineprotocol.Point, c write.Client, cfg WriteConfig) (uint64, time.Duration) {
func Write(pc <-chan []lineprotocol.Point, c write.Client, cfg WriteConfig) (uint64, time.Duration) {
if cfg.Results == nil {
panic("Results Channel on WriteConfig cannot be nil")
}
Expand Down Expand Up @@ -71,6 +71,8 @@ WRITE_BATCHES:
break
}

// Get next batch of points from the generator.
pts := <-pc
for _, pt := range pts {
pointCount++
pt.SetTime(t)
Expand Down