From 36b09695088503e3214050786439b55e2e34ec1d Mon Sep 17 00:00:00 2001 From: Seth Hoenig Date: Tue, 15 Dec 2020 14:38:33 -0600 Subject: [PATCH] consul/connect: Add support for Connect terminating gateways This PR implements Nomad built-in support for running Consul Connect terminating gateways. Such a gateway can be used by services running inside the service mesh to access "legacy" services running outside the service mesh while still making use of Consul's service identity based networking and ACL policies. https://www.consul.io/docs/connect/gateways/terminating-gateway These gateways are declared as part of a task group level service definition within the connect stanza. service { connect { gateway { proxy { // envoy proxy configuration } terminating { // terminating-gateway configuration entry } } } } Currently Envoy is the only supported gateway implementation in Consul. The gateay task can be customized by configuring the connect.sidecar_task block. When the gateway.terminating field is set, Nomad will write/update the Configuration Entry into Consul on job submission. Because CEs are global in scope and there may be more than one Nomad cluster communicating with Consul, there is an assumption that any terminating gateway defined in Nomad for a particular service will be the same among Nomad clusters. Gateways require Consul 1.8.0+, checked by a node constraint. Closes #9445 --- CHANGELOG.md | 4 + api/services.go | 85 +++- api/services_test.go | 53 +++ .../taskrunner/envoy_bootstrap_hook.go | 22 +- command/agent/consul/connect.go | 10 +- command/agent/consul/connect_test.go | 8 +- command/agent/consul/service_client.go | 15 +- command/agent/job_endpoint.go | 42 +- command/agent/job_endpoint_test.go | 145 ++++++- contributing/checklist-jobspec.md | 24 +- e2e/connect/acls.go | 32 ++ e2e/connect/connect.go | 11 + e2e/connect/input/terminating-gateway.nomad | 109 +++++ nomad/consul.go | 49 ++- nomad/consul_test.go | 52 ++- nomad/job_endpoint.go | 12 +- nomad/job_endpoint_hook_connect.go | 235 ++++++----- nomad/job_endpoint_hook_connect_test.go | 93 ++++- nomad/node_endpoint_test.go | 2 +- nomad/structs/connect.go | 33 ++ nomad/structs/diff.go | 103 ++++- nomad/structs/diff_test.go | 104 +++++ nomad/structs/services.go | 246 +++++++++++- nomad/structs/services_test.go | 362 ++++++++++++++++- nomad/structs/structs.go | 28 +- nomad/structs/structs_test.go | 16 + .../hashicorp/nomad/api/services.go | 85 +++- .../docs/job-specification/gateway.mdx | 378 +++++++++++++----- 28 files changed, 1982 insertions(+), 376 deletions(-) create mode 100644 e2e/connect/input/terminating-gateway.nomad create mode 100644 nomad/structs/connect.go diff --git a/CHANGELOG.md b/CHANGELOG.md index c3fd04e5b9a..c3ce79a0a07 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## 1.0.3 (Unreleased) +FEATURES: + +* **Terminating Gateways**: Adds built-in support for running Consul Connect terminating gateways [[GH-9829](https://github.com/hashicorp/nomad/pull/9829)] + ## 1.0.2 (January 14, 2020) IMPROVEMENTS: diff --git a/api/services.go b/api/services.go index 59e92f6794e..f85e113cdfd 100644 --- a/api/services.go +++ b/api/services.go @@ -302,8 +302,8 @@ type ConsulGateway struct { // Ingress represents the Consul Configuration Entry for an Ingress Gateway. Ingress *ConsulIngressConfigEntry `hcl:"ingress,block"` - // Terminating is not yet supported. - // Terminating *ConsulTerminatingConfigEntry + // Terminating represents the Consul Configuration Entry for a Terminating Gateway. + Terminating *ConsulTerminatingConfigEntry `hcl:"terminating,block"` // Mesh is not yet supported. // Mesh *ConsulMeshConfigEntry @@ -315,6 +315,7 @@ func (g *ConsulGateway) Canonicalize() { } g.Proxy.Canonicalize() g.Ingress.Canonicalize() + g.Terminating.Canonicalize() } func (g *ConsulGateway) Copy() *ConsulGateway { @@ -323,8 +324,9 @@ func (g *ConsulGateway) Copy() *ConsulGateway { } return &ConsulGateway{ - Proxy: g.Proxy.Copy(), - Ingress: g.Ingress.Copy(), + Proxy: g.Proxy.Copy(), + Ingress: g.Ingress.Copy(), + Terminating: g.Terminating.Copy(), } } @@ -335,6 +337,8 @@ type ConsulGatewayBindAddress struct { } var ( + // defaultGatewayConnectTimeout is the default amount of time connections to + // upstreams are allowed before timing out. defaultGatewayConnectTimeout = 5 * time.Second ) @@ -347,6 +351,7 @@ type ConsulGatewayProxy struct { EnvoyGatewayBindTaggedAddresses bool `mapstructure:"envoy_gateway_bind_tagged_addresses" hcl:"envoy_gateway_bind_tagged_addresses,optional"` EnvoyGatewayBindAddresses map[string]*ConsulGatewayBindAddress `mapstructure:"envoy_gateway_bind_addresses" hcl:"envoy_gateway_bind_addresses,block"` EnvoyGatewayNoDefaultBind bool `mapstructure:"envoy_gateway_no_default_bind" hcl:"envoy_gateway_no_default_bind,optional"` + EnvoyDNSDiscoveryType string `mapstructure:"envoy_dns_discovery_type" hcl:"envoy_dns_discovery_type,optional"` Config map[string]interface{} `hcl:"config,block"` // escape hatch envoy config } @@ -395,6 +400,7 @@ func (p *ConsulGatewayProxy) Copy() *ConsulGatewayProxy { EnvoyGatewayBindTaggedAddresses: p.EnvoyGatewayBindTaggedAddresses, EnvoyGatewayBindAddresses: binds, EnvoyGatewayNoDefaultBind: p.EnvoyGatewayNoDefaultBind, + EnvoyDNSDiscoveryType: p.EnvoyDNSDiscoveryType, Config: config, } } @@ -547,9 +553,74 @@ func (e *ConsulIngressConfigEntry) Copy() *ConsulIngressConfigEntry { } } -// ConsulTerminatingConfigEntry is not yet supported. -// type ConsulTerminatingConfigEntry struct { -// } +type ConsulLinkedService struct { + Name string `hcl:"name,optional"` + CAFile string `hcl:"ca_file,optional"` + CertFile string `hcl:"cert_file,optional"` + KeyFile string `hcl:"key_file,optional"` + SNI string `hcl:"sni,optional"` +} + +func (s *ConsulLinkedService) Canonicalize() { + // nothing to do for now +} + +func (s *ConsulLinkedService) Copy() *ConsulLinkedService { + if s == nil { + return nil + } + + return &ConsulLinkedService{ + Name: s.Name, + CAFile: s.CAFile, + CertFile: s.CertFile, + KeyFile: s.KeyFile, + SNI: s.SNI, + } +} + +// ConsulTerminatingConfigEntry represents the Consul Configuration Entry type +// for a Terminating Gateway. +// +// https://www.consul.io/docs/agent/config-entries/terminating-gateway#available-fields +type ConsulTerminatingConfigEntry struct { + // Namespace is not yet supported. + // Namespace string + + Services []*ConsulLinkedService `hcl:"service,block"` +} + +func (e *ConsulTerminatingConfigEntry) Canonicalize() { + if e == nil { + return + } + + if len(e.Services) == 0 { + e.Services = nil + } + + for _, service := range e.Services { + service.Canonicalize() + } +} + +func (e *ConsulTerminatingConfigEntry) Copy() *ConsulTerminatingConfigEntry { + if e == nil { + return nil + } + + var services []*ConsulLinkedService = nil + if n := len(e.Services); n > 0 { + services = make([]*ConsulLinkedService, n) + for i := 0; i < n; i++ { + services[i] = e.Services[i].Copy() + } + } + + return &ConsulTerminatingConfigEntry{ + Services: services, + } +} // ConsulMeshConfigEntry is not yet supported. // type ConsulMeshConfigEntry struct { diff --git a/api/services_test.go b/api/services_test.go index 6bfeaed4e8b..737b8b87ec4 100644 --- a/api/services_test.go +++ b/api/services_test.go @@ -291,7 +291,10 @@ func TestService_ConsulGateway_Canonicalize(t *testing.T) { } cg.Canonicalize() require.Equal(t, timeToPtr(5*time.Second), cg.Proxy.ConnectTimeout) + require.True(t, cg.Proxy.EnvoyGatewayBindTaggedAddresses) require.Nil(t, cg.Proxy.EnvoyGatewayBindAddresses) + require.True(t, cg.Proxy.EnvoyGatewayNoDefaultBind) + require.Empty(t, cg.Proxy.EnvoyDNSDiscoveryType) require.Nil(t, cg.Proxy.Config) require.Nil(t, cg.Ingress.Listeners) }) @@ -314,6 +317,7 @@ func TestService_ConsulGateway_Copy(t *testing.T) { "listener2": {Address: "10.0.0.1", Port: 2001}, }, EnvoyGatewayNoDefaultBind: true, + EnvoyDNSDiscoveryType: "STRICT_DNS", Config: map[string]interface{}{ "foo": "bar", "baz": 3, @@ -334,6 +338,11 @@ func TestService_ConsulGateway_Copy(t *testing.T) { }}, }, }, + Terminating: &ConsulTerminatingConfigEntry{ + Services: []*ConsulLinkedService{{ + Name: "linked-service1", + }}, + }, } t.Run("complete", func(t *testing.T) { @@ -418,3 +427,47 @@ func TestService_ConsulIngressConfigEntry_Copy(t *testing.T) { require.Equal(t, entry, result) }) } + +func TestService_ConsulTerminatingConfigEntry_Canonicalize(t *testing.T) { + t.Parallel() + + t.Run("nil", func(t *testing.T) { + c := (*ConsulTerminatingConfigEntry)(nil) + c.Canonicalize() + require.Nil(t, c) + }) + + t.Run("empty services", func(t *testing.T) { + c := &ConsulTerminatingConfigEntry{ + Services: []*ConsulLinkedService{}, + } + c.Canonicalize() + require.Nil(t, c.Services) + }) +} + +func TestService_ConsulTerminatingConfigEntry_Copy(t *testing.T) { + t.Parallel() + + t.Run("nil", func(t *testing.T) { + result := (*ConsulIngressConfigEntry)(nil).Copy() + require.Nil(t, result) + }) + + entry := &ConsulTerminatingConfigEntry{ + Services: []*ConsulLinkedService{{ + Name: "servic1", + }, { + Name: "service2", + CAFile: "ca_file.pem", + CertFile: "cert_file.pem", + KeyFile: "key_file.pem", + SNI: "sni.terminating.consul", + }}, + } + + t.Run("complete", func(t *testing.T) { + result := entry.Copy() + require.Equal(t, entry, result) + }) +} diff --git a/client/allocrunner/taskrunner/envoy_bootstrap_hook.go b/client/allocrunner/taskrunner/envoy_bootstrap_hook.go index 0186e2ea77f..5982ca73391 100644 --- a/client/allocrunner/taskrunner/envoy_bootstrap_hook.go +++ b/client/allocrunner/taskrunner/envoy_bootstrap_hook.go @@ -110,13 +110,16 @@ func (envoyBootstrapHook) Name() string { return envoyBootstrapHookName } +func isConnectKind(kind string) bool { + kinds := []string{structs.ConnectProxyPrefix, structs.ConnectIngressPrefix, structs.ConnectTerminatingPrefix} + return helper.SliceStringContains(kinds, kind) +} + func (_ *envoyBootstrapHook) extractNameAndKind(kind structs.TaskKind) (string, string, error) { - serviceKind := kind.Name() serviceName := kind.Value() + serviceKind := kind.Name() - switch serviceKind { - case structs.ConnectProxyPrefix, structs.ConnectIngressPrefix: - default: + if !isConnectKind(serviceKind) { return "", "", errors.New("envoy must be used as connect sidecar or gateway") } @@ -346,12 +349,13 @@ func (h *envoyBootstrapHook) newEnvoyBootstrapArgs( gateway string // gateway only ) - if service.Connect.HasSidecar() { + switch { + case service.Connect.HasSidecar(): sidecarForID = agentconsul.MakeAllocServiceID(h.alloc.ID, "group-"+tgName, service) - } - - if service.Connect.IsGateway() { - gateway = "ingress" // more types in the future + case service.Connect.IsIngress(): + gateway = "ingress" + case service.Connect.IsTerminating(): + gateway = "terminating" } h.logger.Debug("bootstrapping envoy", diff --git a/command/agent/consul/connect.go b/command/agent/consul/connect.go index 1fcf3f568bc..68904070f6d 100644 --- a/command/agent/consul/connect.go +++ b/command/agent/consul/connect.go @@ -26,6 +26,7 @@ func newConnect(serviceName string, nc *structs.ConsulConnect, networks structs. return &api.AgentServiceConnect{Native: true}, nil case nc.HasSidecar(): + // must register the sidecar for this service sidecarReg, err := connectSidecarRegistration(serviceName, nc.SidecarService, networks) if err != nil { return nil, err @@ -33,6 +34,7 @@ func newConnect(serviceName string, nc *structs.ConsulConnect, networks structs. return &api.AgentServiceConnect{SidecarService: sidecarReg}, nil default: + // a non-nil but empty connect block makes no sense return nil, fmt.Errorf("Connect configuration empty for service %s", serviceName) } } @@ -64,6 +66,10 @@ func newConnectGateway(serviceName string, connect *structs.ConsulConnect) *api. envoyConfig["envoy_gateway_bind_tagged_addresses"] = true } + if proxy.EnvoyDNSDiscoveryType != "" { + envoyConfig["envoy_dns_discovery_type"] = proxy.EnvoyDNSDiscoveryType + } + if proxy.ConnectTimeout != nil { envoyConfig["connect_timeout_ms"] = proxy.ConnectTimeout.Milliseconds() } @@ -89,7 +95,7 @@ func connectSidecarRegistration(serviceName string, css *structs.ConsulSidecarSe return nil, err } - proxy, err := connectProxy(css.Proxy, cPort.To, networks) + proxy, err := connectSidecarProxy(css.Proxy, cPort.To, networks) if err != nil { return nil, err } @@ -102,7 +108,7 @@ func connectSidecarRegistration(serviceName string, css *structs.ConsulSidecarSe }, nil } -func connectProxy(proxy *structs.ConsulProxy, cPort int, networks structs.Networks) (*api.AgentServiceConnectProxyConfig, error) { +func connectSidecarProxy(proxy *structs.ConsulProxy, cPort int, networks structs.Networks) (*api.AgentServiceConnectProxyConfig, error) { if proxy == nil { proxy = new(structs.ConsulProxy) } diff --git a/command/agent/consul/connect_test.go b/command/agent/consul/connect_test.go index e3d08d3e97a..d42f639957c 100644 --- a/command/agent/consul/connect_test.go +++ b/command/agent/consul/connect_test.go @@ -119,7 +119,7 @@ func TestConnect_connectProxy(t *testing.T) { // If the input proxy is nil, we expect the output to be a proxy with its // config set to default values. t.Run("nil proxy", func(t *testing.T) { - proxy, err := connectProxy(nil, 2000, testConnectNetwork) + proxy, err := connectSidecarProxy(nil, 2000, testConnectNetwork) require.NoError(t, err) require.Equal(t, &api.AgentServiceConnectProxyConfig{ LocalServiceAddress: "", @@ -134,7 +134,7 @@ func TestConnect_connectProxy(t *testing.T) { }) t.Run("bad proxy", func(t *testing.T) { - _, err := connectProxy(&structs.ConsulProxy{ + _, err := connectSidecarProxy(&structs.ConsulProxy{ LocalServiceAddress: "0.0.0.0", LocalServicePort: 2000, Upstreams: nil, @@ -149,7 +149,7 @@ func TestConnect_connectProxy(t *testing.T) { }) t.Run("normal", func(t *testing.T) { - proxy, err := connectProxy(&structs.ConsulProxy{ + proxy, err := connectSidecarProxy(&structs.ConsulProxy{ LocalServiceAddress: "0.0.0.0", LocalServicePort: 2000, Upstreams: nil, @@ -453,6 +453,7 @@ func TestConnect_newConnectGateway(t *testing.T) { }, }, EnvoyGatewayNoDefaultBind: true, + EnvoyDNSDiscoveryType: "STRICT_DNS", Config: map[string]interface{}{ "foo": 1, }, @@ -470,6 +471,7 @@ func TestConnect_newConnectGateway(t *testing.T) { }, }, "envoy_gateway_no_default_bind": true, + "envoy_dns_discovery_type": "STRICT_DNS", "foo": 1, }, }, result) diff --git a/command/agent/consul/service_client.go b/command/agent/consul/service_client.go index 798e2e451bf..762d3154c33 100644 --- a/command/agent/consul/service_client.go +++ b/command/agent/consul/service_client.go @@ -891,10 +891,21 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w // This enables the consul UI to show that Nomad registered this service meta["external-source"] = "nomad" - // Explicitly set the service kind in case this service represents a Connect gateway. + // Explicitly set the Consul service Kind in case this service represents + // one of the Connect gateway types. kind := api.ServiceKindTypical - if service.Connect.IsGateway() { + switch { + case service.Connect.IsIngress(): kind = api.ServiceKindIngressGateway + case service.Connect.IsTerminating(): + kind = api.ServiceKindTerminatingGateway + // set the default port if bridge / default listener set + if defaultBind, exists := service.Connect.Gateway.Proxy.EnvoyGatewayBindAddresses["default"]; exists { + portLabel := fmt.Sprintf("%s-%s", structs.ConnectTerminatingPrefix, service.Name) + if dynPort, ok := workload.Ports.Get(portLabel); ok { + defaultBind.Port = dynPort.Value + } + } } // Build the Consul Service registration request diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 396916627dd..ce58457e12f 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -1329,8 +1329,9 @@ func apiConnectGatewayToStructs(in *api.ConsulGateway) *structs.ConsulGateway { } return &structs.ConsulGateway{ - Proxy: apiConnectGatewayProxyToStructs(in.Proxy), - Ingress: apiConnectIngressGatewayToStructs(in.Ingress), + Proxy: apiConnectGatewayProxyToStructs(in.Proxy), + Ingress: apiConnectIngressGatewayToStructs(in.Ingress), + Terminating: apiConnectTerminatingGatewayToStructs(in.Terminating), } } @@ -1355,6 +1356,7 @@ func apiConnectGatewayProxyToStructs(in *api.ConsulGatewayProxy) *structs.Consul EnvoyGatewayBindTaggedAddresses: in.EnvoyGatewayBindTaggedAddresses, EnvoyGatewayBindAddresses: bindAddresses, EnvoyGatewayNoDefaultBind: in.EnvoyGatewayNoDefaultBind, + EnvoyDNSDiscoveryType: in.EnvoyDNSDiscoveryType, Config: helper.CopyMapStringInterface(in.Config), } } @@ -1427,6 +1429,42 @@ func apiConnectIngressServiceToStructs(in *api.ConsulIngressService) *structs.Co } } +func apiConnectTerminatingGatewayToStructs(in *api.ConsulTerminatingConfigEntry) *structs.ConsulTerminatingConfigEntry { + if in == nil { + return nil + } + + return &structs.ConsulTerminatingConfigEntry{ + Services: apiConnectTerminatingServicesToStructs(in.Services), + } +} + +func apiConnectTerminatingServicesToStructs(in []*api.ConsulLinkedService) []*structs.ConsulLinkedService { + if len(in) == 0 { + return nil + } + + services := make([]*structs.ConsulLinkedService, len(in)) + for i, service := range in { + services[i] = apiConnectTerminatingServiceToStructs(service) + } + return services +} + +func apiConnectTerminatingServiceToStructs(in *api.ConsulLinkedService) *structs.ConsulLinkedService { + if in == nil { + return nil + } + + return &structs.ConsulLinkedService{ + Name: in.Name, + CAFile: in.CAFile, + CertFile: in.CertFile, + KeyFile: in.KeyFile, + SNI: in.SNI, + } +} + func apiConnectSidecarServiceToStructs(in *api.ConsulSidecarService) *structs.ConsulSidecarService { if in == nil { return nil diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index 166c995ba95..43f395a5f4f 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -3061,26 +3061,131 @@ func TestConversion_apiConnectSidecarServiceToStructs(t *testing.T) { })) } -func TestConversion_ApiConsulConnectToStructs_legacy(t *testing.T) { +func TestConversion_ApiConsulConnectToStructs(t *testing.T) { t.Parallel() - require.Nil(t, ApiConsulConnectToStructs(nil)) - require.Equal(t, &structs.ConsulConnect{ - Native: false, - SidecarService: &structs.ConsulSidecarService{Port: "myPort"}, - SidecarTask: &structs.SidecarTask{Name: "task"}, - }, ApiConsulConnectToStructs(&api.ConsulConnect{ - Native: false, - SidecarService: &api.ConsulSidecarService{Port: "myPort"}, - SidecarTask: &api.SidecarTask{Name: "task"}, - })) -} -func TestConversion_ApiConsulConnectToStructs_native(t *testing.T) { - t.Parallel() - require.Nil(t, ApiConsulConnectToStructs(nil)) - require.Equal(t, &structs.ConsulConnect{ - Native: true, - }, ApiConsulConnectToStructs(&api.ConsulConnect{ - Native: true, - })) + t.Run("nil", func(t *testing.T) { + require.Nil(t, ApiConsulConnectToStructs(nil)) + }) + + t.Run("sidecar", func(t *testing.T) { + require.Equal(t, &structs.ConsulConnect{ + Native: false, + SidecarService: &structs.ConsulSidecarService{Port: "myPort"}, + SidecarTask: &structs.SidecarTask{Name: "task"}, + }, ApiConsulConnectToStructs(&api.ConsulConnect{ + Native: false, + SidecarService: &api.ConsulSidecarService{Port: "myPort"}, + SidecarTask: &api.SidecarTask{Name: "task"}, + })) + }) + + t.Run("gateway proxy", func(t *testing.T) { + require.Equal(t, &structs.ConsulConnect{ + Gateway: &structs.ConsulGateway{ + Proxy: &structs.ConsulGatewayProxy{ + ConnectTimeout: helper.TimeToPtr(3 * time.Second), + EnvoyGatewayBindTaggedAddresses: true, + EnvoyGatewayBindAddresses: map[string]*structs.ConsulGatewayBindAddress{ + "service": { + Address: "10.0.0.1", + Port: 9000, + }}, + EnvoyGatewayNoDefaultBind: true, + EnvoyDNSDiscoveryType: "STRICT_DNS", + Config: map[string]interface{}{ + "foo": "bar", + }, + }, + }, + }, ApiConsulConnectToStructs(&api.ConsulConnect{ + Gateway: &api.ConsulGateway{ + Proxy: &api.ConsulGatewayProxy{ + ConnectTimeout: helper.TimeToPtr(3 * time.Second), + EnvoyGatewayBindTaggedAddresses: true, + EnvoyGatewayBindAddresses: map[string]*api.ConsulGatewayBindAddress{ + "service": { + Address: "10.0.0.1", + Port: 9000, + }, + }, + EnvoyGatewayNoDefaultBind: true, + EnvoyDNSDiscoveryType: "STRICT_DNS", + Config: map[string]interface{}{ + "foo": "bar", + }, + }, + }, + })) + }) + + t.Run("gateway ingress", func(t *testing.T) { + require.Equal(t, &structs.ConsulConnect{ + Gateway: &structs.ConsulGateway{ + Ingress: &structs.ConsulIngressConfigEntry{ + TLS: &structs.ConsulGatewayTLSConfig{Enabled: true}, + Listeners: []*structs.ConsulIngressListener{{ + Port: 1111, + Protocol: "http", + Services: []*structs.ConsulIngressService{{ + Name: "ingress1", + Hosts: []string{"host1"}, + }}, + }}, + }, + }, + }, ApiConsulConnectToStructs( + &api.ConsulConnect{ + Gateway: &api.ConsulGateway{ + Ingress: &api.ConsulIngressConfigEntry{ + TLS: &api.ConsulGatewayTLSConfig{Enabled: true}, + Listeners: []*api.ConsulIngressListener{{ + Port: 1111, + Protocol: "http", + Services: []*api.ConsulIngressService{{ + Name: "ingress1", + Hosts: []string{"host1"}, + }}, + }}, + }, + }, + }, + )) + }) + + t.Run("gateway terminating", func(t *testing.T) { + require.Equal(t, &structs.ConsulConnect{ + Gateway: &structs.ConsulGateway{ + Terminating: &structs.ConsulTerminatingConfigEntry{ + Services: []*structs.ConsulLinkedService{{ + Name: "linked-service", + CAFile: "ca.pem", + CertFile: "cert.pem", + KeyFile: "key.pem", + SNI: "linked.consul", + }}, + }, + }, + }, ApiConsulConnectToStructs(&api.ConsulConnect{ + Gateway: &api.ConsulGateway{ + Terminating: &api.ConsulTerminatingConfigEntry{ + Services: []*api.ConsulLinkedService{{ + Name: "linked-service", + CAFile: "ca.pem", + CertFile: "cert.pem", + KeyFile: "key.pem", + SNI: "linked.consul", + }}, + }, + }, + })) + }) + + t.Run("native", func(t *testing.T) { + require.Equal(t, &structs.ConsulConnect{ + Native: true, + }, ApiConsulConnectToStructs(&api.ConsulConnect{ + Native: true, + })) + }) } diff --git a/contributing/checklist-jobspec.md b/contributing/checklist-jobspec.md index 315b00f4320..24c86a226da 100644 --- a/contributing/checklist-jobspec.md +++ b/contributing/checklist-jobspec.md @@ -2,21 +2,25 @@ ## Code -* [ ] Consider similar features in Consul, Kubernetes, and other tools. Is - there prior art we should match? Terminology, structure, etc? +* [ ] Consider similar features in Consul, Kubernetes, and other tools. Is there prior art we should match? Terminology, structure, etc? * [ ] Add structs/fields to `api/` package - * structs usually have Canonicalize, Copy, and Merge methods - * New fields should be added to existing Canonicalize, Copy, and Merge - methods - * Test the struct/field via all methods mentioned above + * `api/` structs usually have Canonicalize and Copy methods + * New fields should be added to existing Canonicalize, Copy methods + * Test the structs/fields via methods mentioned above * [ ] Add structs/fields to `nomad/structs` package - * Validation happens in this package and must be implemented - * Implement other methods and tests from `api/` package + * `structs/` structs usually have Copy, Equals, and Validate methods + * Validation happens in this package and _must_ be implemented * Note that analogous struct field names should match with `api/` package + * Test the structs/fields via methods mentioned above + * Implement and test other logical methods * [ ] Add conversion between `api/` and `nomad/structs` in `command/agent/job_endpoint.go` -* [ ] Add check for job diff in `nomad/structs/diff.go` + * Add test for conversion +* [ ] Implement diff logic for new structs/fields in `nomad/structs/diff.go` * Note that fields must be listed in alphabetical order in `FieldDiff` slices in `nomad/structs/diff_test.go` -* [ ] Test conversion + * Add test for diff of new structs/fields +* [ ] Add change detection for new structs/feilds in `scheduler/util.go/tasksUpdated` + * Might be covered by `.Equals` but might not be, check. + * Should return true if the task must be replaced as a result of the change. ## HCL1 (deprecated) diff --git a/e2e/connect/acls.go b/e2e/connect/acls.go index 4033e9ff343..16aca749eed 100644 --- a/e2e/connect/acls.go +++ b/e2e/connect/acls.go @@ -28,6 +28,9 @@ const ( // demoConnectIngressGateway is the example ingress gateway job useful for testing demoConnectIngressGateway = "connect/input/ingress-gateway.nomad" + + // demoConnectTerminatingGateway is the example terminating gateway job useful for testing + demoConnectTerminatingGateway = "connect/input/terminating-gateway.nomad" ) type ConnectACLsE2ETest struct { @@ -339,6 +342,35 @@ func (tc *ConnectACLsE2ETest) TestConnectACLsConnectIngressGatewayDemo(f *framew t.Log("connect ingress gateway job with ACLs enabled finished") } +func (tc *ConnectACLsE2ETest) TestConnectACLsConnectTerminatingGatewayDemo(f *framework.F) { + t := f.T() + + t.Log("test register Connect Terminating Gateway job w/ ACLs enabled") + + // setup ACL policy and mint operator token + + policyID := tc.createConsulPolicy(consulPolicy{ + Name: "nomad-operator-policy", + Rules: `service "api-gateway" { policy = "write" } service "count-dashboard" { policy = "write" }`, + }, f) + operatorToken := tc.createOperatorToken(policyID, f) + t.Log("created operator token:", operatorToken) + + jobID := connectJobID() + tc.jobIDs = append(tc.jobIDs, jobID) + + allocs := e2eutil.RegisterAndWaitForAllocs(t, tc.Nomad(), demoConnectTerminatingGateway, jobID, operatorToken) + allocIDs := e2eutil.AllocIDsFromAllocationListStubs(allocs) + e2eutil.WaitForAllocsRunning(t, tc.Nomad(), allocIDs) + + foundSITokens := tc.countSITokens(t) + f.Equal(2, len(foundSITokens), "expected 2 SI tokens total: %v", foundSITokens) + f.Equal(1, foundSITokens["connect-terminating-api-gateway"], "expected 1 SI token for connect-terminating-api-gateway: %v", foundSITokens) + f.Equal(1, foundSITokens["connect-proxy-count-dashboard"], "expected 1 SI token for count-dashboard: %v", foundSITokens) + + t.Log("connect terminating gateway job with ACLs enabled finished") +} + var ( siTokenRe = regexp.MustCompile(`_nomad_si \[[\w-]{36}] \[[\w-]{36}] \[([\S]+)]`) ) diff --git a/e2e/connect/connect.go b/e2e/connect/connect.go index 66d582dc48f..eca857199a1 100644 --- a/e2e/connect/connect.go +++ b/e2e/connect/connect.go @@ -92,3 +92,14 @@ func (tc *ConnectE2ETest) TestConnectIngressGatewayDemo(f *framework.F) { allocIDs := e2eutil.AllocIDsFromAllocationListStubs(allocs) e2eutil.WaitForAllocsRunning(t, tc.Nomad(), allocIDs) } + +func (tc *ConnectE2ETest) TestConnectTerminatingGatewayDemo(f *framework.F) { + t := f.T() + + jobID := connectJobID() + tc.jobIds = append(tc.jobIds, jobID) + + allocs := e2eutil.RegisterAndWaitForAllocs(t, tc.Nomad(), demoConnectTerminatingGateway, jobID, "") + allocIDs := e2eutil.AllocIDsFromAllocationListStubs(allocs) + e2eutil.WaitForAllocsRunning(t, tc.Nomad(), allocIDs) +} diff --git a/e2e/connect/input/terminating-gateway.nomad b/e2e/connect/input/terminating-gateway.nomad new file mode 100644 index 00000000000..ff8ea276e8b --- /dev/null +++ b/e2e/connect/input/terminating-gateway.nomad @@ -0,0 +1,109 @@ +job "countdash-terminating" { + + datacenters = ["dc1"] + + constraint { + attribute = "${attr.kernel.name}" + value = "linux" + } + + group "api" { + network { + mode = "host" + port "port" { + static = "9001" + } + } + + service { + name = "count-api" + port = "port" + } + + task "api" { + driver = "docker" + + config { + image = "hashicorpnomad/counter-api:v3" + network_mode = "host" + } + } + } + + group "gateway" { + network { + mode = "bridge" + } + + service { + name = "api-gateway" + + connect { + gateway { + proxy { + # The following options are automatically set by Nomad if not explicitly + # configured with using bridge networking. + # + # envoy_gateway_no_default_bind = true + # envoy_gateway_bind_addresses "default" { + # address = "0.0.0.0" + # port = + # } + # Additional options are documented at + # https://www.nomadproject.io/docs/job-specification/gateway#proxy-parameters + } + + terminating { + # Nomad will automatically manage the Configuration Entry in Consul + # given the parameters in the terminating block. + # + # Additional options are documented at + # https://www.nomadproject.io/docs/job-specification/gateway#terminating-parameters + service { + name = "count-api" + } + } + } + } + } + } + + group "dashboard" { + network { + mode = "bridge" + + port "http" { + static = 9002 + to = 9002 + } + } + + service { + name = "count-dashboard" + port = "9002" + + connect { + sidecar_service { + proxy { + upstreams { + destination_name = "count-api" + local_bind_port = 8080 + } + } + } + } + } + + task "dashboard" { + driver = "docker" + + env { + COUNTING_SERVICE_URL = "http://${NOMAD_UPSTREAM_ADDR_count_api}" + } + + config { + image = "hashicorpnomad/counter-dashboard:v3" + } + } + } +} diff --git a/nomad/consul.go b/nomad/consul.go index f6bd1960c0e..a541e1ec2d4 100644 --- a/nomad/consul.go +++ b/nomad/consul.go @@ -451,9 +451,13 @@ func (s *Server) purgeSITokenAccessors(accessors []*structs.SITokenAccessor) err // Removing the entries is not particularly safe, given that multiple Nomad clusters // may be writing to the same config entries, which are global in the Consul scope. type ConsulConfigsAPI interface { - // SetIngressGatewayConfigEntry adds the given ConfigEntry to Consul, overwriting + // SetIngressCE adds the given ConfigEntry to Consul, overwriting // the previous entry if set. - SetIngressGatewayConfigEntry(ctx context.Context, service string, entry *structs.ConsulIngressConfigEntry) error + SetIngressCE(ctx context.Context, service string, entry *structs.ConsulIngressConfigEntry) error + + // SetTerminatingCE adds the given ConfigEntry to Consul, overwriting + // the previous entry if set. + SetTerminatingCE(ctx context.Context, service string, entry *structs.ConsulTerminatingConfigEntry) error // Stop is used to stop additional creations of Configuration Entries. Intended to // be used on Nomad Server shutdown. @@ -491,13 +495,18 @@ func (c *consulConfigsAPI) Stop() { c.stopped = true } -func (c *consulConfigsAPI) SetIngressGatewayConfigEntry(ctx context.Context, service string, entry *structs.ConsulIngressConfigEntry) error { - configEntry := convertIngressGatewayConfig(service, entry) - return c.setConfigEntry(ctx, configEntry) +func (c *consulConfigsAPI) SetIngressCE(ctx context.Context, service string, entry *structs.ConsulIngressConfigEntry) error { + return c.setCE(ctx, convertIngressCE(service, entry)) } -// setConfigEntry will set the Configuration Entry of any type Consul supports. -func (c *consulConfigsAPI) setConfigEntry(ctx context.Context, entry api.ConfigEntry) error { +func (c *consulConfigsAPI) SetTerminatingCE(ctx context.Context, service string, entry *structs.ConsulTerminatingConfigEntry) error { + return c.setCE(ctx, convertTerminatingCE(service, entry)) +} + +// also mesh + +// setCE will set the Configuration Entry of any type Consul supports. +func (c *consulConfigsAPI) setCE(ctx context.Context, entry api.ConfigEntry) error { defer metrics.MeasureSince([]string{"nomad", "consul", "create_config_entry"}, time.Now()) // make sure the background deletion goroutine has not been stopped @@ -518,14 +527,14 @@ func (c *consulConfigsAPI) setConfigEntry(ctx context.Context, entry api.ConfigE return err } -func convertIngressGatewayConfig(service string, entry *structs.ConsulIngressConfigEntry) api.ConfigEntry { +func convertIngressCE(service string, entry *structs.ConsulIngressConfigEntry) api.ConfigEntry { var listeners []api.IngressListener = nil for _, listener := range entry.Listeners { var services []api.IngressService = nil - for _, service := range listener.Services { + for _, s := range listener.Services { services = append(services, api.IngressService{ - Name: service.Name, - Hosts: helper.CopySliceString(service.Hosts), + Name: s.Name, + Hosts: helper.CopySliceString(s.Hosts), }) } listeners = append(listeners, api.IngressListener{ @@ -547,3 +556,21 @@ func convertIngressGatewayConfig(service string, entry *structs.ConsulIngressCon Listeners: listeners, } } + +func convertTerminatingCE(service string, entry *structs.ConsulTerminatingConfigEntry) api.ConfigEntry { + var linked []api.LinkedService = nil + for _, s := range entry.Services { + linked = append(linked, api.LinkedService{ + Name: s.Name, + CAFile: s.CAFile, + CertFile: s.CertFile, + KeyFile: s.KeyFile, + SNI: s.SNI, + }) + } + return &api.TerminatingGatewayConfigEntry{ + Kind: api.TerminatingGateway, + Name: service, + Services: linked, + } +} diff --git a/nomad/consul_test.go b/nomad/consul_test.go index 3eacbfe65bf..c57a0e32be3 100644 --- a/nomad/consul_test.go +++ b/nomad/consul_test.go @@ -20,36 +20,54 @@ var _ ConsulACLsAPI = (*consulACLsAPI)(nil) var _ ConsulACLsAPI = (*mockConsulACLsAPI)(nil) var _ ConsulConfigsAPI = (*consulConfigsAPI)(nil) -func TestConsulConfigsAPI_SetIngressGatewayConfigEntry(t *testing.T) { +func TestConsulConfigsAPI_SetCE(t *testing.T) { t.Parallel() - try := func(t *testing.T, expErr error) { + try := func(t *testing.T, expect error, f func(ConsulConfigsAPI) error) { logger := testlog.HCLogger(t) - configsAPI := consul.NewMockConfigsAPI(logger) // agent - configsAPI.SetError(expErr) + configsAPI := consul.NewMockConfigsAPI(logger) + configsAPI.SetError(expect) c := NewConsulConfigsAPI(configsAPI, logger) + err := f(c) // set the config entry - ctx := context.Background() - err := c.SetIngressGatewayConfigEntry(ctx, "service1", &structs.ConsulIngressConfigEntry{ - TLS: nil, - Listeners: nil, - }) - - if expErr != nil { - require.Equal(t, expErr, err) - } else { + switch expect { + case nil: require.NoError(t, err) + default: + require.Equal(t, expect, err) } } - t.Run("set ingress CE success", func(t *testing.T) { - try(t, nil) + ctx := context.Background() + + ingressCE := new(structs.ConsulIngressConfigEntry) + t.Run("ingress ok", func(t *testing.T) { + try(t, nil, func(c ConsulConfigsAPI) error { + return c.SetIngressCE(ctx, "ig", ingressCE) + }) }) - t.Run("set ingress CE failure", func(t *testing.T) { - try(t, errors.New("consul broke")) + t.Run("ingress fail", func(t *testing.T) { + try(t, errors.New("consul broke"), func(c ConsulConfigsAPI) error { + return c.SetIngressCE(ctx, "ig", ingressCE) + }) + }) + + terminatingCE := new(structs.ConsulTerminatingConfigEntry) + t.Run("terminating ok", func(t *testing.T) { + try(t, nil, func(c ConsulConfigsAPI) error { + return c.SetTerminatingCE(ctx, "tg", terminatingCE) + }) }) + + t.Run("terminating fail", func(t *testing.T) { + try(t, errors.New("consul broke"), func(c ConsulConfigsAPI) error { + return c.SetTerminatingCE(ctx, "tg", terminatingCE) + }) + }) + + // also mesh } type revokeRequest struct { diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 245e56da7f2..5d35675d6d7 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -289,11 +289,19 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis // Every job update will re-write the Configuration Entry into Consul. ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - for service, entry := range args.Job.ConfigEntries() { - if err := j.srv.consulConfigEntries.SetIngressGatewayConfigEntry(ctx, service, entry); err != nil { + entries := args.Job.ConfigEntries() + for service, entry := range entries.Ingress { + if err := j.srv.consulConfigEntries.SetIngressCE(ctx, service, entry); err != nil { return err } } + for service, entry := range entries.Terminating { + fmt.Println("SH JE set terminating CE", service) + if err := j.srv.consulConfigEntries.SetTerminatingCE(ctx, service, entry); err != nil { + return err + } + } + // also mesh // Enforce Sentinel policies. Pass a copy of the job to prevent // sentinel from altering it. diff --git a/nomad/job_endpoint_hook_connect.go b/nomad/job_endpoint_hook_connect.go index d429360dfcf..b03040e93fb 100644 --- a/nomad/job_endpoint_hook_connect.go +++ b/nomad/job_endpoint_hook_connect.go @@ -12,77 +12,75 @@ import ( "github.com/pkg/errors" ) -var ( - // connectSidecarResources returns the set of resources used by default for - // the Consul Connect sidecar task - connectSidecarResources = func() *structs.Resources { - return &structs.Resources{ - CPU: 250, - MemoryMB: 128, - } +// connectSidecarResources returns the set of resources used by default for +// the Consul Connect sidecar task +func connectSidecarResources() *structs.Resources { + return &structs.Resources{ + CPU: 250, + MemoryMB: 128, } +} - // connectSidecarDriverConfig is the driver configuration used by the injected - // connect proxy sidecar task. - connectSidecarDriverConfig = func() map[string]interface{} { - return map[string]interface{}{ - "image": envoy.SidecarConfigVar, - "args": []interface{}{ - "-c", structs.EnvoyBootstrapPath, - "-l", "${meta.connect.log_level}", - "--concurrency", "${meta.connect.proxy_concurrency}", - "--disable-hot-restart", - }, - } +// connectSidecarDriverConfig is the driver configuration used by the injected +// connect proxy sidecar task. +func connectSidecarDriverConfig() map[string]interface{} { + return map[string]interface{}{ + "image": envoy.SidecarConfigVar, + "args": []interface{}{ + "-c", structs.EnvoyBootstrapPath, + "-l", "${meta.connect.log_level}", + "--concurrency", "${meta.connect.proxy_concurrency}", + "--disable-hot-restart", + }, } +} - // connectGatewayDriverConfig is the Docker driver configuration used by the - // injected connect proxy sidecar task. - // - // A gateway may run in a group with bridge or host networking, and if host - // networking is being used the network_mode driver configuration is set here. - connectGatewayDriverConfig = func(hostNetwork bool) map[string]interface{} { - m := map[string]interface{}{ - "image": envoy.GatewayConfigVar, - "args": []interface{}{ - "-c", structs.EnvoyBootstrapPath, - "-l", "${meta.connect.log_level}", - "--concurrency", "${meta.connect.proxy_concurrency}", - "--disable-hot-restart", - }, - } - - if hostNetwork { - m["network_mode"] = "host" - } +// connectGatewayDriverConfig is the Docker driver configuration used by the +// injected connect proxy sidecar task. +// +// A gateway may run in a group with bridge or host networking, and if host +// networking is being used the network_mode driver configuration is set here. +func connectGatewayDriverConfig(hostNetwork bool) map[string]interface{} { + m := map[string]interface{}{ + "image": envoy.GatewayConfigVar, + "args": []interface{}{ + "-c", structs.EnvoyBootstrapPath, + "-l", "${meta.connect.log_level}", + "--concurrency", "${meta.connect.proxy_concurrency}", + "--disable-hot-restart", + }, + } - return m + if hostNetwork { + m["network_mode"] = "host" } - // connectMinimalVersionConstraint is used when building the sidecar task to ensure - // the proper Consul version is used that supports the necessary Connect - // features. This includes bootstrapping envoy with a unix socket for Consul's - // gRPC xDS API. - connectMinimalVersionConstraint = func() *structs.Constraint { - return &structs.Constraint{ - LTarget: "${attr.consul.version}", - RTarget: ">= 1.6.0-beta1", - Operand: structs.ConstraintSemver, - } + return m +} + +// connectSidecarVersionConstraint is used when building the sidecar task to ensure +// the proper Consul version is used that supports the necessary Connect +// features. This includes bootstrapping envoy with a unix socket for Consul's +// gRPC xDS API. +func connectSidecarVersionConstraint() *structs.Constraint { + return &structs.Constraint{ + LTarget: "${attr.consul.version}", + RTarget: ">= 1.6.0-beta1", + Operand: structs.ConstraintSemver, } +} - // connectGatewayVersionConstraint is used when building a connect gateway - // task to ensure proper Consul version is used that supports Connect Gateway - // features. This includes making use of Consul Configuration Entries of type - // {ingress,terminating,mesh}-gateway. - connectGatewayVersionConstraint = func() *structs.Constraint { - return &structs.Constraint{ - LTarget: "${attr.consul.version}", - RTarget: ">= 1.8.0", - Operand: structs.ConstraintSemver, - } +// connectGatewayVersionConstraint is used when building a connect gateway +// task to ensure proper Consul version is used that supports Connect Gateway +// features. This includes making use of Consul Configuration Entries of type +// {ingress,terminating,mesh}-gateway. +func connectGatewayVersionConstraint() *structs.Constraint { + return &structs.Constraint{ + LTarget: "${attr.consul.version}", + RTarget: ">= 1.8.0", + Operand: structs.ConstraintSemver, } -) +} // jobConnectHook implements a job Mutating and Validating admission controller type jobConnectHook struct{} @@ -91,7 +89,7 @@ func (jobConnectHook) Name() string { return "connect" } -func (jobConnectHook) Mutate(job *structs.Job) (_ *structs.Job, warnings []error, err error) { +func (jobConnectHook) Mutate(job *structs.Job) (*structs.Job, []error, error) { for _, g := range job.TaskGroups { // TG isn't validated yet, but validation // may depend on mutation results. @@ -110,13 +108,13 @@ func (jobConnectHook) Mutate(job *structs.Job) (_ *structs.Job, warnings []error return job, nil, nil } -func (jobConnectHook) Validate(job *structs.Job) (warnings []error, err error) { +func (jobConnectHook) Validate(job *structs.Job) ([]error, error) { + var warnings []error + for _, g := range job.TaskGroups { - w, err := groupConnectValidate(g) - if err != nil { + if w, err := groupConnectValidate(g); err != nil { return nil, err - } - if w != nil { + } else if w != nil { warnings = append(warnings, w...) } } @@ -143,9 +141,11 @@ func hasGatewayTaskForService(tg *structs.TaskGroup, svc string) bool { for _, t := range tg.Tasks { switch { case isIngressGatewayForService(t, svc): - // also terminating and mesh in the future + return true + case isTerminatingGatewayForService(t, svc): return true } + // mesh later } return false } @@ -154,6 +154,10 @@ func isIngressGatewayForService(t *structs.Task, svc string) bool { return t.Kind == structs.NewTaskKind(structs.ConnectIngressPrefix, svc) } +func isTerminatingGatewayForService(t *structs.Task, svc string) bool { + return t.Kind == structs.NewTaskKind(structs.ConnectTerminatingPrefix, svc) +} + // getNamedTaskForNativeService retrieves the Task with the name specified in the // group service definition. If the task name is empty and there is only one task // in the group, infer the name from the only option. @@ -173,6 +177,24 @@ func getNamedTaskForNativeService(tg *structs.TaskGroup, serviceName, taskName s return nil, errors.Errorf("task %s named by Consul Connect Native service %s->%s does not exist", taskName, tg.Name, serviceName) } +func injectPort(group *structs.TaskGroup, label string) { + // check that port hasn't already been defined before adding it to tg + for _, p := range group.Networks[0].DynamicPorts { + if p.Label == label { + return + } + } + + // inject a port of label that maps inside the bridge namespace + group.Networks[0].DynamicPorts = append(group.Networks[0].DynamicPorts, structs.Port{ + Label: label, + // -1 is a sentinel value to instruct the + // scheduler to map the host's dynamic port to + // the same port in the netns. + To: -1, + }) +} + // probably need to hack this up to look for checks on the service, and if they // qualify, configure a port for envoy to use to expose their paths. func groupConnectHook(job *structs.Job, g *structs.TaskGroup) error { @@ -199,7 +221,7 @@ func groupConnectHook(job *structs.Job, g *structs.TaskGroup) error { // If the task doesn't already exist, create a new one and add it to the job if task == nil { - task = newConnectTask(service.Name) + task = newConnectSidecarTask(service.Name) // If there happens to be a task defined with the same name // append an UUID fragment to the task name @@ -219,24 +241,8 @@ func groupConnectHook(job *structs.Job, g *structs.TaskGroup) error { // Canonicalize task since this mutator runs after job canonicalization task.Canonicalize(job, g) - makePort := func(label string) { - // check that port hasn't already been defined before adding it to tg - for _, p := range g.Networks[0].DynamicPorts { - if p.Label == label { - return - } - } - g.Networks[0].DynamicPorts = append(g.Networks[0].DynamicPorts, structs.Port{ - Label: label, - // -1 is a sentinel value to instruct the - // scheduler to map the host's dynamic port to - // the same port in the netns. - To: -1, - }) - } - // create a port for the sidecar task's proxy port - makePort(fmt.Sprintf("%s-%s", structs.ConnectProxyPrefix, service.Name)) + injectPort(g, fmt.Sprintf("%s-%s", structs.ConnectProxyPrefix, service.Name)) case service.Connect.IsNative(): // find the task backing this connect native service and set the kind @@ -253,18 +259,31 @@ func groupConnectHook(job *structs.Job, g *structs.TaskGroup) error { // a name of an injected gateway task service.Name = env.ReplaceEnv(service.Name) + // detect whether the group is in host networking mode, which will + // require tweaking the default gateway task config netHost := g.Networks[0].Mode == "host" - if !netHost && service.Connect.Gateway.Ingress != nil { + + if !netHost && service.Connect.IsGateway() { // Modify the gateway proxy service configuration to automatically // do the correct envoy bind address plumbing when inside a net // namespace, but only if things are not explicitly configured. service.Connect.Gateway.Proxy = gatewayProxyForBridge(service.Connect.Gateway) } + // Inject a port whether bridge or host network (if not already set). + // This port is accessed by the magic of Connect plumbing so it seems + // reasonable to keep the magic alive here. + if service.Connect.IsTerminating() && service.PortLabel == "" { + // Inject a dynamic port for the terminating gateway. + portLabel := fmt.Sprintf("%s-%s", structs.ConnectTerminatingPrefix, service.Name) + service.PortLabel = portLabel + injectPort(g, portLabel) + } + // inject the gateway task only if it does not yet already exist if !hasGatewayTaskForService(g, service.Name) { - task := newConnectGatewayTask(service.Name, netHost) - + prefix := service.Connect.Gateway.Prefix() + task := newConnectGatewayTask(prefix, service.Name, netHost) g.Tasks = append(g.Tasks, task) // the connect.sidecar_task stanza can also be used to configure @@ -321,18 +340,32 @@ func gatewayProxyForBridge(gateway *structs.ConsulGateway) *structs.ConsulGatewa proxy := new(structs.ConsulGatewayProxy) if gateway.Proxy != nil { proxy.ConnectTimeout = gateway.Proxy.ConnectTimeout + proxy.EnvoyDNSDiscoveryType = gateway.Proxy.EnvoyDNSDiscoveryType proxy.Config = gateway.Proxy.Config } - // magically set the fields where Nomad knows what to do - proxy.EnvoyGatewayNoDefaultBind = true - proxy.EnvoyGatewayBindTaggedAddresses = false - proxy.EnvoyGatewayBindAddresses = gatewayBindAddresses(gateway.Ingress) + // magically configure bind address(es) for bridge networking, per gateway type + // non-default configuration is gated above + switch { + case gateway.Ingress != nil: + proxy.EnvoyGatewayNoDefaultBind = true + proxy.EnvoyGatewayBindTaggedAddresses = false + proxy.EnvoyGatewayBindAddresses = gatewayBindAddressesIngress(gateway.Ingress) + case gateway.Terminating != nil: + proxy.EnvoyGatewayNoDefaultBind = true + proxy.EnvoyGatewayBindTaggedAddresses = false + proxy.EnvoyGatewayBindAddresses = map[string]*structs.ConsulGatewayBindAddress{ + "default": { + Address: "0.0.0.0", + Port: -1, // filled in later with dynamic port + }} + } + // later: mesh return proxy } -func gatewayBindAddresses(ingress *structs.ConsulIngressConfigEntry) map[string]*structs.ConsulGatewayBindAddress { +func gatewayBindAddressesIngress(ingress *structs.ConsulIngressConfigEntry) map[string]*structs.ConsulGatewayBindAddress { if ingress == nil || len(ingress.Listeners) == 0 { return nil } @@ -350,11 +383,11 @@ func gatewayBindAddresses(ingress *structs.ConsulIngressConfigEntry) map[string] return addresses } -func newConnectGatewayTask(serviceName string, netHost bool) *structs.Task { +func newConnectGatewayTask(prefix, service string, netHost bool) *structs.Task { return &structs.Task{ // Name is used in container name so must start with '[A-Za-z0-9]' - Name: fmt.Sprintf("%s-%s", structs.ConnectIngressPrefix, serviceName), - Kind: structs.NewTaskKind(structs.ConnectIngressPrefix, serviceName), + Name: fmt.Sprintf("%s-%s", prefix, service), + Kind: structs.NewTaskKind(prefix, service), Driver: "docker", Config: connectGatewayDriverConfig(netHost), ShutdownDelay: 5 * time.Second, @@ -369,11 +402,11 @@ func newConnectGatewayTask(serviceName string, netHost bool) *structs.Task { } } -func newConnectTask(serviceName string) *structs.Task { +func newConnectSidecarTask(service string) *structs.Task { return &structs.Task{ // Name is used in container name so must start with '[A-Za-z0-9]' - Name: fmt.Sprintf("%s-%s", structs.ConnectProxyPrefix, serviceName), - Kind: structs.NewTaskKind(structs.ConnectProxyPrefix, serviceName), + Name: fmt.Sprintf("%s-%s", structs.ConnectProxyPrefix, service), + Kind: structs.NewTaskKind(structs.ConnectProxyPrefix, service), Driver: "docker", Config: connectSidecarDriverConfig(), ShutdownDelay: 5 * time.Second, @@ -387,7 +420,7 @@ func newConnectTask(serviceName string) *structs.Task { Sidecar: true, }, Constraints: structs.Constraints{ - connectMinimalVersionConstraint(), + connectSidecarVersionConstraint(), }, } } diff --git a/nomad/job_endpoint_hook_connect_test.go b/nomad/job_endpoint_hook_connect_test.go index 09c12f8c1ab..6687d46adbd 100644 --- a/nomad/job_endpoint_hook_connect_test.go +++ b/nomad/job_endpoint_hook_connect_test.go @@ -86,8 +86,8 @@ func TestJobEndpointConnect_groupConnectHook(t *testing.T) { // Expected tasks tgExp := job.TaskGroups[0].Copy() tgExp.Tasks = []*structs.Task{ - newConnectTask("backend"), - newConnectTask("admin"), + newConnectSidecarTask("backend"), + newConnectSidecarTask("admin"), } tgExp.Services[0].Name = "backend" tgExp.Services[1].Name = "admin" @@ -129,7 +129,7 @@ func TestJobEndpointConnect_groupConnectHook_IngressGateway(t *testing.T) { expTG := job.TaskGroups[0].Copy() expTG.Tasks = []*structs.Task{ // inject the gateway task - newConnectGatewayTask("my-gateway", false), + newConnectGatewayTask(structs.ConnectIngressPrefix, "my-gateway", false), } expTG.Services[0].Name = "my-gateway" expTG.Tasks[0].Canonicalize(job, expTG) @@ -328,16 +328,27 @@ func TestJobEndpointConnect_groupConnectGatewayValidate(t *testing.T) { } func TestJobEndpointConnect_newConnectGatewayTask_host(t *testing.T) { - task := newConnectGatewayTask("service1", true) - require.Equal(t, "connect-ingress-service1", task.Name) - require.Equal(t, "connect-ingress:service1", string(task.Kind)) - require.Equal(t, ">= 1.8.0", task.Constraints[0].RTarget) - require.Equal(t, "host", task.Config["network_mode"]) - require.Nil(t, task.Lifecycle) + t.Run("ingress", func(t *testing.T) { + task := newConnectGatewayTask(structs.ConnectIngressPrefix, "foo", true) + require.Equal(t, "connect-ingress-foo", task.Name) + require.Equal(t, "connect-ingress:foo", string(task.Kind)) + require.Equal(t, ">= 1.8.0", task.Constraints[0].RTarget) + require.Equal(t, "host", task.Config["network_mode"]) + require.Nil(t, task.Lifecycle) + }) + + t.Run("terminating", func(t *testing.T) { + task := newConnectGatewayTask(structs.ConnectTerminatingPrefix, "bar", true) + require.Equal(t, "connect-terminating-bar", task.Name) + require.Equal(t, "connect-terminating:bar", string(task.Kind)) + require.Equal(t, ">= 1.8.0", task.Constraints[0].RTarget) + require.Equal(t, "host", task.Config["network_mode"]) + require.Nil(t, task.Lifecycle) + }) } func TestJobEndpointConnect_newConnectGatewayTask_bridge(t *testing.T) { - task := newConnectGatewayTask("service1", false) + task := newConnectGatewayTask(structs.ConnectIngressPrefix, "service1", false) require.NotContains(t, task.Config, "network_mode") } @@ -353,19 +364,27 @@ func TestJobEndpointConnect_hasGatewayTaskForService(t *testing.T) { require.False(t, result) }) - t.Run("has gateway task", func(t *testing.T) { + t.Run("has ingress task", func(t *testing.T) { result := hasGatewayTaskForService(&structs.TaskGroup{ Name: "group", Tasks: []*structs.Task{{ - Name: "task1", - Kind: "", - }, { Name: "ingress-gateway-my-service", Kind: structs.NewTaskKind(structs.ConnectIngressPrefix, "my-service"), }}, }, "my-service") require.True(t, result) }) + + t.Run("has terminating task", func(t *testing.T) { + result := hasGatewayTaskForService(&structs.TaskGroup{ + Name: "group", + Tasks: []*structs.Task{{ + Name: "terminating-gateway-my-service", + Kind: structs.NewTaskKind(structs.ConnectTerminatingPrefix, "my-service"), + }}, + }, "my-service") + require.True(t, result) + }) } func TestJobEndpointConnect_gatewayProxyIsDefault(t *testing.T) { @@ -411,17 +430,17 @@ func TestJobEndpointConnect_gatewayProxyIsDefault(t *testing.T) { func TestJobEndpointConnect_gatewayBindAddresses(t *testing.T) { t.Run("nil", func(t *testing.T) { - result := gatewayBindAddresses(nil) + result := gatewayBindAddressesIngress(nil) require.Nil(t, result) }) t.Run("no listeners", func(t *testing.T) { - result := gatewayBindAddresses(&structs.ConsulIngressConfigEntry{Listeners: nil}) + result := gatewayBindAddressesIngress(&structs.ConsulIngressConfigEntry{Listeners: nil}) require.Nil(t, result) }) t.Run("simple", func(t *testing.T) { - result := gatewayBindAddresses(&structs.ConsulIngressConfigEntry{ + result := gatewayBindAddressesIngress(&structs.ConsulIngressConfigEntry{ Listeners: []*structs.ConsulIngressListener{{ Port: 3000, Protocol: "tcp", @@ -439,7 +458,7 @@ func TestJobEndpointConnect_gatewayBindAddresses(t *testing.T) { }) t.Run("complex", func(t *testing.T) { - result := gatewayBindAddresses(&structs.ConsulIngressConfigEntry{ + result := gatewayBindAddressesIngress(&structs.ConsulIngressConfigEntry{ Listeners: []*structs.ConsulIngressListener{{ Port: 3000, Protocol: "tcp", @@ -502,7 +521,7 @@ func TestJobEndpointConnect_gatewayProxyForBridge(t *testing.T) { }, result) }) - t.Run("fill in defaults", func(t *testing.T) { + t.Run("ingress set defaults", func(t *testing.T) { result := gatewayProxyForBridge(&structs.ConsulGateway{ Proxy: &structs.ConsulGatewayProxy{ ConnectTimeout: helper.TimeToPtr(2 * time.Second), @@ -531,7 +550,7 @@ func TestJobEndpointConnect_gatewayProxyForBridge(t *testing.T) { }, result) }) - t.Run("leave as-is", func(t *testing.T) { + t.Run("ingress leave as-is", func(t *testing.T) { result := gatewayProxyForBridge(&structs.ConsulGateway{ Proxy: &structs.ConsulGatewayProxy{ Config: map[string]interface{}{"foo": 1}, @@ -554,4 +573,38 @@ func TestJobEndpointConnect_gatewayProxyForBridge(t *testing.T) { EnvoyGatewayBindAddresses: nil, }, result) }) + + t.Run("terminating set defaults", func(t *testing.T) { + result := gatewayProxyForBridge(&structs.ConsulGateway{ + Proxy: &structs.ConsulGatewayProxy{ + ConnectTimeout: helper.TimeToPtr(2 * time.Second), + EnvoyDNSDiscoveryType: "STRICT_DNS", + }, + Terminating: &structs.ConsulTerminatingConfigEntry{ + Services: []*structs.ConsulLinkedService{{ + Name: "service1", + CAFile: "/cafile.pem", + CertFile: "/certfile.pem", + KeyFile: "/keyfile.pem", + SNI: "", + }}, + }, + }) + require.Equal(t, &structs.ConsulGatewayProxy{ + ConnectTimeout: helper.TimeToPtr(2 * time.Second), + EnvoyGatewayNoDefaultBind: true, + EnvoyGatewayBindTaggedAddresses: false, + EnvoyDNSDiscoveryType: "STRICT_DNS", + EnvoyGatewayBindAddresses: map[string]*structs.ConsulGatewayBindAddress{ + "default": { + Address: "0.0.0.0", + Port: -1, + }, + }, + }, result) + }) + + t.Run("terminating leave as-is", func(t *testing.T) { + // + }) } diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index b1bc89c356c..fbc82d8b7a0 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -3178,7 +3178,7 @@ func TestClientEndpoint_taskUsesConnect(t *testing.T) { t.Run("task uses connect", func(t *testing.T) { try(t, &structs.Task{ - // see nomad.newConnectTask for how this works + // see nomad.newConnectSidecarTask for how this works Name: "connect-proxy-myservice", Kind: "connect-proxy:myservice", }, true) diff --git a/nomad/structs/connect.go b/nomad/structs/connect.go new file mode 100644 index 00000000000..79703876bfe --- /dev/null +++ b/nomad/structs/connect.go @@ -0,0 +1,33 @@ +package structs + +// ConsulConfigEntries represents Consul ConfigEntry definitions from a job. +type ConsulConfigEntries struct { + Ingress map[string]*ConsulIngressConfigEntry + Terminating map[string]*ConsulTerminatingConfigEntry + // Mesh later +} + +// ConfigEntries accumulates the Consul Configuration Entries defined in task groups +// of j. +func (j *Job) ConfigEntries() *ConsulConfigEntries { + entries := &ConsulConfigEntries{ + Ingress: make(map[string]*ConsulIngressConfigEntry), + Terminating: make(map[string]*ConsulTerminatingConfigEntry), + // Mesh later + } + + for _, tg := range j.TaskGroups { + for _, service := range tg.Services { + if service.Connect.IsGateway() { + gateway := service.Connect.Gateway + if ig := gateway.Ingress; ig != nil { + entries.Ingress[service.Name] = ig + } else if tg := gateway.Terminating; tg != nil { + entries.Terminating[service.Name] = tg + } // mesh later + } + } + } + + return entries +} diff --git a/nomad/structs/diff.go b/nomad/structs/diff.go index 2a30d2f6360..6a08e7c4e7b 100644 --- a/nomad/structs/diff.go +++ b/nomad/structs/diff.go @@ -827,12 +827,18 @@ func connectGatewayDiff(prev, next *ConsulGateway, contextual bool) *ObjectDiff diff.Objects = append(diff.Objects, gatewayProxyDiff) } - // Diff the ConsulGatewayIngress fields. + // Diff the ingress gateway fields. gatewayIngressDiff := connectGatewayIngressDiff(prev.Ingress, next.Ingress, contextual) if gatewayIngressDiff != nil { diff.Objects = append(diff.Objects, gatewayIngressDiff) } + // Diff the terminating gateway fields. + gatewayTerminatingDiff := connectGatewayTerminatingDiff(prev.Terminating, next.Terminating, contextual) + if gatewayTerminatingDiff != nil { + diff.Objects = append(diff.Objects, gatewayTerminatingDiff) + } + return diff } @@ -874,6 +880,99 @@ func connectGatewayIngressDiff(prev, next *ConsulIngressConfigEntry, contextual return diff } +func connectGatewayTerminatingDiff(prev, next *ConsulTerminatingConfigEntry, contextual bool) *ObjectDiff { + diff := &ObjectDiff{Type: DiffTypeNone, Name: "Terminating"} + var oldPrimitiveFlat, newPrimitiveFlat map[string]string + + if reflect.DeepEqual(prev, next) { + return nil + } else if prev == nil { + prev = new(ConsulTerminatingConfigEntry) + diff.Type = DiffTypeAdded + newPrimitiveFlat = flatmap.Flatten(next, nil, true) + } else if next == nil { + next = new(ConsulTerminatingConfigEntry) + diff.Type = DiffTypeDeleted + oldPrimitiveFlat = flatmap.Flatten(prev, nil, true) + } else { + diff.Type = DiffTypeEdited + oldPrimitiveFlat = flatmap.Flatten(prev, nil, true) + newPrimitiveFlat = flatmap.Flatten(next, nil, true) + } + + // Diff the primitive fields. + diff.Fields = fieldDiffs(oldPrimitiveFlat, newPrimitiveFlat, contextual) + + // Diff the Services lists. + gatewayLinkedServicesDiff := connectGatewayTerminatingLinkedServicesDiff(prev.Services, next.Services, contextual) + if gatewayLinkedServicesDiff != nil { + diff.Objects = append(diff.Objects, gatewayLinkedServicesDiff...) + } + + return diff +} + +// connectGatewayTerminatingLinkedServicesDiff diffs are a set of services keyed +// by service name. These objects contain only fields. +func connectGatewayTerminatingLinkedServicesDiff(prev, next []*ConsulLinkedService, contextual bool) []*ObjectDiff { + // create maps, diff the maps, key by linked service name + + prevMap := make(map[string]*ConsulLinkedService, len(prev)) + nextMap := make(map[string]*ConsulLinkedService, len(next)) + + for _, s := range prev { + prevMap[s.Name] = s + } + for _, s := range next { + nextMap[s.Name] = s + } + + var diffs []*ObjectDiff + for k, prevS := range prevMap { + // Diff the same, deleted, and edited + if diff := connectGatewayTerminatingLinkedServiceDiff(prevS, nextMap[k], contextual); diff != nil { + diffs = append(diffs, diff) + } + } + for k, nextS := range nextMap { + // Diff the added + if old, ok := prevMap[k]; !ok { + if diff := connectGatewayTerminatingLinkedServiceDiff(old, nextS, contextual); diff != nil { + diffs = append(diffs, diff) + } + } + } + + sort.Sort(ObjectDiffs(diffs)) + return diffs +} + +func connectGatewayTerminatingLinkedServiceDiff(prev, next *ConsulLinkedService, contextual bool) *ObjectDiff { + diff := &ObjectDiff{Type: DiffTypeNone, Name: "Service"} + var oldPrimitiveFlat, newPrimitiveFlat map[string]string + + if reflect.DeepEqual(prev, next) { + return nil + } else if prev == nil { + diff.Type = DiffTypeAdded + newPrimitiveFlat = flatmap.Flatten(next, nil, true) + } else if next == nil { + diff.Type = DiffTypeDeleted + oldPrimitiveFlat = flatmap.Flatten(prev, nil, true) + } else { + diff.Type = DiffTypeEdited + oldPrimitiveFlat = flatmap.Flatten(prev, nil, true) + newPrimitiveFlat = flatmap.Flatten(next, nil, true) + } + + // Diff the primitive fields. + diff.Fields = fieldDiffs(oldPrimitiveFlat, newPrimitiveFlat, contextual) + + // No objects today. + + return diff +} + func connectGatewayTLSConfigDiff(prev, next *ConsulGatewayTLSConfig, contextual bool) *ObjectDiff { diff := &ObjectDiff{Type: DiffTypeNone, Name: "TLS"} var oldPrimitiveFlat, newPrimitiveFlat map[string]string @@ -900,7 +999,7 @@ func connectGatewayTLSConfigDiff(prev, next *ConsulGatewayTLSConfig, contextual // connectGatewayIngressListenersDiff diffs are a set of listeners keyed by "protocol/port", which is // a nifty workaround having slices instead of maps. Presumably such a key will be unique, because if -// it is not the config entry is not going to work anyway. +// if is not the config entry is not going to work anyway. func connectGatewayIngressListenersDiff(prev, next []*ConsulIngressListener, contextual bool) []*ObjectDiff { // create maps, diff the maps, keys are fields, keys are (port+protocol) diff --git a/nomad/structs/diff_test.go b/nomad/structs/diff_test.go index be83b64917d..46c94a8828f 100644 --- a/nomad/structs/diff_test.go +++ b/nomad/structs/diff_test.go @@ -2630,6 +2630,7 @@ func TestTaskGroupDiff(t *testing.T) { Port: 2001, }, }, + EnvoyDNSDiscoveryType: "STRICT_DNS", EnvoyGatewayNoDefaultBind: false, Config: map[string]interface{}{ "foo": 1, @@ -2647,6 +2648,15 @@ func TestTaskGroupDiff(t *testing.T) { }}, }}, }, + Terminating: &ConsulTerminatingConfigEntry{ + Services: []*ConsulLinkedService{{ + Name: "linked1", + CAFile: "ca1.pem", + CertFile: "cert1.pem", + KeyFile: "key1.pem", + SNI: "linked1.consul", + }}, + }, }, }, }, @@ -2705,6 +2715,7 @@ func TestTaskGroupDiff(t *testing.T) { Port: 2002, }, }, + EnvoyDNSDiscoveryType: "LOGICAL_DNS", EnvoyGatewayNoDefaultBind: true, Config: map[string]interface{}{ "foo": 2, @@ -2723,6 +2734,15 @@ func TestTaskGroupDiff(t *testing.T) { }}, }}, }, + Terminating: &ConsulTerminatingConfigEntry{ + Services: []*ConsulLinkedService{{ + Name: "linked2", + CAFile: "ca2.pem", + CertFile: "cert2.pem", + KeyFile: "key2.pem", + SNI: "linked2.consul", + }}, + }, }, }, }, @@ -3031,6 +3051,12 @@ func TestTaskGroupDiff(t *testing.T) { Old: "1s", New: "2s", }, + { + Type: DiffTypeEdited, + Name: "EnvoyDNSDiscoveryType", + Old: "STRICT_DNS", + New: "LOGICAL_DNS", + }, { Type: DiffTypeEdited, Name: "EnvoyGatewayBindTaggedAddresses", @@ -3173,6 +3199,84 @@ func TestTaskGroupDiff(t *testing.T) { }, }, }, + { + Type: DiffTypeEdited, + Name: "Terminating", + Objects: []*ObjectDiff{ + { + Type: DiffTypeAdded, + Name: "Service", + Fields: []*FieldDiff{ + { + Type: DiffTypeAdded, + Name: "CAFile", + Old: "", + New: "ca2.pem", + }, + { + Type: DiffTypeAdded, + Name: "CertFile", + Old: "", + New: "cert2.pem", + }, + { + Type: DiffTypeAdded, + Name: "KeyFile", + Old: "", + New: "key2.pem", + }, + { + Type: DiffTypeAdded, + Name: "Name", + Old: "", + New: "linked2", + }, + { + Type: DiffTypeAdded, + Name: "SNI", + Old: "", + New: "linked2.consul", + }, + }, + }, + { + Type: DiffTypeDeleted, + Name: "Service", + Fields: []*FieldDiff{ + { + Type: DiffTypeDeleted, + Name: "CAFile", + Old: "ca1.pem", + New: "", + }, + { + Type: DiffTypeDeleted, + Name: "CertFile", + Old: "cert1.pem", + New: "", + }, + { + Type: DiffTypeDeleted, + Name: "KeyFile", + Old: "key1.pem", + New: "", + }, + { + Type: DiffTypeDeleted, + Name: "Name", + Old: "linked1", + New: "", + }, + { + Type: DiffTypeDeleted, + Name: "SNI", + Old: "linked1.consul", + New: "", + }, + }, + }, + }, + }, }, }, }, diff --git a/nomad/structs/services.go b/nomad/structs/services.go index 2ca1dd81f2d..073577b3c1d 100644 --- a/nomad/structs/services.go +++ b/nomad/structs/services.go @@ -733,11 +733,23 @@ func (c *ConsulConnect) IsNative() bool { return c != nil && c.Native } -// IsGateway checks if the service is a Connect gateway. +// IsGateway checks if the service is any type of connect gateway. func (c *ConsulConnect) IsGateway() bool { return c != nil && c.Gateway != nil } +// IsIngress checks if the service is an ingress gateway. +func (c *ConsulConnect) IsIngress() bool { + return c.IsGateway() && c.Gateway.Ingress != nil +} + +// IsTerminating checks if the service is a terminating gateway. +func (c *ConsulConnect) IsTerminating() bool { + return c.IsGateway() && c.Gateway.Terminating != nil +} + +// also mesh + // Validate that the Connect block represents exactly one of: // - Connect non-native service sidecar proxy // - Connect native service @@ -1231,21 +1243,32 @@ type ConsulGateway struct { // Ingress represents the Consul Configuration Entry for an Ingress Gateway. Ingress *ConsulIngressConfigEntry - // Terminating is not yet supported. - // Terminating *ConsulTerminatingConfigEntry + // Terminating represents the Consul Configuration Entry for a Terminating Gateway. + Terminating *ConsulTerminatingConfigEntry // Mesh is not yet supported. // Mesh *ConsulMeshConfigEntry } +func (g *ConsulGateway) Prefix() string { + switch { + case g.Ingress != nil: + return ConnectIngressPrefix + default: + return ConnectTerminatingPrefix + } + // also mesh +} + func (g *ConsulGateway) Copy() *ConsulGateway { if g == nil { return nil } return &ConsulGateway{ - Proxy: g.Proxy.Copy(), - Ingress: g.Ingress.Copy(), + Proxy: g.Proxy.Copy(), + Ingress: g.Ingress.Copy(), + Terminating: g.Terminating.Copy(), } } @@ -1262,6 +1285,10 @@ func (g *ConsulGateway) Equals(o *ConsulGateway) bool { return false } + if !g.Terminating.Equals(o.Terminating) { + return false + } + return true } @@ -1270,18 +1297,30 @@ func (g *ConsulGateway) Validate() error { return nil } - if g.Proxy != nil { - if err := g.Proxy.Validate(); err != nil { - return err - } + if err := g.Proxy.Validate(); err != nil { + return err } - // eventually one of: ingress, terminating, mesh - if g.Ingress != nil { - return g.Ingress.Validate() + if err := g.Ingress.Validate(); err != nil { + return err + } + + if err := g.Terminating.Validate(); err != nil { + return err } - return fmt.Errorf("Consul Gateway ingress Configuration Entry must be set") + // Exactly 1 of ingress/terminating/mesh(soon) must be set. + count := 0 + if g.Ingress != nil { + count++ + } + if g.Terminating != nil { + count++ + } + if count != 1 { + return fmt.Errorf("One Consul Gateway Configuration Entry must be set") + } + return nil } // ConsulGatewayBindAddress is equivalent to Consul's api/catalog.go ServiceAddress @@ -1328,7 +1367,7 @@ func (a *ConsulGatewayBindAddress) Validate() error { return fmt.Errorf("Consul Gateway Bind Address must be set") } - if a.Port <= 0 { + if a.Port <= 0 && a.Port != -1 { // port -1 => nomad autofill return fmt.Errorf("Consul Gateway Bind Address must set valid Port") } @@ -1344,6 +1383,7 @@ type ConsulGatewayProxy struct { EnvoyGatewayBindTaggedAddresses bool EnvoyGatewayBindAddresses map[string]*ConsulGatewayBindAddress EnvoyGatewayNoDefaultBind bool + EnvoyDNSDiscoveryType string Config map[string]interface{} } @@ -1352,20 +1392,29 @@ func (p *ConsulGatewayProxy) Copy() *ConsulGatewayProxy { return nil } - bindAddresses := make(map[string]*ConsulGatewayBindAddress, len(p.EnvoyGatewayBindAddresses)) - for k, v := range p.EnvoyGatewayBindAddresses { - bindAddresses[k] = v.Copy() - } - return &ConsulGatewayProxy{ ConnectTimeout: helper.TimeToPtr(*p.ConnectTimeout), EnvoyGatewayBindTaggedAddresses: p.EnvoyGatewayBindTaggedAddresses, - EnvoyGatewayBindAddresses: bindAddresses, + EnvoyGatewayBindAddresses: p.copyBindAddresses(), EnvoyGatewayNoDefaultBind: p.EnvoyGatewayNoDefaultBind, + EnvoyDNSDiscoveryType: p.EnvoyDNSDiscoveryType, Config: helper.CopyMapStringInterface(p.Config), } } +func (p *ConsulGatewayProxy) copyBindAddresses() map[string]*ConsulGatewayBindAddress { + if len(p.EnvoyGatewayBindAddresses) == 0 { + return nil + } + + bindAddresses := make(map[string]*ConsulGatewayBindAddress, len(p.EnvoyGatewayBindAddresses)) + for k, v := range p.EnvoyGatewayBindAddresses { + bindAddresses[k] = v.Copy() + } + + return bindAddresses +} + func (p *ConsulGatewayProxy) equalBindAddresses(o map[string]*ConsulGatewayBindAddress) bool { if len(p.EnvoyGatewayBindAddresses) != len(o) { return false @@ -1401,6 +1450,10 @@ func (p *ConsulGatewayProxy) Equals(o *ConsulGatewayProxy) bool { return false } + if p.EnvoyDNSDiscoveryType != o.EnvoyDNSDiscoveryType { + return false + } + if !opaqueMapsEqual(p.Config, o.Config) { return false } @@ -1408,6 +1461,11 @@ func (p *ConsulGatewayProxy) Equals(o *ConsulGatewayProxy) bool { return true } +const ( + strictDNS = "STRICT_DNS" + logicalDNS = "LOGICAL_DNS" +) + func (p *ConsulGatewayProxy) Validate() error { if p == nil { return nil @@ -1417,6 +1475,14 @@ func (p *ConsulGatewayProxy) Validate() error { return fmt.Errorf("Consul Gateway Proxy connection_timeout must be set") } + switch p.EnvoyDNSDiscoveryType { + case "", strictDNS, logicalDNS: + // Consul defaults to logical DNS, suitable for large scale workloads. + // https://www.envoyproxy.io/docs/envoy/v1.16.1/intro/arch_overview/upstream/service_discovery + default: + return fmt.Errorf("Consul Gateway Proxy Envoy DNS Discovery type must be %s or %s", strictDNS, logicalDNS) + } + for _, bindAddr := range p.EnvoyGatewayBindAddresses { if err := bindAddr.Validate(); err != nil { return err @@ -1671,3 +1737,143 @@ COMPARE: // order does not matter } return true } + +type ConsulLinkedService struct { + Name string + CAFile string + CertFile string + KeyFile string + SNI string +} + +func (s *ConsulLinkedService) Copy() *ConsulLinkedService { + if s == nil { + return nil + } + + return &ConsulLinkedService{ + Name: s.Name, + CAFile: s.CAFile, + CertFile: s.CertFile, + KeyFile: s.KeyFile, + SNI: s.SNI, + } +} + +func (s *ConsulLinkedService) Equals(o *ConsulLinkedService) bool { + if s == nil || o == nil { + return s == o + } + + switch { + case s.Name != o.Name: + return false + case s.CAFile != o.CAFile: + return false + case s.CertFile != o.CertFile: + return false + case s.KeyFile != o.KeyFile: + return false + case s.SNI != o.SNI: + return false + } + + return true +} + +func (s *ConsulLinkedService) Validate() error { + if s == nil { + return nil + } + + if s.Name == "" { + return fmt.Errorf("Consul Linked Service requires Name") + } + + caSet := s.CAFile != "" + certSet := s.CertFile != "" + keySet := s.KeyFile != "" + sniSet := s.SNI != "" + + if (certSet || keySet) && !caSet { + return fmt.Errorf("Consul Linked Service TLS requires CAFile") + } + + if certSet != keySet { + return fmt.Errorf("Consul Linked Service TLS Cert and Key must both be set") + } + + if sniSet && !caSet { + return fmt.Errorf("Consul Linked Service TLS SNI requires CAFile") + } + + return nil +} + +func linkedServicesEqual(servicesA, servicesB []*ConsulLinkedService) bool { + if len(servicesA) != len(servicesB) { + return false + } + +COMPARE: // order does not matter + for _, serviceA := range servicesA { + for _, serviceB := range servicesB { + if serviceA.Equals(serviceB) { + continue COMPARE + } + } + return false + } + return true +} + +type ConsulTerminatingConfigEntry struct { + // Namespace is not yet supported. + // Namespace string + + Services []*ConsulLinkedService +} + +func (e *ConsulTerminatingConfigEntry) Copy() *ConsulTerminatingConfigEntry { + if e == nil { + return nil + } + + var services []*ConsulLinkedService = nil + if n := len(e.Services); n > 0 { + services = make([]*ConsulLinkedService, n) + for i := 0; i < n; i++ { + services[i] = e.Services[i].Copy() + } + } + + return &ConsulTerminatingConfigEntry{ + Services: services, + } +} + +func (e *ConsulTerminatingConfigEntry) Equals(o *ConsulTerminatingConfigEntry) bool { + if e == nil || o == nil { + return e == o + } + + return linkedServicesEqual(e.Services, o.Services) +} + +func (e *ConsulTerminatingConfigEntry) Validate() error { + if e == nil { + return nil + } + + if len(e.Services) == 0 { + return fmt.Errorf("Consul Terminating Gateway requires at least one service") + } + + for _, service := range e.Services { + if err := service.Validate(); err != nil { + return err + } + } + + return nil +} diff --git a/nomad/structs/services_test.go b/nomad/structs/services_test.go index 12b3b5b55c4..0024c626ed3 100644 --- a/nomad/structs/services_test.go +++ b/nomad/structs/services_test.go @@ -560,8 +560,8 @@ var ( ConnectTimeout: helper.TimeToPtr(1 * time.Second), EnvoyGatewayBindTaggedAddresses: true, EnvoyGatewayBindAddresses: map[string]*ConsulGatewayBindAddress{ - "listener1": &ConsulGatewayBindAddress{Address: "10.0.0.1", Port: 2001}, - "listener2": &ConsulGatewayBindAddress{Address: "10.0.0.1", Port: 2002}, + "listener1": {Address: "10.0.0.1", Port: 2001}, + "listener2": {Address: "10.0.0.1", Port: 2002}, }, EnvoyGatewayNoDefaultBind: true, Config: map[string]interface{}{ @@ -591,8 +591,41 @@ var ( }}, }, } + + consulTerminatingGateway1 = &ConsulGateway{ + Proxy: &ConsulGatewayProxy{ + ConnectTimeout: helper.TimeToPtr(1 * time.Second), + EnvoyDNSDiscoveryType: "STRICT_DNS", + EnvoyGatewayBindAddresses: nil, + }, + Terminating: &ConsulTerminatingConfigEntry{ + Services: []*ConsulLinkedService{{ + Name: "linked-service1", + CAFile: "ca.pem", + CertFile: "cert.pem", + KeyFile: "key.pem", + SNI: "service1.consul", + }, { + Name: "linked-service2", + }}, + }, + } ) +func TestConsulGateway_Prefix(t *testing.T) { + t.Run("ingress", func(t *testing.T) { + result := (&ConsulGateway{Ingress: new(ConsulIngressConfigEntry)}).Prefix() + require.Equal(t, ConnectIngressPrefix, result) + }) + + t.Run("terminating", func(t *testing.T) { + result := (&ConsulGateway{Terminating: new(ConsulTerminatingConfigEntry)}).Prefix() + require.Equal(t, ConnectTerminatingPrefix, result) + }) + + // also mesh +} + func TestConsulGateway_Copy(t *testing.T) { t.Parallel() @@ -608,6 +641,13 @@ func TestConsulGateway_Copy(t *testing.T) { require.True(t, result.Equals(consulIngressGateway1)) require.True(t, consulIngressGateway1.Equals(result)) }) + + t.Run("as terminating", func(t *testing.T) { + result := consulTerminatingGateway1.Copy() + require.Equal(t, consulTerminatingGateway1, result) + require.True(t, result.Equals(consulTerminatingGateway1)) + require.True(t, consulTerminatingGateway1.Equals(result)) + }) } func TestConsulGateway_Equals_ingress(t *testing.T) { @@ -623,8 +663,8 @@ func TestConsulGateway_Equals_ingress(t *testing.T) { original := consulIngressGateway1.Copy() - type gway = ConsulGateway - type tweaker = func(g *gway) + type cg = ConsulGateway + type tweaker = func(g *cg) t.Run("reflexive", func(t *testing.T) { require.True(t, original.Equals(original)) @@ -641,27 +681,27 @@ func TestConsulGateway_Equals_ingress(t *testing.T) { // proxy stanza equality checks t.Run("mod gateway timeout", func(t *testing.T) { - try(t, func(g *gway) { g.Proxy.ConnectTimeout = helper.TimeToPtr(9 * time.Second) }) + try(t, func(g *cg) { g.Proxy.ConnectTimeout = helper.TimeToPtr(9 * time.Second) }) }) t.Run("mod gateway envoy_gateway_bind_tagged_addresses", func(t *testing.T) { - try(t, func(g *gway) { g.Proxy.EnvoyGatewayBindTaggedAddresses = false }) + try(t, func(g *cg) { g.Proxy.EnvoyGatewayBindTaggedAddresses = false }) }) t.Run("mod gateway envoy_gateway_bind_addresses", func(t *testing.T) { - try(t, func(g *gway) { + try(t, func(g *cg) { g.Proxy.EnvoyGatewayBindAddresses = map[string]*ConsulGatewayBindAddress{ - "listener3": &ConsulGatewayBindAddress{Address: "9.9.9.9", Port: 9999}, + "listener3": {Address: "9.9.9.9", Port: 9999}, } }) }) t.Run("mod gateway envoy_gateway_no_default_bind", func(t *testing.T) { - try(t, func(g *gway) { g.Proxy.EnvoyGatewayNoDefaultBind = false }) + try(t, func(g *cg) { g.Proxy.EnvoyGatewayNoDefaultBind = false }) }) t.Run("mod gateway config", func(t *testing.T) { - try(t, func(g *gway) { + try(t, func(g *cg) { g.Proxy.Config = map[string]interface{}{ "foo": 2, } @@ -671,40 +711,95 @@ func TestConsulGateway_Equals_ingress(t *testing.T) { // ingress config entry equality checks t.Run("mod ingress tls", func(t *testing.T) { - try(t, func(g *gway) { g.Ingress.TLS = nil }) - try(t, func(g *gway) { g.Ingress.TLS.Enabled = false }) + try(t, func(g *cg) { g.Ingress.TLS = nil }) + try(t, func(g *cg) { g.Ingress.TLS.Enabled = false }) }) t.Run("mod ingress listeners count", func(t *testing.T) { - try(t, func(g *gway) { g.Ingress.Listeners = g.Ingress.Listeners[:1] }) + try(t, func(g *cg) { g.Ingress.Listeners = g.Ingress.Listeners[:1] }) }) t.Run("mod ingress listeners port", func(t *testing.T) { - try(t, func(g *gway) { g.Ingress.Listeners[0].Port = 7777 }) + try(t, func(g *cg) { g.Ingress.Listeners[0].Port = 7777 }) }) t.Run("mod ingress listeners protocol", func(t *testing.T) { - try(t, func(g *gway) { g.Ingress.Listeners[0].Protocol = "tcp" }) + try(t, func(g *cg) { g.Ingress.Listeners[0].Protocol = "tcp" }) }) t.Run("mod ingress listeners services count", func(t *testing.T) { - try(t, func(g *gway) { g.Ingress.Listeners[0].Services = g.Ingress.Listeners[0].Services[:1] }) + try(t, func(g *cg) { g.Ingress.Listeners[0].Services = g.Ingress.Listeners[0].Services[:1] }) }) t.Run("mod ingress listeners services name", func(t *testing.T) { - try(t, func(g *gway) { g.Ingress.Listeners[0].Services[0].Name = "serviceX" }) + try(t, func(g *cg) { g.Ingress.Listeners[0].Services[0].Name = "serviceX" }) }) t.Run("mod ingress listeners services hosts count", func(t *testing.T) { - try(t, func(g *gway) { g.Ingress.Listeners[0].Services[0].Hosts = g.Ingress.Listeners[0].Services[0].Hosts[:1] }) + try(t, func(g *cg) { g.Ingress.Listeners[0].Services[0].Hosts = g.Ingress.Listeners[0].Services[0].Hosts[:1] }) }) t.Run("mod ingress listeners services hosts content", func(t *testing.T) { - try(t, func(g *gway) { g.Ingress.Listeners[0].Services[0].Hosts[0] = "255.255.255.255" }) + try(t, func(g *cg) { g.Ingress.Listeners[0].Services[0].Hosts[0] = "255.255.255.255" }) + }) +} + +func TestConsulGateway_Equals_terminating(t *testing.T) { + t.Parallel() + + original := consulTerminatingGateway1.Copy() + + type cg = ConsulGateway + type tweaker = func(c *cg) + + t.Run("reflexive", func(t *testing.T) { + require.True(t, original.Equals(original)) + }) + + try := func(t *testing.T, tweak tweaker) { + modifiable := original.Copy() + tweak(modifiable) + require.False(t, original.Equals(modifiable)) + require.False(t, modifiable.Equals(original)) + require.True(t, modifiable.Equals(modifiable)) + } + + // proxy stanza equality checks + + t.Run("mod dns discovery type", func(t *testing.T) { + try(t, func(g *cg) { g.Proxy.EnvoyDNSDiscoveryType = "LOGICAL_DNS" }) + }) + + // terminating config entry equality checks + + t.Run("mod terminating services count", func(t *testing.T) { + try(t, func(g *cg) { g.Terminating.Services = g.Terminating.Services[:1] }) + }) + + t.Run("mod terminating services name", func(t *testing.T) { + try(t, func(g *cg) { g.Terminating.Services[0].Name = "foo" }) + }) + + t.Run("mod terminating services ca_file", func(t *testing.T) { + try(t, func(g *cg) { g.Terminating.Services[0].CAFile = "foo.pem" }) + }) + + t.Run("mod terminating services cert_file", func(t *testing.T) { + try(t, func(g *cg) { g.Terminating.Services[0].CertFile = "foo.pem" }) + }) + + t.Run("mod terminating services key_file", func(t *testing.T) { + try(t, func(g *cg) { g.Terminating.Services[0].KeyFile = "foo.pem" }) + }) + + t.Run("mod terminating services sni", func(t *testing.T) { + try(t, func(g *cg) { g.Terminating.Services[0].SNI = "foo.consul" }) }) } func TestConsulGateway_ingressServicesEqual(t *testing.T) { + t.Parallel() + igs1 := []*ConsulIngressService{{ Name: "service1", Hosts: []string{"host1", "host2"}, @@ -714,6 +809,7 @@ func TestConsulGateway_ingressServicesEqual(t *testing.T) { }} require.False(t, ingressServicesEqual(igs1, nil)) + require.True(t, ingressServicesEqual(igs1, igs1)) reversed := []*ConsulIngressService{ igs1[1], igs1[0], // services reversed @@ -733,6 +829,8 @@ func TestConsulGateway_ingressServicesEqual(t *testing.T) { } func TestConsulGateway_ingressListenersEqual(t *testing.T) { + t.Parallel() + ils1 := []*ConsulIngressListener{{ Port: 2000, Protocol: "http", @@ -758,6 +856,8 @@ func TestConsulGateway_ingressListenersEqual(t *testing.T) { } func TestConsulGateway_Validate(t *testing.T) { + t.Parallel() + t.Run("bad proxy", func(t *testing.T) { err := (&ConsulGateway{ Proxy: &ConsulGatewayProxy{ @@ -776,9 +876,48 @@ func TestConsulGateway_Validate(t *testing.T) { }).Validate() require.EqualError(t, err, "Consul Ingress Gateway requires at least one listener") }) + + t.Run("bad terminating config entry", func(t *testing.T) { + err := (&ConsulGateway{ + Terminating: &ConsulTerminatingConfigEntry{ + Services: nil, + }, + }).Validate() + require.EqualError(t, err, "Consul Terminating Gateway requires at least one service") + }) + + t.Run("no config entry set", func(t *testing.T) { + err := (&ConsulGateway{ + Ingress: nil, + Terminating: nil, + }).Validate() + require.EqualError(t, err, "One Consul Gateway Configuration Entry must be set") + }) + + t.Run("multiple config entries set", func(t *testing.T) { + err := (&ConsulGateway{ + Ingress: &ConsulIngressConfigEntry{ + Listeners: []*ConsulIngressListener{{ + Port: 1111, + Protocol: "tcp", + Services: []*ConsulIngressService{{ + Name: "service1", + }}, + }}, + }, + Terminating: &ConsulTerminatingConfigEntry{ + Services: []*ConsulLinkedService{{ + Name: "linked-service1", + }}, + }, + }).Validate() + require.EqualError(t, err, "One Consul Gateway Configuration Entry must be set") + }) } func TestConsulGatewayBindAddress_Validate(t *testing.T) { + t.Parallel() + t.Run("no address", func(t *testing.T) { err := (&ConsulGatewayBindAddress{ Address: "", @@ -805,6 +944,8 @@ func TestConsulGatewayBindAddress_Validate(t *testing.T) { } func TestConsulGatewayProxy_Validate(t *testing.T) { + t.Parallel() + t.Run("no timeout", func(t *testing.T) { err := (&ConsulGatewayProxy{ ConnectTimeout: nil, @@ -824,6 +965,14 @@ func TestConsulGatewayProxy_Validate(t *testing.T) { require.EqualError(t, err, "Consul Gateway Bind Address must set valid Port") }) + t.Run("invalid dns discovery type", func(t *testing.T) { + err := (&ConsulGatewayProxy{ + ConnectTimeout: helper.TimeToPtr(1 * time.Second), + EnvoyDNSDiscoveryType: "RANDOM_DNS", + }).Validate() + require.EqualError(t, err, "Consul Gateway Proxy Envoy DNS Discovery type must be STRICT_DNS or LOGICAL_DNS") + }) + t.Run("ok with nothing set", func(t *testing.T) { err := (&ConsulGatewayProxy{ ConnectTimeout: helper.TimeToPtr(1 * time.Second), @@ -847,6 +996,8 @@ func TestConsulGatewayProxy_Validate(t *testing.T) { } func TestConsulIngressService_Validate(t *testing.T) { + t.Parallel() + t.Run("invalid name", func(t *testing.T) { err := (&ConsulIngressService{ Name: "", @@ -886,6 +1037,8 @@ func TestConsulIngressService_Validate(t *testing.T) { } func TestConsulIngressListener_Validate(t *testing.T) { + t.Parallel() + t.Run("invalid port", func(t *testing.T) { err := (&ConsulIngressListener{ Port: 0, @@ -941,6 +1094,8 @@ func TestConsulIngressListener_Validate(t *testing.T) { } func TestConsulIngressConfigEntry_Validate(t *testing.T) { + t.Parallel() + t.Run("no listeners", func(t *testing.T) { err := (&ConsulIngressConfigEntry{}).Validate() require.EqualError(t, err, "Consul Ingress Gateway requires at least one listener") @@ -972,3 +1127,172 @@ func TestConsulIngressConfigEntry_Validate(t *testing.T) { require.NoError(t, err) }) } + +func TestConsulLinkedService_Validate(t *testing.T) { + t.Parallel() + + t.Run("nil", func(t *testing.T) { + err := (*ConsulLinkedService)(nil).Validate() + require.Nil(t, err) + }) + + t.Run("missing name", func(t *testing.T) { + err := (&ConsulLinkedService{}).Validate() + require.EqualError(t, err, "Consul Linked Service requires Name") + }) + + t.Run("missing cafile", func(t *testing.T) { + err := (&ConsulLinkedService{ + Name: "linked-service1", + CertFile: "cert_file.pem", + KeyFile: "key_file.pem", + }).Validate() + require.EqualError(t, err, "Consul Linked Service TLS requires CAFile") + }) + + t.Run("mutual cert key", func(t *testing.T) { + err := (&ConsulLinkedService{ + Name: "linked-service1", + CAFile: "ca_file.pem", + CertFile: "cert_file.pem", + }).Validate() + require.EqualError(t, err, "Consul Linked Service TLS Cert and Key must both be set") + }) + + t.Run("sni without cafile", func(t *testing.T) { + err := (&ConsulLinkedService{ + Name: "linked-service1", + SNI: "service.consul", + }).Validate() + require.EqualError(t, err, "Consul Linked Service TLS SNI requires CAFile") + }) + + t.Run("minimal", func(t *testing.T) { + err := (&ConsulLinkedService{ + Name: "linked-service1", + }).Validate() + require.NoError(t, err) + }) + + t.Run("tls minimal", func(t *testing.T) { + err := (&ConsulLinkedService{ + Name: "linked-service1", + CAFile: "ca_file.pem", + }).Validate() + require.NoError(t, err) + }) + + t.Run("tls mutual", func(t *testing.T) { + err := (&ConsulLinkedService{ + Name: "linked-service1", + CAFile: "ca_file.pem", + CertFile: "cert_file.pem", + KeyFile: "key_file.pem", + }).Validate() + require.NoError(t, err) + }) + + t.Run("tls sni", func(t *testing.T) { + err := (&ConsulLinkedService{ + Name: "linked-service1", + CAFile: "ca_file.pem", + SNI: "linked-service.consul", + }).Validate() + require.NoError(t, err) + }) + + t.Run("tls complete", func(t *testing.T) { + err := (&ConsulLinkedService{ + Name: "linked-service1", + CAFile: "ca_file.pem", + CertFile: "cert_file.pem", + KeyFile: "key_file.pem", + SNI: "linked-service.consul", + }).Validate() + require.NoError(t, err) + }) +} + +func TestConsulLinkedService_Copy(t *testing.T) { + t.Parallel() + + require.Nil(t, (*ConsulLinkedService)(nil).Copy()) + require.Equal(t, &ConsulLinkedService{ + Name: "service1", + CAFile: "ca.pem", + CertFile: "cert.pem", + KeyFile: "key.pem", + SNI: "service1.consul", + }, (&ConsulLinkedService{ + Name: "service1", + CAFile: "ca.pem", + CertFile: "cert.pem", + KeyFile: "key.pem", + SNI: "service1.consul", + }).Copy()) +} + +func TestConsulLinkedService_linkedServicesEqual(t *testing.T) { + t.Parallel() + + services := []*ConsulLinkedService{{ + Name: "service1", + CAFile: "ca.pem", + }, { + Name: "service2", + CAFile: "ca.pem", + }} + + require.False(t, linkedServicesEqual(services, nil)) + require.True(t, linkedServicesEqual(services, services)) + + reversed := []*ConsulLinkedService{ + services[1], services[0], // reversed + } + + require.True(t, linkedServicesEqual(services, reversed)) + + different := []*ConsulLinkedService{ + services[0], &ConsulLinkedService{ + Name: "service2", + CAFile: "ca.pem", + SNI: "service2.consul", + }, + } + + require.False(t, linkedServicesEqual(services, different)) +} + +func TestConsulTerminatingConfigEntry_Validate(t *testing.T) { + t.Parallel() + + t.Run("nil", func(t *testing.T) { + err := (*ConsulTerminatingConfigEntry)(nil).Validate() + require.NoError(t, err) + }) + + t.Run("no services", func(t *testing.T) { + err := (&ConsulTerminatingConfigEntry{ + Services: make([]*ConsulLinkedService, 0), + }).Validate() + require.EqualError(t, err, "Consul Terminating Gateway requires at least one service") + }) + + t.Run("service invalid", func(t *testing.T) { + err := (&ConsulTerminatingConfigEntry{ + Services: []*ConsulLinkedService{{ + Name: "", + }}, + }).Validate() + require.EqualError(t, err, "Consul Linked Service requires Name") + }) + + t.Run("ok", func(t *testing.T) { + err := (&ConsulTerminatingConfigEntry{ + Services: []*ConsulLinkedService{{ + Name: "service1", + }}, + }).Validate() + require.NoError(t, err) + }) +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index ec905c06e23..5924abe1a9b 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -4385,25 +4385,6 @@ func (j *Job) ConnectTasks() []TaskKind { return kinds } -// ConfigEntries accumulates the Consul Configuration Entries defined in task groups -// of j. -// -// Currently Nomad only supports entries for connect ingress gateways. -func (j *Job) ConfigEntries() map[string]*ConsulIngressConfigEntry { - igEntries := make(map[string]*ConsulIngressConfigEntry) - for _, tg := range j.TaskGroups { - for _, service := range tg.Services { - if service.Connect.IsGateway() { - if ig := service.Connect.Gateway.Ingress; ig != nil { - igEntries[service.Name] = ig - } - // imagine also accumulating other entry types in the future - } - } - } - return igEntries -} - // RequiredSignals returns a mapping of task groups to tasks to their required // set of signals func (j *Job) RequiredSignals() map[string]map[string][]string { @@ -7124,10 +7105,16 @@ func (k TaskKind) IsConnectIngress() bool { return k.hasPrefix(ConnectIngressPrefix) } +func (k TaskKind) IsConnectTerminating() bool { + return k.hasPrefix(ConnectTerminatingPrefix) +} + func (k TaskKind) IsAnyConnectGateway() bool { switch { case k.IsConnectIngress(): return true + case k.IsConnectTerminating(): + return true default: return false } @@ -7149,8 +7136,7 @@ const ( // ConnectTerminatingPrefix is the prefix used for fields referencing a Consul // Connect Terminating Gateway Proxy. // - // Not yet supported. - // ConnectTerminatingPrefix = "connect-terminating" + ConnectTerminatingPrefix = "connect-terminating" // ConnectMeshPrefix is the prefix used for fields referencing a Consul Connect // Mesh Gateway Proxy. diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index a313a73b3ff..d379cacba23 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -669,6 +669,12 @@ func TestJob_ConnectTasks(t *testing.T) { Name: "generator", Kind: "connect-native:uuid-api", }}, + }, { + Name: "tg5", + Tasks: []*Task{{ + Name: "t1000", + Kind: "connect-terminating:t1000", + }}, }}, } @@ -681,6 +687,7 @@ func TestJob_ConnectTasks(t *testing.T) { NewTaskKind(ConnectIngressPrefix, "ingress"), NewTaskKind(ConnectNativePrefix, "uuid-fe"), NewTaskKind(ConnectNativePrefix, "uuid-api"), + NewTaskKind(ConnectTerminatingPrefix, "t1000"), } r.Equal(exp, connectTasks) @@ -878,6 +885,15 @@ func TestTask_UsesConnect(t *testing.T) { usesConnect := task.UsesConnect() require.True(t, usesConnect) }) + + t.Run("terminating gateway", func(t *testing.T) { + task := &Task{ + Name: "task1", + Kind: NewTaskKind(ConnectTerminatingPrefix, "task1"), + } + usesConnect := task.UsesConnect() + require.True(t, usesConnect) + }) } func TestTaskGroup_UsesConnect(t *testing.T) { diff --git a/vendor/github.com/hashicorp/nomad/api/services.go b/vendor/github.com/hashicorp/nomad/api/services.go index 59e92f6794e..f85e113cdfd 100644 --- a/vendor/github.com/hashicorp/nomad/api/services.go +++ b/vendor/github.com/hashicorp/nomad/api/services.go @@ -302,8 +302,8 @@ type ConsulGateway struct { // Ingress represents the Consul Configuration Entry for an Ingress Gateway. Ingress *ConsulIngressConfigEntry `hcl:"ingress,block"` - // Terminating is not yet supported. - // Terminating *ConsulTerminatingConfigEntry + // Terminating represents the Consul Configuration Entry for a Terminating Gateway. + Terminating *ConsulTerminatingConfigEntry `hcl:"terminating,block"` // Mesh is not yet supported. // Mesh *ConsulMeshConfigEntry @@ -315,6 +315,7 @@ func (g *ConsulGateway) Canonicalize() { } g.Proxy.Canonicalize() g.Ingress.Canonicalize() + g.Terminating.Canonicalize() } func (g *ConsulGateway) Copy() *ConsulGateway { @@ -323,8 +324,9 @@ func (g *ConsulGateway) Copy() *ConsulGateway { } return &ConsulGateway{ - Proxy: g.Proxy.Copy(), - Ingress: g.Ingress.Copy(), + Proxy: g.Proxy.Copy(), + Ingress: g.Ingress.Copy(), + Terminating: g.Terminating.Copy(), } } @@ -335,6 +337,8 @@ type ConsulGatewayBindAddress struct { } var ( + // defaultGatewayConnectTimeout is the default amount of time connections to + // upstreams are allowed before timing out. defaultGatewayConnectTimeout = 5 * time.Second ) @@ -347,6 +351,7 @@ type ConsulGatewayProxy struct { EnvoyGatewayBindTaggedAddresses bool `mapstructure:"envoy_gateway_bind_tagged_addresses" hcl:"envoy_gateway_bind_tagged_addresses,optional"` EnvoyGatewayBindAddresses map[string]*ConsulGatewayBindAddress `mapstructure:"envoy_gateway_bind_addresses" hcl:"envoy_gateway_bind_addresses,block"` EnvoyGatewayNoDefaultBind bool `mapstructure:"envoy_gateway_no_default_bind" hcl:"envoy_gateway_no_default_bind,optional"` + EnvoyDNSDiscoveryType string `mapstructure:"envoy_dns_discovery_type" hcl:"envoy_dns_discovery_type,optional"` Config map[string]interface{} `hcl:"config,block"` // escape hatch envoy config } @@ -395,6 +400,7 @@ func (p *ConsulGatewayProxy) Copy() *ConsulGatewayProxy { EnvoyGatewayBindTaggedAddresses: p.EnvoyGatewayBindTaggedAddresses, EnvoyGatewayBindAddresses: binds, EnvoyGatewayNoDefaultBind: p.EnvoyGatewayNoDefaultBind, + EnvoyDNSDiscoveryType: p.EnvoyDNSDiscoveryType, Config: config, } } @@ -547,9 +553,74 @@ func (e *ConsulIngressConfigEntry) Copy() *ConsulIngressConfigEntry { } } -// ConsulTerminatingConfigEntry is not yet supported. -// type ConsulTerminatingConfigEntry struct { -// } +type ConsulLinkedService struct { + Name string `hcl:"name,optional"` + CAFile string `hcl:"ca_file,optional"` + CertFile string `hcl:"cert_file,optional"` + KeyFile string `hcl:"key_file,optional"` + SNI string `hcl:"sni,optional"` +} + +func (s *ConsulLinkedService) Canonicalize() { + // nothing to do for now +} + +func (s *ConsulLinkedService) Copy() *ConsulLinkedService { + if s == nil { + return nil + } + + return &ConsulLinkedService{ + Name: s.Name, + CAFile: s.CAFile, + CertFile: s.CertFile, + KeyFile: s.KeyFile, + SNI: s.SNI, + } +} + +// ConsulTerminatingConfigEntry represents the Consul Configuration Entry type +// for a Terminating Gateway. +// +// https://www.consul.io/docs/agent/config-entries/terminating-gateway#available-fields +type ConsulTerminatingConfigEntry struct { + // Namespace is not yet supported. + // Namespace string + + Services []*ConsulLinkedService `hcl:"service,block"` +} + +func (e *ConsulTerminatingConfigEntry) Canonicalize() { + if e == nil { + return + } + + if len(e.Services) == 0 { + e.Services = nil + } + + for _, service := range e.Services { + service.Canonicalize() + } +} + +func (e *ConsulTerminatingConfigEntry) Copy() *ConsulTerminatingConfigEntry { + if e == nil { + return nil + } + + var services []*ConsulLinkedService = nil + if n := len(e.Services); n > 0 { + services = make([]*ConsulLinkedService, n) + for i := 0; i < n; i++ { + services[i] = e.Services[i].Copy() + } + } + + return &ConsulTerminatingConfigEntry{ + Services: services, + } +} // ConsulMeshConfigEntry is not yet supported. // type ConsulMeshConfigEntry struct { diff --git a/website/content/docs/job-specification/gateway.mdx b/website/content/docs/job-specification/gateway.mdx index 96a70aaefb2..941b14ad835 100644 --- a/website/content/docs/job-specification/gateway.mdx +++ b/website/content/docs/job-specification/gateway.mdx @@ -25,100 +25,10 @@ same network. For public ingress products like [NGINX](https://learn.hashicorp.c provide more suitable features. ```hcl -job "ingress-demo" { - - datacenters = ["dc1"] - - # This group will have a task providing the ingress gateway automatically - # created by Nomad. The ingress gateway is based on the Envoy proxy being - # managed by the docker driver. - group "ingress-group" { - - network { - mode = "bridge" - - # This example will enable plain HTTP traffic to access the uuid-api connect - # native example service on port 8080. - port "inbound" { - static = 8080 - to = 8080 - } - } - - service { - name = "my-ingress-service" - port = "8080" - - connect { - gateway { - - # Consul gateway [envoy] proxy options. - proxy { - # The following options are automatically set by Nomad if not - # explicitly configured when using bridge networking. - # - # envoy_gateway_no_default_bind = true - # envoy_gateway_bind_addresses "uuid-api" { - # address = "0.0.0.0" - # port = - # } - # - # Additional options are documented at - # https://www.nomadproject.io/docs/job-specification/gateway#proxy-parameters - } - - # Consul Ingress Gateway Configuration Entry. - ingress { - # Nomad will automatically manage the Configuration Entry in Consul - # given the parameters in the ingress block. - # - # Additional options are documented at - # https://www.nomadproject.io/docs/job-specification/gateway#ingress-parameters - listener { - port = 8080 - protocol = "tcp" - service { - name = "uuid-api" - } - } - } - } - } - } - } - - # The UUID generator from the connect-native demo is used as an example service. - # The ingress gateway above makes access to the service possible over normal HTTP. - # For example, - # - # $ curl $(dig +short @127.0.0.1 -p 8600 uuid-api.ingress.dc1.consul. ANY):8080 - group "generator" { - network { - mode = "host" - port "api" {} - } - - service { - name = "uuid-api" - port = "${NOMAD_PORT_api}" - - connect { - native = true - } - } - - task "generate" { - driver = "docker" - - config { - image = "hashicorpnomad/uuid-api:v3" - network_mode = "host" - } - - env { - BIND = "0.0.0.0" - PORT = "${NOMAD_PORT_api}" - } +service { + connect { + gateway { + # ... } } } @@ -126,10 +36,14 @@ job "ingress-demo" { ## `gateway` Parameters +Exactly one of `ingress` or `terminating` must be configured. + - `proxy` ([proxy]: nil) - Configuration of the Envoy proxy that will be injected into the task group. - `ingress` ([ingress]: nil) - Configuration Entry of type `ingress-gateway` that will be associated with the service. +- `terminating` ([terminating]: nil) - Configuration Entry of type `terminating-gateway` + that will be associated with the service. ### `proxy` Parameters @@ -159,6 +73,11 @@ envoy_gateway_bind_addresses "" { to configure the gateway's bind addresses. If `bridge` networking is in use, this value will default to `true` since the Envoy proxy does not need to bind to the service address from inside the network namespace. +- `envoy_dns_discovery_type` `(string: optional)` - Determintes how Envoy will + resolve hostnames. Defaults to `LOGICAL_DNS`. Must be one of `STRICT_DNS` or + `LOGICAL_DNS`. Details for each type are available in the [Envoy Documentation](https://www.envoyproxy.io/docs/envoy/v1.16.1/intro/arch_overview/upstream/service_discovery). + This option applies to terminating gateways that route to services addressed by a + hostname. - `config` `(map: nil)` - Escape hatch for [Advanced Configuration] of Envoy. #### `address` Parameters @@ -219,6 +138,35 @@ envoy_gateway_bind_addresses "" { field as well. TLS verification only matches against the hostname of the incoming connection, and does not take into account the port. +### `terminating` Parameters + +- `service` (array<[linked-service]>: required) - One or more services to be + linked with the gateway. The gateway will proxy traffic to these services. These + linked services must be registered with Consul for the gateway to discover their + addresses. They must also be registered in the same Consul datacenter as the + terminating gateway. + +#### `service` Parameters + +- `name` `(string: required)` - The name of the service to link with the gateway. + If the wildcard specifier `*` is provided, then ALL services within the Consul + namespace wil lbe linked with the gateway. + +- `ca_file` `(string: )` - A file path to a PEM-encoded certificate + authority. The file must be accessible by the gateway task. The certificate authority + is used to verify the authenticity of the service linked with the gateway. It + can be provided along with a `cert_file` and `key_file` for mutual TLS + authentication, or on its own for one-way TLS authentication. If none is provided + the gateway **will not** encrypt traffic to the destination. +- `cert_file` `(string: )` - A file path to a PEM-encoded certificate. + The file must be accessible by the gateway task. The certificate is provided to servers + to verify the gateway's authenticity. It must be provided if a `key_file` is provided. +- `key_file` `(string: )` - A file path to a PEM-encoded private key. + The file must be accessible by the gateway task. The key is used with the certificate + to verify the gateway's authenticity. It must be provided if a `cert_file` is provided. +- `sni` `(string: )` - An optional hostname or domain name to specify during + the TLS handshake. + ### Gateway with host networking Nomad supports running gateways using host networking. A static port must be allocated @@ -259,14 +207,244 @@ connect { } ``` -[proxy]: /docs/job-specification/gateway#proxy-parameters +### Examples + +#### ingress gateway + +```hcl +job "ingress-demo" { + + datacenters = ["dc1"] + + # This group will have a task providing the ingress gateway automatically + # created by Nomad. The ingress gateway is based on the Envoy proxy being + # managed by the docker driver. + group "ingress-group" { + + network { + mode = "bridge" + + # This example will enable plain HTTP traffic to access the uuid-api connect + # native example service on port 8080. + port "inbound" { + static = 8080 + to = 8080 + } + } + + service { + name = "my-ingress-service" + port = "8080" + + connect { + gateway { + + # Consul gateway [envoy] proxy options. + proxy { + # The following options are automatically set by Nomad if not + # explicitly configured when using bridge networking. + # + # envoy_gateway_no_default_bind = true + # envoy_gateway_bind_addresses "uuid-api" { + # address = "0.0.0.0" + # port = + # } + # + # Additional options are documented at + # https://www.nomadproject.io/docs/job-specification/gateway#proxy-parameters + } + + # Consul Ingress Gateway Configuration Entry. + ingress { + # Nomad will automatically manage the Configuration Entry in Consul + # given the parameters in the ingress block. + # + # Additional options are documented at + # https://www.nomadproject.io/docs/job-specification/gateway#ingress-parameters + listener { + port = 8080 + protocol = "tcp" + service { + name = "uuid-api" + } + } + } + } + } + } + } + + # The UUID generator from the connect-native demo is used as an example service. + # The ingress gateway above makes access to the service possible over normal HTTP. + # For example, + # + # $ curl $(dig +short @127.0.0.1 -p 8600 uuid-api.ingress.dc1.consul. ANY):8080 + group "generator" { + network { + mode = "host" + port "api" {} + } + + service { + name = "uuid-api" + port = "${NOMAD_PORT_api}" + + connect { + native = true + } + } + + task "generate" { + driver = "docker" + + config { + image = "hashicorpnomad/uuid-api:v3" + network_mode = "host" + } + + env { + BIND = "0.0.0.0" + PORT = "${NOMAD_PORT_api}" + } + } + } +} +``` + +#### terminating gateway + +```hcl +job "countdash-terminating" { + + datacenters = ["dc1"] + + # This group provides the service that exists outside of the Consul Connect + # service mesh. It is using host networking and listening to a statically + # allocated port. + group "api" { + network { + mode = "host" + port "port" { + static = "9001" + } + } + + # This example will enable services in the service mesh to make requests + # to this service which is not in the service mesh by making requests + # through the terminating gateway. + service { + name = "count-api" + port = "port" + } + + task "api" { + driver = "docker" + + config { + image = "hashicorpnomad/counter-api:v3" + network_mode = "host" + } + } + } + + group "gateway" { + network { + mode = "bridge" + } + + service { + name = "api-gateway" + + connect { + gateway { + # Consul gateway [envoy] proxy options. + proxy { + # The following options are automatically set by Nomad if not explicitly + # configured with using bridge networking. + # + # envoy_gateway_no_default_bind = true + # envoy_gateway_bind_addresses "default" { + # address = "0.0.0.0" + # port = + # } + # Additional options are documented at + # https://www.nomadproject.io/docs/job-specification/gateway#proxy-parameters + } + + # Consul Terminating Gateway Configuration Entry. + terminating { + # Nomad will automatically manage the Configuration Entry in Consul + # given the parameters in the terminating block. + # + # Additional options are documented at + # https://www.nomadproject.io/docs/job-specification/gateway#terminating-parameters + service { + name = "count-api" + } + } + } + } + } + } + + # The dashboard service is in the service mesh, making use of bridge network + # mode and connect.sidecar_service. When running, the dashboard should be + # available from a web browser at localhost:9002. + group "dashboard" { + network { + mode = "bridge" + + port "http" { + static = 9002 + to = 9002 + } + } + + service { + name = "count-dashboard" + port = "9002" + + connect { + sidecar_service { + proxy { + upstreams { + # By configuring an upstream destination to the linked service of + # the terminating gateway, the dashboard is able to make requests + # through the gateway to the count-api service. + destination_name = "count-api" + local_bind_port = 8080 + } + } + } + } + } + + task "dashboard" { + driver = "docker" + + env { + COUNTING_SERVICE_URL = "http://${NOMAD_UPSTREAM_ADDR_count_api}" + } + + config { + image = "hashicorpnomad/counter-dashboard:v3" + } + } + } +} +``` + +[address]: /docs/job-specification/gateway#address-parameters +[advanced configuration]: https://www.consul.io/docs/connect/proxies/envoy#advanced-configuration +[connect_timeout_ms]: https://www.consul.io/docs/agent/config-entries/service-resolver#connecttimeout +[envoy docker]: https://hub.docker.com/r/envoyproxy/envoy/tags [ingress]: /docs/job-specification/gateway#ingress-parameters -[tls]: /docs/job-specification/gateway#tls-parameters +[proxy]: /docs/job-specification/gateway#proxy-parameters +[linked-service]: /docs/job-specification/gateway#service-parameters-1 [listener]: /docs/job-specification/gateway#listener-parameters [service]: /docs/job-specification/gateway#service-parameters [service-default]: https://www.consul.io/docs/agent/config-entries/service-defaults [sidecar_task]: /docs/job-specification/sidecar_task -[connect_timeout_ms]: https://www.consul.io/docs/agent/config-entries/service-resolver#connecttimeout -[address]: /docs/job-specification/gateway#address-parameters -[advanced configuration]: https://www.consul.io/docs/connect/proxies/envoy#advanced-configuration -[envoy docker]: https://hub.docker.com/r/envoyproxy/envoy/tags +[terminating]: /docs/job-specification/gateway#terminating-parameters +[tls]: /docs/job-specification/gateway#tls-parameters +