diff --git a/x/mongo/driver/topology/rtt_monitor.go b/x/mongo/driver/topology/rtt_monitor.go index 07f508caae..c7b168dc2c 100644 --- a/x/mongo/driver/topology/rtt_monitor.go +++ b/x/mongo/driver/topology/rtt_monitor.go @@ -56,7 +56,6 @@ type rttMonitor struct { cfg *rttConfig ctx context.Context cancelFn context.CancelFunc - started bool } var _ driver.RTTMonitor = &rttMonitor{} @@ -83,7 +82,6 @@ func (r *rttMonitor) connect() { r.connMu.Lock() defer r.connMu.Unlock() - r.started = true r.closeWg.Add(1) go func() { @@ -97,10 +95,6 @@ func (r *rttMonitor) disconnect() { r.connMu.Lock() defer r.connMu.Unlock() - if !r.started { - return - } - r.cancelFn() // Wait for the existing connection to complete. diff --git a/x/mongo/driver/topology/server.go b/x/mongo/driver/topology/server.go index f4c6d744aa..99f8dd618b 100644 --- a/x/mongo/driver/topology/server.go +++ b/x/mongo/driver/topology/server.go @@ -125,6 +125,7 @@ type Server struct { processErrorLock sync.Mutex rttMonitor *rttMonitor + monitorOnce sync.Once } // updateTopologyCallback is a callback used to create a server that should be called when the parent Topology instance @@ -285,10 +286,10 @@ func (s *Server) Disconnect(ctx context.Context) error { close(s.done) s.cancelCheck() - s.rttMonitor.disconnect() s.pool.close(ctx) s.closewg.Wait() + s.rttMonitor.disconnect() atomic.StoreInt64(&s.state, serverDisconnected) return nil @@ -661,8 +662,8 @@ func (s *Server) update() { transitionedFromNetworkError := desc.LastError != nil && unwrapConnectionError(desc.LastError) != nil && previousDescription.Kind != description.Unknown - if isStreamingEnabled(s) && isStreamable(s) && !s.rttMonitor.started { - s.rttMonitor.connect() + if isStreamingEnabled(s) && isStreamable(s) { + s.monitorOnce.Do(s.rttMonitor.connect) } if isStreamable(s) || connectionIsStreaming || transitionedFromNetworkError {