Skip to content

Commit

Permalink
mirror mirror on the wall, who is the best service mesh of them all
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisboulton committed Aug 18, 2022
1 parent c7e42d8 commit c20c1d4
Show file tree
Hide file tree
Showing 15 changed files with 541 additions and 19 deletions.
33 changes: 31 additions & 2 deletions agent/consul/discoverychain/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,9 @@ func (c *compiler) removeUnusedNodes() error {
case structs.DiscoveryGraphNodeTypeRouter:
for _, route := range node.Routes {
todo[route.NextNode] = struct{}{}
if route.MirrorPolicy != nil {
todo[route.MirrorPolicy.DestinationNode] = struct{}{}
}
}
case structs.DiscoveryGraphNodeTypeSplitter:
for _, split := range node.Splits {
Expand Down Expand Up @@ -621,8 +624,9 @@ func (c *compiler) assembleChain() error {

// Check to see if the destination is eligible for splitting.
var (
node *structs.DiscoveryGraphNode
err error
node *structs.DiscoveryGraphNode
mirrorNode *structs.DiscoveryGraphNode
err error
)
if dest.ServiceSubset == "" {
node, err = c.getSplitterOrResolverNode(
Expand All @@ -637,6 +641,31 @@ func (c *compiler) assembleChain() error {
if err != nil {
return err
}

// Check to see if traffic to this destination should be mirrored to another service
if dest.MirrorPolicy != nil {
mirrorNamespace := defaultIfEmpty(dest.MirrorPolicy.Namespace, destNamespace)
mirrorPartition := defaultIfEmpty(dest.MirrorPolicy.Partion, destPartition)
if dest.MirrorPolicy.ServiceSubset == "" {
mirrorNode, err = c.getSplitterOrResolverNode(
c.newTarget(dest.MirrorPolicy.Service, "", mirrorNamespace, mirrorPartition, ""),
)
} else {
mirrorNode, err = c.getResolverNode(
c.newTarget(dest.MirrorPolicy.Service, dest.MirrorPolicy.ServiceSubset, mirrorNamespace, mirrorPartition, ""),
false,
)
}
if err != nil {
return err
}

compiledRoute.MirrorPolicy = &structs.DiscoveryMirrorPolicy{
DestinationNode: mirrorNode.MapKey(),
Percent: dest.MirrorPolicy.Percent,
}
}

compiledRoute.NextNode = node.MapKey()
}

Expand Down
135 changes: 135 additions & 0 deletions agent/consul/discoverychain/compile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func TestCompile(t *testing.T) {
"default resolver with proxy defaults": testcase_DefaultResolver_WithProxyDefaults(),
"loadbalancer splitter and resolver": testcase_LBSplitterAndResolver(),
"loadbalancer resolver": testcase_LBResolver(),
"router with traffic mirroring": testcase_RouterWithMirrorPolicy(),
"service redirect to service with default resolver is not a default chain": testcase_RedirectToDefaultResolverIsNotDefaultChain(),
"service meta projection": testcase_ServiceMetaProjection(),
"service meta projection with redirect": testcase_ServiceMetaProjectionWithRedirect(),
Expand Down Expand Up @@ -2750,6 +2751,140 @@ func testcase_LBResolver() compileTestCase {
return compileTestCase{entries: entries, expect: expect}
}

func testcase_RouterWithMirrorPolicy() compileTestCase {
entries := newEntries()
setServiceProtocol(entries, "main", "http")
setServiceProtocol(entries, "mirror-source", "http")
setServiceProtocol(entries, "mirror-dest", "http")
setServiceProtocol(entries, "mirror-source-subset", "http")

entries.AddResolvers(
&structs.ServiceResolverConfigEntry{
Kind: "service-resolver",
Name: "mirror-dest",
Subsets: map[string]structs.ServiceResolverSubset{
"v1": {
Filter: "Service.Meta.version == 1",
},
"v2": {
Filter: "Service.Meta.version == 2",
},
},
},
)

entries.AddRouters(
&structs.ServiceRouterConfigEntry{
Kind: "service-router",
Name: "main",
Routes: []structs.ServiceRoute{
newSimpleRoute("mirror-source", func(r *structs.ServiceRoute) {
r.Destination.MirrorPolicy = &structs.ServiceRouteDestinationMirror{
Service: "mirror-dest",
}
}),
newSimpleRoute("mirror-source-subset", func(r *structs.ServiceRoute) {
r.Destination.MirrorPolicy = &structs.ServiceRouteDestinationMirror{
Service: "mirror-dest",
ServiceSubset: "v2",
}
}),
},
},
)

router := entries.GetRouter(structs.NewServiceID("main", nil))

expect := &structs.CompiledDiscoveryChain{
Protocol: "http",
StartNode: "router:main.default.default",
Nodes: map[string]*structs.DiscoveryGraphNode{
"resolver:main.default.default.dc1": {
Type: structs.DiscoveryGraphNodeTypeResolver,
Name: "main.default.default.dc1",
Resolver: &structs.DiscoveryResolver{
Default: true,
ConnectTimeout: 5 * time.Second,
Target: "main.default.default.dc1",
},
},
"router:main.default.default": {
Type: structs.DiscoveryGraphNodeTypeRouter,
Name: "main.default.default",
Routes: []*structs.DiscoveryRoute{
{
Definition: &router.Routes[0],
NextNode: "resolver:mirror-source.default.default.dc1",
MirrorPolicy: &structs.DiscoveryMirrorPolicy{
DestinationNode: "resolver:mirror-dest.default.default.dc1",
Percent: 0,
},
},
{
Definition: &router.Routes[1],
NextNode: "resolver:mirror-source-subset.default.default.dc1",
MirrorPolicy: &structs.DiscoveryMirrorPolicy{
DestinationNode: "resolver:v2.mirror-dest.default.default.dc1",
Percent: 0,
},
},
{
Definition: newDefaultServiceRoute("main", "default", "default"),
NextNode: "resolver:main.default.default.dc1",
},
},
},
"resolver:mirror-dest.default.default.dc1": {
Type: structs.DiscoveryGraphNodeTypeResolver,
Name: "mirror-dest.default.default.dc1",
Resolver: &structs.DiscoveryResolver{
ConnectTimeout: 5 * time.Second,
Target: "mirror-dest.default.default.dc1",
},
},
"resolver:v2.mirror-dest.default.default.dc1": {
Type: structs.DiscoveryGraphNodeTypeResolver,
Name: "v2.mirror-dest.default.default.dc1",
Resolver: &structs.DiscoveryResolver{
ConnectTimeout: 5 * time.Second,
Target: "v2.mirror-dest.default.default.dc1",
},
},
"resolver:mirror-source-subset.default.default.dc1": {
Type: structs.DiscoveryGraphNodeTypeResolver,
Name: "mirror-source-subset.default.default.dc1",
Resolver: &structs.DiscoveryResolver{
Default: true,
ConnectTimeout: 5 * time.Second,
Target: "mirror-source-subset.default.default.dc1",
},
},
"resolver:mirror-source.default.default.dc1": {
Type: structs.DiscoveryGraphNodeTypeResolver,
Name: "mirror-source.default.default.dc1",
Resolver: &structs.DiscoveryResolver{
Default: true,
ConnectTimeout: 5 * time.Second,
Target: "mirror-source.default.default.dc1",
},
},
},
Targets: map[string]*structs.DiscoveryTarget{
"main.default.default.dc1": newTarget("main", "", "default", "default", "dc1", nil),
"mirror-source.default.default.dc1": newTarget("mirror-source", "", "default", "default", "dc1", nil),
"mirror-dest.default.default.dc1": newTarget("mirror-dest", "", "default", "default", "dc1", nil),
"v2.mirror-dest.default.default.dc1": newTarget("mirror-dest", "v2", "default", "default", "dc1", func(t *structs.DiscoveryTarget) {
t.Subset = structs.ServiceResolverSubset{
Filter: "Service.Meta.version == 2",
}
}),
"mirror-source-subset.default.default.dc1": newTarget("mirror-source-subset", "", "default", "default", "dc1", nil),
},
}

return compileTestCase{entries: entries, expect: expect}
}

func newSimpleRoute(name string, muts ...func(*structs.ServiceRoute)) structs.ServiceRoute {
r := structs.ServiceRoute{
Match: &structs.ServiceRouteMatch{
Expand Down
12 changes: 12 additions & 0 deletions agent/proxycfg/testing_upstreams.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,18 @@ func setupTestVariationDiscoveryChain(
},
},
},
{
Match: httpMatch(&structs.ServiceRouteHTTPMatch{
PathPrefix: "/mirror",
}),
Destination: &structs.ServiceRouteDestination{
Service: "original-destination",
MirrorPolicy: &structs.ServiceRouteDestinationMirror{
Service: "mirror-destination",
Percent: 25,
},
},
},
},
},
)
Expand Down
33 changes: 33 additions & 0 deletions agent/structs/config_entry_discoverychain.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,12 @@ func (e *ServiceRouterConfigEntry) Validate() error {
if route.Destination.PrefixRewrite != "" && !eligibleForPrefixRewrite {
return fmt.Errorf("Route[%d] cannot make use of PrefixRewrite without configuring either PathExact or PathPrefix", i)
}

if route.Destination.MirrorPolicy != nil {
if err := route.Destination.MirrorPolicy.Validate(); err != nil {
return fmt.Errorf("Route[%d] has invalid mirror policy: %s", i, err)
}
}
}
}

Expand Down Expand Up @@ -416,6 +422,9 @@ type ServiceRouteDestination struct {
// Allow HTTP header manipulation to be configured.
RequestHeaders *HTTPHeaderModifiers `json:",omitempty" alias:"request_headers"`
ResponseHeaders *HTTPHeaderModifiers `json:",omitempty" alias:"response_headers"`

// MirrorPolicy allows for traffic to this destination to be mirrored elsewhere
MirrorPolicy *ServiceRouteDestinationMirror `json:",omitempty" alias:"mirror_policy"`
}

func (e *ServiceRouteDestination) MarshalJSON() ([]byte, error) {
Expand Down Expand Up @@ -458,6 +467,30 @@ func (d *ServiceRouteDestination) HasRetryFeatures() bool {
return d.NumRetries > 0 || d.RetryOnConnectFailure || len(d.RetryOnStatusCodes) > 0
}

// ServiceRouteDestinationMirrors allow traffic to a destination service
// to be mirrored to another service, where the reply will be discarded.
//
// Optionally, a percentage of traffic can be mirrored.
type ServiceRouteDestinationMirror struct {
Service string `json:",omitempty"`
Namespace string `json:",omitempty"`
Partion string `json:",omitempty"`
ServiceSubset string `json:",omitempty" alias:"service_subset"`
Percent uint32 `json:",omitempty" alias:"percent"`
}

func (e *ServiceRouteDestinationMirror) Validate() error {
if e.Service == "" {
return fmt.Errorf("service to mirror traffic to is required")
}

if e.Percent < 0 || e.Percent > 100 {
return fmt.Errorf("percent of traffic to mirror must be between 0 and 100. got %d", e.Percent)
}

return nil
}

// ServiceSplitterConfigEntry defines how incoming requests are split across
// different subsets of a single service (like during staged canary rollouts),
// or perhaps across different services (like during a v2 rewrite or other type
Expand Down
57 changes: 57 additions & 0 deletions agent/structs/config_entry_discoverychain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1857,6 +1857,63 @@ func TestServiceRouterConfigEntry(t *testing.T) {
}),
validateErr: "cannot make use of PrefixRewrite without configuring either PathExact or PathPrefix",
},
/////////////////
// mirror with service
{
name: "route traffic mirror",
entry: makerouter(ServiceRoute{
Match: httpMatch(&ServiceRouteHTTPMatch{
PathPrefix: "/",
}),
Destination: &ServiceRouteDestination{
Service: "other",
MirrorPolicy: &ServiceRouteDestinationMirror{
Service: "mirror-to",
ServiceSubset: "v2",
Percent: 11,
},
},
}),
check: func(t *testing.T, entry *ServiceRouterConfigEntry) {
dm := entry.Routes[0].Destination.MirrorPolicy
require.Equal(t, "mirror-to", dm.Service)
require.Equal(t, "v2", dm.ServiceSubset)
require.Equal(t, uint32(11), dm.Percent)
},
},
{
name: "route traffic mirror without destination",
entry: makerouter(ServiceRoute{
Match: httpMatchParam(ServiceRouteHTTPMatchQueryParam{
Name: "foo",
Exact: "bar",
}),
Destination: &ServiceRouteDestination{
Service: "other",
MirrorPolicy: &ServiceRouteDestinationMirror{
Percent: 11,
},
},
}),
validateErr: "service to mirror traffic to is required",
},
{
name: "route traffic mirror with invalid mirror percent",
entry: makerouter(ServiceRoute{
Match: httpMatchParam(ServiceRouteHTTPMatchQueryParam{
Name: "foo",
Exact: "bar",
}),
Destination: &ServiceRouteDestination{
Service: "other",
MirrorPolicy: &ServiceRouteDestinationMirror{
Service: "mirror-to",
Percent: 150,
},
},
}),
validateErr: "percent of traffic to mirror must be between 0 and 100",
},
////////////////
{
name: "route with method matches",
Expand Down
17 changes: 16 additions & 1 deletion agent/structs/config_entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,11 @@ func TestDecodeConfigEntry(t *testing.T) {
num_retries = 12345
retry_on_connect_failure = true
retry_on_status_codes = [401, 209]
mirror_policy {
service = "banana"
service_subset = "apple"
percent = 25
}
request_headers {
add {
x-foo = "bar"
Expand Down Expand Up @@ -580,6 +585,11 @@ func TestDecodeConfigEntry(t *testing.T) {
NumRetries = 12345
RetryOnConnectFailure = true
RetryOnStatusCodes = [401, 209]
MirrorPolicy {
Service = "banana"
ServiceSubset = "apple"
Percent = 25
}
RequestHeaders {
Add {
x-foo = "bar"
Expand Down Expand Up @@ -681,6 +691,11 @@ func TestDecodeConfigEntry(t *testing.T) {
NumRetries: 12345,
RetryOnConnectFailure: true,
RetryOnStatusCodes: []uint32{401, 209},
MirrorPolicy: &ServiceRouteDestinationMirror{
Service: "banana",
ServiceSubset: "apple",
Percent: 25,
},
RequestHeaders: &HTTPHeaderModifiers{
Add: map[string]string{"x-foo": "bar"},
Set: map[string]string{"bar": "baz"},
Expand Down Expand Up @@ -1727,7 +1742,7 @@ func TestDecodeConfigEntry(t *testing.T) {
}
HTTP {
SanitizeXForwardedClientCert = true
}
}
`,
expect: &MeshConfigEntry{
Meta: map[string]string{
Expand Down
Loading

0 comments on commit c20c1d4

Please sign in to comment.