Call integration tests via OIDC #378

Merged 2 commits on Oct 17, 2023
49 changes: 45 additions & 4 deletions .github/workflows/push.yml
@@ -14,14 +14,16 @@ on:
branches:
- main

permissions:
id-token: write
contents: read
pull-requests: write

env:
HATCH_VERSION: 1.7.0

jobs:
ci:
strategy:
matrix:
pyVersion: [ '3.10' ]
runs-on: ubuntu-latest
steps:
- name: Checkout
@@ -35,7 +37,7 @@ jobs:
with:
cache: 'pip'
cache-dependency-path: '**/pyproject.toml'
python-version: ${{ matrix.pyVersion }}
python-version: '3.10'

- name: Install hatch
run: pip install hatch==$HATCH_VERSION
@@ -46,6 +48,45 @@
- name: Publish test coverage
uses: codecov/codecov-action@v1

integration:
if: github.event_name == 'pull_request'
environment: account-admin
runs-on: ubuntu-latest
steps:
- name: Checkout Code
uses: actions/[email protected]

- name: Unshallow
run: git fetch --prune --unshallow

- name: Install Python
uses: actions/setup-python@v4
with:
cache: 'pip'
cache-dependency-path: '**/pyproject.toml'
python-version: '3.10'

- name: Install hatch
run: pip install hatch==$HATCH_VERSION

- uses: azure/login@v1
with:
client-id: ${{ secrets.ARM_CLIENT_ID }}
tenant-id: ${{ secrets.ARM_TENANT_ID }}
subscription-id: ${{ secrets.ARM_SUBSCRIPTION_ID }}

- name: Run integration tests
run: hatch run integration:test
env:
CLOUD_ENV: "${{ vars.CLOUD_ENV }}"
DATABRICKS_HOST: "${{ secrets.DATABRICKS_HOST }}"
DATABRICKS_ACCOUNT_ID: "${{ secrets.DATABRICKS_ACCOUNT_ID }}"
DATABRICKS_CLUSTER_ID: "${{ vars.DATABRICKS_CLUSTER_ID }}"
TEST_DEFAULT_CLUSTER_ID: "${{ vars.TEST_DEFAULT_CLUSTER_ID }}"
TEST_DEFAULT_WAREHOUSE_ID: "${{ vars.TEST_DEFAULT_WAREHOUSE_ID }}"
TEST_INSTANCE_POOL_ID: "${{ vars.TEST_INSTANCE_POOL_ID }}"
TEST_LEGACY_TABLE_ACL_CLUSTER_ID: "${{ vars.TEST_LEGACY_TABLE_ACL_CLUSTER_ID }}"

fmt:
runs-on: ubuntu-latest
steps:
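Note on the new `integration` job: the `id-token: write` permission lets the runner request a short-lived OIDC token from GitHub, and `azure/login` exchanges it for Azure credentials against a federated app registration, so no client secret is stored in the repository. A minimal sketch of how a test fixture could then authenticate, assuming the Databricks SDK's `azure-cli` credential type picks up the session that `azure/login` establishes (the fixture itself is hypothetical):

```python
import os

from databricks.sdk import WorkspaceClient


def ws() -> WorkspaceClient:
    # Hypothetical fixture: after `azure/login`, the runner has an Azure CLI
    # session, so the SDK can authenticate without a client secret.
    return WorkspaceClient(
        host=os.environ["DATABRICKS_HOST"],
        auth_type="azure-cli",
    )
```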
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -170,7 +170,7 @@ branch = true
parallel = true

[tool.coverage.report]
omit = ["src/databricks/labs/ucx/mixins/*", "*/working-copy/*"]
omit = ["src/databricks/labs/ucx/mixins/*", "*/working-copy/*", "*/fresh_wheel_file/*"]
exclude_lines = [
"no cov",
"if __name__ == .__main__.:",
23 changes: 4 additions & 19 deletions src/databricks/labs/ucx/framework/crawlers.py
@@ -46,29 +46,14 @@ def _schema_for(cls, klass):
return ", ".join(fields)

@classmethod
def _filter_none_rows(cls, rows, full_name):
def _filter_none_rows(cls, rows):
if len(rows) == 0:
return rows

results = []
nullable_fields = set()

for field in dataclasses.fields(rows[0]):
if field.default is None:
nullable_fields.add(field.name)

for row in rows:
if row is None:
continue
row_contains_none = False
for column, value in dataclasses.asdict(row).items():
if value is None and column not in nullable_fields:
logger.warning(f"[{full_name}] Field {column} is None, filtering row")
row_contains_none = True
break

if not row_contains_none:
results.append(row)
results.append(row)
return results


@@ -90,7 +75,7 @@ def save_table(self, full_name: str, rows: list[any], klass: dataclasses.datacla
if mode == "overwrite":
msg = "Overwrite mode is not yet supported"
raise NotImplementedError(msg)
rows = self._filter_none_rows(rows, full_name)
rows = self._filter_none_rows(rows)
self.create_table(full_name, klass)
if len(rows) == 0:
return
@@ -141,7 +126,7 @@ def fetch(self, sql) -> Iterator[any]:
return self._spark.sql(sql).collect()

def save_table(self, full_name: str, rows: list[any], klass: dataclasses.dataclass, mode: str = "append"):
rows = self._filter_none_rows(rows, full_name)
rows = self._filter_none_rows(rows)

if len(rows) == 0:
self.create_table(full_name, klass)
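The slimmed-down `_filter_none_rows` only drops rows that are `None` themselves; rows with `None` in individual fields are now kept and land as SQL `NULL`s instead of being silently discarded. A minimal sketch of the new semantics (the dataclass is illustrative):

```python
import dataclasses


@dataclasses.dataclass
class Row:  # illustrative only
    path: str
    language: str = None  # a nullable field no longer causes the row to be dropped


rows = [Row("/a"), None, Row("/b", "PYTHON")]
kept = [row for row in rows if row is not None]
assert kept == [Row("/a"), Row("/b", "PYTHON")]
```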
4 changes: 4 additions & 0 deletions src/databricks/labs/ucx/framework/parallel.py
@@ -8,6 +8,8 @@
from concurrent.futures import ThreadPoolExecutor
from typing import Generic, TypeVar

MIN_THREADS = 8

Result = TypeVar("Result")
logger = logging.getLogger(__name__)

@@ -27,6 +29,8 @@ def __init__(self, name, tasks: list[Callable[..., Result]], num_threads: int):
@classmethod
def gather(cls, name: str, tasks: list[Callable[..., Result]]) -> (list[Result], list[Exception]):
num_threads = os.cpu_count() * 2
if num_threads < MIN_THREADS:
num_threads = MIN_THREADS
return cls(name, tasks, num_threads=num_threads)._run()

def _run(self) -> (list[Result], list[Exception]):
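The `MIN_THREADS` floor matters on small runners: GitHub-hosted runners have 2 cores, so `os.cpu_count() * 2` would give only 4 threads for what are mostly I/O-bound API calls. An equivalent, slightly more defensive one-liner (a sketch; `os.cpu_count()` can return `None`):

```python
import os

MIN_THREADS = 8

# max() expresses the floor in one step; `or 1` covers the documented
# case where os.cpu_count() returns None.
num_threads = max(MIN_THREADS, (os.cpu_count() or 1) * 2)
```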
4 changes: 3 additions & 1 deletion src/databricks/labs/ucx/install.py
@@ -465,7 +465,9 @@ def _upload_wheel(self) -> str:

def _job_settings(self, step_name: str, dbfs_path: str):
email_notifications = None
if "@" in self._my_username:
if not self._override_clusters and "@" in self._my_username:
# set email notifications only if we're running the real
# installation and not the integration test.
email_notifications = jobs.JobEmailNotifications(
on_success=[self._my_username], on_failure=[self._my_username]
)
14 changes: 11 additions & 3 deletions src/databricks/labs/ucx/workspace_access/generic.py
@@ -35,7 +35,7 @@ class WorkspaceObjectInfo:
object_type: str
object_id: str
path: str
language: str
language: str = None


class RetryableError(DatabricksError):
@@ -245,8 +245,16 @@ def _crawl(self) -> list[WorkspaceObjectInfo]:
from databricks.labs.ucx.workspace_access.listing import WorkspaceListing

ws_listing = WorkspaceListing(self._ws, num_threads=self._num_threads, with_directories=False)
for _object in ws_listing.walk(self._start_path):
yield WorkspaceObjectInfo(_object.object_type.name, str(_object.object_id), _object.path, _object.language)
for obj in ws_listing.walk(self._start_path):
if obj is None:
continue
raw = obj.as_dict()
yield WorkspaceObjectInfo(
object_type=raw["object_type"],
object_id=str(raw["object_id"]),
path=raw["path"],
language=raw.get("language", None),
)

def snapshot(self) -> list[WorkspaceObjectInfo]:
return self._snapshot(self._try_fetch, self._crawl)
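Making `language` optional is what lets `_crawl` use `raw.get("language", None)`: only notebooks carry a language, while directories, files, and repos come back without one. A short sketch of both cases (the dictionaries are illustrative):

```python
from databricks.labs.ucx.workspace_access.generic import WorkspaceObjectInfo

notebook = {"object_type": "NOTEBOOK", "object_id": 1, "path": "/a", "language": "PYTHON"}
directory = {"object_type": "DIRECTORY", "object_id": 2, "path": "/b"}  # no "language" key

for raw in (notebook, directory):
    info = WorkspaceObjectInfo(
        object_type=raw["object_type"],
        object_id=str(raw["object_id"]),
        path=raw["path"],
        language=raw.get("language", None),  # None for non-notebook objects
    )
```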
4 changes: 3 additions & 1 deletion src/databricks/labs/ucx/workspace_access/redash.py
@@ -1,5 +1,6 @@
import dataclasses
import json
import logging
import time
from collections.abc import Callable
from dataclasses import dataclass
@@ -14,10 +15,11 @@
AclSupport,
Destination,
Permissions,
logger,
)
from databricks.labs.ucx.workspace_access.groups import GroupMigrationState

logger = logging.getLogger(__name__)


@dataclass
class SqlPermissionsInfo:
4 changes: 2 additions & 2 deletions src/databricks/labs/ucx/workspace_access/scim.py
@@ -1,7 +1,7 @@
import json
import logging
import time
from functools import partial
from logging import Logger

from databricks.sdk import WorkspaceClient
from databricks.sdk.core import DatabricksError
@@ -16,7 +16,7 @@
)
from databricks.labs.ucx.workspace_access.groups import GroupMigrationState

logger = Logger(__name__)
logger = logging.getLogger(__name__)


class ScimSupport(AclSupport):
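Both logger changes follow the same rule: `redash.py` used to import a logger from another module, and `scim.py` instantiated `logging.Logger` directly, which bypasses the logging manager, so levels and handlers configured on ancestor loggers never apply. The standard idiom is one module-level logger per file:

```python
import logging

# getLogger() registers the logger in the manager's hierarchy, so
# configuration on "databricks.labs.ucx" (or the root logger) propagates
# to it; instantiating logging.Logger(__name__) directly would not.
logger = logging.getLogger(__name__)
```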
41 changes: 15 additions & 26 deletions src/databricks/labs/ucx/workspace_access/secrets.py
@@ -1,10 +1,10 @@
import json
import logging
import random
import time
from datetime import timedelta
from functools import partial

from databricks.sdk import WorkspaceClient
from databricks.sdk.retries import retried
from databricks.sdk.service import workspace

from databricks.labs.ucx.mixins.hardening import rate_limited
@@ -19,8 +19,11 @@


class SecretScopesSupport(AclSupport):
def __init__(self, ws: WorkspaceClient):
def __init__(self, ws: WorkspaceClient, verify_timeout: timedelta | None = None):
self._ws = ws
if verify_timeout is None:
verify_timeout = timedelta(minutes=1)
self._verify_timeout = verify_timeout

def get_crawler_tasks(self):
scopes = self._ws.secrets.list_scopes()
@@ -77,32 +80,18 @@ def secret_scope_permission(self, scope_name: str, group_name: str) -> workspace
return acl.permission
return None

def _inflight_check(
self, scope_name: str, group_name: str, expected_permission: workspace.AclPermission, num_retries: int = 5
):
def _inflight_check(self, scope_name: str, group_name: str, expected_permission: workspace.AclPermission):
# in-flight check for the applied permissions
# the api might be inconsistent, therefore we need to check that the permissions were applied
# TODO: add mixin to SDK
retries_left = num_retries
while retries_left > 0:
time.sleep(random.random() * 2)
applied_permission = self.secret_scope_permission(scope_name=scope_name, group_name=group_name)
if applied_permission:
if applied_permission == expected_permission:
return
else:
msg = (
f"Applied permission {applied_permission} is not "
f"equal to expected permission {expected_permission}"
)
raise ValueError(msg)

retries_left -= 1

msg = f"Failed to apply permissions for {group_name} on scope {scope_name} in {num_retries} retries"
raise ValueError(msg)
applied_permission = self.secret_scope_permission(scope_name, group_name)
if applied_permission != expected_permission:
msg = f"Applied permission {applied_permission} is not equal to expected permission {expected_permission}"
raise ValueError(msg)
return True

@rate_limited(max_requests=30)
def _rate_limited_put_acl(self, object_id: str, principal: str, permission: workspace.AclPermission):
self._ws.secrets.put_acl(object_id, principal, permission)
self._inflight_check(scope_name=object_id, group_name=principal, expected_permission=permission)
retry_on_value_error = retried(on=[ValueError], timeout=self._verify_timeout)
retried_check = retry_on_value_error(self._inflight_check)
retried_check(object_id, principal, permission)
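The hand-rolled retry loop is gone: `_inflight_check` now does a single read-and-compare and raises `ValueError` on mismatch, while the SDK's `retried` helper re-invokes it with backoff until `verify_timeout` elapses. The same pattern as a standalone sketch:

```python
from datetime import timedelta

from databricks.sdk.retries import retried


@retried(on=[ValueError], timeout=timedelta(minutes=1))
def read_after_write_check():
    # Raise ValueError to signal "not consistent yet"; `retried` retries
    # with backoff and re-raises once the timeout is exceeded.
    ...
```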
48 changes: 14 additions & 34 deletions tests/integration/assessment/test_assessment.py
@@ -120,26 +120,17 @@ def test_spn_crawler_no_config(ws, inventory_schema, make_job, make_pipeline, sq
make_pipeline()
make_cluster(single_node=True)
spn_crawler = AzureServicePrincipalCrawler(ws=ws, sbe=sql_backend, schema=inventory_schema)
spns = spn_crawler.snapshot()
results = []
for spn in spns:
results.append(spn)

assert len(results) >= 1
spn_crawler.snapshot()


def test_spn_crawler_with_pipeline_unavlbl_secret(ws, inventory_schema, make_job, make_pipeline, sql_backend):
def test_spn_crawler_with_pipeline_unavailable_secret(ws, inventory_schema, make_job, make_pipeline, sql_backend):
make_job(spark_conf=_SPARK_CONF)
make_pipeline(configuration=_PIPELINE_CONF_WITH_SECRET)
spn_crawler = AzureServicePrincipalCrawler(ws=ws, sbe=sql_backend, schema=inventory_schema)
spns = spn_crawler.snapshot()
results = []
for spn in spns:
results.append(spn)
results = spn_crawler.snapshot()

assert len(results) >= 2
assert results[0].storage_account == "storage_acct_1"
assert results[0].tenant_id == "directory_12345"
assert any(_ for _ in results if _.tenant_id == "directory_12345")
assert any(_ for _ in results if _.storage_account == "storage_acct_1")


def test_spn_crawler_with_available_secrets(
@@ -158,27 +149,16 @@ def test_spn_crawler_with_available_secrets(
make_job()
make_pipeline(configuration=_pipeline_conf_with_avlbl_secret)
spn_crawler = AzureServicePrincipalCrawler(ws=ws, sbe=sql_backend, schema=inventory_schema)
spns = spn_crawler.snapshot()
results = []
for spn in spns:
results.append(spn)

ws.secrets.delete_secret(scope=secret_scope, key=secret_key)
results = spn_crawler.snapshot()

assert len(results) >= 2
assert any(_ for _ in results if _.secret_scope == secret_scope)
assert any(_ for _ in results if _.secret_key == secret_key)


def test_workspace_object_crawler(ws, make_directory, inventory_schema, sql_backend):
new_directory = make_directory()
workspace_listing = WorkspaceListing(
ws=ws, sql_backend=sql_backend, inventory_database=inventory_schema, start_path=new_directory
)
listing_results = workspace_listing.snapshot()
results = []
for _result in listing_results:
if _result.path == new_directory:
results.append(_result)
def test_workspace_object_crawler(ws, make_notebook, inventory_schema, sql_backend):
notebook = make_notebook()
workspace_listing = WorkspaceListing(ws, sql_backend, inventory_schema)
workspace_objects = {_.path: _ for _ in workspace_listing.snapshot()}

assert len(results) == 1
assert results[0].path == new_directory
assert results[0].object_type == "DIRECTORY"
assert notebook in workspace_objects
assert "NOTEBOOK" == workspace_objects[notebook].object_type
3 changes: 1 addition & 2 deletions tests/integration/workspace_access/test_groups.py
@@ -164,15 +164,14 @@ def check_permissions_for_account_group():
for _info in group_manager.migration_state.groups:
ws.groups.delete(_info.backup.id)

@retried(on=[AssertionError], timeout=timedelta(seconds=30))
@retried(on=[AssertionError], timeout=timedelta(minutes=1))
def check_table_permissions_after_backup_delete():
logger.info("check_table_permissions_after_backup_delete()")

policy_permissions = generic_permissions.load_as_dict("cluster-policies", cluster_policy.policy_id)
assert group_info.backup.display_name not in policy_permissions

table_permissions = grants.for_table_info(dummy_table)
assert group_info.backup.display_name not in table_permissions
assert group_info.account.display_name in table_permissions
assert "SELECT" in table_permissions[group_info.account.display_name]

4 changes: 1 addition & 3 deletions tests/integration/workspace_access/test_setup.py
@@ -8,7 +8,6 @@
from databricks.labs.ucx.framework.parallel import Threads

logger = logging.getLogger(__name__)
Threader = partial(Threads, num_threads=40)


def _create_user(ws: WorkspaceClient, uid: str):
@@ -28,5 +27,4 @@ def _create_user(ws: WorkspaceClient, uid: str):

def test_create_users(ws):
pytest.skip("run only in debug")
executables = [partial(_create_user, ws, uid) for uid in range(200)]
Threader(executables)._run()
Threads.gather("creating fixtures", [partial(_create_user, ws, uid) for uid in range(5)])