diff --git a/src/aggregator/aggregator/map.go b/src/aggregator/aggregator/map.go index 130c20d680..6d8139390c 100644 --- a/src/aggregator/aggregator/map.go +++ b/src/aggregator/aggregator/map.go @@ -35,7 +35,7 @@ import ( "github.com/m3db/m3/src/metrics/metric/aggregated" "github.com/m3db/m3/src/metrics/metric/unaggregated" "github.com/m3db/m3/src/x/clock" - "github.com/m3db/m3/src/x/close" + xresource "github.com/m3db/m3/src/x/resource" "github.com/uber-go/tally" ) @@ -108,7 +108,7 @@ type metricMap struct { firstInsertAt time.Time rateLimiter *rate.Limiter runtimeOpts runtime.Options - runtimeOptsCloser close.SimpleCloser + runtimeOptsCloser xresource.SimpleCloser sleepFn sleepFn metrics metricMapMetrics } diff --git a/src/aggregator/runtime/options_manager.go b/src/aggregator/runtime/options_manager.go index 08c26060f2..b2182d81a0 100644 --- a/src/aggregator/runtime/options_manager.go +++ b/src/aggregator/runtime/options_manager.go @@ -21,7 +21,7 @@ package runtime import ( - "github.com/m3db/m3/src/x/close" + xresource "github.com/m3db/m3/src/x/resource" "github.com/m3db/m3/src/x/watch" ) @@ -36,7 +36,7 @@ type OptionsManager interface { // RegisterWatcher registers a watcher that watches updates to runtime options. // When an update arrives, the manager will deliver the update to all registered // watchers. - RegisterWatcher(l OptionsWatcher) close.SimpleCloser + RegisterWatcher(l OptionsWatcher) xresource.SimpleCloser // Close closes the watcher and all descendent watches Close() @@ -72,7 +72,7 @@ func (w *optionsManager) RuntimeOptions() Options { func (w *optionsManager) RegisterWatcher( watcher OptionsWatcher, -) close.SimpleCloser { +) xresource.SimpleCloser { _, watch, _ := w.watchable.Watch() // The watchable is always initialized so it's okay to do a blocking read. diff --git a/src/dbnode/client/connection_pool.go b/src/dbnode/client/connection_pool.go index e8db551890..4d1e1af1d2 100644 --- a/src/dbnode/client/connection_pool.go +++ b/src/dbnode/client/connection_pool.go @@ -31,7 +31,7 @@ import ( "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" "github.com/m3db/m3/src/dbnode/topology" - xclose "github.com/m3db/m3/src/x/close" + xresource "github.com/m3db/m3/src/x/resource" murmur3 "github.com/m3db/stackmurmur3/v2" "github.com/uber-go/tally" @@ -68,14 +68,14 @@ type connPool struct { } type conn struct { - channel xclose.SimpleCloser + channel xresource.SimpleCloser client rpc.TChanNode } // NewConnectionFn is a function that creates a connection. type NewConnectionFn func( channelName string, addr string, opts Options, -) (xclose.SimpleCloser, rpc.TChanNode, error) +) (xresource.SimpleCloser, rpc.TChanNode, error) type healthCheckFn func(client rpc.TChanNode, opts Options) error diff --git a/src/dbnode/client/connection_pool_test.go b/src/dbnode/client/connection_pool_test.go index 0724171f6f..f4d391180c 100644 --- a/src/dbnode/client/connection_pool_test.go +++ b/src/dbnode/client/connection_pool_test.go @@ -30,7 +30,7 @@ import ( "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" "github.com/m3db/m3/src/dbnode/topology" xclock "github.com/m3db/m3/src/x/clock" - xclose "github.com/m3db/m3/src/x/close" + xresource "github.com/m3db/m3/src/x/resource" "github.com/stretchr/testify/require" "github.com/golang/mock/gomock" @@ -85,7 +85,7 @@ func TestConnectionPoolConnectsAndRetriesConnects(t *testing.T) { fn := func( ch string, addr string, opts Options, - ) (xclose.SimpleCloser, rpc.TChanNode, error) { + ) (xresource.SimpleCloser, rpc.TChanNode, error) { attempt := int(atomic.AddInt32(&attempts, 1)) if attempt == 1 { return nil, nil, fmt.Errorf("a connect error") @@ -237,7 +237,7 @@ func TestConnectionPoolHealthChecks(t *testing.T) { fn := func( ch string, addr string, opts Options, - ) (xclose.SimpleCloser, rpc.TChanNode, error) { + ) (xresource.SimpleCloser, rpc.TChanNode, error) { attempt := atomic.AddInt32(&newConnAttempt, 1) if attempt == 1 { return channelNone, client1, nil diff --git a/src/dbnode/client/options.go b/src/dbnode/client/options.go index e36001ff83..c8e350b199 100644 --- a/src/dbnode/client/options.go +++ b/src/dbnode/client/options.go @@ -38,11 +38,11 @@ import ( "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/dbnode/topology" "github.com/m3db/m3/src/x/clock" - xclose "github.com/m3db/m3/src/x/close" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/pool" + xresource "github.com/m3db/m3/src/x/resource" xretry "github.com/m3db/m3/src/x/retry" "github.com/m3db/m3/src/x/sampler" "github.com/m3db/m3/src/x/serialize" @@ -319,7 +319,7 @@ func NewOptionsForAsyncClusters(opts Options, topoInits []topology.Initializer, func defaultNewConnectionFn( channelName string, address string, opts Options, -) (xclose.SimpleCloser, rpc.TChanNode, error) { +) (xresource.SimpleCloser, rpc.TChanNode, error) { channel, err := tchannel.NewChannel(channelName, opts.ChannelOptions()) if err != nil { return nil, nil, err diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go index f803750d0b..a188f105cf 100644 --- a/src/dbnode/client/session.go +++ b/src/dbnode/client/session.go @@ -48,12 +48,12 @@ import ( "github.com/m3db/m3/src/dbnode/x/xpool" "github.com/m3db/m3/src/x/checked" "github.com/m3db/m3/src/x/clock" - xclose "github.com/m3db/m3/src/x/close" "github.com/m3db/m3/src/x/context" xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/pool" + xresource "github.com/m3db/m3/src/x/resource" xretry "github.com/m3db/m3/src/x/retry" "github.com/m3db/m3/src/x/sampler" "github.com/m3db/m3/src/x/serialize" @@ -136,7 +136,7 @@ type sessionState struct { type session struct { state sessionState opts Options - runtimeOptsListenerCloser xclose.Closer + runtimeOptsListenerCloser xresource.SimpleCloser scope tally.Scope nowFn clock.NowFn log *zap.Logger diff --git a/src/dbnode/client/session_fetch_high_concurrency_test.go b/src/dbnode/client/session_fetch_high_concurrency_test.go index 77a07ea747..a4acd087c3 100644 --- a/src/dbnode/client/session_fetch_high_concurrency_test.go +++ b/src/dbnode/client/session_fetch_high_concurrency_test.go @@ -34,8 +34,8 @@ import ( "github.com/m3db/m3/src/dbnode/sharding" "github.com/m3db/m3/src/dbnode/topology" "github.com/m3db/m3/src/dbnode/ts" - xclose "github.com/m3db/m3/src/x/close" "github.com/m3db/m3/src/x/ident" + xresource "github.com/m3db/m3/src/x/resource" xtime "github.com/m3db/m3/src/x/time" "github.com/golang/mock/gomock" @@ -102,7 +102,7 @@ func TestSessionFetchIDsHighConcurrency(t *testing.T) { // to be able to mock the entire end to end pipeline newConnFn := func( _ string, addr string, _ Options, - ) (xclose.SimpleCloser, rpc.TChanNode, error) { + ) (xresource.SimpleCloser, rpc.TChanNode, error) { mockClient := rpc.NewMockTChanNode(ctrl) mockClient.EXPECT().Health(gomock.Any()). Return(healthCheckResult, nil). diff --git a/src/dbnode/digest/fd_digest.go b/src/dbnode/digest/fd_digest.go index af316ff3e3..c8db06a14c 100644 --- a/src/dbnode/digest/fd_digest.go +++ b/src/dbnode/digest/fd_digest.go @@ -25,12 +25,12 @@ import ( "hash/adler32" "os" - xclose "github.com/m3db/m3/src/x/close" + xresource "github.com/m3db/m3/src/x/resource" ) // FdWithDigest is a container for a file descriptor and the digest for the file contents. type FdWithDigest interface { - xclose.Closer + xresource.Closer // Fd returns the file descriptor. Fd() *os.File diff --git a/src/dbnode/namespace/namespace_mock.go b/src/dbnode/namespace/namespace_mock.go index 1807675287..4fba8bd53d 100644 --- a/src/dbnode/namespace/namespace_mock.go +++ b/src/dbnode/namespace/namespace_mock.go @@ -30,9 +30,9 @@ import ( "github.com/m3db/m3/src/cluster/client" "github.com/m3db/m3/src/dbnode/retention" - "github.com/m3db/m3/src/x/close" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" + "github.com/m3db/m3/src/x/resource" "github.com/gogo/protobuf/proto" "github.com/golang/mock/gomock" @@ -879,10 +879,10 @@ func (mr *MockSchemaRegistryMockRecorder) SetSchemaHistory(id, history interface } // RegisterListener mocks base method -func (m *MockSchemaRegistry) RegisterListener(id ident.ID, listener SchemaListener) (close.SimpleCloser, error) { +func (m *MockSchemaRegistry) RegisterListener(id ident.ID, listener SchemaListener) (resource.SimpleCloser, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RegisterListener", id, listener) - ret0, _ := ret[0].(close.SimpleCloser) + ret0, _ := ret[0].(resource.SimpleCloser) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/src/dbnode/namespace/namespace_runtime_options.go b/src/dbnode/namespace/namespace_runtime_options.go index 8531cc6eaa..26538c7a7a 100644 --- a/src/dbnode/namespace/namespace_runtime_options.go +++ b/src/dbnode/namespace/namespace_runtime_options.go @@ -23,7 +23,7 @@ package namespace import ( "sync" - xclose "github.com/m3db/m3/src/x/close" + xresource "github.com/m3db/m3/src/x/resource" "github.com/m3db/m3/src/x/watch" ) @@ -88,7 +88,7 @@ type RuntimeOptionsManager interface { // RegisterListener registers a listener for updates to runtime options, // it will synchronously call back the listener when this method is called // to deliver the current set of runtime options. - RegisterListener(l RuntimeOptionsListener) xclose.SimpleCloser + RegisterListener(l RuntimeOptionsListener) xresource.SimpleCloser // Close closes the watcher and all descendent watches. Close() @@ -228,7 +228,7 @@ func (w *runtimeOptionsManager) Get() RuntimeOptions { func (w *runtimeOptionsManager) RegisterListener( listener RuntimeOptionsListener, -) xclose.SimpleCloser { +) xresource.SimpleCloser { _, watch, _ := w.watchable.Watch() // We always initialize the watchable so always read diff --git a/src/dbnode/namespace/schema_registry.go b/src/dbnode/namespace/schema_registry.go index 014f19d4d2..08e8829cbb 100644 --- a/src/dbnode/namespace/schema_registry.go +++ b/src/dbnode/namespace/schema_registry.go @@ -24,8 +24,8 @@ import ( "fmt" "sync" - xclose "github.com/m3db/m3/src/x/close" "github.com/m3db/m3/src/x/ident" + xresource "github.com/m3db/m3/src/x/resource" xwatch "github.com/m3db/m3/src/x/watch" "go.uber.org/zap" @@ -145,7 +145,7 @@ func (sr *schemaRegistry) getSchemaHistory(nsIDStr string) (SchemaHistory, error func (sr *schemaRegistry) RegisterListener( nsID ident.ID, listener SchemaListener, -) (xclose.SimpleCloser, error) { +) (xresource.SimpleCloser, error) { if !sr.protoEnabled { return nil, nil } diff --git a/src/dbnode/namespace/types.go b/src/dbnode/namespace/types.go index 39b71a1154..8afe60ea8d 100644 --- a/src/dbnode/namespace/types.go +++ b/src/dbnode/namespace/types.go @@ -25,9 +25,9 @@ import ( "github.com/m3db/m3/src/cluster/client" "github.com/m3db/m3/src/dbnode/retention" - xclose "github.com/m3db/m3/src/x/close" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" + xresource "github.com/m3db/m3/src/x/resource" "github.com/gogo/protobuf/proto" ) @@ -205,7 +205,7 @@ type SchemaRegistry interface { // RegisterListener registers a schema listener for the namespace. // If proto is not enabled, nil, nil is returned - RegisterListener(id ident.ID, listener SchemaListener) (xclose.SimpleCloser, error) + RegisterListener(id ident.ID, listener SchemaListener) (xresource.SimpleCloser, error) // Close closes all the listeners. Close() diff --git a/src/dbnode/network/server/httpjson/cluster/server.go b/src/dbnode/network/server/httpjson/cluster/server.go index 2ccd905580..70d62a9e43 100644 --- a/src/dbnode/network/server/httpjson/cluster/server.go +++ b/src/dbnode/network/server/httpjson/cluster/server.go @@ -28,8 +28,8 @@ import ( ns "github.com/m3db/m3/src/dbnode/network/server" "github.com/m3db/m3/src/dbnode/network/server/httpjson" ttcluster "github.com/m3db/m3/src/dbnode/network/server/tchannelthrift/cluster" - xclose "github.com/m3db/m3/src/x/close" "github.com/m3db/m3/src/x/context" + xresource "github.com/m3db/m3/src/x/resource" ) type server struct { @@ -83,6 +83,6 @@ func (s *server) ListenAndServe() (ns.Close, error) { return func() { listener.Close() - xclose.TryClose(service) + xresource.TryClose(service) // nolint: errcheck }, nil } diff --git a/src/dbnode/network/server/tchannelthrift/cluster/server.go b/src/dbnode/network/server/tchannelthrift/cluster/server.go index 0790618a05..5b05af00e8 100644 --- a/src/dbnode/network/server/tchannelthrift/cluster/server.go +++ b/src/dbnode/network/server/tchannelthrift/cluster/server.go @@ -25,8 +25,8 @@ import ( "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" ns "github.com/m3db/m3/src/dbnode/network/server" "github.com/m3db/m3/src/dbnode/network/server/tchannelthrift" - xclose "github.com/m3db/m3/src/x/close" "github.com/m3db/m3/src/x/context" + xresource "github.com/m3db/m3/src/x/resource" "github.com/uber/tchannel-go" ) @@ -76,6 +76,6 @@ func (s *server) ListenAndServe() (ns.Close, error) { return func() { channel.Close() - xclose.TryClose(service) + xresource.TryClose(service) // nolint: errcheck }, nil } diff --git a/src/dbnode/network/server/tchannelthrift/node/service.go b/src/dbnode/network/server/tchannelthrift/node/service.go index 99d6c9cc51..871dd91b20 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service.go +++ b/src/dbnode/network/server/tchannelthrift/node/service.go @@ -49,7 +49,7 @@ import ( "github.com/m3db/m3/src/x/instrument" xopentracing "github.com/m3db/m3/src/x/opentracing" "github.com/m3db/m3/src/x/pool" - "github.com/m3db/m3/src/x/resource" + xresource "github.com/m3db/m3/src/x/resource" "github.com/m3db/m3/src/x/serialize" xtime "github.com/m3db/m3/src/x/time" @@ -2302,7 +2302,7 @@ func (s *service) readEncodedResult( segments := s.pools.segmentsArray.Get() segments = segmentsArr(segments).grow(len(encoded)) segments = segments[:0] - ctx.RegisterFinalizer(resource.FinalizerFn(func() { + ctx.RegisterFinalizer(xresource.FinalizerFn(func() { s.pools.segmentsArray.Put(segments) })) diff --git a/src/dbnode/persist/fs/files.go b/src/dbnode/persist/fs/files.go index 1de9a425d6..a0017e8075 100644 --- a/src/dbnode/persist/fs/files.go +++ b/src/dbnode/persist/fs/files.go @@ -38,7 +38,6 @@ import ( "github.com/m3db/m3/src/dbnode/persist/fs/msgpack" "github.com/m3db/m3/src/dbnode/persist/schema" idxpersist "github.com/m3db/m3/src/m3ninx/persist" - xclose "github.com/m3db/m3/src/x/close" xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" @@ -360,17 +359,6 @@ func openFiles(opener fileOpener, fds map[string]**os.File) error { return firstErr } -// TODO(xichen): move closeAll to m3x/close. -func closeAll(closers ...xclose.Closer) error { - multiErr := xerrors.NewMultiError() - for _, closer := range closers { - if err := closer.Close(); err != nil { - multiErr = multiErr.Add(err) - } - } - return multiErr.FinalError() -} - // DeleteFiles delete a set of files, returning all the errors encountered during // the deletion process. func DeleteFiles(filePaths []string) error { diff --git a/src/dbnode/persist/fs/files_test.go b/src/dbnode/persist/fs/files_test.go index 60a4b68dd0..556bf9efb2 100644 --- a/src/dbnode/persist/fs/files_test.go +++ b/src/dbnode/persist/fs/files_test.go @@ -39,6 +39,7 @@ import ( "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" + xresource "github.com/m3db/m3/src/x/resource" "github.com/pborman/uuid" "github.com/stretchr/testify/assert" @@ -88,7 +89,7 @@ func TestCloseAllFails(t *testing.T) { defer os.Remove(file.Name()) assert.NoError(t, file.Close()) - assert.Error(t, closeAll(file)) + assert.Error(t, xresource.CloseAll(file)) } func TestDeleteFiles(t *testing.T) { diff --git a/src/dbnode/persist/fs/persist_manager.go b/src/dbnode/persist/fs/persist_manager.go index 37e1021b8c..8d201a298b 100644 --- a/src/dbnode/persist/fs/persist_manager.go +++ b/src/dbnode/persist/fs/persist_manager.go @@ -35,8 +35,8 @@ import ( m3ninxpersist "github.com/m3db/m3/src/m3ninx/persist" "github.com/m3db/m3/src/x/checked" "github.com/m3db/m3/src/x/clock" - xclose "github.com/m3db/m3/src/x/close" "github.com/m3db/m3/src/x/instrument" + xresource "github.com/m3db/m3/src/x/resource" "github.com/pborman/uuid" "github.com/uber-go/tally" @@ -93,7 +93,7 @@ type persistManager struct { metrics persistManagerMetrics - runtimeOptsListener xclose.SimpleCloser + runtimeOptsListener xresource.SimpleCloser } type dataPersistManager struct { diff --git a/src/dbnode/persist/fs/write.go b/src/dbnode/persist/fs/write.go index 5b2c17e6a6..939609547c 100644 --- a/src/dbnode/persist/fs/write.go +++ b/src/dbnode/persist/fs/write.go @@ -36,6 +36,7 @@ import ( "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/x/checked" "github.com/m3db/m3/src/x/ident" + xresource "github.com/m3db/m3/src/x/resource" "github.com/m3db/m3/src/x/serialize" xtime "github.com/m3db/m3/src/x/time" @@ -390,7 +391,7 @@ func (w *writer) closeWOIndex() error { return err } - return closeAll( + return xresource.CloseAll( w.infoFdWithDigest, w.indexFdWithDigest, w.summariesFdWithDigest, diff --git a/src/dbnode/runtime/runtime_mock.go b/src/dbnode/runtime/runtime_mock.go index aa45644751..187dc92e50 100644 --- a/src/dbnode/runtime/runtime_mock.go +++ b/src/dbnode/runtime/runtime_mock.go @@ -30,7 +30,7 @@ import ( "github.com/m3db/m3/src/dbnode/ratelimit" "github.com/m3db/m3/src/dbnode/topology" - "github.com/m3db/m3/src/x/close" + "github.com/m3db/m3/src/x/resource" "github.com/golang/mock/gomock" ) @@ -488,10 +488,10 @@ func (mr *MockOptionsManagerMockRecorder) Get() *gomock.Call { } // RegisterListener mocks base method -func (m *MockOptionsManager) RegisterListener(l OptionsListener) close.SimpleCloser { +func (m *MockOptionsManager) RegisterListener(l OptionsListener) resource.SimpleCloser { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RegisterListener", l) - ret0, _ := ret[0].(close.SimpleCloser) + ret0, _ := ret[0].(resource.SimpleCloser) return ret0 } diff --git a/src/dbnode/runtime/runtime_options_manager.go b/src/dbnode/runtime/runtime_options_manager.go index 290b6ca7b0..bcb8630b13 100644 --- a/src/dbnode/runtime/runtime_options_manager.go +++ b/src/dbnode/runtime/runtime_options_manager.go @@ -23,7 +23,7 @@ package runtime import ( "fmt" - xclose "github.com/m3db/m3/src/x/close" + xresource "github.com/m3db/m3/src/x/resource" xwatch "github.com/m3db/m3/src/x/watch" ) @@ -52,7 +52,7 @@ func (w *optionsManager) Get() Options { func (w *optionsManager) RegisterListener( listener OptionsListener, -) xclose.SimpleCloser { +) xresource.SimpleCloser { _, watch, _ := w.watchable.Watch() // We always initialize the watchable so always read @@ -98,7 +98,7 @@ func (n noOpOptionsManager) Get() Options { func (n noOpOptionsManager) RegisterListener( listener OptionsListener, -) xclose.SimpleCloser { +) xresource.SimpleCloser { // noOpOptionsManager never changes its options, not worth // registering listener return noOpCloser{} diff --git a/src/dbnode/runtime/types.go b/src/dbnode/runtime/types.go index cafe9f8046..4b8ae81f9a 100644 --- a/src/dbnode/runtime/types.go +++ b/src/dbnode/runtime/types.go @@ -25,7 +25,7 @@ import ( "github.com/m3db/m3/src/dbnode/ratelimit" "github.com/m3db/m3/src/dbnode/topology" - xclose "github.com/m3db/m3/src/x/close" + xresource "github.com/m3db/m3/src/x/resource" ) // Options is a set of runtime options. @@ -195,7 +195,7 @@ type OptionsManager interface { // RegisterListener registers a listener for updates to runtime options, // it will synchronously call back the listener when this method is called // to deliver the current set of runtime options. - RegisterListener(l OptionsListener) xclose.SimpleCloser + RegisterListener(l OptionsListener) xresource.SimpleCloser // Close closes the watcher and all descendent watches. Close() diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index 3f09a360ef..d4f7e05560 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -45,10 +45,10 @@ import ( "github.com/m3db/m3/src/m3ninx/index/segment/fst" idxpersist "github.com/m3db/m3/src/m3ninx/persist" "github.com/m3db/m3/src/x/clock" - xclose "github.com/m3db/m3/src/x/close" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" + xresource "github.com/m3db/m3/src/x/resource" xsync "github.com/m3db/m3/src/x/sync" xtime "github.com/m3db/m3/src/x/time" @@ -340,7 +340,7 @@ func (s *peersSource) startPersistenceQueueWorkerLoop( persistFlush, bootstrapResult, lock) }() - return xclose.CloserFn(persistFlush.DoneFlush), nil + return xresource.CloserFn(persistFlush.DoneFlush), nil } // runPersistenceQueueWorkerLoop is meant to be run in its own goroutine, and it creates a worker that diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index 9dabe60f96..6a61795e8b 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -53,13 +53,12 @@ import ( "github.com/m3db/m3/src/m3ninx/index/segment/builder" idxpersist "github.com/m3db/m3/src/m3ninx/persist" "github.com/m3db/m3/src/x/clock" - xclose "github.com/m3db/m3/src/x/close" "github.com/m3db/m3/src/x/context" xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" xopentracing "github.com/m3db/m3/src/x/opentracing" - "github.com/m3db/m3/src/x/resource" + xresource "github.com/m3db/m3/src/x/resource" xsync "github.com/m3db/m3/src/x/sync" xtime "github.com/m3db/m3/src/x/time" @@ -117,8 +116,8 @@ type nsIndex struct { logger *zap.Logger opts Options nsMetadata namespace.Metadata - runtimeOptsListener xclose.SimpleCloser - runtimeNsOptsListener xclose.SimpleCloser + runtimeOptsListener xresource.SimpleCloser + runtimeNsOptsListener xresource.SimpleCloser resultsPool index.QueryResultsPool aggregateResultsPool index.AggregateResultsPool @@ -211,7 +210,7 @@ type newNamespaceIndexOpts struct { // execBlockQueryFn executes a query against the given block whilst tracking state. type execBlockQueryFn func( ctx context.Context, - cancellable *resource.CancellableLifetime, + cancellable *xresource.CancellableLifetime, block index.Block, query index.Query, opts index.QueryOptions, @@ -1555,7 +1554,7 @@ func (i *nsIndex) queryWithSpan( // Create a cancellable lifetime and cancel it at end of this method so that // no child async task modifies the result after this method returns. - cancellable := resource.NewCancellableLifetime() + cancellable := xresource.NewCancellableLifetime() defer cancellable.Cancel() for _, block := range blocks { @@ -1663,7 +1662,7 @@ func (i *nsIndex) queryWithSpan( func (i *nsIndex) execBlockQueryFn( ctx context.Context, - cancellable *resource.CancellableLifetime, + cancellable *xresource.CancellableLifetime, block index.Block, query index.Query, opts index.QueryOptions, @@ -1701,7 +1700,7 @@ func (i *nsIndex) execBlockQueryFn( func (i *nsIndex) execBlockWideQueryFn( ctx context.Context, - cancellable *resource.CancellableLifetime, + cancellable *xresource.CancellableLifetime, block index.Block, query index.Query, opts index.QueryOptions, @@ -1745,7 +1744,7 @@ func (i *nsIndex) execBlockWideQueryFn( func (i *nsIndex) execBlockAggregateQueryFn( ctx context.Context, - cancellable *resource.CancellableLifetime, + cancellable *xresource.CancellableLifetime, block index.Block, query index.Query, opts index.QueryOptions, diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index f1f0e80b9f..120fe09a3d 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -43,7 +43,7 @@ import ( xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" - "github.com/m3db/m3/src/x/resource" + xresource "github.com/m3db/m3/src/x/resource" xtime "github.com/m3db/m3/src/x/time" "github.com/opentracing/opentracing-go" @@ -400,7 +400,7 @@ func (b *block) segmentReadersWithRLock() ([]segment.Reader, error) { // to the results datastructure). func (b *block) Query( ctx context.Context, - cancellable *resource.CancellableLifetime, + cancellable *xresource.CancellableLifetime, query Query, opts QueryOptions, results BaseResults, @@ -420,7 +420,7 @@ func (b *block) Query( func (b *block) queryWithSpan( ctx context.Context, - cancellable *resource.CancellableLifetime, + cancellable *xresource.CancellableLifetime, query Query, opts QueryOptions, results BaseResults, @@ -465,7 +465,7 @@ func (b *block) queryWithSpan( return false, errCancelledQuery } execCloseRegistered = true // Make sure to not locally close it. - ctx.RegisterFinalizer(resource.FinalizerFn(func() { + ctx.RegisterFinalizer(xresource.FinalizerFn(func() { b.closeAsync(exec) })) cancellable.ReleaseCheckout() @@ -530,7 +530,7 @@ func (b *block) closeAsync(closer io.Closer) { } func (b *block) addQueryResults( - cancellable *resource.CancellableLifetime, + cancellable *xresource.CancellableLifetime, results BaseResults, batch []doc.Document, ) ([]doc.Document, int, int, error) { @@ -548,7 +548,7 @@ func (b *block) addQueryResults( return batch, 0, 0, errCancelledQuery } - // try to add the docs to the resource. + // try to add the docs to the xresource. size, docsCount, err := results.AddDocuments(batch) // immediately release the checkout on the lifetime of query. @@ -572,7 +572,7 @@ func (b *block) addQueryResults( // pre-aggregated results via the FST underlying the index. func (b *block) Aggregate( ctx context.Context, - cancellable *resource.CancellableLifetime, + cancellable *xresource.CancellableLifetime, opts QueryOptions, results AggregateResults, logFields []opentracinglog.Field, @@ -591,7 +591,7 @@ func (b *block) Aggregate( func (b *block) aggregateWithSpan( ctx context.Context, - cancellable *resource.CancellableLifetime, + cancellable *xresource.CancellableLifetime, opts QueryOptions, results AggregateResults, sp opentracing.Span, @@ -667,7 +667,7 @@ func (b *block) aggregateWithSpan( // read by the readers. for _, reader := range readers { reader := reader // Capture for inline function. - ctx.RegisterFinalizer(resource.FinalizerFn(func() { + ctx.RegisterFinalizer(xresource.FinalizerFn(func() { b.closeAsync(reader) })) } @@ -792,7 +792,7 @@ func (b *block) pooledID(id []byte) ident.ID { } func (b *block) addAggregateResults( - cancellable *resource.CancellableLifetime, + cancellable *xresource.CancellableLifetime, results AggregateResults, batch []AggregateResultsEntry, ) ([]AggregateResultsEntry, int, int, error) { @@ -810,7 +810,7 @@ func (b *block) addAggregateResults( return batch, 0, 0, errCancelledQuery } - // try to add the docs to the resource. + // try to add the docs to the xresource. size, docsCount := results.AddFields(batch) // immediately release the checkout on the lifetime of query. diff --git a/src/dbnode/storage/index/block_prop_test.go b/src/dbnode/storage/index/block_prop_test.go index 447ca2f12e..0624a3923a 100644 --- a/src/dbnode/storage/index/block_prop_test.go +++ b/src/dbnode/storage/index/block_prop_test.go @@ -40,7 +40,7 @@ import ( "github.com/m3db/m3/src/m3ninx/search/proptest" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/instrument" - "github.com/m3db/m3/src/x/resource" + xresource "github.com/m3db/m3/src/x/resource" "github.com/leanovate/gopter" "github.com/leanovate/gopter/prop" @@ -107,7 +107,7 @@ func TestPostingsListCacheDoesNotAffectBlockQueryResults(t *testing.T) { idx.NewQueryFromSearchQuery(q), } - cancellable := resource.NewCancellableLifetime() + cancellable := xresource.NewCancellableLifetime() cancelled := false doneQuery := func() { if !cancelled { diff --git a/src/dbnode/storage/index/block_test.go b/src/dbnode/storage/index/block_test.go index 5bb078b676..e8f4e64ee4 100644 --- a/src/dbnode/storage/index/block_test.go +++ b/src/dbnode/storage/index/block_test.go @@ -40,7 +40,7 @@ import ( "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/pool" - "github.com/m3db/m3/src/x/resource" + xresource "github.com/m3db/m3/src/x/resource" xtime "github.com/m3db/m3/src/x/time" "github.com/golang/mock/gomock" @@ -366,7 +366,7 @@ func TestBlockQueryAfterClose(t *testing.T) { require.Equal(t, start.Add(time.Hour), b.EndTime()) require.NoError(t, b.Close()) - _, err = b.Query(context.NewContext(), resource.NewCancellableLifetime(), + _, err = b.Query(context.NewContext(), xresource.NewCancellableLifetime(), defaultQuery, QueryOptions{}, nil, emptyLogFields) require.Error(t, err) } @@ -382,7 +382,7 @@ func TestBlockQueryWithCancelledQuery(t *testing.T) { require.Equal(t, start.Add(time.Hour), b.EndTime()) // Precancel query. - cancellable := resource.NewCancellableLifetime() + cancellable := xresource.NewCancellableLifetime() cancellable.Cancel() _, err = b.Query(context.NewContext(), cancellable, @@ -405,7 +405,7 @@ func TestBlockQueryExecutorError(t *testing.T) { return nil, fmt.Errorf("random-err") } - _, err = b.Query(context.NewContext(), resource.NewCancellableLifetime(), + _, err = b.Query(context.NewContext(), xresource.NewCancellableLifetime(), defaultQuery, QueryOptions{}, nil, emptyLogFields) require.Error(t, err) } @@ -428,7 +428,7 @@ func TestBlockQuerySegmentReaderError(t *testing.T) { randErr := fmt.Errorf("random-err") seg.EXPECT().Reader().Return(nil, randErr) - _, err = b.Query(context.NewContext(), resource.NewCancellableLifetime(), + _, err = b.Query(context.NewContext(), xresource.NewCancellableLifetime(), defaultQuery, QueryOptions{}, nil, emptyLogFields) require.Equal(t, randErr, err) } @@ -468,7 +468,7 @@ func TestBlockQueryAddResultsSegmentsError(t *testing.T) { randErr := fmt.Errorf("random-err") seg3.EXPECT().Reader().Return(nil, randErr) - _, err = b.Query(context.NewContext(), resource.NewCancellableLifetime(), + _, err = b.Query(context.NewContext(), xresource.NewCancellableLifetime(), defaultQuery, QueryOptions{}, nil, emptyLogFields) require.Equal(t, randErr, err) } @@ -495,7 +495,7 @@ func TestBlockMockQueryExecutorExecError(t *testing.T) { exec.EXPECT().Execute(gomock.Any()).Return(nil, fmt.Errorf("randomerr")), exec.EXPECT().Close(), ) - _, err = b.Query(context.NewContext(), resource.NewCancellableLifetime(), + _, err = b.Query(context.NewContext(), xresource.NewCancellableLifetime(), defaultQuery, QueryOptions{}, nil, emptyLogFields) require.Error(t, err) } @@ -531,7 +531,7 @@ func TestBlockMockQueryExecutorExecIterErr(t *testing.T) { ctx := context.NewContext() - _, err = b.Query(ctx, resource.NewCancellableLifetime(), + _, err = b.Query(ctx, xresource.NewCancellableLifetime(), defaultQuery, QueryOptions{}, NewQueryResults(nil, QueryResultsOptions{}, testOpts), emptyLogFields) require.Error(t, err) @@ -575,7 +575,7 @@ func TestBlockMockQueryExecutorExecLimit(t *testing.T) { ctx := context.NewContext() - exhaustive, err := b.Query(ctx, resource.NewCancellableLifetime(), + exhaustive, err := b.Query(ctx, xresource.NewCancellableLifetime(), defaultQuery, QueryOptions{SeriesLimit: limit}, results, emptyLogFields) require.NoError(t, err) require.False(t, exhaustive) @@ -622,7 +622,7 @@ func TestBlockMockQueryExecutorExecIterCloseErr(t *testing.T) { ctx := context.NewContext() - _, err = b.Query(ctx, resource.NewCancellableLifetime(), + _, err = b.Query(ctx, xresource.NewCancellableLifetime(), defaultQuery, QueryOptions{}, results, emptyLogFields) require.Error(t, err) @@ -664,7 +664,7 @@ func TestBlockMockQuerySeriesLimitNonExhaustive(t *testing.T) { ctx := context.NewContext() - exhaustive, err := b.Query(ctx, resource.NewCancellableLifetime(), + exhaustive, err := b.Query(ctx, xresource.NewCancellableLifetime(), defaultQuery, QueryOptions{SeriesLimit: limit}, results, emptyLogFields) require.NoError(t, err) require.False(t, exhaustive) @@ -715,7 +715,7 @@ func TestBlockMockQuerySeriesLimitExhaustive(t *testing.T) { ctx := context.NewContext() - exhaustive, err := b.Query(ctx, resource.NewCancellableLifetime(), + exhaustive, err := b.Query(ctx, xresource.NewCancellableLifetime(), defaultQuery, QueryOptions{SeriesLimit: limit}, results, emptyLogFields) require.NoError(t, err) require.True(t, exhaustive) @@ -766,7 +766,7 @@ func TestBlockMockQueryDocsLimitNonExhaustive(t *testing.T) { ctx := context.NewContext() - exhaustive, err := b.Query(ctx, resource.NewCancellableLifetime(), + exhaustive, err := b.Query(ctx, xresource.NewCancellableLifetime(), defaultQuery, QueryOptions{DocsLimit: docsLimit}, results, emptyLogFields) require.NoError(t, err) require.False(t, exhaustive) @@ -817,7 +817,7 @@ func TestBlockMockQueryDocsLimitExhaustive(t *testing.T) { ctx := context.NewContext() - exhaustive, err := b.Query(ctx, resource.NewCancellableLifetime(), + exhaustive, err := b.Query(ctx, xresource.NewCancellableLifetime(), defaultQuery, QueryOptions{DocsLimit: docsLimit}, results, emptyLogFields) require.NoError(t, err) require.True(t, exhaustive) @@ -871,7 +871,7 @@ func TestBlockMockQueryMergeResultsMapLimit(t *testing.T) { ctx := context.NewContext() - exhaustive, err := b.Query(ctx, resource.NewCancellableLifetime(), + exhaustive, err := b.Query(ctx, xresource.NewCancellableLifetime(), defaultQuery, QueryOptions{SeriesLimit: limit}, results, emptyLogFields) require.NoError(t, err) require.False(t, exhaustive) @@ -926,7 +926,7 @@ func TestBlockMockQueryMergeResultsDupeID(t *testing.T) { ctx := context.NewContext() - exhaustive, err := b.Query(ctx, resource.NewCancellableLifetime(), + exhaustive, err := b.Query(ctx, xresource.NewCancellableLifetime(), defaultQuery, QueryOptions{}, results, emptyLogFields) require.NoError(t, err) require.True(t, exhaustive) @@ -1395,7 +1395,7 @@ func TestBlockE2EInsertQuery(t *testing.T) { ctx.SetGoContext(opentracing.ContextWithSpan(stdlibctx.Background(), sp)) results := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - exhaustive, err := b.Query(ctx, resource.NewCancellableLifetime(), + exhaustive, err := b.Query(ctx, xresource.NewCancellableLifetime(), Query{q}, QueryOptions{}, results, emptyLogFields) require.NoError(t, err) require.True(t, exhaustive) @@ -1471,7 +1471,7 @@ func TestBlockE2EInsertQueryLimit(t *testing.T) { limit := 1 results := NewQueryResults(nil, QueryResultsOptions{SizeLimit: limit}, testOpts) - exhaustive, err := b.Query(context.NewContext(), resource.NewCancellableLifetime(), + exhaustive, err := b.Query(context.NewContext(), xresource.NewCancellableLifetime(), Query{q}, QueryOptions{SeriesLimit: limit}, results, emptyLogFields) require.NoError(t, err) require.False(t, exhaustive) @@ -1560,7 +1560,7 @@ func TestBlockE2EInsertAddResultsQuery(t *testing.T) { ctx.SetGoContext(opentracing.ContextWithSpan(stdlibctx.Background(), sp)) results := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - exhaustive, err := b.Query(ctx, resource.NewCancellableLifetime(), + exhaustive, err := b.Query(ctx, xresource.NewCancellableLifetime(), Query{q}, QueryOptions{}, results, emptyLogFields) require.NoError(t, err) require.True(t, exhaustive) @@ -1639,7 +1639,7 @@ func TestBlockE2EInsertAddResultsMergeQuery(t *testing.T) { ctx.SetGoContext(opentracing.ContextWithSpan(stdlibctx.Background(), sp)) results := NewQueryResults(nil, QueryResultsOptions{}, testOpts) - exhaustive, err := b.Query(ctx, resource.NewCancellableLifetime(), + exhaustive, err := b.Query(ctx, xresource.NewCancellableLifetime(), Query{q}, QueryOptions{}, results, emptyLogFields) require.NoError(t, err) require.True(t, exhaustive) @@ -1782,7 +1782,7 @@ func TestBlockAggregateAfterClose(t *testing.T) { require.Equal(t, start.Add(time.Hour), b.EndTime()) require.NoError(t, b.Close()) - _, err = b.Aggregate(context.NewContext(), resource.NewCancellableLifetime(), + _, err = b.Aggregate(context.NewContext(), xresource.NewCancellableLifetime(), QueryOptions{}, nil, emptyLogFields) require.Error(t, err) } @@ -1829,7 +1829,12 @@ func TestBlockAggregateIterationErr(t *testing.T) { ctx := context.NewContext() defer ctx.BlockingClose() - _, err = b.Aggregate(ctx, resource.NewCancellableLifetime(), QueryOptions{SeriesLimit: 3}, results, emptyLogFields) + _, err = b.Aggregate( + ctx, + xresource.NewCancellableLifetime(), + QueryOptions{SeriesLimit: 3}, + results, + emptyLogFields) require.Error(t, err) } @@ -1885,7 +1890,12 @@ func TestBlockAggregate(t *testing.T) { iter.EXPECT().Err().Return(nil), iter.EXPECT().Close().Return(nil), ) - exhaustive, err := b.Aggregate(ctx, resource.NewCancellableLifetime(), QueryOptions{SeriesLimit: 3}, results, emptyLogFields) + exhaustive, err := b.Aggregate( + ctx, + xresource.NewCancellableLifetime(), + QueryOptions{SeriesLimit: 3}, + results, + emptyLogFields) require.NoError(t, err) require.True(t, exhaustive) @@ -1956,7 +1966,12 @@ func TestBlockAggregateNotExhaustive(t *testing.T) { iter.EXPECT().Err().Return(nil), iter.EXPECT().Close().Return(nil), ) - exhaustive, err := b.Aggregate(ctx, resource.NewCancellableLifetime(), QueryOptions{SeriesLimit: 1}, results, emptyLogFields) + exhaustive, err := b.Aggregate( + ctx, + xresource.NewCancellableLifetime(), + QueryOptions{SeriesLimit: 1}, + results, + emptyLogFields) require.NoError(t, err) require.False(t, exhaustive) @@ -2043,7 +2058,12 @@ func TestBlockE2EInsertAggregate(t *testing.T) { sp := mtr.StartSpan("root") ctx.SetGoContext(opentracing.ContextWithSpan(stdlibctx.Background(), sp)) - exhaustive, err := b.Aggregate(ctx, resource.NewCancellableLifetime(), QueryOptions{SeriesLimit: 10}, results, emptyLogFields) + exhaustive, err := b.Aggregate( + ctx, + xresource.NewCancellableLifetime(), + QueryOptions{SeriesLimit: 10}, + results, + emptyLogFields) require.NoError(t, err) require.True(t, exhaustive) assertAggregateResultsMapEquals(t, map[string][]string{ @@ -2056,7 +2076,12 @@ func TestBlockE2EInsertAggregate(t *testing.T) { Type: AggregateTagNamesAndValues, FieldFilter: AggregateFieldFilter{[]byte("bar")}, }, testOpts) - exhaustive, err = b.Aggregate(ctx, resource.NewCancellableLifetime(), QueryOptions{SeriesLimit: 10}, results, emptyLogFields) + exhaustive, err = b.Aggregate( + ctx, + xresource.NewCancellableLifetime(), + QueryOptions{SeriesLimit: 10}, + results, + emptyLogFields) require.NoError(t, err) require.True(t, exhaustive) assertAggregateResultsMapEquals(t, map[string][]string{ @@ -2068,7 +2093,12 @@ func TestBlockE2EInsertAggregate(t *testing.T) { Type: AggregateTagNamesAndValues, FieldFilter: AggregateFieldFilter{[]byte("random")}, }, testOpts) - exhaustive, err = b.Aggregate(ctx, resource.NewCancellableLifetime(), QueryOptions{SeriesLimit: 10}, results, emptyLogFields) + exhaustive, err = b.Aggregate( + ctx, + xresource.NewCancellableLifetime(), + QueryOptions{SeriesLimit: 10}, + results, + emptyLogFields) require.NoError(t, err) require.True(t, exhaustive) assertAggregateResultsMapEquals(t, map[string][]string{}, results) diff --git a/src/dbnode/storage/index/mutable_segments.go b/src/dbnode/storage/index/mutable_segments.go index baa904cd7b..256bb4a313 100644 --- a/src/dbnode/storage/index/mutable_segments.go +++ b/src/dbnode/storage/index/mutable_segments.go @@ -35,10 +35,10 @@ import ( "github.com/m3db/m3/src/m3ninx/index/segment" "github.com/m3db/m3/src/m3ninx/index/segment/builder" "github.com/m3db/m3/src/m3ninx/index/segment/fst" - xclose "github.com/m3db/m3/src/x/close" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/mmap" + xresource "github.com/m3db/m3/src/x/resource" "github.com/uber-go/tally" "go.uber.org/zap" @@ -73,7 +73,7 @@ type mutableSegments struct { blockOpts BlockOptions opts Options iopts instrument.Options - optsListener xclose.SimpleCloser + optsListener xresource.SimpleCloser writeIndexingConcurrency int metrics mutableSegmentsMetrics diff --git a/src/dbnode/storage/index/types.go b/src/dbnode/storage/index/types.go index 58afff9776..78b2cbc1ea 100644 --- a/src/dbnode/storage/index/types.go +++ b/src/dbnode/storage/index/types.go @@ -41,7 +41,7 @@ import ( "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/mmap" "github.com/m3db/m3/src/x/pool" - "github.com/m3db/m3/src/x/resource" + xresource "github.com/m3db/m3/src/x/resource" xtime "github.com/m3db/m3/src/x/time" opentracinglog "github.com/opentracing/opentracing-go/log" @@ -352,7 +352,7 @@ type Block interface { // Query resolves the given query into known IDs. Query( ctx context.Context, - cancellable *resource.CancellableLifetime, + cancellable *xresource.CancellableLifetime, query Query, opts QueryOptions, results BaseResults, @@ -364,7 +364,7 @@ type Block interface { // avoid going to documents, relying purely on the indexed FSTs. Aggregate( ctx context.Context, - cancellable *resource.CancellableLifetime, + cancellable *xresource.CancellableLifetime, opts QueryOptions, results AggregateResults, logFields []opentracinglog.Field, diff --git a/src/dbnode/storage/index_query_concurrent_test.go b/src/dbnode/storage/index_query_concurrent_test.go index f7f1172367..0c41e5df02 100644 --- a/src/dbnode/storage/index_query_concurrent_test.go +++ b/src/dbnode/storage/index_query_concurrent_test.go @@ -34,7 +34,7 @@ import ( "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/idx" "github.com/m3db/m3/src/x/context" - "github.com/m3db/m3/src/x/resource" + xresource "github.com/m3db/m3/src/x/resource" xsync "github.com/m3db/m3/src/x/sync" xtest "github.com/m3db/m3/src/x/test" "go.uber.org/zap" @@ -232,7 +232,7 @@ func testNamespaceIndexHighConcurrentQueries( Query(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). DoAndReturn(func( _ context.Context, - _ *resource.CancellableLifetime, + _ *xresource.CancellableLifetime, _ index.Query, _ index.QueryOptions, _ index.QueryResults, @@ -246,7 +246,7 @@ func testNamespaceIndexHighConcurrentQueries( Query(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). DoAndReturn(func( ctx context.Context, - c *resource.CancellableLifetime, + c *xresource.CancellableLifetime, q index.Query, opts index.QueryOptions, r index.QueryResults, diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index e535169c8c..5367b17574 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -43,12 +43,12 @@ import ( "github.com/m3db/m3/src/dbnode/ts/writes" "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/x/clock" - xclose "github.com/m3db/m3/src/x/close" "github.com/m3db/m3/src/x/context" xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" xopentracing "github.com/m3db/m3/src/x/opentracing" + xresource "github.com/m3db/m3/src/x/resource" xsync "github.com/m3db/m3/src/x/sync" xtime "github.com/m3db/m3/src/x/time" @@ -122,7 +122,7 @@ type dbNamespace struct { // schemaDescr caches the latest schema for the namespace. // schemaDescr is updated whenever schema registry is updated. - schemaListener xclose.SimpleCloser + schemaListener xresource.SimpleCloser schemaDescr namespace.SchemaDescr // Contains an entry to all shards for fast shard lookup, an diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 056afa8166..49f8642757 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -54,11 +54,11 @@ import ( "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/x/checked" "github.com/m3db/m3/src/x/clock" - xclose "github.com/m3db/m3/src/x/close" "github.com/m3db/m3/src/x/context" xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" + xresource "github.com/m3db/m3/src/x/resource" xtime "github.com/m3db/m3/src/x/time" "github.com/gogo/protobuf/proto" @@ -185,7 +185,7 @@ type dbShard struct { contextPool context.Pool flushState shardFlushState tickWg *sync.WaitGroup - runtimeOptsListenClosers []xclose.SimpleCloser + runtimeOptsListenClosers []xresource.SimpleCloser currRuntimeOptions dbShardRuntimeOptions logger *zap.Logger metrics dbShardMetrics diff --git a/src/dbnode/x/xio/types.go b/src/dbnode/x/xio/types.go index 8fe7cf1680..fd02692fed 100644 --- a/src/dbnode/x/xio/types.go +++ b/src/dbnode/x/xio/types.go @@ -26,7 +26,7 @@ import ( "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/x/pool" - "github.com/m3db/m3/src/x/resource" + xresource "github.com/m3db/m3/src/x/resource" ) // BlockReader represents a block reader backed by a @@ -43,7 +43,7 @@ var EmptyBlockReader = BlockReader{} // SegmentReader implements the io reader interface backed by a segment. type SegmentReader interface { io.Reader - resource.Finalizer + xresource.Finalizer // Segment gets the segment read by this reader. Segment() (ts.Segment, error) diff --git a/src/query/storage/m3/cluster_namespaces_watcher.go b/src/query/storage/m3/cluster_namespaces_watcher.go index 945dbd7166..a630d8f45c 100644 --- a/src/query/storage/m3/cluster_namespaces_watcher.go +++ b/src/query/storage/m3/cluster_namespaces_watcher.go @@ -21,7 +21,7 @@ package m3 import ( - xclose "github.com/m3db/m3/src/x/close" + xresource "github.com/m3db/m3/src/x/resource" xwatch "github.com/m3db/m3/src/x/watch" ) @@ -48,7 +48,9 @@ func (n *clusterNamespacesWatcher) Get() ClusterNamespaces { return value.(ClusterNamespaces) } -func (n *clusterNamespacesWatcher) RegisterListener(listener ClusterNamespacesListener) xclose.SimpleCloser { +func (n *clusterNamespacesWatcher) RegisterListener( + listener ClusterNamespacesListener, +) xresource.SimpleCloser { _, watch, _ := n.watchable.Watch() namespaces := watch.Get() diff --git a/src/query/storage/m3/types.go b/src/query/storage/m3/types.go index 9413e92f19..99ef4f4410 100644 --- a/src/query/storage/m3/types.go +++ b/src/query/storage/m3/types.go @@ -27,8 +27,8 @@ import ( "github.com/m3db/m3/src/dbnode/namespace" genericstorage "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/storage/m3/consolidators" - xclose "github.com/m3db/m3/src/x/close" "github.com/m3db/m3/src/x/instrument" + xresource "github.com/m3db/m3/src/x/resource" ) // Cleanup is a cleanup function to be called after resources are freed. @@ -116,7 +116,7 @@ type ClusterNamespacesWatcher interface { // RegisterListener registers a listener for updates to cluster namespaces. // If a value is currently present, it will synchronously call back the listener. - RegisterListener(listener ClusterNamespacesListener) xclose.SimpleCloser + RegisterListener(listener ClusterNamespacesListener) xresource.SimpleCloser // Close closes the watcher and all descendent watches. Close() diff --git a/src/x/checked/checked_mock.go b/src/x/checked/checked_mock.go index 2c5040ea5e..a546388273 100644 --- a/src/x/checked/checked_mock.go +++ b/src/x/checked/checked_mock.go @@ -144,10 +144,10 @@ func (mr *MockBytesMockRecorder) DecWrites() *gomock.Call { } // DelayFinalizer mocks base method -func (m *MockBytes) DelayFinalizer() resource.Closer { +func (m *MockBytes) DelayFinalizer() resource.SimpleCloser { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DelayFinalizer") - ret0, _ := ret[0].(resource.Closer) + ret0, _ := ret[0].(resource.SimpleCloser) return ret0 } diff --git a/src/x/checked/ref.go b/src/x/checked/ref.go index 569adbe9de..5257683a62 100644 --- a/src/x/checked/ref.go +++ b/src/x/checked/ref.go @@ -28,7 +28,7 @@ import ( "sync/atomic" "unsafe" - "github.com/m3db/m3/src/x/resource" + xresource "github.com/m3db/m3/src/x/resource" ) // RefCount is an embeddable checked.Ref. @@ -103,14 +103,14 @@ func (c *RefCount) finalizeWithLock() { // until the closer returned by the method is called at least once. // This is useful for dependent resources requiring the lifetime of this // entityt to be extended. -func (c *RefCount) DelayFinalizer() resource.Closer { +func (c *RefCount) DelayFinalizer() xresource.SimpleCloser { c.finalizeState.Lock() c.finalizeState.delayRef++ c.finalizeState.Unlock() return c } -// Close implements resource.Closer for the purpose of use with DelayFinalizer. +// Close implements xresource.SimpleCloser for the purpose of use with DelayFinalizer. func (c *RefCount) Close() { c.finalizeState.Lock() c.finalizeState.delayRef-- diff --git a/src/x/checked/ref_test.go b/src/x/checked/ref_test.go index fe0a93827d..c17d751e95 100644 --- a/src/x/checked/ref_test.go +++ b/src/x/checked/ref_test.go @@ -30,13 +30,13 @@ import ( "testing" "time" - "github.com/m3db/m3/src/x/resource" - "github.com/leanovate/gopter" "github.com/leanovate/gopter/gen" "github.com/leanovate/gopter/prop" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + xresource "github.com/m3db/m3/src/x/resource" ) func TestRefCountNegativeRefCount(t *testing.T) { @@ -249,7 +249,7 @@ func TestRefCountDelayFinalizer(t *testing.T) { elem.IncRef() elem.DecRef() - delays := make([]resource.Closer, 0, test.numDelay) + delays := make([]xresource.SimpleCloser, 0, test.numDelay) for i := 0; i < test.numDelay; i++ { delays = append(delays, elem.DelayFinalizer()) } diff --git a/src/x/checked/types.go b/src/x/checked/types.go index 0b0e2f26b7..269fdb91d5 100644 --- a/src/x/checked/types.go +++ b/src/x/checked/types.go @@ -22,7 +22,7 @@ package checked import ( - "github.com/m3db/m3/src/x/resource" + xresource "github.com/m3db/m3/src/x/resource" ) // Ref is an entity that checks ref counts. @@ -43,7 +43,7 @@ type Ref interface { // until the closer returned by the method is called at least once. // This is useful for dependent resources requiring the lifetime of this // entityt to be extended. - DelayFinalizer() resource.Closer + DelayFinalizer() xresource.SimpleCloser // Finalize will call the finalizer if any, ref count must be zero. Finalize() diff --git a/src/x/context/context.go b/src/x/context/context.go index 5d72e11670..c6617e79e8 100644 --- a/src/x/context/context.go +++ b/src/x/context/context.go @@ -25,7 +25,7 @@ import ( "sync" xopentracing "github.com/m3db/m3/src/x/opentracing" - "github.com/m3db/m3/src/x/resource" + xresource "github.com/m3db/m3/src/x/resource" lightstep "github.com/lightstep/lightstep-tracer-go" "github.com/opentracing/opentracing-go" @@ -52,8 +52,8 @@ type ctx struct { } type finalizeable struct { - finalizer resource.Finalizer - closer resource.Closer + finalizer xresource.Finalizer + closer xresource.SimpleCloser } // NewContext creates a new context. @@ -96,7 +96,7 @@ func (c *ctx) IsClosed() bool { return done } -func (c *ctx) RegisterFinalizer(f resource.Finalizer) { +func (c *ctx) RegisterFinalizer(f xresource.Finalizer) { parent := c.parentCtx() if parent != nil { parent.RegisterFinalizer(f) @@ -106,7 +106,7 @@ func (c *ctx) RegisterFinalizer(f resource.Finalizer) { c.registerFinalizeable(finalizeable{finalizer: f}) } -func (c *ctx) RegisterCloser(f resource.Closer) { +func (c *ctx) RegisterCloser(f xresource.SimpleCloser) { parent := c.parentCtx() if parent != nil { parent.RegisterCloser(f) diff --git a/src/x/context/context_test.go b/src/x/context/context_test.go index f5a36c8c44..b2afe6abfe 100644 --- a/src/x/context/context_test.go +++ b/src/x/context/context_test.go @@ -27,12 +27,12 @@ import ( "testing" "time" - "github.com/m3db/m3/src/x/resource" - "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/mocktracer" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + xresource "github.com/m3db/m3/src/x/resource" ) func TestRegisterFinalizerWithChild(t *testing.T) { @@ -48,7 +48,7 @@ func TestRegisterFinalizerWithChild(t *testing.T) { ) wg.Add(1) - childCtx.RegisterFinalizer(resource.FinalizerFn(func() { + childCtx.RegisterFinalizer(xresource.FinalizerFn(func() { childClosed = true wg.Done() })) @@ -71,7 +71,7 @@ func TestRegisterFinalizer(t *testing.T) { ) wg.Add(1) - ctx.RegisterFinalizer(resource.FinalizerFn(func() { + ctx.RegisterFinalizer(xresource.FinalizerFn(func() { closed = true wg.Done() })) @@ -97,7 +97,7 @@ func TestRegisterCloserWithChild(t *testing.T) { ) wg.Add(1) - childCtx.RegisterCloser(resource.CloserFn(func() { + childCtx.RegisterCloser(xresource.SimpleCloserFn(func() { childClosed = true wg.Done() })) @@ -120,7 +120,7 @@ func TestRegisterCloser(t *testing.T) { ) wg.Add(1) - ctx.RegisterCloser(resource.CloserFn(func() { + ctx.RegisterCloser(xresource.SimpleCloserFn(func() { closed = true wg.Done() })) @@ -136,7 +136,7 @@ func TestRegisterCloser(t *testing.T) { func TestDoesNotRegisterFinalizerWhenClosed(t *testing.T) { ctx := NewContext().(*ctx) ctx.Close() - ctx.RegisterFinalizer(resource.FinalizerFn(func() {})) + ctx.RegisterFinalizer(xresource.FinalizerFn(func() {})) assert.Equal(t, 0, ctx.numFinalizeables()) } @@ -145,7 +145,7 @@ func TestDoesNotCloseTwice(t *testing.T) { ctx := NewContext().(*ctx) var closed int32 - ctx.RegisterFinalizer(resource.FinalizerFn(func() { + ctx.RegisterFinalizer(xresource.FinalizerFn(func() { atomic.AddInt32(&closed, 1) })) @@ -187,7 +187,7 @@ func testDependsOn(t *testing.T, c *ctx) { other := NewContext().(*ctx) wg.Add(1) - c.RegisterFinalizer(resource.FinalizerFn(func() { + c.RegisterFinalizer(xresource.FinalizerFn(func() { atomic.AddInt32(&closed, 1) wg.Done() })) @@ -221,7 +221,7 @@ func TestDependsOnWithChild(t *testing.T) { ) wg.Add(1) - c.RegisterFinalizer(resource.FinalizerFn(func() { + c.RegisterFinalizer(xresource.FinalizerFn(func() { atomic.AddInt32(&closed, 1) wg.Done() })) diff --git a/src/x/context/pool_test.go b/src/x/context/pool_test.go index 7757171243..a2106a4995 100644 --- a/src/x/context/pool_test.go +++ b/src/x/context/pool_test.go @@ -23,7 +23,7 @@ package context import ( "testing" - "github.com/m3db/m3/src/x/resource" + xresource "github.com/m3db/m3/src/x/resource" "github.com/stretchr/testify/assert" ) @@ -34,7 +34,7 @@ func TestContextPool(t *testing.T) { ctx := pool.Get() finalizeCalled := false - ctx.RegisterFinalizer(resource.FinalizerFn(func() { + ctx.RegisterFinalizer(xresource.FinalizerFn(func() { finalizeCalled = true })) ctx.BlockingClose() diff --git a/src/x/context/types.go b/src/x/context/types.go index bf152f05ca..d384dc3411 100644 --- a/src/x/context/types.go +++ b/src/x/context/types.go @@ -24,7 +24,7 @@ import ( stdctx "context" "github.com/m3db/m3/src/x/pool" - "github.com/m3db/m3/src/x/resource" + xresource "github.com/m3db/m3/src/x/resource" "github.com/opentracing/opentracing-go" ) @@ -47,10 +47,10 @@ type Context interface { IsClosed() bool // RegisterFinalizer will register a resource finalizer. - RegisterFinalizer(resource.Finalizer) + RegisterFinalizer(xresource.Finalizer) // RegisterCloser will register a resource closer. - RegisterCloser(resource.Closer) + RegisterCloser(xresource.SimpleCloser) // DependsOn will register a blocking context that // must complete first before finalizers can be called. diff --git a/src/x/close/close.go b/src/x/resource/close.go similarity index 70% rename from src/x/close/close.go rename to src/x/resource/close.go index 1b652005fa..668674127f 100644 --- a/src/x/close/close.go +++ b/src/x/resource/close.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016 Uber Technologies, Inc. +// Copyright (c) 2020 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -18,12 +18,12 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -// Package close provides utilities for closing resources. -package close +package resource import ( "errors" - "io" + + xerrors "github.com/m3db/m3/src/x/errors" ) var ( @@ -32,32 +32,6 @@ var ( ErrNotCloseable = errors.New("not a closeable resource") ) -// Closer is a resource that can be closed. -type Closer interface { - io.Closer -} - -// CloserFn implements the SimpleCloser interface. -type CloserFn func() error - -// Close implements the SimplerCloser interface. -func (fn CloserFn) Close() error { - return fn() -} - -// SimpleCloser is a resource that can be closed without returning a result. -type SimpleCloser interface { - Close() -} - -// SimpleCloserFn implements the SimpleCloser interface. -type SimpleCloserFn func() - -// Close implements the SimplerCloser interface. -func (fn SimpleCloserFn) Close() { - fn() -} - // TryClose attempts to close a resource, the resource is expected to // implement either Closeable or CloseableResult. func TryClose(r interface{}) error { @@ -70,3 +44,15 @@ func TryClose(r interface{}) error { } return ErrNotCloseable } + +// CloseAll closes all closers and combines any errors. +func CloseAll(closers ...Closer) error { + multiErr := xerrors.NewMultiError() + for _, closer := range closers { + if err := closer.Close(); err != nil { + multiErr = multiErr.Add(err) + } + } + + return multiErr.FinalError() +} diff --git a/src/x/close/close_test.go b/src/x/resource/close_test.go similarity index 92% rename from src/x/close/close_test.go rename to src/x/resource/close_test.go index 926df11e76..a8ab96103b 100644 --- a/src/x/close/close_test.go +++ b/src/x/resource/close_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2017 Uber Technologies, Inc. +// Copyright (c) 2020 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -17,14 +17,13 @@ // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. - -package close +package resource import ( "errors" "testing" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestTryClose(t *testing.T) { @@ -53,10 +52,10 @@ func TestTryClose(t *testing.T) { for _, test := range tests { err := TryClose(test.input) if test.expectErr { - assert.Error(t, err) + require.Error(t, err) continue } - assert.NoError(t, err) + require.NoError(t, err) } } diff --git a/src/x/resource/types.go b/src/x/resource/types.go index 05933c593d..42b2b0d02c 100644 --- a/src/x/resource/types.go +++ b/src/x/resource/types.go @@ -38,15 +38,28 @@ func (fn FinalizerFn) Finalize() { fn() } -// Closer is an object that can be closed. -type Closer interface { +// SimpleCloser is an object that can be closed. +type SimpleCloser interface { Close() } -// CloserFn is a function literal that is a closer. -type CloserFn func() +// SimpleCloserFn is a function literal that is a closer. +type SimpleCloserFn func() // Close will call the function literal as a closer. -func (fn CloserFn) Close() { +func (fn SimpleCloserFn) Close() { fn() } + +// Closer is an object that can be closed which returns an error. +type Closer interface { + Close() error +} + +// CloserFn is a function literal that is a closer which returns an error. +type CloserFn func() error + +// Close will call the function literal as a closer. +func (fn CloserFn) Close() error { + return fn() +} diff --git a/src/x/watch/source.go b/src/x/watch/source.go index 4d174e1cb2..aba93b0087 100644 --- a/src/x/watch/source.go +++ b/src/x/watch/source.go @@ -24,7 +24,7 @@ import ( "errors" "sync" - "github.com/m3db/m3/src/x/close" + xresource "github.com/m3db/m3/src/x/resource" "go.uber.org/zap" ) @@ -41,7 +41,7 @@ type SourceInput interface { // Source polls data by calling SourcePollFn and notifies its watches on updates. type Source interface { - close.SimpleCloser + xresource.SimpleCloser // Get returns the latest value. Get() interface{} diff --git a/src/x/watch/watch.go b/src/x/watch/watch.go index 42e531fc77..5b35b17578 100644 --- a/src/x/watch/watch.go +++ b/src/x/watch/watch.go @@ -25,7 +25,7 @@ import ( "errors" "sync" - xclose "github.com/m3db/m3/src/x/close" + xresource "github.com/m3db/m3/src/x/resource" ) var errClosed = errors.New("closed") @@ -34,7 +34,7 @@ type closer func() // Updatable can be updated. type Updatable interface { - xclose.SimpleCloser + xresource.SimpleCloser // C returns the notification channel for updates. C() <-chan struct{} @@ -50,7 +50,7 @@ type Watch interface { // Watchable can be watched type Watchable interface { - xclose.SimpleCloser + xresource.SimpleCloser // IsClosed returns true if the Watchable is closed IsClosed() bool