Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement FileStatus.uri property #1648

Merged
merged 10 commits into from
Aug 5, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.D/1648.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement `FileStatus.uri` property
asvetlov marked this conversation as resolved.
Show resolved Hide resolved
36 changes: 28 additions & 8 deletions neuromation/api/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class FileStatus:
type: FileStatusType
modification_time: int
permission: Action
uri: URL

def is_file(self) -> bool:
return self.type == FileStatusType.FILE
Expand All @@ -93,10 +94,16 @@ def __init__(self, core: _Core, config: Config) -> None:
self._min_time_diff = 0.0
self._max_time_diff = 0.0

def _uri_to_path(self, uri: URL) -> str:
uri = normalize_storage_path_uri(
def _normalize_uri(self, uri: URL) -> URL:
return normalize_storage_path_uri(
uri, self._config.username, self._config.cluster_name
)

def _uri_to_path(self, uri: URL, *, normalized: bool = False) -> str:
if not normalized:
uri = normalize_storage_path_uri(
uri, self._config.username, self._config.cluster_name
)
if not uri.host:
return ""
if uri.host != self._config.cluster_name:
Expand Down Expand Up @@ -133,22 +140,25 @@ def _is_remote_modified(self, local: os.stat_result, remote: FileStatus) -> bool
)

async def ls(self, uri: URL) -> AsyncIterator[FileStatus]:
url = self._config.storage_url / self._uri_to_path(uri)
uri = self._normalize_uri(uri)
url = self._config.storage_url / self._uri_to_path(uri, normalized=True)
url = url.with_query(op="LISTSTATUS")
headers = {"Accept": "application/x-ndjson"}

request_time = time.time()
auth = await self._config._api_auth()
# NB: the storage server returns file names in FileStatus for LISTSTATUS
# but full path for GETFILESTATUS
async with self._core.request("GET", url, headers=headers, auth=auth) as resp:
self._set_time_diff(request_time, resp)
if resp.headers.get("Content-Type", "").startswith("application/x-ndjson"):
async for line in resp.content:
status = json.loads(line)["FileStatus"]
yield _file_status_from_api(status)
yield _file_status_from_api(uri, status)
else:
res = await resp.json()
for status in res["FileStatuses"]["FileStatus"]:
yield _file_status_from_api(status)
yield _file_status_from_api(uri, status)

async def glob(self, uri: URL, *, dironly: bool = False) -> AsyncIterator[URL]:
if not _has_magic(uri.path):
Expand Down Expand Up @@ -254,15 +264,18 @@ async def create(self, uri: URL, data: Union[bytes, AsyncIterator[bytes]]) -> No
resp # resp.status == 201

async def stat(self, uri: URL) -> FileStatus:
url = self._config.storage_url / self._uri_to_path(uri)
uri = self._normalize_uri(uri)
url = self._config.storage_url / self._uri_to_path(uri, normalized=True)
url = url.with_query(op="GETFILESTATUS")
auth = await self._config._api_auth()

request_time = time.time()
# NB: the storage server returns file names in FileStatus for LISTSTATUS
# but full path for GETFILESTATUS
async with self._core.request("GET", url, auth=auth) as resp:
self._set_time_diff(request_time, resp)
res = await resp.json()
return _file_status_from_api(res["FileStatus"])
return _file_status_from_api(None, res["FileStatus"])

async def open(self, uri: URL) -> AsyncIterator[bytes]:
url = self._config.storage_url / self._uri_to_path(uri)
Expand Down Expand Up @@ -693,13 +706,20 @@ def _isrecursive(pattern: str) -> bool:
return pattern == "**"


def _file_status_from_api(values: Dict[str, Any]) -> FileStatus:
def _file_status_from_api(
base_uri: Optional[URL], values: Dict[str, Any]
) -> FileStatus:
if base_uri is None:
uri = URL("storage://" + values["path"].lstrip("/"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the path contains characters %, ? or #?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean %?# in cluster or user name?
Is it allowed at all?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently they are not allowed in cluster name and user name. But user can create files with arbitrary names.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got you.
@serhiy-storchaka please review the updated version

else:
uri = base_uri / values["path"]
return FileStatus(
path=values["path"],
type=FileStatusType(values["type"]),
size=int(values["length"]),
modification_time=int(values["modificationTime"]),
permission=Action(values["permission"]),
uri=uri,
)


Expand Down
42 changes: 41 additions & 1 deletion tests/api/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,15 @@ async def handler(request: web.Request) -> web.Response:
type=FileStatusType.FILE,
modification_time=0,
permission=Action.READ,
uri=URL("storage://default/user/folder/foo"),
),
FileStatus(
path="bar",
size=4 * 1024,
type=FileStatusType.DIRECTORY,
modification_time=0,
permission=Action.READ,
uri=URL("storage://default/user/folder/bar"),
),
]

Expand Down Expand Up @@ -231,13 +233,15 @@ async def handler(request: web.Request) -> web.StreamResponse:
type=FileStatusType.FILE,
modification_time=0,
permission=Action.READ,
uri=URL("storage://default/user/folder/foo"),
),
FileStatus(
path="bar",
size=4 * 1024,
type=FileStatusType.DIRECTORY,
modification_time=0,
permission=Action.READ,
uri=URL("storage://default/user/folder/bar"),
),
]

Expand Down Expand Up @@ -666,6 +670,42 @@ async def handler(request: web.Request) -> web.Response:
size=1234,
modification_time=3456,
permission=Action.READ,
uri=URL("storage://default/user/folder"),
)


async def test_storage_stats_user_home(
aiohttp_server: _TestServerFactory, make_client: _MakeClient
) -> None:
async def handler(request: web.Request) -> web.Response:
assert request.path == "/storage/user"
assert request.query == {"op": "GETFILESTATUS"}
return web.json_response(
{
"FileStatus": {
"path": "/default/user",
"type": "DIRECTORY",
"length": 1234,
"modificationTime": 3456,
"permission": "read",
}
}
)

app = web.Application()
app.router.add_get("/storage/user", handler)

srv = await aiohttp_server(app)

async with make_client(srv.make_url("/")) as client:
stats = await client.storage.stat(URL("storage:"))
assert stats == FileStatus(
path="/default/user",
type=FileStatusType.DIRECTORY,
size=1234,
modification_time=3456,
permission=Action.READ,
uri=URL("storage://default/user"),
)


Expand All @@ -684,7 +724,7 @@ async def handler(request: web.Request) -> web.StreamResponse:
return web.json_response(
{
"FileStatus": {
"path": "/user/file",
"path": "file",
"type": "FILE",
"length": 5,
"modificationTime": 3456,
Expand Down
28 changes: 24 additions & 4 deletions tests/api/test_storage_filestatus.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
from yarl import URL

from neuromation.api import FileStatusType
from neuromation.api.storage import _file_status_from_api


def test_from_api() -> None:
stat = _file_status_from_api(
URL("storage://default/user/foo"),
{
"path": "name",
"type": "FILE",
"length": 1234,
"modificationTime": 3456,
"permission": "read",
}
},
)
assert stat.path == "name"
assert stat.type == FileStatusType.FILE
Expand All @@ -21,13 +24,14 @@ def test_from_api() -> None:

def test_file() -> None:
stat = _file_status_from_api(
URL("storage://default/user/foo"),
{
"path": "name",
"type": "FILE",
"length": 1234,
"modificationTime": 3456,
"permission": "read",
}
},
)
assert stat.type == FileStatusType.FILE
assert stat.is_file()
Expand All @@ -36,13 +40,14 @@ def test_file() -> None:

def test_is_dir() -> None:
stat = _file_status_from_api(
URL("storage://default/user/foo"),
{
"path": "name",
"type": "DIRECTORY",
"length": 1234,
"modificationTime": 3456,
"permission": "read",
}
},
)
assert stat.type == FileStatusType.DIRECTORY
assert not stat.is_file()
Expand All @@ -51,12 +56,27 @@ def test_is_dir() -> None:

def test_name() -> None:
stat = _file_status_from_api(
URL("storage://default/user/foo"),
{
"path": "name",
"type": "FILE",
"length": 1234,
"modificationTime": 3456,
"permission": "read",
}
},
)
assert stat.name == "name"


def test_uri() -> None:
stat = _file_status_from_api(
URL("storage://default/user/foo"),
{
"path": "name",
"type": "FILE",
"length": 1234,
"modificationTime": 3456,
"permission": "read",
},
)
assert stat.uri == URL("storage://default/user/foo/name")
Loading