Skip to content

Commit

Permalink
fix: remove nats for scraper processing (#23107)
Browse files Browse the repository at this point in the history
* fix: remove nats for scraper processing

Scrapers now use go channels instead of NATS and interprocess communication.
This should fix #23085 .

Additionally, found and fixed #23106 .

* chore: fix formatting

* chore: fix static check and go.mod

* test: fix some flaky tests

* fix: mark NATS arguments as deprecated
  • Loading branch information
lesam authored Feb 10, 2022
1 parent f68758b commit e20b5e9
Show file tree
Hide file tree
Showing 28 changed files with 252 additions and 970 deletions.
12 changes: 6 additions & 6 deletions cmd/influxd/launcher/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,11 @@ import (
"github.com/influxdata/influxdb/v2/kit/cli"
"github.com/influxdata/influxdb/v2/kit/signals"
influxlogger "github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/nats"
"github.com/influxdata/influxdb/v2/pprof"
"github.com/influxdata/influxdb/v2/sqlite"
"github.com/influxdata/influxdb/v2/storage"
"github.com/influxdata/influxdb/v2/v1/coordinator"
"github.com/influxdata/influxdb/v2/vault"
natsserver "github.com/nats-io/gnatsd/server"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -226,8 +224,8 @@ func NewOpts(viper *viper.Viper) *InfluxdOpts {
StoreType: DiskStore,
SecretStore: BoltStore,

NatsPort: nats.RandomPort,
NatsMaxPayloadBytes: natsserver.MAX_PAYLOAD_SIZE,
NatsPort: 0,
NatsMaxPayloadBytes: 0,

NoTasks: false,

Expand Down Expand Up @@ -595,14 +593,16 @@ func (o *InfluxdOpts) BindCliOpts() []cli.Opt {
{
DestP: &o.NatsPort,
Flag: "nats-port",
Desc: fmt.Sprintf("Port that should be bound by the NATS streaming server. A value of %d will cause a random port to be selected.", nats.RandomPort),
Desc: "deprecated: nats has been replaced",
Default: o.NatsPort,
Hidden: true,
},
{
DestP: &o.NatsMaxPayloadBytes,
Flag: "nats-max-payload-bytes",
Desc: "The maximum number of bytes allowed in a NATS message payload.",
Desc: "deprecated: nats has been replaced",
Default: o.NatsMaxPayloadBytes,
Hidden: true,
},

// Pprof config
Expand Down
58 changes: 15 additions & 43 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
"github.com/influxdata/influxdb/v2/kv/migration"
"github.com/influxdata/influxdb/v2/kv/migration/all"
"github.com/influxdata/influxdb/v2/label"
"github.com/influxdata/influxdb/v2/nats"
"github.com/influxdata/influxdb/v2/notebooks"
notebookTransport "github.com/influxdata/influxdb/v2/notebooks/transport"
endpointservice "github.com/influxdata/influxdb/v2/notification/endpoint/service"
Expand Down Expand Up @@ -220,6 +219,14 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
m.log.Debug("loaded config file", zap.String("path", p))
}

if opts.NatsPort != 0 {
m.log.Warn("nats-port argument is deprecated and unused")
}

if opts.NatsMaxPayloadBytes != 0 {
m.log.Warn("nats-max-payload-bytes argument is deprecated and unused")
}

// Parse feature flags.
// These flags can be used to modify the remaining setup logic in this method.
// They will also be injected into the contexts of incoming HTTP requests at runtime,
Expand Down Expand Up @@ -281,7 +288,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {

secretStore, err := secret.NewStore(m.kvStore)
if err != nil {
m.log.Error("Failed creating new meta store", zap.Error(err))
m.log.Error("Failed creating new secret store", zap.Error(err))
return err
}

Expand Down Expand Up @@ -556,53 +563,18 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
telegrafSvc = telegrafservice.New(m.kvStore)
}

// NATS streaming server
natsOpts := nats.NewDefaultServerOptions()
natsOpts.Port = opts.NatsPort
natsOpts.MaxPayload = opts.NatsMaxPayloadBytes
natsServer := nats.NewServer(&natsOpts, m.log.With(zap.String("service", "nats")))
if err := natsServer.Open(); err != nil {
m.log.Error("Failed to start nats streaming server", zap.Error(err))
scraperScheduler, err := gather.NewScheduler(m.log.With(zap.String("service", "scraper")), 100, 10, scraperTargetSvc, pointsWriter, 10*time.Second)
if err != nil {
m.log.Error("Failed to create scraper subscriber", zap.Error(err))
return err
}
m.closers = append(m.closers, labeledCloser{
label: "nats",
closer: func(context.Context) error {
natsServer.Close()
label: "scraper",
closer: func(ctx context.Context) error {
scraperScheduler.Close()
return nil
},
})
// If a random port was used, the opts will be updated to include the selected value.
natsURL := fmt.Sprintf("http://127.0.0.1:%d", natsOpts.Port)
publisher := nats.NewAsyncPublisher(m.log, fmt.Sprintf("nats-publisher-%d", natsOpts.Port), natsURL)
if err := publisher.Open(); err != nil {
m.log.Error("Failed to connect to streaming server", zap.Error(err))
return err
}

// TODO(jm): this is an example of using a subscriber to consume from the channel. It should be removed.
subscriber := nats.NewQueueSubscriber(fmt.Sprintf("nats-subscriber-%d", natsOpts.Port), natsURL)
if err := subscriber.Open(); err != nil {
m.log.Error("Failed to connect to streaming server", zap.Error(err))
return err
}

subscriber.Subscribe(gather.MetricsSubject, "metrics", gather.NewRecorderHandler(m.log, gather.PointWriter{Writer: pointsWriter}))
scraperScheduler, err := gather.NewScheduler(m.log, 10, scraperTargetSvc, publisher, subscriber, 10*time.Second, 30*time.Second)
if err != nil {
m.log.Error("Failed to create scraper subscriber", zap.Error(err))
return err
}

m.wg.Add(1)
go func(log *zap.Logger) {
defer m.wg.Done()
log = log.With(zap.String("service", "scraper"))
if err := scraperScheduler.Run(ctx); err != nil {
log.Error("Failed scraper service", zap.Error(err))
}
log.Info("Stopping")
}(m.log)

var sessionSvc platform.SessionService
{
Expand Down
4 changes: 4 additions & 0 deletions cmd/influxd/launcher/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/influxdata/influxdb/v2/http"
"github.com/influxdata/influxdb/v2/tsdb"
"github.com/influxdata/influxdb/v2/v1/services/meta"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -210,6 +211,9 @@ func TestLauncher_BucketDelete(t *testing.T) {
if got, exp := engine.SeriesCardinality(ctx, l.Bucket.ID), int64(0); got != exp {
t.Fatalf("after bucket delete got %d, exp %d", got, exp)
}

databaseInfo := engine.MetaClient().Database(l.Bucket.ID.String())
assert.Nil(t, databaseInfo)
}

func TestLauncher_DeleteWithPredicate(t *testing.T) {
Expand Down
50 changes: 0 additions & 50 deletions gather/handler.go

This file was deleted.

3 changes: 1 addition & 2 deletions gather/prometheus.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package gather

import (
"context"
"crypto/tls"
"fmt"
"io"
Expand Down Expand Up @@ -32,7 +31,7 @@ func newPrometheusScraper() *prometheusScraper {
}

// Gather parse metrics from a scraper target url.
func (p *prometheusScraper) Gather(ctx context.Context, target influxdb.ScraperTarget) (collected MetricsCollection, err error) {
func (p *prometheusScraper) Gather(target influxdb.ScraperTarget) (collected MetricsCollection, err error) {
var (
resp *http.Response
)
Expand Down
59 changes: 0 additions & 59 deletions gather/recorder.go

This file was deleted.

Loading

0 comments on commit e20b5e9

Please sign in to comment.