streaming: replace agent/cache with submatview.Store #10112

Merged 29 commits on Apr 28, 2021

Changes from all commits (29 commits)
3d52abb submatview: setup the interface for the store (dnephin, Oct 19, 2020)
ddddbdb submatview: rough outline of the Get and Notify methods. (dnephin, Feb 22, 2021)
c574108 submatview: setup testing structure (dnephin, Feb 22, 2021)
7d13465 submatview: set up expiry of materializers (dnephin, Feb 22, 2021)
1d9d7d0 submatview: track requests instead of notifiers (dnephin, Feb 23, 2021)
832f1a2 submatview: reduce the getFromView implementation (dnephin, Feb 23, 2021)
c106399 submatview: more test cases for Store.Get (dnephin, Feb 23, 2021)
9854c42 submatview: test store with Get and Notify calls together (dnephin, Feb 23, 2021)
6ebb0f8 submatview: test Store.Run (dnephin, Feb 25, 2021)
6f29fa0 rpcclient: move streaming cache tests (dnephin, Feb 25, 2021)
55a677b rpcclient/health: integrate submatview.Store into rpcclient/health (dnephin, Feb 25, 2021)
4fb2ba9 submatview: move error return to NewMaterializer (dnephin, Apr 19, 2021)
2a26085 connect: do not set QuerySource.Node (dnephin, Apr 19, 2021)
0558586 health: use blocking queries for near query parameter (dnephin, Apr 19, 2021)
a16c377 rpcclient/health: move all backend routing logic to client (dnephin, Apr 19, 2021)
e229b87 health: create health.Client in Agent.New (dnephin, Apr 20, 2021)
10ec9c2 rpcclient: close the grpc.ClientConn on shutdown (dnephin, Apr 22, 2021)
5fa0dea rpcclient/health: fix data race in a test (dnephin, Apr 22, 2021)
c932833 rpcclient:health: fix a data race and flake in tests (dnephin, Apr 22, 2021)
034c5c5 sdk/retry: remove the need to pass args to NextOr (dnephin, Apr 23, 2021)
a2986eb sdk/retry: support ending the iteration early (dnephin, Apr 23, 2021)
dacf500 submatview: fix two flaky tests (dnephin, Apr 22, 2021)
79a54f2 agent: fix data race in tests caused by grpc log init (dnephin, Apr 26, 2021)
d953741 rpcclient/health: convert tests to the new submatview.Store interface (dnephin, Apr 26, 2021)
46b7d0b submatview: godoc (dnephin, Apr 27, 2021)
23e1cd6 submatview: only return materializer from getEntry (dnephin, Apr 27, 2021)
1139889 submatview: avoid sorting results unnecessarily (dnephin, Apr 27, 2021)
31cd580 Add changelog (dnephin, Apr 27, 2021)
3a27fce submatview: fix godoc and comment typos (dnephin, Apr 28, 2021)
3 changes: 3 additions & 0 deletions .changelog/10112.txt

@@ -0,0 +1,3 @@
```release-note:bug
streaming: fixes a bug that would cause context cancellation errors when a cache entry expired while requests were active.
```

Review comment (Member): I wonder if it's worth adding, either in this entry or a separate one, an explicit reference/link to the scalability challenge and the reduction in the long tail of deliveries at scale? Not important though, just a thought.
25 changes: 17 additions & 8 deletions agent/agent.go
@@ -373,15 +373,21 @@ func New(bd BaseDeps) (*Agent, error) {
		cache: bd.Cache,
	}

-	cacheName := cachetype.HealthServicesName
-	if bd.RuntimeConfig.UseStreamingBackend {
-		cacheName = cachetype.StreamingHealthServicesName
+	// TODO: create rpcClientHealth in BaseDeps once NetRPC is available without Agent
+	conn, err := bd.GRPCConnPool.ClientConn(bd.RuntimeConfig.Datacenter)
+	if err != nil {
+		return nil, err
	}

	a.rpcClientHealth = &health.Client{
-		Cache:            bd.Cache,
-		NetRPC:           &a,
-		CacheName:        cacheName,
-		CacheNameIngress: cachetype.HealthServicesName,
+		Cache:     bd.Cache,
+		NetRPC:    &a,
+		CacheName: cachetype.HealthServicesName,
+		ViewStore: bd.ViewStore,
+		MaterializerDeps: health.MaterializerDeps{
+			Conn:   conn,
+			Logger: bd.Logger.Named("rpcclient.health"),
+		},
	}

	a.serviceManager = NewServiceManager(&a)
@@ -533,14 +539,15 @@ func (a *Agent) Start(ctx context.Context) error {
		return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLDefaultPolicy)
	}

+	go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh})
+
	// Start the proxy config manager.
	a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
		Cache:  a.cache,
		Health: a.rpcClientHealth,
		Logger: a.logger.Named(logging.ProxyConfig),
		State:  a.State,
		Source: &structs.QuerySource{
-			Node:       a.config.NodeName,
			Datacenter: a.config.Datacenter,
			Segment:    a.config.SegmentName,
		},
@@ -1385,6 +1392,8 @@ func (a *Agent) ShutdownAgent() error {
		a.cache.Close()
	}

+	a.rpcClientHealth.Close()
+
	var err error
	if a.delegate != nil {
		err = a.delegate.Shutdown()
9 changes: 9 additions & 0 deletions agent/agent_test.go
@@ -31,6 +31,7 @@ import (
	"github.com/stretchr/testify/require"
	"golang.org/x/sync/errgroup"
	"golang.org/x/time/rate"
+	"google.golang.org/grpc"
	"gopkg.in/square/go-jose.v2/jwt"

	"github.com/hashicorp/consul/agent/cache"
@@ -307,6 +308,7 @@ func TestAgent_HTTPMaxHeaderBytes(t *testing.T) {
			Logger:          hclog.NewInterceptLogger(nil),
			Tokens:          new(token.Store),
			TLSConfigurator: tlsConf,
+			GRPCConnPool:    &fakeGRPCConnPool{},
		},
		RuntimeConfig: &config.RuntimeConfig{
			HTTPAddrs: []net.Addr{
@@ -355,6 +357,12 @@ func TestAgent_HTTPMaxHeaderBytes(t *testing.T) {
	}
}

+type fakeGRPCConnPool struct{}
+
+func (f fakeGRPCConnPool) ClientConn(_ string) (*grpc.ClientConn, error) {
+	return nil, nil
+}
+
func TestAgent_ReconnectConfigWanDisabled(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
@@ -5173,6 +5181,7 @@ func TestAgent_ListenHTTP_MultipleAddresses(t *testing.T) {
			Logger:          hclog.NewInterceptLogger(nil),
			Tokens:          new(token.Store),
			TLSConfigurator: tlsConf,
+			GRPCConnPool:    &fakeGRPCConnPool{},
		},
		RuntimeConfig: &config.RuntimeConfig{
			HTTPAddrs: []net.Addr{
66 changes: 0 additions & 66 deletions agent/cache-types/streaming_test.go

This file was deleted.

11 changes: 8 additions & 3 deletions agent/consul/options.go
@@ -1,12 +1,13 @@
package consul

import (
-	"github.com/hashicorp/consul/agent/grpc"
+	"github.com/hashicorp/go-hclog"
+	"google.golang.org/grpc"

	"github.com/hashicorp/consul/agent/pool"
	"github.com/hashicorp/consul/agent/router"
	"github.com/hashicorp/consul/agent/token"
	"github.com/hashicorp/consul/tlsutil"
-	"github.com/hashicorp/go-hclog"
)

type Deps struct {
@@ -15,5 +16,9 @@ type Deps struct {
	Tokens   *token.Store
	Router   *router.Router
	ConnPool *pool.ConnPool
-	GRPCConnPool *grpc.ClientConnPool
+	GRPCConnPool GRPCClientConner
}
+
+type GRPCClientConner interface {
+	ClientConn(datacenter string) (*grpc.ClientConn, error)
+}

Review comment (dnephin, author) on the GRPCConnPool field: This was done so that tests which use Agent.New but don't actually need a gRPC connection pool can use a no-op fake.
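As a sketch of the pattern that comment describes, a test can satisfy GRPCClientConner with a no-op value. Names are taken from this diff (consul.Deps, GRPCClientConner, and the fakeGRPCConnPool added to agent/agent_test.go above); the test wiring itself is illustrative, not part of the PR:

```go
package consul_test

import (
	"google.golang.org/grpc"

	"github.com/hashicorp/consul/agent/consul"
)

// fakeGRPCConnPool mirrors the no-op fake added to agent/agent_test.go in
// this PR: it satisfies GRPCClientConner without dialing anything.
type fakeGRPCConnPool struct{}

func (fakeGRPCConnPool) ClientConn(_ string) (*grpc.ClientConn, error) {
	return nil, nil
}

// newTestDeps shows where the fake plugs in; all other Deps fields are
// elided for brevity.
func newTestDeps() consul.Deps {
	return consul.Deps{
		GRPCConnPool: fakeGRPCConnPool{},
	}
}
```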
3 changes: 1 addition & 2 deletions agent/consul/subscribe_backend.go
@@ -5,14 +5,13 @@ import (

	"github.com/hashicorp/consul/acl"
	"github.com/hashicorp/consul/agent/consul/stream"
-	agentgrpc "github.com/hashicorp/consul/agent/grpc"
	"github.com/hashicorp/consul/agent/rpc/subscribe"
	"github.com/hashicorp/consul/agent/structs"
)

type subscribeBackend struct {
	srv      *Server
-	connPool *agentgrpc.ClientConnPool
+	connPool GRPCClientConner
}

// TODO: refactor Resolve methods to an ACLBackend that can be used by all
7 changes: 0 additions & 7 deletions agent/health_endpoint.go
@@ -219,13 +219,6 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Request)
		return nil, nil
	}

-	useStreaming := s.agent.config.UseStreamingBackend && args.MinQueryIndex > 0 && !args.Ingress
-	args.QueryOptions.UseCache = s.agent.config.HTTPUseCache && (args.QueryOptions.UseCache || useStreaming)
-
-	if args.QueryOptions.UseCache && useStreaming && args.Source.Node != "" {
-		return nil, BadRequestError{Reason: "'near' query param can not be used with streaming"}
-	}

Review comment (dnephin, author) on lines -222 to -227: This logic has moved into rpcclient/health.Client.useStreaming so that all of the logic to determine which backend to use is in a single place.

	out, md, err := s.agent.rpcClientHealth.ServiceNodes(req.Context(), args)
	if err != nil {
		return nil, err
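To make the consolidation concrete, here is a rough restatement of the decision the client now owns. The predicates are copied from the useStreaming and ServiceNodes changes shown below in agent/rpcclient/health/health.go; the helper function itself is hypothetical and exists only for illustration:

```go
package sketch

import (
	"github.com/hashicorp/consul/agent/rpcclient/health"
	"github.com/hashicorp/consul/agent/structs"
)

// chooseBackend restates the routing rules from this diff: streaming applies
// only to non-ingress requests without the 'near' parameter (Source.Node),
// and only for cached or blocking reads.
func chooseBackend(c *health.Client, req structs.ServiceSpecificRequest) string {
	streaming := c.UseStreamingBackend && !req.Ingress && req.Source.Node == ""
	switch {
	case streaming && (req.QueryOptions.UseCache || req.QueryOptions.MinQueryIndex > 0):
		return "materialized view (streaming)"
	case req.QueryOptions.UseCache:
		return "agent cache"
	default:
		return "direct RPC to the servers"
	}
}
```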
5 changes: 1 addition & 4 deletions agent/proxycfg/manager_test.go
@@ -352,10 +352,7 @@ func testManager_BasicLifecycle(
	require := require.New(t)
	logger := testutil.Logger(t)
	state := local.NewState(local.Config{}, logger, &token.Store{})
-	source := &structs.QuerySource{
-		Node:       "node1",
-		Datacenter: "dc1",
-	}
+	source := &structs.QuerySource{Datacenter: "dc1"}

	// Stub state syncing
	state.TriggerSyncChanges = func() {}
91 changes: 73 additions & 18 deletions agent/rpcclient/health/health.go
@@ -5,16 +5,18 @@ import (

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/proto/pbsubscribe"
)

// Client provides access to service health data.
type Client struct {
NetRPC NetRPC
Cache CacheGetter
// CacheName to use for service health.
CacheName string
// CacheNameIngress is the name of the cache type to use for ingress
// service health.
CacheNameIngress string
NetRPC NetRPC
Cache CacheGetter
ViewStore MaterializedViewStore
MaterializerDeps MaterializerDeps
CacheName string
UseStreamingBackend bool
}

type NetRPC interface {
@@ -26,10 +28,23 @@ type CacheGetter interface {
Notify(ctx context.Context, t string, r cache.Request, cID string, ch chan<- cache.UpdateEvent) error
}

+type MaterializedViewStore interface {
+	Get(ctx context.Context, req submatview.Request) (submatview.Result, error)
+	Notify(ctx context.Context, req submatview.Request, cID string, ch chan<- cache.UpdateEvent) error
+}
+
func (c *Client) ServiceNodes(
	ctx context.Context,
	req structs.ServiceSpecificRequest,
) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
+	if c.useStreaming(req) && (req.QueryOptions.UseCache || req.QueryOptions.MinQueryIndex > 0) {
+		result, err := c.ViewStore.Get(ctx, c.newServiceRequest(req))
+		if err != nil {
+			return structs.IndexedCheckServiceNodes{}, cache.ResultMeta{}, err
+		}
+		return *result.Value.(*structs.IndexedCheckServiceNodes), cache.ResultMeta{Index: result.Index}, err
+	}
+

out, md, err := c.getServiceNodes(ctx, req)
if err != nil {
return out, md, err
@@ -50,18 +65,12 @@ func (c *Client) getServiceNodes(
req structs.ServiceSpecificRequest,
) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
var out structs.IndexedCheckServiceNodes

if !req.QueryOptions.UseCache {
err := c.NetRPC.RPC("Health.ServiceNodes", &req, &out)
return out, cache.ResultMeta{}, err
}

-	cacheName := c.CacheName
-	if req.Ingress {
-		cacheName = c.CacheNameIngress
-	}
-
-	raw, md, err := c.Cache.Get(ctx, cacheName, &req)
+	raw, md, err := c.Cache.Get(ctx, c.CacheName, &req)
if err != nil {
return out, md, err
}
@@ -80,9 +89,55 @@ func (c *Client) Notify(
correlationID string,
ch chan<- cache.UpdateEvent,
) error {
-	cacheName := c.CacheName
-	if req.Ingress {
-		cacheName = c.CacheNameIngress
+	if c.useStreaming(req) {
+		sr := c.newServiceRequest(req)
+		return c.ViewStore.Notify(ctx, sr, correlationID, ch)
	}

-	return c.Cache.Notify(ctx, cacheName, &req, correlationID, ch)
+	return c.Cache.Notify(ctx, c.CacheName, &req, correlationID, ch)
}

func (c *Client) useStreaming(req structs.ServiceSpecificRequest) bool {
return c.UseStreamingBackend && !req.Ingress && req.Source.Node == ""
}

func (c *Client) newServiceRequest(req structs.ServiceSpecificRequest) serviceRequest {
return serviceRequest{
ServiceSpecificRequest: req,
deps: c.MaterializerDeps,
}
}

// Close any underlying connections used by the client.
func (c *Client) Close() error {
if c == nil {
return nil
}
return c.MaterializerDeps.Conn.Close()
}

type serviceRequest struct {
structs.ServiceSpecificRequest
deps MaterializerDeps
}

func (r serviceRequest) CacheInfo() cache.RequestInfo {
return r.ServiceSpecificRequest.CacheInfo()
}

func (r serviceRequest) Type() string {
return "service-health"
}

func (r serviceRequest) NewMaterializer() (*submatview.Materializer, error) {
view, err := newHealthView(r.ServiceSpecificRequest)
if err != nil {
return nil, err
}
return submatview.NewMaterializer(submatview.Deps{
View: view,
Client: pbsubscribe.NewStateChangeSubscriptionClient(r.deps.Conn),
Logger: r.deps.Logger,
Request: newMaterializerRequest(r.ServiceSpecificRequest),
}), nil
}
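Taken together, a minimal usage sketch of the new client surface. It assumes a fully constructed Client (ViewStore, MaterializerDeps, and UseStreamingBackend wired up elsewhere, e.g. in Agent.New above); the caller function, service name, and channel sizing are illustrative only:

```go
package sketch

import (
	"context"

	"github.com/hashicorp/consul/agent/cache"
	"github.com/hashicorp/consul/agent/rpcclient/health"
	"github.com/hashicorp/consul/agent/structs"
)

// watchServiceHealth is a hypothetical caller: it subscribes to updates for
// one service and drains them until the context is cancelled. Client.Notify
// routes to the ViewStore or the agent cache based on useStreaming above.
func watchServiceHealth(ctx context.Context, client *health.Client) error {
	updates := make(chan cache.UpdateEvent, 8)
	req := structs.ServiceSpecificRequest{
		Datacenter:  "dc1",
		ServiceName: "web",
	}
	if err := client.Notify(ctx, req, "web-health", updates); err != nil {
		return err
	}
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case event := <-updates:
			// For this request type the payload is *structs.IndexedCheckServiceNodes.
			nodes, ok := event.Result.(*structs.IndexedCheckServiceNodes)
			if !ok || event.Err != nil {
				continue
			}
			_ = nodes // consume the update
		}
	}
}
```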