Skip to content

Commit

Permalink
grpcsync : Remove OnceFunc (grpc#8049)
Browse files Browse the repository at this point in the history
  • Loading branch information
ashishksrivastava authored Jan 30, 2025
1 parent 78eebff commit e4a0dfd
Show file tree
Hide file tree
Showing 11 changed files with 15 additions and 102 deletions.
2 changes: 1 addition & 1 deletion balancer_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (
}
acbw.producersMu.Unlock()
}
return pData.producer, grpcsync.OnceFunc(unref)
return pData.producer, sync.OnceFunc(unref)
}

func (acbw *acBalancerWrapper) closeProducers() {
Expand Down
32 changes: 0 additions & 32 deletions internal/grpcsync/oncefunc.go

This file was deleted.

53 changes: 0 additions & 53 deletions internal/grpcsync/oncefunc_test.go

This file was deleted.

4 changes: 2 additions & 2 deletions internal/xds/bootstrap/tlscreds/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ import (
"errors"
"fmt"
"net"
"sync"

"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/tls/certprovider"
"google.golang.org/grpc/credentials/tls/certprovider/pemfile"
"google.golang.org/grpc/internal/grpcsync"
)

// bundle is an implementation of credentials.Bundle which implements mTLS
Expand Down Expand Up @@ -81,7 +81,7 @@ func NewBundle(jd json.RawMessage) (credentials.Bundle, func(), error) {
}
return &bundle{
transportCredentials: &reloadingCreds{provider: provider},
}, grpcsync.OnceFunc(func() { provider.Close() }), nil
}, sync.OnceFunc(func() { provider.Close() }), nil
}

func (t *bundle) TransportCredentials() credentials.TransportCredentials {
Expand Down
3 changes: 1 addition & 2 deletions orca/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/orca/internal"
"google.golang.org/grpc/status"

Expand Down Expand Up @@ -85,7 +84,7 @@ func RegisterOOBListener(sc balancer.SubConn, l OOBListener, opts OOBListenerOpt

// If stop is called multiple times, prevent it from having any effect on
// subsequent calls.
return grpcsync.OnceFunc(func() {
return sync.OnceFunc(func() {
p.unregisterListener(l, opts.ReportInterval)
closeFn()
})
Expand Down
4 changes: 2 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ func (s *Server) serverWorker() {
// connections to reduce the time spent overall on runtime.morestack.
func (s *Server) initServerWorkers() {
s.serverWorkerChannel = make(chan func())
s.serverWorkerChannelClose = grpcsync.OnceFunc(func() {
s.serverWorkerChannelClose = sync.OnceFunc(func() {
close(s.serverWorkerChannel)
})
for i := uint32(0); i < s.opts.numServerWorkers; i++ {
Expand Down Expand Up @@ -1930,7 +1930,7 @@ func (s *Server) stop(graceful bool) {
s.conns = nil

if s.opts.numServerWorkers > 0 {
// Closing the channel (only once, via grpcsync.OnceFunc) after all the
// Closing the channel (only once, via sync.OnceFunc) after all the
// connections have been closed above ensures that there are no
// goroutines executing the callback passed to st.HandleStreams (where
// the channel is written to).
Expand Down
4 changes: 2 additions & 2 deletions xds/internal/resolver/xds_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"encoding/json"
"fmt"
"strings"
"sync"
"testing"
"time"

Expand All @@ -32,7 +33,6 @@ import (
"github.com/google/uuid"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpcsync"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
Expand Down Expand Up @@ -271,7 +271,7 @@ func (s) TestResolverCloseClosesXDSClient(t *testing.T) {
Name: t.Name(),
WatchExpiryTimeout: defaultTestTimeout,
})
return c, grpcsync.OnceFunc(func() {
return c, sync.OnceFunc(func() {
close(closeCh)
cancel()
}), err
Expand Down
3 changes: 2 additions & 1 deletion xds/internal/xdsclient/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package xdsclient
import (
"context"
"fmt"
"sync"
"sync/atomic"

"google.golang.org/grpc/grpclog"
Expand Down Expand Up @@ -674,7 +675,7 @@ func (a *authority) watchResource(rType xdsresource.Type, resourceName string, w
}

func (a *authority) unwatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) func() {
return grpcsync.OnceFunc(func() {
return sync.OnceFunc(func() {
done := make(chan struct{})
a.xdsClientSerializer.ScheduleOr(func(context.Context) {
defer close(done)
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/clientimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func (c *clientImpl) getOrCreateChannel(serverConfig *bootstrap.ServerConfig, in
// reference to the xdsChannel. This returned function is idempotent, meaning
// it can be called multiple times without any additional effect.
func (c *clientImpl) releaseChannel(serverConfig *bootstrap.ServerConfig, state *channelState, deInitLocked func(*channelState)) func() {
return grpcsync.OnceFunc(func() {
return sync.OnceFunc(func() {
c.channelsMu.Lock()

if c.logger.V(2) {
Expand Down
7 changes: 3 additions & 4 deletions xds/internal/xdsclient/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/xds/bootstrap"
)

Expand Down Expand Up @@ -131,7 +130,7 @@ func (p *Pool) GetClientForTesting(name string) (XDSClient, func(), error) {
return nil, nil, fmt.Errorf("xds:: xDS client with name %q not found", name)
}
c.incrRef()
return c, grpcsync.OnceFunc(func() { p.clientRefCountedClose(name) }), nil
return c, sync.OnceFunc(func() { p.clientRefCountedClose(name) }), nil
}

// SetFallbackBootstrapConfig is used to specify a bootstrap configuration
Expand Down Expand Up @@ -193,7 +192,7 @@ func (p *Pool) newRefCounted(name string, watchExpiryTimeout time.Duration, stre

if c := p.clients[name]; c != nil {
c.incrRef()
return c, grpcsync.OnceFunc(func() { p.clientRefCountedClose(name) }), nil
return c, sync.OnceFunc(func() { p.clientRefCountedClose(name) }), nil
}

c, err := newClientImpl(p.config, watchExpiryTimeout, streamBackoff)
Expand All @@ -208,5 +207,5 @@ func (p *Pool) newRefCounted(name string, watchExpiryTimeout time.Duration, stre
xdsClientImplCreateHook(name)

logger.Infof("xDS node ID: %s", p.config.Node().GetId())
return client, grpcsync.OnceFunc(func() { p.clientRefCountedClose(name) }), nil
return client, sync.OnceFunc(func() { p.clientRefCountedClose(name) }), nil
}
3 changes: 1 addition & 2 deletions xds/internal/xdsclient/transport/lrs/lrs_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
igrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/xdsclient/load"
Expand Down Expand Up @@ -107,7 +106,7 @@ func (lrs *StreamImpl) ReportLoad() (*load.Store, func()) {
lrs.mu.Lock()
defer lrs.mu.Unlock()

cleanup := grpcsync.OnceFunc(func() {
cleanup := sync.OnceFunc(func() {
lrs.mu.Lock()
defer lrs.mu.Unlock()

Expand Down

0 comments on commit e4a0dfd

Please sign in to comment.