Add APIs for M3Aggregator placement #1030
Conversation
richardartoul commented Oct 6, 2018 (edited)
- Add M3Agg placement endpoints and separate them out by path from the M3DB placement endpoints
- Run all tests for both services
- Update documentation for new and old endpoints
- Refactor service to accept options instead of parsing headers directly (a rough sketch of this flow is shown below)
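As a rough illustration of the options-based refactor (a sketch only: the struct fields and header names below are assumptions loosely based on the NewServiceOptionsFromHeaders and M3AggServiceOptions identifiers discussed in the review, not the actual implementation):

```go
// Sketch of an options struct the placement handlers could accept instead of
// parsing headers directly. Field and header names are illustrative.
type M3AggServiceOptions struct {
	MaxAggregationWindowSize time.Duration
	WarmupDuration           time.Duration
}

type ServiceOptions struct {
	ServiceName        string
	ServiceEnvironment string
	ServiceZone        string
	M3Agg              *M3AggServiceOptions // nil for non-aggregator services
}

// NewServiceOptionsFromHeaders fills in defaults and applies any
// zone/environment overrides supplied via headers (header names hypothetical).
func NewServiceOptionsFromHeaders(serviceName string, header http.Header) ServiceOptions {
	opts := ServiceOptions{
		ServiceName:        serviceName,
		ServiceEnvironment: DefaultServiceEnvironment,
		ServiceZone:        DefaultServiceZone,
	}
	if env := header.Get("Cluster-Environment-Name"); env != "" {
		opts.ServiceEnvironment = env
	}
	if zone := header.Get("Cluster-Zone-Name"); zone != "" {
		opts.ServiceZone = zone
	}
	return opts
}
```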
Codecov Report
@@ Coverage Diff @@
## master #1030 +/- ##
=========================================
- Coverage 77.74% 77.6% -0.14%
=========================================
Files 532 532
Lines 45866 45566 -300
=========================================
- Hits 35659 35362 -297
- Misses 7950 7962 +12
+ Partials 2257 2242 -15
Continue to review full report at Codecov.
// DefaultServiceEnvironment is the default service ID environment
DefaultServiceEnvironment = "default_env"
// DefaultServiceZone is the default service ID zone
DefaultServiceZone = "embedded"
// HeaderClusterServiceName is the header used to specify the service name.
HeaderClusterServiceName = "Cluster-Service-Name"
Why remove these headers?
I'm moving the service to the URL itself. I'm gonna add back the zone/environment headers.
errUnableToParseService = errors.New("unable to parse service")

// AllowedServiceNames is the list of allowed service names
AllowedServiceNames = []string{M3DBServiceName, M3AggServiceName}
Can this be private? Also, maybe just use a map?
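For illustration, a minimal sketch of the unexported, map-based variant being suggested (helper names are illustrative):

```go
// allowedServiceNames is unexported; a map makes membership checks O(1).
var allowedServiceNames = map[string]struct{}{
	M3DBServiceName:  {},
	M3AggServiceName: {},
}

// isAllowedService reports whether the given service name is supported.
func isAllowedService(name string) bool {
	_, ok := allowedServiceNames[name]
	return ok
}
```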
DeleteAllURL = handler.RoutePrefixV1 + "/placement"
// OldM3DBDeleteAllURL is the old url for the handler to delete all placements, maintained
// for backwards compatibility.
OldM3DBDeleteAllURL = handler.RoutePrefixV1 + "/placement"
hmm, this does not contain delete in it?
nope, just DELETE method
nit: DeprecatedM3DBDeleteAllURL
@@ -21,3 +21,9 @@ message PlacementAddRequest {
// are AVAILABLE for all their shards. force overrides that.
bool force = 2;
}

message AggPlacementInitRequest {
Just reuse PlacementInitRequest for this? They should always be the same.
assert.Equal(t, initTestInvalidRequestResponse, string(body))
}

func TestAggPlacementInitHandler(t *testing.T) {
TODO: Fix this test
src/query/util/logging/log.go (Outdated)
@@ -123,3 +123,22 @@ func WithResponseTimeLogging(next http.Handler) http.Handler {
}
})
}

// WithServiceNameResponseTimeLogging wraps around the given handler, providing response time logging
func WithServiceNameResponseTimeLogging(
TODO: Rename this
)
pOpts = pOpts.
SetIsMirrored(true).
// TODO(rartoul): Do we need to set placement cutover time? Seems like that would
Using immediate for placement cutover is fine, but I'd rather we copy all the logic from r2admin here to keep them in sync.
For example, the placementCutoff time is calculated in r2admin as

now.
Add(h.cfg.Placement.Clock.MaxPositiveSkew).
Add(h.cfg.Placement.Clock.MaxNegativeSkew).
Add(h.cfg.PropagationDelay)

and we could just set all three of those values to 0 to achieve an immediate placement cutover.
Keeping the logic the same makes debugging much simpler, and I could see us migrating r2admin/m3admin to call these endpoints here if we just run them with production configs.
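For reference, a sketch of that calculation as a standalone helper (the config field names are taken from the comment above and are assumptions about this codebase; setting all three durations to zero gives an immediate cutover):

```go
// Config field names follow the review comment; they are assumptions about
// this codebase, not the actual structs.
type placementClockConfig struct {
	MaxPositiveSkew time.Duration
	MaxNegativeSkew time.Duration
}

type placementCutoverConfig struct {
	Clock            placementClockConfig
	PropagationDelay time.Duration
}

// placementCutoverNanos mirrors the r2admin-style calculation: with all three
// durations set to zero, the cutover is effectively immediate.
func placementCutoverNanos(now time.Time, cfg placementCutoverConfig) int64 {
	return now.
		Add(cfg.Clock.MaxPositiveSkew).
		Add(cfg.Clock.MaxNegativeSkew).
		Add(cfg.PropagationDelay).
		UnixNano()
}
```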
// M3DBAddURL is the url for the placement add handler (with the POST method)
// for the M3DB service.
M3DBAddURL = handler.RoutePrefixV1 + "/services/m3db/placement"
Hm, are we worried about breaking all our guides/etc? I guess it's ok, at some point we'll probably have to version our API docs similar to ElasticSearch, etc so that we can go and see what the APIs were at a specific version (for users still running an older version).
Oh I see, you register the old one too. Sounds good.
Do you want to call OldM3DBAddURL perhaps DeprecatedM3DBAddURL? Just as it's a bit more clear that it's active but deprecated, rather than old, which is ever so slightly not as clear.
Is it better to write:

M3DBAddURL = handler.RoutePrefixV1 + "/services/" + M3DBServiceName + "/placement"

in case you update the service name in one place and forget the others?
Same goes for the m3agg URL as well. Also nit: use m3aggregator instead of m3agg everywhere?
Also consider making a const for "/services/" + M3DBServiceName + "/placement"; I see that string being reused in a lot of places.
Agreed, can we break up the consts?
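One possible shape for that, sketched with illustrative names (this assumes the service-name constants and handler.RoutePrefixV1 are string constants):

```go
const (
	// Shared path fragments so each service name appears in exactly one place.
	// The fragment names here are illustrative.
	m3dbPlacementPath  = "/services/" + M3DBServiceName + "/placement"
	m3aggPlacementPath = "/services/" + M3AggServiceName + "/placement"

	// Handler URLs derived from the shared fragments.
	M3DBAddURL        = handler.RoutePrefixV1 + m3dbPlacementPath
	M3DBDeleteAllURL  = handler.RoutePrefixV1 + m3dbPlacementPath
	M3AggAddURL       = handler.RoutePrefixV1 + m3aggPlacementPath
	M3AggDeleteAllURL = handler.RoutePrefixV1 + m3aggPlacementPath
)
```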
// M3AggAddURL is the url for the placement add handler (with the POST method)
// for the M3Agg service.
M3AggAddURL = handler.RoutePrefixV1 + "/services/m3db/placement"
Should this perhaps be /services/m3aggregator/placement or similar?
@@ -115,7 +131,17 @@ func (h *AddHandler) Add(
return nil, err
}

service, algo, err := ServiceWithAlgo(h.client, httpReq.Header)
serviceOpts := NewServiceOptionsFromHeaders(serviceName, httpReq.Header)
if serviceName == M3AggServiceName {
nit: Maybe use a switch here so when we add further specialization later it's straightforward where to make the change (and we don't use an if/else if)?
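For illustration, the switch form might look roughly like this (a sketch built from the identifiers in the quoted diff; it also uses the <= 0 check suggested further down):

```go
switch serviceName {
case M3AggServiceName:
	// Aggregator placements must carry explicit window and warmup values.
	if req.MaxAggregationWindowSizeNanos <= 0 || req.WarmupDurationNanos <= 0 {
		return nil, errAggWindowAndWarmupMustBeSet
	}
	serviceOpts.M3Agg = &M3AggServiceOptions{
		MaxAggregationWindowSize: time.Duration(req.MaxAggregationWindowSizeNanos),
		WarmupDuration:           time.Duration(req.WarmupDurationNanos),
	}
default:
	// No service-specific options for M3DB today; new services get their own case.
}
```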
service, algo, err := ServiceWithAlgo(h.client, httpReq.Header)
serviceOpts := NewServiceOptionsFromHeaders(serviceName, httpReq.Header)
if serviceName == M3AggServiceName {
if req.MaxAggregationWindowSizeNanos == 0 || req.WarmupDurationNanos == 0 {
Perhaps use <= 0? Just because a negative value is invalid too, I assume.
}
serviceOpts.M3Agg = &M3AggServiceOptions{
MaxAggregationWindowSize: time.Duration(req.MaxAggregationWindowSizeNanos),
WarmupDuration: time.Duration(req.WarmupDurationNanos),
Could we not have a default warmup duration and max aggregation window size if they don't specify any? I wouldn't have a good idea of what to choose for either of these without reading something first, and most users could probably change this later/shouldn't need to worry too much about these values perhaps?
Could these be configured instead of request based? I don't think they should change per request
We can probably have a sane default for warmupDuration, but maxAggregationWindowSize is very difficult to pick a sane default value for because if it’s too short they could lose data and if it’s too long their deploy takes forever
req *http.Request
)
if serviceName == M3AggServiceName {
req = httptest.NewRequest(AddHTTPMethod, M3DBAddURL, strings.NewReader("{\"instances\":[], \"max_aggregation_window_size_nanos\": 1000000, \"warmup_duration_nanos\": 1000000}"))
Do you want to use back ticks to avoid having to escape the quotes? Looks much more readable, e.g.:

strings.NewReader(`{"instances": [], "max_aggregation_window_size_nanos": 1000000, "warmup_duration_nanos": 1000000}`)
return func(s shard.Shard) error {
switch s.State() {
case shard.Initializing:
if s.CutoverNanos() > now.UnixNano() {
Hm @cw9 is this what's expected? I thought that cutover times always had to be in the future (inverse of this check)?
This is expected; this lambda is called before marking an initializing shard as available, so we don't allow users to mark the shard if now is before its CutoverNanos.
case shard.Leaving:
// TODO(rartoul): This seems overly cautious, basically it requires an entire maxAggregationWindowSize
// to elapse before "leaving" shards can be cleaned up.
if s.CutoffNanos() > now.UnixNano()-maxAggregationWindowSize.Nanoseconds() {
Another query here: should it not be that cutoff has to be greater than now+maxAggregationWindowSize, rather than equal or less?
Similar to the cutover logic, this is called when a leaving shard is being removed (when marking its corresponding initializing shard); this logic checks whether the time has passed this shard's CutoffNanos.
To answer Richie's TODO question, yes it's overly cautious, but let's just keep the logic the same as r2admin for now.
}
}

func parseServiceFromRequest(r *http.Request) (string, error) {
Hm, seems like a regexp might be slightly cleaner here? shrug
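For comparison, a regexp-based sketch (the /services/<name>/ path layout is assumed from the routes above, and the error variable reuses errUnableToParseService from the quoted code):

```go
// serviceNameRegexp captures the service segment from paths such as
// "/api/v1/services/m3db/placement"; the exact prefix is an assumption here.
var serviceNameRegexp = regexp.MustCompile(`/services/([^/]+)/`)

func parseServiceFromRequest(r *http.Request) (string, error) {
	matches := serviceNameRegexp.FindStringSubmatch(r.URL.Path)
	if len(matches) != 2 {
		return "", errUnableToParseService
	}
	return matches[1], nil
}
```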
DeleteURL = fmt.Sprintf("%s/placement/{%s}", handler.RoutePrefixV1, placementIDVar)
// OldM3DBDeleteURL is the old url for the placement delete handler, maintained
// for backwards compatibility.
OldM3DBDeleteURL = fmt.Sprintf("%s/placement/{%s}", handler.RoutePrefixV1, placementIDVar)
nit: DeprecatedM3DBDeleteURL?
// require.Equal(t, http.StatusOK, resp.StatusCode)
// if serviceName == M3AggServiceName {

// } else {
Still need the commented-out if/else code?
Also: perhaps use backticks here for these literal requests?
GetURL = handler.RoutePrefixV1 + "/placement"
// OldM3DBGetURL is the old url for the placement get handler, maintained for
// backwards compatibility.
OldM3DBGetURL = handler.RoutePrefixV1 + "/placement"
nit: DeprecatedM3DBGetURL?
const placementJSON = "{\"placement\":{\"instances\":{\"host1\":{\"id\":\"host1\",\"isolationGroup\":\"rack1\",\"zone\":\"test\",\"weight\":1,\"endpoint\":\"http://host1:1234\",\"shards\":[],\"shardSetId\":0,\"hostname\":\"host1\",\"port\":1234},\"host2\":{\"id\":\"host2\",\"isolationGroup\":\"rack1\",\"zone\":\"test\",\"weight\":1,\"endpoint\":\"http://host2:1234\",\"shards\":[],\"shardSetId\":0,\"hostname\":\"host2\",\"port\":1234}},\"replicaFactor\":0,\"numShards\":0,\"isSharded\":false,\"cutoverTime\":\"0\",\"isMirrored\":false,\"maxShardSetId\":0},\"version\":%d}"
nit: Backticks for literal JSON.
InitURL = handler.RoutePrefixV1 + "/placement/init"
// OldM3DBInitURL is the old url for the placement init handler, maintained for backwards
// compatibility (with the POST method).
OldM3DBInitURL = handler.RoutePrefixV1 + "/placement/init"
nit: DeprecatedM3DBInitURL?
serviceOpts := NewServiceOptionsFromHeaders(serviceName, httpReq.Header)
if serviceName == M3AggServiceName {
if req.MaxAggregationWindowSizeNanos == 0 || req.WarmupDurationNanos == 0 {
return nil, errAggWindowAndWarmupMustBeSet
Hm seems this code is duplicated elsewhere? Can we reuse? Also same question about whether we can have defaults for these.
@@ -8,6 +8,10 @@ message PlacementInitRequest {
repeated placementpb.Instance instances = 1;
int32 num_shards = 2;
int32 replication_factor = 3;

// M3Agg placements only
Maybe we should place these together, i.e.:

message M3AggregatorPlacementOptions {
  uint64 max_aggregation_window_size_nanos = 1;
  uint64 warmup_duration_nanos = 2;
}

message PlacementInitRequest {
  repeated placementpb.Instance instances = 1;
  int32 num_shards = 2;
  int32 replication_factor = 3;
  M3AggregatorPlacementOptions aggregator_options = 4;
}

message PlacementAddRequest {
  repeated placementpb.Instance instances = 1;
  bool force = 2;
  M3AggregatorPlacementOptions aggregator_options = 4;
}
Any way we can avoid passing M3AggregatorPlacementOptions in the request? I think it can be configured in the handler.
The problem is that we can only guess at the correct value. They may not have all the namespaces configured on this coordinator, or the endpoints may be embedded in a different service that doesn't have the info... let's talk offline.
SetValidZone(opts.ServiceZone).
SetIsSharded(true).
// Can use goal-based placement for both M3DB and M3Agg
SetIsStaged(false).
We can't use goal-based for m3agg yet; m3agg expects the protobuf to be stored as staged, so it parses the proto differently than m3db.
Force-pushed from acb4174 to 06f779c
SetValidZone(opts.ServiceZone).
SetIsSharded(true).
// M3Agg expects a staged placement.
SetIsStaged(true).
Is this shared by m3db as well? Should we use isStaged=false for m3db and isStaged=true for m3agg?
Not shared, this is inside an if m3agg switch statement.
Oh no, it's not, you're right, will fix.
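The fix being agreed on could look roughly like this (a sketch; pOpts, opts, and serviceName follow the names in the quoted diffs):

```go
// Only the aggregator needs a staged placement; M3DB stays goal-based.
isStaged := serviceName == M3AggServiceName
pOpts = pOpts.
	SetValidZone(opts.ServiceZone).
	SetIsSharded(true).
	SetIsStaged(isStaged)
```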
r.HandleFunc(DeleteAllURL, logged(NewDeleteAllHandler(client, cfg)).ServeHTTP).Methods(DeleteAllHTTPMethod)
r.HandleFunc(AddURL, logged(NewAddHandler(client, cfg)).ServeHTTP).Methods(AddHTTPMethod)
r.HandleFunc(DeleteURL, logged(NewDeleteHandler(client, cfg)).ServeHTTP).Methods(DeleteHTTPMethod)
type placementCutoverOpts struct {
nit: rename to m3aggregatorPlacementOpts?
done
// this by adding the warmup time and the max aggregation window size to the current time, and then truncating
// to the max aggregation window size. This ensure that we always return a time that is at the beginning of an
// aggregation window size, but is also later than now.Add(warmup).
func newShardCutOffNanosFn(now time.Time, maxAggregationWindowSize, warmup time.Duration) placement.TimeNanosFn {
just call shardCutoverNanos like r2admin?
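To make the quoted comment concrete, the cutoff computation it describes amounts to something like the following (a sketch; the helper name is hypothetical and the placement.TimeNanosFn wrapper is omitted):

```go
// shardCutoffNanos returns the first aggregation-window boundary at or after
// now.Add(warmup): add the warmup plus one full window, then truncate back
// down to the window boundary.
func shardCutoffNanos(now time.Time, maxAggregationWindowSize, warmup time.Duration) int64 {
	return now.
		Add(warmup).
		Add(maxAggregationWindowSize).
		Truncate(maxAggregationWindowSize).
		UnixNano()
}
```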
// M3DBDeleteAllURL is the url for the handler to delete all placements (with the DELETE method)
// for the M3DB service.
M3DBDeleteAllURL = handler.RoutePrefixV1 + "/services/m3db/placement"
This could be reused from somewhere yeah? "/services/m3db/placement" is rewritten a fair bit.
Force-pushed from d4093d6 to 570fc35