Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create new types for service-defaults upstream cfg #9872

Merged
merged 12 commits into from
Mar 17, 2021
Merged
4 changes: 4 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -1948,6 +1948,7 @@ type addServiceLockedRequest struct {
// agent using Agent.AddService.
type AddServiceRequest struct {
Service *structs.NodeService
nodeName string
freddygv marked this conversation as resolved.
Show resolved Hide resolved
chkTypes []*structs.CheckType
persist bool
token string
Expand Down Expand Up @@ -3107,6 +3108,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
err = a.addServiceLocked(addServiceLockedRequest{
AddServiceRequest: AddServiceRequest{
Service: ns,
nodeName: a.config.NodeName,
chkTypes: chkTypes,
persist: false, // don't rewrite the file with the same data we just read
token: service.Token,
Expand All @@ -3127,6 +3129,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
err = a.addServiceLocked(addServiceLockedRequest{
AddServiceRequest: AddServiceRequest{
Service: sidecar,
nodeName: a.config.NodeName,
chkTypes: sidecarChecks,
persist: false, // don't rewrite the file with the same data we just read
token: sidecarToken,
Expand Down Expand Up @@ -3225,6 +3228,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
err = a.addServiceLocked(addServiceLockedRequest{
AddServiceRequest: AddServiceRequest{
Service: p.Service,
nodeName: a.config.NodeName,
chkTypes: nil,
persist: false, // don't rewrite the file with the same data we just read
token: p.Token,
Expand Down
2 changes: 2 additions & 0 deletions agent/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,6 +994,7 @@ func (s *HTTPHandlers) AgentRegisterService(resp http.ResponseWriter, req *http.

addReq := AddServiceRequest{
Service: ns,
nodeName: s.agent.config.NodeName,
chkTypes: chkTypes,
persist: true,
token: token,
Expand All @@ -1007,6 +1008,7 @@ func (s *HTTPHandlers) AgentRegisterService(resp http.ResponseWriter, req *http.
if sidecar != nil {
addReq := AddServiceRequest{
Service: sidecar,
nodeName: s.agent.config.NodeName,
chkTypes: sidecarChecks,
persist: true,
token: sidecarToken,
Expand Down
4 changes: 4 additions & 0 deletions agent/cache-types/resolved_service_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ func TestResolvedServiceConfig(t *testing.T) {
require.Equal(uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(1*time.Second, req.QueryOptions.MaxQueryTime)
require.Equal("foo", req.Name)
require.Equal("foo-1", req.ID)
require.Equal("foo-node", req.NodeName)
require.True(req.AllowStale)

reply := args.Get(2).(*structs.ServiceConfigResponse)
Expand All @@ -48,6 +50,8 @@ func TestResolvedServiceConfig(t *testing.T) {
}, &structs.ServiceConfigRequest{
Datacenter: "dc1",
Name: "foo",
ID: "foo-1",
NodeName: "foo-node",
})
require.NoError(err)
require.Equal(cache.FetchResult{
Expand Down
177 changes: 125 additions & 52 deletions agent/consul/config_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,31 +329,21 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it might make sense to split this function out and apart.
With big closures like this can be hard to reason about what is captured from the surrounding context; splitting it into a small closure that calls another function makes that explicit.
There's also quite a bit of code, and a number of separate concerns; for example figuring out the default protocol is spread around a bit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a good point. I added a TODO here that I can take care of for the beta.

reply.Reset()

reply.MeshGateway.Mode = structs.MeshGatewayModeDefault
// Pass the WatchSet to both the service and proxy config lookups. If either is updated
// during the blocking query, this function will be rerun and these state store lookups
// will both be current.
index, serviceEntry, err := state.ConfigEntry(ws, structs.ServiceDefaults, args.Name, &args.EnterpriseMeta)
if err != nil {
return err
}
var serviceConf *structs.ServiceConfigEntry
var ok bool
if serviceEntry != nil {
serviceConf, ok = serviceEntry.(*structs.ServiceConfigEntry)
if !ok {
return fmt.Errorf("invalid service config type %T", serviceEntry)
}
}

// Use the default enterprise meta to look up the global proxy defaults. In the future we may allow per-namespace proxy-defaults
// but not yet.
// Pass the WatchSet to both the service and proxy config lookups. If either is updated during the
// blocking query, this function will be rerun and these state store lookups will both be current.
// We use the default enterprise meta to look up the global proxy defaults because their are not namespaced.
freddygv marked this conversation as resolved.
Show resolved Hide resolved
_, proxyEntry, err := state.ConfigEntry(ws, structs.ProxyDefaults, structs.ProxyConfigGlobal, structs.DefaultEnterpriseMeta())
if err != nil {
return err
}
var proxyConf *structs.ProxyConfigEntry

var (
proxyConf *structs.ProxyConfigEntry
proxyConfGlobalProtocol string
ok bool
)
if proxyEntry != nil {
proxyConf, ok = proxyEntry.(*structs.ProxyConfigEntry)
if !ok {
Expand All @@ -367,11 +357,29 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
reply.ProxyConfig = mapCopy.(map[string]interface{})
reply.MeshGateway = proxyConf.MeshGateway
reply.Expose = proxyConf.Expose

// Extract the global protocol from proxyConf for upstream configs.
rawProtocol := proxyConf.Config["protocol"]
if rawProtocol != nil {
proxyConfGlobalProtocol, ok = rawProtocol.(string)
if !ok {
return fmt.Errorf("invalid protocol type %T", rawProtocol)
}
}
}
rboyer marked this conversation as resolved.
Show resolved Hide resolved

index, serviceEntry, err := state.ConfigEntry(ws, structs.ServiceDefaults, args.Name, &args.EnterpriseMeta)
if err != nil {
return err
}
reply.Index = index

if serviceConf != nil {
var serviceConf *structs.ServiceConfigEntry
if serviceEntry != nil {
serviceConf, ok = serviceEntry.(*structs.ServiceConfigEntry)
if !ok {
return fmt.Errorf("invalid service config type %T", serviceEntry)
}
if serviceConf.Expose.Checks {
reply.Expose.Checks = true
}
Expand All @@ -389,55 +397,121 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
}
}

// Extract the global protocol from proxyConf for upstream configs.
var proxyConfGlobalProtocol interface{}
if proxyConf != nil && proxyConf.Config != nil {
proxyConfGlobalProtocol = proxyConf.Config["protocol"]
}
// First collect all upstreams into a set of seen upstreams.
// Upstreams can come from:
// - Explicitly from proxy registrations, and therefore as an argument to this RPC endpoint
// - Implicitly from centralized upstream config in service-defaults
seenUpstreams := map[structs.ServiceID]struct{}{}

// map the legacy request structure using only service names
// to the new ServiceID type.
upstreamIDs := args.UpstreamIDs
legacyUpstreams := false

// Before Consul namespaces were released, the Upstreams provided to the endpoint did not contain the namespace.
// Because of this we attach the enterprise meta of the request, which will just be the default namespace.
if len(upstreamIDs) == 0 {
legacyUpstreams = true

upstreamIDs = make([]structs.ServiceID, 0)
for _, upstream := range args.Upstreams {
upstreamIDs = append(upstreamIDs, structs.NewServiceID(upstream, &args.EnterpriseMeta))
sid := structs.NewServiceID(upstream, &args.EnterpriseMeta)
upstreamIDs = append(upstreamIDs, sid)
}
}

// First store all upstreams that were provided in the request
for _, sid := range upstreamIDs {
if _, ok := seenUpstreams[sid]; !ok {
seenUpstreams[sid] = struct{}{}
}
}

// Then store upstreams inferred from service-defaults
if serviceConf != nil && serviceConf.Connect != nil {
for sid := range serviceConf.Connect.UpstreamConfigs {
seenUpstreams[structs.ServiceIDFromString(sid)] = struct{}{}
}
}

var (
upstreamDefaults *structs.UpstreamConfig
upstreamConfigs map[string]*structs.UpstreamConfig
)
if serviceConf != nil && serviceConf.Connect != nil {
if serviceConf.Connect.UpstreamDefaults != nil {
upstreamDefaults = serviceConf.Connect.UpstreamDefaults
}
if serviceConf.Connect.UpstreamConfigs != nil {
upstreamConfigs = serviceConf.Connect.UpstreamConfigs
}
}

// The goal is to flatten the mesh gateway mode in this order:
// 0. Value from centralized upstream_defaults
// 1. Value from local proxy registration
// 2. Value from centralized upstream_configs
// 3. Value from local upstream definition. This last step is done in the client's service manager.
var registrationMGConfig structs.MeshGatewayConfig

if args.ID != "" && args.NodeName != "" {
index, registration, err := state.NodeServiceWatch(ws, args.NodeName, args.ID, &args.EnterpriseMeta)
freddygv marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("failed to query service registration")
}
if index > reply.Index {
reply.Index = index
}

if registration != nil && !registration.Proxy.MeshGateway.IsZero() {
registrationMGConfig = registration.Proxy.MeshGateway
}
}

// usConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID.
usConfigs := make(map[structs.ServiceID]map[string]interface{})

for _, upstream := range upstreamIDs {
_, upstreamEntry, err := state.ConfigEntry(ws, structs.ServiceDefaults, upstream.ID, &upstream.EnterpriseMeta)
for upstream := range seenUpstreams {
resolvedCfg := make(map[string]interface{})

// The protocol of an upstream is resolved in this order:
// 1. Default protocol from proxy-defaults (how all services should be addressed)
// 2. Protocol for upstream service defined in its service-defaults (how the upstream wants to be addressed)
// 3. Protocol defined for the upstream in the service-defaults.(upstream_defaults|upstream_configs) of the downstream
// (how the downstream wants to address it)
protocol := proxyConfGlobalProtocol

_, upstreamSvcDefaults, err := state.ConfigEntry(ws, structs.ServiceDefaults, upstream.ID, &upstream.EnterpriseMeta)
if err != nil {
return err
}
var upstreamConf *structs.ServiceConfigEntry
var ok bool
if upstreamEntry != nil {
upstreamConf, ok = upstreamEntry.(*structs.ServiceConfigEntry)
if upstreamSvcDefaults != nil {
cfg, ok := upstreamSvcDefaults.(*structs.ServiceConfigEntry)
if !ok {
return fmt.Errorf("invalid service config type %T", upstreamEntry)
return fmt.Errorf("invalid service config type %T", upstreamSvcDefaults)
}
if cfg.Protocol != "" {
protocol = cfg.Protocol
}
}
if protocol != "" {
resolvedCfg["protocol"] = protocol
}

// Fallback to proxyConf global protocol.
protocol := proxyConfGlobalProtocol
if upstreamConf != nil && upstreamConf.Protocol != "" {
protocol = upstreamConf.Protocol
// Merge centralized defaults for all upstreams before configuration for specific upstreams
if upstreamDefaults != nil {
upstreamDefaults.MergeInto(resolvedCfg, args.ID == "")
}
// The value from the proxy registration overrides the one from upstream_defaults because
// it is specific to the proxy instance
if !registrationMGConfig.IsZero() {
resolvedCfg["mesh_gateway"] = registrationMGConfig
}

// Nothing to configure if a protocol hasn't been set.
if protocol == nil {
continue
if upstreamConfigs[upstream.String()] != nil {
upstreamConfigs[upstream.String()].MergeInto(resolvedCfg, args.ID == "")
}

usConfigs[upstream] = map[string]interface{}{
"protocol": protocol,
if len(resolvedCfg) > 0 {
usConfigs[upstream] = resolvedCfg
}
}

Expand All @@ -447,22 +521,21 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
}

if legacyUpstreams {
if reply.UpstreamConfigs == nil {
reply.UpstreamConfigs = make(map[string]map[string]interface{})
}
// For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces
reply.UpstreamConfigs = make(map[string]map[string]interface{})

for us, conf := range usConfigs {
reply.UpstreamConfigs[us.ID] = conf
}

} else {
if reply.UpstreamIDConfigs == nil {
reply.UpstreamIDConfigs = make(structs.UpstreamConfigs, 0, len(usConfigs))
}
reply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs))

for us, conf := range usConfigs {
reply.UpstreamIDConfigs = append(reply.UpstreamIDConfigs, structs.UpstreamConfig{Upstream: us, Config: conf})
reply.UpstreamIDConfigs = append(reply.UpstreamIDConfigs,
structs.OpaqueUpstreamConfig{Upstream: us, Config: conf})
}
}

return nil
})
}
Expand Down
Loading