diff --git a/app-debug.apk b/app-debug.apk new file mode 100644 index 0000000..7ec56cb Binary files /dev/null and b/app-debug.apk differ diff --git a/master/droid_master.thrift b/master/droid_master.thrift new file mode 100644 index 0000000..6d0b97e --- /dev/null +++ b/master/droid_master.thrift @@ -0,0 +1,16 @@ +struct ConnParams { + 1: string host, + 2: string port, +} + +service MasterService { + void ping(), + + string get_package_name(1: string apk_url), + + ConnParams get_endpoint(1: string user), + + bool install_apk(1: string endpoint_id, 2: string apk_url), + + bool start_package(1: string endpoint_id, 2: string package_name), +} diff --git a/master/master.py b/master/master.py deleted file mode 100644 index 2d1700c..0000000 --- a/master/master.py +++ /dev/null @@ -1,87 +0,0 @@ -import json - -from kazoo.client import KazooClient, KazooState -from kazoo.protocol.states import EventType -from kazoo.recipe.watchers import DataWatch -from twisted.internet import reactor - -from logger import logger - -zk = KazooClient() - - -def init_nodes(): - zk.ensure_path('/droids/available') - zk.ensure_path('/droids/synchronize') - zk.ensure_path('/droids/synchronized') - zk.ensure_path('/droids/assigned') - - -def conn_listener(state): - if state == KazooState.CONNECTED: - logger.debug('connected...') - elif state == KazooState.LOST: - logger.debug('connection lost...') - elif state == KazooState.SUSPENDED: - logger.debug('connection suspended...') - - -@zk.ChildrenWatch('/droids/available') -def on_available_droid(children): - logger.debug('Registering watches on available droids') - for child in children: - child_p = '/droids/available/{}'.format(child) - DataWatch(zk, child_p, handle_dropped_droid) - - -def handle_dropped_droid(data, stat, event): - if event: - if event.type == EventType.CREATED: - # Implies a new emulator became available - logger.debug('WooHoo! Another droid joins our ranks!') - elif event.type == EventType.DELETED: - # Implies an existing emulator was disconnected - # Check if it was assigned and remove it from there. - node_name = event.path.rsplit('/', 1)[1] - assigned_emulators = zk.get_children('/droids/assigned') - if node_name in assigned_emulators: - logger.error('Oh no, droid-{} was lost'.format(node_name)) - assigned_p = '/droids/assigned/{}'.format(node_name) - zk.delete(assigned_p) - else: - logger.debug('These are not the events you are looking for.') - - -def synchronize_droid(droid_name, apks=None): - zk.create('/droids/synchronize/{}'.format(droid_name), - value=json.dumps({ - 'installed_apks': apks or [], - })) - - -def assign_droid(user): - available_droids = zk.get_children('/droids/available') - if not available_droids: - logger.error('No droids available') - return False - child = available_droids[0] - child_p = '/droids/available/{}'.format(child) - data, stat = zk.get(child_p) - data = json.loads(data) - transaction = zk.transaction() - transaction.check(child_p, stat.version) - transaction.delete(child_p) - data['user'] = user - transaction.create( - '/droids/assigned/{}'.format(child), - value=json.dumps(data), - ) - transaction.commit() - return True - - -if __name__ == '__main__': - zk.start() - init_nodes() - reactor.addSystemEventTrigger('before', 'shutdown', lambda: zk.stop()) - reactor.run() diff --git a/master/settings.py b/master/settings.py new file mode 100644 index 0000000..e69de29 diff --git a/master/zk_master.py b/master/zk_master.py new file mode 100644 index 0000000..bbf2ceb --- /dev/null +++ b/master/zk_master.py @@ -0,0 +1,92 @@ +import json + +from kazoo.client import KazooClient, KazooState +from kazoo.protocol.states import EventType +from kazoo.recipe.watchers import DataWatch + +from logger import logger +from master.settings import ZOOKEEPER_HOST, ZOOKEEPER_PORT + + +class MasterZkClient(object): + def __init__(self): + host='{}:{}'.format(ZOOKEEPER_HOST, ZOOKEEPER_PORT) + zk = KazooClient(host) + zk.add_listener(self.conn_listener) + self.zk = zk + self.on_available_droid = ChildrenWatch( + zk, + '/droids/available', + self.on_available_droid, + ) + + def setup(self): + self.zk.ensure_path('/droids/available') + self.zk.ensure_path('/droids/assigned') + + @staticmethod + def conn_listener(state): + if state == KazooState.CONNECTED: + logger.debug('connected...') + elif state == KazooState.LOST: + logger.debug('connection lost...') + elif state == KazooState.SUSPENDED: + logger.debug('connection suspended...') + + + def on_available_droid(self, children): + logger.debug('Registering watches on available droids') + for child in children: + child_p = '/droids/available/{}'.format(child) + DataWatch(self.zk, child_p, self.handle_dropped_droid) + + + def handle_dropped_droid(self, data, stat, event): + if event: + if event.type == EventType.CREATED: + # Implies a new droid became available + logger.debug('WooHoo! Another droid joins our ranks!') + elif event.type == EventType.DELETED: + # Implies an existing droid was disconnected + # Check if it was assigned and remove it from there. + node_name = event.path.rsplit('/', 1)[1] + assigned_emulators = self.zk.get_children('/droids/assigned') + if node_name in assigned_emulators: + # Check if droid was being assigned, by comparing creation + # times. + assigned_p = '/droids/assigned/{}'.format(node_name) + adata, astat = self.zk.get(assigned_p) + import ipdb; ipdb.set_trace() + logger.error('Oh no, droid-{} was lost'.format(node_name)) + self.zk.delete(assigned_p) + else: + logger.debug('These are not the events you are looking for.') + + def assign_droid(self, user): + available_droids = self.zk.get_children('/droids/available') + if not available_droids: + logger.error('No droids available') + return False + child = available_droids[0] + child_p = '/droids/available/{}'.format(child) + data, stat = self.zk.get(child_p) + data = json.loads(data) + transaction = self.zk.transaction() + transaction.check(child_p, stat.version) + transaction.delete(child_p) + data['user'] = user + transaction.create( + '/droids/assigned/{}'.format(child), + value=json.dumps(data), + ) + transaction.commit() + return True + + +if __name__ == '__main__': + zk.start() + init_nodes() + from twisted.internet import reactor + + reactor.addSystemEventTrigger('before', 'shutdown', lambda: zk.stop()) + reactor.run() diff --git a/settings.py b/settings.py index a32d87e..aca8ba2 100644 --- a/settings.py +++ b/settings.py @@ -1,3 +1,5 @@ import os -ROOT_DIR = os.path.dirname(__file__) \ No newline at end of file +ROOT_DIR = os.path.dirname(__file__) +ZK_HOST = "localhost" +ZK_PORT = "2181" \ No newline at end of file diff --git a/worker/config.json b/worker/config.json index ed04995..27a31f3 100644 --- a/worker/config.json +++ b/worker/config.json @@ -1,11 +1,8 @@ { "thrift_host": "0.0.0.0", "thrift_port": "9090", - "zk_host": "localhost", - "zk_port": "2181", "droids": [ { - "name": "r2-d2", "port": "5554", "avd": "Nexus_6_API_25" } diff --git a/worker/helpers.py b/worker/helpers.py index cc5556c..37e3b3b 100644 --- a/worker/helpers.py +++ b/worker/helpers.py @@ -1,7 +1,7 @@ import os from worker.emulator import Emulator -from zk_client import DroidZkClient +from zk_droid import DroidZkClient class DroidBuilder(object): @@ -23,48 +23,51 @@ def build(self): class DroidCoordinator(object): def __init__(self): - self.instances = {} + self.droids_to_start = [] + self.initialised = {} - def set_endpoint(self, id, endpoint): - self.instances[id] = { - 'endpoint': endpoint, - 'zk_client': DroidZkClient(id), - } + def add_droid(self, droid): + self.droids_to_start.append(droid) - def get_endpoint(self, id): - return self.instances[id]['endpoint'] + def start_droid(self, droid): + droid.start() + zk_client = DroidZkClient() + zk_client.setup() + self.initialised[zk_client.nodename] = { + 'droid': droid, + 'zk_client': zk_client, + } - def get_zk_client(self, id): - return self.instances[id]['zk_client'] + def setup(self): + while True: + try: + droid = self.droids_to_start.pop() + self.start_droid(droid) + except IndexError: + break def count(self): - return len(self.instances) + return len(self.initialised) - def start_endpoint(self, id): - endpoint = self.get_endpoint(id) - # Start droid - endpoint.start() - # Setup Zk client - zk_client = self.get_zk_client(id) - zk_client.setup() + def get_droid(self, id): + return self.initialised[id]['droid'] - def iter_endpoints(self): - return self.instances.iteritems() - - def setup(self): - for instance_id, _ in self.iter_endpoints(): - self.start_endpoint(instance_id) + def get_zk_client(self, id): + return self.initialised[id]['zk_client'] - def stop_endpoint(self, id): - endpoint = self.get_endpoint(id) - # Stop droid - endpoint.stop() - # Setup Zk client + def stop_droid(self, id): + # Stop Zk client zk_client = self.get_zk_client(id) zk_client.teardown() + # Stop droid + droid = self.get_droid(id) + droid.stop() + + def iter_droid_ids(self): + return self.initialised.iterkeys() def teardown(self): - for instance_id, _ in self.iter_endpoints(): - self.stop_endpoint(instance_id) + for instance_id, _ in self.iter_droid_ids(): + self.stop_droid(instance_id) \ No newline at end of file diff --git a/worker/thrift_server.py b/worker/thrift_server.py index b4c03bd..4aabc3d 100644 --- a/worker/thrift_server.py +++ b/worker/thrift_server.py @@ -3,6 +3,7 @@ from logger import logger from worker.helpers import DroidCoordinator, DroidBuilder from worker.utils import get_config, get_public_hostname +from worker.utils import get_package_name_from_url from tgen.droid_service.ttypes import ConnParams from tgen.droid_service import DroidService @@ -26,7 +27,7 @@ def setup(self): builder.set_port(droid['port']) if droid['avd']: builder.set_avd(droid['avd']) - self.coordinator.set_endpoint(droid['name'], builder.build()) + self.coordinator.add_droid(builder.build()) # Start all endpoints now self.coordinator.setup() @@ -35,11 +36,11 @@ def ping(self): def get_package_name(self, apk_url): logger.debug("getting package name for apk {}".format(apk_url)) - return "some random package" + return get_package_name_from_url(apk_url) def get_endpoint(self, endpoint_id): logger.debug("Getting endpoint for {}".format(endpoint_id)) - endpoint = self.coordinator.get_endpoint(endpoint_id) + endpoint = self.coordinator.get_droid(endpoint_id) cp = ConnParams() cp.host = get_public_hostname() cp.port = endpoint.port diff --git a/worker/utils.py b/worker/utils.py index 4c9f4fd..8823707 100644 --- a/worker/utils.py +++ b/worker/utils.py @@ -1,12 +1,57 @@ -from copy import deepcopy +import re import json import os import socket +from copy import deepcopy +from tempfile import NamedTemporaryFile import requests +from mirakuru import SimpleExecutor from requests.exceptions import ConnectTimeout from settings import ROOT_DIR +from logger import logger + + +def get_temp_apk_path(): + nf = NamedTemporaryFile(suffix='.apk', delete=True) + nf.close() + return nf.name + + +def download_to(url, target_p): + resp = requests.get(url, stream=True) + SIZE_4MB = 4 << 20 + with open(target_p, 'wb') as target: + for chunk in resp.iter_content(SIZE_4MB): + target.write(chunk) + return target_p + + +def download_to_temp(url): + target_p = get_temp_apk_path() + return download_to(url, target_p) + + +def get_package_name_from_url(apk_url): + download_to = download_to_temp(apk_url) + return get_package_name(download_to) + + +def get_package_name(path): + cmd = ["aapt", "dump", "badging", path] + proc = SimpleExecutor(cmd) + proc.start() + p = re.compile(r"^package: name='([\w\.]+)'") + for line in proc.output(): + m = p.match(line) + if m: + pkg = m.group(1) + logger.debug('Extracted package name {}'.format(pkg)) + return pkg + error = 'Unable to extract package name for file {}'.format( + os.path.basename(path)) + raise Exception(error) def is_open_port(port): diff --git a/worker/zk_client.py b/worker/zk_droid.py similarity index 54% rename from worker/zk_client.py rename to worker/zk_droid.py index 98fe48f..8632f20 100644 --- a/worker/zk_client.py +++ b/worker/zk_droid.py @@ -7,27 +7,27 @@ from logger import logger from worker.emulator import Emulator from worker.utils import get_config, get_public_hostname - - -def conn_listener(state): - if state == KazooState.CONNECTED: - logger.debug('connected...') - elif state == KazooState.LOST: - logger.debug('connection lost...') - elif state == KazooState.SUSPENDED: - logger.debug('connection suspended...') +from settings import ZK_HOST, ZK_PORT class DroidZkClient(object): - def __init__(self, nodename): - self.nodename = nodename + def __init__(self): + self.nodename = None - config = get_config() - host='{}:{}'.format(config['zk_host'], config['zk_port']) + host='{}:{}'.format(ZK_HOST, ZK_PORT) zk = KazooClient(host) - zk.add_listener(conn_listener) + zk.add_listener(self.conn_listener) self.zk = zk + @staticmethod + def conn_listener(state): + if state == KazooState.CONNECTED: + logger.debug('connected...') + elif state == KazooState.LOST: + logger.debug('connection lost...') + elif state == KazooState.SUSPENDED: + logger.debug('connection suspended...') + def setup(self): logger.debug('Registering onto zookeeper') self.zk.start() @@ -36,9 +36,11 @@ def setup(self): 'thrift_host': get_public_hostname(), 'thrift_port': config['thrift_port'], } - self.zk.create('/droids/available/{}'.format(self.nodename), - value=json.dumps(value), - makepath=True, ephemeral=True) + path = self.zk.create( + '/droids/available/droid', + value=json.dumps(value), sequence=True, + makepath=True, ephemeral=True) + self.nodename = path.rsplit('/', 1)[1] def teardown(self): self.zk.stop()