diff --git a/cmd/ncproxy/ncproxy.go b/cmd/ncproxy/ncproxy.go index 9cd2a2986d..abd79f9070 100644 --- a/cmd/ncproxy/ncproxy.go +++ b/cmd/ncproxy/ncproxy.go @@ -19,7 +19,6 @@ import ( "github.com/Microsoft/hcsshim/pkg/octtrpc" "github.com/containerd/ttrpc" "github.com/pkg/errors" - bolt "go.etcd.io/bbolt" "go.opencensus.io/trace" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -44,25 +43,55 @@ func newComputeAgentCache() *computeAgentCache { } } -func (c *computeAgentCache) get(cid string) (*computeAgentClient, bool) { +func (c *computeAgentCache) getAllAndClear() ([]*computeAgentClient, error) { + // set c.cache to nil first so that subsequent attempts to reads and writes + // return an error + c.rw.Lock() + cacheCopy := c.cache + c.cache = nil + c.rw.Unlock() + + if cacheCopy == nil { + return nil, errors.New("cannot read from a nil cache") + } + + results := []*computeAgentClient{} + for _, agent := range cacheCopy { + results = append(results, agent) + } + return results, nil + +} + +func (c *computeAgentCache) get(cid string) (*computeAgentClient, error) { c.rw.RLock() defer c.rw.RUnlock() - result, ok := c.cache[cid] - return result, ok + if c.cache == nil { + return nil, errors.New("cannot read from a nil cache") + } + result := c.cache[cid] + return result, nil } -func (c *computeAgentCache) put(cid string, agent *computeAgentClient) { +func (c *computeAgentCache) put(cid string, agent *computeAgentClient) error { c.rw.Lock() defer c.rw.Unlock() + if c.cache == nil { + return errors.New("cannot write to a nil cache") + } c.cache[cid] = agent + return nil } -func (c *computeAgentCache) getAndDelete(cid string) (*computeAgentClient, bool) { +func (c *computeAgentCache) getAndDelete(cid string) (*computeAgentClient, error) { c.rw.Lock() defer c.rw.Unlock() - result, ok := c.cache[cid] + if c.cache == nil { + return nil, errors.New("cannot read from a nil cache") + } + result := c.cache[cid] delete(c.cache, cid) - return 
result, ok + return result, nil } // GRPC service exposed for use by a Node Network Service. @@ -91,7 +120,11 @@ func (s *grpcService) AddNIC(ctx context.Context, req *ncproxygrpc.AddNICRequest if req.ContainerID == "" || req.EndpointName == "" || req.NicID == "" { return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req) } - if agent, ok := s.containerIDToComputeAgent.get(req.ContainerID); ok { + agent, err := s.containerIDToComputeAgent.get(req.ContainerID) + if err != nil { + return nil, err + } + if agent != nil { caReq := &computeagent.AddNICInternalRequest{ ContainerID: req.ContainerID, NicID: req.NicID, @@ -121,7 +154,11 @@ func (s *grpcService) ModifyNIC(ctx context.Context, req *ncproxygrpc.ModifyNICR return nil, status.Error(codes.InvalidArgument, "received empty field in request") } - if agent, ok := s.containerIDToComputeAgent.get(req.ContainerID); ok { + agent, err := s.containerIDToComputeAgent.get(req.ContainerID) + if err != nil { + return nil, err + } + if agent != nil { caReq := &computeagent.ModifyNICInternalRequest{ NicID: req.NicID, EndpointName: req.EndpointName, @@ -197,7 +234,11 @@ func (s *grpcService) DeleteNIC(ctx context.Context, req *ncproxygrpc.DeleteNICR if req.ContainerID == "" || req.EndpointName == "" || req.NicID == "" { return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req) } - if agent, ok := s.containerIDToComputeAgent.get(req.ContainerID); ok { + agent, err := s.containerIDToComputeAgent.get(req.ContainerID) + if err != nil { + return nil, err + } + if agent != nil { caReq := &computeagent.DeleteNICInternalRequest{ ContainerID: req.ContainerID, NicID: req.NicID, @@ -608,73 +649,22 @@ func (s *grpcService) GetNetworks(ctx context.Context, req *ncproxygrpc.GetNetwo // TTRPC service exposed for use by the shim. 
type ttrpcService struct { containerIDToComputeAgent *computeAgentCache - // database for containerID to compute agent address - agentStore *computeAgentStore + agentStore *computeAgentStore } -func newTTRPCService(ctx context.Context, agent *computeAgentCache, db *bolt.DB) *ttrpcService { - agentStore := newComputeAgentStore(db) +func newTTRPCService(ctx context.Context, agent *computeAgentCache, agentStore *computeAgentStore) *ttrpcService { return &ttrpcService{ containerIDToComputeAgent: agent, agentStore: agentStore, } } -func (s *ttrpcService) reconnectComputeAgents(ctx context.Context) { - computeAgentMap, err := s.agentStore.getActiveComputeAgents(ctx) - if err != nil && errors.Is(err, errBucketNotFound) { - // no entries in the database yet, return early - log.G(ctx).WithError(err).Debug("no entries in database") - return - } else if err != nil { - log.G(ctx).WithError(err).Error("failed to get compute agent information") - } - var wg sync.WaitGroup - for cid, addr := range computeAgentMap { - wg.Add(1) - go func(agentAddress, containerID string) { - defer wg.Done() - service, err := getComputeAgentClient(agentAddress) - if err != nil { - // can't connect to compute agent, remove entry in database - log.G(ctx).WithField("agentAddress", agentAddress).WithError(err).Error("failed to create new compute agent client") - dErr := s.agentStore.deleteComputeAgent(ctx, containerID) - if dErr != nil { - log.G(ctx).WithField("key", containerID).WithError(dErr).Warn("failed to delete key from compute agent store") - } - return - } - log.G(ctx).WithField("containerID", containerID).Info("reconnected to container's compute agent") - - // connection succeeded, add entry in cache map for later - s.containerIDToComputeAgent.put(containerID, service) - }(addr, cid) - } - - wg.Wait() -} - -// disconnectComputeAgents clears the cache of compute agent clients and cleans up -// their resources. 
-func disconnectComputeAgents(ctx context.Context, containerIDToComputeAgent *computeAgentCache) error { - agents, err := containerIDToComputeAgent.getAllAndClear() - if err != nil { - return errors.Wrapf(err, "failed to get all cached compute agent clients") - } - for _, agent := range agents { - if err := agent.Close(); err != nil { - log.G(ctx).WithError(err).Error("failed to close compute agent connection") - } - } - return nil -} - func getComputeAgentClient(agentAddr string) (*computeAgentClient, error) { conn, err := winioDialPipe(agentAddr, nil) if err != nil { return nil, errors.Wrap(err, "failed to connect to compute agent service") } - raw := ttrpc.NewClient( + raw := ttrpcNewClient( conn, ttrpc.WithUnaryClientInterceptor(octtrpc.ClientInterceptor()), ttrpc.WithOnClose(func() { conn.Close() }), @@ -700,9 +690,11 @@ func (s *ttrpcService) RegisterComputeAgent(ctx context.Context, req *ncproxyttr return nil, err } - // Add to global client map if connection succeeds. Don't check if there's already a map entry + // Add to client cache if connection succeeds. Don't check if there's already a map entry // just overwrite as the client may have changed the address of the config agent. 
- s.containerIDToComputeAgent.put(req.ContainerID, agent) + if err := s.containerIDToComputeAgent.put(req.ContainerID, agent); err != nil { + return nil, err + } return &ncproxyttrpc.RegisterComputeAgentResponse{}, nil } @@ -721,7 +713,11 @@ func (s *ttrpcService) UnregisterComputeAgent(ctx context.Context, req *ncproxyt } // remove the agent from the cache and return it so we can clean up its resources as well - if agent, ok := s.containerIDToComputeAgent.getAndDelete(req.ContainerID); ok { + agent, err := s.containerIDToComputeAgent.getAndDelete(req.ContainerID) + if err != nil { + return nil, err + } + if agent != nil { if err := agent.Close(); err != nil { return nil, err } diff --git a/cmd/ncproxy/run.go b/cmd/ncproxy/run.go index ea5750613f..fd7611c1ca 100644 --- a/cmd/ncproxy/run.go +++ b/cmd/ncproxy/run.go @@ -22,7 +22,6 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/urfave/cli" - bolt "go.etcd.io/bbolt" "go.opencensus.io/plugin/ocgrpc" "go.opencensus.io/trace" "google.golang.org/grpc" @@ -40,6 +39,9 @@ type computeAgentClient struct { } func (c *computeAgentClient) Close() error { + if c.raw == nil { + return nil + } return c.raw.Close() } @@ -229,11 +231,6 @@ func run(clicontext *cli.Context) error { } } - db, err := bolt.Open(dbPath, 0600, nil) - if err != nil { - return err - } - log.G(ctx).WithFields(logrus.Fields{ "TTRPCAddr": conf.TTRPCAddr, "NodeNetSvcAddr": conf.NodeNetSvcAddr, @@ -247,12 +244,13 @@ func run(clicontext *cli.Context) error { defer signal.Stop(sigChan) // Create new server and then register NetworkConfigProxyServices. 
- server, err := newServer(ctx, conf) + server, err := newServer(ctx, conf, dbPath) if err != nil { return errors.New("failed to make new ncproxy server") } + defer server.cleanupResources(ctx) - ttrpcListener, grpcListener, err := server.setup(ctx, db) + ttrpcListener, grpcListener, err := server.setup(ctx) if err != nil { return errors.New("failed to setup ncproxy server") } @@ -272,9 +270,7 @@ func run(clicontext *cli.Context) error { } // Cancel inflight requests and shutdown services - if err := server.gracefulShutdown(ctx); err != nil { - return errors.Wrap(err, "ncproxy failed to shutdown gracefully") - } + server.gracefulShutdown(ctx) return nil } diff --git a/cmd/ncproxy/server.go b/cmd/ncproxy/server.go index 53f796c826..d2d0ec6985 100644 --- a/cmd/ncproxy/server.go +++ b/cmd/ncproxy/server.go @@ -4,6 +4,7 @@ import ( "context" "net" "strings" + "sync" "github.com/Microsoft/go-winio" "github.com/Microsoft/hcsshim/cmd/ncproxy/ncproxygrpc" @@ -11,7 +12,9 @@ import ( "github.com/Microsoft/hcsshim/internal/ncproxyttrpc" "github.com/Microsoft/hcsshim/pkg/octtrpc" "github.com/containerd/ttrpc" + "github.com/pkg/errors" "github.com/sirupsen/logrus" + bolt "go.etcd.io/bbolt" "go.opencensus.io/plugin/ocgrpc" "google.golang.org/grpc" ) @@ -20,28 +23,42 @@ type server struct { ttrpc *ttrpc.Server grpc *grpc.Server conf *config + + // store shared data on server for cleaning up later + // database for containerID to compute agent address + agentStore *computeAgentStore + // cache of container IDs to compute agent clients + cache *computeAgentCache } -func newServer(ctx context.Context, conf *config) (*server, error) { +func newServer(ctx context.Context, conf *config, dbPath string) (*server, error) { + db, err := bolt.Open(dbPath, 0600, nil) + if err != nil { + return nil, err + } + agentStore := newComputeAgentStore(db) + agentCache := newComputeAgentCache() + reconnectComputeAgents(ctx, agentStore, 
agentCache) + ttrpcServer, err := ttrpc.NewServer(ttrpc.WithUnaryServerInterceptor(octtrpc.ServerInterceptor())) if err != nil { log.G(ctx).WithError(err).Error("failed to create ttrpc server") return nil, err } return &server{ - grpc: grpc.NewServer(grpc.StatsHandler(&ocgrpc.ServerHandler{})), - ttrpc: ttrpcServer, - conf: conf, + grpc: grpc.NewServer(grpc.StatsHandler(&ocgrpc.ServerHandler{})), + ttrpc: ttrpcServer, + conf: conf, + agentStore: agentStore, + cache: agentCache, }, nil } func (s *server) setup(ctx context.Context) (net.Listener, net.Listener, error) { - agentCache := newComputeAgentCache() - gService := newGRPCService(agentCache) + gService := newGRPCService(s.cache) ncproxygrpc.RegisterNetworkConfigProxyServer(s.grpc, gService) - tService := newTTRPCService(ctx, agentCache, s.db) - tService.reconnectComputeAgents(ctx) + tService := newTTRPCService(ctx, s.cache, s.agentStore) ncproxyttrpc.RegisterNetworkConfigProxyService(s.ttrpc, tService) ttrpcListener, err := winio.ListenPipe(s.conf.TTRPCAddr, nil) @@ -58,9 +75,22 @@ func (s *server) setup(ctx context.Context) (net.Listener, net.Listener, error) return ttrpcListener, grpcListener, nil } -func (s *server) gracefulShutdown(ctx context.Context) error { +// best effort graceful shutdown of the grpc and ttrpc servers +func (s *server) gracefulShutdown(ctx context.Context) { s.grpc.GracefulStop() - return s.ttrpc.Shutdown(ctx) + if err := s.ttrpc.Shutdown(ctx); err != nil { + log.G(ctx).WithError(err).Error("failed to gracefully shutdown ttrpc server") + } +} + +// best effort cleanup resources belonging to the server +func (s *server) cleanupResources(ctx context.Context) { + if err := disconnectComputeAgents(ctx, s.cache); err != nil { + log.G(ctx).WithError(err).Error("failed to disconnect connections in compute agent cache") + } + if err := s.agentStore.Close(); err != nil { + log.G(ctx).WithError(err).Error("failed to close ncproxy compute agent database") + } } func trapClosedConnErr(err error) 
error { @@ -89,3 +119,59 @@ func (s *server) serve(ctx context.Context, ttrpcListener net.Listener, grpcList serveErr <- trapClosedConnErr(s.grpc.Serve(grpcListener)) }() } + +// reconnectComputeAgents creates new compute agent connections from the database of +// active compute agent addresses and adds them to the compute agent client cache +// this MUST be called before the servers start serving anything so that we can +// ensure that the cache is ready when they do. +func reconnectComputeAgents(ctx context.Context, agentStore *computeAgentStore, agentCache *computeAgentCache) { + computeAgentMap, err := agentStore.getActiveComputeAgents(ctx) + if err != nil && errors.Is(err, errBucketNotFound) { + // no entries in the database yet, return early + log.G(ctx).WithError(err).Debug("no entries in database") + return + } else if err != nil { + log.G(ctx).WithError(err).Error("failed to get compute agent information") + } + + var wg sync.WaitGroup + for cid, addr := range computeAgentMap { + wg.Add(1) + go func(agentAddress, containerID string) { + defer wg.Done() + service, err := getComputeAgentClient(agentAddress) + if err != nil { + // can't connect to compute agent, remove entry in database + log.G(ctx).WithField("agentAddress", agentAddress).WithError(err).Error("failed to create new compute agent client") + dErr := agentStore.deleteComputeAgent(ctx, containerID) + if dErr != nil { + log.G(ctx).WithField("key", containerID).WithError(dErr).Warn("failed to delete key from compute agent store") + } + return + } + log.G(ctx).WithField("containerID", containerID).Info("reconnected to container's compute agent") + + // connection succeeded, add entry in cache map for later + // since the servers have not started running, we know that the cache cannot be nil + // which would only happen on a call to `disconnectComputeAgents`, ignore error + _ = agentCache.put(containerID, service) + }(addr, cid) + } + + wg.Wait() +} + +// disconnectComputeAgents clears the cache of
compute agent clients and cleans up +// their resources. +func disconnectComputeAgents(ctx context.Context, containerIDToComputeAgent *computeAgentCache) error { + agents, err := containerIDToComputeAgent.getAllAndClear() + if err != nil { + return errors.Wrapf(err, "failed to get all cached compute agent clients") + } + for _, agent := range agents { + if err := agent.Close(); err != nil { + log.G(ctx).WithError(err).Error("failed to close compute agent connection") + } + } + return nil +} diff --git a/cmd/ncproxy/server_test.go b/cmd/ncproxy/server_test.go index 2668a2cfc8..63fdcc7aaf 100644 --- a/cmd/ncproxy/server_test.go +++ b/cmd/ncproxy/server_test.go @@ -2,7 +2,10 @@ package main import ( "context" + "io/ioutil" "net" + "os" + "path/filepath" "strconv" "strings" "testing" @@ -17,6 +20,8 @@ import ( "github.com/Microsoft/hcsshim/osversion" "github.com/containerd/ttrpc" "github.com/golang/mock/gomock" + "github.com/pkg/errors" + bolt "go.etcd.io/bbolt" ) func exists(target string, list []string) bool { @@ -108,9 +113,12 @@ func TestAddNIC(t *testing.T) { computeAgentCtrl := gomock.NewController(t) defer computeAgentCtrl.Finish() mockedService := ncproxyMock.NewMockComputeAgentService(computeAgentCtrl) + mockedAgentClient := &computeAgentClient{nil, mockedService} // put mocked compute agent in agent cache for test - agentCache.put(containerID, mockedService) + if err := agentCache.put(containerID, mockedAgentClient); err != nil { + t.Fatal(err) + } // setup expected mocked calls mockedService.EXPECT().AddNIC(gomock.Any(), gomock.Any()).Return(&computeagent.AddNICInternalResponse{}, nil).AnyTimes() @@ -190,9 +198,12 @@ func TestDeleteNIC(t *testing.T) { computeAgentCtrl := gomock.NewController(t) defer computeAgentCtrl.Finish() mockedService := ncproxyMock.NewMockComputeAgentService(computeAgentCtrl) + mockedAgentClient := &computeAgentClient{nil, mockedService} // put mocked compute agent in agent cache for test - 
agentCache.put(containerID, mockedService) + if err := agentCache.put(containerID, mockedAgentClient); err != nil { + t.Fatal(err) + } // setup expected mocked calls mockedService.EXPECT().DeleteNIC(gomock.Any(), gomock.Any()).Return(&computeagent.DeleteNICInternalResponse{}, nil).AnyTimes() @@ -274,9 +285,12 @@ func TestModifyNIC(t *testing.T) { computeAgentCtrl := gomock.NewController(t) defer computeAgentCtrl.Finish() mockedService := ncproxyMock.NewMockComputeAgentService(computeAgentCtrl) + mockedAgentClient := &computeAgentClient{nil, mockedService} // populate agent cache with mocked service for test - agentCache.put(containerID, mockedService) + if err := agentCache.put(containerID, mockedAgentClient); err != nil { + t.Fatal(err) + } // setup expected mocked calls mockedService.EXPECT().ModifyNIC(gomock.Any(), gomock.Any()).Return(&computeagent.ModifyNICInternalResponse{}, nil).AnyTimes() @@ -1016,9 +1030,23 @@ func TestGetNetworks_NoError(t *testing.T) { func TestRegisterComputeAgent(t *testing.T) { ctx := context.Background() - // setup test ncproxy ttrpc service + // setup test database + tempDir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tempDir) + + db, err := bolt.Open(filepath.Join(tempDir, "networkproxy.db.test"), 0600, nil) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + // create test TTRPC service + store := newComputeAgentStore(db) agentCache := newComputeAgentCache() - tService := newTTRPCService(agentCache) + tService := newTTRPCService(ctx, agentCache, store) // setup mocked calls winioDialPipe = func(path string, timeout *time.Duration) (net.Conn, error) { @@ -1038,18 +1066,36 @@ func TestRegisterComputeAgent(t *testing.T) { t.Fatalf("expected to get no error, instead got %v", err) } - // validate that the entry was added to the agent cache - if _, exists := agentCache.get(containerID); !exists { - t.Fatalf("compute agent client was not put into agent cache") + // validate that the 
entry was added to the agent + actual, err := agentCache.get(containerID) + if err != nil { + t.Fatalf("failed to get the agent entry %v", err) + } + if actual == nil { + t.Fatal("compute agent client was not put into agent cache") } } func TestConfigureNetworking(t *testing.T) { ctx := context.Background() - // setup test ncproxy ttrpc service + // setup test database + tempDir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tempDir) + + db, err := bolt.Open(filepath.Join(tempDir, "networkproxy.db.test"), 0600, nil) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + // create test TTRPC service + store := newComputeAgentStore(db) agentCache := newComputeAgentCache() - tService := newTTRPCService(agentCache) + tService := newTTRPCService(ctx, agentCache, store) // setup mocked client and mocked calls for nodenetsvc nodeNetCtrl := gomock.NewController(t) @@ -1111,3 +1157,136 @@ func TestConfigureNetworking(t *testing.T) { }) } } + +func TestReconnectComputeAgents_Success(t *testing.T) { + ctx := context.Background() + + // setup test database + tempDir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tempDir) + + db, err := bolt.Open(filepath.Join(tempDir, "networkproxy.db.test"), 0600, nil) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + // create test TTRPC service + store := newComputeAgentStore(db) + agentCache := newComputeAgentCache() + + // setup mocked calls + winioDialPipe = func(path string, timeout *time.Duration) (net.Conn, error) { + rPipe, _ := net.Pipe() + return rPipe, nil + } + ttrpcNewClient = func(conn net.Conn, opts ...ttrpc.ClientOpts) *ttrpc.Client { + return &ttrpc.Client{} + } + + // add test entry in database + containerID := "fake-container-id" + address := "123412341234" + + if err := store.updateComputeAgent(ctx, containerID, address); err != nil { + t.Fatal(err) + } + + reconnectComputeAgents(ctx, store, agentCache) + + // validate that 
the agent cache has the entry now + actualClient, err := agentCache.get(containerID) + if err != nil { + t.Fatal(err) + } + if actualClient == nil { + t.Fatal("no entry added on reconnect to agent client cache") + } +} + +func TestReconnectComputeAgents_Failure(t *testing.T) { + ctx := context.Background() + + // setup test database + tempDir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tempDir) + + db, err := bolt.Open(filepath.Join(tempDir, "networkproxy.db.test"), 0600, nil) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + // create test TTRPC service + store := newComputeAgentStore(db) + agentCache := newComputeAgentCache() + + // setup mocked calls + winioDialPipe = func(path string, timeout *time.Duration) (net.Conn, error) { + // this will cause the reconnect compute agents call to run into an error + // trying to reconnect to the fake container address + return nil, errors.New("fake error") + } + ttrpcNewClient = func(conn net.Conn, opts ...ttrpc.ClientOpts) *ttrpc.Client { + return &ttrpc.Client{} + } + + // add test entry in database + containerID := "fake-container-id" + address := "123412341234" + + if err := store.updateComputeAgent(ctx, containerID, address); err != nil { + t.Fatal(err) + } + + reconnectComputeAgents(ctx, store, agentCache) + + // validate that the agent cache does NOT have an entry + actualClient, err := agentCache.get(containerID) + if err != nil { + t.Fatal(err) + } + if actualClient != nil { + t.Fatalf("expected no entry on failure, instead found %v", actualClient) + } + + // validate that the agent store no longer has an entry for this container + value, err := store.getComputeAgent(ctx, containerID) + if err == nil { + t.Fatalf("expected an error, instead found value %s", value) + } +} + +func TestDisconnectComputeAgents(t *testing.T) { + ctx := context.Background() + containerID := "fake-container-id" + + agentCache := newComputeAgentCache() + + // create mocked compute 
agent service + computeAgentCtrl := gomock.NewController(t) + defer computeAgentCtrl.Finish() + mockedService := ncproxyMock.NewMockComputeAgentService(computeAgentCtrl) + mockedAgentClient := &computeAgentClient{nil, mockedService} + + // put mocked compute agent in agent cache for test + if err := agentCache.put(containerID, mockedAgentClient); err != nil { + t.Fatal(err) + } + + if err := disconnectComputeAgents(ctx, agentCache); err != nil { + t.Fatal(err) + } + + // validate there is no longer an entry for the compute agent client + actual, err := agentCache.get(containerID) + if err == nil { + t.Fatalf("expected to find the cache empty, instead found %v", actual) + } +} diff --git a/cmd/ncproxy/store.go b/cmd/ncproxy/store.go index 8c6ed588fa..4a045b7de5 100644 --- a/cmd/ncproxy/store.go +++ b/cmd/ncproxy/store.go @@ -22,10 +22,14 @@ func newComputeAgentStore(db *bolt.DB) *computeAgentStore { return &computeAgentStore{db: db} } -// get returns the compute agent address of a single entry in the database for key `containerID` +func (c *computeAgentStore) Close() error { + return c.db.Close() +} + +// getComputeAgent returns the compute agent address of a single entry in the database for key `containerID` // or returns an error if the key does not exist -func (n *computeAgentStore) getComputeAgent(ctx context.Context, containerID string) (result string, err error) { - if err := n.db.View(func(tx *bolt.Tx) error { +func (c *computeAgentStore) getComputeAgent(ctx context.Context, containerID string) (result string, err error) { + if err := c.db.View(func(tx *bolt.Tx) error { bkt := getComputeAgentBucket(tx) if bkt == nil { return errors.Wrapf(errBucketNotFound, "bucket %v", bucketKeyComputeAgent) @@ -46,9 +50,9 @@ func (n *computeAgentStore) getComputeAgent(ctx context.Context, containerID str // getActiveComputeAgents returns a map of the key value pairs stored in the database // where the keys are the containerIDs and the values are the corresponding compute 
agent // server addresses -func (n *computeAgentStore) getActiveComputeAgents(ctx context.Context) (map[string]string, error) { +func (c *computeAgentStore) getActiveComputeAgents(ctx context.Context) (map[string]string, error) { content := map[string]string{} - if err := n.db.View(func(tx *bolt.Tx) error { + if err := c.db.View(func(tx *bolt.Tx) error { bkt := getComputeAgentBucket(tx) if bkt == nil { return errors.Wrapf(errBucketNotFound, "bucket %v", bucketKeyComputeAgent) @@ -67,8 +71,8 @@ func (n *computeAgentStore) getActiveComputeAgents(ctx context.Context) (map[str // updateComputeAgent updates or adds an entry (if none already exists) to the database // `address` corresponds to the address of the compute agent server for the `containerID` -func (n *computeAgentStore) updateComputeAgent(ctx context.Context, containerID string, address string) error { - if err := n.db.Update(func(tx *bolt.Tx) error { +func (c *computeAgentStore) updateComputeAgent(ctx context.Context, containerID string, address string) error { + if err := c.db.Update(func(tx *bolt.Tx) error { bkt, err := createComputeAgentBucket(tx) if err != nil { return err @@ -82,8 +86,8 @@ func (n *computeAgentStore) updateComputeAgent(ctx context.Context, containerID // deleteComputeAgent deletes an entry in the database or returns an error if none exists // `containerID` corresponds to the target key that the entry should be deleted for -func (n *computeAgentStore) deleteComputeAgent(ctx context.Context, containerID string) error { - if err := n.db.Update(func(tx *bolt.Tx) error { +func (c *computeAgentStore) deleteComputeAgent(ctx context.Context, containerID string) error { + if err := c.db.Update(func(tx *bolt.Tx) error { bkt := getComputeAgentBucket(tx) if bkt == nil { return errors.Wrapf(errBucketNotFound, "bucket %v", bucketKeyComputeAgent) diff --git a/internal/uvm/network.go b/internal/uvm/network.go index 4e6816be95..7f24b13a0b 100644 --- a/internal/uvm/network.go +++ 
b/internal/uvm/network.go @@ -124,13 +124,22 @@ func (uvm *UtilityVM) NCProxyEnabled() bool { return uvm.ncProxyClientAddress != "" } -func (uvm *UtilityVM) CreateNCProxyClient() (ncproxyttrpc.NetworkConfigProxyService, error) { +type ncproxyClient struct { + raw *ttrpc.Client + ncproxyttrpc.NetworkConfigProxyService +} + +func (n *ncproxyClient) Close() error { + return n.raw.Close() +} + +func (uvm *UtilityVM) GetNCProxyClient() (*ncproxyClient, error) { conn, err := winio.DialPipe(uvm.ncProxyClientAddress, nil) if err != nil { return nil, errors.Wrap(err, "failed to connect to ncproxy service") } - client := ttrpc.NewClient(conn, ttrpc.WithOnClose(func() { conn.Close() })) - return ncproxyttrpc.NewNetworkConfigProxyClient(client), nil + raw := ttrpc.NewClient(conn, ttrpc.WithOnClose(func() { conn.Close() })) + return &ncproxyClient{raw, ncproxyttrpc.NewNetworkConfigProxyClient(raw)}, nil } // NetworkConfigType specifies the action to be performed during network configuration. @@ -239,10 +248,11 @@ func NewExternalNetworkSetup(ctx context.Context, vm *UtilityVM, caAddr, contain } func (e *externalNetworkSetup) ConfigureNetworking(ctx context.Context, namespaceID string, configType NetworkConfigType) error { - client, err := e.vm.CreateNCProxyClient() + client, err := e.vm.GetNCProxyClient() if err != nil { return errors.Wrapf(err, "no ncproxy client for UVM %q", e.vm.ID()) } + defer client.Close() netReq := &ncproxyttrpc.ConfigureNetworkingInternalRequest{ ContainerID: e.containerID, diff --git a/test/vendor/github.com/Microsoft/hcsshim/internal/uvm/network.go b/test/vendor/github.com/Microsoft/hcsshim/internal/uvm/network.go index 1a8e853470..7f24b13a0b 100644 --- a/test/vendor/github.com/Microsoft/hcsshim/internal/uvm/network.go +++ b/test/vendor/github.com/Microsoft/hcsshim/internal/uvm/network.go @@ -124,13 +124,22 @@ func (uvm *UtilityVM) NCProxyEnabled() bool { return uvm.ncProxyClientAddress != "" } -func (uvm *UtilityVM) 
GetNCProxyClient() (ncproxyttrpc.NetworkConfigProxyService, error) { +type ncproxyClient struct { + raw *ttrpc.Client + ncproxyttrpc.NetworkConfigProxyService +} + +func (n *ncproxyClient) Close() error { + return n.raw.Close() +} + +func (uvm *UtilityVM) GetNCProxyClient() (*ncproxyClient, error) { conn, err := winio.DialPipe(uvm.ncProxyClientAddress, nil) if err != nil { return nil, errors.Wrap(err, "failed to connect to ncproxy service") } - client := ttrpc.NewClient(conn, ttrpc.WithOnClose(func() { conn.Close() })) - return ncproxyttrpc.NewNetworkConfigProxyClient(client), nil + raw := ttrpc.NewClient(conn, ttrpc.WithOnClose(func() { conn.Close() })) + return &ncproxyClient{raw, ncproxyttrpc.NewNetworkConfigProxyClient(raw)}, nil } // NetworkConfigType specifies the action to be performed during network configuration. @@ -243,6 +252,7 @@ func (e *externalNetworkSetup) ConfigureNetworking(ctx context.Context, namespac if err != nil { return errors.Wrapf(err, "no ncproxy client for UVM %q", e.vm.ID()) } + defer client.Close() netReq := &ncproxyttrpc.ConfigureNetworkingInternalRequest{ ContainerID: e.containerID,