Skip to content

Commit

Permalink
[BEAM-11217] Implemented metrics filtering (apache#15482)
Browse files Browse the repository at this point in the history
  • Loading branch information
riteshghorse authored Oct 1, 2021
1 parent e4b400b commit d6b9156
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 18 deletions.
3 changes: 0 additions & 3 deletions sdks/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,14 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // TODO(danoliveira): Fully replace this with google.golang.org/protobuf
github.com/golang/snappy v0.0.4 // indirect
github.com/google/btree v1.0.0 // indirect
github.com/google/go-cmp v0.5.6
github.com/google/martian/v3 v3.2.1 // indirect
github.com/google/uuid v1.3.0
github.com/hashicorp/golang-lru v0.5.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/linkedin/goavro v2.1.0+incompatible
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/nightlyone/lockfile v1.0.0
github.com/spf13/cobra v1.2.1
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 // indirect
golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6
golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
Expand Down
65 changes: 64 additions & 1 deletion sdks/go/pkg/beam/core/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,40 @@ func (mr Results) AllMetrics() QueryResults {
return QueryResults{mr.counters, mr.distributions, mr.gauges}
}

// TODO(BEAM-11217): Implement Query(Filter) and metrics filtering
// TODO(BEAM-11217): Implement querying metrics by DoFn

// SingleResult is the minimal view of one metric result that a filter
// predicate handed to Query can inspect. Any result type exposing both
// accessors below satisfies it.
type SingleResult interface {
	// Namespace returns the namespace of the metric.
	Namespace() string
	// Name returns the name of the metric.
	Name() string
}

// Query returns the metrics for which the predicate f reports true. Counters,
// distributions and gauges are tested in that order, one result at a time,
// and matching results keep their original relative order. Each returned
// group is a non-nil slice even when nothing matched. Example:
//
//	qr = pr.Metrics().Query(func(sr metrics.SingleResult) bool {
//		return sr.Namespace() == test.namespace
//	})
func (mr Results) Query(f func(SingleResult) bool) QueryResults {
	var (
		keptCounters      = []CounterResult{}
		keptDistributions = []DistributionResult{}
		keptGauges        = []GaugeResult{}
	)

	for i := range mr.counters {
		c := mr.counters[i]
		if !f(c) {
			continue
		}
		keptCounters = append(keptCounters, c)
	}

	for i := range mr.distributions {
		d := mr.distributions[i]
		if !f(d) {
			continue
		}
		keptDistributions = append(keptDistributions, d)
	}

	for i := range mr.gauges {
		g := mr.gauges[i]
		if !f(g) {
			continue
		}
		keptGauges = append(keptGauges, g)
	}

	return QueryResults{keptCounters, keptDistributions, keptGauges}
}

// QueryResults is the result of a query. Allows accessing all of the
// metrics that matched the filter.
Expand Down Expand Up @@ -529,6 +562,16 @@ func (r CounterResult) Result() int64 {
return r.Attempted
}

// Name reports the metric name held in this counter result's Key.
func (r CounterResult) Name() string {
	key := r.Key
	return key.Name
}

// Namespace reports the metric namespace held in this counter result's Key.
func (r CounterResult) Namespace() string {
	key := r.Key
	return key.Namespace
}

// MergeCounters combines counter metrics that share a common key.
func MergeCounters(
attempted map[StepKey]int64,
Expand Down Expand Up @@ -571,6 +614,16 @@ func (r DistributionResult) Result() DistributionValue {
return r.Attempted
}

// Name reports the metric name held in this distribution result's Key.
func (r DistributionResult) Name() string {
	key := r.Key
	return key.Name
}

// Namespace reports the metric namespace held in this distribution result's
// Key.
func (r DistributionResult) Namespace() string {
	key := r.Key
	return key.Namespace
}

// MergeDistributions combines distribution metrics that share a common key.
func MergeDistributions(
attempted map[StepKey]DistributionValue,
Expand Down Expand Up @@ -613,6 +666,16 @@ func (r GaugeResult) Result() GaugeValue {
return r.Attempted
}

// Name reports the metric name held in this gauge result's Key.
func (r GaugeResult) Name() string {
	key := r.Key
	return key.Name
}

// Namespace reports the metric namespace held in this gauge result's Key.
func (r GaugeResult) Namespace() string {
	key := r.Key
	return key.Namespace
}

// StepKey uniquely identifies a metric within a pipeline graph.
type StepKey struct {
Step, Name, Namespace string
Expand Down
18 changes: 13 additions & 5 deletions sdks/go/test/integration/wordcount/wordcount.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ import (
)

var (
wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
empty = beam.NewCounter("extract", "emptyLines")
lineLen = beam.NewDistribution("extract", "lineLenDistro")
wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
empty = beam.NewCounter("extract", "emptyLines")
lineLen = beam.NewDistribution("extract", "lineLenDistro")
smallWords = beam.NewCounter("extract", "smallWords")
)

// CountWords is a composite transform that counts the words of a PCollection
Expand All @@ -56,6 +57,9 @@ func extractFn(ctx context.Context, line string, emit func(string)) {
empty.Inc(ctx, 1)
}
for _, word := range wordRE.FindAllString(line, -1) {
if len(word) < 6 {
smallWords.Inc(ctx, 1)
}
emit(word)
}
}
Expand All @@ -74,8 +78,12 @@ func WordCount(glob, hash string, size int) *beam.Pipeline {
p, s := beam.NewPipelineWithRoot()

in := textio.Read(s, glob)
WordCountFromPCol(s, in, hash, size)
return p
}

// WordCountFromPCol applies CountWords to the PCollection in, formats the
// result with Format, and passes the formatted output to passert.Hash under
// the name "out" together with the expected hash and size.
//
// It has no result: everything it produces is attached through the scope s,
// so the caller keeps ownership of the pipeline and is responsible for
// running it.
func WordCountFromPCol(s beam.Scope, in beam.PCollection, hash string, size int) {
	out := Format(s, CountWords(s, in))
	passert.Hash(s, out, "out", hash, size)
}
52 changes: 43 additions & 9 deletions sdks/go/test/integration/wordcount/wordcount_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import (
"strings"
"testing"

"github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/memfs"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/samza"
Expand All @@ -30,16 +31,20 @@ import (

func TestWordCount(t *testing.T) {
tests := []struct {
lines []string
words int
hash string
lines []string
words int
hash string
smallWordsCount int64
lineLen metrics.DistributionValue
}{
{
[]string{
"foo",
},
1,
"6zZtmVTet7aIhR3wmPE8BA==",
1,
metrics.DistributionValue{Count: 1, Sum: 3, Min: 3, Max: 3},
},
{
[]string{
Expand All @@ -49,20 +54,26 @@ func TestWordCount(t *testing.T) {
},
1,
"jAk8+k4BOH7vQDUiUZdfWg==",
6,
metrics.DistributionValue{Count: 3, Sum: 21, Min: 3, Max: 11},
},
{
[]string{
"bar bar foo bar foo foo",
},
2,
"Nz70m/sn3Ep9o484r7MalQ==",
6,
metrics.DistributionValue{Count: 1, Sum: 23, Min: 23, Max: 23},
},
{
[]string{
"foo bar foo bar foo bar",
},
2,
"Nz70m/sn3Ep9o484r7MalQ==", // ordering doesn't matter: same hash as above
6,
metrics.DistributionValue{Count: 1, Sum: 23, Min: 23, Max: 23},
},
{
[]string{
Expand All @@ -75,19 +86,42 @@ func TestWordCount(t *testing.T) {
},
2,
"Nz70m/sn3Ep9o484r7MalQ==", // whitespace doesn't matter: same hash as above
6,
metrics.DistributionValue{Count: 6, Sum: 37, Min: 0, Max: 11},
},
}

for _, test := range tests {
integration.CheckFilters(t)
const filename = "memfs://input"
memfs.Write(filename, []byte(strings.Join(test.lines, "\n")))

p := WordCount(filename, test.hash, test.words)
_, err := ptest.RunWithMetrics(p)
p, s := beam.NewPipelineWithRoot()
lines := beam.CreateList(s, test.lines)
WordCountFromPCol(s, lines, test.hash, test.words)
pr, err := ptest.RunWithMetrics(p)
if err != nil {
t.Errorf("WordCount(\"%v\") failed: %v", strings.Join(test.lines, "|"), err)
}

qr := pr.Metrics().Query(func(sr metrics.SingleResult) bool {
return sr.Name() == "smallWords"
})
counter := metrics.CounterResult{}
if len(qr.Counters()) != 0 {
counter = qr.Counters()[0]
}
if counter.Result() != test.smallWordsCount {
t.Errorf("Metrics().Query(by Name) failed. Got %d counters, Want %d counters", counter.Result(), test.smallWordsCount)
}

qr = pr.Metrics().Query(func(sr metrics.SingleResult) bool {
return sr.Name() == "lineLenDistro"
})
distribution := metrics.DistributionResult{}
if len(qr.Distributions()) != 0 {
distribution = qr.Distributions()[0]
}
if distribution.Result() != test.lineLen {
t.Errorf("Metrics().Query(by Name) failed. Got %v distribution, Want %v distribution", distribution.Result(), test.lineLen)
}
}
}

Expand Down

0 comments on commit d6b9156

Please sign in to comment.