Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update ncproxy compute agent cache map #1126

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 52 additions & 14 deletions cmd/ncproxy/ncproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,42 @@ import (
"google.golang.org/grpc/status"
)

type computeAgentCache struct {
// lock for synchronizing read/write access to `cache`
rw sync.RWMutex
// mapping of container ID to shim compute agent ttrpc service
cache map[string]computeagent.ComputeAgentService
}

func newComputeAgentCache() *computeAgentCache {
return &computeAgentCache{
cache: make(map[string]computeagent.ComputeAgentService),
}
}

func (c *computeAgentCache) get(cid string) (computeagent.ComputeAgentService, bool) {
c.rw.RLock()
defer c.rw.RUnlock()
result, ok := c.cache[cid]
return result, ok
}

func (c *computeAgentCache) put(cid string, agent computeagent.ComputeAgentService) {
c.rw.Lock()
defer c.rw.Unlock()
c.cache[cid] = agent
}

// GRPC service exposed for use by a Node Network Service.
type grpcService struct{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side note: I would love if in the future we can find better names for the "network agent facing" and "shim facing" services than grpcService and ttrpcService.

type grpcService struct {
containerIDToComputeAgent *computeAgentCache
}

func newGRPCService(agentCache *computeAgentCache) *grpcService {
return &grpcService{
containerIDToComputeAgent: agentCache,
}
}

var _ ncproxygrpc.NetworkConfigProxyServer = &grpcService{}

Expand All @@ -42,13 +76,13 @@ func (s *grpcService) AddNIC(ctx context.Context, req *ncproxygrpc.AddNICRequest
if req.ContainerID == "" || req.EndpointName == "" || req.NicID == "" {
return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req)
}
if client, ok := containerIDToShim[req.ContainerID]; ok {
if agent, ok := s.containerIDToComputeAgent.get(req.ContainerID); ok {
caReq := &computeagent.AddNICInternalRequest{
ContainerID: req.ContainerID,
NicID: req.NicID,
EndpointName: req.EndpointName,
}
if _, err := client.AddNIC(ctx, caReq); err != nil {
if _, err := agent.AddNIC(ctx, caReq); err != nil {
return nil, err
}
return &ncproxygrpc.AddNICResponse{}, nil
Expand All @@ -72,7 +106,7 @@ func (s *grpcService) ModifyNIC(ctx context.Context, req *ncproxygrpc.ModifyNICR
return nil, status.Error(codes.InvalidArgument, "received empty field in request")
}

if client, ok := containerIDToShim[req.ContainerID]; ok {
if agent, ok := s.containerIDToComputeAgent.get(req.ContainerID); ok {
caReq := &computeagent.ModifyNICInternalRequest{
NicID: req.NicID,
EndpointName: req.EndpointName,
Expand Down Expand Up @@ -112,7 +146,7 @@ func (s *grpcService) ModifyNIC(ctx context.Context, req *ncproxygrpc.ModifyNICR
//
// To turn on iov offload, the reverse order is used.
if req.IovPolicySettings.IovOffloadWeight == 0 {
if _, err := client.ModifyNIC(ctx, caReq); err != nil {
if _, err := agent.ModifyNIC(ctx, caReq); err != nil {
return nil, err
}
if err := modifyEndpoint(ctx, ep.Id, policies, hcn.RequestTypeUpdate); err != nil {
Expand All @@ -125,7 +159,7 @@ func (s *grpcService) ModifyNIC(ctx context.Context, req *ncproxygrpc.ModifyNICR
if err := modifyEndpoint(ctx, ep.Id, policies, hcn.RequestTypeUpdate); err != nil {
return nil, errors.Wrap(err, "failed to modify network adapter")
}
if _, err := client.ModifyNIC(ctx, caReq); err != nil {
if _, err := agent.ModifyNIC(ctx, caReq); err != nil {
return nil, err
}
}
Expand All @@ -148,13 +182,13 @@ func (s *grpcService) DeleteNIC(ctx context.Context, req *ncproxygrpc.DeleteNICR
if req.ContainerID == "" || req.EndpointName == "" || req.NicID == "" {
return nil, status.Errorf(codes.InvalidArgument, "received empty field in request: %+v", req)
}
if client, ok := containerIDToShim[req.ContainerID]; ok {
if agent, ok := s.containerIDToComputeAgent.get(req.ContainerID); ok {
caReq := &computeagent.DeleteNICInternalRequest{
ContainerID: req.ContainerID,
NicID: req.NicID,
EndpointName: req.EndpointName,
}
if _, err := client.DeleteNIC(ctx, caReq); err != nil {
if _, err := agent.DeleteNIC(ctx, caReq); err != nil {
if err == uvm.ErrNICNotFound || err == uvm.ErrNetNSNotFound {
return nil, status.Errorf(codes.NotFound, "failed to remove endpoint %q from namespace %q", req.EndpointName, req.NicID)
}
Expand Down Expand Up @@ -553,10 +587,15 @@ func (s *grpcService) GetNetworks(ctx context.Context, req *ncproxygrpc.GetNetwo
}, nil
}

// TTRPC service exposed for use by the shim. Holds a mutex for updating map of
// client connections.
// TTRPC service exposed for use by the shim.
type ttrpcService struct {
m sync.Mutex
containerIDToComputeAgent *computeAgentCache
}

func newTTRPCService(agentCache *computeAgentCache) *ttrpcService {
return &ttrpcService{
containerIDToComputeAgent: agentCache,
}
}

func (s *ttrpcService) RegisterComputeAgent(ctx context.Context, req *ncproxyttrpc.RegisterComputeAgentRequest) (_ *ncproxyttrpc.RegisterComputeAgentResponse, err error) {
Expand All @@ -579,9 +618,8 @@ func (s *ttrpcService) RegisterComputeAgent(ctx context.Context, req *ncproxyttr
)
// Add to global client map if connection succeeds. Don't check if there's already a map entry
// just overwrite as the client may have changed the address of the config agent.
s.m.Lock()
defer s.m.Unlock()
containerIDToShim[req.ContainerID] = computeagent.NewComputeAgentClient(client)
s.containerIDToComputeAgent.put(req.ContainerID, computeagent.NewComputeAgentClient(client))

return &ncproxyttrpc.RegisterComputeAgentResponse{}, nil
}

Expand Down
3 changes: 0 additions & 3 deletions cmd/ncproxy/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/Microsoft/go-winio/pkg/etwlogrus"
"github.com/Microsoft/go-winio/pkg/guid"
"github.com/Microsoft/hcsshim/cmd/ncproxy/nodenetsvc"
"github.com/Microsoft/hcsshim/internal/computeagent"
"github.com/Microsoft/hcsshim/internal/debug"
"github.com/Microsoft/hcsshim/internal/log"
"github.com/Microsoft/hcsshim/internal/oc"
Expand All @@ -33,8 +32,6 @@ type nodeNetSvcConn struct {
}

var (
// Global mapping of network namespace ID to shim compute agent ttrpc service.
containerIDToShim = make(map[string]computeagent.ComputeAgentService)
// Global object representing the connection to the node network service that
// ncproxy will be talking to.
nodeNetSvcClient *nodeNetSvcConn
Expand Down
8 changes: 6 additions & 2 deletions cmd/ncproxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,12 @@ func newServer(ctx context.Context, conf *config) (*server, error) {
}

func (s *server) setup(ctx context.Context) (net.Listener, net.Listener, error) {
ncproxygrpc.RegisterNetworkConfigProxyServer(s.grpc, &grpcService{})
ncproxyttrpc.RegisterNetworkConfigProxyService(s.ttrpc, &ttrpcService{})
agentCache := newComputeAgentCache()
gService := newGRPCService(agentCache)
ncproxygrpc.RegisterNetworkConfigProxyServer(s.grpc, gService)

tService := newTTRPCService(agentCache)
ncproxyttrpc.RegisterNetworkConfigProxyService(s.ttrpc, tService)

ttrpcListener, err := winio.ListenPipe(s.conf.TTRPCAddr, nil)
if err != nil {
Expand Down