Skip to content

Commit

Permalink
feat: mirror writes to registered replications (#22833)
Browse files Browse the repository at this point in the history
  • Loading branch information
danxmoran authored Nov 10, 2021
1 parent 5a0051a commit 6b56af3
Show file tree
Hide file tree
Showing 7 changed files with 352 additions and 54 deletions.
85 changes: 44 additions & 41 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,24 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
)
m.initTracing(opts)

// Parse feature flags.
// These flags can be used to modify the remaining setup logic in this method.
// They will also be injected into the contexts of incoming HTTP requests at runtime,
// for use in modifying behavior there.
if m.flagger == nil {
m.flagger = feature.DefaultFlagger()
if len(opts.FeatureFlags) > 0 {
f, err := overrideflagger.Make(opts.FeatureFlags, feature.ByKey)
if err != nil {
m.log.Error("Failed to configure feature flag overrides",
zap.Error(err), zap.Any("overrides", opts.FeatureFlags))
return err
}
m.log.Info("Running with feature flag overrides", zap.Any("overrides", opts.FeatureFlags))
m.flagger = f
}
}

m.reg = prom.NewRegistry(m.log.With(zap.String("service", "prom_registry")))
m.reg.MustRegister(prometheus.NewGoCollector())

Expand Down Expand Up @@ -330,6 +348,32 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
restoreService platform.RestoreService = m.engine
)

remotesSvc := remotes.NewService(m.sqlStore)
remotesServer := remotesTransport.NewInstrumentedRemotesHandler(
m.log.With(zap.String("handler", "remotes")), m.reg, remotesSvc)

replicationSvc := replications.NewService(m.sqlStore, ts, pointsWriter, m.log.With(zap.String("service", "replications")), opts.EnginePath)
replicationServer := replicationTransport.NewInstrumentedReplicationHandler(
m.log.With(zap.String("handler", "replications")), m.reg, replicationSvc)
ts.BucketService = replications.NewBucketService(
m.log.With(zap.String("service", "replication_buckets")), ts.BucketService, replicationSvc)

if feature.ReplicationStreamBackend().Enabled(ctx, m.flagger) {
if err = replicationSvc.Open(ctx); err != nil {
m.log.Error("Failed to open replications service", zap.Error(err))
return err
}

m.closers = append(m.closers, labeledCloser{
label: "replications",
closer: func(context.Context) error {
return replicationSvc.Close()
},
})

pointsWriter = replicationSvc
}

deps, err := influxdb.NewDependencies(
storageflux.NewReader(storage2.NewStore(m.engine.TSDBStore(), m.engine.MetaClient())),
m.engine,
Expand Down Expand Up @@ -554,20 +598,6 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
log.Info("Stopping")
}(m.log)

if m.flagger == nil {
m.flagger = feature.DefaultFlagger()
if len(opts.FeatureFlags) > 0 {
f, err := overrideflagger.Make(opts.FeatureFlags, feature.ByKey)
if err != nil {
m.log.Error("Failed to configure feature flag overrides",
zap.Error(err), zap.Any("overrides", opts.FeatureFlags))
return err
}
m.log.Info("Running with feature flag overrides", zap.Any("overrides", opts.FeatureFlags))
m.flagger = f
}
}

var sessionSvc platform.SessionService
{
sessionSvc = session.NewService(
Expand Down Expand Up @@ -651,33 +681,6 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
NotificationRuleFinder: notificationRuleSvc,
}

remotesSvc := remotes.NewService(m.sqlStore)
remotesServer := remotesTransport.NewInstrumentedRemotesHandler(
m.log.With(zap.String("handler", "remotes")), m.reg, remotesSvc)

replicationSvc := replications.NewService(m.sqlStore, ts, m.log.With(zap.String("service", "replications")), opts.EnginePath)
replicationServer := replicationTransport.NewInstrumentedReplicationHandler(
m.log.With(zap.String("handler", "replications")), m.reg, replicationSvc)

ts.BucketService = replications.NewBucketService(
m.log.With(zap.String("service", "replication_buckets")), ts.BucketService, replicationSvc)

replicationsFlag := feature.ReplicationStreamBackend()

if replicationsFlag.Enabled(ctx, m.flagger) {
if err = replicationSvc.Open(ctx); err != nil {
m.log.Error("Failed to open replications service", zap.Error(err))
return err
}

m.closers = append(m.closers, labeledCloser{
label: "replications",
closer: func(context.Context) error {
return replicationSvc.Close()
},
})
}

errorHandler := kithttp.NewErrorHandler(m.log.With(zap.String("handler", "error_logger")))
m.apibackend = &http.APIBackend{
AssetsPath: opts.AssetsPath,
Expand Down
16 changes: 16 additions & 0 deletions replications/internal/queue_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,3 +238,19 @@ func (qm *durableQueueManager) CloseAll() error {
return nil
}
}

// EnqueueData persists a set of bytes to a replication's durable queue.
func (qm *durableQueueManager) EnqueueData(replicationID platform.ID, data []byte) error {
qm.mutex.RLock()
defer qm.mutex.RUnlock()

if qm.replicationQueues[replicationID] == nil {
return fmt.Errorf("durable queue not found for replication ID %q", replicationID)
}

if err := qm.replicationQueues[replicationID].Append(data); err != nil {
return err
}

return nil
}
31 changes: 31 additions & 0 deletions replications/internal/queue_management_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,34 @@ func shutdown(t *testing.T, qm *durableQueueManager) {
emptyMap := make(map[platform.ID]*durablequeue.Queue)
qm.replicationQueues = emptyMap
}

func TestEnqueueData(t *testing.T) {
t.Parallel()

queuePath, err := os.MkdirTemp("", "testqueue")
require.NoError(t, err)
defer os.RemoveAll(queuePath)

logger := zaptest.NewLogger(t)
qm := NewDurableQueueManager(logger, queuePath)

require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes))
require.DirExists(t, filepath.Join(queuePath, id1.String()))

sizes, err := qm.CurrentQueueSizes([]platform.ID{id1})
require.NoError(t, err)
// Empty queues are 8 bytes for the footer.
require.Equal(t, map[platform.ID]int64{id1: 8}, sizes)

data := "some fake data"

require.NoError(t, qm.EnqueueData(id1, []byte(data)))
sizes, err = qm.CurrentQueueSizes([]platform.ID{id1})
require.NoError(t, err)
require.Greater(t, sizes[id1], int64(8))

written, err := qm.replicationQueues[id1].Current()
require.NoError(t, err)

require.Equal(t, data, string(written))
}
51 changes: 51 additions & 0 deletions replications/mock/points_writer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 26 additions & 12 deletions replications/mock/queue_management.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 6b56af3

Please sign in to comment.