This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Sarama update #906

Merged · 10 commits · May 15, 2018
22 changes: 20 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default.

17 changes: 0 additions & 17 deletions dashboard.json
@@ -129,15 +129,6 @@
{
"alias": "/unknown/",
"color": "#890F02"
},
{
"alias": "/pressure\\./",
"fill": 0,
"lines": true,
"linewidth": 1,
"nullPointMode": "connected",
"points": false,
"yaxis": 2
}
],
"span": 3,
@@ -171,14 +162,6 @@
{
"refId": "G",
"target": "alias(sumSeries(perSecond(metrictank.stats.$environment.$instance.tank.metrics_reordered.counter32)), 'reordered')"
},
{
"refId": "H",
"target": "aliasByNode(averageSeries(scale(perSecond(metrictank.stats.$environment.$instance.input.*.pressure.tank.counter32), 1e-9)), 6, 7)"
},
{
"refId": "I",
"target": "aliasByNode(averageSeries(scale(perSecond(metrictank.stats.$environment.$instance.input.*.pressure.idx.counter32), 1e-9)), 6, 7)"
}
],
"thresholds": [],
2 changes: 1 addition & 1 deletion docker/docker-dev-custom-cfg-kafka/gw/tsdb-gw.ini
@@ -6,7 +6,7 @@ api-auth-plugin = file
kafka-tcp-addr = kafka:9092

metrics-topic = mdm
metrics-kafka-comp = none
metrics-kafka-comp = snappy
metrics-publish = true
metrics-partition-scheme = bySeries
metrics-max-in-flight = 1000000
10 changes: 5 additions & 5 deletions docs/operations.md
@@ -19,9 +19,10 @@ this will give instant insight into all the performance metrics of Metrictank.

* process is running and listening on its http port (and carbon port, if you enabled it) (use your monitoring agent of choice for this)
* `metrictank.stats.$environment.$instance.cluster.primary.gauge1`: ensure you have exactly one primary node (saving to cassandra), or as many as you have shardgroups for sharded setups.
* `metrictank.stats.$environment.$instance.input.kafka-mdm.partition.*.lag.gauge64`: kafka lag, depending on your throughput you can always expect some lag, but it should be in the thousands not millions.
* `metrictank.stats.$environment.$instance.store.cassandra.write_queue.*.items.{min,max}.gauge32`: make sure the write queues are able to drain. For primary nodes that are also used for queries, assert the write queues don't reach capacity, otherwise ingestion will block and data will lag behind in queries.
* `metrictank.stats.$environment.$instance.input.*.pressure.idx.counter32`: index pressure as a nanosecond counter; the rise is between 0 and 10^9 each second. Alert if the increase is more than 4x10^8 each second: this would signify the index can't keep up with indexing new data and is blocking the ingestion pipeline.
* `metrictank.stats.$environment.$instance.input.*.metricpoint.unknown.counter32`: counter of MetricPoint messages for an unknown metric, will be dropped.
* `metrictank.stats.$environment.$instance.input.*.metrics_decode_err.counter32`: counter of incoming data that could not be decoded.
* `metrictank.stats.$environment.$instance.input.*.*.invalid.counter32`: counter of incoming data that could not be decoded.
* `metrictank.stats.$environment.$instance.tank.metrics_too_old.counter32`: counter of points that are too old and can't be added.
* `metrictank.stats.$environment.$instance.api.request_handle.latency.*.gauge32`: shows how fast/slow metrictank responds to http queries
@@ -101,10 +102,9 @@ If metrictank ingestion speed is lower than expected, or decreased for seemingly

1) [Indexing of metadata](https://github.com/grafana/metrictank/blob/master/docs/metadata.md) puts backpressure on the ingest stream.
New metrics (including metrics with new settings such as interval, unit, or tags) need to get indexed into:
* an in-memory index (which seems to always be snappy and not exert any backpressure)
* Cassandra - if enabled - which may not keep up with throughput, resulting in backpressure, and a lowered ingestion rate.
See the `pressure.idx` metric in the 'metrics in' graph of the metrictank dashboard.
For more details, look at the various index stats further down the dashboard.
* the in-memory index (which generally should not exert backpressure)
* Cassandra (index) - if enabled - which may not keep up with throughput, resulting in backpressure, and a lowered ingestion rate.
Check the index stats on the dashboard.

2) Saving of chunks. Metrictank saves chunks at the rhythm of your [chunkspan](https://github.com/grafana/metrictank/blob/master/docs/memory-server.md) (10 minutes in the default docker image)
When this happens, it will need to save a bunch of chunks and
14 changes: 0 additions & 14 deletions input/input.go
@@ -4,7 +4,6 @@ package input

import (
"fmt"
"time"

"gopkg.in/raintank/schema.v1"
"gopkg.in/raintank/schema.v1/msg"
@@ -30,8 +29,6 @@ type DefaultHandler struct {
invalidMD *stats.Counter32
invalidMP *stats.Counter32
unknownMP *stats.Counter32
pressureIdx *stats.Counter32
pressureTank *stats.Counter32

metrics mdata.Metrics
metricIndex idx.MetricIndex
@@ -45,8 +42,6 @@ func NewDefaultHandler(metrics mdata.Metrics, metricIndex idx.MetricIndex, input
invalidMD: stats.NewCounter32(fmt.Sprintf("input.%s.metricdata.invalid", input)),
invalidMP: stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint.invalid", input)),
unknownMP: stats.NewCounter32(fmt.Sprintf("input.%s.metricpoint.unknown", input)),
pressureIdx: stats.NewCounter32(fmt.Sprintf("input.%s.pressure.idx", input)),
pressureTank: stats.NewCounter32(fmt.Sprintf("input.%s.pressure.tank", input)),

metrics: metrics,
metricIndex: metricIndex,
@@ -67,20 +62,15 @@ func (in DefaultHandler) ProcessMetricPoint(point schema.MetricPoint, format msg
return
}

pre := time.Now()
archive, _, ok := in.metricIndex.Update(point, partition)
in.pressureIdx.Add(int(time.Since(pre).Nanoseconds()))

if !ok {
in.unknownMP.Inc()
return
}

pre = time.Now()
m := in.metrics.GetOrCreate(point.MKey, archive.SchemaId, archive.AggId)
m.Add(point.Time, point.Value)
in.pressureTank.Add(int(time.Since(pre).Nanoseconds()))

}

// ProcessMetricData assures the data is stored and the metadata is in the index
@@ -105,12 +95,8 @@ func (in DefaultHandler) ProcessMetricData(md *schema.MetricData, partition int3
return
}

pre := time.Now()
archive, _, _ := in.metricIndex.AddOrUpdate(mkey, md, partition)
in.pressureIdx.Add(int(time.Since(pre).Nanoseconds()))

pre = time.Now()
m := in.metrics.GetOrCreate(mkey, archive.SchemaId, archive.AggId)
m.Add(uint32(md.Time), md.Value)
in.pressureTank.Add(int(time.Since(pre).Nanoseconds()))
}
37 changes: 27 additions & 10 deletions mdata/aggmetrics.go
@@ -87,18 +87,35 @@ func (ms *AggMetrics) Get(key schema.MKey) (Metric, bool) {
}

func (ms *AggMetrics) GetOrCreate(key schema.MKey, schemaId, aggId uint16) Metric {
ms.Lock()

// in the most common case, it's already there and an Rlock is all we need
ms.RLock()
m, ok := ms.Metrics[key]
if !ok {
k := schema.AMKey{
MKey: key,
}
agg := Aggregations.Get(aggId)
schema := Schemas.Get(schemaId)
m = NewAggMetric(ms.store, ms.cachePusher, k, schema.Retentions, schema.ReorderWindow, &agg, ms.dropFirstChunk)
ms.Metrics[key] = m
metricsActive.Set(len(ms.Metrics))
ms.RUnlock()
if ok {
return m
}

k := schema.AMKey{
MKey: key,
}

agg := Aggregations.Get(aggId)
schema := Schemas.Get(schemaId)

// if it wasn't there, get the write lock and prepare to add it
// but first we need to check again if someone has added it in
// the meantime (quite rare, but anyway)
ms.Lock()
m, ok = ms.Metrics[key]
if ok {
ms.Unlock()
return m
}
m = NewAggMetric(ms.store, ms.cachePusher, k, schema.Retentions, schema.ReorderWindow, &agg, ms.dropFirstChunk)
ms.Metrics[key] = m
active := len(ms.Metrics)
ms.Unlock()
metricsActive.Set(active)
return m
}
6 changes: 5 additions & 1 deletion scripts/build.sh
@@ -1,4 +1,8 @@
#!/bin/bash

set -e


# Find the directory we exist within
DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
cd ${DIR}/..
@@ -25,7 +29,7 @@ OUTPUT=$BUILDDIR/metrictank
if [ "$1" == "-race" ]
then
set -x
CGO_ENABLED=1 go build -race -ldflags "-X main.gitHash=$GITVERSION" -o $OUTPUT
go build -race -ldflags "-X main.gitHash=$GITVERSION" -o $OUTPUT
else
set -x
go build -ldflags "-X main.gitHash=$GITVERSION" -o $OUTPUT
31 changes: 31 additions & 0 deletions vendor/github.com/Shopify/sarama/.github/CONTRIBUTING.md


20 changes: 20 additions & 0 deletions vendor/github.com/Shopify/sarama/.github/ISSUE_TEMPLATE.md


2 changes: 2 additions & 0 deletions vendor/github.com/Shopify/sarama/.gitignore


18 changes: 10 additions & 8 deletions vendor/github.com/Shopify/sarama/.travis.yml

