Fix panic when max_span_count is reached, add counter metric (#104)
* Fix panic when max_span_count is reached, add counter metric

Panic seen in `ghcr.io/jaegertracing/jaeger-clickhouse:0.8.0` with `log-level=debug`:
```
panic: undefined type *clickhousespanstore.WriteWorker return from workerHeap

goroutine 20 [running]:
github.com/jaegertracing/jaeger-clickhouse/storage/clickhousespanstore.(*WriteWorkerPool).CleanWorkers(0xc00020c300, 0xc00008eefc)
	github.com/jaegertracing/jaeger-clickhouse/storage/clickhousespanstore/pool.go:95 +0x199
github.com/jaegertracing/jaeger-clickhouse/storage/clickhousespanstore.(*WriteWorkerPool).Work(0xc00020c300)
	github.com/jaegertracing/jaeger-clickhouse/storage/clickhousespanstore/pool.go:50 +0x15e
created by github.com/jaegertracing/jaeger-clickhouse/storage/clickhousespanstore.(*SpanWriter).backgroundWriter
	github.com/jaegertracing/jaeger-clickhouse/storage/clickhousespanstore/writer.go:89 +0x226
```
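
For context, the root cause (inferred from the removed `CleanWorkers` code further down): `heap.Pop` returns an `interface{}` holding a `*WriteWorker`, but the type switch only had a value-type `case WriteWorker:`, so every pop fell through to the `default` panic. A minimal, self-contained sketch of the pitfall, using illustrative names rather than the plugin's actual types:

```go
package main

import (
	"container/heap"
	"fmt"
)

type worker struct{ id int }

// workers implements heap.Interface over *worker values, mirroring how
// workerHeap stores *WriteWorker pointers.
type workers []*worker

func (w workers) Len() int            { return len(w) }
func (w workers) Less(i, j int) bool  { return w[i].id < w[j].id }
func (w workers) Swap(i, j int)       { w[i], w[j] = w[j], w[i] }
func (w *workers) Push(x interface{}) { *w = append(*w, x.(*worker)) }
func (w *workers) Pop() interface{} {
	old := *w
	item := old[len(old)-1]
	*w = old[:len(old)-1]
	return item
}

func main() {
	h := &workers{{id: 1}}
	heap.Init(h)
	// heap.Pop returns an interface{} holding a *worker, so a type switch
	// with only a value-type case falls through to the default branch.
	switch v := heap.Pop(h).(type) {
	case worker:
		fmt.Println("value type:", v) // never reached
	case *worker:
		fmt.Println("pointer type:", v.id) // what actually matches
	default:
		panic(fmt.Sprintf("undefined type %T return from workerHeap", v))
	}
}
```

The commit sidesteps the issue by removing the heap-popping `CleanWorkers` path entirely rather than patching the switch.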

Also adds a counter metric and logging to surface when writes are hitting backpressure.

Signed-off-by: Nick Parker <[email protected]>

* Potential fix for deadlock: Avoid holding mutex while waiting on close

Signed-off-by: Nick Parker <[email protected]>

* Discard new batches instead of waiting for old batches to finish

The current limit logic can stall: `worker.CLose()` may never return when ClickHouse keeps returning errors.
This switches to a simpler scheme of discarding new work when the limit is reached, ensuring that writes don't back up indefinitely during a long outage (a simplified sketch of the new flow follows the notes below).

Also moves the count of pending spans to the parent pool:
- Avoids race conditions where new work can be started before it's added to the count
- Mutexing around the count is no longer needed
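
A simplified sketch of the new flow, with illustrative names rather than the plugin's exact code; because a single loop goroutine owns the pending count, the mutex can go:

```go
package sketch

type span struct{} // stand-in for the real *model.Span

// poolLoop sketches the discard-on-limit pattern: a single goroutine owns
// pendingSpanCount, so no mutex is needed, and batches that would push the
// count over the limit are dropped instead of queued.
func poolLoop(batches, workerDone <-chan []span, maxSpanCount int, start func([]span)) {
	pendingSpanCount := 0
	for {
		select {
		case batch := <-batches:
			if maxSpanCount > 0 && pendingSpanCount+len(batch) > maxSpanCount {
				// Over budget: drop the whole batch instead of blocking,
				// keeping memory bounded during a long ClickHouse outage.
				continue
			}
			pendingSpanCount += len(batch)
			start(batch) // hand off to a new worker goroutine
		case finished := <-workerDone:
			pendingSpanCount -= len(finished) // reclaim budget once written
		}
	}
}
```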

Signed-off-by: Nick Parker <[email protected]>

* Include arbitrary worker_id in logs to differentiate between retry loops

Signed-off-by: Nick Parker <[email protected]>

* Fix lint

Signed-off-by: Nick Parker <[email protected]>
nickbp authored Jan 25, 2022
1 parent a79c103 commit 34ad236
Showing 5 changed files with 122 additions and 87 deletions.
74 changes: 39 additions & 35 deletions config.yaml
@@ -1,35 +1,39 @@
address: tcp://some-clickhouse-server:9000
# When empty the embedded scripts from sqlscripts directory are used
init_sql_scripts_dir:
# Maximal amount of spans that can be written at the same time. Default 10_000_000
max_span_count:
# Batch write size. Default 10_000.
batch_write_size:
# Batch flush interval. Default 5s.
batch_flush_interval:
# Encoding of stored data. Either json or protobuf. Default json.
encoding:
# Path to CA TLS certificate.
ca_file:
# Username for connection. Default is "default".
username:
# Password for connection.
password:
# Database name. The database has to be created manually before Jaeger starts. Default is "default".
database:
# Endpoint for scraping prometheus metrics. Default localhost:9090.
metrics_endpoint: localhost:9090
# Whether to use sql scripts supporting replication and sharding.
# Replication can be used only on database with Atomic engine.
# Default false.
replication:
# Table with spans. Default "jaeger_spans_local" or "jaeger_spans" when replication is enabled.
spans_table:
# Span index table. Default "jaeger_index_local" or "jaeger_index" when replication is enabled.
spans_index_table:
# Operations table. Default "jaeger_operations_local" or "jaeger_operations" when replication is enabled.
operations_table:
# TTL for data in tables in days. If 0, no TTL is set. Default 0.
ttl:
# The maximum number of spans to fetch per trace. If 0, no limits is set. Default 0.
max_num_spans:
address: tcp://some-clickhouse-server:9000
# When empty the embedded scripts from sqlscripts directory are used
init_sql_scripts_dir:
# Maximal amount of spans that can be pending writes at a time.
# New spans exceeding this limit will be discarded,
# keeping memory in check if there are issues writing to ClickHouse.
# Check the "jaeger_clickhouse_discarded_spans" metric to keep track of discards.
# If 0, no limit is set. Default 10_000_000.
max_span_count:
# Batch write size. Default 10_000.
batch_write_size:
# Batch flush interval. Default 5s.
batch_flush_interval:
# Encoding of stored data. Either json or protobuf. Default json.
encoding:
# Path to CA TLS certificate.
ca_file:
# Username for connection to ClickHouse. Default is "default".
username:
# Password for connection to ClickHouse.
password:
# ClickHouse database name. The database must be created manually before Jaeger starts. Default is "default".
database:
# Endpoint for serving prometheus metrics. Default localhost:9090.
metrics_endpoint: localhost:9090
# Whether to use sql scripts supporting replication and sharding.
# Replication can be used only on database with Atomic engine.
# Default false.
replication:
# Table with spans. Default "jaeger_spans_local" or "jaeger_spans" when replication is enabled.
spans_table:
# Span index table. Default "jaeger_index_local" or "jaeger_index" when replication is enabled.
spans_index_table:
# Operations table. Default "jaeger_operations_local" or "jaeger_operations" when replication is enabled.
operations_table:
# TTL for data in tables in days. If 0, no TTL is set. Default 0.
ttl:
# The maximum number of spans to fetch per trace. If 0, no limit is set. Default 0.
max_num_spans:
93 changes: 63 additions & 30 deletions storage/clickhousespanstore/pool.go
@@ -1,13 +1,25 @@
package clickhousespanstore

import (
"container/heap"
"fmt"
"math"
"sync"

"github.com/prometheus/client_golang/prometheus"

"github.com/jaegertracing/jaeger/model"
)

var (
numDiscardedSpans = prometheus.NewCounter(prometheus.CounterOpts{
Name: "jaeger_clickhouse_discarded_spans",
Help: "Count of spans that have been discarded due to pending writes exceeding max_span_count",
})
numPendingSpans = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "jaeger_clickhouse_pending_spans",
Help: "Number of spans that are currently pending, counts against max_span_count",
})
)

// WriteWorkerPool is a worker pool for writing batches of spans.
// Given a new batch, WriteWorkerPool creates a new WriteWorker.
// If the number of currently processed spans if more than maxSpanCount, then the oldest worker is removed.
@@ -18,14 +30,19 @@ type WriteWorkerPool struct {
done sync.WaitGroup
batches chan []*model.Span

totalSpanCount int
maxSpanCount int
mutex sync.Mutex
workers workerHeap
workerDone chan *WriteWorker
maxSpanCount int
mutex sync.Mutex
workers workerHeap
workerDone chan *WriteWorker
}

var registerPoolMetrics sync.Once

func NewWorkerPool(params *WriteParams, maxSpanCount int) WriteWorkerPool {
registerPoolMetrics.Do(func() {
prometheus.MustRegister(numDiscardedSpans, numPendingSpans)
})

return WriteWorkerPool{
params: params,
finish: make(chan bool),
@@ -42,24 +59,44 @@ func NewWorkerPool(params *WriteParams, maxSpanCount int) WriteWorkerPool {

func (pool *WriteWorkerPool) Work() {
finish := false

nextWorkerID := int32(1)
pendingSpanCount := 0
for {
// Initialize to zero, or update value from previous loop
numPendingSpans.Set(float64(pendingSpanCount))

pool.done.Add(1)
select {
case batch := <-pool.batches:
pool.CleanWorkers(len(batch))
worker := WriteWorker{
params: pool.params,

counter: &pool.totalSpanCount,
mutex: &pool.mutex,
finish: make(chan bool),
workerDone: pool.workerDone,
done: sync.WaitGroup{},
batchSize := len(batch)
if pool.checkLimit(pendingSpanCount, batchSize) {
// Limit disabled or batch fits within limit, write the batch.
worker := WriteWorker{
workerID: nextWorkerID,

params: pool.params,
batch: batch,

finish: make(chan bool),
workerDone: pool.workerDone,
done: sync.WaitGroup{},
}
if nextWorkerID == math.MaxInt32 {
nextWorkerID = 1
} else {
nextWorkerID++
}
pool.workers.AddWorker(&worker)
pendingSpanCount += batchSize
go worker.Work()
} else {
// Limit exceeded, complain
numDiscardedSpans.Add(float64(batchSize))
pool.params.logger.Error("Discarding batch of spans due to exceeding pending span count", "batch_size", batchSize, "pending_span_count", pendingSpanCount, "max_span_count", pool.maxSpanCount)
}
pool.workers.AddWorker(&worker)
go worker.Work(batch)
case worker := <-pool.workerDone:
// The worker has finished, subtract its work from the count and clean it from the heap.
pendingSpanCount -= len(worker.batch)
if err := pool.workers.RemoveWorker(worker); err != nil {
pool.params.logger.Error("could not remove worker", "worker", worker, "error", err)
}
@@ -84,16 +121,12 @@ func (pool *WriteWorkerPool) CLose() {
pool.done.Wait()
}

func (pool *WriteWorkerPool) CleanWorkers(batchSize int) {
pool.mutex.Lock()
if pool.totalSpanCount+batchSize > pool.maxSpanCount {
earliest := heap.Pop(pool.workers)
switch worker := earliest.(type) {
case WriteWorker:
worker.CLose()
default:
panic(fmt.Sprintf("undefined type %T return from workerHeap", worker))
}
// checkLimit returns whether batchSize fits within the maxSpanCount
func (pool *WriteWorkerPool) checkLimit(pendingSpanCount int, batchSize int) bool {
if pool.maxSpanCount <= 0 {
return true
}
pool.mutex.Unlock()

// Check limit, add batchSize if within limit
return pendingSpanCount+batchSize <= pool.maxSpanCount
}
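
A quick illustration of `checkLimit`'s boundary behavior, written as a hypothetical example test that would live inside the `clickhousespanstore` package (the function is unexported), with `fmt` imported:

```go
func ExampleWriteWorkerPool_checkLimit() {
	pool := WriteWorkerPool{maxSpanCount: 10}
	fmt.Println(pool.checkLimit(8, 2)) // true: 8+2 exactly fits the limit of 10
	fmt.Println(pool.checkLimit(9, 2)) // false: 11 > 10, so the batch is discarded

	unlimited := WriteWorkerPool{maxSpanCount: 0}
	fmt.Println(unlimited.checkLimit(1_000_000, 500)) // true: 0 disables the limit
	// Output:
	// true
	// false
	// true
}
```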
32 changes: 13 additions & 19 deletions storage/clickhousespanstore/worker.go
@@ -19,30 +19,27 @@ var delays = []int{2, 3, 5, 8}
// Given a batch of spans, WriteWorker attempts to write them to database.
// Interval in seconds between attempts changes due to delays slice, then it remains the same as the last value in delays.
type WriteWorker struct {
// workerID is an arbitrary identifier for keeping track of this worker in logs
workerID int32

params *WriteParams
batch []*model.Span

counter *int
mutex *sync.Mutex
finish chan bool
workerDone chan *WriteWorker
done sync.WaitGroup
}

func (worker *WriteWorker) Work(
batch []*model.Span,
) {
func (worker *WriteWorker) Work() {
worker.done.Add(1)
worker.mutex.Lock()
*worker.counter += len(batch)
worker.mutex.Unlock()

defer worker.done.Done()

// TODO: look for specific error(connection refused | database error)
if err := worker.writeBatch(batch); err != nil {
worker.params.logger.Error("Could not write a batch of spans", "error", err)
if err := worker.writeBatch(worker.batch); err != nil {
worker.params.logger.Error("Could not write a batch of spans", "error", err, "worker_id", worker.workerID)
} else {
worker.close(len(batch))
worker.close()
return
}
attempt := 0
@@ -51,13 +48,13 @@ func (worker *WriteWorker) Work(
timer := time.After(currentDelay)
select {
case <-worker.finish:
worker.close(len(batch))
worker.close()
return
case <-timer:
if err := worker.writeBatch(batch); err != nil {
worker.params.logger.Error("Could not write a batch of spans", "error", err)
if err := worker.writeBatch(worker.batch); err != nil {
worker.params.logger.Error("Could not write a batch of spans", "error", err, "worker_id", worker.workerID)
} else {
worker.close(len(batch))
worker.close()
return
}
}
@@ -76,10 +73,7 @@ func (worker *WriteWorker) getCurrentDelay(attempt *int, delay time.Duration) ti
return time.Duration(int64(delays[*attempt-1]) * delay.Nanoseconds())
}

func (worker *WriteWorker) close(batchSize int) {
worker.mutex.Lock()
*worker.counter -= batchSize
worker.mutex.Unlock()
func (worker *WriteWorker) close() {
worker.workerDone <- worker
}

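The retry cadence comes from the `delays` table above; the body of `getCurrentDelay` is mostly elided in this view, so the sketch below reconstructs the capped backoff it describes (the increment logic is an assumption based on the visible return statement):

```go
package main

import (
	"fmt"
	"time"
)

// delays mirrors the worker's table of retry intervals in seconds.
var delays = []int{2, 3, 5, 8}

// currentDelay walks the delays table on each attempt and then stays at
// the final value, matching "then it remains the same as the last value
// in delays" from the doc comment above. The increment step is assumed.
func currentDelay(attempt *int, base time.Duration) time.Duration {
	if *attempt < len(delays) {
		*attempt++
	}
	return time.Duration(int64(delays[*attempt-1]) * base.Nanoseconds())
}

func main() {
	attempt := 0
	for i := 0; i < 6; i++ {
		fmt.Print(currentDelay(&attempt, time.Second), " ") // 2s 3s 5s 8s 8s 8s
	}
	fmt.Println()
}
```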
4 changes: 2 additions & 2 deletions storage/clickhousespanstore/writer.go
@@ -43,7 +43,7 @@ type SpanWriter struct {
done sync.WaitGroup
}

var registerMetrics sync.Once
var registerWriterMetrics sync.Once
var _ spanstore.Writer = (*SpanWriter)(nil)

// NewSpanWriter returns a SpanWriter for the database
@@ -78,7 +78,7 @@ func NewSpanWriter(
}

func (w *SpanWriter) registerMetrics() {
registerMetrics.Do(func() {
registerWriterMetrics.Do(func() {
prometheus.MustRegister(numWritesWithBatchSize)
prometheus.MustRegister(numWritesWithFlushInterval)
})
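Worth noting why the rename matters: `prometheus.MustRegister` panics if the same collector is registered twice, and the writer and pool now each carry their own `sync.Once` guard, so the two package-level variables need distinct names. A self-contained illustration of the pattern (the metric name here is an example, not one of the plugin's):

```go
package main

import (
	"sync"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	exampleSpans = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "example_spans_total",
		Help: "Example counter used to demonstrate registration guards.",
	})
	registerOnce sync.Once
)

// register is safe to call from multiple code paths: the sync.Once
// guarantees MustRegister runs at most once, avoiding its
// duplicate-registration panic.
func register() {
	registerOnce.Do(func() {
		prometheus.MustRegister(exampleSpans)
	})
}

func main() {
	register()
	register() // second call is a no-op rather than a panic
}
```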
6 changes: 5 additions & 1 deletion storage/config.go
@@ -30,7 +30,11 @@ type Configuration struct {
BatchWriteSize int64 `yaml:"batch_write_size"`
// Batch flush interval. Default is 5s.
BatchFlushInterval time.Duration `yaml:"batch_flush_interval"`
// Maximal amount of spans that can be written at the same time. Default is 10_000_000.
// Maximal amount of spans that can be pending writes at a time.
// New spans exceeding this limit will be discarded,
// keeping memory in check if there are issues writing to ClickHouse.
// Check the "jaeger_clickhouse_discarded_spans" metric to keep track of discards.
// Default 10_000_000, or disable the limit entirely by setting to 0.
MaxSpanCount int `yaml:"max_span_count"`
// Encoding either json or protobuf. Default is json.
Encoding EncodingType `yaml:"encoding"`
