diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d4f131..cfaf592 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file. ## [Unreleased] - No unreleased changes so far +## [0.12.0] - 2021-07-05 +### Added +- Possibility to retrieve images and save attributes to files on disk + ## [0.11.1] - 2021-07-03 ### Fixed - Addressing of statuses @@ -130,7 +134,8 @@ Minor fix in observer interface ## [0.1.0] - 2021-05-26 Initial release -[unreleased]: https://github.com/tillsteinbach/WeConnect-python/compare/v0.11.1...HEAD +[unreleased]: https://github.com/tillsteinbach/WeConnect-python/compare/v0.12.0...HEAD +[0.12.0]: https://github.com/tillsteinbach/WeConnect-python/releases/tag/v0.12.0 [0.11.1]: https://github.com/tillsteinbach/WeConnect-python/releases/tag/v0.11.1 [0.11.0]: https://github.com/tillsteinbach/WeConnect-python/releases/tag/v0.11.0 [0.10.0]: https://github.com/tillsteinbach/WeConnect-python/releases/tag/v0.10.0 diff --git a/requirements.txt b/requirements.txt index b7ec5aa..accc948 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,3 @@ -requests>=2.23.0 \ No newline at end of file +requests>=2.23.0 +pillow>=8.3.0 +ascii_magic>=1.5.5 \ No newline at end of file diff --git a/weconnect/addressable.py b/weconnect/addressable.py index 0fc5335..1b253a9 100644 --- a/weconnect/addressable.py +++ b/weconnect/addressable.py @@ -3,7 +3,10 @@ from enum import Enum, IntEnum, Flag, auto from typing import Dict, List -from .util import toBool +from PIL import Image +import ascii_magic + +from .util import toBool, imgToASCIIArt LOG = logging.getLogger("weconnect") @@ -181,11 +184,47 @@ def isLeaf(self): # pylint: disable=R0201 def getLeafChildren(self): return [self] + def saveToFile(self, filename): + if filename.endswith(('.txt', '.TXT', '.text')): + with open(filename, mode='w') as file: + if self.valueType == Image.Image: + file.write(imgToASCIIArt(self.value, columns=120, mode=ascii_magic.Modes.ASCII)) + else: + file.write(str(self)) + elif filename.endswith(('.htm', '.HTM', '.html', '.HTML')): + with open(filename, mode='w') as file: + if self.valueType == Image.Image: + html = """ASCII art
"""
+                    file.write(html)
+                    file.write(imgToASCIIArt(self.value, columns=240, mode=ascii_magic.Modes.HTML))
+                    file.write('
')
+                else:
+                    file.write(str(self))
+        elif filename.endswith(('.png', '.PNG')):
+            with open(filename, mode='wb') as file:
+                if self.valueType == Image.Image:
+                    self.value.save(fp=file, format='PNG')  # pylint: disable=no-member
+                else:
+                    raise ValueError('Attribute is no image and cannot be converted to one')
+        elif filename.endswith(('.jpg', '.JPG', '.jpeg', '.JPEG')):
+            with open(filename, mode='wb') as file:
+                if self.valueType == Image.Image:
+                    if self.value.mode in ("RGBA", "P"):  # pylint: disable=no-member
+                        raise ValueError('Image contains transparency and thus cannot be saved as jpeg-file')
+                    self.value.save(fp=file, format='JPEG')  # pylint: disable=no-member
+                else:
+                    raise ValueError('Attribute is no image and cannot be converted to one')
+        else:
+            raise ValueError('I cannot recognize the target file extension')
+
     def __str__(self):
         if isinstance(self.value, Enum):
             return str(self.value.value)  # pylint: disable=no-member
         if isinstance(self.value, datetime):
             return self.value.isoformat()  # pylint: disable=no-member
+        if isinstance(self.value, Image.Image):
+            return imgToASCIIArt(self.value)  # pylint: disable=no-member
         return str(self.value)
 
 
diff --git a/weconnect/elements.py b/weconnect/elements.py
index 04cec09..b2e7bb6 100644
--- a/weconnect/elements.py
+++ b/weconnect/elements.py
@@ -4,8 +4,10 @@
 from enum import Enum
 from datetime import datetime, timedelta, timezone
 from typing import Dict
-
+import base64
+import io
 import requests
+from PIL import Image
 
 from .util import robustTimeParse, toBool
 from .addressable import AddressableLeaf, AddressableObject, AddressableAttribute, AddressableDict, AddressableList, \
@@ -31,6 +33,8 @@ def __init__(
         parent,
         fromDict,
         fixAPI=True,
+        updateCapabilities=True,
+        updatePictures=True,
     ):
         self.weConnect = weConnect
         super().__init__(localAddress=vin, parent=parent)
@@ -47,12 +51,16 @@ def __init__(
         self.controls = Controls(localAddress='controls', vehicle=self, parent=self)
         self.fixAPI = fixAPI
 
-        self.update(fromDict)
+        self.__carImages = dict()
+        self.pictures = AddressableDict(localAddress='pictures', parent=self)
+
+        self.update(fromDict, updateCapabilities=updateCapabilities, updatePictures=updatePictures)
 
     def update(  # noqa: C901  # pylint: disable=too-many-branches
         self,
         fromDict=None,
         updateCapabilities=True,
+        updatePictures=True,
         force=False,
     ):
         if fromDict is not None:
@@ -145,7 +153,8 @@ def update(  # noqa: C901  # pylint: disable=too-many-branches
                 LOG.warning('%s: Unknown attribute %s with value %s', self.getGlobalAddress(), key, value)
 
         self.updateStatus(updateCapabilities=updateCapabilities, force=force)
-        # self.test()
+        if updatePictures:
+            self.updatePictures()
 
     def updateStatus(self, updateCapabilities=True, force=False):  # noqa: C901 # pylint: disable=too-many-branches
         data = None
@@ -288,10 +297,145 @@ def updateStatus(self, updateCapabilities=True, force=False):  # noqa: C901 # py
                                                                        parent=self.statuses,
                                                                        statusId='parkingPosition',
                                                                        fromDict=data['data'])
-            return
-        if 'parkingPosition' in self.statuses:
+        elif 'parkingPosition' in self.statuses:
             del self.statuses['parkingPosition']
 
+    def updatePictures(self):  # noqa: C901
+        data = None
+        cacheDate = None
+        url = f'https://vehicle-images-service.apps.emea.vwapps.io/v2/vehicle-images/{self.vin.value}?resolution=2x'
+        if self.weConnect.maxAge is not None and self.weConnect.cache is not None and url in self.weConnect.cache:
+            data, cacheDateString = self.weConnect.cache[url]
+            cacheDate = datetime.fromisoformat(cacheDateString)
+        if data is None or self.weConnect.maxAge is None \
+                or (cacheDate is not None and cacheDate < (datetime.utcnow() - timedelta(seconds=self.weConnect.maxAge))):
+            imageResponse = self.weConnect.session.get(url, allow_redirects=False)
+            if imageResponse.status_code == requests.codes['ok']:
+                data = imageResponse.json()
+                if self.weConnect.cache is not None:
+                    self.weConnect.cache[url] = (data, str(datetime.utcnow()))
+            elif imageResponse.status_code == requests.codes['unauthorized']:
+                LOG.info('Server asks for new authorization')
+                self.weConnect.login()
+                imageResponse = self.weConnect.session.get(url, allow_redirects=False)
+                if imageResponse.status_code == requests.codes['ok']:
+                    data = imageResponse.json()
+                    if self.weConnect.cache is not None:
+                        self.weConnect.cache[url] = (data, str(datetime.utcnow()))
+                else:
+                    raise RetrievalError('Could not retrieve data even after re-authorization.'
+                                         f' Status Code was: {imageResponse.status_code}')
+                raise RetrievalError(f'Could not retrieve data. Status Code was: {imageResponse.status_code}')
+        if data is not None and 'data' in data:  # pylint: disable=too-many-nested-blocks
+            for image in data['data']:
+                img = None
+                cacheDate = None
+                url = image['url']
+                if self.weConnect.maxAge is not None and self.weConnect.cache is not None and url in self.weConnect.cache:
+                    img, cacheDateString = self.weConnect.cache[url]
+                    img = base64.b64decode(img)
+                    img = Image.open(io.BytesIO(img))
+                    cacheDate = datetime.fromisoformat(cacheDateString)
+                if img is None or self.weConnect.maxAge is None \
+                        or (cacheDate is not None and cacheDate < (datetime.utcnow() - timedelta(days=1))):
+                    imageDownloadResponse = self.weConnect.session.get(url, stream=True)
+                    if imageDownloadResponse.status_code == requests.codes['ok']:
+                        img = Image.open(imageDownloadResponse.raw)
+                        if self.weConnect.cache is not None:
+                            buffered = io.BytesIO()
+                            img.save(buffered, format="PNG")
+                            imgStr = base64.b64encode(buffered.getvalue()).decode("utf-8")
+                            self.weConnect.cache[url] = (imgStr, str(datetime.utcnow()))
+                    elif imageDownloadResponse.status_code == requests.codes['unauthorized']:
+                        LOG.info('Server asks for new authorization')
+                        self.weConnect.login()
+                        imageDownloadResponse = self.weConnect.session.get(url, stream=True)
+                        if imageDownloadResponse.status_code == requests.codes['ok']:
+                            img = Image.open(imageDownloadResponse.raw)
+                            if self.weConnect.cache is not None:
+                                buffered = io.BytesIO()
+                                img.save(buffered, format="PNG")
+                                imgStr = base64.b64encode(buffered.getvalue()).decode("utf-8")
+                                self.weConnect.cache[url] = (imgStr, str(datetime.utcnow()))
+                        else:
+                            raise RetrievalError('Could not retrieve data even after re-authorization.'
+                                                 f' Status Code was: {imageDownloadResponse.status_code}')
+                        raise RetrievalError(f'Could not retrieve data. Status Code was: {imageDownloadResponse.status_code}')
+
+                if img is not None:
+                    self.__carImages[image['id']] = img
+                    if image['id'] == 'car_34view':
+                        if 'car' in self.pictures:
+                            self.pictures['car'].setValueWithCarTime(self.__carImages['car_34view'], lastUpdateFromCar=None, fromServer=True)
+                        else:
+                            self.pictures['car'] = AddressableAttribute(localAddress='car', parent=self.pictures, value=self.__carImages['car_34view'],
+                                                                        valueType=Image.Image)
+
+            self.updateStatusPicture()
+
+    def updateStatusPicture(self):  # noqa: C901
+        img = self.__carImages['car_birdview']
+
+        if 'accessStatus' in self.statuses:
+            accessStatus = self.statuses['accessStatus']
+            for name, door in accessStatus.doors.items():
+                doorNameMap = {'frontLeft': 'door_left_front',
+                               'frontRight': 'door_right_front',
+                               'rearLeft': 'door_left_back',
+                               'rearRight': 'door_right_back'}
+                name = doorNameMap.get(name, name)
+                doorImageName = None
+
+                if door.openState.value in (AccessStatus.Door.OpenState.OPEN, AccessStatus.Door.OpenState.INVALID):
+                    doorImageName = f'{name}_overlay'
+                elif door.openState.value == AccessStatus.Door.OpenState.CLOSED:
+                    doorImageName = name
+
+                if doorImageName is not None and doorImageName in self.__carImages:
+                    doorImage = self.__carImages[doorImageName].convert("RGBA")
+                    img.paste(doorImage, (0, 0), doorImage)
+
+            for name, window in accessStatus.windows.items():
+                windowNameMap = {'frontLeft': 'window_left_front',
+                                 'frontRight': 'window_right_front',
+                                 'rearLeft': 'window_left_back',
+                                 'rearRight': 'window_right_back',
+                                 'sunRoof': 'sunroof'}
+                name = windowNameMap.get(name, name)
+                windowImageName = None
+
+                if window.openState.value in (AccessStatus.Window.OpenState.OPEN, AccessStatus.Window.OpenState.INVALID):
+                    windowImageName = f'{name}_overlay'
+                elif window.openState.value == AccessStatus.Window.OpenState.CLOSED:
+                    windowImageName = f'{name}'
+
+                if windowImageName is not None and windowImageName in self.__carImages:
+                    windowImage = self.__carImages[windowImageName].convert("RGBA")
+                    img.paste(windowImage, (0, 0), windowImage)
+
+        if 'lightsStatus' in self.statuses:
+            lightsStatus = self.statuses['lightsStatus']
+            for name, light in lightsStatus.lights.items():
+                lightNameMap = {'frontLeft': 'door_left_front',
+                                'frontRight': 'door_right_front',
+                                'rearLeft': 'door_left_back',
+                                'rearRight': 'door_right_back'}
+                name = lightNameMap.get(name, name)
+                lightImageName = None
+
+                if light.status.value == LightsStatus.Light.LightState.ON:
+                    lightImageName = f'light_{name}'
+                    if lightImageName in self.__carImages:
+                        lightImage = self.__carImages[lightImageName].convert("RGBA")
+                        img.paste(lightImage, (0, 0), lightImage)
+
+        self.__carImages['status'] = img
+
+        if 'status' in self.pictures:
+            self.pictures['status'].setValueWithCarTime(img, lastUpdateFromCar=None, fromServer=True)
+        else:
+            self.pictures['status'] = AddressableAttribute(localAddress='status', parent=self.pictures, value=img, valueType=Image.Image)
+
     def __str__(self):  # noqa: C901
         returnString = ''
         if self.vin.enabled:
diff --git a/weconnect/util.py b/weconnect/util.py
index 1920318..353bc93 100644
--- a/weconnect/util.py
+++ b/weconnect/util.py
@@ -1,6 +1,11 @@
 import re
 from datetime import datetime
 
+import shutil
+
+from PIL import Image
+import ascii_magic
+
 
 def robustTimeParse(timeString):
     timestring = timeString.replace('Z', '+00:00')
@@ -17,3 +22,24 @@ def toBool(value):
     if value in [False, 'False', 'false', 'no']:
         return False
     raise ValueError('Not a valid boolean value (True/False)')
+
+
+def imgToASCIIArt(img, columns=0, mode=ascii_magic.Modes.TERMINAL):
+    bbox = img.getbbox()
+
+    # Crop the image to the contents of the bounding box
+    image = img.crop(bbox)
+
+    # Determine the width and height of the cropped image
+    (width, height) = image.size
+
+    # Create a new image object for the output image
+    cropped_image = Image.new("RGBA", (width, height), (0, 0, 0, 0))
+
+    # Paste the cropped image onto the new image
+    cropped_image.paste(image, (0, 0))
+
+    if columns == 0:
+        columns = shutil.get_terminal_size()[0]
+
+    return ascii_magic.from_image(cropped_image, columns=columns, mode=mode)
diff --git a/weconnect/weconnect.py b/weconnect/weconnect.py
index 9c9f030..8de6e3d 100644
--- a/weconnect/weconnect.py
+++ b/weconnect/weconnect.py
@@ -397,11 +397,11 @@ def __refreshToken(self):
     def vehicles(self):
         return self.__vehicles
 
-    def update(self, force=False):
-        self.updateVehicles(force=force)
+    def update(self, updateCapabilities=True, updatePictures=True, force=False):
+        self.updateVehicles(updateCapabilities=updateCapabilities, updatePictures=updatePictures, force=force)
         self.updateChargingStations(force=force)
 
-    def updateVehicles(self, updateCapabilities=True, force=False):  # noqa: C901
+    def updateVehicles(self, updateCapabilities=True, updatePictures=True, force=False):  # noqa: C901
         data = None
         cacheDate = None
         url = 'https://mobileapi.apps.emea.vwapps.io/vehicles'
@@ -436,10 +436,10 @@ def updateVehicles(self, updateCapabilities=True, force=False):  # noqa: C901
                     vins.append(vin)
                     if vin not in self.__vehicles:
                         vehicle = Vehicle(weConnect=self, vin=vin, parent=self.__vehicles, fromDict=vehicleDict,
-                                          fixAPI=self.fixAPI)
+                                          fixAPI=self.fixAPI, updateCapabilities=updateCapabilities, updatePictures=updatePictures)
                         self.__vehicles[vin] = vehicle
                     else:
-                        self.__vehicles[vin].update(fromDict=vehicleDict, updateCapabilities=updateCapabilities)
+                        self.__vehicles[vin].update(fromDict=vehicleDict, updateCapabilities=updateCapabilities, updatePictures=updatePictures)
                 # delete those vins that are not anymore available
                 for vin in [vin for vin in vins if vin not in self.__vehicles]:
                     del self.__vehicles[vin]