From d6b9156577f2dfa239cc1c88c1e6bda5f00b4c07 Mon Sep 17 00:00:00 2001 From: Ritesh Ghorse Date: Fri, 1 Oct 2021 16:38:38 -0400 Subject: [PATCH] [BEAM-11217] Implemented metrics filtering (#15482) --- sdks/go.mod | 3 - sdks/go/pkg/beam/core/metrics/metrics.go | 65 ++++++++++++++++++- .../test/integration/wordcount/wordcount.go | 18 +++-- .../integration/wordcount/wordcount_test.go | 52 ++++++++++++--- 4 files changed, 120 insertions(+), 18 deletions(-) diff --git a/sdks/go.mod b/sdks/go.mod index 9cb3c90c60c6..2c7238f665ff 100644 --- a/sdks/go.mod +++ b/sdks/go.mod @@ -30,17 +30,14 @@ require ( github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.2 // TODO(danoliveira): Fully replace this with google.golang.org/protobuf github.com/golang/snappy v0.0.4 // indirect - github.com/google/btree v1.0.0 // indirect github.com/google/go-cmp v0.5.6 github.com/google/martian/v3 v3.2.1 // indirect github.com/google/uuid v1.3.0 - github.com/hashicorp/golang-lru v0.5.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/linkedin/goavro v2.1.0+incompatible github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/nightlyone/lockfile v1.0.0 github.com/spf13/cobra v1.2.1 - golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 // indirect golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6 golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914 golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect diff --git a/sdks/go/pkg/beam/core/metrics/metrics.go b/sdks/go/pkg/beam/core/metrics/metrics.go index 8d0d2a8b2879..32fdd35dedfa 100644 --- a/sdks/go/pkg/beam/core/metrics/metrics.go +++ b/sdks/go/pkg/beam/core/metrics/metrics.go @@ -482,7 +482,40 @@ func (mr Results) AllMetrics() QueryResults { return QueryResults{mr.counters, mr.distributions, mr.gauges} } -// TODO(BEAM-11217): Implement Query(Filter) and metrics filtering +// TODO(BEAM-11217): Implement querying metrics by DoFn + +// SingleResult interface facilitates metrics query filtering methods. +type SingleResult interface { + Name() string + Namespace() string +} + +// Query allows metrics querying with filter. The filter takes the form of predicate function. Example: +// qr = pr.Metrics().Query(func(sr metrics.SingleResult) bool { +// return sr.Namespace() == test.namespace +// }) +func (mr Results) Query(f func(SingleResult) bool) QueryResults { + counters := []CounterResult{} + distributions := []DistributionResult{} + gauges := []GaugeResult{} + + for _, counter := range mr.counters { + if f(counter) { + counters = append(counters, counter) + } + } + for _, distribution := range mr.distributions { + if f(distribution) { + distributions = append(distributions, distribution) + } + } + for _, gauge := range mr.gauges { + if f(gauge) { + gauges = append(gauges, gauge) + } + } + return QueryResults{counters, distributions, gauges} +} // QueryResults is the result of a query. Allows accessing all of the // metrics that matched the filter. @@ -529,6 +562,16 @@ func (r CounterResult) Result() int64 { return r.Attempted } +// Name returns the Name of this Counter. +func (r CounterResult) Name() string { + return r.Key.Name +} + +// Namespace returns the Namespace of this Counter. +func (r CounterResult) Namespace() string { + return r.Key.Namespace +} + // MergeCounters combines counter metrics that share a common key. func MergeCounters( attempted map[StepKey]int64, @@ -571,6 +614,16 @@ func (r DistributionResult) Result() DistributionValue { return r.Attempted } +// Name returns the Name of this Distribution. +func (r DistributionResult) Name() string { + return r.Key.Name +} + +// Namespace returns the Namespace of this Distribution. +func (r DistributionResult) Namespace() string { + return r.Key.Namespace +} + // MergeDistributions combines distribution metrics that share a common key. func MergeDistributions( attempted map[StepKey]DistributionValue, @@ -613,6 +666,16 @@ func (r GaugeResult) Result() GaugeValue { return r.Attempted } +// Name returns the Name of this Gauge. +func (r GaugeResult) Name() string { + return r.Key.Name +} + +// Namespace returns the Namespace of this Gauge. +func (r GaugeResult) Namespace() string { + return r.Key.Namespace +} + // StepKey uniquely identifies a metric within a pipeline graph. type StepKey struct { Step, Name, Namespace string diff --git a/sdks/go/test/integration/wordcount/wordcount.go b/sdks/go/test/integration/wordcount/wordcount.go index bcc40b55a2d5..6be59011b5ba 100644 --- a/sdks/go/test/integration/wordcount/wordcount.go +++ b/sdks/go/test/integration/wordcount/wordcount.go @@ -30,9 +30,10 @@ import ( ) var ( - wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`) - empty = beam.NewCounter("extract", "emptyLines") - lineLen = beam.NewDistribution("extract", "lineLenDistro") + wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`) + empty = beam.NewCounter("extract", "emptyLines") + lineLen = beam.NewDistribution("extract", "lineLenDistro") + smallWords = beam.NewCounter("extract", "smallWords") ) // CountWords is a composite transform that counts the words of a PCollection @@ -56,6 +57,9 @@ func extractFn(ctx context.Context, line string, emit func(string)) { empty.Inc(ctx, 1) } for _, word := range wordRE.FindAllString(line, -1) { + if len(word) < 6 { + smallWords.Inc(ctx, 1) + } emit(word) } } @@ -74,8 +78,12 @@ func WordCount(glob, hash string, size int) *beam.Pipeline { p, s := beam.NewPipelineWithRoot() in := textio.Read(s, glob) + WordCountFromPCol(s, in, hash, size) + return p +} + +// WordCountFromPCol counts the words from a PCollection and validates it. +func WordCountFromPCol(s beam.Scope, in beam.PCollection, hash string, size int) { out := Format(s, CountWords(s, in)) passert.Hash(s, out, "out", hash, size) - - return p } diff --git a/sdks/go/test/integration/wordcount/wordcount_test.go b/sdks/go/test/integration/wordcount/wordcount_test.go index 482d9a3ac872..805b0ecc0ca9 100644 --- a/sdks/go/test/integration/wordcount/wordcount_test.go +++ b/sdks/go/test/integration/wordcount/wordcount_test.go @@ -19,7 +19,8 @@ import ( "strings" "testing" - "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/memfs" + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink" _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/samza" @@ -30,9 +31,11 @@ import ( func TestWordCount(t *testing.T) { tests := []struct { - lines []string - words int - hash string + lines []string + words int + hash string + smallWordsCount int64 + lineLen metrics.DistributionValue }{ { []string{ @@ -40,6 +43,8 @@ func TestWordCount(t *testing.T) { }, 1, "6zZtmVTet7aIhR3wmPE8BA==", + 1, + metrics.DistributionValue{Count: 1, Sum: 3, Min: 3, Max: 3}, }, { []string{ @@ -49,6 +54,8 @@ func TestWordCount(t *testing.T) { }, 1, "jAk8+k4BOH7vQDUiUZdfWg==", + 6, + metrics.DistributionValue{Count: 3, Sum: 21, Min: 3, Max: 11}, }, { []string{ @@ -56,6 +63,8 @@ func TestWordCount(t *testing.T) { }, 2, "Nz70m/sn3Ep9o484r7MalQ==", + 6, + metrics.DistributionValue{Count: 1, Sum: 23, Min: 23, Max: 23}, }, { []string{ @@ -63,6 +72,8 @@ func TestWordCount(t *testing.T) { }, 2, "Nz70m/sn3Ep9o484r7MalQ==", // ordering doesn't matter: same hash as above + 6, + metrics.DistributionValue{Count: 1, Sum: 23, Min: 23, Max: 23}, }, { []string{ @@ -75,19 +86,42 @@ func TestWordCount(t *testing.T) { }, 2, "Nz70m/sn3Ep9o484r7MalQ==", // whitespace doesn't matter: same hash as above + 6, + metrics.DistributionValue{Count: 6, Sum: 37, Min: 0, Max: 11}, }, } for _, test := range tests { integration.CheckFilters(t) - const filename = "memfs://input" - memfs.Write(filename, []byte(strings.Join(test.lines, "\n"))) - - p := WordCount(filename, test.hash, test.words) - _, err := ptest.RunWithMetrics(p) + p, s := beam.NewPipelineWithRoot() + lines := beam.CreateList(s, test.lines) + WordCountFromPCol(s, lines, test.hash, test.words) + pr, err := ptest.RunWithMetrics(p) if err != nil { t.Errorf("WordCount(\"%v\") failed: %v", strings.Join(test.lines, "|"), err) } + + qr := pr.Metrics().Query(func(sr metrics.SingleResult) bool { + return sr.Name() == "smallWords" + }) + counter := metrics.CounterResult{} + if len(qr.Counters()) != 0 { + counter = qr.Counters()[0] + } + if counter.Result() != test.smallWordsCount { + t.Errorf("Metrics().Query(by Name) failed. Got %d counters, Want %d counters", counter.Result(), test.smallWordsCount) + } + + qr = pr.Metrics().Query(func(sr metrics.SingleResult) bool { + return sr.Name() == "lineLenDistro" + }) + distribution := metrics.DistributionResult{} + if len(qr.Distributions()) != 0 { + distribution = qr.Distributions()[0] + } + if distribution.Result() != test.lineLen { + t.Errorf("Metrics().Query(by Name) failed. Got %v distribution, Want %v distribution", distribution.Result(), test.lineLen) + } } }