Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(ingest/bigquery): Add performance testing framework for bigquery usage #7690

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions metadata-ingestion/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ markers =
testpaths =
tests/unit
tests/integration
tests/performance

[coverage:run]
# Because of some quirks in the way setup.cfg, coverage.py, pytest-cov,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport
from datahub.utilities.lossy_collections import LossyDict, LossyList
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.stats_collections import TopKDict

logger: logging.Logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -75,12 +76,20 @@ class BigQueryV2Report(ProfilingSqlReport):
operation_types_stat: Counter[str] = dataclasses.field(
default_factory=collections.Counter
)
current_project_status: Optional[Dict[str, Dict[str, datetime]]] = None
current_project_status: Optional[str] = None

timer: Optional[PerfTimer] = field(
default=None, init=False, repr=False, compare=False
)

def set_project_state(self, project: str, stage: str) -> None:
if self.current_project_status:
if self.timer:
logger.info(
"Previous project state was: %s",
self.to_pure_python_obj(self.current_project_status),
f"Time spent in stage <{self.current_project_status}>: "
f"{self.timer.elapsed_seconds():.2f} seconds"
)
self.current_project_status = {project: {stage: datetime.now()}}
else:
self.timer = PerfTimer()

self.current_project_status = f"{project}: {stage} at {datetime.now()}"
self.timer.start()
7 changes: 7 additions & 0 deletions metadata-ingestion/tests/performance/README.md
Copy link
Collaborator

@hsheth2 hsheth2 Mar 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll probably need to add a rule to exclude this file from our docs site; we can add that in the generateDocsDir script in docs-website.

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Performance Testing
This module provides a framework for performance testing our ingestion sources.

When running a performance test, pass `-s` and `--log-cli-level=INFO` so that print statements and live logs are shown in the output:
```bash
pytest -s --log-cli-level=INFO -m performance tests/performance/<test_name>.py
```
3 changes: 1 addition & 2 deletions metadata-ingestion/tests/performance/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
from collections import defaultdict
from typing import Dict, Iterable, List

from performance.data_model import Query, Table

from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
AuditEvent,
BigqueryTableIdentifier,
Expand All @@ -14,6 +12,7 @@
ReadEvent,
)
from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
from tests.performance.data_model import Query, Table

# https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/BigQueryAuditMetadata.TableDataRead.Reason
READ_REASONS = [
Expand Down
31 changes: 18 additions & 13 deletions metadata-ingestion/tests/performance/data_generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,26 @@
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, TypeVar

from performance.data_model import Container, FieldAccess, Query, Table, View

from datahub.metadata.schema_classes import OperationTypeClass
from tests.performance.data_model import (
Container,
FieldAccess,
Query,
StatementType,
Table,
View,
)

T = TypeVar("T")

OperationTypes = [
OperationTypeClass.INSERT,
OperationTypeClass.UPDATE,
OperationTypeClass.DELETE,
OperationTypeClass.CREATE,
OperationTypeClass.ALTER,
OperationTypeClass.DROP,
OperationTypeClass.CUSTOM,
OperationTypeClass.UNKNOWN,
OPERATION_TYPES: List[StatementType] = [
"INSERT",
"UPDATE",
"DELETE",
"CREATE",
"ALTER",
"DROP",
"CUSTOM",
"UNKNOWN",
]


Expand Down Expand Up @@ -137,7 +142,7 @@ def generate_queries(
]
yield Query(
text=f"{uuid.uuid4()}-{'*' * query_length.sample_with_floor(10)}",
type=random.choice(OperationTypes),
type=random.choice(OPERATION_TYPES),
actor=random.choice(users),
timestamp=_random_time_between(
seed_metadata.start_time, seed_metadata.end_time
Expand Down
32 changes: 6 additions & 26 deletions metadata-ingestion/tests/performance/test_bigquery_usage.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
import logging
import os
import random
import sys
from datetime import timedelta
from typing import Generator

import humanfriendly
import psutil
import pytest
from performance.bigquery import generate_events, ref_from_table
from performance.data_generation import generate_data, generate_queries

from datahub.ingestion.source.bigquery_v2.bigquery_config import (
BigQueryUsageConfig,
Expand All @@ -21,34 +17,18 @@
)
from datahub.ingestion.source.bigquery_v2.usage import BigQueryUsageExtractor
from datahub.utilities.perf_timer import PerfTimer
from tests.performance.bigquery import generate_events, ref_from_table
from tests.performance.data_generation import generate_data, generate_queries


def set_log_level(logger: logging.Logger, level: int) -> Generator[None, None, None]:
old_log_level = logger.level
try:
logger.setLevel(level)
yield
finally:
logger.setLevel(old_log_level)
pytestmark = pytest.mark.performance


@pytest.fixture(autouse=True)
def default_log_level_error():
root_logger = logging.getLogger()
stream_handler = logging.StreamHandler(sys.stdout)
try:
root_logger.addHandler(stream_handler)
yield from set_log_level(root_logger, logging.ERROR)
finally:
root_logger.removeHandler(stream_handler)


@pytest.fixture
def report_log_level_info():
yield from set_log_level(report_logger, logging.INFO)
def report_log_level_info(caplog):
with caplog.at_level(logging.INFO, logger=report_logger.name):
yield


@pytest.mark.performance
def test_bigquery_usage(report_log_level_info):
report = BigQueryV2Report()
report.set_project_state("All", "Seed Data Generation")
Expand Down