Skip to content

Commit

Permalink
Introduce Set type for encoding structured metadata fields
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Sep 4, 2024
1 parent 5bbedc3 commit 854e418
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 41 deletions.
7 changes: 6 additions & 1 deletion pkg/storage/bloom/v1/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,12 @@ func (mb *MergeBuilder) processNextSeries(
bytesAdded += bloom.SourceBytesAdded
}

done, err := builder.AddSeries(*nextInStore, offsets, []Field{Field("__line__")})
// TODO(chaudum): Use the indexed fields from bloom creation, however,
// currently we still build blooms from log lines.
fields := NewSet[Field](1)
fields.Add("__line__")

done, err := builder.AddSeries(*nextInStore, offsets, fields)
if err != nil {
return nil, bytesAdded, 0, false, false, errors.Wrap(err, "committing series")
}
Expand Down
47 changes: 13 additions & 34 deletions pkg/storage/bloom/v1/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ type Series struct {
}

type Meta struct {
Fields Fields
Fields Set[Field]
Offsets []BloomOffset
}

Expand Down Expand Up @@ -323,10 +323,9 @@ func (s *SeriesWithMeta) Encode(
lastEnd = chunk.Encode(enc, version, lastEnd)
}

enc.PutUvarint(len(s.Fields))
sort.Sort(s.Fields) // ensure order
for _, field := range s.Fields {
field.Encode(enc, version)
enc.PutUvarint(s.Fields.Len())
for _, f := range s.Fields.Items() {
f.Encode(enc, version)
}

return lastOffset
Expand Down Expand Up @@ -370,53 +369,33 @@ func (s *SeriesWithMeta) Decode(
}
}

s.Fields = make([]Field, dec.Uvarint())
for i := range s.Fields {
err = s.Fields[i].Decode(dec, version)
n := dec.Uvarint()
s.Fields = NewSet[Field](n)
for i := 0; i < n; i++ {
var f Field
err = f.Decode(dec, version)
if err != nil {
return 0, BloomOffset{}, errors.Wrapf(err, "decoding %dth field", i)
}
s.Fields.Add(f)
}

return s.Fingerprint, lastOffset, dec.Err()
}

// field encoding/decoding ---------------------------------------------------

type Field []byte // key of an indexed structured metadata field
type Field string

func (f *Field) Encode(enc *encoding.Encbuf, _ Version) {
enc.PutUvarintBytes(*f)
func (f Field) Encode(enc *encoding.Encbuf, _ Version) {
enc.PutUvarintBytes([]byte(f))
}

func (f *Field) Decode(dec *encoding.Decbuf, _ Version) error {
*f = Field(dec.UvarintBytes())
return dec.Err()
}

func (f *Field) String() string {
return string(*f)
}

func (f *Field) Less(other Field) bool {
// avoid string allocations
return string(*f) < string(other)
}

type Fields []Field

func (f Fields) Len() int {
return len(f)
}

func (f Fields) Less(i, j int) bool {
return f[i].Less(f[j])
}

func (f Fields) Swap(i, j int) {
f[i], f[j] = f[j], f[i]
}

// chunk encoding/decoding ---------------------------------------------------

type ChunkRef logproto.ShortRef
Expand Down
5 changes: 1 addition & 4 deletions pkg/storage/bloom/v1/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,7 @@ func TestSeriesEncoding_V3(t *testing.T) {
{Page: 1, ByteOffset: 2},
{Page: 2, ByteOffset: 1},
},
Fields: []Field{
Field("foo"),
Field("bar"),
},
Fields: NewSetFromLiteral[Field]("foo", "bar"),
},
}

Expand Down
42 changes: 42 additions & 0 deletions pkg/storage/bloom/v1/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,45 @@ func PointerSlice[T any](xs []T) []*T {
}
return out
}

// Set is an unordered collection of unique values of type V, backed by a map.
//
// A Set must be created with NewSet or NewSetFromLiteral. The zero value holds
// a nil map: Len and Items work on it, but Add panics, as does Union when it
// has something to insert. Copying a Set copies only the map reference, so all
// copies observe the same elements.
type Set[V comparable] struct {
	internal map[V]struct{}
}

// NewSet returns an empty Set whose backing map is created with size as its
// capacity hint.
func NewSet[V comparable](size int) Set[V] {
	return Set[V]{internal: make(map[V]struct{}, size)}
}

// NewSetFromLiteral returns a Set containing the given values. Duplicate
// values are stored once.
func NewSetFromLiteral[V comparable](v ...V) Set[V] {
	set := NewSet[V](len(v))
	for _, elem := range v {
		set.internal[elem] = struct{}{}
	}
	return set
}

// Add inserts v into the set. It returns true if v was newly added and false
// if v was already present.
func (s Set[V]) Add(v V) bool {
	if _, exists := s.internal[v]; exists {
		return false
	}
	s.internal[v] = struct{}{}
	return true
}

// Len returns the number of elements in the set.
func (s Set[V]) Len() int {
	return len(s.internal)
}

// Items returns the elements of the set as a newly allocated slice.
//
// The order of the returned elements is unspecified and may differ between
// calls, because it follows Go's map iteration order. Callers that need a
// stable order (for example when serializing) must sort the result.
func (s Set[V]) Items() []V {
	items := make([]V, 0, len(s.internal))
	for k := range s.internal {
		items = append(items, k)
	}
	return items
}

// Union adds every element of other to s, modifying s in place. other is left
// unchanged.
func (s Set[V]) Union(other Set[V]) {
	// Range over the map directly instead of going through other.Items(),
	// which would allocate a temporary slice only to be iterated once.
	for v := range other.internal {
		s.internal[v] = struct{}{}
	}
}
10 changes: 8 additions & 2 deletions pkg/storage/bloom/v1/versioned_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,13 @@ func (b *V3Builder) BuildFrom(itr iter.Iterator[SeriesWithBlooms]) (uint32, erro
if err := at.Blooms.Err(); err != nil {
return 0, errors.Wrap(err, "iterating blooms")
}
blockFull, err := b.AddSeries(*at.Series, offsets, []Field{Field("__line__")})

// TODO(chaudum): Use the indexed fields from bloom creation, however,
// currently we still build blooms from log lines.
fields := NewSet[Field](1)
fields.Add("__line__")

blockFull, err := b.AddSeries(*at.Series, offsets, fields)
if err != nil {
return 0, errors.Wrapf(err, "writing series")
}
Expand Down Expand Up @@ -111,7 +117,7 @@ func (b *V3Builder) AddBloom(bloom *Bloom) (BloomOffset, error) {
}

// AddSeries adds a series to the block. It returns true if the block is full after adding the series.
func (b *V3Builder) AddSeries(series Series, offsets []BloomOffset, fields []Field) (bool, error) {
func (b *V3Builder) AddSeries(series Series, offsets []BloomOffset, fields Set[Field]) (bool, error) {
if err := b.index.Append(SeriesWithMeta{
Series: series,
Meta: Meta{
Expand Down

0 comments on commit 854e418

Please sign in to comment.