-
Notifications
You must be signed in to change notification settings - Fork 2
/
api.py
248 lines (200 loc) · 8.78 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
import asyncio
import logging
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, Sequence
import aiohttp.web
from aiohttp.web import HTTPUnauthorized
from async_exit_stack import AsyncExitStack
from neuro_auth_client import AuthClient
from neuro_auth_client.security import AuthScheme, setup_security
from notifications_client import Client as NotificationsClient
from .cluster import Cluster, ClusterConfig, ClusterRegistry
from .config import Config
from .config_factory import EnvironConfigFactory
from .handlers import JobsHandler
from .kube_cluster import KubeCluster
from .orchestrator.job_request import JobException
from .orchestrator.jobs_poller import JobsPoller
from .orchestrator.jobs_service import JobsService, JobsServiceException
from .orchestrator.jobs_storage import RedisJobsStorage
from .redis import create_redis_client
from .user import authorized_user
# Module-level logger; records emitted through it are named after this module.
logger = logging.getLogger(__name__)
class ApiHandler:
    """Serves the top-level API endpoints: ``/ping`` and ``/config``."""

    def __init__(self, *, app: aiohttp.web.Application, config: Config):
        self._app = app
        self._config = config

    def register(self, app: aiohttp.web.Application) -> None:
        """Attach the ``GET /ping`` and ``GET /config`` routes to *app*."""
        routes = (
            aiohttp.web.get("/ping", self.handle_ping),
            aiohttp.web.get("/config", self.handle_config),
        )
        app.add_routes(routes)

    @property
    def _jobs_service(self) -> JobsService:
        # The service object is stored on the application under "jobs_service".
        return self._app["jobs_service"]

    async def handle_ping(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
        """Answer with an empty default response."""
        return aiohttp.web.Response()

    @staticmethod
    def _preset_to_dict(preset: Any) -> Dict[str, Any]:
        """Serialize one resource preset; GPU fields are emitted only when set."""
        serialized: Dict[str, Any] = {
            "name": preset.name,
            "cpu": preset.cpu,
            "memory_mb": preset.memory_mb,
            "is_preemptible": preset.is_preemptible,
        }
        if preset.gpu is not None:
            serialized["gpu"] = preset.gpu
        if preset.gpu_model is not None:
            serialized["gpu_model"] = preset.gpu_model
        return serialized

    async def handle_config(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
        """Return client configuration as JSON.

        For an authorized user the response includes the cluster's registry,
        storage, users and monitoring URLs plus the resource presets. When the
        user is not authorized (``HTTPUnauthorized``) that section is silently
        omitted rather than failing the request. OAuth settings are appended
        whenever they are configured, regardless of authorization.
        """
        data: Dict[str, Any] = {}
        try:
            user = await authorized_user(request)
            cluster_config = await self._jobs_service.get_cluster_config(user)
            resource_presets = [
                self._preset_to_dict(preset)
                for preset in cluster_config.orchestrator.presets
            ]
            ingress = cluster_config.ingress
            data.update(
                registry_url=str(cluster_config.registry.url),
                storage_url=str(ingress.storage_url),
                users_url=str(ingress.users_url),
                monitoring_url=str(ingress.monitoring_url),
                resource_presets=resource_presets,
            )
        except HTTPUnauthorized:
            # Anonymous callers still get the OAuth section below.
            pass

        oauth = self._config.oauth
        if oauth:
            data["auth_url"] = str(oauth.auth_url)
            data["token_url"] = str(oauth.token_url)
            data["client_id"] = oauth.client_id
            data["audience"] = oauth.audience
            data["callback_urls"] = [str(url) for url in oauth.callback_urls]
            data["headless_callback_url"] = str(oauth.headless_callback_url)
            success_redirect = oauth.success_redirect_url
            if success_redirect:
                data["success_redirect_url"] = str(success_redirect)
        return aiohttp.web.json_response(data)
def init_logging(*, level: int = logging.DEBUG) -> None:
    """Configure root logging with a timestamped, logger-named format.

    Args:
        level: Level for the root logger. Defaults to ``logging.DEBUG`` so
            existing zero-argument callers keep the previous behaviour.

    Note:
        ``logging.basicConfig`` does nothing if the root logger already has
        handlers, so this only takes effect on the first configuration.
    """
    # TODO (A Danshyn 06/01/18): expose in the Config
    logging.basicConfig(
        level=level,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )
@aiohttp.web.middleware
async def handle_exceptions(
    request: aiohttp.web.Request,
    handler: Callable[[aiohttp.web.Request], Awaitable[aiohttp.web.StreamResponse]],
) -> aiohttp.web.StreamResponse:
    """Turn exceptions escaping *handler* into JSON error responses.

    - ``JobException``, ``JobsServiceException`` and ``ValueError`` become a
      400 response with body ``{"error": str(exc)}``.
    - ``aiohttp.web.HTTPException`` propagates unchanged (it already carries
      its own HTTP status).
    - Any other exception is logged with its traceback and becomes a 500
      response whose ``error`` field describes the exception and request path.
    """
    try:
        return await handler(request)
    except (JobException, JobsServiceException, ValueError) as e:
        # Client-side errors: all three were handled identically.
        payload = {"error": str(e)}
        return aiohttp.web.json_response(
            payload, status=aiohttp.web.HTTPBadRequest.status_code
        )
    except aiohttp.web.HTTPException:
        raise
    except Exception as e:
        msg_str = (
            f"Unexpected exception: {str(e)}. " f"Path with query: {request.path_qs}."
        )
        # Use the module logger so records are attributed to this module.
        logger.exception(msg_str)
        # NOTE(review): the raw exception text is returned to the caller; consider
        # whether internal error details should be exposed in 500 responses.
        payload = {"error": msg_str}
        return aiohttp.web.json_response(
            payload, status=aiohttp.web.HTTPInternalServerError.status_code
        )
async def create_api_v1_app(config: Config) -> aiohttp.web.Application:
    """Build the sub-application that carries the ``ApiHandler`` routes."""
    sub_app = aiohttp.web.Application()
    ApiHandler(app=sub_app, config=config).register(sub_app)
    return sub_app
async def create_jobs_app(config: Config) -> aiohttp.web.Application:
    """Build the sub-application that carries the ``JobsHandler`` routes."""
    sub_app = aiohttp.web.Application()
    JobsHandler(app=sub_app, config=config).register(sub_app)
    return sub_app
def create_cluster(config: ClusterConfig) -> Cluster:
    """Cluster factory handed to ``ClusterRegistry``: wrap *config* in a KubeCluster."""
    cluster: Cluster = KubeCluster(config)
    return cluster
async def create_app(
    config: Config, cluster_configs_future: Awaitable[Sequence[ClusterConfig]]
) -> aiohttp.web.Application:
    """Assemble the root application.

    The ``/api/v1`` and ``/api/v1/jobs`` sub-applications are built immediately.
    Clients and services (notifications, Redis, cluster registry, jobs storage,
    jobs service, jobs poller, auth) are created in a cleanup context, i.e. when
    the application starts, and are closed through the ``AsyncExitStack`` when
    it shuts down.

    Args:
        config: Service configuration.
        cluster_configs_future: Awaitable yielding the cluster configs to
            register; it is awaited once, during application startup.

    Returns:
        The root application with the exception-handling middleware installed.
    """
    app = aiohttp.web.Application(middlewares=[handle_exceptions])
    app["config"] = config

    async def _init_app(app: aiohttp.web.Application) -> AsyncIterator[None]:
        async with AsyncExitStack() as exit_stack:
            logger.info("Initializing Notifications client")
            notifications_client = NotificationsClient(
                url=config.notifications.url, token=config.notifications.token
            )
            await exit_stack.enter_async_context(notifications_client)

            logger.info("Initializing Redis client")
            redis_client = await exit_stack.enter_async_context(
                create_redis_client(config.database.redis)
            )

            logger.info("Initializing Cluster Registry")
            cluster_registry = await exit_stack.enter_async_context(
                ClusterRegistry(factory=create_cluster)
            )
            # Register clusters one by one; a plain loop because only the side
            # effect of ``add`` matters (no list of results is needed).
            for cluster_config in await cluster_configs_future:
                await cluster_registry.add(cluster_config)

            logger.info("Initializing JobsStorage")
            jobs_storage = RedisJobsStorage(redis_client)
            await jobs_storage.migrate()

            logger.info("Initializing JobsService")
            jobs_service = JobsService(
                cluster_registry=cluster_registry,
                jobs_storage=jobs_storage,
                jobs_config=config.jobs,
                notifications_client=notifications_client,
            )

            logger.info("Initializing JobsPoller")
            jobs_poller = JobsPoller(jobs_service=jobs_service)
            await exit_stack.enter_async_context(jobs_poller)

            # Both sub-apps read the service from their own app mapping.
            app["api_v1_app"]["jobs_service"] = jobs_service
            app["jobs_app"]["jobs_service"] = jobs_service

            auth_client = await exit_stack.enter_async_context(
                AuthClient(
                    url=config.auth.server_endpoint_url, token=config.auth.service_token
                )
            )
            app["jobs_app"]["auth_client"] = auth_client

            await setup_security(
                app=app, auth_client=auth_client, auth_scheme=AuthScheme.BEARER
            )

            # Everything above stays open while the app runs; leaving the
            # ``async with`` on shutdown closes it in reverse order.
            yield

    app.cleanup_ctx.append(_init_app)

    api_v1_app = await create_api_v1_app(config)
    app["api_v1_app"] = api_v1_app

    jobs_app = await create_jobs_app(config=config)
    app["jobs_app"] = jobs_app

    api_v1_app.add_subapp("/jobs", jobs_app)
    app.add_subapp("/api/v1", api_v1_app)
    return app
async def get_cluster_configs(config: Config) -> Sequence[ClusterConfig]:
    """Fetch the cluster configurations from the config service.

    The users URL is taken from the environment-derived cluster config; the
    jobs ingress class and OAuth URL come from ``config.jobs``.
    """
    env_cluster = EnvironConfigFactory().create_cluster()
    async with config.config_client as client:
        clusters = await client.get_clusters(
            users_url=env_cluster.ingress.users_url,
            jobs_ingress_class=config.jobs.jobs_ingress_class,
            jobs_ingress_oauth_url=config.jobs.jobs_ingress_oauth_url,
        )
    return clusters
def main() -> None:
    """Entry point: configure logging, load config, build the app and serve it."""
    init_logging()
    config = EnvironConfigFactory().create()
    # Log through the module logger, like the rest of this module.
    logger.info("Loaded config: %r", config)
    # NOTE(review): get_event_loop() is presumably kept for the older Python /
    # aiohttp versions this module targets (it relies on the async_exit_stack
    # backport); on newer Pythons it is deprecated when no loop is running.
    loop = asyncio.get_event_loop()
    app = loop.run_until_complete(create_app(config, get_cluster_configs(config)))
    aiohttp.web.run_app(app, host=config.server.host, port=config.server.port)