Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingester: active_native_histograms_postings #7982

Merged
merged 2 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions pkg/ingester/activeseries/active_native_histogram_postings.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// SPDX-License-Identifier: AGPL-3.0-only

package activeseries

import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/index"
)

// NativeHistogramPostings is a wrapper around ActiveSeries and index.Postings.
// Similar to Postings, but filters it's output to native histogram series.
// Implements index.Postings interface and returns only series references that
// are active in ActiveSeries and are native histograms.
// It is only valid to use NativeHistogramPostings if the postings are from the
// open TSDB head. It is not valid to use NativeHistogramPostings if the
// postings are from a block.
type NativeHistogramPostings struct {
activeSeries *ActiveSeries
postings index.Postings
currentBucketCount int
}

func NewNativeHistogramPostings(activeSeries *ActiveSeries, postings index.Postings) *NativeHistogramPostings {
return &NativeHistogramPostings{
activeSeries: activeSeries,
postings: postings,
}
}

// Type check.
var _ index.Postings = &NativeHistogramPostings{}

// At implements index.Postings.
func (a *NativeHistogramPostings) At() storage.SeriesRef {
return a.postings.At()
}

// AtBucketCount returns the current bucket count for the series reference at the current position.
func (a *NativeHistogramPostings) AtBucketCount() (storage.SeriesRef, int) {
return a.postings.At(), a.currentBucketCount
}

// Err implements index.Postings.
func (a *NativeHistogramPostings) Err() error {
return a.postings.Err()
}

// Next implements index.Postings.
func (a *NativeHistogramPostings) Next() bool {
for a.postings.Next() {
if count, ok := a.activeSeries.NativeHistoragramBuckets(a.postings.At()); ok {
a.currentBucketCount = count
return true
}
}
return false
}

// Seek implements index.Postings.
func (a *NativeHistogramPostings) Seek(v storage.SeriesRef) bool {
// Seek in the underlying postings.
// If the underlying postings don't contain a value, return false.
if !a.postings.Seek(v) {
return false
}

// If the underlying postings contain a value, check if it's active.
if count, ok := a.activeSeries.NativeHistoragramBuckets(a.postings.At()); ok {
a.currentBucketCount = count
return true
}

// If the underlying postings contain a value, but it's not active,
// seek to the next active value.
return a.Next()
}
158 changes: 158 additions & 0 deletions pkg/ingester/activeseries/active_native_histogram_postings_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package activeseries

import (
"testing"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/stretchr/testify/require"
)

func TestNativeHistogramPostings_Expand(t *testing.T) {
ttl := 3
mockedTime := time.Unix(int64(ttl), 0)
series := []labels.Labels{
labels.FromStrings("a", "1"),
labels.FromStrings("a", "2"),
labels.FromStrings("a", "3"), // Will make this series a native histogram.
labels.FromStrings("a", "4"), // Will make this series a native histogram.
labels.FromStrings("a", "5"),
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))

// Update each series at a different time according to its index.
for i := range allStorageRefs {
buckets := -1 // No native histogram buckets.
if i+1 == 3 || i+1 == 4 {
buckets = 10 // Native histogram with 10 buckets.
}
activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), buckets)
}

valid := activeSeries.Purge(mockedTime)
allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers()
require.True(t, valid)
require.Equal(t, 2, allActive)

activeSeriesPostings := NewNativeHistogramPostings(activeSeries, storagePostings)

activeRefs, err := index.ExpandPostings(activeSeriesPostings)
require.NoError(t, err)

require.Equal(t, allStorageRefs[3:4], activeRefs)
}

func TestNativeHistogramPostings_ExpandWithBucketCount(t *testing.T) {
ttl := 0
mockedTime := time.Unix(int64(ttl), 0)
series := []labels.Labels{
labels.FromStrings("a", "1"),
labels.FromStrings("a", "2"),
labels.FromStrings("a", "3"), // Will make this series a native histogram.
labels.FromStrings("a", "4"), // Will make this series a native histogram.
labels.FromStrings("a", "5"),
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))

// Update each series at a different time according to its index.
for i := range allStorageRefs {
buckets := -1 // No native histogram buckets.
if i == 2 || i == 3 {
buckets = i * 10 // Native histogram with i*10 buckets.
}
activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), buckets)
}

valid := activeSeries.Purge(mockedTime)
allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers()
require.True(t, valid)
require.Equal(t, 5, allActive)

activeSeriesPostings := NewNativeHistogramPostings(activeSeries, storagePostings)

seriesRef := []storage.SeriesRef{}
bucketCounts := []int{}
for activeSeriesPostings.Next() {
ref, count := activeSeriesPostings.AtBucketCount()
seriesRef = append(seriesRef, ref)
bucketCounts = append(bucketCounts, count)
}
//activeRefs, err := index.ExpandPostings(activeSeriesPostings)
require.NoError(t, activeSeriesPostings.Err())

require.Equal(t, allStorageRefs[2:4], seriesRef)
require.Equal(t, []int{20, 30}, bucketCounts)
}

func TestNativeHistogramPostings_Seek(t *testing.T) {
ttl := 3
mockedTime := time.Unix(int64(ttl), 0)
series := []labels.Labels{
labels.FromStrings("a", "1"),
labels.FromStrings("a", "2"),
labels.FromStrings("a", "3"),
labels.FromStrings("a", "4"),
labels.FromStrings("a", "5"),
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))

// Update each series at a different time according to its index.
for i := range allStorageRefs {
buckets := 10
if i+1 == 4 {
buckets = -1 // Make ref==4 not a native histogram to check that Seek skips it.
}
activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), buckets)
}

valid := activeSeries.Purge(mockedTime)
allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers()
require.True(t, valid)
require.Equal(t, 2, allActive)

activeSeriesPostings := NewNativeHistogramPostings(activeSeries, storagePostings)

// Seek to a series that is not active.
require.True(t, activeSeriesPostings.Seek(3))
// The next active series is 4, but it's not a native histogram.
require.Equal(t, storage.SeriesRef(5), activeSeriesPostings.At())
}

func TestNativeHistogramPostings_SeekToEnd(t *testing.T) {
ttl := 5
mockedTime := time.Unix(int64(ttl), 0)
series := []labels.Labels{
labels.FromStrings("a", "1"),
labels.FromStrings("a", "2"),
labels.FromStrings("a", "3"),
labels.FromStrings("a", "4"),
labels.FromStrings("a", "5"),
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))

// Update each series at a different time according to its index.
for i := range allStorageRefs {
activeSeries.UpdateSeries(series[i], allStorageRefs[i], time.Unix(int64(i), 0), 10)
}

valid := activeSeries.Purge(mockedTime)
allActive, _, _, _, _, _ := activeSeries.ActiveWithMatchers()
require.True(t, valid)
require.Equal(t, 0, allActive)

activeSeriesPostings := NewNativeHistogramPostings(activeSeries, storagePostings)

// Seek to a series that is not active.
// There are no active series after 3, so Seek should return false.
require.False(t, activeSeriesPostings.Seek(3))
}
4 changes: 2 additions & 2 deletions pkg/ingester/activeseries/active_postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestPostings_Seek(t *testing.T) {

activeSeriesPostings := NewPostings(activeSeries, storagePostings)

// See to a series that is not active.
// Seek to a series that is not active.
require.True(t, activeSeriesPostings.Seek(3))
// The next active series is 4.
require.Equal(t, storage.SeriesRef(4), activeSeriesPostings.At())
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestPostings_SeekToEnd(t *testing.T) {

activeSeriesPostings := NewPostings(activeSeries, storagePostings)

// See to a series that is not active.
// Seek to a series that is not active.
// There are no active series after 3, so Seek should return false.
require.False(t, activeSeriesPostings.Seek(3))
}
16 changes: 16 additions & 0 deletions pkg/ingester/activeseries/active_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ func (c *ActiveSeries) ContainsRef(ref storage.SeriesRef) bool {
return c.stripes[stripeID].containsRef(ref)
}

func (c *ActiveSeries) NativeHistoragramBuckets(ref storage.SeriesRef) (int, bool) {
stripeID := ref % numStripes
return c.stripes[stripeID].nativeHistoragramBuckets(ref)
}

// Active returns the total numbers of active series, active native
// histogram series, and buckets of those native histogram series.
// This method does not purge expired entries, so Purge should be
Expand Down Expand Up @@ -213,6 +218,17 @@ func (s *seriesStripe) containsRef(ref storage.SeriesRef) bool {
return ok
}

// nativeHistogramBuckets returns the the active buckets for a series if it is active and is a native histogram series.
func (s *seriesStripe) nativeHistoragramBuckets(ref storage.SeriesRef) (int, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
if entry, ok := s.refs[ref]; ok && entry.numNativeHistogramBuckets >= 0 {
return entry.numNativeHistogramBuckets, true
}

return 0, false
}

func (s *seriesStripe) markDeleted(ref storage.SeriesRef, lbls labels.Labels) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down
Loading