Skip to content

Commit

Permalink
Split sockets subsystem into setup and handlers
Browse files Browse the repository at this point in the history
Added s3 subsystem
  • Loading branch information
Pedro Crespo committed Nov 12, 2018
1 parent 5098583 commit 0ff6c01
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
from .rest import setup_rest
from .security import setup_security
from .session import setup_session
from .sockets import setup_sio
from .sockets import setup_sockets
from .statics import setup_statics
from .director import setup_director
from .s3 import setup_s3

log = logging.getLogger(__name__)

Expand All @@ -41,9 +43,11 @@ def create_application(config: dict):
setup_email(app)
setup_computation(app)
setup_statics(app)
setup_sio(app)
setup_sockets(app)
setup_rest(app)
setup_login(app)
setup_director(app)
setup_s3(app)

return app

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
log = logging.getLogger(__name__)




def create_schema():
"""
Build schema for the configuration's file
Expand All @@ -52,6 +50,8 @@ def create_schema():
rest_config.CONFIG_SECTION_NAME: rest_config.schema,
email_config.CONFIG_SECTION_NAME: email_config.schema,
computation_config.CONFIG_SECTION_NAME: computation_config.schema,
#s3_config.CONFIG_SECTION_NAME: s3_config.schema
#TODO: enable when sockets are refactored
})


Expand Down
12 changes: 12 additions & 0 deletions services/web/server/src/simcore_service_webserver/director.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
from aiohttp import web

import logging
from .application_keys import APP_CONFIG_KEY
from .director_config import CONFIG_SECTION_NAME

logger = logging.getLogger(__name__)

def setup(app: web.Application):
log.debug("Setting up %s ...", __name__)

assert CONFIG_SECTION_NAME in app[APP_CONFIG_KEY]


# TODO: implement!!!



# alias
setup_director = setup

__all__ = (
'setup_director'
)
39 changes: 39 additions & 0 deletions services/web/server/src/simcore_service_webserver/s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
""" s3 subsystem
Provides a client-sdk to interact with minio services
"""
import logging

from aiohttp import web

from .application_keys import APP_CONFIG_KEY
from .s3_config import CONFIG_SECTION_NAME

#from s3wrapper.s3_client import S3Client

logger = logging.getLogger(__name__)

def setup(app: web.Application):
logger.debug("Setting up %s ...", __name__)

assert CONFIG_SECTION_NAME not in app[APP_CONFIG_KEY], "Temporarily disabled"

# TODO: implement!!!

# TODO: enable when sockets are refactored
#cfg = app[APP_CONFIG_KEY][CONFIG_SECTION_NAME]
#
# client = S3Client(
# endpoint=cfg['endpoint'],
# access_key=cfg['access_key'],
# secret_key=cfg['secret_key'])

# app["s3.client"] = client


# alias
setup_s3 = setup

__all__ = (
'setup_s3'
)
11 changes: 11 additions & 0 deletions services/web/server/src/simcore_service_webserver/s3_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
""" s3 subsystem's configuration
- config-file schema
- settings
"""
#import trafaret as T
from simcore_sdk.config.s3 import CONFIG_SCHEMA as _S3_SCHEMA

CONFIG_SECTION_NAME = 's3'

schema = _S3_SCHEMA
111 changes: 13 additions & 98 deletions services/web/server/src/simcore_service_webserver/sockets.py
Original file line number Diff line number Diff line change
@@ -1,112 +1,27 @@
""" Defines **async** handlers for socket.io server
""" socket io subsystem
SEE https://pypi.python.org/pypi/python-socketio
SEE http://python-socketio.readthedocs.io/en/latest/
"""
# pylint: disable=C0111
# pylint: disable=W0703
"""
import logging
import socketio

from s3wrapper.s3_client import S3Client
from simcore_sdk.config.s3 import Config as s3_config

from . import interactive_services_manager

log = logging.getLogger(__file__)

# TODO: separate API from server application!
SIO = socketio.AsyncServer(async_mode="aiohttp", logging=log)

from aiohttp import web

@SIO.on("connect")
def connect(sid, environ):
# pylint: disable=W0613
# environ = WSGI evnironment dictionary
log.debug("client %s connects", sid)
interactive_services_manager.session_connect(sid)
return True
from .sockets_handlers import sio

@SIO.on("startDynamic")
async def start_dynamic_service(sid, data):
log.debug("client %s starts dynamic service %s", sid, data)
try:
service_key = data["serviceKey"]
service_version = "latest"
# if "serviceVersion" in data:
# service_version = data["serviceVersion"]
node_id = data["nodeId"]
result = await interactive_services_manager.start_service(sid, service_key, node_id, service_version)
await SIO.emit("startDynamic", data=result, room=sid)
except IOError:
log.exception("Error emitting results")
except Exception:
log.exception("Error while starting service")

@SIO.on("stopDynamic")
async def stop_dynamic_service(sid, data):
log.debug("client %s stops dynamic service %s", sid, data)
try:
node_id = data["nodeId"]
await interactive_services_manager.stop_service(sid, node_id)
except Exception:
log.exception("Error while stopping service")
log = logging.getLogger(__name__)

@SIO.on("presignedUrl")
async def retrieve_url_for_file(sid, data):
log.debug("client %s requests S3 url for %s", sid, data)
_config = s3_config()
log.debug("S3 endpoint %s", _config.endpoint)

def setup(app: web.Application):
log.debug("Setting up %s ...", __name__)

s3_client = S3Client(endpoint=_config.endpoint,
access_key=_config.access_key, secret_key=_config.secret_key)
url = s3_client.create_presigned_put_url(_config.bucket_name, data["fileName"])
#result = minioClient.presigned_put_object(data["bucketName"], data["fileName"])
# Response error is still possible since internally presigned does get
# bucket location.
data_out = {}
data_out["url"] = url
try:
await SIO.emit("presignedUrl", data=data_out, room=sid)
except IOError:
log.exception("Error emitting results")

@SIO.on("listObjects")
async def list_S3_objects(sid, data):
log.debug("client %s requests objects in storage. Extra argument %s", sid, data)
_config = s3_config()

s3_client = S3Client(endpoint=_config.endpoint,
access_key=_config.access_key, secret_key=_config.secret_key)

objects = s3_client.list_objects_v2(_config.bucket_name)
data_out = []
location = "simcore.sandbox"
for obj in objects:
obj_info = {}
obj_info["file_uuid"] = obj.bucket_name + "/" + obj.object_name
obj_info["location"] = location
obj_info["bucket_name"] = obj.bucket_name
obj_info["object_name"] = obj.object_name
obj_info["size"] = obj.size
data_out.append(obj_info)
try:
await SIO.emit("listObjects", data=data_out, room=sid)
except IOError:
log.exception("Error emitting results")
sio.attach(app)

@SIO.on("disconnect")
async def disconnect(sid):
log.debug("client %s disconnected", sid)
try:
await interactive_services_manager.session_disconnected(sid)
except Exception:
log.exception("Error while disconnecting client")

# alias
setup_sockets = setup

def setup_sio(app):
log.debug("Setting up %s ...", __name__)

SIO.attach(app)
__all__ = (
"setup_sockets"
)
110 changes: 110 additions & 0 deletions services/web/server/src/simcore_service_webserver/sockets_handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
""" Defines **async** handlers for socket.io server
SEE https://pypi.python.org/pypi/python-socketio
SEE http://python-socketio.readthedocs.io/en/latest/
"""
# pylint: disable=C0111
# pylint: disable=W0703

import logging

import socketio

from s3wrapper.s3_client import S3Client
from simcore_sdk.config.s3 import Config as s3_config
# TODO: this is the only config that is not part of the schema
# At first sight, adding it would require refactorin how socketio
# is setup and avoid sio as a singleton!

from . import interactive_services_manager

log = logging.getLogger(__file__)

# TODO: separate API from server application!
sio = socketio.AsyncServer(async_mode="aiohttp", logging=log)


@sio.on("connect")
def connect(sid, environ):
# pylint: disable=W0613
# environ = WSGI evnironment dictionary
log.debug("client %s connects", sid)
interactive_services_manager.session_connect(sid)
return True

@sio.on("startDynamic")
async def start_dynamic_service(sid, data):
log.debug("client %s starts dynamic service %s", sid, data)
try:
service_key = data["serviceKey"]
service_version = "latest"
# if "serviceVersion" in data:
# service_version = data["serviceVersion"]
node_id = data["nodeId"]
result = await interactive_services_manager.start_service(sid, service_key, node_id, service_version)
await sio.emit("startDynamic", data=result, room=sid)
except IOError:
log.exception("Error emitting results")
except Exception:
log.exception("Error while starting service")

@sio.on("stopDynamic")
async def stop_dynamic_service(sid, data):
log.debug("client %s stops dynamic service %s", sid, data)
try:
node_id = data["nodeId"]
await interactive_services_manager.stop_service(sid, node_id)
except Exception:
log.exception("Error while stopping service")

@sio.on("presignedUrl")
async def retrieve_url_for_file(sid, data):
log.debug("client %s requests S3 url for %s", sid, data)
_config = s3_config()
log.debug("S3 endpoint %s", _config.endpoint)


s3_client = S3Client(endpoint=_config.endpoint,
access_key=_config.access_key, secret_key=_config.secret_key)
url = s3_client.create_presigned_put_url(_config.bucket_name, data["fileName"])
#result = minioClient.presigned_put_object(data["bucketName"], data["fileName"])
# Response error is still possible since internally presigned does get
# bucket location.
data_out = {}
data_out["url"] = url
try:
await sio.emit("presignedUrl", data=data_out, room=sid)
except IOError:
log.exception("Error emitting results")

@sio.on("listObjects")
async def list_S3_objects(sid, data):
log.debug("client %s requests objects in storage. Extra argument %s", sid, data)
_config = s3_config()

s3_client = S3Client(endpoint=_config.endpoint,
access_key=_config.access_key, secret_key=_config.secret_key)

objects = s3_client.list_objects_v2(_config.bucket_name)
data_out = []
location = "simcore.sandbox"
for obj in objects:
obj_info = {}
obj_info["file_uuid"] = obj.bucket_name + "/" + obj.object_name
obj_info["location"] = location
obj_info["bucket_name"] = obj.bucket_name
obj_info["object_name"] = obj.object_name
obj_info["size"] = obj.size
data_out.append(obj_info)
try:
await sio.emit("listObjects", data=data_out, room=sid)
except IOError:
log.exception("Error emitting results")

@sio.on("disconnect")
async def disconnect(sid):
log.debug("client %s disconnected", sid)
try:
await interactive_services_manager.session_disconnected(sid)
except Exception:
log.exception("Error while disconnecting client")

0 comments on commit 0ff6c01

Please sign in to comment.