Skip to content

Commit

Permalink
-added GET & PUT time endpoints, added tests for system, addressed so…
Browse files Browse the repository at this point in the history
…me comments

-added router tests, added response links
-raise error instead of returning error in regular response
  • Loading branch information
sanni-t committed Aug 26, 2020
1 parent f7da768 commit 6526c88
Show file tree
Hide file tree
Showing 9 changed files with 386 additions and 0 deletions.
78 changes: 78 additions & 0 deletions api/src/opentrons/system/time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import asyncio
import logging
from typing import Dict, Tuple, Union
from datetime import datetime, timezone
from opentrons.util.helpers import utc_now

log = logging.getLogger(__name__)


def _str_to_dict(res_str) -> Dict[str, Union[str, bool]]:
res_lines = res_str.splitlines()
res_dict = {}
try:
for line in res_lines:
if line:
prop, val = line.split('=')
res_dict[prop] = val if val not in ['yes', 'no'] \
else val == 'yes' # Convert yes/no to boolean value
except (ValueError, IndexError) as e:
raise Exception("Error converting timedatectl string: {}".format(e))
return res_dict


async def _time_status(loop: asyncio.AbstractEventLoop = None
) -> Dict[str, Union[str, bool]]:
"""
Get details of robot's date & time, with specifics of RTC (if present)
& status of NTP synchronization.
:return: Dictionary of status params
"""
proc = await asyncio.create_subprocess_shell(
'timedatectl show',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
loop=loop or asyncio.get_event_loop()
)
out, err = await proc.communicate()
return _str_to_dict(out.decode())


async def _set_time(time: str,
loop: asyncio.AbstractEventLoop = None) -> Tuple[str, str]:
proc = await asyncio.create_subprocess_shell(
f'date --utc --set \"{time}\"',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
loop=loop or asyncio.get_event_loop()
)
out, err = await proc.communicate()
return out.decode(), err.decode()


async def get_system_time(loop: asyncio.AbstractEventLoop = None) -> datetime:
"""
:return: Just the system time as a UTC datetime object
"""
return utc_now()


async def set_system_time(
new_time_dt: datetime,
loop: asyncio.AbstractEventLoop = None
) -> Tuple[datetime, str]:
"""
Set the system time unless system time is already being synchronized using
an RTC or NTPsync.
:return: Tuple specifying current date read and error message, if any.
"""
status = await _time_status(loop)
if status.get('LocalRTC') or status.get('NTPSynchronized'):
# TODO: Update this to handle RTC sync correctly once we introduce RTC
err = 'Cannot set system time; already synchronized with NTP or RTC'
else:
new_time_dt = new_time_dt.astimezone(tz=timezone.utc)
new_time_str = new_time_dt.strftime("%Y-%m-%d %H:%M:%S")
log.info(f'Setting time to {new_time_str} UTC')
out, err = await _set_time(new_time_str)
return utc_now(), err
77 changes: 77 additions & 0 deletions api/tests/opentrons/system/test_time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import pytest
from unittest.mock import MagicMock
from datetime import datetime, timezone
from opentrons.system import time


@pytest.fixture
def mock_status_str():
return "Timezone=Etc/UTC\n" \
"LocalRTC=no\n" \
"CanNTP=yes\n" \
"NTP=yes\n" \
"NTPSynchronized=no\n" \
"TimeUSec=Fri 2020-08-14 21:44:16 UTC\n"

@pytest.fixture
def mock_status_dict():
return {'Timezone': 'Etc/UTC',
'LocalRTC': False,
'CanNTP': True,
'NTP': True,
'NTPSynchronized': False,
'TimeUSec': 'Fri 2020-08-14 21:44:16 UTC'}


@pytest.fixture
def mock_time():
# The above time in datetime
return datetime(2020, 8, 14, 21, 44, 16, tzinfo=timezone.utc)


def test_str_to_dict(mock_status_str, mock_status_dict):
status_dict = time._str_to_dict(mock_status_str)
assert status_dict == mock_status_dict

# Test exception raised for unexpected status string
with pytest.raises(Exception, match="Error converting timedatectl.*"):
not_a_status_str = "Something that's not a timedatectl status"
time._str_to_dict(not_a_status_str)


async def test_set_time_error_response(mock_status_dict):

async def async_mock_time_status(*args, **kwargs):
_stat = mock_status_dict
_stat.update(NTPSynchronized=True)
return _stat

time._time_status = MagicMock(side_effect=async_mock_time_status)

now, err = await time.set_system_time(datetime.now())
assert err == 'Cannot set system time; ' \
'already synchronized with NTP or RTC'


async def test_set_time_response(mock_status_dict, mock_time):

async def async_mock_time_status(*args, **kwargs):
_stat = mock_status_dict
_stat.update(NTPSynchronized=False)
return _stat

async def async_mock_set_time(*args, **kwargs):
return "out", "err"

time._time_status = MagicMock(side_effect=async_mock_time_status)
time._set_time = MagicMock(side_effect=async_mock_set_time)

# System time gets set successfully
time._set_time.assert_not_called()
await time.set_system_time(mock_time)
time._set_time.assert_called_once()

# Datetime is converted to special format with UTC timezone for _set_time
await time.set_system_time(datetime.fromisoformat(
"2020-08-14T16:44:16-05:00")) # from EST
time._set_time.assert_called_with("2020-08-14 21:44:16") # to UTC
4 changes: 4 additions & 0 deletions robot-server/robot_server/service/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from robot_server.service.session.router import router as session_router
from robot_server.service.labware.router import router as labware_router
from robot_server.service.protocol.router import router as protocol_router
from robot_server.service.system.router import router as system_router


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -67,6 +68,9 @@
}
})

app.include_router(router=system_router,
tags=["System Control"])


@app.on_event("startup")
async def on_startup():
Expand Down
Empty file.
23 changes: 23 additions & 0 deletions robot-server/robot_server/service/system/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from robot_server.service.errors import RobotServerError, CommonErrorDef


class SystemException(RobotServerError):
"""Base of all system exceptions"""
pass


class SystemTimeAlreadySynchronized(SystemException):
"""
Cannot update system time because it is already being synchronized
via NTP or local RTC.
"""
def __init__(self, msg: str):
super().__init__(definition=CommonErrorDef.ACTION_FORBIDDEN,
reason=msg)


class SystemSetTimeException(SystemException):
"""Server process Failure"""
def __init__(self, msg: str):
super().__init__(definition=CommonErrorDef.INTERNAL_SERVER_ERROR,
error=msg)
16 changes: 16 additions & 0 deletions robot-server/robot_server/service/system/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from datetime import datetime
from pydantic import BaseModel

from robot_server.service.json_api import ResponseModel, ResponseDataModel, \
RequestDataModel, RequestModel


class SystemTimeAttributes(BaseModel):
systemTime: datetime


SystemTimeResponseDataModel = ResponseDataModel[SystemTimeAttributes]

SystemTimeResponse = ResponseModel[SystemTimeResponseDataModel, dict]

SystemTimeRequest = RequestModel[RequestDataModel[SystemTimeAttributes]]
67 changes: 67 additions & 0 deletions robot-server/robot_server/service/system/router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import logging
from datetime import datetime
from fastapi import APIRouter
from opentrons.system import time
from typing import Dict
from robot_server.service.system import models as time_models
from robot_server.service.system import errors
from robot_server.service.json_api import ResourceLink

router = APIRouter()
log = logging.getLogger(__name__)

"""
These routes allows the client to read & update robot system time
"""


def _create_response(dt: datetime) \
-> time_models.SystemTimeResponse:
"""Create a SystemTimeResponse with system datetime"""
return time_models.SystemTimeResponse(
data=time_models.SystemTimeResponseDataModel.create(
attributes=time_models.SystemTimeAttributes(
systemTime=dt
),
resource_id="time"
),
links=_get_valid_time_links(router)
)


def _get_valid_time_links(api_router: APIRouter) -> Dict[str, ResourceLink]:
""" Get valid links for time resource"""
return {
"GET": ResourceLink(href=api_router.url_path_for(
get_time.__name__)),
"PUT": ResourceLink(href=api_router.url_path_for(
set_time.__name__))
}


@router.get("/system/time",
description="Fetch system time & date",
summary="Get robot's time status, which includes- current UTC "
"date & time, local timezone, whether robot time is synced"
" with an NTP server &/or it has an active RTC.",
response_model=time_models.SystemTimeResponse
)
async def get_time() -> time_models.SystemTimeResponse:
res = await time.get_system_time()
return _create_response(res)


@router.put("/system/time",
description="Update system time",
summary="Set robot time",
response_model=time_models.SystemTimeResponse)
async def set_time(new_time: time_models.SystemTimeRequest) \
-> time_models.SystemTimeResponse:
sys_time, err = await time.set_system_time(
new_time.data.attributes.systemTime)
if err:
if 'already synchronized with NTP or RTC' in err:
raise errors.SystemTimeAlreadySynchronized(err)
else:
raise errors.SystemSetTimeException(err)
return _create_response(sys_time)
Empty file.
Loading

0 comments on commit 6526c88

Please sign in to comment.