diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md index 4005879a48ac..c716c7554db4 100644 --- a/website/www/site/content/en/documentation/programming-guide.md +++ b/website/www/site/content/en/documentation/programming-guide.md @@ -6096,6 +6096,18 @@ func (fn *MyDoFn) ProcessElement(ctx context.Context, ...) { } {{< /highlight >}} +{{< highlight py>}} +from apache_beam import metrics + +class MyDoFn(beam.DoFn): + def __init__(self): + self.counter = metrics.Metrics.counter("namespace", "counter1") + + def process(self, element): + self.counter.inc() + yield element +{{< /highlight >}} + **Distribution**: A metric that reports information about the distribution of reported values. {{< highlight java >}} @@ -6120,6 +6132,16 @@ func (fn *MyDoFn) ProcessElement(ctx context.Context, v int64, ...) { } {{< /highlight >}} +{{< highlight py >}} +class MyDoFn(beam.DoFn): + def __init__(self): + self.distribution = metrics.Metrics.distribution("namespace", "distribution1") + + def process(self, element): + self.distribution.update(element) + yield element +{{< /highlight >}} + **Gauge**: A metric that reports the latest value out of reported values. Since metrics are collected from many workers the value may not be the absolute last, but one of the latest values. @@ -6145,6 +6167,16 @@ func (fn *MyDoFn) ProcessElement(ctx context.Context, v int64, ...) { } {{< /highlight >}} +{{< highlight py >}} +class MyDoFn(beam.DoFn): + def __init__(self): + self.gauge = metrics.Metrics.gauge("namespace", "gauge1") + + def process(self, element): + self.gaguge.set(element) + yield element +{{< /highlight >}} + ### 10.3. Querying metrics {#querying-metrics} {{< paragraph class="language-java language-python">}} `PipelineResult` has a method `metrics()` which returns a `MetricResults` object that allows @@ -6159,6 +6191,17 @@ matching a given filter. It takes in a predicate with a `SingleResult` paramete be used for custom filters. {{< /paragraph >}} +{{< paragraph class="language-py">}} +`PipelineResult` has a `metrics` method that returns a `MetricResults` object. The `MetricResults` object lets you +access metrics. The main method available in the `MetricResults` object, `query`, lets you +query all metrics that match a given filter. The `query` method takes in a `MetricsFilter` object that you can +use to filter by several different criteria. Querying a `MetricResults` object returns +a dictionary of lists of `MetricResult` objects, with the dictionary organizing them by type, +for example, `Counter`, `Distribution`, and `Gauge`. The `MetricResult` object contains a `result` function +that gets the value of the metric and contains a `key` property. The `key` property contains information about +the namespace and the name of the metric. +{{< /paragraph >}} + {{< highlight java >}} public interface PipelineResult { MetricResults metrics(); @@ -6186,6 +6229,20 @@ public interface MetricResult { {{< code_sample "sdks/go/examples/snippets/10metrics.go" metrics_query >}} {{< /highlight >}} +{{< highlight py >}} +class PipelineResult: + def metrics(self) -> MetricResults: + """Returns a the metric results from the pipeline.""" + +class MetricResults: + def query(self, filter: MetricsFilter) -> Dict[str, List[MetricResult]]: + """Filters the results against the specified filter.""" + +class MetricResult: + def result(self): + """Returns the value of the metric.""" +{{< /highlight >}} + ### 10.4. Using metrics in pipeline {#using-metrics} Below, there is a simple example of how to use a `Counter` metric in a user pipeline. @@ -6228,6 +6285,28 @@ public class MyMetricsDoFn extends DoFn { {{< code_sample "sdks/go/examples/snippets/10metrics.go" metrics_pipeline >}} {{< /highlight >}} +{{< highlight py >}} +class MyMetricsDoFn(beam.DoFn): + def __init__(self): + self.counter = metrics.Metrics.counter("namespace", "counter1") + + def process(self, element): + counter.inc() + yield element + +pipeline = beam.Pipeline() + +pipeline | beam.ParDo(MyMetricsDoFn()) + +result = pipeline.run().wait_until_finish() + +metrics = result.metrics().query( + metrics.MetricsFilter.with_namespace("namespace").with_name("counter1")) + +for metric in metrics["counters"]: + print(metric) +{{< /highlight >}} + ### 10.5. Export metrics {#export-metrics} Beam metrics can be exported to external sinks. If a metrics sink is set up in the configuration, the runner will push metrics to it at a default 5s period.