diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go
index 0eae7fa6a17..348f76d3ac4 100644
--- a/cmd/influxd/launcher/launcher.go
+++ b/cmd/influxd/launcher/launcher.go
@@ -216,6 +216,24 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
 	)
 
 	m.initTracing(opts)
 
+	// Parse feature flags.
+	// These flags can be used to modify the remaining setup logic in this method.
+	// They will also be injected into the contexts of incoming HTTP requests at runtime,
+	// for use in modifying behavior there.
+	if m.flagger == nil {
+		m.flagger = feature.DefaultFlagger()
+		if len(opts.FeatureFlags) > 0 {
+			f, err := overrideflagger.Make(opts.FeatureFlags, feature.ByKey)
+			if err != nil {
+				m.log.Error("Failed to configure feature flag overrides",
+					zap.Error(err), zap.Any("overrides", opts.FeatureFlags))
+				return err
+			}
+			m.log.Info("Running with feature flag overrides", zap.Any("overrides", opts.FeatureFlags))
+			m.flagger = f
+		}
+	}
+
 	m.reg = prom.NewRegistry(m.log.With(zap.String("service", "prom_registry")))
 	m.reg.MustRegister(prometheus.NewGoCollector())
@@ -330,6 +348,32 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
 		restoreService platform.RestoreService = m.engine
 	)
 
+	remotesSvc := remotes.NewService(m.sqlStore)
+	remotesServer := remotesTransport.NewInstrumentedRemotesHandler(
+		m.log.With(zap.String("handler", "remotes")), m.reg, remotesSvc)
+
+	replicationSvc := replications.NewService(m.sqlStore, ts, pointsWriter, m.log.With(zap.String("service", "replications")), opts.EnginePath)
+	replicationServer := replicationTransport.NewInstrumentedReplicationHandler(
+		m.log.With(zap.String("handler", "replications")), m.reg, replicationSvc)
+	ts.BucketService = replications.NewBucketService(
+		m.log.With(zap.String("service", "replication_buckets")), ts.BucketService, replicationSvc)
+
+	if feature.ReplicationStreamBackend().Enabled(ctx, m.flagger) {
+		if err = replicationSvc.Open(ctx); err != nil {
+			m.log.Error("Failed to open replications service", zap.Error(err))
+			return err
+		}
+
+		m.closers = append(m.closers, labeledCloser{
+			label: "replications",
+			closer: func(context.Context) error {
+				return replicationSvc.Close()
+			},
+		})
+
+		pointsWriter = replicationSvc
+	}
+
 	deps, err := influxdb.NewDependencies(
 		storageflux.NewReader(storage2.NewStore(m.engine.TSDBStore(), m.engine.MetaClient())),
 		m.engine,
@@ -554,20 +598,6 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
 		log.Info("Stopping")
 	}(m.log)
 
-	if m.flagger == nil {
-		m.flagger = feature.DefaultFlagger()
-		if len(opts.FeatureFlags) > 0 {
-			f, err := overrideflagger.Make(opts.FeatureFlags, feature.ByKey)
-			if err != nil {
-				m.log.Error("Failed to configure feature flag overrides",
-					zap.Error(err), zap.Any("overrides", opts.FeatureFlags))
-				return err
-			}
-			m.log.Info("Running with feature flag overrides", zap.Any("overrides", opts.FeatureFlags))
-			m.flagger = f
-		}
-	}
-
 	var sessionSvc platform.SessionService
 	{
 		sessionSvc = session.NewService(
@@ -651,33 +681,6 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
 		NotificationRuleFinder: notificationRuleSvc,
 	}
 
-	remotesSvc := remotes.NewService(m.sqlStore)
-	remotesServer := remotesTransport.NewInstrumentedRemotesHandler(
-		m.log.With(zap.String("handler", "remotes")), m.reg, remotesSvc)
-
-	replicationSvc := replications.NewService(m.sqlStore, ts, m.log.With(zap.String("service", "replications")), opts.EnginePath)
-	replicationServer := replicationTransport.NewInstrumentedReplicationHandler(
-		m.log.With(zap.String("handler", "replications")), m.reg, replicationSvc)
-
-	ts.BucketService = replications.NewBucketService(
-		m.log.With(zap.String("service", "replication_buckets")), ts.BucketService, replicationSvc)
-
-	replicationsFlag := feature.ReplicationStreamBackend()
-
-	if replicationsFlag.Enabled(ctx, m.flagger) {
-		if err = replicationSvc.Open(ctx); err != nil {
-			m.log.Error("Failed to open replications service", zap.Error(err))
-			return err
-		}
-
-		m.closers = append(m.closers, labeledCloser{
-			label: "replications",
-			closer: func(context.Context) error {
-				return replicationSvc.Close()
-			},
-		})
-	}
-
 	errorHandler := kithttp.NewErrorHandler(m.log.With(zap.String("handler", "error_logger")))
 	m.apibackend = &http.APIBackend{
 		AssetsPath: opts.AssetsPath,
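Note (not part of the patch): the launcher hunks above move feature-flag parsing to the top of run() and, when the ReplicationStreamBackend flag is enabled, point pointsWriter at the replication service so everything wired up afterwards writes through it. A minimal sketch of that decision, using simplified stand-in names (writer, chooseWriter, engineWriter, replicationsEnabled) rather than the launcher's real variables:

package sketch

import (
	"context"

	"github.com/influxdata/influxdb/v2/kit/platform"
	"github.com/influxdata/influxdb/v2/models"
)

// writer mirrors the WritePoints signature of storage.PointsWriter
// (see the generated mock later in this diff).
type writer interface {
	WritePoints(ctx context.Context, orgID, bucketID platform.ID, points []models.Point) error
}

// chooseWriter stands in for the `pointsWriter = replicationSvc` assignment above:
// with the flag on, the replication service (which keeps the engine writer as its
// localWriter) becomes the entry point for all writes; with it off, nothing changes.
func chooseWriter(engineWriter, replicationSvc writer, replicationsEnabled bool) writer {
	if replicationsEnabled {
		return replicationSvc
	}
	return engineWriter
}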
diff --git a/replications/internal/queue_management.go b/replications/internal/queue_management.go
index 4f7ab82c5cf..fd2e27bbd68 100644
--- a/replications/internal/queue_management.go
+++ b/replications/internal/queue_management.go
@@ -238,3 +238,19 @@ func (qm *durableQueueManager) CloseAll() error {
 		return nil
 	}
 }
+
+// EnqueueData persists a set of bytes to a replication's durable queue.
+func (qm *durableQueueManager) EnqueueData(replicationID platform.ID, data []byte) error {
+	qm.mutex.RLock()
+	defer qm.mutex.RUnlock()
+
+	if qm.replicationQueues[replicationID] == nil {
+		return fmt.Errorf("durable queue not found for replication ID %q", replicationID)
+	}
+
+	if err := qm.replicationQueues[replicationID].Append(data); err != nil {
+		return err
+	}
+
+	return nil
+}
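Aside (not in the patch): a small sketch of the producer-side call order this adds, written as if it lived in the same internal package; enqueueOnce and all of its parameter values are illustrative stand-ins, and the test added in the next file exercises the same path end to end.

package internal

import (
	"go.uber.org/zap"

	"github.com/influxdata/influxdb/v2/kit/platform"
)

// enqueueOnce shows the expected ordering: a replication's queue must be
// initialized before EnqueueData will accept a payload for it; otherwise the
// "durable queue not found" error above is returned.
func enqueueOnce(log *zap.Logger, queuePath string, id platform.ID, maxQueueSizeBytes int64, payload []byte) error {
	qm := NewDurableQueueManager(log, queuePath)
	if err := qm.InitializeQueue(id, maxQueueSizeBytes); err != nil {
		return err
	}
	return qm.EnqueueData(id, payload)
}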
diff --git a/replications/internal/queue_management_test.go b/replications/internal/queue_management_test.go
index 851f6250352..5edc7c90562 100644
--- a/replications/internal/queue_management_test.go
+++ b/replications/internal/queue_management_test.go
@@ -249,3 +249,34 @@ func shutdown(t *testing.T, qm *durableQueueManager) {
 	emptyMap := make(map[platform.ID]*durablequeue.Queue)
 	qm.replicationQueues = emptyMap
 }
+
+func TestEnqueueData(t *testing.T) {
+	t.Parallel()
+
+	queuePath, err := os.MkdirTemp("", "testqueue")
+	require.NoError(t, err)
+	defer os.RemoveAll(queuePath)
+
+	logger := zaptest.NewLogger(t)
+	qm := NewDurableQueueManager(logger, queuePath)
+
+	require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes))
+	require.DirExists(t, filepath.Join(queuePath, id1.String()))
+
+	sizes, err := qm.CurrentQueueSizes([]platform.ID{id1})
+	require.NoError(t, err)
+	// Empty queues are 8 bytes for the footer.
+	require.Equal(t, map[platform.ID]int64{id1: 8}, sizes)
+
+	data := "some fake data"
+
+	require.NoError(t, qm.EnqueueData(id1, []byte(data)))
+	sizes, err = qm.CurrentQueueSizes([]platform.ID{id1})
+	require.NoError(t, err)
+	require.Greater(t, sizes[id1], int64(8))
+
+	written, err := qm.replicationQueues[id1].Current()
+	require.NoError(t, err)
+
+	require.Equal(t, data, string(written))
+}
diff --git a/replications/mock/points_writer.go b/replications/mock/points_writer.go
new file mode 100644
index 00000000000..6397c1bd967
--- /dev/null
+++ b/replications/mock/points_writer.go
@@ -0,0 +1,51 @@
+// Code generated by MockGen. DO NOT EDIT.
+// Source: github.com/influxdata/influxdb/v2/storage (interfaces: PointsWriter)
+
+// Package mock is a generated GoMock package.
+package mock
+
+import (
+	context "context"
+	reflect "reflect"
+
+	gomock "github.com/golang/mock/gomock"
+	platform "github.com/influxdata/influxdb/v2/kit/platform"
+	models "github.com/influxdata/influxdb/v2/models"
+)
+
+// MockPointsWriter is a mock of PointsWriter interface.
+type MockPointsWriter struct {
+	ctrl     *gomock.Controller
+	recorder *MockPointsWriterMockRecorder
+}
+
+// MockPointsWriterMockRecorder is the mock recorder for MockPointsWriter.
+type MockPointsWriterMockRecorder struct {
+	mock *MockPointsWriter
+}
+
+// NewMockPointsWriter creates a new mock instance.
+func NewMockPointsWriter(ctrl *gomock.Controller) *MockPointsWriter {
+	mock := &MockPointsWriter{ctrl: ctrl}
+	mock.recorder = &MockPointsWriterMockRecorder{mock}
+	return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use.
+func (m *MockPointsWriter) EXPECT() *MockPointsWriterMockRecorder {
+	return m.recorder
+}
+
+// WritePoints mocks base method.
+func (m *MockPointsWriter) WritePoints(arg0 context.Context, arg1, arg2 platform.ID, arg3 []models.Point) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "WritePoints", arg0, arg1, arg2, arg3)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// WritePoints indicates an expected call of WritePoints.
+func (mr *MockPointsWriterMockRecorder) WritePoints(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WritePoints", reflect.TypeOf((*MockPointsWriter)(nil).WritePoints), arg0, arg1, arg2, arg3)
+}
diff --git a/replications/mock/queue_management.go b/replications/mock/queue_management.go
index 79581c0153b..dcfae94989f 100644
--- a/replications/mock/queue_management.go
+++ b/replications/mock/queue_management.go
@@ -34,6 +34,20 @@ func (m *MockDurableQueueManager) EXPECT() *MockDurableQueueManagerMockRecorder
 	return m.recorder
 }
 
+// CloseAll mocks base method.
+func (m *MockDurableQueueManager) CloseAll() error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "CloseAll")
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// CloseAll indicates an expected call of CloseAll.
+func (mr *MockDurableQueueManagerMockRecorder) CloseAll() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseAll", reflect.TypeOf((*MockDurableQueueManager)(nil).CloseAll))
+}
+
 // CurrentQueueSizes mocks base method.
 func (m *MockDurableQueueManager) CurrentQueueSizes(arg0 []platform.ID) (map[platform.ID]int64, error) {
 	m.ctrl.T.Helper()
@@ -49,32 +63,32 @@ func (mr *MockDurableQueueManagerMockRecorder) CurrentQueueSizes(arg0 interface{
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CurrentQueueSizes", reflect.TypeOf((*MockDurableQueueManager)(nil).CurrentQueueSizes), arg0)
 }
 
-// CloseAll mocks base method.
-func (m *MockDurableQueueManager) CloseAll() error {
+// DeleteQueue mocks base method.
+func (m *MockDurableQueueManager) DeleteQueue(arg0 platform.ID) error {
 	m.ctrl.T.Helper()
-	ret := m.ctrl.Call(m, "CloseAll")
+	ret := m.ctrl.Call(m, "DeleteQueue", arg0)
 	ret0, _ := ret[0].(error)
 	return ret0
 }
 
-// CloseAll indicates an expected call of CloseAll.
-func (mr *MockDurableQueueManagerMockRecorder) CloseAll() *gomock.Call {
+// DeleteQueue indicates an expected call of DeleteQueue.
+func (mr *MockDurableQueueManagerMockRecorder) DeleteQueue(arg0 interface{}) *gomock.Call {
 	mr.mock.ctrl.T.Helper()
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseAll", reflect.TypeOf((*MockDurableQueueManager)(nil).CloseAll))
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteQueue", reflect.TypeOf((*MockDurableQueueManager)(nil).DeleteQueue), arg0)
 }
 
-// DeleteQueue mocks base method.
-func (m *MockDurableQueueManager) DeleteQueue(arg0 platform.ID) error {
+// EnqueueData mocks base method.
+func (m *MockDurableQueueManager) EnqueueData(arg0 platform.ID, arg1 []byte) error {
 	m.ctrl.T.Helper()
-	ret := m.ctrl.Call(m, "DeleteQueue", arg0)
+	ret := m.ctrl.Call(m, "EnqueueData", arg0, arg1)
 	ret0, _ := ret[0].(error)
 	return ret0
 }
 
-// DeleteQueue indicates an expected call of DeleteQueue.
-func (mr *MockDurableQueueManagerMockRecorder) DeleteQueue(arg0 interface{}) *gomock.Call {
+// EnqueueData indicates an expected call of EnqueueData.
+func (mr *MockDurableQueueManagerMockRecorder) EnqueueData(arg0, arg1 interface{}) *gomock.Call {
 	mr.mock.ctrl.T.Helper()
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteQueue", reflect.TypeOf((*MockDurableQueueManager)(nil).DeleteQueue), arg0)
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnqueueData", reflect.TypeOf((*MockDurableQueueManager)(nil).EnqueueData), arg0, arg1)
 }
 
 // InitializeQueue mocks base method.
diff --git a/replications/service.go b/replications/service.go
index 106e470f8c8..ebf20b98962 100644
--- a/replications/service.go
+++ b/replications/service.go
@@ -1,21 +1,27 @@
 package replications
 
 import (
+	"bytes"
+	"compress/gzip"
 	"context"
 	"database/sql"
 	"errors"
 	"fmt"
 	"path/filepath"
+	"sync"
 
 	sq "github.com/Masterminds/squirrel"
 	"github.com/influxdata/influxdb/v2"
 	"github.com/influxdata/influxdb/v2/kit/platform"
 	ierrors "github.com/influxdata/influxdb/v2/kit/platform/errors"
+	"github.com/influxdata/influxdb/v2/models"
 	"github.com/influxdata/influxdb/v2/replications/internal"
 	"github.com/influxdata/influxdb/v2/snowflake"
 	"github.com/influxdata/influxdb/v2/sqlite"
+	"github.com/influxdata/influxdb/v2/storage"
 	"github.com/mattn/go-sqlite3"
 	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
 )
 
 var errReplicationNotFound = &ierrors.Error{
@@ -39,11 +45,12 @@ func errLocalBucketNotFound(id platform.ID, cause error) error {
 	}
 }
 
-func NewService(store *sqlite.SqlStore, bktSvc BucketService, log *zap.Logger, enginePath string) *service {
+func NewService(store *sqlite.SqlStore, bktSvc BucketService, localWriter storage.PointsWriter, log *zap.Logger, enginePath string) *service {
 	return &service{
 		store:         store,
 		idGenerator:   snowflake.NewIDGenerator(),
 		bucketService: bktSvc,
+		localWriter:   localWriter,
 		validator:     internal.NewValidator(),
 		log:           log,
 		durableQueueManager: internal.NewDurableQueueManager(
@@ -70,6 +77,7 @@ type DurableQueueManager interface {
 	CurrentQueueSizes(ids []platform.ID) (map[platform.ID]int64, error)
 	StartReplicationQueues(trackedReplications map[platform.ID]int64) error
 	CloseAll() error
+	EnqueueData(replicationID platform.ID, data []byte) error
 }
 
 type service struct {
@@ -78,6 +86,7 @@ type service struct {
 	bucketService       BucketService
 	validator           ReplicationValidator
 	durableQueueManager DurableQueueManager
+	localWriter         storage.PointsWriter
 	log                 *zap.Logger
 }
 
@@ -394,6 +403,68 @@ func (s service) ValidateReplication(ctx context.Context, id platform.ID) error
 	return nil
 }
 
+func (s service) WritePoints(ctx context.Context, orgID platform.ID, bucketID platform.ID, points []models.Point) error {
+	q := sq.Select("id").From("replications").Where(sq.Eq{"org_id": orgID, "local_bucket_id": bucketID})
+	query, args, err := q.ToSql()
+	if err != nil {
+		return err
+	}
+
+	var ids []platform.ID
+	if err := s.store.DB.SelectContext(ctx, &ids, query, args...); err != nil {
+		return err
+	}
+
+	// If there are no registered replications, all we need to do is a local write.
+	if len(ids) == 0 {
+		return s.localWriter.WritePoints(ctx, orgID, bucketID, points)
+	}
+
+	// Concurrently...
+	var egroup errgroup.Group
+	var buf bytes.Buffer
+	gzw := gzip.NewWriter(&buf)
+
+	// 1. Write points to local TSM
+	egroup.Go(func() error {
+		return s.localWriter.WritePoints(ctx, orgID, bucketID, points)
+	})
+	// 2. Serialize points to gzipped line protocol, to be enqueued for replication if the local write succeeds.
+	// We gzip the LP to take up less room on disk. On the other end of the queue, we can send the gzip data
+	// directly to the remote API without needing to decompress it.
+	egroup.Go(func() error {
+		for _, p := range points {
+			if _, err := gzw.Write(append([]byte(p.PrecisionString("ns")), '\n')); err != nil {
+				_ = gzw.Close()
+				return fmt.Errorf("failed to serialize points for replication: %w", err)
+			}
+		}
+		if err := gzw.Close(); err != nil {
+			return err
+		}
+		return nil
+	})
+
+	if err := egroup.Wait(); err != nil {
+		return err
+	}
+
+	// Enqueue the data into all registered replications.
+	var wg sync.WaitGroup
+	wg.Add(len(ids))
+	for _, id := range ids {
+		go func(id platform.ID) {
+			defer wg.Done()
+			if err := s.durableQueueManager.EnqueueData(id, buf.Bytes()); err != nil {
+				s.log.Error("Failed to enqueue points for replication", zap.String("id", id.String()), zap.Error(err))
+			}
+		}(id)
+	}
+	wg.Wait()
+
+	return nil
+}
+
 func (s service) getFullHTTPConfig(ctx context.Context, id platform.ID) (*internal.ReplicationHTTPConfig, error) {
 	q := sq.Select("c.remote_url", "c.remote_api_token", "c.remote_org_id", "c.allow_insecure_tls", "r.remote_bucket_id").
 		From("replications r").InnerJoin("remotes c ON r.remote_id = c.id AND r.id = ?", id)
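Aside (not part of the diff): the comment in WritePoints above notes that the gzipped line-protocol payload can be handed to the remote API as-is. A rough sketch of what that could look like on the consuming side of the queue, using only net/http and the standard /api/v2/write endpoint; forwardGzippedLP and its remoteURL, token, org, and bucket parameters are stand-ins, and the real remote-write path for replications is not part of this patch.

package sketch

import (
	"bytes"
	"context"
	"fmt"
	"net/http"
)

// forwardGzippedLP posts an already-gzipped line-protocol payload to a remote
// InfluxDB /api/v2/write endpoint without decompressing it first; the
// Content-Encoding header tells the server how to interpret the body.
func forwardGzippedLP(ctx context.Context, client *http.Client, remoteURL, token, org, bucket string, payload []byte) error {
	url := fmt.Sprintf("%s/api/v2/write?org=%s&bucket=%s", remoteURL, org, bucket)
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payload))
	if err != nil {
		return err
	}
	req.Header.Set("Authorization", "Token "+token)
	req.Header.Set("Content-Type", "text/plain; charset=utf-8")
	req.Header.Set("Content-Encoding", "gzip")

	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusNoContent {
		return fmt.Errorf("remote write failed: %s", resp.Status)
	}
	return nil
}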
From("replications r").InnerJoin("remotes c ON r.remote_id = c.id AND r.id = ?", id) diff --git a/replications/service_test.go b/replications/service_test.go index 1aea802f4f0..04a8d0f24ce 100644 --- a/replications/service_test.go +++ b/replications/service_test.go @@ -1,6 +1,8 @@ package replications import ( + "bytes" + "compress/gzip" "context" "errors" "fmt" @@ -11,6 +13,7 @@ import ( "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/kit/platform" "github.com/influxdata/influxdb/v2/mock" + "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/replications/internal" replicationsMock "github.com/influxdata/influxdb/v2/replications/mock" "github.com/influxdata/influxdb/v2/sqlite" @@ -22,6 +25,7 @@ import ( //go:generate go run github.com/golang/mock/mockgen -package mock -destination ./mock/validator.go github.com/influxdata/influxdb/v2/replications ReplicationValidator //go:generate go run github.com/golang/mock/mockgen -package mock -destination ./mock/bucket_service.go github.com/influxdata/influxdb/v2/replications BucketService //go:generate go run github.com/golang/mock/mockgen -package mock -destination ./mock/queue_management.go github.com/influxdata/influxdb/v2/replications DurableQueueManager +//go:generate go run github.com/golang/mock/mockgen -package mock -destination ./mock/points_writer.go github.com/influxdata/influxdb/v2/storage PointsWriter var ( ctx = context.Background() @@ -598,10 +602,116 @@ func TestListReplications(t *testing.T) { }) } +func TestWritePoints(t *testing.T) { + t.Parallel() + + svc, mocks, clean := newTestService(t) + defer clean(t) + + // Register a handful of replications. + createReq2, createReq3 := createReq, createReq + createReq2.Name, createReq3.Name = "test2", "test3" + createReq2.LocalBucketID = platform.ID(77777) + createReq3.RemoteID = updatedReplication.RemoteID + mocks.bucketSvc.EXPECT().RLock().Times(3) + mocks.bucketSvc.EXPECT().RUnlock().Times(3) + mocks.bucketSvc.EXPECT().FindBucketByID(gomock.Any(), createReq.LocalBucketID).Return(&influxdb.Bucket{}, nil).Times(2) + mocks.bucketSvc.EXPECT().FindBucketByID(gomock.Any(), createReq2.LocalBucketID).Return(&influxdb.Bucket{}, nil) + insertRemote(t, svc.store, createReq.RemoteID) + insertRemote(t, svc.store, createReq3.RemoteID) + + for _, req := range []influxdb.CreateReplicationRequest{createReq, createReq2, createReq3} { + mocks.durableQueueManager.EXPECT().InitializeQueue(gomock.Any(), req.MaxQueueSizeBytes) + _, err := svc.CreateReplication(ctx, req) + require.NoError(t, err) + } + + points, err := models.ParsePointsString(` +cpu,host=0 value=1.1 6000000000 +cpu,host=A value=1.2 2000000000 +cpu,host=A value=1.3 3000000000 +cpu,host=B value=1.3 4000000000 +cpu,host=B value=1.3 5000000000 +cpu,host=C value=1.3 1000000000 +mem,host=C value=1.3 1000000000 +disk,host=C value=1.3 1000000000`) + require.NoError(t, err) + + // Points should successfully write to local TSM. + mocks.pointWriter.EXPECT().WritePoints(gomock.Any(), replication.OrgID, replication.LocalBucketID, points).Return(nil) + + // Points should successfully be enqueued in the 2 replications associated with the local bucket. + for _, id := range []platform.ID{initID, initID + 2} { + mocks.durableQueueManager.EXPECT(). + EnqueueData(id, gomock.Any()). 
@@ -598,10 +602,116 @@ func TestListReplications(t *testing.T) {
 	})
 }
 
+func TestWritePoints(t *testing.T) {
+	t.Parallel()
+
+	svc, mocks, clean := newTestService(t)
+	defer clean(t)
+
+	// Register a handful of replications.
+	createReq2, createReq3 := createReq, createReq
+	createReq2.Name, createReq3.Name = "test2", "test3"
+	createReq2.LocalBucketID = platform.ID(77777)
+	createReq3.RemoteID = updatedReplication.RemoteID
+	mocks.bucketSvc.EXPECT().RLock().Times(3)
+	mocks.bucketSvc.EXPECT().RUnlock().Times(3)
+	mocks.bucketSvc.EXPECT().FindBucketByID(gomock.Any(), createReq.LocalBucketID).Return(&influxdb.Bucket{}, nil).Times(2)
+	mocks.bucketSvc.EXPECT().FindBucketByID(gomock.Any(), createReq2.LocalBucketID).Return(&influxdb.Bucket{}, nil)
+	insertRemote(t, svc.store, createReq.RemoteID)
+	insertRemote(t, svc.store, createReq3.RemoteID)
+
+	for _, req := range []influxdb.CreateReplicationRequest{createReq, createReq2, createReq3} {
+		mocks.durableQueueManager.EXPECT().InitializeQueue(gomock.Any(), req.MaxQueueSizeBytes)
+		_, err := svc.CreateReplication(ctx, req)
+		require.NoError(t, err)
+	}
+
+	points, err := models.ParsePointsString(`
+cpu,host=0 value=1.1 6000000000
+cpu,host=A value=1.2 2000000000
+cpu,host=A value=1.3 3000000000
+cpu,host=B value=1.3 4000000000
+cpu,host=B value=1.3 5000000000
+cpu,host=C value=1.3 1000000000
+mem,host=C value=1.3 1000000000
+disk,host=C value=1.3 1000000000`)
+	require.NoError(t, err)
+
+	// Points should successfully write to local TSM.
+	mocks.pointWriter.EXPECT().WritePoints(gomock.Any(), replication.OrgID, replication.LocalBucketID, points).Return(nil)
+
+	// Points should successfully be enqueued in the 2 replications associated with the local bucket.
+	for _, id := range []platform.ID{initID, initID + 2} {
+		mocks.durableQueueManager.EXPECT().
+			EnqueueData(id, gomock.Any()).
+			DoAndReturn(func(_ platform.ID, data []byte) error {
+				gzBuf := bytes.NewBuffer(data)
+				gzr, err := gzip.NewReader(gzBuf)
+				require.NoError(t, err)
+				defer gzr.Close()
+
+				var buf bytes.Buffer
+				_, err = buf.ReadFrom(gzr)
+				require.NoError(t, err)
+				require.NoError(t, gzr.Close())
+
+				writtenPoints, err := models.ParsePoints(buf.Bytes())
+				require.NoError(t, err)
+				require.ElementsMatch(t, writtenPoints, points)
+				return nil
+			})
+	}
+
+	require.NoError(t, svc.WritePoints(ctx, replication.OrgID, replication.LocalBucketID, points))
+}
+
+func TestWritePoints_LocalFailure(t *testing.T) {
+	t.Parallel()
+
+	svc, mocks, clean := newTestService(t)
+	defer clean(t)
+
+	// Register a handful of replications.
+	createReq2, createReq3 := createReq, createReq
+	createReq2.Name, createReq3.Name = "test2", "test3"
+	createReq2.LocalBucketID = platform.ID(77777)
+	createReq3.RemoteID = updatedReplication.RemoteID
+	mocks.bucketSvc.EXPECT().RLock().Times(3)
+	mocks.bucketSvc.EXPECT().RUnlock().Times(3)
+	mocks.bucketSvc.EXPECT().FindBucketByID(gomock.Any(), createReq.LocalBucketID).Return(&influxdb.Bucket{}, nil).Times(2)
+	mocks.bucketSvc.EXPECT().FindBucketByID(gomock.Any(), createReq2.LocalBucketID).Return(&influxdb.Bucket{}, nil)
+	insertRemote(t, svc.store, createReq.RemoteID)
+	insertRemote(t, svc.store, createReq3.RemoteID)
+
+	for _, req := range []influxdb.CreateReplicationRequest{createReq, createReq2, createReq3} {
+		mocks.durableQueueManager.EXPECT().InitializeQueue(gomock.Any(), req.MaxQueueSizeBytes)
+		_, err := svc.CreateReplication(ctx, req)
+		require.NoError(t, err)
+	}
+
+	points, err := models.ParsePointsString(`
+cpu,host=0 value=1.1 6000000000
+cpu,host=A value=1.2 2000000000
+cpu,host=A value=1.3 3000000000
+cpu,host=B value=1.3 4000000000
+cpu,host=B value=1.3 5000000000
+cpu,host=C value=1.3 1000000000
+mem,host=C value=1.3 1000000000
+disk,host=C value=1.3 1000000000`)
+	require.NoError(t, err)
+
+	// Points should fail to write to local TSM.
+	writeErr := errors.New("O NO")
+	mocks.pointWriter.EXPECT().WritePoints(gomock.Any(), replication.OrgID, replication.LocalBucketID, points).Return(writeErr)
+	// Don't expect any calls to enqueue points.
+	require.Equal(t, writeErr, svc.WritePoints(ctx, replication.OrgID, replication.LocalBucketID, points))
+}
+
 type mocks struct {
 	bucketSvc           *replicationsMock.MockBucketService
 	validator           *replicationsMock.MockReplicationValidator
 	durableQueueManager *replicationsMock.MockDurableQueueManager
+	pointWriter         *replicationsMock.MockPointsWriter
 }
 
 func newTestService(t *testing.T) (*service, mocks, func(t *testing.T)) {
@@ -619,6 +729,7 @@ func newTestService(t *testing.T) (*service, mocks, func(t *testing.T)) {
 		bucketSvc:           replicationsMock.NewMockBucketService(ctrl),
 		validator:           replicationsMock.NewMockReplicationValidator(ctrl),
 		durableQueueManager: replicationsMock.NewMockDurableQueueManager(ctrl),
+		pointWriter:         replicationsMock.NewMockPointsWriter(ctrl),
 	}
 	svc := service{
 		store:               store,
@@ -627,6 +738,7 @@ func newTestService(t *testing.T) (*service, mocks, func(t *testing.T)) {
 		validator:           mocks.validator,
 		log:                 logger,
 		durableQueueManager: mocks.durableQueueManager,
+		localWriter:         mocks.pointWriter,
 	}
 
 	return &svc, mocks, clean