-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathself_monitoring.py
34 lines (26 loc) · 1.36 KB
/
self_monitoring.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import configparser
import asyncio
import aiohttp
import apis.account
import apis.application
parser = configparser.ConfigParser()
parser.read("config.ini")
config = parser['DEFAULT']
accountApi = apis.account.AccountApi(config)
applicationApi = apis.application.ApplicationApi(config)
async def load_applications_information():
async with aiohttp.ClientSession(headers = {'Content-Type': "application/vnd.appd.cntrl+protobuf;v=1"}) as session:
oauth_token = await asyncio.ensure_future(accountApi.access_token(session))
async with aiohttp.ClientSession(headers = {'Authorization': f"Bearer {oauth_token}"}) as session:
applications = await asyncio.ensure_future(applicationApi.applications(session))
business_transactions_tasks = []
tier_tasks = []
backend_tasks = []
for application in applications:
business_transactions_tasks.append(asyncio.ensure_future(applicationApi.business_transactions(session, application)))
tier_tasks.append(asyncio.ensure_future(applicationApi.tiers_and_nodes(session, application)))
backend_tasks.append(asyncio.ensure_future(applicationApi.backends(session, application)))
await asyncio.gather(*tier_tasks)
await asyncio.gather(*business_transactions_tasks)
await asyncio.gather(*backend_tasks)
asyncio.run(load_applications_information())