From e907f884082affc5056a30190315823830b0262e Mon Sep 17 00:00:00 2001 From: Seth Hoenig Date: Mon, 12 Apr 2021 13:10:10 -0600 Subject: [PATCH] consul/connect: add support for connect mesh gateways This PR implements first-class support for Nomad running Consul Connect Mesh Gateways. Mesh gateways enable services in the Connect mesh to make cross-DC connections via gateways, where each datacenter may not have full node interconnectivity. Consul docs with more information: https://www.consul.io/docs/connect/gateways/mesh-gateway The following group level service block can be used to establish a Connect mesh gateway. service { connect { gateway { mesh { // no configuration } } } } Services can make use of a mesh gateway by configuring so in their upstream blocks, e.g. service { connect { sidecar_service { proxy { upstreams { destination_name = "" local_bind_port = datacenter = "" mesh_gateway { mode = "" } } } } } } Typical use of a mesh gateway is to create a bridge between datacenters. A mesh gateway should then be configured with a service port that is mapped from a host_network configured on a WAN interface in Nomad agent config, e.g. client { host_network "public" { interface = "eth1" } } Create a port mapping in the group.network block for use by the mesh gateway service from the public host_network, e.g. network { mode = "bridge" port "mesh_wan" { host_network = "public" } } Use this port label for the service.port of the mesh gateway, e.g. service { name = "mesh-gateway" port = "mesh_wan" connect { gateway { mesh {} } } } Currently Envoy is the only supported gateway implementation in Consul. By default Nomad client will run the latest official Envoy docker image supported by the local Consul agent. The Envoy task can be customized by setting `meta.connect.gateway_image` in agent config or by setting the `connect.sidecar_task` block. Gateways require Consul 1.8.0+, enforced by the Nomad scheduler. Closes #9446 --- api/services.go | 97 ++++++++- api/services_test.go | 132 ++++++++++++ .../taskrunner/envoy_bootstrap_hook.go | 35 ++- .../taskrunner/envoy_bootstrap_hook_test.go | 28 +++ command/agent/consul/connect.go | 25 +++ command/agent/consul/connect_test.go | 33 ++- command/agent/consul/service_client.go | 18 +- command/agent/job_endpoint.go | 20 +- command/agent/job_endpoint_test.go | 27 ++- helper/envoy/envoy.go | 18 +- helper/envoy/envoy_test.go | 29 +++ nomad/job_endpoint_hook_connect.go | 51 ++++- nomad/job_endpoint_hook_connect_test.go | 181 +++++++++++++++- nomad/mock/mock.go | 102 ++++++++- nomad/structs/connect_test.go | 21 ++ nomad/structs/diff.go | 107 +++++++++- nomad/structs/diff_test.go | 23 ++ nomad/structs/services.go | 138 ++++++++++-- nomad/structs/services_test.go | 102 ++++++++- nomad/structs/structs.go | 16 +- .../hashicorp/nomad/api/services.go | 97 ++++++++- .../docs/job-specification/gateway.mdx | 202 ++++++++++++++++++ .../docs/job-specification/upstreams.mdx | 24 ++- 23 files changed, 1430 insertions(+), 96 deletions(-) create mode 100644 helper/envoy/envoy_test.go create mode 100644 nomad/structs/connect_test.go diff --git a/api/services.go b/api/services.go index 880ba7f7973..365bf848463 100644 --- a/api/services.go +++ b/api/services.go @@ -281,17 +281,81 @@ func (cp *ConsulProxy) Canonicalize() { cp.Upstreams = nil } + for i := 0; i < len(cp.Upstreams); i++ { + cp.Upstreams[i].Canonicalize() + } + if len(cp.Config) == 0 { cp.Config = nil } } +// ConsulMeshGateway is used to configure mesh gateway usage when connecting to +// a connect upstream in another datacenter. +type ConsulMeshGateway struct { + // Mode configures how an upstream should be accessed with regard to using + // mesh gateways. + // + // local - the connect proxy makes outbound connections through mesh gateway + // originating in the same datacenter. + // + // remote - the connect proxy makes outbound connections to a mesh gateway + // in the destination datacenter. + // + // none (default) - no mesh gateway is used, the proxy makes outbound connections + // directly to destination services. + // + // https://www.consul.io/docs/connect/gateways/mesh-gateway#modes-of-operation + Mode string `mapstructure:"mode" hcl:"mode,optional"` +} + +func (c *ConsulMeshGateway) Canonicalize() { + if c == nil { + return + } + + if c.Mode == "" { + c.Mode = "none" + } +} + +func (c *ConsulMeshGateway) Copy() *ConsulMeshGateway { + if c == nil { + return nil + } + + return &ConsulMeshGateway{ + Mode: c.Mode, + } +} + // ConsulUpstream represents a Consul Connect upstream jobspec stanza. type ConsulUpstream struct { - DestinationName string `mapstructure:"destination_name" hcl:"destination_name,optional"` - LocalBindPort int `mapstructure:"local_bind_port" hcl:"local_bind_port,optional"` - Datacenter string `mapstructure:"datacenter" hcl:"datacenter,optional"` - LocalBindAddress string `mapstructure:"local_bind_address" hcl:"local_bind_address,optional"` + DestinationName string `mapstructure:"destination_name" hcl:"destination_name,optional"` + LocalBindPort int `mapstructure:"local_bind_port" hcl:"local_bind_port,optional"` + Datacenter string `mapstructure:"datacenter" hcl:"datacenter,optional"` + LocalBindAddress string `mapstructure:"local_bind_address" hcl:"local_bind_address,optional"` + MeshGateway *ConsulMeshGateway `mapstructure:"mesh_gateway" hcl:"mesh_gateway,block"` +} + +func (cu *ConsulUpstream) Copy() *ConsulUpstream { + if cu == nil { + return nil + } + return &ConsulUpstream{ + DestinationName: cu.DestinationName, + LocalBindPort: cu.LocalBindPort, + Datacenter: cu.Datacenter, + LocalBindAddress: cu.LocalBindAddress, + MeshGateway: cu.MeshGateway.Copy(), + } +} + +func (cu *ConsulUpstream) Canonicalize() { + if cu == nil { + return + } + cu.MeshGateway.Canonicalize() } type ConsulExposeConfig struct { @@ -326,8 +390,8 @@ type ConsulGateway struct { // Terminating represents the Consul Configuration Entry for a Terminating Gateway. Terminating *ConsulTerminatingConfigEntry `hcl:"terminating,block"` - // Mesh is not yet supported. - // Mesh *ConsulMeshConfigEntry + // Mesh indicates the Consul service should be a Mesh Gateway. + Mesh *ConsulMeshConfigEntry `hcl:"mesh,block"` } func (g *ConsulGateway) Canonicalize() { @@ -643,6 +707,21 @@ func (e *ConsulTerminatingConfigEntry) Copy() *ConsulTerminatingConfigEntry { } } -// ConsulMeshConfigEntry is not yet supported. -// type ConsulMeshConfigEntry struct { -// } +// ConsulMeshConfigEntry is a stub used to represent that the gateway service type +// should be for a Mesh Gateway. Unlike Ingress and Terminating, there is no +// actual Consul Config Entry type for mesh-gateway, at least for now. We still +// create a type for future proofing, instead just using a bool for example. +type ConsulMeshConfigEntry struct { + // nothing in here +} + +func (e *ConsulMeshConfigEntry) Canonicalize() { + return +} + +func (e *ConsulMeshConfigEntry) Copy() *ConsulMeshConfigEntry { + if e == nil { + return nil + } + return new(ConsulMeshConfigEntry) +} diff --git a/api/services_test.go b/api/services_test.go index 705787ef2ef..14cd3827f8e 100644 --- a/api/services_test.go +++ b/api/services_test.go @@ -214,6 +214,68 @@ func TestService_Connect_ConsulProxy_Canonicalize(t *testing.T) { require.Nil(t, cp.Upstreams) require.Nil(t, cp.Config) }) + + t.Run("fix upstream mesh_gateway", func(t *testing.T) { + cp := &ConsulProxy{ + Upstreams: []*ConsulUpstream{{ + MeshGateway: &ConsulMeshGateway{ + Mode: "", + }}, + }, + } + cp.Canonicalize() + require.Equal(t, "none", cp.Upstreams[0].MeshGateway.Mode) + }) +} + +func TestService_Connect_ConsulUpstream_Copy(t *testing.T) { + t.Parallel() + + t.Run("nil upstream", func(t *testing.T) { + cu := (*ConsulUpstream)(nil) + result := cu.Copy() + require.Nil(t, result) + }) + + t.Run("complete upstream", func(t *testing.T) { + cu := &ConsulUpstream{ + DestinationName: "dest1", + Datacenter: "dc2", + LocalBindPort: 2000, + LocalBindAddress: "10.0.0.1", + MeshGateway: &ConsulMeshGateway{Mode: "remote"}, + } + result := cu.Copy() + require.Equal(t, cu, result) + }) +} + +func TestService_Connect_ConsulUpstream_Canonicalize(t *testing.T) { + t.Parallel() + + t.Run("nil upstream", func(t *testing.T) { + cu := (*ConsulUpstream)(nil) + cu.Canonicalize() + require.Nil(t, cu) + }) + + t.Run("fix mesh_gateway", func(t *testing.T) { + cu := &ConsulUpstream{ + DestinationName: "dest1", + Datacenter: "dc2", + LocalBindPort: 2000, + LocalBindAddress: "10.0.0.1", + MeshGateway: &ConsulMeshGateway{Mode: ""}, + } + cu.Canonicalize() + require.Equal(t, &ConsulUpstream{ + DestinationName: "dest1", + Datacenter: "dc2", + LocalBindPort: 2000, + LocalBindAddress: "10.0.0.1", + MeshGateway: &ConsulMeshGateway{Mode: "none"}, + }, cu) + }) } func TestService_Connect_proxy_settings(t *testing.T) { @@ -508,3 +570,73 @@ func TestService_ConsulTerminatingConfigEntry_Copy(t *testing.T) { require.Equal(t, entry, result) }) } + +func TestService_ConsulMeshConfigEntry_Canonicalize(t *testing.T) { + t.Parallel() + + t.Run("nil", func(t *testing.T) { + ce := (*ConsulMeshConfigEntry)(nil) + ce.Canonicalize() + require.Nil(t, ce) + }) + + t.Run("instantiated", func(t *testing.T) { + ce := new(ConsulMeshConfigEntry) + ce.Canonicalize() + require.NotNil(t, ce) + }) +} + +func TestService_ConsulMeshConfigEntry_Copy(t *testing.T) { + t.Parallel() + + t.Run("nil", func(t *testing.T) { + ce := (*ConsulMeshConfigEntry)(nil) + ce2 := ce.Copy() + require.Nil(t, ce2) + }) + + t.Run("instantiated", func(t *testing.T) { + ce := new(ConsulMeshConfigEntry) + ce2 := ce.Copy() + require.NotNil(t, ce2) + }) +} + +func TestService_ConsulMeshGateway_Canonicalize(t *testing.T) { + t.Parallel() + + t.Run("nil", func(t *testing.T) { + c := (*ConsulMeshGateway)(nil) + c.Canonicalize() + require.Nil(t, c) + }) + + t.Run("unset mode", func(t *testing.T) { + c := &ConsulMeshGateway{ + Mode: "", + } + c.Canonicalize() + require.Equal(t, &ConsulMeshGateway{ + Mode: "none", + }, c) + }) +} + +func TestService_ConsulMeshGateway_Copy(t *testing.T) { + t.Parallel() + + t.Run("nil", func(t *testing.T) { + c := (*ConsulMeshGateway)(nil) + result := c.Copy() + require.Nil(t, result) + }) + + t.Run("instantiated", func(t *testing.T) { + c := &ConsulMeshGateway{ + Mode: "local", + } + result := c.Copy() + require.Equal(t, c, result) + }) +} diff --git a/client/allocrunner/taskrunner/envoy_bootstrap_hook.go b/client/allocrunner/taskrunner/envoy_bootstrap_hook.go index d2ef3df53d1..ee58b03a0bd 100644 --- a/client/allocrunner/taskrunner/envoy_bootstrap_hook.go +++ b/client/allocrunner/taskrunner/envoy_bootstrap_hook.go @@ -164,8 +164,18 @@ func (envoyBootstrapHook) Name() string { } func isConnectKind(kind string) bool { - kinds := []string{structs.ConnectProxyPrefix, structs.ConnectIngressPrefix, structs.ConnectTerminatingPrefix} - return helper.SliceStringContains(kinds, kind) + switch kind { + case structs.ConnectProxyPrefix: + return true + case structs.ConnectIngressPrefix: + return true + case structs.ConnectTerminatingPrefix: + return true + case structs.ConnectMeshPrefix: + return true + default: + return false + } } func (_ *envoyBootstrapHook) extractNameAndKind(kind structs.TaskKind) (string, string, error) { @@ -183,7 +193,7 @@ func (_ *envoyBootstrapHook) extractNameAndKind(kind structs.TaskKind) (string, return serviceKind, serviceName, nil } -func (h *envoyBootstrapHook) lookupService(svcKind, svcName, tgName string, taskEnv *taskenv.TaskEnv) (*structs.Service, error) { +func (h *envoyBootstrapHook) lookupService(svcKind, svcName string, taskEnv *taskenv.TaskEnv) (*structs.Service, error) { tg := h.alloc.Job.LookupTaskGroup(h.alloc.TaskGroup) interpolatedServices := taskenv.InterpolateServices(taskEnv, tg.Services) @@ -221,7 +231,7 @@ func (h *envoyBootstrapHook) Prestart(ctx context.Context, req *ifs.TaskPrestart return err } - service, err := h.lookupService(serviceKind, serviceName, h.alloc.TaskGroup, req.TaskEnv) + service, err := h.lookupService(serviceKind, serviceName, req.TaskEnv) if err != nil { return err } @@ -404,6 +414,11 @@ func (h *envoyBootstrapHook) proxyServiceID(group string, service *structs.Servi return agentconsul.MakeAllocServiceID(h.alloc.ID, "group-"+group, service) } +// newEnvoyBootstrapArgs is used to prepare for the invocation of the +// 'consul connect envoy' command with arguments which will bootstrap the connect +// proxy or gateway. +// +// https://www.consul.io/commands/connect/envoy#consul-connect-envoy func (h *envoyBootstrapHook) newEnvoyBootstrapArgs( group string, service *structs.Service, grpcAddr, envoyAdminBind, siToken, filepath string, @@ -416,19 +431,23 @@ func (h *envoyBootstrapHook) newEnvoyBootstrapArgs( ) namespace = h.getConsulNamespace() + id := h.proxyServiceID(group, service) switch { case service.Connect.HasSidecar(): - sidecarForID = h.proxyServiceID(group, service) + sidecarForID = id case service.Connect.IsIngress(): - proxyID = h.proxyServiceID(group, service) + proxyID = id gateway = "ingress" case service.Connect.IsTerminating(): - proxyID = h.proxyServiceID(group, service) + proxyID = id gateway = "terminating" + case service.Connect.IsMesh(): + proxyID = id + gateway = "mesh" } - h.logger.Debug("bootstrapping envoy", + h.logger.Info("bootstrapping envoy", "sidecar_for", service.Name, "bootstrap_file", filepath, "sidecar_for_id", sidecarForID, "grpc_addr", grpcAddr, "admin_bind", envoyAdminBind, "gateway", gateway, diff --git a/client/allocrunner/taskrunner/envoy_bootstrap_hook_test.go b/client/allocrunner/taskrunner/envoy_bootstrap_hook_test.go index 5f9d78dfd7a..58e0ed988dd 100644 --- a/client/allocrunner/taskrunner/envoy_bootstrap_hook_test.go +++ b/client/allocrunner/taskrunner/envoy_bootstrap_hook_test.go @@ -193,6 +193,25 @@ func TestEnvoyBootstrapHook_envoyBootstrapArgs(t *testing.T) { "-proxy-id", "_nomad-task-803cb569-881c-b0d8-9222-360bcc33157e-group-ig-ig-8080", }, result) }) + + t.Run("mesh gateway", func(t *testing.T) { + ebArgs := envoyBootstrapArgs{ + consulConfig: consulPlainConfig, + grpcAddr: "1.1.1.1", + envoyAdminBind: "localhost:3333", + gateway: "my-mesh-gateway", + proxyID: "_nomad-task-803cb569-881c-b0d8-9222-360bcc33157e-group-mesh-mesh-8080", + } + result := ebArgs.args() + require.Equal(t, []string{"connect", "envoy", + "-grpc-addr", "1.1.1.1", + "-http-addr", "2.2.2.2", + "-admin-bind", "localhost:3333", + "-bootstrap", + "-gateway", "my-mesh-gateway", + "-proxy-id", "_nomad-task-803cb569-881c-b0d8-9222-360bcc33157e-group-mesh-mesh-8080", + }, result) + }) } func TestEnvoyBootstrapHook_envoyBootstrapEnv(t *testing.T) { @@ -809,3 +828,12 @@ func TestTaskRunner_EnvoyBootstrapHook_grpcAddress(t *testing.T) { require.Equal(t, "127.0.0.1:8502", hostH.grpcAddress(nil)) }) } + +func TestTaskRunner_EnvoyBootstrapHook_isConnectKind(t *testing.T) { + require.True(t, isConnectKind(structs.ConnectProxyPrefix)) + require.True(t, isConnectKind(structs.ConnectIngressPrefix)) + require.True(t, isConnectKind(structs.ConnectTerminatingPrefix)) + require.True(t, isConnectKind(structs.ConnectMeshPrefix)) + require.False(t, isConnectKind("")) + require.False(t, isConnectKind("something")) +} diff --git a/command/agent/consul/connect.go b/command/agent/consul/connect.go index 267f26aa77f..dfc7b6c1ba6 100644 --- a/command/agent/consul/connect.go +++ b/command/agent/consul/connect.go @@ -198,11 +198,36 @@ func connectUpstreams(in []structs.ConsulUpstream) []api.Upstream { LocalBindPort: upstream.LocalBindPort, Datacenter: upstream.Datacenter, LocalBindAddress: upstream.LocalBindAddress, + MeshGateway: connectMeshGateway(upstream.MeshGateway), } } return upstreams } +// connectMeshGateway creates an api.MeshGatewayConfig from the nomad upstream +// block. A non-existent config or unsupported gateway mode will default to the +// Consul default mode. +func connectMeshGateway(in *structs.ConsulMeshGateway) api.MeshGatewayConfig { + gw := api.MeshGatewayConfig{ + Mode: api.MeshGatewayModeDefault, + } + + if in == nil { + return gw + } + + switch in.Mode { + case "local": + gw.Mode = api.MeshGatewayModeLocal + case "remote": + gw.Mode = api.MeshGatewayModeRemote + case "none": + gw.Mode = api.MeshGatewayModeNone + } + + return gw +} + func connectProxyConfig(cfg map[string]interface{}, port int) map[string]interface{} { if cfg == nil { cfg = make(map[string]interface{}) diff --git a/command/agent/consul/connect_test.go b/command/agent/consul/connect_test.go index 52f2eea807b..4f34306642b 100644 --- a/command/agent/consul/connect_test.go +++ b/command/agent/consul/connect_test.go @@ -514,7 +514,7 @@ func TestConnect_newConnectGateway(t *testing.T) { ConnectTimeout: helper.TimeToPtr(1 * time.Second), EnvoyGatewayBindTaggedAddresses: true, EnvoyGatewayBindAddresses: map[string]*structs.ConsulGatewayBindAddress{ - "service1": &structs.ConsulGatewayBindAddress{ + "service1": { Address: "10.0.0.1", Port: 2000, }, @@ -532,7 +532,7 @@ func TestConnect_newConnectGateway(t *testing.T) { "connect_timeout_ms": int64(1000), "envoy_gateway_bind_tagged_addresses": true, "envoy_gateway_bind_addresses": map[string]*structs.ConsulGatewayBindAddress{ - "service1": &structs.ConsulGatewayBindAddress{ + "service1": { Address: "10.0.0.1", Port: 2000, }, @@ -544,3 +544,32 @@ func TestConnect_newConnectGateway(t *testing.T) { }, result) }) } + +func Test_connectMeshGateway(t *testing.T) { + t.Parallel() + + t.Run("nil", func(t *testing.T) { + result := connectMeshGateway(nil) + require.Equal(t, api.MeshGatewayConfig{Mode: api.MeshGatewayModeDefault}, result) + }) + + t.Run("local", func(t *testing.T) { + result := connectMeshGateway(&structs.ConsulMeshGateway{Mode: "local"}) + require.Equal(t, api.MeshGatewayConfig{Mode: api.MeshGatewayModeLocal}, result) + }) + + t.Run("remote", func(t *testing.T) { + result := connectMeshGateway(&structs.ConsulMeshGateway{Mode: "remote"}) + require.Equal(t, api.MeshGatewayConfig{Mode: api.MeshGatewayModeRemote}, result) + }) + + t.Run("none", func(t *testing.T) { + result := connectMeshGateway(&structs.ConsulMeshGateway{Mode: "none"}) + require.Equal(t, api.MeshGatewayConfig{Mode: api.MeshGatewayModeNone}, result) + }) + + t.Run("nonsense", func(t *testing.T) { + result := connectMeshGateway(nil) + require.Equal(t, api.MeshGatewayConfig{Mode: api.MeshGatewayModeDefault}, result) + }) +} diff --git a/command/agent/consul/service_client.go b/command/agent/consul/service_client.go index 8bd43483836..acddb826ec8 100644 --- a/command/agent/consul/service_client.go +++ b/command/agent/consul/service_client.go @@ -14,6 +14,7 @@ import ( "github.com/armon/go-metrics" log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/helper/envoy" "github.com/pkg/errors" "github.com/hashicorp/consul/api" @@ -961,11 +962,26 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w kind = api.ServiceKindTerminatingGateway // set the default port if bridge / default listener set if defaultBind, exists := service.Connect.Gateway.Proxy.EnvoyGatewayBindAddresses["default"]; exists { - portLabel := fmt.Sprintf("%s-%s", structs.ConnectTerminatingPrefix, service.Name) + portLabel := envoy.PortLabel(structs.ConnectTerminatingPrefix, service.Name, "") if dynPort, ok := workload.Ports.Get(portLabel); ok { defaultBind.Port = dynPort.Value } } + case service.Connect.IsMesh(): + kind = api.ServiceKindMeshGateway + // wan uses the service port label, which is typically on a discrete host_network + if wanBind, exists := service.Connect.Gateway.Proxy.EnvoyGatewayBindAddresses["wan"]; exists { + if wanPort, ok := workload.Ports.Get(service.PortLabel); ok { + wanBind.Port = wanPort.Value + } + } + // lan uses a nomad generated dynamic port on the default network + if lanBind, exists := service.Connect.Gateway.Proxy.EnvoyGatewayBindAddresses["lan"]; exists { + portLabel := envoy.PortLabel(structs.ConnectMeshPrefix, service.Name, "lan") + if dynPort, ok := workload.Ports.Get(portLabel); ok { + lanBind.Port = dynPort.Value + } + } } // Build the Consul Service registration request diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 844802ee232..507a4c95521 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -7,7 +7,7 @@ import ( "strings" "github.com/golang/snappy" - "github.com/hashicorp/nomad/api" + api "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/jobspec" "github.com/hashicorp/nomad/jobspec2" @@ -1362,6 +1362,7 @@ func apiConnectGatewayToStructs(in *api.ConsulGateway) *structs.ConsulGateway { Proxy: apiConnectGatewayProxyToStructs(in.Proxy), Ingress: apiConnectIngressGatewayToStructs(in.Ingress), Terminating: apiConnectTerminatingGatewayToStructs(in.Terminating), + Mesh: apiConnectMeshGatewayToStructs(in.Mesh), } } @@ -1494,6 +1495,13 @@ func apiConnectTerminatingServiceToStructs(in *api.ConsulLinkedService) *structs } } +func apiConnectMeshGatewayToStructs(in *api.ConsulMeshConfigEntry) *structs.ConsulMeshConfigEntry { + if in == nil { + return nil + } + return new(structs.ConsulMeshConfigEntry) +} + func apiConnectSidecarServiceToStructs(in *api.ConsulSidecarService) *structs.ConsulSidecarService { if in == nil { return nil @@ -1530,11 +1538,21 @@ func apiUpstreamsToStructs(in []*api.ConsulUpstream) []structs.ConsulUpstream { LocalBindPort: upstream.LocalBindPort, Datacenter: upstream.Datacenter, LocalBindAddress: upstream.LocalBindAddress, + MeshGateway: apiMeshGatewayToStructs(upstream.MeshGateway), } } return upstreams } +func apiMeshGatewayToStructs(in *api.ConsulMeshGateway) *structs.ConsulMeshGateway { + if in == nil { + return nil + } + return &structs.ConsulMeshGateway{ + Mode: in.Mode, + } +} + func apiConsulExposeConfigToStructs(in *api.ConsulExposeConfig) *structs.ConsulExposeConfig { if in == nil { return nil diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index 4d633bac69f..648fa620a71 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -9,7 +9,7 @@ import ( "time" "github.com/golang/snappy" - "github.com/hashicorp/nomad/api" + api "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -3138,14 +3138,23 @@ func TestConversion_apiUpstreamsToStructs(t *testing.T) { LocalBindPort: 8000, Datacenter: "dc2", LocalBindAddress: "127.0.0.2", + MeshGateway: &structs.ConsulMeshGateway{Mode: "local"}, }}, apiUpstreamsToStructs([]*api.ConsulUpstream{{ DestinationName: "upstream", LocalBindPort: 8000, Datacenter: "dc2", LocalBindAddress: "127.0.0.2", + MeshGateway: &api.ConsulMeshGateway{Mode: "local"}, }})) } +func TestConversion_apiConsulMeshGatewayToStructs(t *testing.T) { + t.Parallel() + require.Nil(t, apiMeshGatewayToStructs(nil)) + require.Equal(t, &structs.ConsulMeshGateway{Mode: "remote"}, + apiMeshGatewayToStructs(&api.ConsulMeshGateway{Mode: "remote"})) +} + func TestConversion_apiConnectSidecarServiceProxyToStructs(t *testing.T) { t.Parallel() require.Nil(t, apiConnectSidecarServiceProxyToStructs(nil)) @@ -3313,6 +3322,22 @@ func TestConversion_ApiConsulConnectToStructs(t *testing.T) { })) }) + t.Run("gateway mesh", func(t *testing.T) { + require.Equal(t, &structs.ConsulConnect{ + Gateway: &structs.ConsulGateway{ + Mesh: &structs.ConsulMeshConfigEntry{ + // nothing + }, + }, + }, ApiConsulConnectToStructs(&api.ConsulConnect{ + Gateway: &api.ConsulGateway{ + Mesh: &api.ConsulMeshConfigEntry{ + // nothing + }, + }, + })) + }) + t.Run("native", func(t *testing.T) { require.Equal(t, &structs.ConsulConnect{ Native: true, diff --git a/helper/envoy/envoy.go b/helper/envoy/envoy.go index 2915ee549b1..e07e1ccc41a 100644 --- a/helper/envoy/envoy.go +++ b/helper/envoy/envoy.go @@ -2,6 +2,10 @@ // selecting an envoy version. package envoy +import ( + "fmt" +) + const ( // SidecarMetaParam is the parameter name used to configure the connect sidecar // at the client level. Setting this option in client configuration takes the @@ -27,7 +31,7 @@ const ( // gateway proxies, when they are injected in the job connect mutator. GatewayConfigVar = "${meta." + GatewayMetaParam + "}" - // envoyImageFormat is the default format string used for official envoy Docker + // ImageFormat is the default format string used for official envoy Docker // images with the tag being the semver of the version of envoy. Nomad fakes // interpolation of ${NOMAD_envoy_version} by replacing it with the version // string for envoy that Consul reports as preferred. @@ -37,7 +41,7 @@ const ( // to something like: "custom/envoy:${NOMAD_envoy_version}". ImageFormat = "envoyproxy/envoy:v" + VersionVar - // envoyVersionVar will be replaced with the Envoy version string when + // VersionVar will be replaced with the Envoy version string when // used in the meta.connect.sidecar_image variable. VersionVar = "${NOMAD_envoy_version}" @@ -47,3 +51,13 @@ const ( // dynamic envoy versions. FallbackImage = "envoyproxy/envoy:v1.11.2@sha256:a7769160c9c1a55bb8d07a3b71ce5d64f72b1f665f10d81aa1581bc3cf850d09" ) + +// PortLabel creates a consistent port label using the inputs of a prefix, +// service name, and optional suffix. The prefix should be the Kind part of +// TaskKind the envoy is being configured for. +func PortLabel(prefix, service, suffix string) string { + if suffix == "" { + return fmt.Sprintf("%s-%s", prefix, service) + } + return fmt.Sprintf("%s-%s-%s", prefix, service, suffix) +} diff --git a/helper/envoy/envoy_test.go b/helper/envoy/envoy_test.go new file mode 100644 index 00000000000..87a979c8586 --- /dev/null +++ b/helper/envoy/envoy_test.go @@ -0,0 +1,29 @@ +package envoy + +import ( + "fmt" + "testing" + + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" +) + +func TestEnvoy_PortLabel(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + prefix string + service string + suffix string + exp string + }{ + {prefix: structs.ConnectProxyPrefix, service: "foo", suffix: "", exp: "connect-proxy-foo"}, + {prefix: structs.ConnectMeshPrefix, service: "bar", exp: "connect-mesh-bar"}, + } { + test := fmt.Sprintf("%s_%s_%s", tc.prefix, tc.service, tc.suffix) + t.Run(test, func(t *testing.T) { + result := PortLabel(tc.prefix, tc.service, tc.suffix) + require.Equal(t, tc.exp, result) + }) + } +} diff --git a/nomad/job_endpoint_hook_connect.go b/nomad/job_endpoint_hook_connect.go index 560a993f863..878beff17bc 100644 --- a/nomad/job_endpoint_hook_connect.go +++ b/nomad/job_endpoint_hook_connect.go @@ -140,19 +140,20 @@ func getSidecarTaskForService(tg *structs.TaskGroup, svc string) *structs.Task { return nil } -func isSidecarForService(t *structs.Task, svc string) bool { - return t.Kind == structs.NewTaskKind(structs.ConnectProxyPrefix, svc) +func isSidecarForService(t *structs.Task, service string) bool { + return t.Kind == structs.NewTaskKind(structs.ConnectProxyPrefix, service) } -func hasGatewayTaskForService(tg *structs.TaskGroup, svc string) bool { +func hasGatewayTaskForService(tg *structs.TaskGroup, service string) bool { for _, t := range tg.Tasks { switch { - case isIngressGatewayForService(t, svc): + case isIngressGatewayForService(t, service): return true - case isTerminatingGatewayForService(t, svc): + case isTerminatingGatewayForService(t, service): + return true + case isMeshGatewayForService(t, service): return true } - // mesh later } return false } @@ -165,6 +166,10 @@ func isTerminatingGatewayForService(t *structs.Task, svc string) bool { return t.Kind == structs.NewTaskKind(structs.ConnectTerminatingPrefix, svc) } +func isMeshGatewayForService(t *structs.Task, svc string) bool { + return t.Kind == structs.NewTaskKind(structs.ConnectMeshPrefix, svc) +} + // getNamedTaskForNativeService retrieves the Task with the name specified in the // group service definition. If the task name is empty and there is only one task // in the group, infer the name from the only option. @@ -202,8 +207,6 @@ func injectPort(group *structs.TaskGroup, label string) { }) } -// probably need to hack this up to look for checks on the service, and if they -// qualify, configure a port for envoy to use to expose their paths. func groupConnectHook(job *structs.Job, g *structs.TaskGroup) error { // Create an environment interpolator with what we have at submission time. // This should only be used to interpolate connect service names which are @@ -251,7 +254,7 @@ func groupConnectHook(job *structs.Job, g *structs.TaskGroup) error { // create a port for the sidecar task's proxy port portLabel := service.Connect.SidecarService.Port if portLabel == "" { - portLabel = fmt.Sprintf("%s-%s", structs.ConnectProxyPrefix, service.Name) + portLabel = envoy.PortLabel(structs.ConnectProxyPrefix, service.Name, "") } injectPort(g, portLabel) @@ -286,11 +289,25 @@ func groupConnectHook(job *structs.Job, g *structs.TaskGroup) error { // reasonable to keep the magic alive here. if service.Connect.IsTerminating() && service.PortLabel == "" { // Inject a dynamic port for the terminating gateway. - portLabel := fmt.Sprintf("%s-%s", structs.ConnectTerminatingPrefix, service.Name) + portLabel := envoy.PortLabel(structs.ConnectTerminatingPrefix, service.Name, "") service.PortLabel = portLabel injectPort(g, portLabel) } + // A mesh Gateway will need 2 ports (lan and wan). + if service.Connect.IsMesh() { + + // service port is used for mesh gateway wan address - it should + // come from a configured host_network to make sense + if service.PortLabel == "" { + return errors.New("service.port must be set for mesh gateway service") + } + + // Inject a dynamic port for mesh gateway LAN address. + lanPortLabel := envoy.PortLabel(structs.ConnectMeshPrefix, service.Name, "lan") + injectPort(g, lanPortLabel) + } + // inject the gateway task only if it does not yet already exist if !hasGatewayTaskForService(g, service.Name) { prefix := service.Connect.Gateway.Prefix() @@ -375,8 +392,20 @@ func gatewayProxyForBridge(gateway *structs.ConsulGateway) *structs.ConsulGatewa Address: "0.0.0.0", Port: -1, // filled in later with dynamic port }} + case gateway.Mesh != nil: + proxy.EnvoyGatewayNoDefaultBind = true + proxy.EnvoyGatewayBindTaggedAddresses = false + proxy.EnvoyGatewayBindAddresses = map[string]*structs.ConsulGatewayBindAddress{ + "wan": { + Address: "0.0.0.0", + Port: -1, // filled in later with configured port + }, + "lan": { + Address: "0.0.0.0", + Port: -1, // filled in later with generated port + }, + } } - // later: mesh return proxy } diff --git a/nomad/job_endpoint_hook_connect_test.go b/nomad/job_endpoint_hook_connect_test.go index 6d37c238f95..6773fbedc1a 100644 --- a/nomad/job_endpoint_hook_connect_test.go +++ b/nomad/job_endpoint_hook_connect_test.go @@ -16,9 +16,9 @@ func TestJobEndpointConnect_isSidecarForService(t *testing.T) { t.Parallel() cases := []struct { - t *structs.Task // task - s string // service - r bool // result + task *structs.Task + sidecar string + exp bool }{ { &structs.Task{}, @@ -49,7 +49,7 @@ func TestJobEndpointConnect_isSidecarForService(t *testing.T) { } for _, c := range cases { - require.Equal(t, c.r, isSidecarForService(c.t, c.s)) + require.Equal(t, c.exp, isSidecarForService(c.task, c.sidecar)) } } @@ -115,17 +115,16 @@ func TestJobEndpointConnect_groupConnectHook(t *testing.T) { func TestJobEndpointConnect_groupConnectHook_IngressGateway(t *testing.T) { t.Parallel() - // Test that the connect gateway task is inserted if a gateway service exists - // and since this is a bridge network, will rewrite the default gateway proxy + // Test that the connect ingress gateway task is inserted if a gateway service + // exists and since this is a bridge network, will rewrite the default gateway proxy // block with correct configuration. job := mock.ConnectIngressGatewayJob("bridge", false) - job.Meta = map[string]string{ "gateway_name": "my-gateway", } - job.TaskGroups[0].Services[0].Name = "${NOMAD_META_gateway_name}" + // setup expectations expTG := job.TaskGroups[0].Copy() expTG.Tasks = []*structs.Task{ // inject the gateway task @@ -153,11 +152,9 @@ func TestJobEndpointConnect_groupConnectHook_IngressGateway_CustomTask(t *testin // and since this is a bridge network, will rewrite the default gateway proxy // block with correct configuration. job := mock.ConnectIngressGatewayJob("bridge", false) - job.Meta = map[string]string{ "gateway_name": "my-gateway", } - job.TaskGroups[0].Services[0].Name = "${NOMAD_META_gateway_name}" job.TaskGroups[0].Services[0].Connect.SidecarTask = &structs.SidecarTask{ Driver: "raw_exec", @@ -173,6 +170,7 @@ func TestJobEndpointConnect_groupConnectHook_IngressGateway_CustomTask(t *testin KillSignal: "SIGHUP", } + // setup expectations expTG := job.TaskGroups[0].Copy() expTG.Tasks = []*structs.Task{ // inject merged gateway task @@ -215,6 +213,80 @@ func TestJobEndpointConnect_groupConnectHook_IngressGateway_CustomTask(t *testin require.Exactly(t, expTG, job.TaskGroups[0]) } +func TestJobEndpointConnect_groupConnectHook_TerminatingGateway(t *testing.T) { + t.Parallel() + + // Tests that the connect terminating gateway task is inserted if a gateway + // service exists and since this is a bridge network, will rewrite the default + // gateway proxy block with correct configuration. + job := mock.ConnectTerminatingGatewayJob("bridge", false) + job.Meta = map[string]string{ + "gateway_name": "my-gateway", + } + job.TaskGroups[0].Services[0].Name = "${NOMAD_META_gateway_name}" + + // setup expectations + expTG := job.TaskGroups[0].Copy() + expTG.Tasks = []*structs.Task{ + // inject the gateway task + newConnectGatewayTask(structs.ConnectTerminatingPrefix, "my-gateway", false), + } + expTG.Services[0].Name = "my-gateway" + expTG.Tasks[0].Canonicalize(job, expTG) + expTG.Networks[0].Canonicalize() + + // rewrite the service gateway proxy configuration + expTG.Services[0].Connect.Gateway.Proxy = gatewayProxyForBridge(expTG.Services[0].Connect.Gateway) + + require.NoError(t, groupConnectHook(job, job.TaskGroups[0])) + require.Exactly(t, expTG, job.TaskGroups[0]) + + // Test that the hook is idempotent + require.NoError(t, groupConnectHook(job, job.TaskGroups[0])) + require.Exactly(t, expTG, job.TaskGroups[0]) +} + +func TestJobEndpointConnect_groupConnectHook_MeshGateway(t *testing.T) { + t.Parallel() + + // Test that the connect mesh gateway task is inserted if a gateway service + // exists and since this is a bridge network, will rewrite the default gateway + // proxy block with correct configuration, injecting a dynamic port for use + // by the envoy lan listener. + job := mock.ConnectMeshGatewayJob("bridge", false) + job.Meta = map[string]string{ + "gateway_name": "my-gateway", + } + job.TaskGroups[0].Services[0].Name = "${NOMAD_META_gateway_name}" + + // setup expectations + expTG := job.TaskGroups[0].Copy() + expTG.Tasks = []*structs.Task{ + // inject the gateway task + newConnectGatewayTask(structs.ConnectMeshPrefix, "my-gateway", false), + } + expTG.Services[0].Name = "my-gateway" + expTG.Services[0].PortLabel = "public_port" + expTG.Networks[0].DynamicPorts = []structs.Port{{ + Label: "connect-mesh-my-gateway-lan", + Value: 0, + To: -1, + HostNetwork: "default", + }} + expTG.Tasks[0].Canonicalize(job, expTG) + expTG.Networks[0].Canonicalize() + + // rewrite the service gateway proxy configuration + expTG.Services[0].Connect.Gateway.Proxy = gatewayProxyForBridge(expTG.Services[0].Connect.Gateway) + + require.NoError(t, groupConnectHook(job, job.TaskGroups[0])) + require.Exactly(t, expTG, job.TaskGroups[0]) + + // Test that the hook is idempotent + require.NoError(t, groupConnectHook(job, job.TaskGroups[0])) + require.Exactly(t, expTG, job.TaskGroups[0]) +} + // TestJobEndpoint_ConnectInterpolation asserts that when a Connect sidecar // proxy task is being created for a group service with an interpolated name, // the service name is interpolated *before the task is created. @@ -393,6 +465,8 @@ func TestJobEndpointConnect_groupConnectGatewayValidate(t *testing.T) { } func TestJobEndpointConnect_newConnectGatewayTask_host(t *testing.T) { + t.Parallel() + t.Run("ingress", func(t *testing.T) { task := newConnectGatewayTask(structs.ConnectIngressPrefix, "foo", true) require.Equal(t, "connect-ingress-foo", task.Name) @@ -413,11 +487,15 @@ func TestJobEndpointConnect_newConnectGatewayTask_host(t *testing.T) { } func TestJobEndpointConnect_newConnectGatewayTask_bridge(t *testing.T) { + t.Parallel() + task := newConnectGatewayTask(structs.ConnectIngressPrefix, "service1", false) require.NotContains(t, task.Config, "network_mode") } func TestJobEndpointConnect_hasGatewayTaskForService(t *testing.T) { + t.Parallel() + t.Run("no gateway task", func(t *testing.T) { result := hasGatewayTaskForService(&structs.TaskGroup{ Name: "group", @@ -450,9 +528,22 @@ func TestJobEndpointConnect_hasGatewayTaskForService(t *testing.T) { }, "my-service") require.True(t, result) }) + + t.Run("has mesh task", func(t *testing.T) { + result := hasGatewayTaskForService(&structs.TaskGroup{ + Name: "group", + Tasks: []*structs.Task{{ + Name: "mesh-gateway-my-service", + Kind: structs.NewTaskKind(structs.ConnectMeshPrefix, "my-service"), + }}, + }, "my-service") + require.True(t, result) + }) } func TestJobEndpointConnect_gatewayProxyIsDefault(t *testing.T) { + t.Parallel() + t.Run("nil", func(t *testing.T) { result := gatewayProxyIsDefault(nil) require.True(t, result) @@ -494,6 +585,8 @@ func TestJobEndpointConnect_gatewayProxyIsDefault(t *testing.T) { } func TestJobEndpointConnect_gatewayBindAddresses(t *testing.T) { + t.Parallel() + t.Run("nil", func(t *testing.T) { result := gatewayBindAddressesIngress(nil) @@ -559,6 +652,8 @@ func TestJobEndpointConnect_gatewayBindAddresses(t *testing.T) { } func TestJobEndpointConnect_gatewayProxyForBridge(t *testing.T) { + t.Parallel() + t.Run("nil", func(t *testing.T) { result := gatewayProxyForBridge(nil) require.Nil(t, result) @@ -634,6 +729,7 @@ func TestJobEndpointConnect_gatewayProxyForBridge(t *testing.T) { }, }) require.Equal(t, &structs.ConsulGatewayProxy{ + ConnectTimeout: nil, Config: map[string]interface{}{"foo": 1}, EnvoyGatewayNoDefaultBind: false, EnvoyGatewayBindTaggedAddresses: true, @@ -672,6 +768,69 @@ func TestJobEndpointConnect_gatewayProxyForBridge(t *testing.T) { }) t.Run("terminating leave as-is", func(t *testing.T) { - // + result := gatewayProxyForBridge(&structs.ConsulGateway{ + Proxy: &structs.ConsulGatewayProxy{ + Config: map[string]interface{}{"foo": 1}, + EnvoyGatewayBindTaggedAddresses: true, + }, + Terminating: &structs.ConsulTerminatingConfigEntry{ + Services: []*structs.ConsulLinkedService{{ + Name: "service1", + }}, + }, + }) + require.Equal(t, &structs.ConsulGatewayProxy{ + ConnectTimeout: nil, + Config: map[string]interface{}{"foo": 1}, + EnvoyGatewayNoDefaultBind: false, + EnvoyGatewayBindTaggedAddresses: true, + EnvoyGatewayBindAddresses: nil, + }, result) + }) + + t.Run("mesh set defaults", func(t *testing.T) { + result := gatewayProxyForBridge(&structs.ConsulGateway{ + Proxy: &structs.ConsulGatewayProxy{ + ConnectTimeout: helper.TimeToPtr(2 * time.Second), + }, + Mesh: &structs.ConsulMeshConfigEntry{ + // nothing + }, + }) + require.Equal(t, &structs.ConsulGatewayProxy{ + ConnectTimeout: helper.TimeToPtr(2 * time.Second), + EnvoyGatewayNoDefaultBind: true, + EnvoyGatewayBindTaggedAddresses: false, + EnvoyGatewayBindAddresses: map[string]*structs.ConsulGatewayBindAddress{ + "lan": { + Address: "0.0.0.0", + Port: -1, + }, + "wan": { + Address: "0.0.0.0", + Port: -1, + }, + }, + }, result) }) + + t.Run("mesh leave as-is", func(t *testing.T) { + result := gatewayProxyForBridge(&structs.ConsulGateway{ + Proxy: &structs.ConsulGatewayProxy{ + Config: map[string]interface{}{"foo": 1}, + EnvoyGatewayBindTaggedAddresses: true, + }, + Mesh: &structs.ConsulMeshConfigEntry{ + // nothing + }, + }) + require.Equal(t, &structs.ConsulGatewayProxy{ + ConnectTimeout: nil, + Config: map[string]interface{}{"foo": 1}, + EnvoyGatewayNoDefaultBind: false, + EnvoyGatewayBindTaggedAddresses: true, + EnvoyGatewayBindAddresses: nil, + }, result) + }) + } diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 6a54a60d085..ea9cb512900 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -899,6 +899,9 @@ func ConnectIngressGatewayJob(mode string, inject bool) *structs.Job { }, }, }} + + tg.Tasks = nil + // some tests need to assume the gateway proxy task has already been injected if inject { tg.Tasks = []*structs.Task{{ @@ -912,9 +915,102 @@ func ConnectIngressGatewayJob(mode string, inject bool) *structs.Job { MaxFileSizeMB: 2, }, }} - } else { - // otherwise there are no tasks in the group yet - tg.Tasks = nil + } + return job +} + +// ConnectTerminatingGatewayJob creates a structs.Job that contains the definition +// of a Consul Terminating Gateway service. The mode is the name of the network mode +// assumed by the task group. If inject is true, a corresponding task is set on the +// group's Tasks (i.e. what the job would look like after mutation). +func ConnectTerminatingGatewayJob(mode string, inject bool) *structs.Job { + job := Job() + tg := job.TaskGroups[0] + tg.Networks = []*structs.NetworkResource{{ + Mode: mode, + }} + tg.Services = []*structs.Service{{ + Name: "my-terminating-service", + PortLabel: "9999", + Connect: &structs.ConsulConnect{ + Gateway: &structs.ConsulGateway{ + Proxy: &structs.ConsulGatewayProxy{ + ConnectTimeout: helper.TimeToPtr(3 * time.Second), + EnvoyGatewayBindAddresses: make(map[string]*structs.ConsulGatewayBindAddress), + }, + Terminating: &structs.ConsulTerminatingConfigEntry{ + Services: []*structs.ConsulLinkedService{{ + Name: "service1", + CAFile: "/ssl/ca_file", + CertFile: "/ssl/cert_file", + KeyFile: "/ssl/key_file", + SNI: "sni-name", + }}, + }, + }, + }, + }} + + tg.Tasks = nil + + // some tests need to assume the gateway proxy task has already been injected + if inject { + tg.Tasks = []*structs.Task{{ + Name: fmt.Sprintf("%s-%s", structs.ConnectTerminatingPrefix, "my-terminating-service"), + Kind: structs.NewTaskKind(structs.ConnectTerminatingPrefix, "my-terminating-service"), + Driver: "docker", + Config: make(map[string]interface{}), + ShutdownDelay: 5 * time.Second, + LogConfig: &structs.LogConfig{ + MaxFiles: 2, + MaxFileSizeMB: 2, + }, + }} + } + return job +} + +// ConnectMeshGatewayJob creates a structs.Job that contains the definition of a +// Consul Mesh Gateway service. The mode is the name of the network mode assumed +// by the task group. If inject is true, a corresponding task is set on the group's +// Tasks (i.e. what the job would look like after job mutation). +func ConnectMeshGatewayJob(mode string, inject bool) *structs.Job { + job := Job() + tg := job.TaskGroups[0] + tg.Networks = []*structs.NetworkResource{{ + Mode: mode, + }} + tg.Services = []*structs.Service{{ + Name: "my-mesh-service", + PortLabel: "public_port", + Connect: &structs.ConsulConnect{ + Gateway: &structs.ConsulGateway{ + Proxy: &structs.ConsulGatewayProxy{ + ConnectTimeout: helper.TimeToPtr(3 * time.Second), + EnvoyGatewayBindAddresses: make(map[string]*structs.ConsulGatewayBindAddress), + }, + Mesh: &structs.ConsulMeshConfigEntry{ + // nothing to configure + }, + }, + }, + }} + + tg.Tasks = nil + + // some tests need to assume the gateway task has already been injected + if inject { + tg.Tasks = []*structs.Task{{ + Name: fmt.Sprintf("%s-%s", structs.ConnectMeshPrefix, "my-mesh-service"), + Kind: structs.NewTaskKind(structs.ConnectMeshPrefix, "my-mesh-service"), + Driver: "docker", + Config: make(map[string]interface{}), + ShutdownDelay: 5 * time.Second, + LogConfig: &structs.LogConfig{ + MaxFiles: 2, + MaxFileSizeMB: 2, + }, + }} } return job } diff --git a/nomad/structs/connect_test.go b/nomad/structs/connect_test.go new file mode 100644 index 00000000000..385716fe8ba --- /dev/null +++ b/nomad/structs/connect_test.go @@ -0,0 +1,21 @@ +package structs + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestTaskKind_IsAnyConnectGateway(t *testing.T) { + t.Run("gateways", func(t *testing.T) { + require.True(t, NewTaskKind(ConnectIngressPrefix, "foo").IsAnyConnectGateway()) + require.True(t, NewTaskKind(ConnectTerminatingPrefix, "foo").IsAnyConnectGateway()) + require.True(t, NewTaskKind(ConnectMeshPrefix, "foo").IsAnyConnectGateway()) + }) + + t.Run("not gateways", func(t *testing.T) { + require.False(t, NewTaskKind(ConnectProxyPrefix, "foo").IsAnyConnectGateway()) + require.False(t, NewTaskKind(ConnectNativePrefix, "foo").IsAnyConnectGateway()) + require.False(t, NewTaskKind("", "foo").IsAnyConnectGateway()) + }) +} diff --git a/nomad/structs/diff.go b/nomad/structs/diff.go index 544af3e630d..24d1f242e14 100644 --- a/nomad/structs/diff.go +++ b/nomad/structs/diff.go @@ -849,6 +849,32 @@ func connectGatewayDiff(prev, next *ConsulGateway, contextual bool) *ObjectDiff diff.Objects = append(diff.Objects, gatewayTerminatingDiff) } + // Diff the mesh gateway fields. + gatewayMeshDiff := connectGatewayMeshDiff(prev.Mesh, next.Mesh, contextual) + if gatewayMeshDiff != nil { + diff.Objects = append(diff.Objects, gatewayMeshDiff) + } + + return diff +} + +func connectGatewayMeshDiff(prev, next *ConsulMeshConfigEntry, contextual bool) *ObjectDiff { + diff := &ObjectDiff{Type: DiffTypeNone, Name: "Mesh"} + + if reflect.DeepEqual(prev, next) { + return nil + } else if prev == nil { + // no fields to further diff + diff.Type = DiffTypeAdded + } else if next == nil { + // no fields to further diff + diff.Type = DiffTypeDeleted + } else { + diff.Type = DiffTypeEdited + } + + // Currently no fields in mesh gateways. + return diff } @@ -1326,18 +1352,15 @@ func consulProxyDiff(old, new *ConsulProxy, contextual bool) *ObjectDiff { newPrimitiveFlat = flatmap.Flatten(new, nil, true) } - // Diff the primitive fields. + // diff the primitive fields diff.Fields = fieldDiffs(oldPrimitiveFlat, newPrimitiveFlat, contextual) - consulUpstreamsDiff := primitiveObjectSetDiff( - interfaceSlice(old.Upstreams), - interfaceSlice(new.Upstreams), - nil, "ConsulUpstreams", contextual) - if consulUpstreamsDiff != nil { - diff.Objects = append(diff.Objects, consulUpstreamsDiff...) + // diff the consul upstream slices + if upDiffs := consulProxyUpstreamsDiff(old.Upstreams, new.Upstreams, contextual); upDiffs != nil { + diff.Objects = append(diff.Objects, upDiffs...) } - // Config diff + // diff the config blob if cDiff := configDiff(old.Config, new.Config, contextual); cDiff != nil { diff.Objects = append(diff.Objects, cDiff) } @@ -1345,6 +1368,74 @@ func consulProxyDiff(old, new *ConsulProxy, contextual bool) *ObjectDiff { return diff } +// consulProxyUpstreamsDiff diffs a set of connect upstreams. If contextual diff is +// enabled, unchanged fields within objects nested in the tasks will be returned. +func consulProxyUpstreamsDiff(old, new []ConsulUpstream, contextual bool) []*ObjectDiff { + oldMap := make(map[string]ConsulUpstream, len(old)) + newMap := make(map[string]ConsulUpstream, len(new)) + + idx := func(up ConsulUpstream) string { + return fmt.Sprintf("%s/%s", up.Datacenter, up.DestinationName) + } + + for _, o := range old { + oldMap[idx(o)] = o + } + for _, n := range new { + newMap[idx(n)] = n + } + + var diffs []*ObjectDiff + for index, oldUpstream := range oldMap { + // Diff the same, deleted, and edited + if diff := consulProxyUpstreamDiff(oldUpstream, newMap[index], contextual); diff != nil { + diffs = append(diffs, diff) + } + } + + for index, newUpstream := range newMap { + // diff the added + if oldUpstream, exists := oldMap[index]; !exists { + if diff := consulProxyUpstreamDiff(oldUpstream, newUpstream, contextual); diff != nil { + diffs = append(diffs, diff) + } + } + } + sort.Sort(ObjectDiffs(diffs)) + return diffs +} + +func consulProxyUpstreamDiff(prev, next ConsulUpstream, contextual bool) *ObjectDiff { + diff := &ObjectDiff{Type: DiffTypeNone, Name: "ConsulUpstreams"} + var oldPrimFlat, newPrimFlat map[string]string + + if reflect.DeepEqual(prev, next) { + return nil + } else if prev.Equals(new(ConsulUpstream)) { + prev = ConsulUpstream{} + diff.Type = DiffTypeAdded + newPrimFlat = flatmap.Flatten(next, nil, true) + } else if next.Equals(new(ConsulUpstream)) { + next = ConsulUpstream{} + diff.Type = DiffTypeDeleted + oldPrimFlat = flatmap.Flatten(prev, nil, true) + } else { + diff.Type = DiffTypeEdited + oldPrimFlat = flatmap.Flatten(prev, nil, true) + newPrimFlat = flatmap.Flatten(next, nil, true) + } + + // diff the primitive fields + diff.Fields = fieldDiffs(oldPrimFlat, newPrimFlat, contextual) + + // diff the mesh gateway primitive object + if mDiff := primitiveObjectDiff(prev.MeshGateway, next.MeshGateway, nil, "MeshGateway", contextual); mDiff != nil { + diff.Objects = append(diff.Objects, mDiff) + } + + return diff +} + // serviceCheckDiffs diffs a set of service checks. If contextual diff is // enabled, unchanged fields within objects nested in the tasks will be // returned. diff --git a/nomad/structs/diff_test.go b/nomad/structs/diff_test.go index 7e21be5544b..ee6a7d06e17 100644 --- a/nomad/structs/diff_test.go +++ b/nomad/structs/diff_test.go @@ -2742,6 +2742,9 @@ func TestTaskGroupDiff(t *testing.T) { SNI: "linked1.consul", }}, }, + Mesh: &ConsulMeshConfigEntry{ + // nothing + }, }, }, }, @@ -2785,6 +2788,9 @@ func TestTaskGroupDiff(t *testing.T) { LocalBindPort: 8000, Datacenter: "dc2", LocalBindAddress: "127.0.0.2", + MeshGateway: &ConsulMeshGateway{ + Mode: "remote", + }, }, }, Config: map[string]interface{}{ @@ -2830,6 +2836,9 @@ func TestTaskGroupDiff(t *testing.T) { SNI: "linked2.consul", }}, }, + Mesh: &ConsulMeshConfigEntry{ + // nothing + }, }, }, }, @@ -3104,6 +3113,20 @@ func TestTaskGroupDiff(t *testing.T) { New: "8000", }, }, + Objects: []*ObjectDiff{ + { + Type: DiffTypeAdded, + Name: "MeshGateway", + Fields: []*FieldDiff{ + { + Type: DiffTypeAdded, + Name: "Mode", + Old: "", + New: "remote", + }, + }, + }, + }, }, { Type: DiffTypeAdded, diff --git a/nomad/structs/services.go b/nomad/structs/services.go index 51202a89fde..8b1afa61e30 100644 --- a/nomad/structs/services.go +++ b/nomad/structs/services.go @@ -829,7 +829,9 @@ func (c *ConsulConnect) IsTerminating() bool { return c.IsGateway() && c.Gateway.Terminating != nil } -// also mesh +func (c *ConsulConnect) IsMesh() bool { + return c.IsGateway() && c.Gateway.Mesh != nil +} // Validate that the Connect block represents exactly one of: // - Connect non-native service sidecar proxy @@ -1221,6 +1223,56 @@ func (p *ConsulProxy) Equals(o *ConsulProxy) bool { return true } +// ConsulMeshGateway is used to configure mesh gateway usage when connecting to +// a connect upstream in another datacenter. +type ConsulMeshGateway struct { + // Mode configures how an upstream should be accessed with regard to using + // mesh gateways. + // + // local - the connect proxy makes outbound connections through mesh gateway + // originating in the same datacenter. + // + // remote - the connect proxy makes outbound connections to a mesh gateway + // in the destination datacenter. + // + // none (default) - no mesh gateway is used, the proxy makes outbound connections + // directly to destination services. + // + // https://www.consul.io/docs/connect/gateways/mesh-gateway#modes-of-operation + Mode string +} + +func (c *ConsulMeshGateway) Copy() *ConsulMeshGateway { + if c == nil { + return nil + } + + return &ConsulMeshGateway{ + Mode: c.Mode, + } +} + +func (c *ConsulMeshGateway) Equals(o *ConsulMeshGateway) bool { + if c == nil || o == nil { + return c == o + } + + return c.Mode == o.Mode +} + +func (c *ConsulMeshGateway) Validate() error { + if c == nil { + return nil + } + + switch c.Mode { + case "local", "remote", "none": + return nil + default: + return fmt.Errorf("Connect mesh_gateway mode %q not supported", c.Mode) + } +} + // ConsulUpstream represents a Consul Connect upstream jobspec stanza. type ConsulUpstream struct { // DestinationName is the name of the upstream service. @@ -1236,6 +1288,10 @@ type ConsulUpstream struct { // LocalBindAddress is the address the proxy will receive connections for the // upstream on. LocalBindAddress string + + // MeshGateway is the optional configuration of the mesh gateway for this + // upstream to use. + MeshGateway *ConsulMeshGateway } func upstreamsEquals(a, b []ConsulUpstream) bool { @@ -1266,6 +1322,7 @@ func (u *ConsulUpstream) Copy() *ConsulUpstream { LocalBindPort: u.LocalBindPort, Datacenter: u.Datacenter, LocalBindAddress: u.LocalBindAddress, + MeshGateway: u.MeshGateway.Copy(), } } @@ -1275,10 +1332,23 @@ func (u *ConsulUpstream) Equals(o *ConsulUpstream) bool { return u == o } - return (*u) == (*o) + switch { + case u.DestinationName != o.DestinationName: + return false + case u.LocalBindPort != o.LocalBindPort: + return false + case u.Datacenter != o.Datacenter: + return false + case u.LocalBindAddress != o.LocalBindAddress: + return false + case !u.MeshGateway.Equals(o.MeshGateway): + return false + } + + return true } -// ExposeConfig represents a Consul Connect expose jobspec stanza. +// ConsulExposeConfig represents a Consul Connect expose jobspec stanza. type ConsulExposeConfig struct { // Use json tag to match with field name in api/ Paths []ConsulExposePath `json:"Path"` @@ -1341,18 +1411,19 @@ type ConsulGateway struct { // Terminating represents the Consul Configuration Entry for a Terminating Gateway. Terminating *ConsulTerminatingConfigEntry - // Mesh is not yet supported. - // Mesh *ConsulMeshConfigEntry + // Mesh indicates the Consul service should be a Mesh Gateway. + Mesh *ConsulMeshConfigEntry } func (g *ConsulGateway) Prefix() string { switch { + case g.Mesh != nil: + return ConnectMeshPrefix case g.Ingress != nil: return ConnectIngressPrefix default: return ConnectTerminatingPrefix } - // also mesh } func (g *ConsulGateway) Copy() *ConsulGateway { @@ -1364,6 +1435,7 @@ func (g *ConsulGateway) Copy() *ConsulGateway { Proxy: g.Proxy.Copy(), Ingress: g.Ingress.Copy(), Terminating: g.Terminating.Copy(), + Mesh: g.Mesh.Copy(), } } @@ -1384,6 +1456,10 @@ func (g *ConsulGateway) Equals(o *ConsulGateway) bool { return false } + if !g.Mesh.Equals(o.Mesh) { + return false + } + return true } @@ -1404,7 +1480,11 @@ func (g *ConsulGateway) Validate() error { return err } - // Exactly 1 of ingress/terminating/mesh(soon) must be set. + if err := g.Mesh.Validate(); err != nil { + return err + } + + // Exactly 1 of ingress/terminating/mesh must be set. count := 0 if g.Ingress != nil { count++ @@ -1412,8 +1492,11 @@ func (g *ConsulGateway) Validate() error { if g.Terminating != nil { count++ } + if g.Mesh != nil { + count++ + } if count != 1 { - return fmt.Errorf("One Consul Gateway Configuration Entry must be set") + return fmt.Errorf("One Consul Gateway Configuration must be set") } return nil } @@ -1612,11 +1695,7 @@ func (c *ConsulGatewayTLSConfig) Equals(o *ConsulGatewayTLSConfig) bool { // ConsulIngressService is used to configure a service fronted by the ingress gateway. type ConsulIngressService struct { - // Namespace is not yet supported. - // Namespace string - - Name string - + Name string Hosts []string } @@ -1776,9 +1855,6 @@ COMPARE: // order does not matter // // https://www.consul.io/docs/agent/config-entries/ingress-gateway#available-fields type ConsulIngressConfigEntry struct { - // Namespace is not yet supported. - // Namespace string - TLS *ConsulGatewayTLSConfig Listeners []*ConsulIngressListener } @@ -1939,9 +2015,6 @@ COMPARE: // order does not matter } type ConsulTerminatingConfigEntry struct { - // Namespace is not yet supported. - // Namespace string - Services []*ConsulLinkedService } @@ -1988,3 +2061,30 @@ func (e *ConsulTerminatingConfigEntry) Validate() error { return nil } + +// ConsulMeshConfigEntry is a stub used to represent that the gateway service +// type should be for a Mesh Gateway. Unlike Ingress and Terminating, there is no +// dedicated Consul Config Entry type for "mesh-gateway", for now. We still +// create a type for future proofing, and to keep underlying job-spec marshaling +// consistent with the other types. +type ConsulMeshConfigEntry struct { + // nothing in here +} + +func (e *ConsulMeshConfigEntry) Copy() *ConsulMeshConfigEntry { + if e == nil { + return nil + } + return new(ConsulMeshConfigEntry) +} + +func (e *ConsulMeshConfigEntry) Equals(o *ConsulMeshConfigEntry) bool { + if e == nil || o == nil { + return e == o + } + return true +} + +func (e *ConsulMeshConfigEntry) Validate() error { + return nil +} diff --git a/nomad/structs/services_test.go b/nomad/structs/services_test.go index 6535f8c2e2f..49e31b502dc 100644 --- a/nomad/structs/services_test.go +++ b/nomad/structs/services_test.go @@ -522,6 +522,12 @@ func TestConsulUpstream_upstreamEquals(t *testing.T) { require.False(t, upstreamsEquals(a, b)) }) + t.Run("different mesh_gateway", func(t *testing.T) { + a := []ConsulUpstream{{DestinationName: "foo", MeshGateway: &ConsulMeshGateway{Mode: "local"}}} + b := []ConsulUpstream{{DestinationName: "foo", MeshGateway: &ConsulMeshGateway{Mode: "remote"}}} + require.False(t, upstreamsEquals(a, b)) + }) + t.Run("identical", func(t *testing.T) { a := []ConsulUpstream{up("foo", 8000), up("bar", 9000)} b := []ConsulUpstream{up("foo", 8000), up("bar", 9000)} @@ -682,6 +688,15 @@ var ( }}, }, } + + consulMeshGateway1 = &ConsulGateway{ + Proxy: &ConsulGatewayProxy{ + ConnectTimeout: helper.TimeToPtr(1 * time.Second), + }, + Mesh: &ConsulMeshConfigEntry{ + // nothing + }, + } ) func TestConsulGateway_Prefix(t *testing.T) { @@ -695,7 +710,10 @@ func TestConsulGateway_Prefix(t *testing.T) { require.Equal(t, ConnectTerminatingPrefix, result) }) - // also mesh + t.Run("mesh", func(t *testing.T) { + result := (&ConsulGateway{Mesh: new(ConsulMeshConfigEntry)}).Prefix() + require.Equal(t, ConnectMeshPrefix, result) + }) } func TestConsulGateway_Copy(t *testing.T) { @@ -720,6 +738,29 @@ func TestConsulGateway_Copy(t *testing.T) { require.True(t, result.Equals(consulTerminatingGateway1)) require.True(t, consulTerminatingGateway1.Equals(result)) }) + + t.Run("as mesh", func(t *testing.T) { + result := consulMeshGateway1.Copy() + require.Equal(t, consulMeshGateway1, result) + require.True(t, result.Equals(consulMeshGateway1)) + require.True(t, consulMeshGateway1.Equals(result)) + }) +} + +func TestConsulGateway_Equals_mesh(t *testing.T) { + t.Parallel() + + t.Run("nil", func(t *testing.T) { + a := (*ConsulGateway)(nil) + b := (*ConsulGateway)(nil) + require.True(t, a.Equals(b)) + require.False(t, a.Equals(consulMeshGateway1)) + require.False(t, consulMeshGateway1.Equals(a)) + }) + + t.Run("reflexive", func(t *testing.T) { + require.True(t, consulMeshGateway1.Equals(consulMeshGateway1)) + }) } func TestConsulGateway_Equals_ingress(t *testing.T) { @@ -962,8 +1003,9 @@ func TestConsulGateway_Validate(t *testing.T) { err := (&ConsulGateway{ Ingress: nil, Terminating: nil, + Mesh: nil, }).Validate() - require.EqualError(t, err, "One Consul Gateway Configuration Entry must be set") + require.EqualError(t, err, "One Consul Gateway Configuration must be set") }) t.Run("multiple config entries set", func(t *testing.T) { @@ -983,7 +1025,14 @@ func TestConsulGateway_Validate(t *testing.T) { }}, }, }).Validate() - require.EqualError(t, err, "One Consul Gateway Configuration Entry must be set") + require.EqualError(t, err, "One Consul Gateway Configuration must be set") + }) + + t.Run("ok mesh", func(t *testing.T) { + err := (&ConsulGateway{ + Mesh: new(ConsulMeshConfigEntry), + }).Validate() + require.NoError(t, err) }) } @@ -1227,7 +1276,7 @@ func TestConsulLinkedService_Validate(t *testing.T) { require.EqualError(t, err, "Consul Linked Service requires Name") }) - t.Run("missing cafile", func(t *testing.T) { + t.Run("missing ca_file", func(t *testing.T) { err := (&ConsulLinkedService{ Name: "linked-service1", CertFile: "cert_file.pem", @@ -1245,7 +1294,7 @@ func TestConsulLinkedService_Validate(t *testing.T) { require.EqualError(t, err, "Consul Linked Service TLS Cert and Key must both be set") }) - t.Run("sni without cafile", func(t *testing.T) { + t.Run("sni without ca_file", func(t *testing.T) { err := (&ConsulLinkedService{ Name: "linked-service1", SNI: "service.consul", @@ -1339,7 +1388,7 @@ func TestConsulLinkedService_linkedServicesEqual(t *testing.T) { require.True(t, linkedServicesEqual(services, reversed)) different := []*ConsulLinkedService{ - services[0], &ConsulLinkedService{ + services[0], { Name: "service2", CAFile: "ca.pem", SNI: "service2.consul", @@ -1382,3 +1431,44 @@ func TestConsulTerminatingConfigEntry_Validate(t *testing.T) { require.NoError(t, err) }) } + +func TestConsulMeshGateway_Copy(t *testing.T) { + t.Parallel() + + require.Nil(t, (*ConsulMeshGateway)(nil)) + require.Equal(t, &ConsulMeshGateway{ + Mode: "remote", + }, &ConsulMeshGateway{ + Mode: "remote", + }) +} + +func TestConsulMeshGateway_Equals(t *testing.T) { + t.Parallel() + + c := &ConsulMeshGateway{Mode: "local"} + require.False(t, c.Equals(nil)) + require.True(t, c.Equals(c)) + + o := &ConsulMeshGateway{Mode: "remote"} + require.False(t, c.Equals(o)) +} + +func TestConsulMeshGateway_Validate(t *testing.T) { + t.Parallel() + + t.Run("nil", func(t *testing.T) { + err := (*ConsulMeshGateway)(nil).Validate() + require.NoError(t, err) + }) + + t.Run("mode invalid", func(t *testing.T) { + err := (&ConsulMeshGateway{Mode: "banana"}).Validate() + require.EqualError(t, err, `Connect mesh_gateway mode "banana" not supported`) + }) + + t.Run("ok", func(t *testing.T) { + err := (&ConsulMeshGateway{Mode: "local"}).Validate() + require.NoError(t, err) + }) +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 418d48e5d59..74ab5b73314 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -7209,20 +7209,31 @@ func (k TaskKind) IsConnectNative() bool { return k.hasPrefix(ConnectNativePrefix) } +// IsConnectIngress returns true if the TaskKind is connect-ingress. func (k TaskKind) IsConnectIngress() bool { return k.hasPrefix(ConnectIngressPrefix) } +// IsConnectTerminating returns true if the TaskKind is connect-terminating. func (k TaskKind) IsConnectTerminating() bool { return k.hasPrefix(ConnectTerminatingPrefix) } +// IsConnectMesh returns true if the TaskKind is connect-mesh. +func (k TaskKind) IsConnectMesh() bool { + return k.hasPrefix(ConnectMeshPrefix) +} + +// IsAnyConnectGateway returns true if the TaskKind represents any one of the +// supported connect gateway types. func (k TaskKind) IsAnyConnectGateway() bool { switch { case k.IsConnectIngress(): return true case k.IsConnectTerminating(): return true + case k.IsConnectMesh(): + return true default: return false } @@ -7243,14 +7254,11 @@ const ( // ConnectTerminatingPrefix is the prefix used for fields referencing a Consul // Connect Terminating Gateway Proxy. - // ConnectTerminatingPrefix = "connect-terminating" // ConnectMeshPrefix is the prefix used for fields referencing a Consul Connect // Mesh Gateway Proxy. - // - // Not yet supported. - // ConnectMeshPrefix = "connect-mesh" + ConnectMeshPrefix = "connect-mesh" ) // ValidateConnectProxyService checks that the service that is being diff --git a/vendor/github.com/hashicorp/nomad/api/services.go b/vendor/github.com/hashicorp/nomad/api/services.go index 880ba7f7973..365bf848463 100644 --- a/vendor/github.com/hashicorp/nomad/api/services.go +++ b/vendor/github.com/hashicorp/nomad/api/services.go @@ -281,17 +281,81 @@ func (cp *ConsulProxy) Canonicalize() { cp.Upstreams = nil } + for i := 0; i < len(cp.Upstreams); i++ { + cp.Upstreams[i].Canonicalize() + } + if len(cp.Config) == 0 { cp.Config = nil } } +// ConsulMeshGateway is used to configure mesh gateway usage when connecting to +// a connect upstream in another datacenter. +type ConsulMeshGateway struct { + // Mode configures how an upstream should be accessed with regard to using + // mesh gateways. + // + // local - the connect proxy makes outbound connections through mesh gateway + // originating in the same datacenter. + // + // remote - the connect proxy makes outbound connections to a mesh gateway + // in the destination datacenter. + // + // none (default) - no mesh gateway is used, the proxy makes outbound connections + // directly to destination services. + // + // https://www.consul.io/docs/connect/gateways/mesh-gateway#modes-of-operation + Mode string `mapstructure:"mode" hcl:"mode,optional"` +} + +func (c *ConsulMeshGateway) Canonicalize() { + if c == nil { + return + } + + if c.Mode == "" { + c.Mode = "none" + } +} + +func (c *ConsulMeshGateway) Copy() *ConsulMeshGateway { + if c == nil { + return nil + } + + return &ConsulMeshGateway{ + Mode: c.Mode, + } +} + // ConsulUpstream represents a Consul Connect upstream jobspec stanza. type ConsulUpstream struct { - DestinationName string `mapstructure:"destination_name" hcl:"destination_name,optional"` - LocalBindPort int `mapstructure:"local_bind_port" hcl:"local_bind_port,optional"` - Datacenter string `mapstructure:"datacenter" hcl:"datacenter,optional"` - LocalBindAddress string `mapstructure:"local_bind_address" hcl:"local_bind_address,optional"` + DestinationName string `mapstructure:"destination_name" hcl:"destination_name,optional"` + LocalBindPort int `mapstructure:"local_bind_port" hcl:"local_bind_port,optional"` + Datacenter string `mapstructure:"datacenter" hcl:"datacenter,optional"` + LocalBindAddress string `mapstructure:"local_bind_address" hcl:"local_bind_address,optional"` + MeshGateway *ConsulMeshGateway `mapstructure:"mesh_gateway" hcl:"mesh_gateway,block"` +} + +func (cu *ConsulUpstream) Copy() *ConsulUpstream { + if cu == nil { + return nil + } + return &ConsulUpstream{ + DestinationName: cu.DestinationName, + LocalBindPort: cu.LocalBindPort, + Datacenter: cu.Datacenter, + LocalBindAddress: cu.LocalBindAddress, + MeshGateway: cu.MeshGateway.Copy(), + } +} + +func (cu *ConsulUpstream) Canonicalize() { + if cu == nil { + return + } + cu.MeshGateway.Canonicalize() } type ConsulExposeConfig struct { @@ -326,8 +390,8 @@ type ConsulGateway struct { // Terminating represents the Consul Configuration Entry for a Terminating Gateway. Terminating *ConsulTerminatingConfigEntry `hcl:"terminating,block"` - // Mesh is not yet supported. - // Mesh *ConsulMeshConfigEntry + // Mesh indicates the Consul service should be a Mesh Gateway. + Mesh *ConsulMeshConfigEntry `hcl:"mesh,block"` } func (g *ConsulGateway) Canonicalize() { @@ -643,6 +707,21 @@ func (e *ConsulTerminatingConfigEntry) Copy() *ConsulTerminatingConfigEntry { } } -// ConsulMeshConfigEntry is not yet supported. -// type ConsulMeshConfigEntry struct { -// } +// ConsulMeshConfigEntry is a stub used to represent that the gateway service type +// should be for a Mesh Gateway. Unlike Ingress and Terminating, there is no +// actual Consul Config Entry type for mesh-gateway, at least for now. We still +// create a type for future proofing, instead just using a bool for example. +type ConsulMeshConfigEntry struct { + // nothing in here +} + +func (e *ConsulMeshConfigEntry) Canonicalize() { + return +} + +func (e *ConsulMeshConfigEntry) Copy() *ConsulMeshConfigEntry { + if e == nil { + return nil + } + return new(ConsulMeshConfigEntry) +} diff --git a/website/content/docs/job-specification/gateway.mdx b/website/content/docs/job-specification/gateway.mdx index a679b805f88..54363a76c8e 100644 --- a/website/content/docs/job-specification/gateway.mdx +++ b/website/content/docs/job-specification/gateway.mdx @@ -43,6 +43,8 @@ Exactly one of `ingress` or `terminating` must be configured. that will be associated with the service. - `terminating` ([terminating]: nil) - Configuration Entry of type `terminating-gateway` that will be associated with the service. +- `mesh` ([mesh]: nil) - Indicates a mesh gateway will be associated + with the service. ### `proxy` Parameters @@ -166,6 +168,10 @@ envoy_gateway_bind_addresses "" { - `sni` `(string: )` - An optional hostname or domain name to specify during the TLS handshake. +### `mesh` Parameters + + The `mesh` block currently does not have any configurable parameters. + ### Gateway with host networking Nomad supports running gateways using host networking. A static port must be allocated @@ -433,6 +439,201 @@ job "countdash-terminating" { } ``` +#### mesh gateway + +Mesh gateways are useful when Connect services need to make cross-datacenter +requests where not all nodes in each datacenter have full connectivity. This example +demonstrates using mesh gateways to enable making requests between datacenters +`one` and `two`, where each mesh gateway will bind to the `public` host network +configured on at least one Nomad client in each datacenter. + +Job running where Nomad and Consul are in datacenter `one`. + +```hcl +job "countdash-mesh-one" { + datacenters = ["one"] + + group "mesh-gateway-one" { + network { + mode = "bridge" + + # A mesh gateway will require a host_network configured on at least one + # Nomad client that can establish cross-datacenter connections. Nomad will + # automatically schedule the mesh gateway task on compatible Nomad clients. + port "mesh_wan" { + host_network = "public" + } + } + + service { + name = "mesh-gateway" + + # The mesh gateway connect service should be configured to use a port from + # the host_network capable of cross-datacenter connections. + port = "mesh_wan" + + connect { + gateway { + mesh { + # No configuration options in the mesh block. + } + + # Consul gateway [envoy] proxy options. + proxy { + # The following options are automatically set by Nomad if not explicitly + # configured with using bridge networking. + # + # envoy_gateway_no_default_bind = true + # envoy_gateway_bind_addresses "lan" { + # address = "0.0.0.0" + # port = + # } + # envoy_gateway_bind_addresses "wan" { + # address = "0.0.0.0" + # port = + # } + # Additional options are documented at + # https://www.nomadproject.io/docs/job-specification/gateway#proxy-parameters + } + } + } + } + } + + group "dashboard" { + network { + mode = "bridge" + + port "http" { + static = 9002 + to = 9002 + } + } + + service { + name = "count-dashboard" + port = "9002" + + connect { + sidecar_service { + proxy { + upstreams { + destination_name = "count-api" + local_bind_port = 8080 + + # This dashboard service is running in datacenter "one", and will + # make requests to the "count-api" service running in datacenter + # "two", by going through the mesh gateway in each datacenter. + datacenter = "two" + + mesh_gateway { + # Using "local" mode indicates requests should exit this datacenter + # through the mesh gateway, and enter the destination datacenter + # through a mesh gateway in that datacenter. + # Using "remote" mode indicates requests should bypass the local + # mesh gateway, instead directly connecting to the mesh gateway + # in the destination datacenter. + mode = "local" + } + } + } + } + } + } + + task "dashboard" { + driver = "docker" + + env { + COUNTING_SERVICE_URL = "http://${NOMAD_UPSTREAM_ADDR_count_api}" + } + + config { + image = "hashicorpnomad/counter-dashboard:v3" + } + } + } +} +``` + +Job running where Nomad and Consul are in datacenter `two`. + +```hcl +job "countdash-mesh-two" { + datacenters = ["two"] + + group "mesh-gateway-two" { + network { + mode = "bridge" + + # A mesh gateway will require a host_network configured for at least one + # Nomad client that can establish cross-datacenter connections. Nomad will + # automatically schedule the mesh gateway task on compatible Nomad clients. + port "mesh_wan" { + host_network = "public" + } + } + + service { + name = "mesh-gateway" + + # The mesh gateway connect service should be configured to use a port from + # the host_network capable of cross-datacenter connections. + port = "mesh_wan" + + connect { + gateway { + mesh { + # No configuration options in the mesh block. + } + + # Consul gateway [envoy] proxy options. + proxy { + # The following options are automatically set by Nomad if not explicitly + # configured with using bridge networking. + # + # envoy_gateway_no_default_bind = true + # envoy_gateway_bind_addresses "lan" { + # address = "0.0.0.0" + # port = + # } + # envoy_gateway_bind_addresses "wan" { + # address = "0.0.0.0" + # port = + # } + # Additional options are documented at + # https://www.nomadproject.io/docs/job-specification/gateway#proxy-parameters + } + } + } + } + } + + group "api" { + network { + mode = "bridge" + } + + service { + name = "count-api" + port = "9001" + connect { + sidecar_service {} + } + } + + task "api" { + driver = "docker" + + config { + image = "hashicorpnomad/counter-api:v3" + } + } + } +} +``` + + [address]: /docs/job-specification/gateway#address-parameters [advanced configuration]: https://www.consul.io/docs/connect/proxies/envoy#advanced-configuration [connect_timeout_ms]: https://www.consul.io/docs/agent/config-entries/service-resolver#connecttimeout @@ -446,3 +647,4 @@ job "countdash-terminating" { [sidecar_task]: /docs/job-specification/sidecar_task [terminating]: /docs/job-specification/gateway#terminating-parameters [tls]: /docs/job-specification/gateway#tls-parameters +[mesh]: /docs/job-specification/gateway#mesh-parameters diff --git a/website/content/docs/job-specification/upstreams.mdx b/website/content/docs/job-specification/upstreams.mdx index 5a8d27dc685..1b540d219d1 100644 --- a/website/content/docs/job-specification/upstreams.mdx +++ b/website/content/docs/job-specification/upstreams.mdx @@ -52,8 +52,11 @@ job "countdash" { upstreams { destination_name = "count-api" local_bind_port = 8080 - datacenter = "dc1" + datacenter = "dc2" local_bind_address = "127.0.0.1" + mesh_gateway { + mode = "local" + } } } } @@ -86,6 +89,22 @@ job "countdash" { local Consul datacenter. - `local_bind_address` - `(string: "")` - The address the proxy will receive connections for the upstream on. +- `mesh_gateway` ([mesh_gateway][mesh_gateway_param]: nil) - Configures the mesh gateway + behavior for connecting to this upstream. + +### `mesh_gateway` Parameters + +- `mode` `(string: "default")` - The mode of operation in which to use [Connect Mesh Gateways][mesh_gateways] +Defaults to the mode as determined by the Consul [service-defaults][service_defaults_mode] + configuration for the service. Can be configured with the following modes: + - `local` - In this mode the Connect proxy makes its outbound connection to a + gateway running in the same datacenter. That gateway is then responsible for + ensuring the data gets forwarded along to gateways in the destination datacenter. + - `remote` - In this mode the Connect proxy makes its outbound connection to a + gateway running in the destination datacenter. That gateway will then forward + the data to the final destination service. + - `none` - In this mode, no gateway is used and a Connect proxy makes its + outbound connections directly to the destination services. The `NOMAD_UPSTREAM_ADDR_` environment variables may be used to interpolate the upstream's `host:port` address. @@ -113,3 +132,6 @@ and a local bind port. [interpolation]: /docs/runtime/interpolation 'Nomad interpolation' [sidecar_service]: /docs/job-specification/sidecar_service 'Nomad sidecar service Specification' [upstreams]: /docs/job-specification/upstreams 'Nomad upstream config Specification' +[service_defaults_mode]: https://www.consul.io/docs/connect/config-entries/service-defaults#meshgateway +[mesh_gateway_param]: /docs/job-specification/upstreams#mesh_gateway-parameters +[mesh_gateways]: https://www.consul.io/docs/connect/gateways/mesh-gateway#mesh-gateways