-
Notifications
You must be signed in to change notification settings - Fork 47
/
Copy path__init__.py
101 lines (82 loc) · 3.86 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# Copyright (C) 2016-2019 SignalFx, Inc. All rights reserved.
# Copyright (C) 2020 Splunk, Inc. All rights reserved.
from . import computation, ws
from .. import constants
class SignalFlowClient(object):
    """SignalFx SignalFlow client.

    Client for SignalFx's SignalFlow real-time analytics API. Allows for the
    execution of ad-hoc computations, returning its output in real-time as it
    is produced; to start new background computations; attach, keep alive or
    stop existing computations.

    The client can be used as a context manager; ``close()`` is called when
    the ``with`` block exits.
    """

    def __init__(self, token, endpoint=constants.DEFAULT_STREAM_ENDPOINT,
                 timeout=constants.DEFAULT_TIMEOUT,
                 transport=ws.WebSocketTransport,
                 compress=True, proxy_url=None):
        """Create a client on top of the given transport.

        :param token: passed as-is to the transport factory.
        :param endpoint: passed as-is to the transport factory; defaults to
            ``constants.DEFAULT_STREAM_ENDPOINT``.
        :param timeout: passed as-is to the transport factory; defaults to
            ``constants.DEFAULT_TIMEOUT``.
        :param transport: callable invoked as ``transport(token, endpoint,
            timeout, compress, proxy_url)`` that returns the transport
            object used for every request made by this client.
        :param compress: passed as-is to the transport factory.
        :param proxy_url: passed as-is to the transport factory.
        """
        self._transport = transport(token, endpoint, timeout, compress,
                                    proxy_url)
        # Computations created through execute(), preflight() and attach().
        # Entries are only ever added by this class.
        self._computations = set()

    def __enter__(self):
        """Return the client itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the client when leaving a ``with`` block.

        Nothing is returned, so an exception raised inside the ``with``
        block is never suppressed, whatever ``close()`` happens to return.
        """
        self.close()

    def _get_params(self, **kwargs):
        """Return the keyword arguments as a dict, dropping ``None`` values.

        Only ``None`` is filtered out; other falsy values (``False``, ``0``,
        empty strings) are kept.
        """
        return {k: v for k, v in kwargs.items() if v is not None}

    def _new_computation(self, exec_fn):
        """Wrap ``exec_fn`` in a Computation, remember it and return it."""
        c = computation.Computation(exec_fn)
        self._computations.add(c)
        return c

    def execute(self, program, start=None, stop=None, resolution=None,
                max_delay=None, persistent=False, immediate=False,
                disable_all_metric_publishes=None, withDerivedMetadata=None,
                resolutionAdjustable=None, timezone=None):
        """Execute the given SignalFlow program and stream the output back.

        All keyword arguments other than ``program`` are forwarded to the
        transport as request parameters; those left as ``None`` are omitted.
        ``max_delay`` is sent as ``maxDelay`` and
        ``disable_all_metric_publishes`` as ``disableAllMetricPublishes``.

        :param program: the SignalFlow program, passed to the transport.
        :returns: a ``computation.Computation`` driving the execution.
        """
        params = self._get_params(
            start=start, stop=stop, resolution=resolution,
            maxDelay=max_delay, persistent=persistent, immediate=immediate,
            disableAllMetricPublishes=disable_all_metric_publishes,
            withDerivedMetadata=withDerivedMetadata,
            resolutionAdjustable=resolutionAdjustable,
            timezone=timezone)

        def exec_fn(since=None):
            # A truthy `since` replaces the 'start' parameter for this and
            # any later invocation (the params dict is shared).
            # NOTE(review): presumably used by Computation to resume a
            # stream from a later point -- confirm against Computation.
            if since:
                params['start'] = since
            return self._transport.execute(program, params)

        return self._new_computation(exec_fn)

    def preflight(self, program, start, stop, resolution=None,
                  max_delay=None):
        """Preflight the given SignalFlow program and stream the output
        back.

        ``start``, ``stop``, ``resolution`` and ``max_delay`` (sent as
        ``maxDelay``) are forwarded to the transport as request parameters;
        those that are ``None`` are omitted.

        :param program: the SignalFlow program, passed to the transport.
        :returns: a ``computation.Computation`` driving the preflight.
        """
        params = self._get_params(start=start, stop=stop,
                                  resolution=resolution,
                                  maxDelay=max_delay)

        def exec_fn(since=None):
            # A truthy `since` replaces the 'start' parameter for this and
            # any later invocation (the params dict is shared).
            if since:
                params['start'] = since
            return self._transport.preflight(program, params)

        return self._new_computation(exec_fn)

    def start(self, program, start=None, stop=None, resolution=None,
              max_delay=None):
        """Start executing the given SignalFlow program without being attached
        to the output of the computation.

        ``start``, ``stop``, ``resolution`` and ``max_delay`` (sent as
        ``maxDelay``) are forwarded to the transport as request parameters;
        those left as ``None`` are omitted. Nothing is returned and no
        Computation is created or tracked.
        """
        params = self._get_params(start=start, stop=stop,
                                  resolution=resolution,
                                  maxDelay=max_delay)
        self._transport.start(program, params)

    def attach(self, handle, filters=None, resolution=None):
        """Attach to an existing SignalFlow computation.

        ``filters`` and ``resolution`` are forwarded to the transport as
        request parameters; those left as ``None`` are omitted.

        :param handle: identifies the computation, passed to the transport.
        :returns: a ``computation.Computation`` bound to the attachment.
        """
        params = self._get_params(filters=filters, resolution=resolution)

        def exec_fn(since=None):
            # `since` is accepted so this callback has the same signature as
            # the ones built by execute() and preflight(); it is not used
            # when attaching.
            return self._transport.attach(handle, params)

        return self._new_computation(exec_fn)

    def keepalive(self, handle):
        """Keepalive a SignalFlow computation."""
        self._transport.keepalive(handle)

    def stop(self, handle, reason=None):
        """Stop a SignalFlow computation.

        ``reason`` is forwarded to the transport as a request parameter
        unless it is ``None``.
        """
        params = self._get_params(reason=reason)
        self._transport.stop(handle, params)

    def close(self):
        """Close this SignalFlow client by closing its transport."""
        self._transport.close()