Skip to content
This repository has been archived by the owner on Mar 1, 2019. It is now read-only.

Commit

Permalink
add watcher
Browse files Browse the repository at this point in the history
  • Loading branch information
shengxiang committed Mar 7, 2016
1 parent 2eca364 commit b764d19
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 66 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,14 @@ try:
except airtest.ImageNotFoundError:
print('Image not found')

# watcher, trigger when screenshot is called
w = airtest.Watcher()
w.on('setting.png', airtest.Watcher.ACTION_TOUCH)
w.on('common.png', airtest.Watcher.ACTION_TOUCH)

wid = d.add_watcher(w)
# d.del_watcher(wid) # remove watcher

d.stop_app(package_name)
```

Expand Down
2 changes: 2 additions & 0 deletions airtest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

from airtest.consts import *
from airtest.errors import *
from airtest.device import Watcher


def connect(serialno, platform='android'):
"""Connect to a device, and return its object
Expand Down
175 changes: 111 additions & 64 deletions airtest/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from airtest import errors


FindPoint = collections.namedtuple('FindPoint', ['pos', 'confidence'])
log = base.getLogger('devsuit')

__dir__ = os.path.dirname(os.path.abspath(__file__))
Expand All @@ -35,15 +36,37 @@
DISPLAY_RE = re.compile(
'.*DisplayViewport{valid=true, .*orientation=(?P<orientation>\d+), .*deviceWidth=(?P<width>\d+), deviceHeight=(?P<height>\d+).*')

class Watcher(object):
ACTION_TOUCH = 1

def __init__(self):
self._events = {}

def on(self, image, action):
self._events[image] = action

def hook(self, screen, d):
for (img, action) in self._events.items():
if action == Watcher.ACTION_TOUCH:
ret = d.exists(img, screen=screen)
if ret is None:
continue
if ret.confidence < 0.9:
print("Skip confidence:", ret.confidence)
continue
print('trigger watch click')
d.click(*ret.pos)


class AndroidDevice(UiaDevice):
def __init__(self, serialno=None):
super(AndroidDevice, self).__init__(serialno)
self._serial = serialno
self._uiauto = super(AndroidDevice, self)
self._minicap_params = None
self._watchers = {}

self.screenshot_method = consts.SCREENSHOT_METHOD_UIAUTOMATOR

# print 'DEVSUIT_SERIALNO:', phoneno
# self.dev = dev
# self.appname = appname
Expand Down Expand Up @@ -94,6 +117,14 @@ def sleep(self, secs):
time.sleep(1)
sys.stdout.write("\n")

def add_watcher(self, w):
id = time.time()
self._watchers[id] = w
return id

def del_watcher(self, id):
self._watchers.pop(id, None)

def _tmp_filename(self, prefix='tmp-', ext='.png'):
return '%s%s%s' %(prefix, time.time(), ext)

Expand Down Expand Up @@ -135,31 +166,36 @@ def screenshot(self, filename=None):
Take screen snapshot
Args:
filename: filename where save to
filename: filename where save to, optional
Returns:
None
cv2 Image object
Raises:
TypeError
"""
image = None
screen = None
if self.screenshot_method == consts.SCREENSHOT_METHOD_UIAUTOMATOR:
tmp_file = os.path.join(__tmp__, self._tmp_filename())
self._uiauto.screenshot(tmp_file)
image = cv2.imread(tmp_file)
screen = cv2.imread(tmp_file)
os.remove(tmp_file)
elif self.screenshot_method == consts.SCREENSHOT_METHOD_MINICAP:
image = self._minicap()
screen = self._minicap()
else:
raise TypeError('Invalid screenshot_method')

if filename:
save_dir = os.path.dirname(filename) or '.'
if not os.path.exists(save_dir):
os.makedirs(save_dir)
cv2.imwrite(filename, image)
return image
cv2.imwrite(filename, screen)

# handle watchers
for w in self._watchers.values():
w.hook(screen, self)

return screen

def adbrun(self, command):
'''
Expand Down Expand Up @@ -221,7 +257,6 @@ def stop_app(self, package_name, clear=False):
Returns:
None
'''
# FIXME(ssx): not sure if this method will clean data
if clear:
self.shell(['pm', 'clear', package_name])
else:
Expand All @@ -240,13 +275,34 @@ def _read_img(self, img):
# FIXME(ssx): need support other types
return img

def _get_screen_img(self):
tmp_file = os.path.join(__tmp__, self._tmp_filename())
self.screenshot(tmp_file)
# log.debug("touch image save screen to %s", tmp_file)
img = ac.imread(tmp_file)
os.remove(tmp_file)
return img
# def _get_screen_img(self):
# tmp_file = os.path.join(__tmp__, self._tmp_filename())
# self.screenshot(tmp_file)
# # log.debug("touch image save screen to %s", tmp_file)
# img = ac.imread(tmp_file)
# os.remove(tmp_file)
# return img

def exists(self, img, screen=None):
"""Change if image exists
Args:
img: string or opencv image
screen: opencv image, optional, if not None, screenshot will be called
Returns:
None or FindPoint
"""
search_img = self._read_img(img)
if screen is None:
screen = self.screenshot()
ret = ac.find_template(screen, search_img)
if ret is None:
return None
position = ret['result']
confidence = ret['confidence']
log.info('match confidence: %s', confidence)
return FindPoint(position, confidence)

def touch_image(self, img, timeout=20.0, wait_change=False):
"""Simulate touch according image position
Expand All @@ -266,14 +322,17 @@ def touch_image(self, img, timeout=20.0, wait_change=False):
start_time = time.time()
found = False
while time.time() - start_time < timeout:
screen_img = self._get_screen_img()
ret = ac.find_template(screen_img, search_img)
if ret is None:
point = self.exists(search_img)
# screen_img = self.screenshot()
# ret = ac.find_template(screen_img, search_img)
# if ret is None:
# continue
# position = ret['result']
# confidence = ret['confidence']
# log.info('match confidence: %s', confidence)
if point is None:
continue
position = ret['result']
confidence = ret['confidence']
log.info('match confidence: %s', confidence)
self._uiauto.click(*position)
self._uiauto.click(*point.pos)
found = True
break

Expand Down Expand Up @@ -394,18 +453,6 @@ def _search_image(self, filename):
return fullpath
raise RuntimeError('Image file(%s) not found in %s' %(filename, self._image_dirs))

# def _val_to_point(self, v):
# '''
# Convert v to point
# @return (x, y) or None if not found
# '''
# if isinstance(v, basestring):
# v = self.find(v)
# if not v:
# return None
# (x, y) = self._fix_point(v)#(PS[0], PS[1]))#(1L, 2L))
# return (x, y)

def _save_screen(self, filename, random_name=True, tempdir=True):
# use last snapshot file
if self._snapshot_file and self._keep_capture:
Expand Down Expand Up @@ -589,36 +636,36 @@ def wait(self, imgfile, timeout=20):
time.sleep(1)
raise RuntimeError('Wait timeout(%.2f)', float(timeout))

def exists(self, imgfile):
return True if self.find(imgfile) else False
# def exists(self, imgfile):
# return True if self.find(imgfile) else False

def click(self, img_or_point, timeout=None, duration=None):
'''
Click function
@param seconds: float (if time not exceed, it will retry and retry)
'''
if timeout is None:
timeout = self._click_timeout
log.info('CLICK %s, timeout=%.2fs, duration=%s', img_or_point, timeout, str(duration))
point = self._val_to_point(img_or_point)
if point:
(x, y) = point
else:
(x, y) = self.wait(img_or_point, timeout=timeout)
log.info('Click %s point: (%d, %d)', img_or_point, x, y)
self.dev.touch(x, y, duration)
log.debug('delay after click: %.2fs', self._delay_after_click)

# FIXME(ssx): not tested
if self._operation_mark:
if self._snapshot_file and os.path.exists(self._snapshot_file):
img = ac.imread(self._snapshot_file)
ac.mark_point(img, (x, y))
cv2.imwrite(self._snapshot_file, img)
if self._devtype == 'android':
self.dev.adbshell('am', 'broadcast', '-a', 'MP_POSITION', '--es', 'msg', '%d,%d' %(x, y))

time.sleep(self._delay_after_click)
# def click(self, img_or_point, timeout=None, duration=None):
# '''
# Click function
# @param seconds: float (if time not exceed, it will retry and retry)
# '''
# if timeout is None:
# timeout = self._click_timeout
# log.info('CLICK %s, timeout=%.2fs, duration=%s', img_or_point, timeout, str(duration))
# point = self._val_to_point(img_or_point)
# if point:
# (x, y) = point
# else:
# (x, y) = self.wait(img_or_point, timeout=timeout)
# log.info('Click %s point: (%d, %d)', img_or_point, x, y)
# self.dev.touch(x, y, duration)
# log.debug('delay after click: %.2fs', self._delay_after_click)

# # FIXME(ssx): not tested
# if self._operation_mark:
# if self._snapshot_file and os.path.exists(self._snapshot_file):
# img = ac.imread(self._snapshot_file)
# ac.mark_point(img, (x, y))
# cv2.imwrite(self._snapshot_file, img)
# if self._devtype == 'android':
# self.dev.adbshell('am', 'broadcast', '-a', 'MP_POSITION', '--es', 'msg', '%d,%d' %(x, y))

# time.sleep(self._delay_after_click)

def center(self):
'''
Expand Down
12 changes: 10 additions & 2 deletions tests/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def screenshot():
# Image now save in screen2.png
# d._minicap()
start = time.time()
d.screenshot_method = consts.SCREENSHOT_METHOD_MINICAP
# d.screenshot_method = consts.SCREENSHOT_METHOD_MINICAP
d.screenshot('ttt.png')
print time.time() - start

Expand All @@ -39,6 +39,14 @@ def touch():
d.touch_image('button.png')

if __name__ == '__main__':
stop_app()
# stop_app()
w = airtest.Watcher()
w.on('setting.png', airtest.Watcher.ACTION_TOUCH)
w.on('common.png', airtest.Watcher.ACTION_TOUCH)

wid = d.add_watcher(w)
d.del_watcher(wid)
while 1:
screenshot()
# screenshot()
# touch()

0 comments on commit b764d19

Please sign in to comment.