Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add start_pipeline to MeterProvider in SDK, atexit moved to MeterProvider #791

Merged
merged 17 commits into from
Jun 9, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions docs/examples/basic_meter/basic_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@
from opentelemetry import metrics
from opentelemetry.sdk.metrics import Counter, MeterProvider, ValueRecorder
from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter
from opentelemetry.sdk.metrics.export.controller import PushController

stateful = True

print(
"Starting example, values will be printed to the console every 5 seconds."
Expand All @@ -37,17 +34,21 @@
# Stateful determines whether how metrics are collected: if true, metrics
# accumulate over the process lifetime. If false, metrics are reset at the
# beginning of each collection interval.
metrics.set_meter_provider(MeterProvider(stateful))
stateful = True

# Sets the global MeterProvider instance
metrics.set_meter_provider(MeterProvider())

# The Meter is responsible for creating and recording metrics. Each meter has a
# unique name, which we set as the module's name here.
meter = metrics.get_meter(__name__)

# Exporter to export metrics to the console
exporter = ConsoleMetricsExporter()

# A PushController collects metrics created from meter and exports it via the
# exporter every interval
controller = PushController(meter=meter, exporter=exporter, interval=5)
# start_pipeline will notify the MeterProvider to begin collecting/exporting
# metrics with the given meter, exporter and interval in seconds
metrics.get_meter_provider().start_pipeline(meter, exporter, 5)

# Metric instruments allow to capture measurements
requests_counter = meter.create_metric(
Expand Down Expand Up @@ -77,7 +78,7 @@
# Update the metric instruments using the direct calling convention
requests_counter.add(25, staging_labels)
requests_size.record(100, staging_labels)
time.sleep(5)
time.sleep(10)

requests_counter.add(50, staging_labels)
requests_size.record(5000, staging_labels)
Expand Down
6 changes: 2 additions & 4 deletions docs/examples/basic_meter/calling_conventions.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@
from opentelemetry import metrics
from opentelemetry.sdk.metrics import Counter, MeterProvider, ValueRecorder
from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter
from opentelemetry.sdk.metrics.export.controller import PushController

# Use the meter type provided by the SDK package
metrics.set_meter_provider(MeterProvider())
meter = metrics.get_meter(__name__)
exporter = ConsoleMetricsExporter()
controller = PushController(meter=meter, exporter=exporter, interval=5)
metrics.get_meter_provider().start_pipeline(meter, ConsoleMetricsExporter(), 5)

requests_counter = meter.create_metric(
name="requests",
Expand Down Expand Up @@ -62,7 +60,7 @@
# You can record metrics directly using the metric instrument. You pass in
# labels that you would like to record for.
requests_counter.add(25, labels)
time.sleep(5)
time.sleep(10)

print("Updating using a bound instrument...")
# You can record metrics with bound metric instruments. Bound metric
Expand Down
6 changes: 2 additions & 4 deletions docs/examples/basic_meter/observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,10 @@
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider, ValueObserver
from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter
from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher
from opentelemetry.sdk.metrics.export.controller import PushController

metrics.set_meter_provider(MeterProvider())
meter = metrics.get_meter(__name__)
exporter = ConsoleMetricsExporter()
controller = PushController(meter=meter, exporter=exporter, interval=2)
metrics.get_meter_provider().start_pipeline(meter, ConsoleMetricsExporter(), 5)


# Callback to gather cpu usage
Expand Down Expand Up @@ -60,6 +57,7 @@ def get_ram_usage_callback(observer):
description="RAM memory usage",
unit="1",
value_type=float,
observer_type=ValueObserver,
label_keys=(),
)

Expand Down
10 changes: 3 additions & 7 deletions docs/examples/cloud_monitoring/basic_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,10 @@
CloudMonitoringMetricsExporter,
)
from opentelemetry.sdk.metrics import Counter, MeterProvider
from opentelemetry.sdk.metrics.export.controller import PushController

meter = metrics.get_meter(__name__, True)

# Gather and export metrics every 5 seconds
controller = PushController(
meter=meter, exporter=CloudMonitoringMetricsExporter(), interval=5
)
metrics.set_meter_provider(MeterProvider())
meter = metrics.get_meter(__name__)
metrics.get_meter_provider().start_pipeline(meter, CloudMonitoringMetricsExporter(), 5)

requests_counter = meter.create_metric(
name="request_counter",
Expand Down
3 changes: 1 addition & 2 deletions docs/examples/opencensus-exporter-metrics/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@
OpenCensusMetricsExporter,
)
from opentelemetry.sdk.metrics import Counter, MeterProvider
from opentelemetry.sdk.metrics.export.controller import PushController

exporter = OpenCensusMetricsExporter(
service_name="basic-service", endpoint="localhost:55678"
)

metrics.set_meter_provider(MeterProvider())
meter = metrics.get_meter(__name__)
controller = PushController(meter, exporter, 5)
metrics.get_meter_provider().start_pipeline(meter, exporter, 5)

requests_counter = meter.create_metric(
name="requests",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,19 @@
from opentelemetry import metrics
from opentelemetry.ext.prometheus import PrometheusMetricsExporter
from opentelemetry.sdk.metrics import Counter, Meter
from opentelemetry.sdk.metrics.export.controller import PushController
from prometheus_client import start_http_server

# Start Prometheus client
start_http_server(port=8000, addr="localhost")

# Meter is responsible for creating and recording metrics
metrics.set_meter_provider(MeterProvider())
meter = metrics.meter()
meter = metrics.get_meter(__name__)
# exporter to export metrics to Prometheus
prefix = "MyAppPrefix"
exporter = PrometheusMetricsExporter(prefix)
# controller collects metrics created from meter and exports it via the
# exporter every interval
controller = PushController(meter, exporter, 5)
# Starts the collect/export pipeline for metrics
metrics.get_meter_provider().start_pipeline(meter, exporter, 5)

counter = meter.create_metric(
"requests",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@

.. code:: python

from opentelemetry import metrics
from opentelemetry.ext.system_metrics import SystemMetrics
from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter

metrics.set_meter_provider(MeterProvider())
exporter = ConsoleMetricsExporter()
SystemMetrics(exporter)

Expand Down Expand Up @@ -60,7 +62,6 @@
from opentelemetry import metrics
from opentelemetry.sdk.metrics import ValueObserver
from opentelemetry.sdk.metrics.export import MetricsExporter
from opentelemetry.sdk.metrics.export.controller import PushController


class SystemMetrics:
Expand All @@ -73,9 +74,7 @@ def __init__(
):
self._labels = {} if labels is None else labels
self.meter = metrics.get_meter(__name__)
self.controller = PushController(
meter=self.meter, exporter=exporter, interval=interval
)
metrics.get_meter_provider().start_pipeline(meter, exporter, interval)
if config is None:
self._config = {
"system_memory": ["total", "available", "used", "free"],
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
([#775](https://github.com/open-telemetry/opentelemetry-python/pull/775))
- Rename Observer to ValueObserver
([#764](https://github.com/open-telemetry/opentelemetry-python/pull/764))
- Add start_pipeline to MeterProvider
([#764](https://github.com/open-telemetry/opentelemetry-python/pull/764))

## 0.8b0

Expand Down
52 changes: 49 additions & 3 deletions opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import atexit
import logging
import threading
from typing import Dict, Sequence, Tuple, Type

from opentelemetry import metrics as metrics_api
from opentelemetry.sdk.metrics.export import (
ConsoleMetricsExporter,
MetricsExporter,
)
from opentelemetry.sdk.metrics.export.aggregate import Aggregator
from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher
from opentelemetry.sdk.metrics.export.controller import PushController
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo

Expand Down Expand Up @@ -403,24 +409,64 @@ class MeterProvider(metrics_api.MeterProvider):
Args:
stateful: Indicates whether meters created are going to be stateful
resource: Resource for this MeterProvider
shutdown_on_exit: Register an atexit hook to shut down when the
application exists
"""

def __init__(
self, stateful=True, resource: Resource = Resource.create_empty(),
self,
stateful=True,
resource: Resource = Resource.create_empty(),
shutdown_on_exit: bool = True,
):
self.stateful = stateful
self.resource = resource
self._controllers = []
self._exporters = set()
self._atexit_handler = None
if shutdown_on_exit:
self._atexit_handler = atexit.register(self.shutdown)

def get_meter(
self,
instrumenting_module_name: str,
instrumenting_library_version: str = "",
) -> "metrics_api.Meter":
"""See `opentelemetry.metrics.MeterProvider`.get_meter."""
if not instrumenting_module_name: # Reject empty strings too.
raise ValueError("get_meter called with missing module name.")
instrumenting_module_name = "ERROR:MISSING MODULE NAME"
logger.error("get_meter called with missing module name.")
return Meter(
self,
InstrumentationInfo(
instrumenting_module_name, instrumenting_library_version
instrumenting_module_name, instrumenting_library_version,
),
)

def start_pipeline(
self,
meter: metrics_api.Meter,
exporter: MetricsExporter = None,
interval: float = 15.0,
) -> None:
"""Method to begin the collect/export pipeline.

Args:
meter: The meter to collect metrics from.
exporter: The exporter to export metrics to.
interval: The collect/export interval in seconds.
"""
if not exporter:
exporter = ConsoleMetricsExporter()
self._exporters.add(exporter)
# TODO: Controller type configurable?
self._controllers.append(PushController(meter, exporter, interval))

def shutdown(self) -> None:
for controller in self._controllers:
controller.shutdown()
for exporter in self._exporters:
exporter.shutdown()
if self._atexit_handler is not None:
atexit.unregister(self._atexit_handler)
self._atexit_handler = None
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,38 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import atexit
import threading

from opentelemetry.context import attach, detach, set_value
from opentelemetry.metrics import Meter
from opentelemetry.sdk.metrics.export import MetricsExporter


class PushController(threading.Thread):
"""A push based controller, used for exporting.
"""A push based controller, used for collecting and exporting.

Uses a worker thread that periodically collects metrics for exporting,
exports them and performs some post-processing.

Args:
meter: The meter used to collect metrics.
exporter: The exporter used to export metrics.
interval: The collect/export interval in seconds.
"""

daemon = True

def __init__(self, meter, exporter, interval, shutdown_on_exit=True):
def __init__(
self,
meter: Meter,
exporter: MetricsExporter,
interval: float
):
super().__init__()
self.meter = meter
self.exporter = exporter
self.interval = interval
self.finished = threading.Event()
self._atexit_handler = None
if shutdown_on_exit:
self._atexit_handler = atexit.register(self.shutdown)
self.start()

def run(self):
Expand All @@ -46,17 +54,13 @@ def shutdown(self):
self.finished.set()
# Run one more collection pass to flush metrics batched in the meter
self.tick()
self.exporter.shutdown()
if self._atexit_handler is not None:
atexit.unregister(self._atexit_handler)
self._atexit_handler = None

def tick(self):
# Collect all of the meter's metrics to be exported
self.meter.collect()
# Export the collected metrics
token = attach(set_value("suppress_instrumentation", True))
# Export the given metrics in the batcher
self.exporter.export(self.meter.batcher.checkpoint_set())
detach(token)
# Perform post-exporting logic based on batcher configuration
# Perform post-exporting logic
self.meter.batcher.finished_collection()
1 change: 0 additions & 1 deletion opentelemetry-sdk/tests/metrics/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,6 @@ def test_push_controller(self):

controller.shutdown()
self.assertTrue(controller.finished.isSet())
exporter.shutdown.assert_any_call()

# shutdown should flush the meter
self.assertEqual(meter.collect.call_count, 1)
Expand Down
22 changes: 22 additions & 0 deletions opentelemetry-sdk/tests/metrics/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,28 @@ def test_resource_empty(self):
# pylint: disable=protected-access
self.assertIs(meter.resource, resources._EMPTY_RESOURCE)

def test_start_pipeline(self):
exporter = mock.Mock()
meter_provider = metrics.MeterProvider()
meter = meter_provider.get_meter(__name__)
# pylint: disable=protected-access
meter_provider.start_pipeline(meter, exporter, 6)
self.assertEqual(len(meter_provider._exporters), 1)
self.assertEqual(len(meter_provider._controllers), 1)
meter_provider.shutdown()

def test_shutdown(self):
controller = mock.Mock()
exporter = mock.Mock()
meter_provider = metrics.MeterProvider()
# pylint: disable=protected-access
meter_provider._controllers = [controller]
meter_provider._exporters = [exporter]
meter_provider.shutdown()
self.assertEqual(controller.shutdown.call_count, 1)
self.assertEqual(exporter.shutdown.call_count, 1)
self.assertIsNone(meter_provider._atexit_handler)


class TestMeter(unittest.TestCase):
def test_extends_api(self):
Expand Down