Skip to content

Commit

Permalink
Prevent client connection leaks, address feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Kathryn Baldauf <[email protected]>
  • Loading branch information
katiewasnothere committed Sep 16, 2021
1 parent 9793955 commit e4152d9
Show file tree
Hide file tree
Showing 7 changed files with 396 additions and 115 deletions.
132 changes: 64 additions & 68 deletions cmd/ncproxy/ncproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/Microsoft/hcsshim/pkg/octtrpc"
"github.com/containerd/ttrpc"
"github.com/pkg/errors"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -44,25 +43,55 @@ func newComputeAgentCache() *computeAgentCache {
}
}

func (c *computeAgentCache) get(cid string) (*computeAgentClient, bool) {
// getAllAndClear detaches the cache's underlying map and returns every compute
// agent client that was stored in it. After this call c.cache is nil.
//
// It returns an error if the cache was already nil when called. For a cache
// that exists but holds no entries it returns an empty, non-nil slice.
func (c *computeAgentCache) getAllAndClear() ([]*computeAgentClient, error) {
	// Set c.cache to nil first so that subsequent reads and writes return an
	// error. Only the swap happens under the lock; the detached map is then
	// walked without holding it.
	c.rw.Lock()
	cacheCopy := c.cache
	c.cache = nil
	c.rw.Unlock()

	if cacheCopy == nil {
		return nil, errors.New("cannot read from a nil cache")
	}

	// The final length is known up front, so allocate the backing array once
	// instead of growing it on every append.
	results := make([]*computeAgentClient, 0, len(cacheCopy))
	for _, agent := range cacheCopy {
		results = append(results, agent)
	}
	return results, nil
}

func (c *computeAgentCache) get(cid string) (*computeAgentClient, error) {
c.rw.RLock()
defer c.rw.RUnlock()
result, ok := c.cache[cid]
return result, ok
if c.cache == nil {
return nil, errors.New("cannot read from a nil cache")
}
result := c.cache[cid]
return result, nil
}

func (c *computeAgentCache) put(cid string, agent *computeAgentClient) {
func (c *computeAgentCache) put(cid string, agent *computeAgentClient) error {
c.rw.Lock()
defer c.rw.Unlock()
if c.cache == nil {
return errors.New("cannot write to a nil cache")
}
c.cache[cid] = agent
return nil
}

func (c *computeAgentCache) getAndDelete(cid string) (*computeAgentClient, bool) {
func (c *computeAgentCache) getAndDelete(cid string) (*computeAgentClient, error) {
c.rw.Lock()
defer c.rw.Unlock()
result, ok := c.cache[cid]
if c.cache == nil {
return nil, errors.New("cannot read from a nil cache")
}
result := c.cache[cid]
delete(c.cache, cid)
return result, ok
return result, nil
}

// GRPC service exposed for use by a Node Network Service.
Expand Down Expand Up @@ -91,7 +120,11 @@ func (s *grpcService) AddNIC(ctx context.Context, req *ncproxygrpc.AddNICRequest
if req.ContainerID == "" || req.EndpointName == "" || req.NicID == "" {
return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req)
}
if agent, ok := s.containerIDToComputeAgent.get(req.ContainerID); ok {
agent, err := s.containerIDToComputeAgent.get(req.ContainerID)
if err != nil {
return nil, err
}
if agent != nil {
caReq := &computeagent.AddNICInternalRequest{
ContainerID: req.ContainerID,
NicID: req.NicID,
Expand Down Expand Up @@ -121,7 +154,11 @@ func (s *grpcService) ModifyNIC(ctx context.Context, req *ncproxygrpc.ModifyNICR
return nil, status.Error(codes.InvalidArgument, "received empty field in request")
}

if agent, ok := s.containerIDToComputeAgent.get(req.ContainerID); ok {
agent, err := s.containerIDToComputeAgent.get(req.ContainerID)
if err != nil {
return nil, err
}
if agent != nil {
caReq := &computeagent.ModifyNICInternalRequest{
NicID: req.NicID,
EndpointName: req.EndpointName,
Expand Down Expand Up @@ -197,7 +234,11 @@ func (s *grpcService) DeleteNIC(ctx context.Context, req *ncproxygrpc.DeleteNICR
if req.ContainerID == "" || req.EndpointName == "" || req.NicID == "" {
return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req)
}
if agent, ok := s.containerIDToComputeAgent.get(req.ContainerID); ok {
agent, err := s.containerIDToComputeAgent.get(req.ContainerID)
if err != nil {
return nil, err
}
if agent != nil {
caReq := &computeagent.DeleteNICInternalRequest{
ContainerID: req.ContainerID,
NicID: req.NicID,
Expand Down Expand Up @@ -608,73 +649,22 @@ func (s *grpcService) GetNetworks(ctx context.Context, req *ncproxygrpc.GetNetwo
// TTRPC service exposed for use by the shim.
type ttrpcService struct {
containerIDToComputeAgent *computeAgentCache
// database for containerID to compute agent address
agentStore *computeAgentStore
agentStore *computeAgentStore
}

func newTTRPCService(ctx context.Context, agent *computeAgentCache, db *bolt.DB) *ttrpcService {
agentStore := newComputeAgentStore(db)
func newTTRPCService(ctx context.Context, agent *computeAgentCache, agentStore *computeAgentStore) *ttrpcService {
return &ttrpcService{
containerIDToComputeAgent: agent,
agentStore: agentStore,
}
}

// reconnectComputeAgents reads the stored container ID to compute agent address
// entries from the agent store and tries to connect to each address
// concurrently. Clients that connect are put into the compute agent cache;
// entries whose agent cannot be reached are deleted from the store. It returns
// once every attempt has finished.
func (s *ttrpcService) reconnectComputeAgents(ctx context.Context) {
	computeAgentMap, err := s.agentStore.getActiveComputeAgents(ctx)
	if err != nil {
		if errors.Is(err, errBucketNotFound) {
			// no entries in the database yet, return early
			log.G(ctx).WithError(err).Debug("no entries in database")
			return
		}
		// Any other failure is only logged; the loop below still runs over
		// whatever was returned.
		log.G(ctx).WithError(err).Error("failed to get compute agent information")
	}

	var wg sync.WaitGroup

	// reconnect handles a single stored entry and signals wg when it is done.
	reconnect := func(agentAddress, containerID string) {
		defer wg.Done()

		client, dialErr := getComputeAgentClient(agentAddress)
		if dialErr == nil {
			log.G(ctx).WithField("containerID", containerID).Info("reconnected to container's compute agent")
			// connection succeeded, add entry in cache map for later
			s.containerIDToComputeAgent.put(containerID, client)
			return
		}

		// can't connect to compute agent, remove entry in database
		log.G(ctx).WithField("agentAddress", agentAddress).WithError(dialErr).Error("failed to create new compute agent client")
		if delErr := s.agentStore.deleteComputeAgent(ctx, containerID); delErr != nil {
			log.G(ctx).WithField("key", containerID).WithError(delErr).Warn("failed to delete key from compute agent store")
		}
	}

	for containerID, agentAddress := range computeAgentMap {
		wg.Add(1)
		// Arguments are evaluated at the go statement, so each goroutine gets
		// its own copy of the current key and value.
		go reconnect(agentAddress, containerID)
	}
	wg.Wait()
}

// disconnectComputeAgents empties the given compute agent cache and closes every
// client that was held in it.
//
// An error is returned only when the cached clients cannot be retrieved. A
// client that fails to close is logged and the remaining clients are still
// closed.
func disconnectComputeAgents(ctx context.Context, containerIDToComputeAgent *computeAgentCache) error {
	clients, err := containerIDToComputeAgent.getAllAndClear()
	if err != nil {
		return errors.Wrapf(err, "failed to get all cached compute agent clients")
	}

	for i := range clients {
		closeErr := clients[i].Close()
		if closeErr == nil {
			continue
		}
		log.G(ctx).WithError(closeErr).Error("failed to close compute agent connection")
	}
	return nil
}

func getComputeAgentClient(agentAddr string) (*computeAgentClient, error) {
conn, err := winioDialPipe(agentAddr, nil)
if err != nil {
return nil, errors.Wrap(err, "failed to connect to compute agent service")
}
raw := ttrpc.NewClient(
raw := ttrpcNewClient(
conn,
ttrpc.WithUnaryClientInterceptor(octtrpc.ClientInterceptor()),
ttrpc.WithOnClose(func() { conn.Close() }),
Expand All @@ -700,9 +690,11 @@ func (s *ttrpcService) RegisterComputeAgent(ctx context.Context, req *ncproxyttr
return nil, err
}

// Add to global client map if connection succeeds. Don't check if there's already a map entry
// Add to client cache if connection succeeds. Don't check if there's already a map entry
// just overwrite as the client may have changed the address of the config agent.
s.containerIDToComputeAgent.put(req.ContainerID, agent)
if err := s.containerIDToComputeAgent.put(req.ContainerID, agent); err != nil {
return nil, err
}

return &ncproxyttrpc.RegisterComputeAgentResponse{}, nil
}
Expand All @@ -721,7 +713,11 @@ func (s *ttrpcService) UnregisterComputeAgent(ctx context.Context, req *ncproxyt
}

// remove the agent from the cache and return it so we can clean up its resources as well
if agent, ok := s.containerIDToComputeAgent.getAndDelete(req.ContainerID); ok {
agent, err := s.containerIDToComputeAgent.getAndDelete(req.ContainerID)
if err != nil {
return nil, err
}
if agent != nil {
if err := agent.Close(); err != nil {
return nil, err
}
Expand Down
18 changes: 7 additions & 11 deletions cmd/ncproxy/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/trace"
"google.golang.org/grpc"
Expand All @@ -40,6 +39,9 @@ type computeAgentClient struct {
}

// Close closes the underlying raw client and returns its error, if any. When no
// raw client is set it does nothing and returns nil.
func (c *computeAgentClient) Close() error {
	if raw := c.raw; raw != nil {
		return raw.Close()
	}
	return nil
}

Expand Down Expand Up @@ -229,11 +231,6 @@ func run(clicontext *cli.Context) error {
}
}

db, err := bolt.Open(dbPath, 0600, nil)
if err != nil {
return err
}

log.G(ctx).WithFields(logrus.Fields{
"TTRPCAddr": conf.TTRPCAddr,
"NodeNetSvcAddr": conf.NodeNetSvcAddr,
Expand All @@ -247,12 +244,13 @@ func run(clicontext *cli.Context) error {
defer signal.Stop(sigChan)

// Create new server and then register NetworkConfigProxyServices.
server, err := newServer(ctx, conf)
server, err := newServer(ctx, conf, dbPath)
if err != nil {
return errors.New("failed to make new ncproxy server")
}
defer server.cleanupResources(ctx)

ttrpcListener, grpcListener, err := server.setup(ctx, db)
ttrpcListener, grpcListener, err := server.setup(ctx)
if err != nil {
return errors.New("failed to setup ncproxy server")
}
Expand All @@ -272,9 +270,7 @@ func run(clicontext *cli.Context) error {
}

// Cancel inflight requests and shutdown services
if err := server.gracefulShutdown(ctx); err != nil {
return errors.Wrap(err, "ncproxy failed to shutdown gracefully")
}
server.gracefulShutdown(ctx)

return nil
}
Loading

0 comments on commit e4152d9

Please sign in to comment.