diff --git a/clientv3/naming/grpc_test.go b/clientv3/naming/grpc_test.go index 8f1a38c46320..2cdb0df55f38 100644 --- a/clientv3/naming/grpc_test.go +++ b/clientv3/naming/grpc_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package naming +package naming_test import ( "context" @@ -21,6 +21,7 @@ import ( "testing" etcd "go.etcd.io/etcd/v3/clientv3" + namingv3 "go.etcd.io/etcd/v3/clientv3/naming" "go.etcd.io/etcd/v3/integration" "go.etcd.io/etcd/v3/pkg/testutil" @@ -33,7 +34,7 @@ func TestGRPCResolver(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - r := GRPCResolver{ + r := namingv3.GRPCResolver{ Client: clus.RandClient(), } @@ -107,7 +108,7 @@ func TestGRPCResolverMulti(t *testing.T) { t.Fatal(err) } - r := GRPCResolver{c} + r := namingv3.GRPCResolver{c} w, err := r.Resolve("foo") if err != nil { diff --git a/etcdmain/grpc_proxy.go b/etcdmain/grpc_proxy.go index 3da3a6879ea7..6ed38ee4f08b 100644 --- a/etcdmain/grpc_proxy.go +++ b/etcdmain/grpc_proxy.go @@ -398,12 +398,15 @@ func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server { } kvp, _ := grpcproxy.NewKvProxy(client) - watchp, _ := grpcproxy.NewWatchProxy(lg, client) + watchp, _ := grpcproxy.NewWatchProxy(client.Ctx(), lg, client) if grpcProxyResolverPrefix != "" { grpcproxy.Register(lg, client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL) } clusterp, _ := grpcproxy.NewClusterProxy(lg, client, grpcProxyAdvertiseClientURL, grpcProxyResolverPrefix) - leasep, _ := grpcproxy.NewLeaseProxy(client) + leasep, _ := grpcproxy.NewLeaseProxy(client.Ctx(), client) + + // grpcproxy. + mainp := grpcproxy.NewMaintenanceProxy(client) authp := grpcproxy.NewAuthProxy(client) electionp := grpcproxy.NewElectionProxy(client) diff --git a/integration/cluster_proxy.go b/integration/cluster_proxy.go index b5e2b9571132..5d439b29a46b 100644 --- a/integration/cluster_proxy.go +++ b/integration/cluster_proxy.go @@ -17,6 +17,7 @@ package integration import ( + "context" "sync" "go.etcd.io/etcd/v3/clientv3" @@ -37,16 +38,23 @@ var ( const proxyNamespace = "proxy-namespace" type grpcClientProxy struct { - grpc grpcAPI - wdonec <-chan struct{} - kvdonec <-chan struct{} - lpdonec <-chan struct{} + ctx context.Context + ctxCancel func() + grpc grpcAPI + wdonec <-chan struct{} + kvdonec <-chan struct{} + lpdonec <-chan struct{} } func toGRPC(c *clientv3.Client) grpcAPI { pmu.Lock() defer pmu.Unlock() + // dedicated context bound to 'grpc-proxy' lifetype + // (so in practice lifetime of the client connection to the proxy). + // TODO: Refactor to a separate clientv3.Client instance instead of the context alone. + ctx, ctxCancel := context.WithCancel(context.WithValue(context.TODO(), "_name", "grpcProxyContext")) + lg := zap.NewExample() if v, ok := proxies[c]; ok { @@ -59,8 +67,8 @@ func toGRPC(c *clientv3.Client) grpcAPI { c.Lease = namespace.NewLease(c.Lease, proxyNamespace) // test coalescing/caching proxy kvp, kvpch := grpcproxy.NewKvProxy(c) - wp, wpch := grpcproxy.NewWatchProxy(lg, c) - lp, lpch := grpcproxy.NewLeaseProxy(c) + wp, wpch := grpcproxy.NewWatchProxy(ctx, lg, c) + lp, lpch := grpcproxy.NewLeaseProxy(ctx, c) mp := grpcproxy.NewMaintenanceProxy(c) clp, _ := grpcproxy.NewClusterProxy(lg, c, "", "") // without registering proxy URLs authp := grpcproxy.NewAuthProxy(c) @@ -77,20 +85,21 @@ func toGRPC(c *clientv3.Client) grpcAPI { adapter.LockServerToLockClient(lockp), adapter.ElectionServerToElectionClient(electp), } - proxies[c] = grpcClientProxy{grpc: grpc, wdonec: wpch, kvdonec: kvpch, lpdonec: lpch} + proxies[c] = grpcClientProxy{ctx: ctx, ctxCancel: ctxCancel, grpc: grpc, wdonec: wpch, kvdonec: kvpch, lpdonec: lpch} return grpc } type proxyCloser struct { clientv3.Watcher - wdonec <-chan struct{} - kvdonec <-chan struct{} - lclose func() - lpdonec <-chan struct{} + proxyCtxCancel func() + wdonec <-chan struct{} + kvdonec <-chan struct{} + lclose func() + lpdonec <-chan struct{} } func (pc *proxyCloser) Close() error { - // client ctx is canceled before calling close, so kv and lp will close out + pc.proxyCtxCancel() <-pc.kvdonec err := pc.Watcher.Close() <-pc.wdonec @@ -110,11 +119,12 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) { lc := c.Lease c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, c, cfg.DialTimeout) c.Watcher = &proxyCloser{ - Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch, c), - wdonec: proxies[c].wdonec, - kvdonec: proxies[c].kvdonec, - lclose: func() { lc.Close() }, - lpdonec: proxies[c].lpdonec, + Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch, c), + wdonec: proxies[c].wdonec, + kvdonec: proxies[c].kvdonec, + lclose: func() { lc.Close() }, + lpdonec: proxies[c].lpdonec, + proxyCtxCancel: proxies[c].ctxCancel, } pmu.Unlock() return c, nil diff --git a/proxy/grpcproxy/lease.go b/proxy/grpcproxy/lease.go index 48bcd663378d..f10acf60a049 100644 --- a/proxy/grpcproxy/lease.go +++ b/proxy/grpcproxy/lease.go @@ -48,13 +48,13 @@ type leaseProxy struct { wg sync.WaitGroup } -func NewLeaseProxy(c *clientv3.Client) (pb.LeaseServer, <-chan struct{}) { - cctx, cancel := context.WithCancel(c.Ctx()) +func NewLeaseProxy(ctx context.Context, c *clientv3.Client) (pb.LeaseServer, <-chan struct{}) { + cctx, cancel := context.WithCancel(ctx) lp := &leaseProxy{ leaseClient: pb.NewLeaseClient(c.ActiveConnection()), lessor: c.Lease, ctx: cctx, - leader: newLeader(c.Ctx(), c.Watcher), + leader: newLeader(cctx, c.Watcher), } ch := make(chan struct{}) go func() { diff --git a/proxy/grpcproxy/watch.go b/proxy/grpcproxy/watch.go index bbde0ad591ec..146b466a64c9 100644 --- a/proxy/grpcproxy/watch.go +++ b/proxy/grpcproxy/watch.go @@ -48,12 +48,12 @@ type watchProxy struct { lg *zap.Logger } -func NewWatchProxy(lg *zap.Logger, c *clientv3.Client) (pb.WatchServer, <-chan struct{}) { - cctx, cancel := context.WithCancel(c.Ctx()) +func NewWatchProxy(ctx context.Context, lg *zap.Logger, c *clientv3.Client) (pb.WatchServer, <-chan struct{}) { + cctx, cancel := context.WithCancel(ctx) wp := &watchProxy{ cw: c.Watcher, ctx: cctx, - leader: newLeader(c.Ctx(), c.Watcher), + leader: newLeader(cctx, c.Watcher), kv: c.KV, // for permission checking lg: lg,