Skip to content

Commit

Permalink
add kwargs error_ok for screenshot (#118), fix window_size on tablele…
Browse files Browse the repository at this point in the history
…t, fix adb-server not start on windows

* add kwargs error_ok for screenshot
* fix adb not start-server on windows
* fix screenshot with display_id
* change socket timeout from 0.5 to 3
* fix window_size for android tablelet
* manual merge pr #124
  • Loading branch information
codeskyblue authored May 20, 2024
1 parent 2138c40 commit 77beeb2
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 55 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,11 @@ Take screenshot
```python
# Method 1 (Recommend)
pil_image = d.screenshot()
# default display_id=0, error_ok=True
try:
pil_image = d.screenshot(display_id=1, error_ok=False)
except AdbError:
print("failed to takeScreenshot")

# Method 2
# adb exec-out screencap -p p.png
Expand Down
7 changes: 4 additions & 3 deletions adbutils/_adb.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ def _create_socket(self):
adb_port = self.__port
s = socket.socket()
try:
s.settimeout(.1) # prevent socket hang
s.settimeout(3) # prevent socket hang
s.connect((adb_host, adb_port))
s.settimeout(None)
return s
except socket.timeout as e:
raise AdbTimeout("connect to adb server timeout")
raise AdbTimeout("connect to adb server timeout") # windows raise timeout, mac raise connection error
except socket.error as e:
raise AdbConnectionError("connect to adb server failed: %s" % e)

Expand All @@ -65,7 +65,8 @@ def _safe_connect(self):
try:
return self._create_socket()
except AdbConnectionError:
subprocess.run([adb_path(), "start-server"], timeout=20.0) # 20s should enough for adb start
flags = subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0
subprocess.run([adb_path(), "start-server"], timeout=20.0, creationflags=flags) # 20s should enough for adb start
return self._create_socket()

@property
Expand Down
1 change: 0 additions & 1 deletion adbutils/_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,4 +507,3 @@ def __init__(
):
BaseDevice.__init__(self, client, serial, transport_id)
ScreenrecordExtension.__init__(self)
ScreenshotExtesion.__init__(self)
49 changes: 30 additions & 19 deletions adbutils/screenshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
import abc
import io
import logging
import re
from typing import Optional, Union
from adbutils.errors import AdbError
from adbutils.sync import Sync
from adbutils._proto import WindowSize
from PIL import Image
Expand Down Expand Up @@ -39,41 +41,50 @@ def framebuffer(self) -> Image.Image:
pass

class ScreenshotExtesion(AbstractDevice):
def __init__(self):
self.__framebuffer_ok = True

def screenshot(self, display_id: Optional[int] = None) -> Image.Image:
def screenshot(self, display_id: Optional[int] = None, error_ok: bool = True) -> Image.Image:
""" Take a screenshot and return PIL.Image.Image object
Args:
display_id: int, default None, see "dumpsys SurfaceFlinger --display-id" for valid display IDs
error_ok: bool, default True, if True, return a black image when capture failed
Returns:
PIL.Image.Image object, If capture failed, return a black image
PIL.Image.Image object
Raises:
AdbError: when capture failed and error_ok is False
"""
try:
pil_image = self.__screencap(display_id)
if pil_image.mode == "RGBA":
pil_image = pil_image.convert("RGB")
return pil_image
except UnidentifiedImageError as e:
wsize = self.window_size()
return Image.new("RGB", wsize, (0, 0, 0))
logger.warning("screencap error: %s", e)
if error_ok:
wsize = self.window_size()
return Image.new("RGB", wsize, (0, 0, 0))
else:
raise AdbError("screencap error") from e

def __screencap(self, display_id: int = None) -> Image.Image:
""" Take a screenshot and return PIL.Image.Image object
"""
# framebuffer is not stable, so we disable it
# MemoryError may occur when using framebuffer

# if self.__framebuffer_ok and display_id is None:
# try:
# return self.framebuffer()
# except NotImplementedError:
# self.__framebuffer_ok = False
# except UnidentifiedImageError as e:
# logger.warning("framebuffer error: %s", e)
# framebuffer() is not stable, so here still use screencap
cmdargs = ['screencap', '-p']
if display_id is not None:
cmdargs.extend(['-d', str(display_id)])
_id = self.__get_real_display_id(display_id)
cmdargs.extend(['-d', _id])
png_bytes = self.shell(cmdargs, encoding=None)
return Image.open(io.BytesIO(png_bytes))
return Image.open(io.BytesIO(png_bytes))

def __get_real_display_id(self, display_id: int) -> str:
# adb shell dumpsys SurfaceFlinger --display-id
# Display 4619827259835644672 (HWC display 0): port=0 pnpId=GGL displayName="EMU_display_0"
output = self.shell("dumpsys SurfaceFlinger --display-id")
_RE = re.compile(r"Display (\d+) ")
ids = _RE.findall(output)
if not ids:
raise AdbError("No display found, debug with 'dumpsys SurfaceFlinger --display-id'")
if display_id >= len(ids):
raise AdbError("Invalid display_id", display_id)
return ids[display_id]
66 changes: 34 additions & 32 deletions adbutils/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import abc
import datetime
import json
import logging
import re
import time
from typing import List, Optional, Union
Expand All @@ -17,6 +18,8 @@

from adbutils.sync import Sync

logger = logging.getLogger(__name__)

_DISPLAY_RE = re.compile(
r".*DisplayViewport{.*?valid=true, .*?orientation=(?P<orientation>\d+), .*?deviceWidth=(?P<width>\d+), deviceHeight=(?P<height>\d+).*"
)
Expand Down Expand Up @@ -153,34 +156,45 @@ def switch_wifi(self, enable: bool):

def window_size(self) -> WindowSize:
"""
Return screen (width, height)
Return screen (width, height) in pixel, width and height will be swapped if rotation is 90 or 270
Virtual keyborad may get small d.info['displayHeight']
Returns:
WindowSize
Raises:
AdbError
"""
w, h = self._raw_window_size()
s, l = min(w, h), max(w, h)
horizontal = self.rotation() % 2 == 1
return WindowSize(l, s) if horizontal else WindowSize(s, l)

def _raw_window_size(self) -> WindowSize:
try:
logger.debug("get window size from 'dumpsys display'")
return self._dumpsys_window_size()
except AdbError:
logger.debug("get window size from 'wm size'")
wsize = self._wm_size()
horizontal = self.rotation() % 2 == 1
return WindowSize(wsize.height, wsize.width) if horizontal else wsize

def _dumpsys_window_size(self) -> WindowSize:
output = self.shell("dumpsys display")
for line in output.splitlines():
m = _DISPLAY_RE.search(line, 0)
if not m:
continue
w = int(m.group("width"))
h = int(m.group("height"))
return WindowSize(w, h)
raise AdbError("get window size from 'dumpsys display' failed", output)

def _wm_size(self) -> WindowSize:
output = self.shell("wm size")
o = re.search(r"Override size: (\d+)x(\d+)", output)
m = re.search(r"Physical size: (\d+)x(\d+)", output)
if o:
w, h = o.group(1), o.group(2)
return WindowSize(int(w), int(h))
elif m:
m = re.search(r"Physical size: (\d+)x(\d+)", output)
if m:
w, h = m.group(1), m.group(2)
return WindowSize(int(w), int(h))

for line in self.shell("dumpsys display").splitlines():
m = _DISPLAY_RE.search(line, 0)
if not m:
continue
w = int(m.group("width"))
h = int(m.group("height"))
return WindowSize(w, h)
raise AdbError("get window size failed")
raise AdbError("wm size output unexpected", output)

def swipe(self, sx, sy, ex, ey, duration: float = 1.0) -> None:
"""
Expand Down Expand Up @@ -254,23 +268,11 @@ def rotation(self) -> int:
int [0, 1, 2, 3]
"""
for line in self.shell("dumpsys display").splitlines():
m = _DISPLAY_RE.search(line, 0)
m = re.search(r".*?orientation=(?P<orientation>\d+)", line)
if not m:
continue
o = int(m.group("orientation"))
return int(o)

output = self.shell(
"LD_LIBRARY_PATH=/data/local/tmp /data/local/tmp/minicap -i"
)
try:
if output.startswith("INFO:"):
output = output[output.index("{"):]
data = json.loads(output)
return data["rotation"] / 90
except ValueError:
pass

raise AdbError("rotation get failed")

def remove(self, path: str):
Expand Down
42 changes: 42 additions & 0 deletions tests/test_adb_shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,56 @@
"""Created on Mon May 06 2024 14:41:10 by codeskyblue
"""

from unittest import mock
import pytest
import adbutils
from adbutils.errors import AdbError


def test_shell_pwd(adb: adbutils.AdbClient):
d = adb.device(serial="123456")
assert d.shell("pwd") == "/"


def test_shell_screenshot(adb: adbutils.AdbClient):
d = adb.device(serial="123456")

def mock_shell(cmd: str, encoding='utf-8', **kwargs):
if encoding is None:
return b""
if cmd == "wm size":
return "Physical size: 1080x1920"
return b""

d.shell = mock_shell
d.rotation = lambda: 0

with pytest.raises(AdbError):
d.screenshot(error_ok=False)
pil_img = d.screenshot(error_ok=True)
assert pil_img.size == (1080, 1920)

# assert pixel is blank
pixel = pil_img.getpixel((0, 0))
assert pixel[:3] == (0, 0, 0)


def test_window_size(adb: adbutils.AdbClient):
d = adb.device(serial="123456")

def mock_shell(cmd):
if cmd == "wm size":
return "Physical size: 1080x1920"
if cmd == "dumpsys display":
return "mViewports=[DisplayViewport{orientation=0]"
return ""

d.shell = mock_shell
wsize = d.window_size()
assert wsize.width == 1080
assert wsize.height == 1920


def test_shell_battery(adb: adbutils.AdbClient):
d = adb.device(serial="123456")

Expand Down

0 comments on commit 77beeb2

Please sign in to comment.