Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CherryPickPoints #34

Merged
merged 3 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions examples/example_specific_points.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""List defined zones."""
import asyncio
import aiohttp

from my_token import MY_TOKEN

from myuplink.auth import Auth
from myuplink import MyUplinkAPI

PARAMETERS = ["40004", "40940", "40005"]


async def main():
"""Connect and print test data."""
async with aiohttp.ClientSession() as session:
auth = Auth(
session,
"https://api.myuplink.com",
MY_TOKEN,
)
api = MyUplinkAPI(auth)

systems = await api.async_get_systems()
for system in systems:
print(f"System id: {system.id}")
print(f"System name: {system.name}")

print(f"No of devices in system: {len(system.devices)}")
for sys_device in system.devices:
device = await api.async_get_device(sys_device.deviceId)
print(device.id)
print(device.productName)
print(device.productSerialNumber)
print(device.firmwareCurrent)
print(device.firmwareDesired)
print(device.connectionState)

points = await api.async_get_device_points(
sys_device.deviceId, points=PARAMETERS
)
for point in points:
print(
f"{point.parameter_id} | {point.parameter_name} | {point.value}"
)
print()


asyncio.run(main())
13 changes: 9 additions & 4 deletions src/myuplink/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,24 @@ async def async_get_device_json(self, device_id) -> dict:
return await resp.json()

async def async_get_device_points(
self, device_id, language: str = "en-GB"
self, device_id, language: str = "en-GB", points: list[str] | None = None
) -> list[DevicePoint]:
"""Return device points."""
array = await self.async_get_device_points_json(device_id, language)
array = await self.async_get_device_points_json(device_id, language, points)
return [DevicePoint(point_data) for point_data in array]

async def async_get_device_points_json(
self, device_id, language: str = "en-GB"
self, device_id, language: str = "en-GB", points: list[str] | None = None
) -> dict:
"""Return device points as json."""
headers = {"Accept-Language": language}
if points is None:
points = []
params = ""
if len(points) > 0:
params = "?parameters=" + ",".join(points)
resp = await self.auth.request(
"get", f"v2/devices/{device_id}/points", headers=headers
"get", f"v2/devices/{device_id}/points{params}", headers=headers
)
resp.raise_for_status()
return await resp.json()
Expand Down
Loading