Skip to content

Commit

Permalink
Issue 554 Added compiled glob matching using LRU Cache (#615)
Browse files Browse the repository at this point in the history
* consul: refactor service monitor

Refactor the set of functions which watch the consul state
and generate the route commands into a set of objects to make
them testable and extendable.

* consul: move build route command logic to separate object

... and finally add some tests.

* consul: fetch route updates concurrently

The code which updates the routing table from consul was using a
single go routine to fetch data from consul. This can be a slow
process if consul has lots of registered services.

This patch adds an option `registry.consul.serviceMonitors`
to increase the concurrency for the route updates.

* issue - 558 updated to include custom http.Transport

* added global polling interval for issue 558

* issue 558 updates and doco adds

* added compiled glob matching cache using LRU cache

* updated glob_cache to use sync.Map

* issue #554 refactored to use sync.Map instead of RWmutex locks

* updated glob_cache to support type casting of glob.Glob Interface

* added type assertion of glob.Glob interface instead of type switch

* rebased to master and fixed issue with load_test

* added Glob to grpc proxy

* removing polling interval

* updated tests and remove PollingInterval

Co-authored-by: Frank Schröder <[email protected]>
Co-authored-by: Aaron Hurt <[email protected]>
  • Loading branch information
3 people authored Jan 29, 2020
1 parent dfee47a commit 828d81f
Show file tree
Hide file tree
Showing 18 changed files with 216 additions and 29 deletions.
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Config struct {
ProfilePath string
Insecure bool
GlobMatchingDisabled bool
GlobCacheSize int
}

type CertSource struct {
Expand Down
2 changes: 2 additions & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,6 @@ var defaultConfig = &Config{
SpanHost: "localhost:9998",
TraceID128Bit: true,
},

GlobCacheSize: 1000,
}
1 change: 1 addition & 0 deletions config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ func load(cmdline, environ, envprefix []string, props *properties.Properties) (c
f.StringVar(&cfg.Tracing.SpanHost, "tracing.SpanHost", defaultConfig.Tracing.SpanHost, "Host:Port info to add to spans")
f.BoolVar(&cfg.Tracing.TraceID128Bit, "tracing.TraceID128Bit", defaultConfig.Tracing.TraceID128Bit, "Generate 128 bit trace IDs")
f.BoolVar(&cfg.GlobMatchingDisabled, "glob.matching.disabled", defaultConfig.GlobMatchingDisabled, "Disable Glob Matching on routes, one of [true, false]")
f.IntVar(&cfg.GlobCacheSize, "glob.cache.size", defaultConfig.GlobCacheSize, "sets the size of the glob cache")

f.StringVar(&cfg.Registry.Custom.Host, "registry.custom.host", defaultConfig.Registry.Custom.Host, "custom back end hostname/port")
f.StringVar(&cfg.Registry.Custom.Scheme, "registry.custom.scheme", defaultConfig.Registry.Custom.Scheme, "custom back end scheme - http/https")
Expand Down
7 changes: 7 additions & 0 deletions config/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1096,6 +1096,13 @@ func TestLoad(t *testing.T) {
cfg: func(cfg *Config) *Config { return nil },
err: errors.New("missing 'file' in auth 'foo'"),
},
{
args: []string{"-glob.cache.size", "1000"},
cfg: func(cfg *Config) *Config {
cfg.GlobCacheSize = 1000
return cfg
},
},
{
args: []string{"-cfg"},
cfg: func(cfg *Config) *Config { return nil },
Expand Down
9 changes: 9 additions & 0 deletions docs/content/ref/glob.cache.size.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
title: "glob.cache.size"
---

`glob.cache.size` Sets the globCache size used for matching on route lookups.

The default is

glob.cache.size = 1000
12 changes: 12 additions & 0 deletions fabio.properties
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@
#
# proxy.gzip.contenttype =


# proxy.auth configures one or more auth schemes.
#
# Each auth scheme is configured with a list of
Expand Down Expand Up @@ -524,6 +525,7 @@
# proxy.auth = name=mybasicauth;type=basic;file=p/creds.htpasswd
# name=myotherauth;type=basic;file=p/other-creds.htpasswd;realm=myrealm


# log.access.format configures the format of the access log.
#
# If the value is either 'common' or 'combined' then the logs are written in
Expand Down Expand Up @@ -722,6 +724,7 @@
#
# registry.consul.tls.cafile =


# registry.consul.tls.capath the path to the folder containing CA certificates.
#
# This is the full path to the folder with CA certificates while using TLS transport to
Expand Down Expand Up @@ -890,6 +893,7 @@
#
# registry.consul.serviceMonitors = 1


# registry.consul.pollInterval configures the poll interval
# for route updates. If Poll interval is set to 0 the updates will
# be disabled and fall back to blocking queries. Other values can
Expand Down Expand Up @@ -947,6 +951,7 @@
#
# registry.custom.path =


# registry.custom.queryparams is the query parameters used in the custom back
# end API Call
#
Expand All @@ -971,6 +976,12 @@
#
# glob.matching.disabled = false

# glob.cache.size sets the globCache size used for matching on route lookups.
#
# The default is
#
# glob.cache.size = 1000


# metrics.target configures the backend the metrics values are
# sent to.
Expand Down Expand Up @@ -1246,6 +1257,7 @@
#
# tracing.TracingEnabled = false


# tracing.CollectorType sets what type of collector is used.
# Currently only two types are supported http and kafka
#
Expand Down
11 changes: 10 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ func main() {
}

func newGrpcProxy(cfg *config.Config, tlscfg *tls.Config) []grpc.ServerOption {

//Init Glob Cache
globCache := route.NewGlobCache(cfg.GlobCacheSize)

statsHandler := &proxy.GrpcStatsHandler{
Connect: metrics.DefaultRegistry.GetCounter("grpc.conn"),
Request: metrics.DefaultRegistry.GetTimer("grpc.requests"),
Expand All @@ -157,6 +161,7 @@ func newGrpcProxy(cfg *config.Config, tlscfg *tls.Config) []grpc.ServerOption {
proxyInterceptor := proxy.GrpcProxyInterceptor{
Config: cfg,
StatsHandler: statsHandler,
GlobCache: globCache,
}

handler := grpc_proxy.TransparentHandler(proxy.GetGRPCDirector(tlscfg))
Expand All @@ -171,6 +176,10 @@ func newGrpcProxy(cfg *config.Config, tlscfg *tls.Config) []grpc.ServerOption {

func newHTTPProxy(cfg *config.Config) http.Handler {
var w io.Writer

//Init Glob Cache
globCache := route.NewGlobCache(cfg.GlobCacheSize)

switch cfg.Log.AccessTarget {
case "":
log.Printf("[INFO] Access logging disabled")
Expand Down Expand Up @@ -223,7 +232,7 @@ func newHTTPProxy(cfg *config.Config) http.Handler {
Transport: newTransport(nil),
InsecureTransport: newTransport(&tls.Config{InsecureSkipVerify: true}),
Lookup: func(r *http.Request) *route.Target {
t := route.GetTable().Lookup(r, r.Header.Get("trace"), pick, match, cfg.GlobMatchingDisabled)
t := route.GetTable().Lookup(r, r.Header.Get("trace"), pick, match, globCache, cfg.GlobMatchingDisabled)
if t == nil {
notFound.Inc(1)
log.Print("[WARN] No route for ", r.Host, r.URL)
Expand Down
3 changes: 2 additions & 1 deletion proxy/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func GetGRPCDirector(tlscfg *tls.Config) func(ctx context.Context, fullMethodNam
type GrpcProxyInterceptor struct {
Config *config.Config
StatsHandler *GrpcStatsHandler
GlobCache *route.GlobCache
}

type targetKey struct{}
Expand Down Expand Up @@ -153,7 +154,7 @@ func (g GrpcProxyInterceptor) lookup(ctx context.Context, fullMethodName string)
Header: headers,
}

return route.GetTable().Lookup(req, req.Header.Get("trace"), pick, match, g.Config.GlobMatchingDisabled), nil
return route.GetTable().Lookup(req, req.Header.Get("trace"), pick, match, g.GlobCache, g.Config.GlobMatchingDisabled), nil
}

type GrpcStatsHandler struct {
Expand Down
17 changes: 10 additions & 7 deletions proxy/http_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ const (
globDisabled = true
)

//Global GlobCache for Testing
var globCache = route.NewGlobCache(1000)

func TestProxyProducesCorrectXForwardedSomethingHeader(t *testing.T) {
var hdr http.Header = make(http.Header)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -194,7 +197,7 @@ func TestProxyStripsPath(t *testing.T) {
Transport: http.DefaultTransport,
Lookup: func(r *http.Request) *route.Target {
tbl, _ := route.NewTable(bytes.NewBufferString("route add mock /foo/bar " + server.URL + ` opts "strip=/foo"`))
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globEnabled)
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globCache, globEnabled)
},
})
defer proxy.Close()
Expand Down Expand Up @@ -230,7 +233,7 @@ func TestProxyHost(t *testing.T) {
},
},
Lookup: func(r *http.Request) *route.Target {
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globEnabled)
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globCache, globEnabled)
},
})
defer proxy.Close()
Expand Down Expand Up @@ -277,7 +280,7 @@ func TestHostRedirect(t *testing.T) {
Transport: http.DefaultTransport,
Lookup: func(r *http.Request) *route.Target {
r.Host = "c.com"
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globEnabled)
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globCache, globEnabled)
},
})
defer proxy.Close()
Expand Down Expand Up @@ -316,7 +319,7 @@ func TestPathRedirect(t *testing.T) {
proxy := httptest.NewServer(&HTTPProxy{
Transport: http.DefaultTransport,
Lookup: func(r *http.Request) *route.Target {
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globEnabled)
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globCache, globEnabled)
},
})
defer proxy.Close()
Expand Down Expand Up @@ -484,7 +487,7 @@ func TestProxyHTTPSUpstream(t *testing.T) {
Transport: &http.Transport{TLSClientConfig: tlsClientConfig()},
Lookup: func(r *http.Request) *route.Target {
tbl, _ := route.NewTable(bytes.NewBufferString("route add srv / " + server.URL + ` opts "proto=https"`))
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globEnabled)
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globCache, globEnabled)
},
})
defer proxy.Close()
Expand All @@ -511,7 +514,7 @@ func TestProxyHTTPSUpstreamSkipVerify(t *testing.T) {
},
Lookup: func(r *http.Request) *route.Target {
tbl, _ := route.NewTable(bytes.NewBufferString("route add srv / " + server.URL + ` opts "proto=https tlsskipverify=true"`))
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globEnabled)
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globCache, globEnabled)
},
})
defer proxy.Close()
Expand Down Expand Up @@ -712,7 +715,7 @@ func BenchmarkProxyLogger(b *testing.B) {
Transport: http.DefaultTransport,
Lookup: func(r *http.Request) *route.Target {
tbl, _ := route.NewTable(bytes.NewBufferString("route add mock / " + server.URL))
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globEnabled)
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globCache, globEnabled)
},
Logger: l,
}
Expand Down
4 changes: 1 addition & 3 deletions proxy/listen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ func TestGracefulShutdown(t *testing.T) {
}))
defer srv.Close()

globDisabled := false

// start proxy
addr := "127.0.0.1:57777"
var wg sync.WaitGroup
Expand All @@ -35,7 +33,7 @@ func TestGracefulShutdown(t *testing.T) {
Transport: http.DefaultTransport,
Lookup: func(r *http.Request) *route.Target {
tbl, _ := route.NewTable(bytes.NewBufferString("route add svc / " + srv.URL))
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globDisabled)
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globCache, globEnabled)
},
}
l := config.Listen{Addr: addr}
Expand Down
6 changes: 2 additions & 4 deletions proxy/ws_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ func TestProxyWSUpstream(t *testing.T) {
defer wssServer.Close()
t.Log("Started WSS server: ", wssServer.URL)

globDisabled := false

routes := "route add ws /ws " + wsServer.URL + "\n"
routes += "route add ws /wss " + wssServer.URL + ` opts "proto=https"` + "\n"
routes += "route add ws /insecure " + wssServer.URL + ` opts "proto=https tlsskipverify=true"` + "\n"
Expand All @@ -46,7 +44,7 @@ func TestProxyWSUpstream(t *testing.T) {
InsecureTransport: &http.Transport{TLSClientConfig: tlsInsecureConfig()},
Lookup: func(r *http.Request) *route.Target {
tbl, _ := route.NewTable(bytes.NewBufferString(routes))
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globDisabled)
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globCache, globEnabled)
},
})
defer httpProxy.Close()
Expand All @@ -58,7 +56,7 @@ func TestProxyWSUpstream(t *testing.T) {
InsecureTransport: &http.Transport{TLSClientConfig: tlsInsecureConfig()},
Lookup: func(r *http.Request) *route.Target {
tbl, _ := route.NewTable(bytes.NewBufferString(routes))
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globDisabled)
return tbl.Lookup(r, "", route.Picker["rr"], route.Matcher["prefix"], globCache, globEnabled)
},
})
httpsProxy.TLS = tlsServerConfig()
Expand Down
2 changes: 0 additions & 2 deletions registry/consul/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,12 @@ func (w *ServiceMonitor) Watch(updates chan string) {
} else {
q = &api.QueryOptions{RequireConsistent: true, WaitIndex: lastIndex}
}

checks, meta, err := w.client.Health().State("any", q)
if err != nil {
log.Printf("[WARN] consul: Error fetching health state. %v", err)
time.Sleep(time.Second)
continue
}

log.Printf("[DEBUG] consul: Health changed to #%d", meta.LastIndex)

// determine which services have passing health checks
Expand Down
65 changes: 65 additions & 0 deletions route/glob_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package route

import (
"github.com/gobwas/glob"
"sync"
)

// GlobCache implements an LRU cache for compiled glob patterns.
type GlobCache struct {
// m maps patterns to compiled glob matchers.
m sync.Map

// l contains the added patterns and serves as an LRU cache.
// l has a fixed size and is initialized in the constructor.
l []string

// h is the first element in l.
h int

// n is the number of elements in l.
n int
}

func NewGlobCache(size int) *GlobCache {
return &GlobCache{
l: make([]string, size),
}
}

// Get returns the compiled glob pattern if it compiled without
// error. Otherwise, the function returns nil. If the pattern
// is not in the cache it will be added.
func (c *GlobCache) Get(pattern string) (glob.Glob, error) {
// fast path
if glb, ok := c.m.Load(pattern); ok {
//Type Assert the returned interface{}
return glb.(glob.Glob), nil
}

// try to compile pattern
glbCompiled, err := glob.Compile(pattern)
if err != nil {
return nil, err
}

// if the LRU buffer is not full just append
// the element to the buffer.
if c.n < len(c.l) {
c.m.Store(pattern, glbCompiled)
c.l[c.n] = pattern
c.n++
return glbCompiled, nil
}

// otherwise, remove the oldest element and move
// the head. Note that once the buffer is full
// (c.n == len(c.l)) it will never become smaller
// again.
// TODO add logging for cache full - How will this impact performance
c.m.Delete(c.l[c.h])
c.m.Store(pattern, glbCompiled)
c.l[c.h] = pattern
c.h = (c.h + 1) % c.n
return glbCompiled, nil
}
Loading

0 comments on commit 828d81f

Please sign in to comment.