Skip to content

Commit

Permalink
Fix imports of multiple databases in a single import file from `influ…
Browse files Browse the repository at this point in the history
…x -import`

If multiple databases were specified, then the earlier writes would be
written to the later database and/or retention policy because points
were only written out after the batch size was reached. This forces the
batcher to flush whenever the database context switches.
  • Loading branch information
jsternberg committed Feb 9, 2018
1 parent ca7cc02 commit c58ca8d
Showing 1 changed file with 21 additions and 11 deletions.
32 changes: 21 additions & 11 deletions importer/v8/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Importer struct {
failedInserts int
totalCommands int
throttlePointsWritten int
startTime time.Time
lastWrite time.Time
throttle *time.Ticker

Expand Down Expand Up @@ -167,7 +168,7 @@ func (i *Importer) processDDL(scanner *bufio.Reader) error {
}

func (i *Importer) processDML(scanner *bufio.Reader) error {
start := time.Now()
i.startTime = time.Now()
for {
line, err := scanner.ReadString(byte('\n'))
if err != nil && err != io.EOF {
Expand All @@ -178,9 +179,11 @@ func (i *Importer) processDML(scanner *bufio.Reader) error {
return nil
}
if strings.HasPrefix(line, "# CONTEXT-DATABASE:") {
i.batchWrite()
i.database = strings.TrimSpace(strings.Split(line, ":")[1])
}
if strings.HasPrefix(line, "# CONTEXT-RETENTION-POLICY:") {
i.batchWrite()
i.retentionPolicy = strings.TrimSpace(strings.Split(line, ":")[1])
}
if strings.HasPrefix(line, "#") {
Expand All @@ -190,7 +193,7 @@ func (i *Importer) processDML(scanner *bufio.Reader) error {
if strings.TrimSpace(line) == "" {
continue
}
i.batchAccumulator(line, start)
i.batchAccumulator(line)
}
}

Expand All @@ -210,22 +213,19 @@ func (i *Importer) queryExecutor(command string) {
i.execute(command)
}

func (i *Importer) batchAccumulator(line string, start time.Time) {
func (i *Importer) batchAccumulator(line string) {
i.batch = append(i.batch, line)
if len(i.batch) == batchSize {
i.batchWrite()
i.batch = i.batch[:0]
// Give some status feedback every 100000 lines processed
processed := i.totalInserts + i.failedInserts
if processed%100000 == 0 {
since := time.Since(start)
pps := float64(processed) / since.Seconds()
i.stdoutLogger.Printf("Processed %d lines. Time elapsed: %s. Points per second (PPS): %d", processed, since.String(), int64(pps))
}
}
}

func (i *Importer) batchWrite() {
// Exit early if there are no points in the batch.
if len(i.batch) == 0 {
return
}

// Accumulate the batch size to see how many points we have written this second
i.throttlePointsWritten += len(i.batch)

Expand Down Expand Up @@ -261,4 +261,14 @@ func (i *Importer) batchWrite() {
}
i.throttlePointsWritten = 0
i.lastWrite = time.Now()

// Clear the batch and record the number of processed points.
i.batch = i.batch[:0]
// Give some status feedback every 100000 lines processed
processed := i.totalInserts + i.failedInserts
if processed%100000 == 0 {
since := time.Since(i.startTime)
pps := float64(processed) / since.Seconds()
i.stdoutLogger.Printf("Processed %d lines. Time elapsed: %s. Points per second (PPS): %d", processed, since.String(), int64(pps))
}
}

0 comments on commit c58ca8d

Please sign in to comment.