test(ingest/bigquery): Add performance testing framework for bigquery usage #7690

Merged
1 change: 1 addition & 0 deletions metadata-ingestion/setup.cfg
@@ -69,6 +69,7 @@ markers =
     integration: marks tests to only run in integration (deselect with '-m "not integration"')
     integration_batch_1: mark tests to only run in batch 1 of integration tests. This is done mainly for parallelisation (deselect with '-m not integration_batch_1')
     slow_integration: marks tests that are too slow to even run in integration (deselect with '-m "not slow_integration"')
+    performance: marks tests that are sparingly run to measure performance (deselect with '-m "not performance"')
 testpaths =
     tests/unit
     tests/integration
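The new marker follows the same opt-in pattern as the existing integration markers. A minimal sketch of how a test might opt in (the test name here is hypothetical); such tests run with pytest -m performance and are excluded with pytest -m "not performance":

import pytest

@pytest.mark.performance
def test_bigquery_usage_ingestion_speed():  # hypothetical test name
    ...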
10 changes: 10 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py
@@ -3,7 +3,7 @@
 import re
 from dataclasses import dataclass, field
 from datetime import datetime
-from typing import Any, ClassVar, Dict, List, Optional, Pattern, Set, Tuple
+from typing import Any, ClassVar, Dict, List, Optional, Pattern, Set, Tuple, Union

 from dateutil import parser

@@ -623,3 +623,12 @@ def from_exported_bigquery_audit_metadata(
 class AuditEvent:
     read_event: Optional[ReadEvent] = None
     query_event: Optional[QueryEvent] = None
+
+    @classmethod
+    def create(cls, event: Union[ReadEvent, QueryEvent]) -> "AuditEvent":
+        if isinstance(event, QueryEvent):
+            return AuditEvent(query_event=event)
+        elif isinstance(event, ReadEvent):
+            return AuditEvent(read_event=event)
+        else:
+            raise TypeError(f"Cannot create AuditEvent: {event}")
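For illustration, a hedged sketch of the new factory in use, assuming QueryEvent's remaining fields are optional (the event values are made up; the field names mirror the generate_events call further down):

from datetime import datetime, timezone

qe = QueryEvent(
    job_name="job-0",
    timestamp=datetime.now(tz=timezone.utc),
    actor_email="user-0@xyz.com",
    query="SELECT 1",
    statementType="SELECT",
    project_id="project-0",
)
wrapped = AuditEvent.create(qe)
# The factory routes by type: exactly one of the two slots is populated.
assert wrapped.query_event is qe and wrapped.read_event is None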
1 change: 1 addition & 0 deletions metadata-ingestion/tests/performance/__init__.py
@@ -0,0 +1 @@
"""Tests for performance of ingestion, not run in CI."""
100 changes: 100 additions & 0 deletions metadata-ingestion/tests/performance/bigquery.py
@@ -0,0 +1,100 @@
import dataclasses
import random
import uuid
from collections import defaultdict
from typing import Dict, Iterable, List

from performance.data_model import Query, Table

from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
    AuditEvent,
    BigqueryTableIdentifier,
    BigQueryTableRef,
    QueryEvent,
    ReadEvent,
)
from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config

# https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/BigQueryAuditMetadata.TableDataRead.Reason
READ_REASONS = [
    "REASON_UNSPECIFIED",
    "JOB",
    "TABLEDATA_LIST_REQUEST",
    "GET_QUERY_RESULTS_REQUEST",
    "QUERY_REQUEST",
    "CREATE_READ_SESSION",
    "MATERIALIZED_VIEW_REFRESH",
]


def generate_events(
    queries: Iterable[Query],
    projects: List[str],
    table_to_project: Dict[str, str],
    config: BigQueryV2Config,
) -> Iterable[AuditEvent]:
    for query in queries:
        project = (  # Most queries are run in the project of the tables they access
            table_to_project[
                query.object_modified.name
                if query.object_modified
                else query.fields_accessed[0].table.name
            ]
            if random.random() >= 0.1
            else random.choice(projects)
        )
        job_name = str(uuid.uuid4())
        yield AuditEvent.create(
            QueryEvent(
                job_name=job_name,
                timestamp=query.timestamp,
                actor_email=query.actor,
                query=query.text,
                statementType=query.type,
                project_id=project,
                destinationTable=ref_from_table(query.object_modified, table_to_project)
                if query.object_modified
                else None,
                referencedTables=[
                    ref_from_table(field.table, table_to_project)
                    for field in query.fields_accessed
                    if not field.table.is_view()
                ],
                referencedViews=[
                    ref_from_table(field.table, table_to_project)
                    for field in query.fields_accessed
                    if field.table.is_view()
                ],
                payload=dataclasses.asdict(query)
                if config.debug_include_full_payloads
                else None,
            )
        )
        table_accesses = defaultdict(list)
        for field in query.fields_accessed:
            table_accesses[ref_from_table(field.table, table_to_project)].append(
                field.column
            )

        for ref, columns in table_accesses.items():
            yield AuditEvent.create(
                ReadEvent(
                    jobName=job_name,
                    timestamp=query.timestamp,
                    actor_email=query.actor,
                    resource=ref,
                    fieldsRead=columns,
                    readReason=random.choice(READ_REASONS),
                    payload=dataclasses.asdict(query)
                    if config.debug_include_full_payloads
                    else None,
                )
            )


def ref_from_table(table: Table, table_to_project: Dict[str, str]) -> BigQueryTableRef:
    return BigQueryTableRef(
        BigqueryTableIdentifier(
            table_to_project[table.name], table.container.name, table.name
        )
    )
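To see the event fan-out end to end, a small hypothetical driver (reusing the imports above): one hand-built Query yields one wrapped QueryEvent plus one wrapped ReadEvent per distinct table accessed. BigQueryV2Config() is assumed here to be constructible with defaults.

from datetime import datetime, timezone

from performance.data_model import Container, FieldAccess, Query, Table

table = Table("table-0", container=Container("container-0"), columns=["col-0"])
query = Query(
    text="SELECT col-0 FROM table-0",
    type="SELECT",
    actor="user-0@xyz.com",
    timestamp=datetime.now(tz=timezone.utc),
    fields_accessed=[FieldAccess("col-0", table)],
)
events = list(
    generate_events(
        [query],
        projects=["project-0"],
        table_to_project={"table-0": "project-0"},
        config=BigQueryV2Config(),  # assumption: defaults suffice for this sketch
    )
)
assert len(events) == 2  # one QueryEvent wrapper, one ReadEvent wrapper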
155 changes: 155 additions & 0 deletions metadata-ingestion/tests/performance/data_generation.py
@@ -0,0 +1,155 @@
"""
Generates data for performance testing of warehouse sources.

In the future, we could try to create a more realistic dataset
by anonymizing and duplicating the data of a production DataHub instance.
We could also generate more human-readable data with Faker.

This is a work in progress, built piecemeal as needed.
"""
import random
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, TypeVar

from performance.data_model import Container, FieldAccess, Query, Table, View

from datahub.metadata.schema_classes import OperationTypeClass

T = TypeVar("T")

OperationTypes = [
    OperationTypeClass.INSERT,
    OperationTypeClass.UPDATE,
    OperationTypeClass.DELETE,
    OperationTypeClass.CREATE,
    OperationTypeClass.ALTER,
    OperationTypeClass.DROP,
    OperationTypeClass.CUSTOM,
    OperationTypeClass.UNKNOWN,
]


@dataclass(frozen=True)
class NormalDistribution:
    mu: float
    sigma: float

    def sample(self) -> int:
        return int(random.gauss(mu=self.mu, sigma=self.sigma))

    def sample_with_floor(self, floor: int = 1) -> int:
        return max(int(random.gauss(mu=self.mu, sigma=self.sigma)), floor)


@dataclass
class SeedMetadata:
    containers: List[Container]
    tables: List[Table]
    views: List[View]
    start_time: datetime
    end_time: datetime
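A quick note on the sampling helpers, since sigma can exceed mu in the defaults below: sample() may return zero or negative values, while sample_with_floor() clamps from below. For example:

dist = NormalDistribution(mu=3, sigma=5)
assert dist.sample_with_floor() >= 1     # default floor of 1
assert dist.sample_with_floor(10) >= 10  # explicit floor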


def generate_data(
    num_containers: int,
    num_tables: int,
    num_views: int,
    columns_per_table: NormalDistribution = NormalDistribution(5, 2),
    parents_per_view: NormalDistribution = NormalDistribution(2, 1),
    view_definition_length: NormalDistribution = NormalDistribution(150, 50),
    time_range: timedelta = timedelta(days=14),
) -> SeedMetadata:
    containers = [Container(f"container-{i}") for i in range(num_containers)]
    tables = [
        Table(
            f"table-{i}",
            container=random.choice(containers),
            columns=[
                f"column-{j}-{uuid.uuid4()}"
                for j in range(columns_per_table.sample_with_floor())
            ],
        )
        for i in range(num_tables)
    ]
    views = [
        View(
            f"view-{i}",
            container=random.choice(containers),
            columns=[
                f"column-{j}-{uuid.uuid4()}"
                for j in range(columns_per_table.sample_with_floor())
            ],
            definition=f"{uuid.uuid4()}-{'*' * view_definition_length.sample_with_floor(10)}",
            parents=random.sample(tables, parents_per_view.sample_with_floor()),
        )
        for i in range(num_views)
    ]

    now = datetime.now(tz=timezone.utc)
    return SeedMetadata(
        containers=containers,
        tables=tables,
        views=views,
        start_time=now - time_range,
        end_time=now,
    )


def generate_queries(
    seed_metadata: SeedMetadata,
    num_selects: int,
    num_operations: int,
    num_users: int,
    tables_per_select: NormalDistribution = NormalDistribution(3, 5),
    columns_per_select: NormalDistribution = NormalDistribution(10, 5),
    upstream_tables_per_operation: NormalDistribution = NormalDistribution(2, 2),
    query_length: NormalDistribution = NormalDistribution(100, 50),
) -> Iterable[Query]:
    all_tables = seed_metadata.tables + seed_metadata.views
    users = [f"user-{i}@xyz.com" for i in range(num_users)]
    for i in range(num_selects):  # Pure SELECT statements
        tables = _sample_list(all_tables, tables_per_select)
        all_columns = [
            FieldAccess(column, table) for table in tables for column in table.columns
        ]
        yield Query(
            text=f"{uuid.uuid4()}-{'*' * query_length.sample_with_floor(10)}",
            type="SELECT",
            actor=random.choice(users),
            timestamp=_random_time_between(
                seed_metadata.start_time, seed_metadata.end_time
            ),
            fields_accessed=_sample_list(all_columns, columns_per_select),
        )

    for i in range(num_operations):
        modified_table = random.choice(seed_metadata.tables)
        n_col = len(modified_table.columns)
        num_columns_modified = NormalDistribution(n_col, n_col / 2)
        upstream_tables = _sample_list(all_tables, upstream_tables_per_operation)

        all_columns = [
            FieldAccess(column, table)
            for table in upstream_tables
            for column in table.columns
        ]
        yield Query(
            text=f"{uuid.uuid4()}-{'*' * query_length.sample_with_floor(10)}",
            type=random.choice(OperationTypes),
            actor=random.choice(users),
            timestamp=_random_time_between(
                seed_metadata.start_time, seed_metadata.end_time
            ),
            fields_accessed=_sample_list(all_columns, num_columns_modified),
            object_modified=modified_table,
        )


def _sample_list(lst: List[T], dist: NormalDistribution) -> List[T]:
    return random.sample(lst, min(dist.sample_with_floor(), len(lst)))


def _random_time_between(start: datetime, end: datetime) -> datetime:
    return start + timedelta(seconds=(end - start).total_seconds() * random.random())
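Putting the generators together, a hypothetical end-to-end seeding run (the sizes are arbitrary). The generators themselves assign no projects, so the table_to_project mapping that bigquery.py expects is assumed to be built by the caller, as here:

seed = generate_data(num_containers=2, num_tables=20, num_views=5)
queries = list(generate_queries(seed, num_selects=100, num_operations=20, num_users=3))
assert all(q.fields_accessed for q in queries)  # every query touches >= 1 column

# Hypothetical single-project assignment, suitable for generate_events above.
table_to_project = {t.name: "project-0" for t in seed.tables + seed.views}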
58 changes: 58 additions & 0 deletions metadata-ingestion/tests/performance/data_model.py
@@ -0,0 +1,58 @@
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional

from typing_extensions import Literal

Column = str
StatementType = Literal[  # SELECT + values from OperationTypeClass
    "SELECT",
    "INSERT",
    "UPDATE",
    "DELETE",
    "CREATE",
    "ALTER",
    "DROP",
    "CUSTOM",
    "UNKNOWN",
]


@dataclass
class Container:
    name: str


@dataclass
class Table:
    name: str
    container: Container
    columns: List[Column]

    def is_view(self) -> bool:
        return False


@dataclass
class View(Table):
    definition: str
    parents: List[Table]

    def is_view(self) -> bool:
        return True


@dataclass
class FieldAccess:
    column: Column
    table: Table


@dataclass
class Query:
    text: str
    type: StatementType
    actor: str
    timestamp: datetime
    fields_accessed: List[FieldAccess]  # Has at least one entry
    object_modified: Optional[Table] = None  # The operation may modify only part of the table
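For reference, a small hypothetical construction showing the Table/View split these generators rely on (View inherits Table's fields and overrides is_view):

c = Container("analytics")
orders = Table("orders", container=c, columns=["id", "amount"])
daily = View(
    "orders_daily",
    container=c,
    columns=["day", "total"],
    definition="SELECT ... FROM orders",
    parents=[orders],
)
assert not orders.is_view() and daily.is_view()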