Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Graphite ingestion #1310

Merged
merged 70 commits into from
Jan 25, 2019
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
c3c4306
dont use statsdex code and cleanup
Jan 19, 2019
2f59a0b
address feedback
Jan 21, 2019
d60d2fc
wip
Jan 21, 2019
54b6e01
basic skeleton and generation of tags from name
Jan 21, 2019
9a156ca
wip
Jan 21, 2019
7dabba8
Add benchmark for generateTagsFromName and reuse names as much as pos…
Jan 21, 2019
9a08c76
add comments
Jan 21, 2019
689cd95
add space
Jan 21, 2019
cf50185
Add downsampleandwrite + tests
Jan 22, 2019
10dc790
refactor remote write to use new downsamplerAndWriter
Jan 22, 2019
176d862
handle nil downsampler
Jan 22, 2019
ffd52ba
add test for case where downsampler is nil
Jan 22, 2019
8dba5d4
use downsamplerAndWriter in carbon ingester
Jan 22, 2019
1d25d1b
wip
Jan 22, 2019
3f6e5eb
wip
Jan 23, 2019
caf97a0
update test
Jan 24, 2019
761c2a1
wire up carbon ingestion server
Jan 24, 2019
a6a04ca
improve logging and bug fixes
Jan 24, 2019
a2e7898
Fix carbon ingestion tests
Jan 24, 2019
9fef8ae
remove print
Jan 24, 2019
632ba0b
renable tests
Jan 24, 2019
8c1c419
update start_m3 script
Jan 24, 2019
19503ab
Share code among docker integration tests
Jan 24, 2019
f229bb7
Add new docker integration test to makefile
Jan 24, 2019
d2c4b51
fix broken test
Jan 24, 2019
d219402
fix imports
Jan 24, 2019
eabf575
making writing to unaggregated optional
Jan 24, 2019
9c84978
disable unaggregated writes in carbon ingester
Jan 24, 2019
977b425
fix typo
Jan 24, 2019
7b14c97
refactor interfaces
Jan 24, 2019
5f6c303
remove todo
Jan 24, 2019
9ff3a6a
refactor metrics struct
Jan 24, 2019
0607d6c
add docstring
Jan 24, 2019
74bc426
make prom TS iter more efficient
Jan 24, 2019
be483c9
refactor write method
Jan 24, 2019
9c36aac
fix tests and add comment
Jan 24, 2019
2e7ec1c
disable test
Jan 24, 2019
0b83af6
make carbon enabled by default
Jan 24, 2019
2a40912
add TODO
Jan 24, 2019
4c3d7ae
fix write code
Jan 24, 2019
4f1582e
switch from nanos to durations
Jan 24, 2019
63b65fc
validate options
Jan 24, 2019
c2a79a1
factor out method for better error handling
Jan 24, 2019
ba5d5f2
add constant
Jan 24, 2019
1d80dbd
more and better logs
Jan 24, 2019
6cb50f7
camelcase yaml keys
Jan 24, 2019
02c48c6
fix error message
Jan 24, 2019
e11d56a
fix nit
Jan 24, 2019
9b055d8
add const
Jan 24, 2019
9520f62
add newline
Jan 24, 2019
2ff51c8
remove unused file
Jan 24, 2019
767861a
generate some bad lines in the ingestion carbon test
Jan 24, 2019
dcf2151
fix compilation issue
Jan 24, 2019
cd32d86
add more tests
Jan 24, 2019
b9b2013
improve test to try more error paths
Jan 24, 2019
7e11360
add timeouts for writes
Jan 24, 2019
2d08dcb
add metrics
Jan 24, 2019
c83d2c3
add metrics
Jan 24, 2019
bb1e055
delete TODO
Jan 24, 2019
67de87b
fix lint issues
Jan 25, 2019
7d4504d
fix server.go
Jan 25, 2019
0fdac03
fix server.go
Jan 25, 2019
94579aa
fix broken test
Jan 25, 2019
e35158a
fix cleanup code
Jan 25, 2019
4807443
address config feedback
Jan 25, 2019
e27716f
fix bbroken integration test
Jan 25, 2019
51dbe03
reduce shards in prop test
Jan 25, 2019
379af5e
fix broken integration test
Jan 25, 2019
b9ff591
close readers when done
Jan 25, 2019
29c5a88
fix unit test
Jan 25, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 212 additions & 0 deletions src/cmd/services/m3coordinator/ingest/carbon/ingest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package ingestcarbon

import (
"bytes"
"context"
"errors"
"fmt"
"net"
"sync"

"github.com/m3db/m3/src/cmd/services/m3coordinator/ingest"
"github.com/m3db/m3/src/metrics/carbon"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/ts"
"github.com/m3db/m3x/instrument"
m3xserver "github.com/m3db/m3x/server"
xsync "github.com/m3db/m3x/sync"
xtime "github.com/m3db/m3x/time"

"github.com/uber-go/tally"
)

var (
// Used for parsing carbon names into tags.
carbonSeparatorByte = byte('.')
carbonSeparatorBytes = []byte{carbonSeparatorByte}

// Number of pre-formatted key names to generate in the init() function.
numPreFormattedKeyNames = 100
// Should never be modified after init().
preFormattedKeyNames = [][]byte{}

errCannotGenerateTagsFromEmptyName = errors.New("cannot generate tags from empty name")
)

// Options configures the ingester.
type Options struct {
InstrumentOptions instrument.Options
WorkerPool xsync.PooledWorkerPool
}

// NewIngester returns an ingester for carbon metrics.
func NewIngester(
downsamplerAndWriter ingest.DownsamplerAndWriter,
opts Options,
) m3xserver.Handler {
return &ingester{
downsamplerAndWriter: downsamplerAndWriter,
opts: opts,
}
}

type ingester struct {
downsamplerAndWriter ingest.DownsamplerAndWriter
opts Options
conn net.Conn
}

func (i *ingester) Handle(conn net.Conn) {
if i.conn != nil {
// TODO: Something
}
i.conn = conn

var (
wg = sync.WaitGroup{}
s = carbon.NewScanner(conn)
)

fmt.Println("start scan")
for s.Scan() {
_, timestamp, value := s.Metric()

fmt.Println("da fuq")
wg.Add(1)
if i.opts.WorkerPool != nil {
i.opts.WorkerPool.Go(func() {
// TODO: Real context?
datapoints := []ts.Datapoint{{Timestamp: timestamp, Value: value}}
i.downsamplerAndWriter.Write(
context.Background(), models.Tags{}, datapoints, xtime.Second)
wg.Done()
})
} else {
go func() {
// TODO: Real context?
datapoints := []ts.Datapoint{{Timestamp: timestamp, Value: value}}
fmt.Println("hmm:", datapoints)
i.downsamplerAndWriter.Write(
context.Background(), models.Tags{}, datapoints, xtime.Second)
fmt.Println("hmm done:", datapoints)
wg.Done()
}()
}

// i.metrics.malformedCounter.Inc(int64(s.MalformedCount))
s.MalformedCount = 0
}
fmt.Println("end scan")

if err := s.Err(); err != nil {
fmt.Println(err)
}
// Wait for all outstanding writes
wg.Wait()
}

func (i *ingester) Close() {
// TODO: Log error
// TODO: Not sure this will do anything?
i.conn.Close()
}

func newCarbonHandlerMetrics(m tally.Scope) carbonHandlerMetrics {
writesScope := m.SubScope("writes")
return carbonHandlerMetrics{
unresolvedIDs: writesScope.Counter("ids-policy-unresolved"),
malformedCounter: writesScope.Counter("malformed"),
readTimeLatency: writesScope.Timer("read-time-latency"),
}
}

type carbonHandlerMetrics struct {
unresolvedIDs tally.Counter
malformedCounter tally.Counter
readTimeLatency tally.Timer
}

func generateTagsFromName(name []byte) (models.Tags, error) {
if len(name) == 0 {
return models.Tags{}, errCannotGenerateTagsFromEmptyName
}

var (
numTags = bytes.Count(name, carbonSeparatorBytes) + 1
tags = make([]models.Tag, 0, numTags)
)

startIdx := 0
tagNum := 0
for i, charByte := range name {
if charByte == carbonSeparatorByte {
if i+1 < len(name) && name[i+1] == carbonSeparatorByte {
return models.Tags{}, fmt.Errorf("carbon metric: %s has duplicate separator", string(name))
}

tags = append(tags, models.Tag{
Name: getOrGenerateKeyName(tagNum),
Value: name[startIdx:i],
})
startIdx = i + 1
tagNum++
}
}

// Write out the final tag since the for loop above will miss anything
// after the final separator. Note, that we make sure that the final
// character in the name is not the separator because in that case there
// would be no additional tag to add. I.E if the input was:
// foo.bar.baz
// then the for loop would append foo and bar, but we would still need to
// append baz, however, if the input was:
// foo.bar.baz.
// then the foor loop would have appended foo, bar, and baz already.
if name[len(name)-1] != carbonSeparatorByte {
tags = append(tags, models.Tag{
Name: getOrGenerateKeyName(tagNum),
Value: name[startIdx:],
})
}

return models.Tags{Tags: tags}, nil
}

func getOrGenerateKeyName(idx int) []byte {
if idx < len(preFormattedKeyNames) {
return preFormattedKeyNames[idx]
}

return []byte(fmt.Sprintf("__$%d__", idx))
}

func generateKeyName(idx int) []byte {
return []byte(fmt.Sprintf("__$%d__", idx))
}

func init() {
for i := 0; i < numPreFormattedKeyNames; i++ {
keyName := generateKeyName(i)
preFormattedKeyNames = append(preFormattedKeyNames, keyName)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package ingestcarbon

import (
"testing"

"github.com/m3db/m3/src/query/models"
)

var benchmarkGenerateTagsSink models.Tags

func BenchmarkGenerateTagsFromName(b *testing.B) {
var (
testName = []byte("foo.bar.baz.bax")
err error
)

for i := 0; i < b.N; i++ {
benchmarkGenerateTagsSink, err = generateTagsFromName(testName)
if err != nil {
panic(err)
}
}
}
Loading