diff --git a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go index fbff9ca..0bc6703 100644 --- a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go +++ b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go @@ -72,6 +72,7 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { timestamp_ns := make([]uint64, sz) typ := make([]string, sz) service_name := make([]string, sz) + values_agg := make([][]tuple, sz) sample_types_units := make([][]tuple, sz) period_type := make([]string, sz) period_unit := make([]string, sz) @@ -89,7 +90,6 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { for i := 0; i < sz; i++ { r = rl.At(i).ScopeLogs().At(0).LogRecords().At(0) m = r.Attributes() - timestamp_ns[i] = uint64(r.Timestamp()) tmp, _ = m.Get("type") @@ -99,6 +99,7 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { service_name[i] = tmp.AsString() sample_types, _ := m.Get("sample_types") + sample_units, _ := m.Get("sample_units") sample_types_array, err := valueToStringArray(sample_types) @@ -111,12 +112,21 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { return err } + values_agg_raw, ok := m.Get("values_agg") + if ok { + values_agg_tuple, err := valueAggToTuple(&values_agg_raw) + if err != nil { + return err + } + values_agg[i] = append(values_agg[i], values_agg_tuple...) 
+ } + sample_types_units_item := make([]tuple, len(sample_types_array)) for i, v := range sample_types_array { + sample_types_units_item[i] = tuple{v, sample_units_array[i]} } sample_types_units[i] = sample_types_units_item - tmp, _ = m.Get("period_type") period_type[i] = tmp.AsString() @@ -169,9 +179,13 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { if err := b.Column(8).Append(payload_type); err != nil { return err } + if err := b.Column(9).Append(payload); err != nil { return err } + if err := b.Column(10).Append(values_agg); err != nil { + return err + } return b.Send() } @@ -179,3 +193,19 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error { func (ch *clickhouseAccessNativeColumnar) Shutdown() error { return ch.conn.Close() } + +func valueAggToTuple(value *pcommon.Value) ([]tuple, error) { + var res []tuple + for _, value_agg_any := range value.AsRaw().([]any) { + value_agg_any_array, ok := value_agg_any.([]any) + if !ok || len(value_agg_any_array) != 3 { + return nil, fmt.Errorf("failed to convert value_agg to tuples") + } + res = append(res, tuple{ + value_agg_any_array[0], + value_agg_any_array[1], + int32(value_agg_any_array[2].(int64)), + }) + } + return res, nil +} diff --git a/exporter/clickhouseprofileexporter/exporter.go b/exporter/clickhouseprofileexporter/exporter.go index 149295f..7258847 100644 --- a/exporter/clickhouseprofileexporter/exporter.go +++ b/exporter/clickhouseprofileexporter/exporter.go @@ -64,11 +64,11 @@ func newClickhouseProfileExporter(ctx context.Context, set *exporter.CreateSetti func (exp *clickhouseProfileExporter) send(ctx context.Context, logs plog.Logs) error { start := time.Now().UnixMilli() if err := exp.ch.InsertBatch(logs); err != nil { - otelcolExporterClickhouseProfileFlushTimeMillis.Record(ctx, time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError))) + otelcolExporterClickhouseProfileBatchInsertTimeMillis.Record(ctx, 
time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError))) exp.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error())) return err } - otelcolExporterClickhouseProfileFlushTimeMillis.Record(ctx, time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeSuccess))) + otelcolExporterClickhouseProfileBatchInsertTimeMillis.Record(ctx, time.Now().UnixMilli()-start, metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeSuccess))) exp.logger.Info("inserted batch", zap.Int("size", logs.ResourceLogs().Len())) return nil } diff --git a/exporter/clickhouseprofileexporter/metrics.go b/exporter/clickhouseprofileexporter/metrics.go index 83082cf..d494573 100644 --- a/exporter/clickhouseprofileexporter/metrics.go +++ b/exporter/clickhouseprofileexporter/metrics.go @@ -9,14 +9,14 @@ import ( const prefix = "exporter_clickhouse_profile_" var ( - otelcolExporterClickhouseProfileFlushTimeMillis metric.Int64Histogram + otelcolExporterClickhouseProfileBatchInsertTimeMillis metric.Int64Histogram ) func initMetrics(meter metric.Meter) error { var err error - if otelcolExporterClickhouseProfileFlushTimeMillis, err = meter.Int64Histogram( - fmt.Sprint(prefix, "flush_time_millis"), - metric.WithDescription("Clickhouse profile exporter flush time in millis"), + if otelcolExporterClickhouseProfileBatchInsertTimeMillis, err = meter.Int64Histogram( + fmt.Sprint(prefix, "batch_insert_time_millis"), + metric.WithDescription("Clickhouse profile exporter batch insert time in millis"), metric.WithExplicitBucketBoundaries(0, 5, 10, 20, 50, 100, 200, 500, 1000, 5000), ); err != nil { return err diff --git a/go.sum b/go.sum index f436bc9..43bb863 100644 --- a/go.sum +++ b/go.sum @@ -2247,4 +2247,4 @@ sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo= sigs.k8s.io/yaml v1.3.0/go.mod 
h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8= skywalking.apache.org/repo/goapi v0.0.0-20231026090926-09378dd56587 h1:TY79I5Y7xRB8q5LQ+MJn7NYsYi0VL5nj1QDrUHwK7cQ= -skywalking.apache.org/repo/goapi v0.0.0-20231026090926-09378dd56587/go.mod h1:onFubXaIoY/2FTRVrLMqCTlaNq4SilAEwF/2G0IcaBw= +skywalking.apache.org/repo/goapi v0.0.0-20231026090926-09378dd56587/go.mod h1:onFubXaIoY/2FTRVrLMqCTlaNq4SilAEwF/2G0IcaBw= \ No newline at end of file diff --git a/receiver/pyroscopereceiver/jfrparser/parser.go b/receiver/pyroscopereceiver/jfrparser/parser.go index 396b8f8..c956c45 100644 --- a/receiver/pyroscopereceiver/jfrparser/parser.go +++ b/receiver/pyroscopereceiver/jfrparser/parser.go @@ -32,10 +32,9 @@ type profileWrapper struct { type jfrPprofParser struct { jfrParser *jfr_parser.Parser - - proftab [sampleTypeCount]*profileWrapper // - samptab [sampleTypeCount]map[uint32]uint32 // - loctab [sampleTypeCount]map[uint32]*pprof_proto.Location // + proftab [sampleTypeCount]*profileWrapper // + samptab [sampleTypeCount]map[uint32]uint32 // + loctab [sampleTypeCount]map[uint32]*pprof_proto.Location // } var typetab = []profile_types.ProfileType{ @@ -112,12 +111,18 @@ func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata) ([ ps := make([]profile_types.ProfileIR, 0) for _, pr := range pa.proftab { - if nil != pr { - // assuming jfr-pprof conversion should not expand memory footprint, transitively applying jfr limit on pprof - pr.prof.Payload = new(bytes.Buffer) - pr.pprof.WriteUncompressed(pr.prof.Payload) - ps = append(ps, pr.prof) + if nil == pr { + continue } + // assuming jfr-pprof conversion should not expand memory footprint, transitively applying jfr limit on pprof + pr.prof.Payload = new(bytes.Buffer) + pr.pprof.WriteUncompressed(pr.prof.Payload) + + // Calculate values_agg based on the requirements + valuesAgg := calculateValuesAgg(pr.pprof) + pr.prof.ValueAggregation = valuesAgg + + ps = append(ps, pr.prof) } return ps, nil } @@ -204,12 +209,38 @@ 
func (pa *jfrPprofParser) addProfile(sampleType sampleType) *profileWrapper { pprof: &pprof_proto.Profile{}, } pa.proftab[sampleType] = pw - // add sample types and units to keep the pprof valid for libraries for i, t := range pw.prof.Type.SampleType { pa.appendSampleType(pw.pprof, t, pw.prof.Type.SampleUnit[i]) } + return pw + +} +func calculateValuesAgg(samples *pprof_proto.Profile) []profile_types.SampleType { + var valuesAgg []profile_types.SampleType + // Loop through each sample type + for j, st := range samples.SampleType { + sum, count := calculateSumAndCount(samples, j) + valuesAgg = append(valuesAgg, profile_types.SampleType{Key: fmt.Sprintf("%s:%s", st.Type, st.Unit), Sum: sum, Count: count}) + } + + return valuesAgg +} + +func calculateSumAndCount(samples *pprof_proto.Profile, sampleTypeIndex int) (int64, int32) { + var sum int64 + count := int32(len(samples.Sample)) + + for _, sample := range samples.Sample { + // Check if the sample has a value for the specified sample type + if sampleTypeIndex < len(sample.Value) { + // Accumulate the value for the specified sample type + sum += sample.Value[sampleTypeIndex] + } + } + + return sum, count } func (pa *jfrPprofParser) appendSampleType(prof *pprof_proto.Profile, typ, unit string) { @@ -229,6 +260,7 @@ func (pa *jfrPprofParser) getSample(sampleType sampleType, prof *pprof_proto.Pro return nil } return prof.Sample[i] + } func (pa *jfrPprofParser) appendSample(sampleType sampleType, prof *pprof_proto.Profile, locations []*pprof_proto.Location, values []int64, externStacktraceRef uint32) { diff --git a/receiver/pyroscopereceiver/receiver.go b/receiver/pyroscopereceiver/receiver.go index cd4f348..5065025 100644 --- a/receiver/pyroscopereceiver/receiver.go +++ b/receiver/pyroscopereceiver/receiver.go @@ -361,22 +361,43 @@ func stringToAnyArray(s []string) []any { } return res } +func entitiesToStrings(entities []profile_types.SampleType) []any { + var result []any + for _, entity := range entities { + result = append(result, +
[]any{entity.Key, entity.Sum, entity.Count}, + ) + } + return result +} func setAttrsFromProfile(prof profile_types.ProfileIR, m pcommon.Map) error { m.PutStr("type", prof.Type.Type) s := m.PutEmptySlice("sample_types") + err := s.FromRaw(stringToAnyArray(prof.Type.SampleType)) if err != nil { return err } + s = m.PutEmptySlice("sample_units") err = s.FromRaw(stringToAnyArray(prof.Type.SampleUnit)) + if err != nil { + return err + } + // Comma-ok assertion: a nil/unset ValueAggregation yields an empty values_agg instead of panicking + result, _ := prof.ValueAggregation.([]profile_types.SampleType) + s = m.PutEmptySlice("values_agg") + + err = s.FromRaw(entitiesToStrings(result)) + if err != nil { return err } m.PutStr("period_type", prof.Type.PeriodType) m.PutStr("period_unit", prof.Type.PeriodUnit) m.PutStr("payload_type", fmt.Sprint(prof.PayloadType)) + return nil } diff --git a/receiver/pyroscopereceiver/receiver_test.go b/receiver/pyroscopereceiver/receiver_test.go index 578cecc..aac68fb 100644 --- a/receiver/pyroscopereceiver/receiver_test.go +++ b/receiver/pyroscopereceiver/receiver_test.go @@ -102,6 +102,7 @@ func TestPyroscopeIngestJfrCpu(t *testing.T) { "payload_type": "0", "sample_types": []any{"cpu"}, "sample_units": []any{"nanoseconds"}, + "values_agg": []any{[]any{"cpu:nanoseconds", 4780000000, 370}}, }, body: pb, }}), @@ -139,6 +140,10 @@ func TestPyroscopeIngestJfrMemory(t *testing.T) { "payload_type": "0", "sample_types": []any{"alloc_in_new_tlab_objects", "alloc_in_new_tlab_bytes"}, "sample_units": []any{"count", "bytes"}, + "values_agg": []any{ + []any{"alloc_in_new_tlab_objects:count", 977, 471}, + []any{"alloc_in_new_tlab_bytes:bytes", 512229376, 471}, + }, }, body: pbAllocInNewTlab, }, @@ -157,6 +162,7 @@ "payload_type": "0", "sample_types": []any{"live"}, "sample_units": []any{"count"}, + "values_agg": []any{[]any{"live:count", 976, 471}}, }, body: pbLiveObject, }, diff --git a/receiver/pyroscopereceiver/types/profile.go 
b/receiver/pyroscopereceiver/types/profile.go index 0e38ce3..05f88e4 100644 --- a/receiver/pyroscopereceiver/types/profile.go +++ b/receiver/pyroscopereceiver/types/profile.go @@ -27,7 +27,14 @@ type ProfileType struct { // Parser IR for profile processing type ProfileIR struct { - Type ProfileType - Payload *bytes.Buffer - PayloadType PayloadType + Type ProfileType + Payload *bytes.Buffer + PayloadType PayloadType + ValueAggregation interface{} +} + +type SampleType struct { + Key string + Sum int64 + Count int32 }