Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: add data connectors #407

Merged
merged 11 commits into from
Oct 17, 2024
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ components/renku_data_services/platform/apispec.py: components/renku_data_servic
poetry run datamodel-codegen --input components/renku_data_services/platform/api.spec.yaml --output components/renku_data_services/platform/apispec.py --base-class renku_data_services.platform.apispec_base.BaseAPISpec $(codegen_params)
components/renku_data_services/message_queue/apispec.py: components/renku_data_services/message_queue/api.spec.yaml
poetry run datamodel-codegen --input components/renku_data_services/message_queue/api.spec.yaml --output components/renku_data_services/message_queue/apispec.py --base-class renku_data_services.message_queue.apispec_base.BaseAPISpec $(codegen_params)
components/renku_data_services/data_connectors/apispec.py: components/renku_data_services/data_connectors/api.spec.yaml
poetry run datamodel-codegen --input components/renku_data_services/data_connectors/api.spec.yaml --output components/renku_data_services/data_connectors/apispec.py --base-class renku_data_services.data_connectors.apispec_base.BaseAPISpec $(codegen_params)

##@ Apispec

schemas: components/renku_data_services/crc/apispec.py components/renku_data_services/storage/apispec.py components/renku_data_services/users/apispec.py components/renku_data_services/project/apispec.py components/renku_data_services/namespace/apispec.py components/renku_data_services/secrets/apispec.py components/renku_data_services/connected_services/apispec.py components/renku_data_services/repositories/apispec.py components/renku_data_services/notebooks/apispec.py components/renku_data_services/platform/apispec.py components/renku_data_services/message_queue/apispec.py ## Generate pydantic classes from apispec yaml files
schemas: components/renku_data_services/crc/apispec.py components/renku_data_services/storage/apispec.py components/renku_data_services/users/apispec.py components/renku_data_services/project/apispec.py components/renku_data_services/namespace/apispec.py components/renku_data_services/secrets/apispec.py components/renku_data_services/connected_services/apispec.py components/renku_data_services/repositories/apispec.py components/renku_data_services/notebooks/apispec.py components/renku_data_services/platform/apispec.py components/renku_data_services/message_queue/apispec.py components/renku_data_services/data_connectors/apispec.py ## Generate pydantic classes from apispec yaml files
@echo "generated classes based on ApiSpec"

##@ Avro schemas
Expand Down
30 changes: 28 additions & 2 deletions bases/renku_data_services/background_jobs/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

from renku_data_services.authz.authz import Authz
from renku_data_services.authz.config import AuthzConfig
from renku_data_services.data_connectors.db import DataConnectorProjectLinkRepository, DataConnectorRepository
from renku_data_services.data_connectors.migration_utils import DataConnectorMigrationTool
from renku_data_services.errors import errors
from renku_data_services.message_queue.config import RedisConfig
from renku_data_services.message_queue.db import EventRepository
Expand All @@ -29,6 +31,7 @@ class SyncConfig:
group_repo: GroupRepository
event_repo: EventRepository
project_repo: ProjectRepository
data_connector_migration_tool: DataConnectorMigrationTool
session_maker: Callable[..., AsyncSession]

@classmethod
Expand Down Expand Up @@ -67,7 +70,21 @@ def from_env(cls, prefix: str = "") -> "SyncConfig":
group_repo=group_repo,
authz=Authz(authz_config),
)

data_connector_repo = DataConnectorRepository(
session_maker=session_maker,
authz=Authz(authz_config),
)
data_connector_project_link_repo = DataConnectorProjectLinkRepository(
session_maker=session_maker,
authz=Authz(authz_config),
)
data_connector_migration_tool = DataConnectorMigrationTool(
session_maker=session_maker,
data_connector_repo=data_connector_repo,
data_connector_project_link_repo=data_connector_project_link_repo,
project_repo=project_repo,
authz=Authz(authz_config),
)
user_repo = UserRepo(
session_maker=session_maker,
message_queue=message_queue,
Expand All @@ -89,4 +106,13 @@ def from_env(cls, prefix: str = "") -> "SyncConfig":
client_secret = os.environ[f"{prefix}KEYCLOAK_CLIENT_SECRET"]
realm = os.environ.get(f"{prefix}KEYCLOAK_REALM", "Renku")
kc_api = KeycloakAPI(keycloak_url=keycloak_url, client_id=client_id, client_secret=client_secret, realm=realm)
return cls(syncer, kc_api, authz_config, group_repo, event_repo, project_repo, session_maker)
return cls(
syncer,
kc_api,
authz_config,
group_repo,
event_repo,
project_repo,
data_connector_migration_tool,
session_maker,
)
30 changes: 30 additions & 0 deletions bases/renku_data_services/background_jobs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,3 +264,33 @@ async def migrate_user_namespaces_make_all_public(config: SyncConfig) -> None:
)
await authz.client.WriteRelationships(authz_change)
logger.info(f"Made user namespace {ns_id} public")


async def migrate_storages_v2_to_data_connectors(config: SyncConfig) -> None:
    """Move storages_v2 to data_connectors.

    Every storage v2 item returned by ``config.data_connector_migration_tool`` is migrated
    individually, acting as the internal ``migrations`` service admin. A failure on one item
    is logged together with its traceback and does not stop the remaining items; the IDs of
    all failed items are reported once the loop is done.

    Args:
        config: Sync configuration; only ``data_connector_migration_tool`` is used here.
    """
    logger = logging.getLogger("background_jobs").getChild(migrate_storages_v2_to_data_connectors.__name__)

    api_user = InternalServiceAdmin(id=ServiceAdminId.migrations)
    storages_v2 = await config.data_connector_migration_tool.get_storages_v2(requested_by=api_user)

    if not storages_v2:
        logger.info("Nothing to do.")
        return

    logger.info(f"Migrating {len(storages_v2)} cloud storage v2 items to data connectors.")
    failed_storages: list[str] = []
    for storage in storages_v2:
        try:
            data_connector = await config.data_connector_migration_tool.migrate_storage_v2(
                requested_by=api_user, storage=storage
            )
            logger.info(f"Migrated {storage.name} to {data_connector.namespace.slug}/{data_connector.slug}.")
            logger.info(f"Deleted storage_v2: {storage.storage_id}")
        except Exception:
            # NOTE: Deliberately broad: this is a best-effort batch job, so one bad item must not
            # abort the others. logger.exception keeps the exception type and traceback, which
            # logging only str(err) would drop.
            logger.exception(f"Failed to migrate {storage.name}.")
            failed_storages.append(str(storage.storage_id))

    logger.info(f"Migrated {len(storages_v2)-len(failed_storages)}/{len(storages_v2)} data connectors.")
    if failed_storages:
        logger.error(f"Migration failed for storages: {failed_storages}.")
2 changes: 2 additions & 0 deletions bases/renku_data_services/background_jobs/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
bootstrap_user_namespaces,
fix_mismatched_project_namespace_ids,
migrate_groups_make_all_public,
migrate_storages_v2_to_data_connectors,
migrate_user_namespaces_make_all_public,
)
from renku_data_services.migrations.core import run_migrations_for_app
Expand All @@ -28,6 +29,7 @@ async def short_period_sync() -> None:
await fix_mismatched_project_namespace_ids(config)
await migrate_groups_make_all_public(config)
await migrate_user_namespaces_make_all_public(config)
await migrate_storages_v2_to_data_connectors(config)


async def long_period_sync() -> None:
Expand Down
19 changes: 11 additions & 8 deletions bases/renku_data_services/data_api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
ResourcePoolUsersBP,
UserResourcePoolsBP,
)
from renku_data_services.data_connectors.blueprints import DataConnectorsBP
from renku_data_services.message_queue.blueprints import SearchBP
from renku_data_services.namespace.blueprints import GroupsBP
from renku_data_services.platform.blueprints import PlatformConfigBP
from renku_data_services.project.blueprints import ProjectsBP
from renku_data_services.repositories.blueprints import RepositoriesBP
from renku_data_services.session.blueprints import EnvironmentsBP, SessionLaunchersBP
from renku_data_services.storage.blueprints import StorageBP, StorageSchemaBP, StoragesV2BP
from renku_data_services.storage.blueprints import StorageBP, StorageSchemaBP
from renku_data_services.users.blueprints import KCUsersBP, UserPreferencesBP, UserSecretsBP


Expand Down Expand Up @@ -75,12 +76,6 @@ def register_all_handlers(app: Sanic, config: Config) -> Sanic:
storage_repo=config.storage_repo,
authenticator=config.gitlab_authenticator,
)
storages_v2 = StoragesV2BP(
name="storages_v2",
url_prefix=url_prefix,
storage_v2_repo=config.storage_v2_repo,
authenticator=config.authenticator,
)
storage_schema = StorageSchemaBP(name="storage_schema", url_prefix=url_prefix)
user_preferences = UserPreferencesBP(
name="user_preferences",
Expand Down Expand Up @@ -153,6 +148,14 @@ def register_all_handlers(app: Sanic, config: Config) -> Sanic:
project_repo=config.project_repo,
authz=config.authz,
)
data_connectors = DataConnectorsBP(
name="data_connectors",
url_prefix=url_prefix,
data_connector_repo=config.data_connector_repo,
data_connector_to_project_link_repo=config.data_connector_to_project_link_repo,
data_connector_secret_repo=config.data_connector_secret_repo,
authenticator=config.authenticator,
)
app.blueprint(
[
resource_pools.blueprint(),
Expand All @@ -163,7 +166,6 @@ def register_all_handlers(app: Sanic, config: Config) -> Sanic:
user_secrets.blueprint(),
user_resource_pools.blueprint(),
storage.blueprint(),
storages_v2.blueprint(),
storage_schema.blueprint(),
user_preferences.blueprint(),
misc.blueprint(),
Expand All @@ -176,6 +178,7 @@ def register_all_handlers(app: Sanic, config: Config) -> Sanic:
repositories.blueprint(),
platform_config.blueprint(),
search.blueprint(),
data_connectors.blueprint(),
]
)

Expand Down
61 changes: 47 additions & 14 deletions components/renku_data_services/app_config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import renku_data_services.base_models as base_models
import renku_data_services.connected_services
import renku_data_services.crc
import renku_data_services.data_connectors
import renku_data_services.platform
import renku_data_services.repositories
import renku_data_services.storage
Expand All @@ -43,6 +44,11 @@
ServerOptionsDefaults,
generate_default_resource_pool,
)
from renku_data_services.data_connectors.db import (
DataConnectorProjectLinkRepository,
DataConnectorRepository,
DataConnectorSecretRepository,
)
from renku_data_services.db_config import DBConfig
from renku_data_services.git.gitlab import DummyGitlabAPI, GitlabAPI
from renku_data_services.k8s.clients import DummyCoreClient, DummySchedulingClient, K8sCoreClient, K8sSchedulingClient
Expand All @@ -57,7 +63,7 @@
from renku_data_services.repositories.db import GitRepositoriesRepository
from renku_data_services.secrets.db import UserSecretsRepo
from renku_data_services.session.db import SessionRepository
from renku_data_services.storage.db import StorageRepository, StorageV2Repository
from renku_data_services.storage.db import StorageRepository
from renku_data_services.users.config import UserPreferencesConfig
from renku_data_services.users.db import UserPreferencesRepository
from renku_data_services.users.db import UserRepo as KcUserRepo
Expand Down Expand Up @@ -162,7 +168,6 @@ class Config:
_user_repo: UserRepository | None = field(default=None, repr=False, init=False)
_rp_repo: ResourcePoolRepository | None = field(default=None, repr=False, init=False)
_storage_repo: StorageRepository | None = field(default=None, repr=False, init=False)
_storage_v2_repo: StorageV2Repository | None = field(default=None, repr=False, init=False)
_project_repo: ProjectRepository | None = field(default=None, repr=False, init=False)
_group_repo: GroupRepository | None = field(default=None, repr=False, init=False)
_event_repo: EventRepository | None = field(default=None, repr=False, init=False)
Expand All @@ -175,6 +180,11 @@ class Config:
_connected_services_repo: ConnectedServicesRepository | None = field(default=None, repr=False, init=False)
_git_repositories_repo: GitRepositoriesRepository | None = field(default=None, repr=False, init=False)
_platform_repo: PlatformRepository | None = field(default=None, repr=False, init=False)
_data_connector_repo: DataConnectorRepository | None = field(default=None, repr=False, init=False)
_data_connector_to_project_link_repo: DataConnectorProjectLinkRepository | None = field(
default=None, repr=False, init=False
)
_data_connector_secret_repo: DataConnectorSecretRepository | None = field(default=None, repr=False, init=False)

def __post_init__(self) -> None:
# NOTE: Read spec files required for Swagger
Expand Down Expand Up @@ -218,6 +228,10 @@ def __post_init__(self) -> None:
with open(spec_file) as f:
search = safe_load(f)

spec_file = Path(renku_data_services.data_connectors.__file__).resolve().parent / "api.spec.yaml"
with open(spec_file) as f:
data_connectors = safe_load(f)

self.spec = merge_api_specs(
crc_spec,
storage_spec,
Expand All @@ -229,6 +243,7 @@ def __post_init__(self) -> None:
repositories,
platform,
search,
data_connectors,
)

if self.default_resource_pool_file is not None:
Expand Down Expand Up @@ -273,18 +288,6 @@ def storage_repo(self) -> StorageRepository:
)
return self._storage_repo

@property
def storage_v2_repo(self) -> StorageV2Repository:
    """The DB adapter for V2 cloud storage configs, created on first access and then reused."""
    if self._storage_v2_repo:
        return self._storage_v2_repo
    # First access: build the repository once and keep it on the config object.
    self._storage_v2_repo = StorageV2Repository(
        session_maker=self.db.async_session_maker,
        project_authz=self.authz,
        user_repo=self.kc_user_repo,
        secret_service_public_key=self.secrets_service_public_key,
    )
    return self._storage_v2_repo

@property
def event_repo(self) -> EventRepository:
"""The DB adapter for cloud event configs."""
Expand Down Expand Up @@ -412,6 +415,36 @@ def platform_repo(self) -> PlatformRepository:
)
return self._platform_repo

@property
def data_connector_repo(self) -> DataConnectorRepository:
    """The DB adapter for data connectors, created on first access and then reused."""
    if self._data_connector_repo:
        return self._data_connector_repo
    # First access: build the repository once and keep it on the config object.
    self._data_connector_repo = DataConnectorRepository(
        session_maker=self.db.async_session_maker,
        authz=self.authz,
    )
    return self._data_connector_repo

@property
def data_connector_to_project_link_repo(self) -> DataConnectorProjectLinkRepository:
    """The DB adapter for data connector to project links, created on first access and then reused."""
    if self._data_connector_to_project_link_repo:
        return self._data_connector_to_project_link_repo
    # First access: build the repository once and keep it on the config object.
    self._data_connector_to_project_link_repo = DataConnectorProjectLinkRepository(
        session_maker=self.db.async_session_maker,
        authz=self.authz,
    )
    return self._data_connector_to_project_link_repo

@property
def data_connector_secret_repo(self) -> DataConnectorSecretRepository:
    """The DB adapter for data connector secrets, created on first access and then reused."""
    if self._data_connector_secret_repo:
        return self._data_connector_secret_repo
    # First access: build the repository once; it is wired to the (also lazily created)
    # data connector repository exposed by this config.
    self._data_connector_secret_repo = DataConnectorSecretRepository(
        session_maker=self.db.async_session_maker,
        data_connector_repo=self.data_connector_repo,
        user_repo=self.kc_user_repo,
        secret_service_public_key=self.secrets_service_public_key,
    )
    return self._data_connector_secret_repo

@classmethod
def from_env(cls, prefix: str = "") -> "Config":
"""Create a config from environment variables."""
Expand Down
Loading
Loading