Shareable Kedro-viz backend implementation #1498

Merged: 76 commits, Sep 27, 2023

Commits
5af4d9f
nodes working
rashidakanchwala Feb 17, 2023
283a4f5
added pipeline
rashidakanchwala Feb 17, 2023
96bb1d2
Merge branch 'kedro-viz-save-load' of https://github.com/kedro-org/ke…
rashidakanchwala Feb 17, 2023
7de3f65
fix path
rashidakanchwala Feb 17, 2023
3a023f9
fix bug and lint
rashidakanchwala Feb 17, 2023
3c2b625
fix lint
rashidakanchwala Feb 17, 2023
5da7410
Merge remote-tracking branch 'origin' into kedro-viz-save-load
rashidakanchwala Aug 18, 2023
4762c2f
update responses
rashidakanchwala Aug 18, 2023
4bc45e5
test
rashidakanchwala Aug 18, 2023
6081e85
fix rebase
rashidakanchwala Aug 21, 2023
d044bd1
fixing stuff
rashidakanchwala Aug 21, 2023
b986c85
refactored to latest
rashidakanchwala Sep 1, 2023
d0f7e65
some refactor
rashidakanchwala Sep 1, 2023
703b64c
fix lint - WIP
rashidakanchwala Sep 1, 2023
9ecb395
Merge branch 'main' into shareable-flowchart
rashidakanchwala Sep 4, 2023
6b01350
refactor work
rashidakanchwala Sep 4, 2023
8b91f7e
Merge branch 'shareable-flowchart' of https://github.com/kedro-org/ke…
rashidakanchwala Sep 4, 2023
30e5f8a
fix lint
rashidakanchwala Sep 4, 2023
fd75407
further refactor
rashidakanchwala Sep 4, 2023
480bd51
add error handling and debugging
rashidakanchwala Sep 5, 2023
e94f0ec
fix based on review
rashidakanchwala Sep 5, 2023
a6ecd7c
modify upload static files logic
ravi-kumar-pilla Sep 5, 2023
1232e92
Merge branch 'shareable-flowchart' of https://github.com/kedro-org/ke…
ravi-kumar-pilla Sep 5, 2023
737af74
refactor upload api with latest fsspec
ravi-kumar-pilla Sep 5, 2023
9e24189
fix unit tests_1
ravi-kumar-pilla Sep 5, 2023
ef4617f
revert os logic to pathlib
rashidakanchwala Sep 6, 2023
9e6d9a5
fix static folder issue
rashidakanchwala Sep 6, 2023
5b0a860
fix format and lint errors
ravi-kumar-pilla Sep 6, 2023
a6781ac
add unit tests for shareable viz s3deployer
ravi-kumar-pilla Sep 7, 2023
6d28f6b
add pytests for responses module
ravi-kumar-pilla Sep 7, 2023
16673a4
add s3fs as dependency
ravi-kumar-pilla Sep 7, 2023
605c450
add temporary no cover for apps
ravi-kumar-pilla Sep 7, 2023
ad8d089
update lower reqs
rashidakanchwala Sep 8, 2023
ae2992a
update fsspec
rashidakanchwala Sep 8, 2023
52a2a8a
check kedro latest version as 18.0 in e2e tests
ravi-kumar-pilla Sep 8, 2023
a0e7052
update fsspec and s3fs requirements to support earliest kedro version
ravi-kumar-pilla Sep 8, 2023
0046755
add timestamp file for deploy
ravi-kumar-pilla Sep 11, 2023
ef093d2
merged main
ravi-kumar-pilla Sep 11, 2023
77e9ad4
add pytest for timestamp route
ravi-kumar-pilla Sep 11, 2023
bf9f106
fix lint and format errors
ravi-kumar-pilla Sep 11, 2023
c300d74
Merge branch 'main' into shareable-flowchart
tynandebold Sep 13, 2023
2dab4c0
fix server changes and test e2e scenarios
ravi-kumar-pilla Sep 13, 2023
031c59a
try to catch versionConflicterror
rashidakanchwala Sep 14, 2023
a6e7ef0
add route /api/project-metadata to provide package version info
ravi-kumar-pilla Sep 14, 2023
7854c6a
remove frontend build for backend unit tests
ravi-kumar-pilla Sep 14, 2023
d162a8f
remove s3fs requirement to test
ravi-kumar-pilla Sep 14, 2023
1415f00
add s3fs without specific version
ravi-kumar-pilla Sep 14, 2023
3533925
adjust requirements and add pytests for project metadata
ravi-kumar-pilla Sep 15, 2023
455eeec
test open s3fs requirement
ravi-kumar-pilla Sep 15, 2023
948d420
test open s3fs requirement
ravi-kumar-pilla Sep 15, 2023
aa2d5e4
test open s3fs requirement
ravi-kumar-pilla Sep 15, 2023
b401f2f
add version info and modify route name from /api/timestamp to /api/de…
ravi-kumar-pilla Sep 15, 2023
4174d43
add pytests for updated api
ravi-kumar-pilla Sep 15, 2023
008624d
Merge branch 'main' into shareable-flowchart
rashidakanchwala Sep 18, 2023
de88895
undo all new requirements
rashidakanchwala Sep 18, 2023
74132c6
undo all fsspec changes
rashidakanchwala Sep 18, 2023
44681d4
added s3fs as dependency
rashidakanchwala Sep 18, 2023
16a01e5
Merge branch 'shareable-flowchart' of https://github.com/kedro-org/ke…
rashidakanchwala Sep 18, 2023
08551e3
fix unit tests
rashidakanchwala Sep 18, 2023
b8720b4
clean up tests
rashidakanchwala Sep 18, 2023
13f4425
lint
rashidakanchwala Sep 18, 2023
d69f7c5
fix lint
rashidakanchwala Sep 18, 2023
f26a53e
fix test and lint and compatibility response
rashidakanchwala Sep 18, 2023
a6ea86e
add packaging
rashidakanchwala Sep 18, 2023
269ed26
packaging reqs
rashidakanchwala Sep 18, 2023
72a7e18
fix api endpoint
rashidakanchwala Sep 20, 2023
4cea3e9
fixes based on reviews
rashidakanchwala Sep 20, 2023
c3de82f
Merge branch 'main' into shareable-flowchart
rashidakanchwala Sep 22, 2023
33f568d
changes based on reviews
rashidakanchwala Sep 22, 2023
890f92e
Merge branch 'shareable-flowchart' of https://github.com/kedro-org/ke…
rashidakanchwala Sep 22, 2023
3809add
fix lint
rashidakanchwala Sep 22, 2023
e912b92
fixes based on review
rashidakanchwala Sep 26, 2023
d2de353
updated cli help definition
rashidakanchwala Sep 26, 2023
29a0fab
update filpath to directory
rashidakanchwala Sep 26, 2023
76bd6dd
update filpath to directory
rashidakanchwala Sep 26, 2023
635634b
add s3 protocol in the backend
rashidakanchwala Sep 27, 2023
Files changed
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -65,3 +65,4 @@ coverage.xml

# Kedro
*.log

1 change: 1 addition & 0 deletions package/features/steps/lower_requirements.txt
@@ -6,6 +6,7 @@ aiofiles==22.1.0
uvicorn[standard]==0.22.0
watchgod==0.8.2
plotly==4.0
packaging==23.0
pandas==1.3; python_version < '3.10'
pandas==1.5; python_version >= '3.10'
sqlalchemy==1.4
22 changes: 20 additions & 2 deletions package/kedro_viz/api/apps.py
@@ -68,7 +68,7 @@ def create_api_app_from_project(
# frontend e2e tests via Cypress
app.mount("/static", StaticFiles(directory=_HTML_DIR / "static"), name="static")

# everytime the server reloads, a new app with a new timestamp will be created.
# every time the server reloads, a new app with a new timestamp will be created.
# this is used as an etag embedded in the frontend for client to use when making requests.
app_etag = _create_etag()

@@ -131,6 +131,24 @@ async def index():

@app.get("/api/main", response_class=JSONResponse)
async def main():
return json.loads(Path(filepath).read_text(encoding="utf8"))
return json.loads((Path(filepath) / "main").read_text(encoding="utf8"))

@app.get("/api/nodes/{node_id}", response_class=JSONResponse)
async def get_node_metadata(node_id):
return json.loads( # pragma: no cover
(Path(filepath) / "nodes" / node_id).read_text(encoding="utf8")
)

@app.get("/api/pipelines/{pipeline_id}", response_class=JSONResponse)
async def get_registered_pipeline(pipeline_id):
return json.loads( # pragma: no cover
(Path(filepath) / "pipelines" / pipeline_id).read_text(encoding="utf8")
)

@app.get("/api/deploy-viz-metadata", response_class=JSONResponse)
async def get_deployed_viz_metadata():
return json.loads( # pragma: no cover
(Path(filepath) / "deploy-viz-metadata").read_text(encoding="utf8")
)

return app
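The static-response routes added above all follow one pattern: read a pre-generated JSON file from the deployed build directory and return its parsed contents. A minimal stdlib sketch of that pattern (the `build_dir` layout and `read_api_file` helper are illustrative, not part of the PR; the real routes use FastAPI):

```python
import json
import tempfile
from pathlib import Path


def read_api_file(build_dir: Path, *parts: str) -> dict:
    """Load one pre-generated API response, e.g. api/nodes/<node_id>."""
    return json.loads(build_dir.joinpath(*parts).read_text(encoding="utf8"))


# demo: write and read back a fake /api/nodes/<id> response
build_dir = Path(tempfile.mkdtemp())
(build_dir / "nodes").mkdir()
(build_dir / "nodes" / "abc123").write_text(
    json.dumps({"id": "abc123", "type": "task"}), encoding="utf8"
)
print(read_api_file(build_dir, "nodes", "abc123")["type"])  # task
```

Because the responses are plain files, the same build can be served by FastAPI locally or uploaded unchanged to static hosting such as S3.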
9 changes: 9 additions & 0 deletions package/kedro_viz/api/rest/requests.py
@@ -0,0 +1,9 @@
"""`kedro_viz.api.rest.requests` defines REST request types."""
from pydantic import BaseModel


class S3DeployerConfiguration(BaseModel):
"""Credentials for S3 Deployer."""

region: str
bucket_name: str
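This pydantic model defines the POST body for the deploy endpoint; pydantic validates the two string fields at request time. A stdlib `dataclass` analogue of the same shape, with the validation pydantic performs implicitly written out explicitly (illustrative only, not the PR's code):

```python
from dataclasses import dataclass


@dataclass
class S3DeployerConfiguration:
    """Mirror of the pydantic request model: the POST body for deployment."""

    region: str
    bucket_name: str

    def __post_init__(self):
        # pydantic rejects missing/empty-typed fields automatically;
        # shown here explicitly for the sketch
        if not self.region or not self.bucket_name:
            raise ValueError("region and bucket_name are required")


cfg = S3DeployerConfiguration(region="eu-west-2", bucket_name="my-viz-bucket")
print(cfg.bucket_name)  # my-viz-bucket
```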
211 changes: 197 additions & 14 deletions package/kedro_viz/api/rest/responses.py
@@ -1,13 +1,33 @@
"""`kedro_viz.api.rest.responses` defines REST response types."""
# pylint: disable=missing-class-docstring,too-few-public-methods,invalid-name
import abc
import logging
from typing import Any, Dict, List, Optional, Union

import fsspec
import orjson
from fastapi.responses import ORJSONResponse
import packaging
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse, ORJSONResponse
from kedro.io.core import get_protocol_and_path
from pydantic import BaseModel

from kedro_viz.api.rest.utils import get_package_version
from kedro_viz.data_access import data_access_manager
from kedro_viz.models.flowchart import (
DataNode,
DataNodeMetadata,
ParametersNodeMetadata,
TaskNode,
TaskNodeMetadata,
TranscodedDataNode,
TranscodedDataNodeMetadata,
)

logger = logging.getLogger(__name__)

_FSSPEC_PACKAGE_NAME = "fsspec"
_FSSPEC_COMPATIBLE_VERSION = "2023.9.0"


class APIErrorMessage(BaseModel):
@@ -244,6 +264,39 @@ class GraphAPIResponse(BaseAPIResponse):
selected_pipeline: str


class PackageCompatibilityAPIResponse(BaseAPIResponse):
package_name: str
package_version: str
is_compatible: bool

class Config:
schema_extra = {
"example": {
"package_name": "fsspec",
"package_version": "2023.9.1",
"is_compatible": True,
}
}


class EnhancedORJSONResponse(ORJSONResponse):
@staticmethod
def encode_to_human_readable(content: Any) -> bytes:
"""A method to encode the given content to JSON, with the
proper formatting to write a human-readable file.

Returns:
A bytes object containing the JSON to write.

"""
return orjson.dumps(
content,
option=orjson.OPT_INDENT_2
| orjson.OPT_NON_STR_KEYS
| orjson.OPT_SERIALIZE_NUMPY,
)
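`encode_to_human_readable` returns bytes, formatted for a readable on-disk file. A stdlib approximation of what the `orjson` options do (`OPT_INDENT_2` indents, `OPT_NON_STR_KEYS` stringifies non-string keys, `OPT_SERIALIZE_NUMPY` handles numpy values, roughly what `default=str` does here); this is a sketch, not the PR's encoder:

```python
import json

# a dict with a non-string key, like orjson's OPT_NON_STR_KEYS case
content = {1: "node", "edges": [1, 2]}

# json.dumps coerces int keys to strings and indents for readability;
# encoding to bytes mirrors the orjson method's return type
encoded = json.dumps(content, indent=2, default=str).encode("utf8")
print(encoded.decode("utf8"))
```

The bytes return type is also why the write helper further down opens files in `"wb"` mode.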


def get_default_response() -> GraphAPIResponse:
"""Default response for `/api/main`."""
default_selected_pipeline_id = (
@@ -273,19 +326,149 @@ def get_default_response() -> GraphAPIResponse:
)


(The `EnhancedORJSONResponse` class shown earlier in this diff was moved up from this location; in its place the diff adds the following function.)

def get_node_metadata_response(node_id: str):
    """API response for `/api/nodes/node_id`."""
    node = data_access_manager.nodes.get_node_by_id(node_id)
    if not node:
        return JSONResponse(status_code=404, content={"message": "Invalid node ID"})

    if not node.has_metadata():
        return JSONResponse(content={})

    if isinstance(node, TaskNode):
        return TaskNodeMetadata(node)

    if isinstance(node, DataNode):
        return DataNodeMetadata(node)

    if isinstance(node, TranscodedDataNode):
        return TranscodedDataNodeMetadata(node)

    return ParametersNodeMetadata(node)


def get_selected_pipeline_response(registered_pipeline_id: str):
"""API response for `/api/pipeline/pipeline_id`."""
if not data_access_manager.registered_pipelines.has_pipeline(
registered_pipeline_id
):
return JSONResponse(status_code=404, content={"message": "Invalid pipeline ID"})

modular_pipelines_tree = (
data_access_manager.create_modular_pipelines_tree_for_registered_pipeline(
registered_pipeline_id
)
)

return GraphAPIResponse(
nodes=data_access_manager.get_nodes_for_registered_pipeline( # type: ignore
registered_pipeline_id
),
edges=data_access_manager.get_edges_for_registered_pipeline( # type: ignore
registered_pipeline_id
),
tags=data_access_manager.tags.as_list(),
layers=data_access_manager.get_sorted_layers_for_registered_pipeline(
registered_pipeline_id
),
pipelines=data_access_manager.registered_pipelines.as_list(),
selected_pipeline=registered_pipeline_id,
modular_pipelines=modular_pipelines_tree, # type: ignore
)


def get_package_compatibilities_response():
Reviewer (Member): This seems to be very fsspec-specific, so I'd include that in the name to avoid confusion. Should this be a "public" method? Is anything accessing this directly, or will it only be called from within the viz backend?

Author: True. The reason I have kept this generic is in case we have more package compatibilities to test in the future.
"""API response for `/api/package_compatibility`."""
package_name = _FSSPEC_PACKAGE_NAME
package_version = get_package_version(package_name)
is_compatible = packaging.version.parse(package_version) >= packaging.version.parse(
_FSSPEC_COMPATIBLE_VERSION
)
return PackageCompatibilityAPIResponse(
package_name=package_name,
package_version=package_version,
is_compatible=is_compatible,
)
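The compatibility check compares the installed fsspec version against the minimum `2023.9.0` using `packaging.version.parse`. A simplified stdlib stand-in for that comparison (it only handles plain `X.Y.Z` strings; the real `packaging` parser also understands pre-releases, post-releases, and local versions):

```python
def parse_version(version: str) -> tuple:
    """Naive numeric parse of an X.Y.Z version string into a comparable tuple."""
    return tuple(int(part) for part in version.split("."))


FSSPEC_COMPATIBLE_VERSION = "2023.9.0"


def is_compatible(installed: str) -> bool:
    # tuple comparison gives the same ordering as packaging for plain versions
    return parse_version(installed) >= parse_version(FSSPEC_COMPATIBLE_VERSION)


print(is_compatible("2023.9.1"))  # True
print(is_compatible("2023.8.2"))  # False
```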


def write_api_response_to_fs(file_path: str, response: Any, remote_fs: Any):
"""Encodes, enhances responses and writes it to a file"""
jsonable_response = jsonable_encoder(response)
encoded_response = EnhancedORJSONResponse.encode_to_human_readable(
jsonable_response
)

with remote_fs.open(file_path, "wb") as file:
file.write(encoded_response)

Reviewer (on lines +399 to +405): I was a bit surprised it is using "wb" instead of "w". I'd expect the result of encode_to_human_readable to be readable text, but it seems to return bytes?

Author: Yes, it currently returns bytes. I will create an issue to figure out the best way to make this clearer, but that is out of scope for this PR.
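The write helper only needs one thing from the fsspec filesystem object: an `open(path, mode)` method. A self-contained sketch of the same flow with a minimal local stand-in (the `LocalFS` class and the simplified encoder are assumptions for the demo; real code gets the filesystem from `fsspec.filesystem(protocol)` and encodes with `EnhancedORJSONResponse`):

```python
import json
import tempfile
from pathlib import Path


class LocalFS:
    """Minimal stand-in for an fsspec filesystem: only open() is needed here."""

    def open(self, path, mode):
        # delegates to the builtin open; fsspec would route to s3fs etc.
        return open(path, mode)


def write_response(file_path: str, response, remote_fs) -> None:
    """Encode a response to human-readable bytes and write it out."""
    encoded = json.dumps(response, indent=2).encode("utf8")  # bytes, hence "wb"
    with remote_fs.open(file_path, "wb") as file:
        file.write(encoded)


target = Path(tempfile.mkdtemp()) / "main"
write_response(str(target), {"nodes": []}, LocalFS())
print(target.read_text(encoding="utf8"))
```

Because only `open()` is used, the same helper works unchanged against a local directory or an S3 bucket, which is what makes the deployed build shareable.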


def save_api_main_response_to_fs(main_loc: str, remote_fs: Any):
Reviewer (Member): main_loc is a slightly vague variable name. I'd name this differently and also add a docstring describing what it is.

Reviewer (Member): Perhaps main_path?

"""Saves API /main response to a file."""
try:
write_api_response_to_fs(main_loc, get_default_response(), remote_fs)
except Exception as exc: # pragma: no cover
logger.exception("Failed to save default response. Error: %s", str(exc))
raise exc


def save_api_node_response_to_fs(nodes_loc: str, remote_fs: Any):
Reviewer (Member): I'd change loc to path here as well, if it makes sense to you.

"""Saves API /nodes/{node} response to a file."""
for nodeId in data_access_manager.nodes.get_node_ids():
try:
write_api_response_to_fs(
f"{nodes_loc}/{nodeId}", get_node_metadata_response(nodeId), remote_fs
)
except Exception as exc: # pragma: no cover
logger.exception(
"Failed to save node data for node ID %s. Error: %s", nodeId, str(exc)
)
raise exc


def save_api_pipeline_response_to_fs(pipelines_loc: str, remote_fs: Any):
"""Saves API /pipelines/{pipeline} response to a file."""
for pipelineId in data_access_manager.registered_pipelines.get_pipeline_ids():
try:
write_api_response_to_fs(
f"{pipelines_loc}/{pipelineId}",
get_selected_pipeline_response(pipelineId),
remote_fs,
)
except Exception as exc: # pragma: no cover
logger.exception(
"Failed to save pipeline data for pipeline ID %s. Error: %s",
pipelineId,
str(exc),
)
raise exc


def save_api_responses_to_fs(filepath: str):
"""Saves all Kedro Viz API responses to a file."""
try:
protocol, path = get_protocol_and_path(filepath)
remote_fs = fsspec.filesystem(protocol)

logger.debug(
"""Saving/Uploading api files to %s""",
filepath,
)

main_loc = f"{path}/api/main"
nodes_loc = f"{path}/api/nodes"
pipelines_loc = f"{path}/api/pipelines"

if protocol == "file":
remote_fs.makedirs(path, exist_ok=True)
remote_fs.makedirs(nodes_loc, exist_ok=True)
remote_fs.makedirs(pipelines_loc, exist_ok=True)

save_api_main_response_to_fs(main_loc, remote_fs)
save_api_node_response_to_fs(nodes_loc, remote_fs)
save_api_pipeline_response_to_fs(pipelines_loc, remote_fs)

except Exception as exc: # pragma: no cover
logger.exception(
"An error occurred while preparing data for saving. Error: %s", str(exc)
)
raise exc
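`save_api_responses_to_fs` splits the target into protocol and path via kedro's `get_protocol_and_path`, then creates the `api/` directory layout only for the local `file` protocol (object stores like S3 have no real directories). A simplified stand-in for that split (the real kedro helper also strips credentials and validates the path; this handles only the basic cases):

```python
def split_protocol_and_path(filepath: str):
    """Simplified sketch of kedro.io.core.get_protocol_and_path:
    's3://my-bucket/viz' -> ('s3', 'my-bucket/viz'); bare paths -> 'file'."""
    if "://" in filepath:
        protocol, _, path = filepath.partition("://")
        return protocol, path
    return "file", filepath


print(split_protocol_and_path("s3://my-bucket/viz"))  # ('s3', 'my-bucket/viz')
print(split_protocol_and_path("/tmp/viz-build"))      # ('file', '/tmp/viz-build')
```

The protocol then selects the fsspec backend, so the same save routine serves both `kedro viz --save-dir` style local builds and S3 deployment.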