Skip to content

Commit

Permalink
Merge pull request #108 from metrico/feature/traces_improve
Browse files Browse the repository at this point in the history
feature/traces improve
  • Loading branch information
akvlad authored Oct 22, 2024
2 parents b17cde3 + f774a14 commit 0b1cea3
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 19 deletions.
13 changes: 11 additions & 2 deletions exporter/qrynexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,18 @@


# Configuration options:
- `dsn` (required): Data Source Name for Clickhouse.
- Example: `tcp://localhost:9000/qryn`

- `clustered_clickhouse` (required):
- Type: boolean
- Description: Set to `true` if using a Clickhouse cluster; otherwise, set to `false`.

- `client_side_trace_processing` (required):
- Type: boolean
- Default: `true`
- Description: Enables client-side processing of trace data. This can improve performance but may increase client-side resource usage.

- `dsn` (required): Clickhouse's dsn.
- `clustered_clickhouse` (required): true if clickhouse cluster is used

# Example:
## Simple Trace Data
Expand Down
3 changes: 3 additions & 0 deletions exporter/qrynexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ type Config struct {
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`

// ClientSideTraceProcessing is a boolean that indicates whether to process traces on the client side.
ClientSideTraceProcessing bool `mapstructure:"client_side_trace_processing"`

ClusteredClickhouse bool `mapstructure:"clustered_clickhouse"`

// DSN is the ClickHouse server Data Source Name.
Expand Down
2 changes: 1 addition & 1 deletion exporter/qrynexporter/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
}
}

if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil {
if err := batchSamplesAndTimeSeries(context.WithValue(ctx, clusterKey, e.cluster), e.db, samples, timeSeries); err != nil {
otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeLogs)))
e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error()))
return err
Expand Down
2 changes: 1 addition & 1 deletion exporter/qrynexporter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metric
}
}

if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil {
if err := batchSamplesAndTimeSeries(context.WithValue(ctx, clusterKey, e.cluster), e.db, samples, timeSeries); err != nil {
otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeMetrics)))
e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error()))
return err
Expand Down
64 changes: 61 additions & 3 deletions exporter/qrynexporter/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
)

var (
tracesInputSQL = func(clustered bool) string {
tracesInputSQL = func(_ bool) string {
return `INSERT INTO traces_input (
trace_id,
span_id,
Expand Down Expand Up @@ -57,6 +57,40 @@ var (
}
)

func TracesV2InputSQL(clustered bool) string {
dist := ""
if clustered {
dist = "_dist"
}
return fmt.Sprintf(`INSERT INTO tempo_traces%s (
oid,
trace_id,
span_id,
parent_id,
name,
timestamp_ns,
duration_ns,
service_name,
payload_type,
payload)`, dist)
}

func TracesTagsV2InputSQL(clustered bool) string {
dist := ""
if clustered {
dist = "_dist"
}
return fmt.Sprintf(`INSERT INTO tempo_traces_attrs_gin%s (
oid,
date,
key,
val,
trace_id,
span_id,
timestamp_ns,
duration)`, dist)
}

// Note: https://github.com/metrico/qryn/blob/master/lib/db/maintain/scripts.js
// We need to align with the schema here.
//
Expand All @@ -76,8 +110,8 @@ var (
//
// ) Engine=Null

// Trace represent trace model
type Trace struct {
// TraceInput represent trace model
type TraceInput struct {
TraceID string `ch:"trace_id"`
SpanID string `ch:"span_id"`
ParentID string `ch:"parent_id"`
Expand All @@ -90,6 +124,30 @@ type Trace struct {
Tags [][]string `ch:"tags"`
}

type TempoTrace struct {
OID string `ch:"oid"`
TraceID []byte `ch:"trace_id"`
SpanID []byte `ch:"span_id"`
ParentID []byte `ch:"parent_id"`
Name string `ch:"name"`
TimestampNs int64 `ch:"timestamp_ns"`
DurationNs int64 `ch:"duration_ns"`
ServiceName string `ch:"service_name"`
PayloadType int8 `ch:"payload_type"`
Payload string `ch:"payload"`
}

type TempoTraceTag struct {
OID string `ch:"oid"`
Date time.Time `ch:"date"`
Key string `ch:"key"`
Val string `ch:"val"`
TraceID []byte `ch:"trace_id"`
SpanID []byte `ch:"span_id"`
TimestampNs int64 `ch:"timestamp_ns"`
DurationNs int64 `ch:"duration"`
}

// Sample represent sample data
// `CREATE TABLE IF NOT EXISTS samples_v3
// (
Expand Down
103 changes: 103 additions & 0 deletions exporter/qrynexporter/trace_batch_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package qrynexporter

import (
"encoding/hex"
"fmt"
"time"

"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

type traceWithTagsBatch struct {
driver.Batch
tagsBatch driver.Batch
}

func (b *traceWithTagsBatch) AppendStruct(v any) error {
ti, ok := v.(*TraceInput)
if !ok {
return fmt.Errorf("invalid data type, expected *Trace, got %T", v)
}
binTraceId, err := unhexAndPad(ti.TraceID, 16)
if err != nil {
return err
}
binParentID, err := unhexAndPad(ti.ParentID, 8)
if err != nil {
return err
}
binSpanID, err := unhexAndPad(ti.SpanID, 8)
if err != nil {
return err
}
trace := &TempoTrace{
OID: "0",
TraceID: binTraceId,
SpanID: binSpanID,
ParentID: binParentID,
Name: ti.Name,
TimestampNs: ti.TimestampNs,
DurationNs: ti.DurationNs,
ServiceName: ti.ServiceName,
PayloadType: ti.PayloadType,
Payload: ti.Payload,
}
err = b.Batch.AppendStruct(trace)
if err != nil {
return err
}
for _, tag := range ti.Tags {
attr := &TempoTraceTag{
OID: "0",
Date: time.Unix(0, trace.TimestampNs).Truncate(time.Hour * 24),
Key: tag[0],
Val: tag[1],
TraceID: binTraceId,
SpanID: binSpanID,
TimestampNs: ti.TimestampNs,
DurationNs: ti.DurationNs,
}
err = b.tagsBatch.AppendStruct(attr)
if err != nil {
return err
}
}
return nil
}

func (b *traceWithTagsBatch) Abort() error {
var errs [2]error
errs[0] = b.Batch.Abort()
errs[1] = b.tagsBatch.Abort()
for _, err := range errs {
if err != nil {
return err
}
}
return nil
}

func (b *traceWithTagsBatch) Send() error {
var errs [2]error
errs[0] = b.Batch.Send()
errs[1] = b.tagsBatch.Send()
for _, err := range errs {
if err != nil {
return err
}
}
return nil
}

func unhexAndPad(s string, size int) ([]byte, error) {
bStr, err := hex.DecodeString(s)
if err != nil {
return nil, err
}
if len(bStr) < size {
res := make([]byte, size)
copy(res[size-len(bStr):], bStr)
return res, nil
}
return bStr[size-len(bStr):], nil
}
52 changes: 40 additions & 12 deletions exporter/qrynexporter/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ import (
"google.golang.org/protobuf/encoding/protojson"
)

type contextKey string

const (
spanLinkDataFormat = "%s|%s|%s|%s|%d"
spanLinkDataFormat = "%s|%s|%s|%s|%d"
clusterKey contextKey = "cluster"
)

var delegate = &protojson.MarshalOptions{
Expand All @@ -48,8 +51,9 @@ type tracesExporter struct {
logger *zap.Logger
meter metric.Meter

db clickhouse.Conn
cluster bool
db clickhouse.Conn
cluster bool
clientSide bool
}

// newTracesExporter returns a SpanWriter for the database
Expand All @@ -63,10 +67,11 @@ func newTracesExporter(logger *zap.Logger, cfg *Config, set *exporter.Settings)
return nil, err
}
exp := &tracesExporter{
logger: logger,
meter: set.MeterProvider.Meter(typeStr),
db: db,
cluster: cfg.ClusteredClickhouse,
logger: logger,
meter: set.MeterProvider.Meter(typeStr),
db: db,
cluster: cfg.ClusteredClickhouse,
clientSide: cfg.ClusteredClickhouse && cfg.ClientSideTraceProcessing,
}
if err := initMetrics(exp.meter); err != nil {
exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error()))
Expand Down Expand Up @@ -163,8 +168,15 @@ func extractScopeTags(il pcommon.InstrumentationScope, tags map[string]string) {
}

func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans ptrace.ResourceSpansSlice) error {
isCluster := ctx.Value("cluster").(bool)
batch, err := e.db.PrepareBatch(ctx, tracesInputSQL(isCluster))
isCluster := ctx.Value(clusterKey).(bool)
var batch driver.Batch
var err error
if e.clientSide {
batch, err = e.prepareBatchClientSide(ctx)
} else {
batch, err = e.db.PrepareBatch(ctx, tracesInputSQL(isCluster))
}

if err != nil {
return err
}
Expand All @@ -187,9 +199,25 @@ func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans
return nil
}

func (e *tracesExporter) prepareBatchClientSide(ctx context.Context) (driver.Batch, error) {
batch, err := e.db.PrepareBatch(ctx, TracesV2InputSQL(e.cluster))
if err != nil {
return nil, err
}
subBatch, err := e.db.PrepareBatch(ctx, TracesTagsV2InputSQL(e.cluster))
if err != nil {
batch.Abort()
return nil, err
}
return &traceWithTagsBatch{
Batch: batch,
tagsBatch: subBatch,
}, nil
}

// traceDataPusher implements OTEL exporterhelper.traceDataPusher
func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error {
_ctx := context.WithValue(ctx, "cluster", e.cluster)
_ctx := context.WithValue(ctx, clusterKey, e.cluster)
start := time.Now()
if err := e.exportResourceSapns(_ctx, td.ResourceSpans()); err != nil {
otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeTraces)))
Expand Down Expand Up @@ -362,7 +390,7 @@ func marshalSpanToJSON(span ptrace.Span, mergedAttributes pcommon.Map) ([]byte,
return delegate.Marshal(otlpSpan)
}

func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName string, tags map[string]string) (*Trace, error) {
func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName string, tags map[string]string) (*TraceInput, error) {
durationNano := uint64(span.EndTimestamp() - span.StartTimestamp())
tags = aggregateSpanTags(span, tags)
tags["service.name"] = serviceName
Expand All @@ -379,7 +407,7 @@ func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName
return nil, fmt.Errorf("failed to marshal span: %w", err)
}

trace := &Trace{
trace := &TraceInput{
TraceID: span.TraceID().String(),
SpanID: span.SpanID().String(),
ParentID: span.ParentSpanID().String(),
Expand Down

0 comments on commit 0b1cea3

Please sign in to comment.