Skip to content

Commit

Permalink
Add information on python metrics to the programming guide (#32464)
Browse files Browse the repository at this point in the history
* Add information on python metrics to the programming guide

This patch adds documentation within the programming guide on how to use
Beam metrics within Python as before this patch the only two languages
with such information were Go and Java. This prevents the need to look
at specific examples (or needing to read the source code) to learn how
to use metrics in Python.

* Apply suggestions from code review

Co-authored-by: Rebecca Szper <[email protected]>

---------

Co-authored-by: Rebecca Szper <[email protected]>
  • Loading branch information
boomanaiden154 and rszper authored Sep 16, 2024
1 parent 9f8a4b2 commit d20c0b1
Showing 1 changed file with 79 additions and 0 deletions.
79 changes: 79 additions & 0 deletions website/www/site/content/en/documentation/programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -6096,6 +6096,18 @@ func (fn *MyDoFn) ProcessElement(ctx context.Context, ...) {
}
{{< /highlight >}}

{{< highlight py>}}
from apache_beam import metrics

class MyDoFn(beam.DoFn):
def __init__(self):
self.counter = metrics.Metrics.counter("namespace", "counter1")

def process(self, element):
self.counter.inc()
yield element
{{< /highlight >}}

**Distribution**: A metric that reports information about the distribution of reported values.

{{< highlight java >}}
Expand All @@ -6120,6 +6132,16 @@ func (fn *MyDoFn) ProcessElement(ctx context.Context, v int64, ...) {
}
{{< /highlight >}}

{{< highlight py >}}
class MyDoFn(beam.DoFn):
def __init__(self):
self.distribution = metrics.Metrics.distribution("namespace", "distribution1")

def process(self, element):
self.distribution.update(element)
yield element
{{< /highlight >}}

**Gauge**: A metric that reports the latest value out of reported values. Since metrics are
collected from many workers the value may not be the absolute last, but one of the latest values.

Expand All @@ -6145,6 +6167,16 @@ func (fn *MyDoFn) ProcessElement(ctx context.Context, v int64, ...) {
}
{{< /highlight >}}

{{< highlight py >}}
class MyDoFn(beam.DoFn):
def __init__(self):
self.gauge = metrics.Metrics.gauge("namespace", "gauge1")

def process(self, element):
self.gaguge.set(element)
yield element
{{< /highlight >}}

### 10.3. Querying metrics {#querying-metrics}
{{< paragraph class="language-java language-python">}}
`PipelineResult` has a method `metrics()` which returns a `MetricResults` object that allows
Expand All @@ -6159,6 +6191,17 @@ matching a given filter. It takes in a predicate with a `SingleResult` paramete
be used for custom filters.
{{< /paragraph >}}

{{< paragraph class="language-py">}}
`PipelineResult` has a `metrics` method that returns a `MetricResults` object. The `MetricResults` object lets you
access metrics. The main method available in the `MetricResults` object, `query`, lets you
query all metrics that match a given filter. The `query` method takes in a `MetricsFilter` object that you can
use to filter by several different criteria. Querying a `MetricResults` object returns
a dictionary of lists of `MetricResult` objects, with the dictionary organizing them by type,
for example, `Counter`, `Distribution`, and `Gauge`. The `MetricResult` object contains a `result` function
that gets the value of the metric and contains a `key` property. The `key` property contains information about
the namespace and the name of the metric.
{{< /paragraph >}}

{{< highlight java >}}
public interface PipelineResult {
MetricResults metrics();
Expand Down Expand Up @@ -6186,6 +6229,20 @@ public interface MetricResult<T> {
{{< code_sample "sdks/go/examples/snippets/10metrics.go" metrics_query >}}
{{< /highlight >}}

{{< highlight py >}}
class PipelineResult:
def metrics(self) -> MetricResults:
"""Returns a the metric results from the pipeline."""

class MetricResults:
def query(self, filter: MetricsFilter) -> Dict[str, List[MetricResult]]:
"""Filters the results against the specified filter."""

class MetricResult:
def result(self):
"""Returns the value of the metric."""
{{< /highlight >}}

### 10.4. Using metrics in pipeline {#using-metrics}
Below, there is a simple example of how to use a `Counter` metric in a user pipeline.

Expand Down Expand Up @@ -6228,6 +6285,28 @@ public class MyMetricsDoFn extends DoFn<Integer, Integer> {
{{< code_sample "sdks/go/examples/snippets/10metrics.go" metrics_pipeline >}}
{{< /highlight >}}

{{< highlight py >}}
class MyMetricsDoFn(beam.DoFn):
def __init__(self):
self.counter = metrics.Metrics.counter("namespace", "counter1")

def process(self, element):
counter.inc()
yield element

pipeline = beam.Pipeline()

pipeline | beam.ParDo(MyMetricsDoFn())

result = pipeline.run().wait_until_finish()

metrics = result.metrics().query(
metrics.MetricsFilter.with_namespace("namespace").with_name("counter1"))

for metric in metrics["counters"]:
print(metric)
{{< /highlight >}}

### 10.5. Export metrics {#export-metrics}

Beam metrics can be exported to external sinks. If a metrics sink is set up in the configuration, the runner will push metrics to it at a default 5s period.
Expand Down

0 comments on commit d20c0b1

Please sign in to comment.