Merge pull request #10112 from hashicorp/dnephin/remove-streaming-from-cache

streaming: replace agent/cache with submatview.Store
dnephin authored and hc-github-team-consul-core committed Apr 28, 2021
1 parent 8cc2d3e commit 798953f
Showing 21 changed files with 1,720 additions and 656 deletions.
3 changes: 3 additions & 0 deletions .changelog/10112.txt
@@ -0,0 +1,3 @@
```release-note:bug
streaming: fixes a bug that would cause context cancellation errors when a cache entry expired while requests were active.
```
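
For orientation: as the changelog entry describes, the bug arose because streaming results were served through agent/cache, so a TTL-based expiry could cancel the context backing a materialized view while requests were still reading from it. This commit moves that ownership to submatview.Store. The Go sketch below paraphrases the routing logic added in agent/rpcclient/health/health.go later in this diff, with the result metadata dropped for brevity; it is illustrative only, written as if it sat inside the health package, and is not an excerpt from the commit.

```go
package health

import (
    "context"

    "github.com/hashicorp/consul/agent/structs"
)

// serviceNodesSketch paraphrases the new Client.ServiceNodes routing shown
// later in this diff. Streaming is only used for plain service queries;
// ingress and "near node" queries keep using agent/cache or a direct RPC.
func serviceNodesSketch(ctx context.Context, c *Client, req structs.ServiceSpecificRequest) (structs.IndexedCheckServiceNodes, error) {
    useStreaming := c.UseStreamingBackend && !req.Ingress && req.Source.Node == ""
    if useStreaming && (req.QueryOptions.UseCache || req.QueryOptions.MinQueryIndex > 0) {
        // submatview.Store owns the materialized view's lifecycle, so an
        // expiring agent/cache entry can no longer cancel an active request.
        result, err := c.ViewStore.Get(ctx, c.newServiceRequest(req))
        if err != nil {
            return structs.IndexedCheckServiceNodes{}, err
        }
        return *result.Value.(*structs.IndexedCheckServiceNodes), nil
    }
    // Everything else falls back to agent/cache, or a direct RPC when
    // UseCache is false.
    out, _, err := c.getServiceNodes(ctx, req)
    return out, err
}
```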
25 changes: 17 additions & 8 deletions agent/agent.go
@@ -373,15 +373,21 @@ func New(bd BaseDeps) (*Agent, error) {
 		cache:           bd.Cache,
 	}
 
-	cacheName := cachetype.HealthServicesName
-	if bd.RuntimeConfig.UseStreamingBackend {
-		cacheName = cachetype.StreamingHealthServicesName
+	// TODO: create rpcClientHealth in BaseDeps once NetRPC is available without Agent
+	conn, err := bd.GRPCConnPool.ClientConn(bd.RuntimeConfig.Datacenter)
+	if err != nil {
+		return nil, err
 	}
+
 	a.rpcClientHealth = &health.Client{
-		Cache:            bd.Cache,
-		NetRPC:           &a,
-		CacheName:        cacheName,
-		CacheNameIngress: cachetype.HealthServicesName,
+		Cache:     bd.Cache,
+		NetRPC:    &a,
+		CacheName: cachetype.HealthServicesName,
+		ViewStore: bd.ViewStore,
+		MaterializerDeps: health.MaterializerDeps{
+			Conn:   conn,
+			Logger: bd.Logger.Named("rpcclient.health"),
+		},
 	}
 
 	a.serviceManager = NewServiceManager(&a)
@@ -533,14 +539,15 @@ func (a *Agent) Start(ctx context.Context) error {
 		return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLDefaultPolicy)
 	}
 
+	go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh})
+
 	// Start the proxy config manager.
 	a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
 		Cache:  a.cache,
 		Health: a.rpcClientHealth,
 		Logger: a.logger.Named(logging.ProxyConfig),
 		State:  a.State,
 		Source: &structs.QuerySource{
-			Node:       a.config.NodeName,
 			Datacenter: a.config.Datacenter,
 			Segment:    a.config.SegmentName,
 		},
@@ -1385,6 +1392,8 @@ func (a *Agent) ShutdownAgent() error {
 		a.cache.Close()
 	}
 
+	a.rpcClientHealth.Close()
+
 	var err error
 	if a.delegate != nil {
 		err = a.delegate.Shutdown()
9 changes: 9 additions & 0 deletions agent/agent_test.go
@@ -31,6 +31,7 @@ import (
 	"github.com/stretchr/testify/require"
 	"golang.org/x/sync/errgroup"
 	"golang.org/x/time/rate"
+	"google.golang.org/grpc"
 	"gopkg.in/square/go-jose.v2/jwt"
 
 	"github.com/hashicorp/consul/agent/cache"
@@ -307,6 +308,7 @@ func TestAgent_HTTPMaxHeaderBytes(t *testing.T) {
 			Logger:          hclog.NewInterceptLogger(nil),
 			Tokens:          new(token.Store),
 			TLSConfigurator: tlsConf,
+			GRPCConnPool:    &fakeGRPCConnPool{},
 		},
 		RuntimeConfig: &config.RuntimeConfig{
 			HTTPAddrs: []net.Addr{
@@ -355,6 +357,12 @@ func TestAgent_HTTPMaxHeaderBytes(t *testing.T) {
 	}
 }
 
+type fakeGRPCConnPool struct{}
+
+func (f fakeGRPCConnPool) ClientConn(_ string) (*grpc.ClientConn, error) {
+	return nil, nil
+}
+
 func TestAgent_ReconnectConfigWanDisabled(t *testing.T) {
 	if testing.Short() {
 		t.Skip("too slow for testing.Short")
@@ -5173,6 +5181,7 @@ func TestAgent_ListenHTTP_MultipleAddresses(t *testing.T) {
 			Logger:          hclog.NewInterceptLogger(nil),
 			Tokens:          new(token.Store),
 			TLSConfigurator: tlsConf,
+			GRPCConnPool:    &fakeGRPCConnPool{},
 		},
 		RuntimeConfig: &config.RuntimeConfig{
 			HTTPAddrs: []net.Addr{
66 changes: 0 additions & 66 deletions agent/cache-types/streaming_test.go

This file was deleted.

11 changes: 8 additions & 3 deletions agent/consul/options.go
@@ -1,12 +1,13 @@
 package consul
 
 import (
-	"github.com/hashicorp/consul/agent/grpc"
+	"github.com/hashicorp/go-hclog"
+	"google.golang.org/grpc"
+
 	"github.com/hashicorp/consul/agent/pool"
 	"github.com/hashicorp/consul/agent/router"
 	"github.com/hashicorp/consul/agent/token"
 	"github.com/hashicorp/consul/tlsutil"
-	"github.com/hashicorp/go-hclog"
 )
 
 type Deps struct {
@@ -15,5 +16,9 @@ type Deps struct {
 	Tokens       *token.Store
 	Router       *router.Router
 	ConnPool     *pool.ConnPool
-	GRPCConnPool *grpc.ClientConnPool
+	GRPCConnPool GRPCClientConner
 }
+
+type GRPCClientConner interface {
+	ClientConn(datacenter string) (*grpc.ClientConn, error)
+}
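
Narrowing Deps.GRPCConnPool from the concrete *grpc.ClientConnPool to the small GRPCClientConner interface is what lets the agent tests above substitute fakeGRPCConnPool, while the agent itself keeps passing its router-aware pool from agent/grpc. Any type that can hand out a *grpc.ClientConn for a datacenter satisfies the interface. The sketch below is a hypothetical implementation, not part of this commit, that simply dials a fixed address per datacenter.

```go
package consul

import (
    "fmt"

    "google.golang.org/grpc"
)

// staticConnPool is a hypothetical GRPCClientConner shown only to illustrate
// the interface; the real agent continues to use its agent/grpc conn pool.
type staticConnPool struct {
    addrs map[string]string // datacenter -> host:port of a gRPC endpoint
}

func (p staticConnPool) ClientConn(datacenter string) (*grpc.ClientConn, error) {
    addr, ok := p.addrs[datacenter]
    if !ok {
        return nil, fmt.Errorf("no gRPC address configured for datacenter %q", datacenter)
    }
    // grpc.Dial is lazy: it returns immediately and connects on first use.
    return grpc.Dial(addr, grpc.WithInsecure())
}
```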
3 changes: 1 addition & 2 deletions agent/consul/subscribe_backend.go
@@ -5,14 +5,13 @@ import (
 
 	"github.com/hashicorp/consul/acl"
 	"github.com/hashicorp/consul/agent/consul/stream"
-	agentgrpc "github.com/hashicorp/consul/agent/grpc"
 	"github.com/hashicorp/consul/agent/rpc/subscribe"
 	"github.com/hashicorp/consul/agent/structs"
 )
 
 type subscribeBackend struct {
 	srv      *Server
-	connPool *agentgrpc.ClientConnPool
+	connPool GRPCClientConner
 }
 
 // TODO: refactor Resolve methods to an ACLBackend that can be used by all
7 changes: 0 additions & 7 deletions agent/health_endpoint.go
@@ -219,13 +219,6 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re
 		return nil, nil
 	}
 
-	useStreaming := s.agent.config.UseStreamingBackend && args.MinQueryIndex > 0 && !args.Ingress
-	args.QueryOptions.UseCache = s.agent.config.HTTPUseCache && (args.QueryOptions.UseCache || useStreaming)
-
-	if args.QueryOptions.UseCache && useStreaming && args.Source.Node != "" {
-		return nil, BadRequestError{Reason: "'near' query param can not be used with streaming"}
-	}
-
 	out, md, err := s.agent.rpcClientHealth.ServiceNodes(req.Context(), args)
 	if err != nil {
 		return nil, err
5 changes: 1 addition & 4 deletions agent/proxycfg/manager_test.go
@@ -352,10 +352,7 @@ func testManager_BasicLifecycle(
 	require := require.New(t)
 	logger := testutil.Logger(t)
 	state := local.NewState(local.Config{}, logger, &token.Store{})
-	source := &structs.QuerySource{
-		Node:       "node1",
-		Datacenter: "dc1",
-	}
+	source := &structs.QuerySource{Datacenter: "dc1"}
 
 	// Stub state syncing
 	state.TriggerSyncChanges = func() {}
91 changes: 73 additions & 18 deletions agent/rpcclient/health/health.go
@@ -5,16 +5,18 @@ import (
 
 	"github.com/hashicorp/consul/agent/cache"
 	"github.com/hashicorp/consul/agent/structs"
+	"github.com/hashicorp/consul/agent/submatview"
+	"github.com/hashicorp/consul/proto/pbsubscribe"
 )
 
 // Client provides access to service health data.
 type Client struct {
-	NetRPC NetRPC
-	Cache  CacheGetter
-	// CacheName to use for service health.
-	CacheName string
-	// CacheNameIngress is the name of the cache type to use for ingress
-	// service health.
-	CacheNameIngress string
+	NetRPC              NetRPC
+	Cache               CacheGetter
+	ViewStore           MaterializedViewStore
+	MaterializerDeps    MaterializerDeps
+	CacheName           string
+	UseStreamingBackend bool
 }
 
 type NetRPC interface {
@@ -26,10 +28,23 @@ type CacheGetter interface {
 	Notify(ctx context.Context, t string, r cache.Request, cID string, ch chan<- cache.UpdateEvent) error
 }
 
+type MaterializedViewStore interface {
+	Get(ctx context.Context, req submatview.Request) (submatview.Result, error)
+	Notify(ctx context.Context, req submatview.Request, cID string, ch chan<- cache.UpdateEvent) error
+}
+
 func (c *Client) ServiceNodes(
 	ctx context.Context,
 	req structs.ServiceSpecificRequest,
 ) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
+	if c.useStreaming(req) && (req.QueryOptions.UseCache || req.QueryOptions.MinQueryIndex > 0) {
+		result, err := c.ViewStore.Get(ctx, c.newServiceRequest(req))
+		if err != nil {
+			return structs.IndexedCheckServiceNodes{}, cache.ResultMeta{}, err
+		}
+		return *result.Value.(*structs.IndexedCheckServiceNodes), cache.ResultMeta{Index: result.Index}, err
+	}
+
 	out, md, err := c.getServiceNodes(ctx, req)
 	if err != nil {
 		return out, md, err
@@ -50,18 +65,12 @@ func (c *Client) getServiceNodes(
 	req structs.ServiceSpecificRequest,
 ) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
 	var out structs.IndexedCheckServiceNodes
 
 	if !req.QueryOptions.UseCache {
 		err := c.NetRPC.RPC("Health.ServiceNodes", &req, &out)
 		return out, cache.ResultMeta{}, err
 	}
 
-	cacheName := c.CacheName
-	if req.Ingress {
-		cacheName = c.CacheNameIngress
-	}
-
-	raw, md, err := c.Cache.Get(ctx, cacheName, &req)
+	raw, md, err := c.Cache.Get(ctx, c.CacheName, &req)
 	if err != nil {
 		return out, md, err
 	}
@@ -80,9 +89,55 @@ func (c *Client) Notify(
 	correlationID string,
 	ch chan<- cache.UpdateEvent,
 ) error {
-	cacheName := c.CacheName
-	if req.Ingress {
-		cacheName = c.CacheNameIngress
+	if c.useStreaming(req) {
+		sr := c.newServiceRequest(req)
+		return c.ViewStore.Notify(ctx, sr, correlationID, ch)
 	}
-	return c.Cache.Notify(ctx, cacheName, &req, correlationID, ch)
+
+	return c.Cache.Notify(ctx, c.CacheName, &req, correlationID, ch)
 }
+
+func (c *Client) useStreaming(req structs.ServiceSpecificRequest) bool {
+	return c.UseStreamingBackend && !req.Ingress && req.Source.Node == ""
+}
+
+func (c *Client) newServiceRequest(req structs.ServiceSpecificRequest) serviceRequest {
+	return serviceRequest{
+		ServiceSpecificRequest: req,
+		deps:                   c.MaterializerDeps,
+	}
+}
+
+// Close any underlying connections used by the client.
+func (c *Client) Close() error {
+	if c == nil {
+		return nil
+	}
+	return c.MaterializerDeps.Conn.Close()
+}
+
+type serviceRequest struct {
+	structs.ServiceSpecificRequest
+	deps MaterializerDeps
+}
+
+func (r serviceRequest) CacheInfo() cache.RequestInfo {
+	return r.ServiceSpecificRequest.CacheInfo()
+}
+
+func (r serviceRequest) Type() string {
+	return "service-health"
+}
+
+func (r serviceRequest) NewMaterializer() (*submatview.Materializer, error) {
+	view, err := newHealthView(r.ServiceSpecificRequest)
+	if err != nil {
+		return nil, err
+	}
+	return submatview.NewMaterializer(submatview.Deps{
+		View:    view,
+		Client:  pbsubscribe.NewStateChangeSubscriptionClient(r.deps.Conn),
+		Logger:  r.deps.Logger,
+		Request: newMaterializerRequest(r.ServiceSpecificRequest),
+	}), nil
+}
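
Putting the pieces together: serviceRequest is what ties an ordinary ServiceSpecificRequest to the view store. Its CacheInfo key gives the store a way to recognize identical requests, and NewMaterializer tells the store how to build a view from the gRPC subscription when none exists yet. The sketch below shows how a caller could wire up the new client and subscribe to updates, mirroring the wiring in agent/agent.go above; it is illustrative only and assumes a ready *grpc.ClientConn and a running *submatview.Store (which is what the agent passes as ViewStore), with placeholder service and datacenter names.

```go
package health

import (
    "context"

    "github.com/hashicorp/go-hclog"
    "google.golang.org/grpc"

    "github.com/hashicorp/consul/agent/cache"
    "github.com/hashicorp/consul/agent/structs"
    "github.com/hashicorp/consul/agent/submatview"
)

// watchServiceSketch subscribes to health updates for a service through the
// streaming backend. Hypothetical helper for illustration only.
func watchServiceSketch(ctx context.Context, conn *grpc.ClientConn, store *submatview.Store) error {
    c := &Client{
        ViewStore:           store,
        UseStreamingBackend: true,
        MaterializerDeps: MaterializerDeps{
            Conn:   conn,
            Logger: hclog.NewNullLogger(),
        },
    }

    updates := make(chan cache.UpdateEvent, 8)
    req := structs.ServiceSpecificRequest{
        Datacenter:  "dc1",
        ServiceName: "web",
    }
    // No Ingress flag and no Source.Node, so useStreaming(req) is true and
    // the subscription is registered with the view store, not agent/cache.
    if err := c.Notify(ctx, req, "web-health", updates); err != nil {
        return err
    }

    for {
        select {
        case <-ctx.Done():
            return nil
        case update := <-updates:
            nodes, ok := update.Result.(*structs.IndexedCheckServiceNodes)
            if !ok {
                continue
            }
            _ = nodes // react to the new set of healthy instances here
        }
    }
}
```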