From 13ffc976f31abda77b8eeffd0b9ffcc145f8f239 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Wed, 27 Apr 2022 16:19:02 -0400 Subject: [PATCH 1/4] Add a per tenant ability to fudge duplicate timestamps in the distributor Signed-off-by: Edward Welch --- pkg/distributor/distributor.go | 14 ++ pkg/distributor/distributor_test.go | 208 ++++++++++++++++++++++++++++ pkg/distributor/limits.go | 2 + pkg/distributor/validator.go | 21 +-- pkg/validation/limits.go | 30 ++-- 5 files changed, 254 insertions(+), 21 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 932c9f9b76f63..d302d24534d81 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -271,7 +271,21 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log validationErr = err continue } + stream.Entries[n] = entry + + // If configured for this tenant, fudge duplicate timestamps. Note, this is imperfect + // since Loki will accept out of order writes it doesn't account for separate + // pushes with overlapping time ranges having entries with duplicate timestamps + if validationContext.fudgeDuplicateTimestamps && n != 0 && stream.Entries[n-1].Timestamp.Equal(entry.Timestamp) { + // Traditional logic for Loki is that 2 lines with the same timestamp and + // exact same content will be de-duplicated, (i.e. 
only one will be stored, others dropped) + // To maintain this behavior, only fudge the timestamp if the log content is different + if stream.Entries[n-1].Line != entry.Line { + stream.Entries[n].Timestamp = entry.Timestamp.Add(1 * time.Nanosecond) + } + } + n++ validatedSamplesSize += len(entry.Line) validatedSamplesCount++ diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index f39da9d93541f..c009c62cb3cb7 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -100,6 +100,214 @@ func TestDistributor(t *testing.T) { } } +func Test_FudgeTimestamp(t *testing.T) { + fudgingDisabled := &validation.Limits{} + flagext.DefaultValues(fudgingDisabled) + fudgingDisabled.RejectOldSamples = false + + fudgingEnabled := &validation.Limits{} + flagext.DefaultValues(fudgingEnabled) + fudgingEnabled.RejectOldSamples = false + fudgingEnabled.FudgeDuplicateTimestamp = true + + tests := map[string]struct { + limits *validation.Limits + push *logproto.PushRequest + expectedPush *logproto.PushRequest + }{ + "fudging disabled, no dupes": { + limits: fudgingDisabled, + push: &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: "{job=\"foo\"}", + Entries: []logproto.Entry{ + {time.Unix(123456, 0), "heyooooooo"}, + {time.Unix(123457, 0), "heyiiiiiii"}, + }, + }, + }, + }, + expectedPush: &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: "{job=\"foo\"}", + Entries: []logproto.Entry{ + {time.Unix(123456, 0), "heyooooooo"}, + {time.Unix(123457, 0), "heyiiiiiii"}, + }, + }, + }, + }, + }, + "fudging disabled, with dupe timestamp different entry": { + limits: fudgingDisabled, + push: &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: "{job=\"foo\"}", + Entries: []logproto.Entry{ + {time.Unix(123456, 0), "heyooooooo"}, + {time.Unix(123456, 0), "heyiiiiiii"}, + }, + }, + }, + }, + expectedPush: &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: 
"{job=\"foo\"}", + Entries: []logproto.Entry{ + {time.Unix(123456, 0), "heyooooooo"}, + {time.Unix(123456, 0), "heyiiiiiii"}, + }, + }, + }, + }, + }, + "fudging disabled, with dupe timestamp same entry": { + limits: fudgingDisabled, + push: &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: "{job=\"foo\"}", + Entries: []logproto.Entry{ + {time.Unix(123456, 0), "heyooooooo"}, + {time.Unix(123456, 0), "heyooooooo"}, + }, + }, + }, + }, + expectedPush: &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: "{job=\"foo\"}", + Entries: []logproto.Entry{ + {time.Unix(123456, 0), "heyooooooo"}, + {time.Unix(123456, 0), "heyooooooo"}, + }, + }, + }, + }, + }, + "fudging enabled, no dupes": { + limits: fudgingEnabled, + push: &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: "{job=\"foo\"}", + Entries: []logproto.Entry{ + {time.Unix(123456, 0), "heyooooooo"}, + {time.Unix(123457, 0), "heyiiiiiii"}, + }, + }, + }, + }, + expectedPush: &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: "{job=\"foo\"}", + Entries: []logproto.Entry{ + {time.Unix(123456, 0), "heyooooooo"}, + {time.Unix(123457, 0), "heyiiiiiii"}, + }, + }, + }, + }, + }, + "fudging enabled, with dupe timestamp different entry": { + limits: fudgingEnabled, + push: &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: "{job=\"foo\"}", + Entries: []logproto.Entry{ + {time.Unix(123456, 0), "heyooooooo"}, + {time.Unix(123456, 0), "heyiiiiiii"}, + }, + }, + }, + }, + expectedPush: &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: "{job=\"foo\"}", + Entries: []logproto.Entry{ + {time.Unix(123456, 0), "heyooooooo"}, + {time.Unix(123456, 1), "heyiiiiiii"}, + }, + }, + }, + }, + }, + "fudging enabled, with dupe timestamp same entry": { + limits: fudgingEnabled, + push: &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: "{job=\"foo\"}", + Entries: []logproto.Entry{ + {time.Unix(123456, 0), "heyooooooo"}, + 
{time.Unix(123456, 0), "heyooooooo"}, + }, + }, + }, + }, + expectedPush: &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: "{job=\"foo\"}", + Entries: []logproto.Entry{ + {time.Unix(123456, 0), "heyooooooo"}, + {time.Unix(123456, 0), "heyooooooo"}, + }, + }, + }, + }, + }, + "fudging enabled, multiple subsequent fudges": { + limits: fudgingEnabled, + push: &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: "{job=\"foo\"}", + Entries: []logproto.Entry{ + {time.Unix(123456, 0), "heyooooooo"}, + {time.Unix(123456, 0), "hi"}, + {time.Unix(123456, 1), "hey there"}, + }, + }, + }, + }, + expectedPush: &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: "{job=\"foo\"}", + Entries: []logproto.Entry{ + {time.Unix(123456, 0), "heyooooooo"}, + {time.Unix(123456, 1), "hi"}, + {time.Unix(123456, 2), "hey there"}, + }, + }, + }, + }, + }, + } + + for testName, testData := range tests { + testData := testData + + t.Run(testName, func(t *testing.T) { + ingester := &mockIngester{} + d := prepare(t, testData.limits, nil, func(addr string) (ring_client.PoolClient, error) { return ingester, nil }) + defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck + _, err := d.Push(ctx, testData.push) + assert.NoError(t, err) + assert.Equal(t, testData.expectedPush, ingester.pushed[0]) + }) + } +} + func Test_SortLabelsOnPush(t *testing.T) { limits := &validation.Limits{} flagext.DefaultValues(limits) diff --git a/pkg/distributor/limits.go b/pkg/distributor/limits.go index 559f4b55051c3..cade28eb093da 100644 --- a/pkg/distributor/limits.go +++ b/pkg/distributor/limits.go @@ -14,4 +14,6 @@ type Limits interface { CreationGracePeriod(userID string) time.Duration RejectOldSamples(userID string) bool RejectOldSamplesMaxAge(userID string) time.Duration + + FudgeDuplicateTimestamps(userID string) bool } diff --git a/pkg/distributor/validator.go b/pkg/distributor/validator.go index f69dc5c18e132..2aca4f42e029c 100644 --- 
a/pkg/distributor/validator.go +++ b/pkg/distributor/validator.go @@ -40,20 +40,23 @@ type validationContext struct { maxLabelNameLength int maxLabelValueLength int + fudgeDuplicateTimestamps bool + userID string } func (v Validator) getValidationContextForTime(now time.Time, userID string) validationContext { return validationContext{ - userID: userID, - rejectOldSample: v.RejectOldSamples(userID), - rejectOldSampleMaxAge: now.Add(-v.RejectOldSamplesMaxAge(userID)).UnixNano(), - creationGracePeriod: now.Add(v.CreationGracePeriod(userID)).UnixNano(), - maxLineSize: v.MaxLineSize(userID), - maxLineSizeTruncate: v.MaxLineSizeTruncate(userID), - maxLabelNamesPerSeries: v.MaxLabelNamesPerSeries(userID), - maxLabelNameLength: v.MaxLabelNameLength(userID), - maxLabelValueLength: v.MaxLabelValueLength(userID), + userID: userID, + rejectOldSample: v.RejectOldSamples(userID), + rejectOldSampleMaxAge: now.Add(-v.RejectOldSamplesMaxAge(userID)).UnixNano(), + creationGracePeriod: now.Add(v.CreationGracePeriod(userID)).UnixNano(), + maxLineSize: v.MaxLineSize(userID), + maxLineSizeTruncate: v.MaxLineSizeTruncate(userID), + maxLabelNamesPerSeries: v.MaxLabelNamesPerSeries(userID), + maxLabelNameLength: v.MaxLabelNameLength(userID), + maxLabelValueLength: v.MaxLabelValueLength(userID), + fudgeDuplicateTimestamps: v.FudgeDuplicateTimestamps(userID), } } diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index cca7e23203c0d..b7b13398d0095 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -46,18 +46,19 @@ const ( // to support user-friendly duration format (e.g: "1h30m45s") in JSON value. type Limits struct { // Distributor enforced limits. 
- IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"` - IngestionRateMB float64 `yaml:"ingestion_rate_mb" json:"ingestion_rate_mb"` - IngestionBurstSizeMB float64 `yaml:"ingestion_burst_size_mb" json:"ingestion_burst_size_mb"` - MaxLabelNameLength int `yaml:"max_label_name_length" json:"max_label_name_length"` - MaxLabelValueLength int `yaml:"max_label_value_length" json:"max_label_value_length"` - MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series" json:"max_label_names_per_series"` - RejectOldSamples bool `yaml:"reject_old_samples" json:"reject_old_samples"` - RejectOldSamplesMaxAge model.Duration `yaml:"reject_old_samples_max_age" json:"reject_old_samples_max_age"` - CreationGracePeriod model.Duration `yaml:"creation_grace_period" json:"creation_grace_period"` - EnforceMetricName bool `yaml:"enforce_metric_name" json:"enforce_metric_name"` - MaxLineSize flagext.ByteSize `yaml:"max_line_size" json:"max_line_size"` - MaxLineSizeTruncate bool `yaml:"max_line_size_truncate" json:"max_line_size_truncate"` + IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"` + IngestionRateMB float64 `yaml:"ingestion_rate_mb" json:"ingestion_rate_mb"` + IngestionBurstSizeMB float64 `yaml:"ingestion_burst_size_mb" json:"ingestion_burst_size_mb"` + MaxLabelNameLength int `yaml:"max_label_name_length" json:"max_label_name_length"` + MaxLabelValueLength int `yaml:"max_label_value_length" json:"max_label_value_length"` + MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series" json:"max_label_names_per_series"` + RejectOldSamples bool `yaml:"reject_old_samples" json:"reject_old_samples"` + RejectOldSamplesMaxAge model.Duration `yaml:"reject_old_samples_max_age" json:"reject_old_samples_max_age"` + CreationGracePeriod model.Duration `yaml:"creation_grace_period" json:"creation_grace_period"` + EnforceMetricName bool `yaml:"enforce_metric_name" json:"enforce_metric_name"` + MaxLineSize 
flagext.ByteSize `yaml:"max_line_size" json:"max_line_size"` + MaxLineSizeTruncate bool `yaml:"max_line_size_truncate" json:"max_line_size_truncate"` + FudgeDuplicateTimestamp bool `yaml:"fudge_duplicate_timestamp" json:"fudge_duplicate_timestamp"` // Ingester enforced limits. MaxLocalStreamsPerUser int `yaml:"max_streams_per_user" json:"max_streams_per_user"` @@ -135,6 +136,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxLabelValueLength, "validation.max-length-label-value", 2048, "Maximum length accepted for label value. This setting also applies to the metric name") f.IntVar(&l.MaxLabelNamesPerSeries, "validation.max-label-names-per-series", 30, "Maximum number of label names per series.") f.BoolVar(&l.RejectOldSamples, "validation.reject-old-samples", true, "Reject old samples.") + f.BoolVar(&l.FudgeDuplicateTimestamp, "validation.fudge-duplicate-timestamps", false, "Fudge the timestamp of a log line by one nanosecond in the future from a previous entry for the same stream with the same timestamp, guarantees sort order at query time.") _ = l.RejectOldSamplesMaxAge.Set("7d") f.Var(&l.RejectOldSamplesMaxAge, "validation.reject-old-samples.max-age", "Maximum accepted sample age before rejecting.") @@ -537,6 +539,10 @@ func (o *Overrides) PerStreamRateLimit(userID string) RateLimit { } } +func (o *Overrides) FudgeDuplicateTimestamps(userID string) bool { + return o.getOverridesForUser(userID).FudgeDuplicateTimestamp +} + func (o *Overrides) getOverridesForUser(userID string) *Limits { if o.tenantLimits != nil { l := o.tenantLimits.TenantLimits(userID) From c81819f827c6dd3f9e92ffd601e9d69ba7f4ab87 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Wed, 27 Apr 2022 18:40:01 -0400 Subject: [PATCH 2/4] update docs Signed-off-by: Edward Welch --- docs/sources/configuration/_index.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index 
4861c24537c44..9a1c3c0dd6520 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -2100,6 +2100,16 @@ The `limits_config` block configures global and per-tenant limits in Loki. # CLI flag: -distributor.max-line-size-truncate [max_line_size_truncate: | default = false ] +# Fudge the log line timestamp during ingestion when it's the same as the previous entry for the same stream +# When enabled, if a log line in a push request has the same timestamp as the previous line +# for the same stream, one nanosecond is added to the log line's timestamp. This will preserve the received +# order of log lines with the exact same timestamp when they are queried by slightly altering +# their stored timestamp. NOTE: this is imperfect because Loki accepts out of order writes +# and another push request for the same stream could contain duplicate timestamps to existing +# entries and they will not be fudged. +# CLI flag: -validation.fudge-duplicate-timestamps +[fudge_duplicate_timestamp: | default = false ] + # Maximum number of log entries that will be returned for a query. 
# CLI flag: -validation.max-entries-limit [max_entries_limit_per_query: | default = 5000 ] From 9841c69c8d2eea54e0c436c5f1e55cbfe0219e3b Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Wed, 27 Apr 2022 18:57:50 -0400 Subject: [PATCH 3/4] lint Signed-off-by: Edward Welch --- pkg/distributor/distributor_test.go | 60 ++++++++++++++--------------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index c009c62cb3cb7..2da8986a3f145 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -122,8 +122,8 @@ func Test_FudgeTimestamp(t *testing.T) { { Labels: "{job=\"foo\"}", Entries: []logproto.Entry{ - {time.Unix(123456, 0), "heyooooooo"}, - {time.Unix(123457, 0), "heyiiiiiii"}, + {Timestamp: time.Unix(123456, 0), Line: "heyooooooo"}, + {Timestamp: time.Unix(123457, 0), Line: "heyiiiiiii"}, }, }, }, @@ -133,8 +133,8 @@ func Test_FudgeTimestamp(t *testing.T) { { Labels: "{job=\"foo\"}", Entries: []logproto.Entry{ - {time.Unix(123456, 0), "heyooooooo"}, - {time.Unix(123457, 0), "heyiiiiiii"}, + {Timestamp: time.Unix(123456, 0), Line: "heyooooooo"}, + {Timestamp: time.Unix(123457, 0), Line: "heyiiiiiii"}, }, }, }, @@ -147,8 +147,8 @@ func Test_FudgeTimestamp(t *testing.T) { { Labels: "{job=\"foo\"}", Entries: []logproto.Entry{ - {time.Unix(123456, 0), "heyooooooo"}, - {time.Unix(123456, 0), "heyiiiiiii"}, + {Timestamp: time.Unix(123456, 0), Line: "heyooooooo"}, + {Timestamp: time.Unix(123456, 0), Line: "heyiiiiiii"}, }, }, }, @@ -158,8 +158,8 @@ func Test_FudgeTimestamp(t *testing.T) { { Labels: "{job=\"foo\"}", Entries: []logproto.Entry{ - {time.Unix(123456, 0), "heyooooooo"}, - {time.Unix(123456, 0), "heyiiiiiii"}, + {Timestamp: time.Unix(123456, 0), Line: "heyooooooo"}, + {Timestamp: time.Unix(123456, 0), Line: "heyiiiiiii"}, }, }, }, @@ -172,8 +172,8 @@ func Test_FudgeTimestamp(t *testing.T) { { Labels: "{job=\"foo\"}", Entries: []logproto.Entry{ - 
{time.Unix(123456, 0), "heyooooooo"}, - {time.Unix(123456, 0), "heyooooooo"}, + {Timestamp: time.Unix(123456, 0), Line: "heyooooooo"}, + {Timestamp: time.Unix(123456, 0), Line: "heyooooooo"}, }, }, }, @@ -183,8 +183,8 @@ func Test_FudgeTimestamp(t *testing.T) { { Labels: "{job=\"foo\"}", Entries: []logproto.Entry{ - {time.Unix(123456, 0), "heyooooooo"}, - {time.Unix(123456, 0), "heyooooooo"}, + {Timestamp: time.Unix(123456, 0), Line: "heyooooooo"}, + {Timestamp: time.Unix(123456, 0), Line: "heyooooooo"}, }, }, }, @@ -197,8 +197,8 @@ func Test_FudgeTimestamp(t *testing.T) { { Labels: "{job=\"foo\"}", Entries: []logproto.Entry{ - {time.Unix(123456, 0), "heyooooooo"}, - {time.Unix(123457, 0), "heyiiiiiii"}, + {Timestamp: time.Unix(123456, 0), Line: "heyooooooo"}, + {Timestamp: time.Unix(123457, 0), Line: "heyiiiiiii"}, }, }, }, @@ -208,8 +208,8 @@ func Test_FudgeTimestamp(t *testing.T) { { Labels: "{job=\"foo\"}", Entries: []logproto.Entry{ - {time.Unix(123456, 0), "heyooooooo"}, - {time.Unix(123457, 0), "heyiiiiiii"}, + {Timestamp: time.Unix(123456, 0), Line: "heyooooooo"}, + {Timestamp: time.Unix(123457, 0), Line: "heyiiiiiii"}, }, }, }, @@ -222,8 +222,8 @@ func Test_FudgeTimestamp(t *testing.T) { { Labels: "{job=\"foo\"}", Entries: []logproto.Entry{ - {time.Unix(123456, 0), "heyooooooo"}, - {time.Unix(123456, 0), "heyiiiiiii"}, + {Timestamp: time.Unix(123456, 0), Line: "heyooooooo"}, + {Timestamp: time.Unix(123456, 0), Line: "heyiiiiiii"}, }, }, }, @@ -233,8 +233,8 @@ func Test_FudgeTimestamp(t *testing.T) { { Labels: "{job=\"foo\"}", Entries: []logproto.Entry{ - {time.Unix(123456, 0), "heyooooooo"}, - {time.Unix(123456, 1), "heyiiiiiii"}, + {Timestamp: time.Unix(123456, 0), Line: "heyooooooo"}, + {Timestamp: time.Unix(123456, 1), Line: "heyiiiiiii"}, }, }, }, @@ -247,8 +247,8 @@ func Test_FudgeTimestamp(t *testing.T) { { Labels: "{job=\"foo\"}", Entries: []logproto.Entry{ - {time.Unix(123456, 0), "heyooooooo"}, - {time.Unix(123456, 0), "heyooooooo"}, + {Timestamp: 
time.Unix(123456, 0), Line: "heyooooooo"}, + {Timestamp: time.Unix(123456, 0), Line: "heyooooooo"}, }, }, }, @@ -258,8 +258,8 @@ func Test_FudgeTimestamp(t *testing.T) { { Labels: "{job=\"foo\"}", Entries: []logproto.Entry{ - {time.Unix(123456, 0), "heyooooooo"}, - {time.Unix(123456, 0), "heyooooooo"}, + {Timestamp: time.Unix(123456, 0), Line: "heyooooooo"}, + {Timestamp: time.Unix(123456, 0), Line: "heyooooooo"}, }, }, }, @@ -272,9 +272,9 @@ func Test_FudgeTimestamp(t *testing.T) { { Labels: "{job=\"foo\"}", Entries: []logproto.Entry{ - {time.Unix(123456, 0), "heyooooooo"}, - {time.Unix(123456, 0), "hi"}, - {time.Unix(123456, 1), "hey there"}, + {Timestamp: time.Unix(123456, 0), Line: "heyooooooo"}, + {Timestamp: time.Unix(123456, 0), Line: "hi"}, + {Timestamp: time.Unix(123456, 1), Line: "hey there"}, }, }, }, @@ -284,9 +284,9 @@ func Test_FudgeTimestamp(t *testing.T) { { Labels: "{job=\"foo\"}", Entries: []logproto.Entry{ - {time.Unix(123456, 0), "heyooooooo"}, - {time.Unix(123456, 1), "hi"}, - {time.Unix(123456, 2), "hey there"}, + {Timestamp: time.Unix(123456, 0), Line: "heyooooooo"}, + {Timestamp: time.Unix(123456, 1), Line: "hi"}, + {Timestamp: time.Unix(123456, 2), Line: "hey there"}, }, }, }, From 3cf4833478b003ac7afd97b6b46d4b551caa91e1 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Wed, 27 Apr 2022 19:00:44 -0400 Subject: [PATCH 4/4] update changelog Signed-off-by: Edward Welch --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ddf5804c904b..5d1565f4e8d5a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -68,6 +68,7 @@ ##### Fixes * [5685](https://github.com/grafana/loki/pull/5685) **chaudum**: Assert that push values tuples consist of string values ##### Changes +* [6042](https://github.com/grafana/loki/pull/6042) **slim-bean**: Add a new configuration to allow fudging of ingested timestamps to guarantee sort order of duplicate timestamps at query time. 
* [5777](https://github.com/grafana/loki/pull/5777) **tatchiuleung**: storage: make Azure blobID chunk delimiter configurable * [5650](https://github.com/grafana/loki/pull/5650) **cyriltovena**: Remove more chunkstore and schema version below v9 * [5643](https://github.com/grafana/loki/pull/5643) **simonswine**: Introduce a ChunkRef type as part of logproto