New chunk external key v12 #4857

Closed

Conversation

JordanRushing
Contributor

@JordanRushing JordanRushing commented Dec 1, 2021

Signed-off-by: Jordan Rushing [email protected]

What this PR does / why we need it:
This PR introduces a new ExternalKey for chunks with (2) additional object prefixes (time period, shard). These new prefixes are configured as chunk.PeriodConfig fields in schema v12.

Adding prefixes to chunk keys will allow for more efficient parallelization of read and write operations when using AWS S3 as rate limits are applied on a per-prefix basis.

ExternalKey construction has also moved from chunk.Chunk to chunk.SchemaConfig (parsing remains in chunk.Chunk) to make it easier to reason about how external keys differ between schema versions going forward.

Example schema_config with new schema v12 and chunk prefix fields

schema_config:
  configs:
    - from: 2020-10-24
      store: boltdb-shipper
      object_store: filesystem
      schema: v12
      index:
        prefix: index_
        period: 24h
      chunk_path_period: 1m
      chunk_path_shard_factor: 2

Example new chunk key with additional prefixes

fake/34bad90b/1/92785331009eb1cb/17d6da7a975:17d6daf4860:1bb871f9
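As a rough illustration of that layout, here is a sketch of how such a key might be assembled; the helper name, argument types, and the exact period/shard derivation below are assumptions and may not match the code in this PR:

package sketch

import (
	"fmt"
	"time"
)

// externalKeyV12Sketch shows the v12 key layout
// <user>/<period>/<shard>/<fprint>/<start>:<end>:<checksum> described above.
// The period/shard derivation here is an assumption for illustration and may
// not match how Loki actually computes these prefixes.
func externalKeyV12Sketch(userID string, fprint uint64, fromMs, throughMs int64,
	checksum uint32, chunkPathPeriod time.Duration, shardFactor uint64) string {
	period := fromMs / chunkPathPeriod.Milliseconds() // time bucket from the chunk start time
	shard := fprint % shardFactor                     // shard from the series fingerprint
	return fmt.Sprintf("%s/%x/%x/%x/%x:%x:%x",
		userID, period, shard, fprint, fromMs, throughMs, checksum)
}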

Which issue(s) this PR fixes:
N/A

Special notes for your reviewer:
This PR supersedes the now-closed #4819.

Checklist

  • Documentation added
  • Tests updated
  • Add an entry in the CHANGELOG.md about the changes.

owen-d and others added 2 commits December 1, 2021 16:32
…nchmark

Signed-off-by: Jordan Rushing <[email protected]>

Fix `TestGrpcStore` to work with new chunk key structure

Signed-off-by: Jordan Rushing <[email protected]>

Add a function to SchemaConfig that returns the correct PeriodConfig for
a given time.

Signed-off-by: Callum Styan <[email protected]>

Add schema config to object client so we can use the right external
key function based on the schema version.

Signed-off-by: Callum Styan <[email protected]>

Fix failing compactor tests by passing SchemaConfig to chunkClient

Remove empty conditional from SchemaForTime

Define schema v12; wire-in ChunkPathShardFactor and ChunkPathPeriod as configurable values

Add PeriodConfig test steps for new values ChunkPathShardFactor and ChunkPathPeriod in v12

Update test for chunk.NewExternalKey(); remove completed TODO

Set defaults for new ChunkPathPeriod and ChunkPathShardFactor SchemaConfig values; change FSObjectClient to use IdentityEncoder instead of Base64Encoder

Use IdentityEncoder everywhere we use FSObjectClient for Chunks

Add ExternalKey() function to SchemaConfig; update ObjectClient for Chunks
schemaConfig.ExternalKey change.

Signed-off-by: Callum Styan <[email protected]>
@pull-request-size pull-request-size bot added size/XL and removed size/L labels Dec 2, 2021
Signed-off-by: Jordan Rushing <[email protected]>
Signed-off-by: Jordan Rushing <[email protected]>
//
// Post-v12, (2) additional prefixes were added to external keys
// to support better read and write request parallelization:
// `<user>/<period>/<shard>/<fprint>/<start>:<end>:<checksum>`
func ParseExternalKey(userID, externalKey string) (Chunk, error) {
Contributor Author


@jeschkies raised a great question about this function in an earlier iteration of this PR:

I've been there. May I suggest to add a key version prefix or suffix? E.g.

keyv1:<user id>/<fingerprint>:<start time>:<end time>:<checksum>

Unless you are really sure that the key schema will never change again.

I think this is a fair point: right now our parsing relies on the structure of the key, rather than a specific identifier, to understand how to proceed.

Now that we've separated out the key construction from the parsing (we did this so that there is less interdependence between the ObjectClient and the Schema), we considered what a refactor would look like if we wanted to introduce yet another key structure. Either way, we would be adding, removing, or reordering prefixes, all of which would require changes to this conditional and our associated parsing functions.

Because of this, routing on key structure as we do in this function seems to make the most sense, but I'm open to comments here if there is a better way.
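For context, a minimal sketch of what structure-based routing looks like; the component counts and names below are assumptions based on the key formats quoted in this thread, not the actual parser:

package sketch

import "strings"

// keyFormat identifies which layout a chunk key appears to use, judged purely
// from its structure. The component counts are assumptions based on the formats
// discussed in this thread, not the real Loki parsing code.
type keyFormat int

const (
	formatLegacy keyFormat = iota // pre-checksum
	formatV1                      // <user>/<fprint>:<start>:<end>:<checksum>
	formatV12                     // <user>/<period>/<shard>/<fprint>/<start>:<end>:<checksum>
)

func detectKeyFormat(key string) keyFormat {
	switch strings.Count(key, "/") {
	case 4:
		return formatV12
	case 1:
		if strings.Count(key, ":") == 3 {
			return formatV1
		}
		return formatLegacy
	default:
		return formatLegacy
	}
}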

Member

@owen-d owen-d Dec 27, 2021


Great point. It would be even better if we could indicate a version without additional data in the key, which helps to minimize the index. I see a few immediate options.

  1. The most obvious candidate to me is to couple the ParseExternalKey with a schema itself to ensure compatibility. This is the same idea that we're introducing in this PR already of going (Schema, Chunk) -> Key, albeit inverted to (Key, Schema) -> Chunk. Enforcing this in code gives us guarantees as the codebase evolves, but requires threading this concept through the read path. Note: This could also be done in a followup PR for simplicity.
    Edit: This allows us to leave the index size unchanged, since <period> and <shard> need not be in the index itself, but could be calculated when translating to object storage. This gains the benefits without increasing index size. The key could still be <user>/<fprint>/<start>:<end>:<checksum> in the index, but be calculated and stored/retrieved as <user>/<period>/<shard>/<fprint>/<start>:<end>:<checksum> in the storage layer since both <period> and <shard> can be calculated from PeriodConfig + Chunk.
  2. Alternatively, we can keep what you have here, which doesn't age as nicely, but works. This may be the better path forward: we're already talking about how to refactor our storage layer, and doing a lot of work to make this code safer and more ergonomic now may be wasted in the long run.
  3. We can add a signaling byte in the chunk key for the format used, but I'm less inclined towards this idea as it unnecessarily increases index size.

I'd like to see what @cyriltovena thinks, but I'm leaning towards leaving it this way for now (option 2).
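A rough sketch of that edit, keeping the shorter key in the index and expanding it only at the object-storage boundary; every name and the exact period/shard math below are assumptions, not this PR's implementation:

package sketch

import (
	"fmt"
	"strings"
	"time"
)

// storageKey is an illustrative translation from the shorter index key
// (<user>/<fprint>/<start>:<end>:<checksum>) to the prefixed object-storage key
// (<user>/<period>/<shard>/<fprint>/<start>:<end>:<checksum>). The fingerprint is
// passed in separately for the shard calculation even though it also appears in
// the index key; this is a sketch, not the Loki implementation.
func storageKey(indexKey string, fprint uint64, fromMs int64,
	pathPeriod time.Duration, shardFactor uint64) string {
	user, rest, _ := strings.Cut(indexKey, "/")  // split off the tenant prefix
	period := fromMs / pathPeriod.Milliseconds() // derived from PeriodConfig + chunk start
	shard := fprint % shardFactor                // derived from the series fingerprint
	return fmt.Sprintf("%s/%x/%x/%s", user, period, shard, rest)
}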

}

// pre-checksum
func (cfg SchemaConfig) legacyExternalKey(chunk Chunk) string {
Contributor Author


It looks like there might be a bug in Loki currently where we prepend UserID to legacy keys but never successfully parse them. I discovered this with the introduction of new tests to cover all (3) variations of external key.

I don't quite understand the context from the comment, so if there is something I'm missing by removing this, please chime in.

Existing legacy key creation vs. legacy key parsing.

Contributor


Likely possible, you can send a separate PR if needed.

Signed-off-by: Jordan Rushing <[email protected]>
@JordanRushing JordanRushing marked this pull request as ready for review December 8, 2021 19:17
@JordanRushing JordanRushing requested a review from a team as a code owner December 8, 2021 19:17
Signed-off-by: Jordan Rushing <[email protected]>
Contributor

@cyriltovena cyriltovena left a comment


The PR looks good.

However, it does come with a few drawbacks:

  • The key is 12 bytes longer, which can increase the index size
  • The code becomes more complex as we require an additional dependency (the schema)
  • It requires more configuration that is not easy to grasp from a user's perspective

I wonder if just adding the series ID would have been enough (and if not, why not use the day period from the chunk without any new configuration).

<userid>/<fingerprint>/from:to:checksum seems enough to me.

Alternatively <userid>/<day_period>/<fingerprint>/from:to:checksum

Have we tested this simpler solution against the other? If we could prove that the current implementation is far better, I would feel more confident about the change.

Assuming the rate limit is at 5500, this would already give 660GB/s per series.

Member

@owen-d owen-d left a comment


Nice work. This is a large undertaking and it's off to a great start.

Let me see if I can address @cyriltovena's concerns:

  • The key is 12 bytes longer, which can increase the index size
  • The code becomes more complex as we require an additional dependency (the schema)
  • It requires more configuration that is not easy to grasp from a user's perspective

These are all great points. Starting with

The code becomes more complex as we require an additional dependency (the schema)

Hardcoding some of these configurable options could certainly make the code simpler, but it's conceivable that some of these limits may prove an issue, and I think the configurability is worth the additional code complexity. For instance, if we didn't include a shard_factor, then according to the napkin math max_throughput = rate_limit * target_chunk_size * rep_factor * dedupe_ratio, we'd max out at 3500 * 1.5e6 * 3 * 0.5 =~ 7.9GB/s/tenant. We could also tune these higher and still hardcode them, but since we've already got a lot of this code done, I'm happy to keep it.
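Spelled out, that napkin math looks like this; the figures are the ones quoted above:

package sketch

// maxTenantThroughputGBps works through the napkin math from the comment above
// using the quoted figures: 3500 writes/s per S3 prefix, 1.5 MB target chunk
// size, replication factor 3, dedupe ratio 0.5.
func maxTenantThroughputGBps() float64 {
	const (
		rateLimit       = 3500.0 // S3 write requests/s per prefix
		targetChunkSize = 1.5e6  // bytes per chunk
		repFactor       = 3.0
		dedupeRatio     = 0.5
	)
	// 3500 * 1.5e6 * 3 * 0.5 = 7.875e9 bytes/s, i.e. ~7.9 GB/s per tenant.
	return rateLimit * targetChunkSize * repFactor * dedupeRatio / 1e9
}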

Re

  • It requires more configuration that is not easy to grasp from a user's perspective

I think sensible defaults mean that only users who need the control are required to understand it, although there will be a few new knobs in the configuration.md.

Re

  • The key is 12 bytes longer, which can increase the index size

This is a really good point and not one that I'd thought a ton about. I don't think we need to store it in the index itself since it's calculable with a combination of key + period. I've gone more into depth on this point in a previous comment on this PR.

I've left some nits and I'd like us to come to some consensus on moving ParseExternalKey into BaseSchema or not (see earlier comment on this review).
@cyriltovena @JordanRushing WDYT?


@@ -87,6 +87,9 @@ func (cfg *StoreConfig) Validate(logger log.Logger) error {

type baseStore struct {
cfg StoreConfig
// todo (callum) it looks like baseStore is created off a specific schema struct implementation, so perhaps we can store something else here
// other than the entire set of schema period configs
schemaCfg SchemaConfig
Member


Would it make sense to add ParseExternalKey to the BaseSchema interface itself which is already in baseStore? This is what I mentioned in an earlier comment, creating a function with the signature (Key, Schema) -> Chunk.
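A minimal sketch of that idea; aside from the (Key, Schema) -> Chunk shape itself, the names here are hypothetical, and Chunk stands in for the existing chunk.Chunk type:

package sketch

// Chunk stands in for the existing chunk.Chunk type in this sketch.
type Chunk struct {
	// fingerprint, from, through, checksum, ...
}

// externalKeyParser is a hypothetical interface method illustrating the
// (Key, Schema) -> Chunk coupling: each schema version would be responsible
// for parsing the keys it produces. It is not current Loki API.
type externalKeyParser interface {
	ParseExternalKey(userID, externalKey string) (Chunk, error)
}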

@cyriltovena
Contributor

cyriltovena commented Jan 3, 2022

I agree with the napkin math for writes: we can currently support 8GB/s per tenant, and the largest tenant we have is 120MB/s, so that's already 66x more than we need.

I'm suggesting we only add the fingerprint to the path, which is not the case currently; this is a simple change from : to /. That means a tenant would have to log more than 8GB/s per stream to have an issue, which seems difficult to reach and easy to overcome by asking the tenant to split their stream (which would cause issues on a single ingester anyway).

Adding more prefixes technically just adds more headroom to a given stream, which I find unnecessary unless we've been told we'll need more than that.

For reads, everything is compressed and we've always talked in terms of uncompressed throughput; doubling that means 16GB/s per stream, and with an average compression ratio of 8x we're talking about 128GB/s per stream. Assuming someone queries across 10 streams, we're already in the TB/s range.

I'm not sure what our long-term expectations are, but I want to avoid adding complexity if we don't need it. Maybe that's the information I'm missing.

I'm not planning to block this PR, but I would love to see the difference in throughput between just adding the fingerprint and the suggested solution; having that would help me get on board with the change.

@owen-d
Member

owen-d commented Jan 3, 2022

Hey @cyriltovena, thanks for weighing in.

I'm suggesting we only add the fingerprint to the path, which is not the case currently; this is a simple change from : to /. That means a tenant would have to log more than 8GB/s per stream to have an issue

We previously ran a load test by generating objects according to the schema foo/<uuid>/<same-uuid>, but were unable to scale past a single s3 prefix's rate limit (3500 writes/s). However, we then tried foo/<1,2>/<uuid>/<uuid> and were able to scale to twice that. It appears the prefix is the only place we can shard our quotas, and there will never be enough writes or reads to a single stream to trigger this scaling on its own. That's why I believe we need to shard first by <period> (to nullify the effect of retention on <shard> size) and then by <shard> (in case Loki needs to write more than the rate limit to the currently active <period>).

I think we can also do this without increasing the index size at all by deriving the shard and prefix when needed from the available schema, fingerprint, and start_ts.

Does this make sense? I think we can do some of it in a followup PR(s) as well, to mitigate review complexity.

@cyriltovena
Contributor

We previously ran a load test by generating objects according to the schema foo/<uuid>/<same-uuid>, but were unable to scale past a single s3 prefix's rate limit (3500 writes/s). However, we then tried foo/<1,2>/<uuid>/<uuid> and were able to scale to twice that. It appears the prefix is the only place we can shard our quotas, and there will never be enough writes or reads to a single stream to trigger this scaling on its own. That's why I believe we need to shard first by <period> (to nullify the effect of retention on <shard> size) and then by <shard> (in case Loki needs to write more than the rate limit to the currently active <period>).

I actually don't understand this part. I'm really curious to understand it for the new storage design doc. I think what you're saying is that you can scale per-stream writes by sharding them, but why do we need to? If a single stream can write at 8GB/s, that is already huge; a single ingester will most likely not be able to keep up, and we'll have to ask users to split their streams further.

Feel free to merge the PR; maybe we can discuss this offline and you can walk me through the problem.

Member

@owen-d owen-d left a comment


Please add an entry into configuration.md with the new options.

func defaultChunkPathPeriod(schema string) time.Duration {
	switch schema {
	case v12:
		return 1 * time.Hour
Member


Since it may take a while to scale up prefixes, I think we should increase the default:

Suggested change
return 1 * time.Hour
return 12 * time.Hour

@cstyan
Contributor

cstyan commented Jan 4, 2022

Just going to weigh in here with some of my thoughts/responses to comments. I also have a few questions as a result of some of Cyril's comments but those aren't high priority.

Note that so far we've been talking amongst ourselves in best-case-scenario terms, where S3's backend has already scaled up to give us the maximum rate limit for each prefix value. In reality, we've yet to reliably get that rate limit scaled up on any reasonable timeline for new prefix values. We've seen everything from never scaling up and only giving us a single prefix's rate limit across multiple prefixes, to scaling up slightly after 5 minutes, to never scaling at all for multiple prefixes but giving us half the advertised rate limit for each prefix, and more. I've even engaged with someone from AWS and a contact of theirs who works on S3, and we've not been able to come to any conclusion about how the scaling of their backend is supposed to work. I think, ultimately, if we want to understand it well enough to estimate and provision reliably, we'd need to engage AWS premium support.

@cyriltovena The key is 12 bytes longer, which can increase the index size
What is the potential impact of this? As someone without a lot of context in that area of Loki, 12 bytes seems trivial. This is the per-chunk index, correct?

Regarding Cyril's comment about alternative key formats we could have used:

<userid>/<fingerprint>/from:to:checksum seems enough to me.

Alternatively <userid>/<day_period>/<fingerprint>/from:to:checksum

The issue with the first is that, based on the way S3 advertises their rate limits, you get 3500 writes or 5500 reads per second per prefix. In the case of <userid>/<fingerprint>/from:to:checksum, where the fingerprint identifies a unique stream, this means that per stream, regardless of the time range of a query, a query would only be able to do 5500 read operations per second. For <userid>/<day_period>/<fingerprint>/from:to:checksum we'd at least get the rate limit per day_period, so longer-range queries would get more throughput. The reasoning for adding the additional shard is to increase the potential upscaling of the rate limit, as well as decrease the likelihood of downscaling. Think of it this way: if a user has 100 active series and 10 shards, S3 can scale each shard up on its backend. And since there are 10 active series within each shard, it's more likely that we'll have sustained load on those prefix values, meaning S3 will scale us up and also not see a significant decrease in sustained load, which would result in downscaling (and a lower rate limit). Hopefully this wall of text clarifies some of the behaviour we expect to see.

Next, the unpredictability (so far) of s3's scaling means that it's difficult for us to accurately test different key formats. Or at least expensive in terms of time. Jordan's tests today show scaling after a period of hours, not minutes. Previously we were running tests for ~30 minutes each. Unless we want to commit more time to running additional tests that may or may not yield consistent results we should move forward with a solution and just see how well it scales in a real Loki cluster 🤞

Finally, regarding further refactors and moving the ParseExternalKey function into the schema code: I am in support of these changes but agree that they can be left to another PR. When reading through the code and talking about what changes would have to be made in order to clean this up, it became increasingly clear to Jordan and me that more would have to happen to do this properly than we expected. To the point that I don't think doing so should block the current PR and/or delay what we need to have finished in the coming weeks.

One portion of the code that was particularly hard for me to follow was the naming of some interfaces and structs that we're touching in this PR, and it felt like the embedding of schema version structs within each other was also potentially a problem when it came to the refactor. IMO a good start would be reevaluating the current interfaces and their APIs, and commenting each struct with which interfaces they actually implement.

@JordanRushing
Contributor Author

#5054 is branched from this PR and implements a simpler chunk key as mentioned in earlier discussion.

@cyriltovena
Contributor

@cyriltovena The key is 12 bytes longer, which can increase the index size. What is the potential impact of this? As someone without a lot of context in that area of Loki, 12 bytes seems trivial. This is the per-chunk index, correct?

Correct. I don't think it was key in my thinking; it just adds to the list of cons. We're flushing 1.2 billion chunks over 30d across all workloads, so roughly 15GB. Again, not a big problem, just an observation.

Regarding Cyril's comment about alternative key formats we could have used:

<userid>/<fingerprint>/from:to:checksum seems enough to me.

Alternatively <userid>/<day_period>/<fingerprint>/from:to:checksum

The issue with the first is that, based on the way s3 advertises that their rate limits work is that you get 3500 writes or 5500 reads per second per prefix. In the case of <userid>/<fingerprint>/from:to:checksum where fingerprint identifies a unique stream this means that per stream, regardless of the time range of a query, a query would only be able to do 5500 read operations per second. For <userid>/<day_period>/<fingerprint>/from:to:checksum we'd at least be able to get the rate limit per day_period, so longer range queries would get more throughput. The reasoning for adding in the additional shard is to increase the potential upscaling of the rate limit, as well as decrease the likelyhood of downscaling. Think of it this way, if a user has 100 active series and 10 shards, s3 can scale each shard up on it's backend. And since there's 10 active series within each shard, it's more likely that we'll have sustained load to those prefix values, meaning s3 will scale us up and also not see a significant decrease in sustained load which would result in downscaling (and a lower rate limit). Hopefully this wall of text clarifies some of the behaviour we expect to see.

I understand the concern for long-lived streams, hence why I suggested an alternative; I think <day_period> could be an hour or 6 hours if we want to go with it. This is mostly for scaling the read path very high; I doubt it will help the write path. I'll leave the decision up to you.

Next, the unpredictability (so far) of s3's scaling means that it's difficult for us to accurately test different key formats. Or at least expensive in terms of time. Jordan's tests today show scaling after a period of hours, not minutes. Previously we were running tests for ~30 minutes each. Unless we want to commit more time to running additional tests that may or may not yield consistent results we should move forward with a solution and just see how well it scales in a real Loki cluster 🤞

I don't want to block this PR. I trust you to make the best decision; I was mostly curious about the selected solution. This will have a big impact on our storage refactoring: since we want to make the right decision from the ground up, I don't want something configurable, but something that works for everyone.

@JordanRushing
Contributor Author

Closing in favor of #5054
