Skip to content

Commit

Permalink
pkg: Pass labelset as map[string]string for quicker access
Browse files Browse the repository at this point in the history
I stopped converting labels.Labels to map[string]string in the separateNameFromLabels. It seem like a good abstraction there still.
  • Loading branch information
metalmatze committed Nov 30, 2022
1 parent b3e7136 commit 0400ff6
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 20 deletions.
7 changes: 4 additions & 3 deletions pkg/parcacol/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func (ing Ingester) Ingest(ctx context.Context, ls labels.Labels, p *pprofproto.
if err != nil {
return fmt.Errorf("prepare labels: %w", err)
}
lset := ls.Map()

if err := validatePprofProfile(p); err != nil {
return err
Expand All @@ -101,20 +102,20 @@ func (ing Ingester) Ingest(ctx context.Context, ls labels.Labels, p *pprofproto.
continue
}

if err := ing.IngestProfile(ctx, ls, p); err != nil {
if err := ing.IngestProfile(ctx, lset, p); err != nil {
return fmt.Errorf("ingest profile: %w", err)
}
}

return nil
}

func (ing Ingester) IngestProfile(ctx context.Context, ls labels.Labels, p *profile.NormalizedProfile) error {
func (ing Ingester) IngestProfile(ctx context.Context, lset map[string]string, p *profile.NormalizedProfile) error {
buf := ing.bufferPool.Get().(*bytes.Buffer)
buf.Reset()
defer ing.bufferPool.Put(buf)

err := NormalizedProfileToParquetBuffer(buf, ing.schema, ls, p)
err := NormalizedProfileToParquetBuffer(buf, ing.schema, lset, p)
if err != nil {
return fmt.Errorf("failed to convert samples to buffer: %w", err)
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/parcacol/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/go-kit/log"
"github.com/polarsignals/frostdb/dynparquet"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/segmentio/parquet-go"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -72,7 +71,7 @@ func TestPprofToParquet(t *testing.T) {

for i, np := range nps {
buf := bytes.NewBuffer(nil)
require.NoError(t, NormalizedProfileToParquetBuffer(buf, schema, labels.Labels{}, np))
require.NoError(t, NormalizedProfileToParquetBuffer(buf, schema, map[string]string{}, np))

serBuf, err := dynparquet.ReaderFromBytes(buf.Bytes())
require.NoError(t, err)
Expand Down
21 changes: 8 additions & 13 deletions pkg/parcacol/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ import (
"sort"

"github.com/polarsignals/frostdb/dynparquet"
"github.com/prometheus/prometheus/model/labels"
"github.com/segmentio/parquet-go"
"golang.org/x/exp/maps"

"github.com/parca-dev/parca/pkg/profile"
)

// NormalizedProfileToParquetBuffer converts a normalized profile to a Parquet
// buffer. The passed labels must be sorted.
func NormalizedProfileToParquetBuffer(w io.Writer, schema *dynparquet.Schema, ls labels.Labels, p *profile.NormalizedProfile) error {
names := labelNames(ls)
func NormalizedProfileToParquetBuffer(w io.Writer, schema *dynparquet.Schema, lset map[string]string, p *profile.NormalizedProfile) error {
names := labelNames(lset)
pprofLabels := profileLabelNames(p)
pprofNumLabels := profileNumLabelNames(p)

Expand All @@ -50,7 +50,7 @@ func NormalizedProfileToParquetBuffer(w io.Writer, schema *dynparquet.Schema, ls
names,
pprofLabels,
pprofNumLabels,
ls,
lset,
p.Meta,
sample,
)
Expand All @@ -64,13 +64,9 @@ func NormalizedProfileToParquetBuffer(w io.Writer, schema *dynparquet.Schema, ls
return schema.SerializeBuffer(w, pb.Buffer)
}

func labelNames(ls labels.Labels) []string {
names := []string{}

for _, label := range ls {
names = append(names, label.Name)
}

func labelNames(ls map[string]string) []string {
names := maps.Keys(ls)
sort.Strings(names)
return names
}

Expand Down Expand Up @@ -114,7 +110,7 @@ func SampleToParquetRow(
schema *dynparquet.Schema,
row parquet.Row,
labelNames, profileLabelNames, profileNumLabelNames []string,
ls labels.Labels,
lset map[string]string,
meta profile.Meta,
s *profile.NormalizedSample,
) parquet.Row {
Expand Down Expand Up @@ -157,7 +153,6 @@ func SampleToParquetRow(

// All remaining cases take care of dynamic columns
case ColumnLabels:
lset := ls.Map() // Convert labels to map for quicker lookup in loop.
for _, name := range labelNames {
if value, ok := lset[name]; ok {
row = append(row, parquet.ValueOf(value).Level(0, 1, columnIndex))
Expand Down
4 changes: 2 additions & 2 deletions pkg/query/columnquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ func TestColumnQueryAPIQueryDiff(t *testing.T) {

err = ingester.IngestProfile(
ctx,
labels.Labels{{Name: "job", Value: "default"}},
labels.Labels{{Name: "job", Value: "default"}}.Map(),
&profile.NormalizedProfile{
Meta: profile.Meta{
Name: "memory",
Expand All @@ -539,7 +539,7 @@ func TestColumnQueryAPIQueryDiff(t *testing.T) {

err = ingester.IngestProfile(
ctx,
labels.Labels{{Name: "job", Value: "default"}},
labels.Labels{{Name: "job", Value: "default"}}.Map(),
&profile.NormalizedProfile{
Meta: profile.Meta{
Name: "memory",
Expand Down

0 comments on commit 0400ff6

Please sign in to comment.