test(ingest/bigquery): Add performance testing framework for bigquery usage (#7690)

- Creates metadata-ingestion/tests/performance directory
- Excludes metadata-ingestion/tests from docs generation
- Updates bigquery reporting around project state
asikowitz authored and Hyejin Yoon committed Apr 3, 2023
1 parent b5a5a50 commit b899116
Showing 10 changed files with 428 additions and 6 deletions.
1 change: 1 addition & 0 deletions docs-website/generateDocsDir.ts
@@ -120,6 +120,7 @@ function list_markdown_files(): string[] {
/^metadata-models\/docs\//, // these are used to generate docs, so we don't want to consider them here
/^metadata-ingestion\/archived\//, // these are archived, so we don't want to consider them here
/^metadata-ingestion\/docs\/sources\//, // these are used to generate docs, so we don't want to consider them here
/^metadata-ingestion\/tests\//,
/^metadata-ingestion-examples\//,
/^docker\/(?!README|datahub-upgrade|airflow\/local_airflow)/, // Drop all but a few docker docs.
/^docs\/docker\/README\.md/, // This one is just a pointer to another file.
2 changes: 2 additions & 0 deletions metadata-ingestion/setup.cfg
@@ -69,9 +69,11 @@ markers =
integration: marks tests to only run in integration (deselect with '-m "not integration"')
integration_batch_1: mark tests to only run in batch 1 of integration tests. This is done mainly for parallelisation (deselect with '-m not integration_batch_1')
slow_integration: marks tests that are too slow to even run in integration (deselect with '-m "not slow_integration"')
performance: marks tests that are sparingly run to measure performance (deselect with '-m "not performance"')
testpaths =
tests/unit
tests/integration
tests/performance

[coverage:run]
# Because of some quirks in the way setup.cfg, coverage.py, pytest-cov,
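With the new `performance` marker registered above, a test opts in via pytest's standard decorator. A minimal sketch (the test name and body are hypothetical):

```python
import pytest


@pytest.mark.performance
def test_bigquery_usage_performance() -> None:
    # Selected only by `-m performance`; CI can deselect these
    # tests with `-m "not performance"`.
    ...
```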
11 changes: 10 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py
@@ -3,7 +3,7 @@
import re
from dataclasses import dataclass, field
from datetime import datetime
-from typing import Any, ClassVar, Dict, List, Optional, Pattern, Set, Tuple
+from typing import Any, ClassVar, Dict, List, Optional, Pattern, Set, Tuple, Union

from dateutil import parser

@@ -623,3 +623,12 @@ def from_exported_bigquery_audit_metadata(
class AuditEvent:
read_event: Optional[ReadEvent] = None
query_event: Optional[QueryEvent] = None

@classmethod
def create(cls, event: Union[ReadEvent, QueryEvent]) -> "AuditEvent":
if isinstance(event, QueryEvent):
return AuditEvent(query_event=event)
elif isinstance(event, ReadEvent):
return AuditEvent(read_event=event)
else:
raise TypeError(f"Cannot create AuditEvent: {event}")
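For context, the new factory wraps an existing `QueryEvent` or `ReadEvent` and dispatches on its type. A hedged usage sketch (field values are illustrative; the keyword arguments mirror the ones the event generator later in this commit passes):

```python
from datetime import datetime, timezone

from datahub.ingestion.source.bigquery_v2.bigquery_audit import AuditEvent, QueryEvent

event = QueryEvent(
    job_name="job-1234",  # illustrative values throughout
    timestamp=datetime.now(tz=timezone.utc),
    actor_email="user-0@xyz.com",
    query="SELECT 1",
    statementType="SELECT",
    project_id="project-a",
    destinationTable=None,
    referencedTables=[],
    referencedViews=[],
    payload=None,
)

wrapped = AuditEvent.create(event)  # dispatches on the event's type
assert wrapped.query_event is event and wrapped.read_event is None
```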
19 changes: 14 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py
@@ -9,6 +9,7 @@

from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport
from datahub.utilities.lossy_collections import LossyDict, LossyList
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.stats_collections import TopKDict

logger: logging.Logger = logging.getLogger(__name__)
@@ -75,12 +76,20 @@ class BigQueryV2Report(ProfilingSqlReport):
operation_types_stat: Counter[str] = dataclasses.field(
default_factory=collections.Counter
)
-    current_project_status: Optional[Dict[str, Dict[str, datetime]]] = None
+    current_project_status: Optional[str] = None
+
+    timer: Optional[PerfTimer] = field(
+        default=None, init=False, repr=False, compare=False
+    )

    def set_project_state(self, project: str, stage: str) -> None:
-        if self.current_project_status:
+        if self.timer:
            logger.info(
-                "Previous project state was: %s",
-                self.to_pure_python_obj(self.current_project_status),
+                f"Time spent in stage <{self.current_project_status}>: "
+                f"{self.timer.elapsed_seconds():.2f} seconds"
            )
-        self.current_project_status = {project: {stage: datetime.now()}}
+        else:
+            self.timer = PerfTimer()
+
+        self.current_project_status = f"{project}: {stage} at {datetime.now()}"
+        self.timer.start()
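The effect is that each call logs how long the previous stage took before restarting the timer. A rough usage sketch, assuming the report dataclass is constructible with defaults (project and stage names are made up):

```python
import time

from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report

report = BigQueryV2Report()
report.set_project_state("project-a", "Metadata Extraction")  # first call: starts the timer
time.sleep(0.1)  # stand-in for real extraction work
report.set_project_state("project-a", "Usage Extraction")
# Logs something like:
#   Time spent in stage <project-a: Metadata Extraction at ...>: 0.10 seconds
```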
7 changes: 7 additions & 0 deletions metadata-ingestion/tests/performance/README.md
@@ -0,0 +1,7 @@
# Performance Testing
This module provides a framework for performance testing our ingestion sources.

When running a performance test, make sure to output print statements and live logs:
```bash
pytest -s --log-cli-level=INFO -m performance tests/performance/<test_name>.py
```
1 change: 1 addition & 0 deletions metadata-ingestion/tests/performance/__init__.py
@@ -0,0 +1 @@
"""Tests for performance of ingestion, not run in CI."""
99 changes: 99 additions & 0 deletions metadata-ingestion/tests/performance/bigquery.py
@@ -0,0 +1,99 @@
import dataclasses
import random
import uuid
from collections import defaultdict
from typing import Dict, Iterable, List

from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
AuditEvent,
BigqueryTableIdentifier,
BigQueryTableRef,
QueryEvent,
ReadEvent,
)
from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
from tests.performance.data_model import Query, Table

# https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/BigQueryAuditMetadata.TableDataRead.Reason
READ_REASONS = [
"REASON_UNSPECIFIED",
"JOB",
"TABLEDATA_LIST_REQUEST",
"GET_QUERY_RESULTS_REQUEST",
"QUERY_REQUEST",
"CREATE_READ_SESSION",
"MATERIALIZED_VIEW_REFRESH",
]


def generate_events(
queries: Iterable[Query],
projects: List[str],
table_to_project: Dict[str, str],
config: BigQueryV2Config,
) -> Iterable[AuditEvent]:
for query in queries:
project = ( # Most queries are run in the project of the tables they access
table_to_project[
query.object_modified.name
if query.object_modified
else query.fields_accessed[0].table.name
]
if random.random() >= 0.1
else random.choice(projects)
)
job_name = str(uuid.uuid4())
yield AuditEvent.create(
QueryEvent(
job_name=job_name,
timestamp=query.timestamp,
actor_email=query.actor,
query=query.text,
statementType=query.type,
project_id=project,
destinationTable=ref_from_table(query.object_modified, table_to_project)
if query.object_modified
else None,
referencedTables=[
ref_from_table(field.table, table_to_project)
for field in query.fields_accessed
if not field.table.is_view()
],
referencedViews=[
ref_from_table(field.table, table_to_project)
for field in query.fields_accessed
if field.table.is_view()
],
payload=dataclasses.asdict(query)
if config.debug_include_full_payloads
else None,
)
)
table_accesses = defaultdict(list)
for field in query.fields_accessed:
table_accesses[ref_from_table(field.table, table_to_project)].append(
field.column
)

for ref, columns in table_accesses.items():
yield AuditEvent.create(
ReadEvent(
jobName=job_name,
timestamp=query.timestamp,
actor_email=query.actor,
resource=ref,
fieldsRead=columns,
readReason=random.choice(READ_REASONS),
payload=dataclasses.asdict(query)
if config.debug_include_full_payloads
else None,
)
)


def ref_from_table(table: Table, table_to_project: Dict[str, str]) -> BigQueryTableRef:
return BigQueryTableRef(
BigqueryTableIdentifier(
table_to_project[table.name], table.container.name, table.name
)
)
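Putting the pieces together, a hedged sketch of how these helpers might be wired into a benchmark (project names and sizes are arbitrary, and `BigQueryV2Config()` is assumed to be constructible with defaults since only `debug_include_full_payloads` is read here):

```python
import random

from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
from tests.performance.bigquery import generate_events
from tests.performance.data_generation import generate_data, generate_queries

seed_metadata = generate_data(num_containers=2, num_tables=20, num_views=5)
queries = generate_queries(
    seed_metadata, num_selects=100, num_operations=20, num_users=5
)

projects = ["project-a", "project-b"]
# Every table and view must be mapped to a project for ref_from_table().
table_to_project = {
    table.name: random.choice(projects)
    for table in seed_metadata.tables + seed_metadata.views
}

events = list(generate_events(queries, projects, table_to_project, BigQueryV2Config()))
print(f"Generated {len(events)} audit events")
```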
160 changes: 160 additions & 0 deletions metadata-ingestion/tests/performance/data_generation.py
@@ -0,0 +1,160 @@
"""
Generates data for performance testing of warehouse sources.
In the future, we could try to create a more realistic dataset
by anonymizing and reduplicating a production datahub instance's data.
We could also get more human data by using Faker.
This is a work in progress, built piecemeal as needed.
"""
import random
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, TypeVar

from tests.performance.data_model import (
Container,
FieldAccess,
Query,
StatementType,
Table,
View,
)

T = TypeVar("T")

OPERATION_TYPES: List[StatementType] = [
"INSERT",
"UPDATE",
"DELETE",
"CREATE",
"ALTER",
"DROP",
"CUSTOM",
"UNKNOWN",
]


@dataclass(frozen=True)
class NormalDistribution:
mu: float
sigma: float

def sample(self) -> int:
return int(random.gauss(mu=self.mu, sigma=self.sigma))

def sample_with_floor(self, floor: int = 1) -> int:
return max(int(random.gauss(mu=self.mu, sigma=self.sigma)), floor)


@dataclass
class SeedMetadata:
containers: List[Container]
tables: List[Table]
views: List[View]
start_time: datetime
end_time: datetime


def generate_data(
num_containers: int,
num_tables: int,
num_views: int,
columns_per_table: NormalDistribution = NormalDistribution(5, 2),
parents_per_view: NormalDistribution = NormalDistribution(2, 1),
view_definition_length: NormalDistribution = NormalDistribution(150, 50),
time_range: timedelta = timedelta(days=14),
) -> SeedMetadata:
containers = [Container(f"container-{i}") for i in range(num_containers)]
tables = [
Table(
f"table-{i}",
container=random.choice(containers),
columns=[
f"column-{j}-{uuid.uuid4()}"
for j in range(columns_per_table.sample_with_floor())
],
)
for i in range(num_tables)
]
views = [
View(
f"view-{i}",
container=random.choice(containers),
columns=[
f"column-{j}-{uuid.uuid4()}"
for j in range(columns_per_table.sample_with_floor())
],
definition=f"{uuid.uuid4()}-{'*' * view_definition_length.sample_with_floor(10)}",
parents=random.sample(tables, parents_per_view.sample_with_floor()),
)
for i in range(num_views)
]

now = datetime.now(tz=timezone.utc)
return SeedMetadata(
containers=containers,
tables=tables,
views=views,
start_time=now - time_range,
end_time=now,
)


def generate_queries(
seed_metadata: SeedMetadata,
num_selects: int,
num_operations: int,
num_users: int,
tables_per_select: NormalDistribution = NormalDistribution(3, 5),
columns_per_select: NormalDistribution = NormalDistribution(10, 5),
upstream_tables_per_operation: NormalDistribution = NormalDistribution(2, 2),
query_length: NormalDistribution = NormalDistribution(100, 50),
) -> Iterable[Query]:
all_tables = seed_metadata.tables + seed_metadata.views
users = [f"user-{i}@xyz.com" for i in range(num_users)]
for i in range(num_selects): # Pure SELECT statements
tables = _sample_list(all_tables, tables_per_select)
all_columns = [
FieldAccess(column, table) for table in tables for column in table.columns
]
yield Query(
text=f"{uuid.uuid4()}-{'*' * query_length.sample_with_floor(10)}",
type="SELECT",
actor=random.choice(users),
timestamp=_random_time_between(
seed_metadata.start_time, seed_metadata.end_time
),
fields_accessed=_sample_list(all_columns, columns_per_select),
)

for i in range(num_operations):
modified_table = random.choice(seed_metadata.tables)
n_col = len(modified_table.columns)
num_columns_modified = NormalDistribution(n_col, n_col / 2)
upstream_tables = _sample_list(all_tables, upstream_tables_per_operation)

all_columns = [
FieldAccess(column, table)
for table in upstream_tables
for column in table.columns
]
yield Query(
text=f"{uuid.uuid4()}-{'*' * query_length.sample_with_floor(10)}",
type=random.choice(OPERATION_TYPES),
actor=random.choice(users),
timestamp=_random_time_between(
seed_metadata.start_time, seed_metadata.end_time
),
fields_accessed=_sample_list(all_columns, num_columns_modified),
object_modified=modified_table,
)


def _sample_list(lst: List[T], dist: NormalDistribution) -> List[T]:
return random.sample(lst, min(dist.sample_with_floor(), len(lst)))


def _random_time_between(start: datetime, end: datetime) -> datetime:
return start + timedelta(seconds=(end - start).total_seconds() * random.random())
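To make the distribution helpers above concrete, a small sketch of how `NormalDistribution` drives the generated sizes (the seed and numbers are arbitrary):

```python
import random

from tests.performance.data_generation import NormalDistribution

random.seed(42)  # deterministic output for illustration
columns_per_table = NormalDistribution(mu=5, sigma=2)

# sample() can land at zero or below in the Gaussian tail;
# sample_with_floor() clamps it, so generate_data never emits
# a table with fewer than one column.
print([columns_per_table.sample_with_floor() for _ in range(5)])
```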