streaming: replace agent/cache with submatview.Store #10112
Conversation
Force-pushed from 47172b7 to fc5cd56
@@ -15,5 +16,9 @@ type Deps struct {
 	Tokens *token.Store
 	Router *router.Router
 	ConnPool *pool.ConnPool
-	GRPCConnPool *grpc.ClientConnPool
+	GRPCConnPool GRPCClientConner
This was done so that tests which use `Agent.New` but don't actually need a gRPC connection pool can use a no-op fake.
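As a rough sketch of the pattern being described (the interface and method names here are illustrative stand-ins, not the exact types in the Consul tree), narrowing the dependency to a small interface is what lets tests plug in a no-op fake:

```go
package deps

import "google.golang.org/grpc"

// GRPCClientConner is a hypothetical stand-in for the small interface the
// agent depends on instead of the concrete connection pool type.
type GRPCClientConner interface {
	ClientConn(datacenter string) (*grpc.ClientConn, error)
}

// fakeGRPCConnPool is a no-op implementation for tests that construct the
// agent but never dial a gRPC connection.
type fakeGRPCConnPool struct{}

func (fakeGRPCConnPool) ClientConn(string) (*grpc.ClientConn, error) { return nil, nil }
```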
useStreaming := s.agent.config.UseStreamingBackend && args.MinQueryIndex > 0 && !args.Ingress
args.QueryOptions.UseCache = s.agent.config.HTTPUseCache && (args.QueryOptions.UseCache || useStreaming)

if args.QueryOptions.UseCache && useStreaming && args.Source.Node != "" {
	return nil, BadRequestError{Reason: "'near' query param can not be used with streaming"}
}
This logic has moved into `rpcclient/health.Client.useStreaming` so that all of the logic to determine which backend to use is in a single place.
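A hedged sketch of what centralizing that decision could look like (names and types are approximate, not the actual rpcclient/health code):

```go
package health

import "errors"

// queryOptions captures just the fields the decision needs; in the real code
// these values come from the agent config and the request.
type queryOptions struct {
	UseStreamingBackend bool
	MinQueryIndex       uint64
	Ingress             bool
	NearNode            string
}

var errNearWithStreaming = errors.New("'near' query param can not be used with streaming")

// useStreaming mirrors the conditions quoted above: streaming only applies to
// blocking queries (MinQueryIndex > 0) and not to ingress requests, and it is
// incompatible with the 'near' sort option.
func useStreaming(opts queryOptions) (bool, error) {
	use := opts.UseStreamingBackend && opts.MinQueryIndex > 0 && !opts.Ingress
	if use && opts.NearNode != "" {
		return false, errNearWithStreaming
	}
	return use, nil
}
```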
Force-pushed from fc5cd56 to ab169bb
@@ -123,6 +126,10 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
 	return d, nil
 }

+// grpcLogInitOnce ensures the gRPC logger is only initialized once; the test
+// suite calls NewBaseDeps in many tests, and re-initializing it causes data races.
+var grpcLogInitOnce sync.Once
Doesn't this mean that these logs are useless during tests?
I guess it depends on how the logs are set up. If the logs are set up to go to stdout/stderr then it should work fine; they'll be attributed to the active test. If the logs are sent via `t.Log` it might hide some.
Although not doing this doesn't really fix the issue either, because the gRPC logger is a global. So if multiple tests are run in parallel, we would send all of the logs to one test logger, and none to the other.
This is only the logs for the grpc client and server though, so probably not important for any tests of Consul behaviour.
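For context, the guard under discussion is roughly this shape; the sketch below uses the real grpclog package, but the function name and logger wiring are simplified assumptions:

```go
package agent

import (
	"io"
	"sync"

	"google.golang.org/grpc/grpclog"
)

// grpcLogInitOnce guards the global gRPC logger, which may only be set once
// per process; tests call NewBaseDeps repeatedly and would otherwise race.
var grpcLogInitOnce sync.Once

func initGRPCLogger(out io.Writer) {
	grpcLogInitOnce.Do(func() {
		// grpclog keeps a single process-wide logger, so whichever test
		// initializes it first "wins" -- exactly the trade-off discussed above.
		grpclog.SetLoggerV2(grpclog.NewLoggerV2(out, out, out))
	})
}
```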
Force-pushed from 7dc881d to cf4e666
"error", err, | ||
"request-type", req.Type(), | ||
"index", index) | ||
continue |
Wouldn't this busy-loop since the `index` value hasn't been bumped?
I don't believe so. This is an error case, so I think it's expected that `index` doesn't change. `getFromView` will block when it gets called again, waiting on either a new value, another error, or the timeout.
In `Materializer` there is a retry with backoff on an error, so the `updateCh` that `getFromView` blocks on shouldn't get updated until that reports another error.
I think that sounds right. Perhaps a comment here clarifying the assumption that `getFromView` will never return repeated errors without a delay/backoff, and that this is therefore safe, would be useful?
It's not needed here, but I was thinking that if we did need to be defensive, adding a rate limit in this error case of the loop, rather than exponentially backing off errors again, might be a nice approach. That way, if something else is already doing a backoff we won't add additional compounded delays here, but if a condition is introduced later where the materializer could return errors without backing off, the rate limit will kick in to prevent this from becoming a busy loop.
I don't think we need to do that now, just thinking aloud.
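For what that rate-limit idea could look like, here is a minimal sketch (not the PR's code) using golang.org/x/time/rate; `getResult` stands in for `getFromView`:

```go
package submatview

import (
	"context"
	"log"

	"golang.org/x/time/rate"
)

// notifyLoop sketches a Notify-style loop that tolerates repeated errors.
// getResult blocks until a new value, an error, or a timeout.
func notifyLoop(ctx context.Context, getResult func(ctx context.Context, minIndex uint64) (uint64, error)) {
	// Allow at most one retry per second in the error path. If the
	// materializer already backs off, this limiter stays idle; if it ever
	// stops backing off, the limiter prevents a busy loop.
	limiter := rate.NewLimiter(rate.Limit(1), 1)

	var index uint64
	for {
		newIndex, err := getResult(ctx, index)
		if ctx.Err() != nil {
			return
		}
		if err != nil {
			log.Printf("error getting result: %v (index=%d)", err, index)
			if err := limiter.Wait(ctx); err != nil {
				return
			}
			continue
		}
		index = newIndex
	}
}
```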
ctx, cancel := context.WithCancel(context.Background())
go mat.Run(ctx)
Should `getEntry` remove the entry from the expiryHeap?
It could, but it's not necessary the way it is written now. `getEntry` increments the count of requests, and the expiry is ignored if `requests > 0`. Alternatively we could remove things from the expiry heap instead of tracking requests. I wonder if that would be more expensive because of the need to modify the heap more often, whereas incrementing the request count is pretty cheap since we have a reference to it from the map.
And I guess in many cases there won't be an entry in the heap, because there are already active requests.
Yeah I like this approach where the expiry heap only contains inactive entries rather than needing to maintain a heap over all entries and constantly update it to keep nudging back expiry on the active ones.
99% of the time I suspect all entries will be active so not needing to maintain a heap in steady state seems like a cleaner approach.
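A simplified sketch of the scheme being discussed (the real Store uses its own ttlcache package; here the bookkeeping is reduced to a counter and a timestamp):

```go
package submatview

import (
	"sync"
	"time"
)

// entry tracks one materialized view plus how many requests are using it.
type entry struct {
	requests  int       // active Get/Notify calls
	expiresAt time.Time // only meaningful while requests == 0
}

type store struct {
	lock  sync.Mutex
	byKey map[string]*entry
	ttl   time.Duration
}

// getEntry marks the entry active. While requests > 0 the expiry loop skips
// it, so nothing needs to be removed from (or re-pushed onto) an expiry heap.
func (s *store) getEntry(key string) *entry {
	s.lock.Lock()
	defer s.lock.Unlock()
	e, ok := s.byKey[key]
	if !ok {
		e = &entry{}
		s.byKey[key] = e
	}
	e.requests++
	return e
}

// releaseEntry is the counterpart: when the last request finishes, the entry
// becomes eligible for expiry again.
func (s *store) releaseEntry(key string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	e := s.byKey[key]
	e.requests--
	if e.requests == 0 {
		e.expiresAt = time.Now().Add(s.ttl)
	}
}
```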
Force-pushed from cf4e666 to 21034cc
Also fixes a minor data race in Materializer. Capture the error before releasing the lock.
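As a tiny illustration of that kind of fix (the struct and field names here are assumed, not the real Materializer):

```go
package submatview

import "sync"

// materializer is a minimal stand-in with just the fields the fix touches.
type materializer struct {
	lock sync.Mutex
	err  error
}

// currentError copies the shared error while still holding the lock; reading
// m.err after Unlock would race with the goroutine that sets it.
func (m *materializer) currentError() error {
	m.lock.Lock()
	err := m.err
	m.lock.Unlock()
	return err
}
```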
Also rename it to `readEntry` now that it doesn't return the entire entry. Based on feedback in PR review, the full entry is not used by the caller, and accessing the fields wouldn't be safe outside the lock, so it is safer to return only the `Materializer`.
Previously, `getFromView` would call `view.Result` when the result might not be returned (because the index had not yet advanced past the `minIndex`). This would allocate a slice and sort it for no reason, because the values would never be returned. Fix this by re-ordering the operations in `getFromView`. The test changes in this commit were an attempt to cover the case where an update is received but the index does not exceed the `minIndex`.
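The reordering amounts to something like this sketch (hypothetical helper names; the point is that `Result`, which allocates and sorts, only runs once we know the result will be returned):

```go
package submatview

// view is a minimal stand-in for the View interface used here.
type view interface {
	Result(index uint64) interface{}
}

// resultIfNew only materializes a result when the view's index has moved past
// the caller's minIndex; otherwise the caller goes back to blocking on the
// update channel without paying for the allocation and sort in Result.
func resultIfNew(v view, index, minIndex uint64) (interface{}, bool) {
	if index <= minIndex {
		return nil, false
	}
	return v.Result(index), true
}
```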
Force-pushed from a07a43d to 31cd580
Looks awesome @dnephin 👏
I have a few minor comments inline, none are blocking though so see what you think!
@@ -0,0 +1,3 @@
+```release-note:bug
+streaming: fixes a bug that would cause context cancellation errors when a cache entry expired while requests were active.
+```
I wonder if it's worth adding, either in this entry or a separate one, an explicit reference/link to the scalability challenge and the reduction in the long tail of deliveries at scale? Not important though, just a thought.
run(t, tc)
})
}
}
Great testing pattern. I found this really easy to read and validate that we covered and asserted the right behaviour/cases!
@@ -32,7 +31,7 @@ type View interface {
 	// separately and passed in in case the return type needs an Index field
 	// populating. This allows implementations to not worry about maintaining
 	// indexes seen during Update.
-	Result(index uint64) (interface{}, error)
+	Result(index uint64) interface{}
I guess you removed the error as we were never using it, but I wonder how we handle propagating background fetching errors without it?
For example if the streaming connection can't be established, I'd assume that we'd want to 500 the HTTP request to the user rather than send them a blank 200 as if there was no data on the servers? How does the caller know the difference?
This interface is only for the view, which is effectively just a map with some filtering and sorting. There's no IO or network operations here. It seems unlikely that other `View` implementations would need an error return for this, but we can always add it back if we do need it.
Even the `error` return from `View.Update` is arguably not necessary. The one place we error there would be a logic bug, and could panic.
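For reference, a toy map-backed implementation (not one of the real views) shows why `Result` has no failure mode of its own; the real interface also has an `Update` method, as discussed above, but only `Result` matters here:

```go
package submatview

// View mirrors the shape shown in the diff above: an in-memory projection
// that the Materializer keeps up to date from the event stream.
type View interface {
	// Result returns the current result, stamped with the given index. It is
	// pure in-memory work (copying, filtering, sorting), which is why it
	// needs no error return.
	Result(index uint64) interface{}
}

// mapView is a toy implementation: a map plus a snapshot operation.
type mapView struct {
	items map[string]string
}

type result struct {
	Index uint64
	Items map[string]string
}

func (v *mapView) Result(index uint64) interface{} {
	out := make(map[string]string, len(v.items))
	for k, val := range v.items {
		out[k] = val
	}
	return result{Index: index, Items: out}
}
```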
// context.DeadlineExceeded is translated to nil to match the behaviour of
// agent/cache.Cache.Get.
I wonder if this could mask underlying network timeouts too?
I assume gRPC client code also uses contexts for things like connection timeouts; if it did, and there was an actual error like being unable to connect to servers which caused a timeout, could we mistakenly mask that error?
I wonder if there is a way to be sure this is the timeout based on the user's requested blocking timeout vs a downstream network timeout that's propagated directly back through the calls? Right now this might never be possible but it makes me wonder if this could ever result in a subtle bug later?
From what I can tell, request timeouts in gRPC are always set by the context, so there's no difference. Connect timeouts are reported using this error: https://github.com/grpc/grpc-go/blob/24d03d9f769106b3c96b4145244ce682999d3d88/internal/transport/transport.go#L713, so they would not be `context.DeadlineExceeded`.
I believe this is safe. I do think it is unfortunate, and it would be better to report this timeout to the user, but agent/cache masks these kinds of errors, so I thought for now it would be better to match the behaviour.
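The translation being discussed is essentially this (a sketch; the surrounding request plumbing is omitted):

```go
package submatview

import (
	"context"
	"errors"
)

// translateTimeout mirrors agent/cache.Cache.Get's behaviour: a blocking
// query that simply ran out its wait time is not an error, it just returns
// the current (possibly unchanged) result. Any other error is surfaced.
func translateTimeout(err error) error {
	if errors.Is(err, context.DeadlineExceeded) {
		return nil
	}
	return err
}
```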
"error", err, | ||
"request-type", req.Type(), | ||
"index", index) | ||
continue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that sounds right, perhaps a comment here could clarify that assumption that getFromView
will never return repeated errors without a delay/backoff so this is safe would be useful?
It's not needed here, but I was thinking if we did need to be defensive, adding a rate limit in this error case of the loop rather than exponentially backing off errors again might be a nice approach. That way if something else is already doing a backoff we won't add additional compounded delays here, but if there is a condition introduced later where the materializer could return errors without backing of then the rate limit will kick in to prevent this from becoming a busy loop?
I don't think we need to do that now, just thinking aloud.
|
||
ctx, cancel := context.WithCancel(context.Background()) | ||
go mat.Run(ctx) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I like this approach where the expiry heap only contains inactive entries rather than needing to maintain a heap over all entries and constantly update it to keep nudging back expiry on the active ones.
99% of the time I suspect all entries will be active so not needing to maintain a heap in steady state seems like a cleaner approach.
defer store.lock.Unlock()
require.Len(t, store.byKey, 0)
require.Equal(t, ttlcache.NotIndexed, e.expiry.Index())
}
I think these tests look great.
Possibly a TODO for later, but what do you think about adding a slightly "fuzzy" test that starts up a bunch of concurrent `Notify` calls and asserts reasonable behaviour for each one. The value in the test isn't so much covering specific anomalies or cases but just verifying the concurrent behaviour works (i.e. all clients see the update), and when run with `-race` we have a better chance of catching any data race issues?
Ya, that is an option. In my mind, if we were to do that, we should use a real backend, not a fake one. A fake will never be close enough to the real behaviour, so such a test wouldn't be meaningful with a fake.
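For the record, the shape of such a test could look like the sketch below; it uses an inline fake purely so the snippet compiles on its own, whereas the point above is that a real version should exercise the real Store and backend:

```go
package submatview

import (
	"context"
	"sync"
	"testing"
	"time"
)

// fakeStore is a stand-in so this sketch is self-contained; a real version of
// this test would use the real Store wired to a real backend.
type fakeStore struct {
	mu      sync.Mutex
	watches []chan struct{}
}

func (s *fakeStore) Notify(ctx context.Context, ch chan struct{}) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.watches = append(s.watches, ch)
	return nil
}

func (s *fakeStore) publish() {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, ch := range s.watches {
		ch <- struct{}{} // each watch channel is buffered, so this never blocks
	}
}

// TestNotify_Concurrent sketches the "fuzzy" test idea: many concurrent
// watchers, one update, and every watcher must observe it; run with -race to
// give the race detector something to chew on.
func TestNotify_Concurrent(t *testing.T) {
	store := &fakeStore{}
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	const watchers = 50
	var registered, done sync.WaitGroup
	for i := 0; i < watchers; i++ {
		registered.Add(1)
		done.Add(1)
		go func() {
			defer done.Done()
			ch := make(chan struct{}, 1)
			err := store.Notify(ctx, ch)
			registered.Done()
			if err != nil {
				t.Error(err)
				return
			}
			select {
			case <-ch:
			case <-ctx.Done():
				t.Error("timed out waiting for an update")
			}
		}()
	}
	registered.Wait() // all watchers registered concurrently
	store.publish()   // one update; every watcher should see it
	done.Wait()
}
```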
Co-authored-by: Paul Banks <[email protected]>
Force-pushed from 674e2e4 to 3a27fce
LGTM
The failing tests pass locally, so the failures are likely flakes.
🍒 If backport labels were added before merging, cherry-picking will start automatically. To retroactively trigger a backport after merging, add backport labels and re-run https://circleci.com/gh/hashicorp/consul/358627.
🍒 If backport labels were added before merging, cherry-picking will start automatically. To retroactively trigger a backport after merging, add backport labels and re-run https://circleci.com/gh/hashicorp/consul/358712.
🍒✅ Cherry pick of commit 9b344b3 onto
…m-cache streaming: replace agent/cache with submatview.Store
This PR replaces `agent/cache.Cache` in the streaming flow with a new `submatview.Store`. The new `Store` implements a similar interface, and consumes similar interfaces, but removes all the logic about fetching results, which is no longer necessary with streaming.

Some notable differences:

- A `Request` specifies the details about the type and how to initialize the `Materializer` if there is no existing entry.
- Stopping the `Materializer` (which shuts down the stream) is done synchronously with expiring the entry from the store. This prevents issues that we encountered in the cache where expiration would happen while background goroutines were still running.

This PR is unfortunately a bit large, but hopefully it is mostly tests. This PR includes #10110 (which can be rebased out if that merges), and #10068, because it was not really possible to test those changes without first making this change.
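A minimal sketch of the synchronous expiry described in the second bullet (type and field names are illustrative, not the real Store):

```go
package submatview

import (
	"context"
	"sync"
)

// storeEntry pairs a materializer's cancel func with a usage count.
type storeEntry struct {
	requests int
	cancel   context.CancelFunc // stops the materializer and its stream
}

type viewStore struct {
	lock  sync.Mutex
	byKey map[string]*storeEntry
}

// expire shuts the materializer down in the same critical section that
// removes the entry, so no background goroutine keeps running against an
// evicted entry.
func (s *viewStore) expire(key string) {
	s.lock.Lock()
	defer s.lock.Unlock()
	e, ok := s.byKey[key]
	if !ok || e.requests > 0 {
		return // still in use by active requests
	}
	e.cancel()
	delete(s.byKey, key)
}
```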
TODO:

- `cache-type/streaming_health_services_test.go` to `submatview/store_test.go`
- `Agent.New`
- `Store.Get` and `Store.Notify`