Skip to content

Commit

Permalink
Helpers for testing DipDup projects (#864)
Browse files Browse the repository at this point in the history
  • Loading branch information
droserasprout authored Oct 20, 2023
1 parent f0187c2 commit 21ad10f
Show file tree
Hide file tree
Showing 13 changed files with 423 additions and 201 deletions.
1 change: 1 addition & 0 deletions src/dipdup/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ async def schema_wipe(ctx: click.Context, immune: bool, force: bool) -> None:
models=models,
timeout=config.database.connection_timeout,
decimal_precision=config.advanced.decimal_precision,
unsafe_sqlite=config.advanced.unsafe_sqlite,
):
conn = get_connection()
await wipe_schema(
Expand Down
35 changes: 33 additions & 2 deletions src/dipdup/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,40 @@ async def _pg_create_functions(conn: AsyncpgClient) -> None:
await execute_sql(conn, sql_path)


async def get_tables() -> set[str]:
    """Return the names of all tables in the current database.

    Supports both SQLite and PostgreSQL connections; raises
    `NotImplementedError` for any other client type.
    """
    conn = get_connection()
    if isinstance(conn, SqliteClient):
        # NOTE: Single-quoted string literal is the standard SQL form; double
        # quotes denote identifiers in SQLite and only resolve to a string
        # literal through a deprecated compatibility misfeature.
        _, sqlite_res = await conn.execute_query("SELECT name FROM sqlite_master WHERE type = 'table';")
        return {row[0] for row in sqlite_res}
    if isinstance(conn, AsyncpgClient):
        # NOTE: Restrict to ordinary tables in the `public` schema; views and
        # foreign tables are excluded by `table_type`.
        _, postgres_res = await conn.execute_query(
            "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' AND table_type = 'BASE TABLE'"
        )
        return {row[0] for row in postgres_res}

    raise NotImplementedError


async def _pg_create_views(conn: AsyncpgClient) -> None:
    """Create service views (head status) from the bundled SQL script."""
    views_sql_path = Path(__file__).parent / 'sql' / 'dipdup_head_status.sql'
    # TODO: Configurable interval
    await execute_sql(conn, views_sql_path, HEAD_STATUS_TIMEOUT)


# FIXME: Private but used in dipdup.hasura
async def _pg_get_views(conn: AsyncpgClient, schema_name: str) -> list[str]:
    """Return names of regular and materialized views in the given schema.

    NOTE(review): `schema_name` is interpolated directly into the query; it is
    expected to originate from trusted project config, not user input.
    """
    query = (
        "SELECT table_name FROM information_schema.views WHERE table_schema ="
        f" '{schema_name}' UNION SELECT matviewname as table_name FROM pg_matviews"
        f" WHERE schemaname = '{schema_name}'"
    )
    result = await conn.execute_query(query)
    views: list[str] = []
    for row in result[1]:
        views.append(row[0])
    return views


async def _pg_wipe_schema(
conn: AsyncpgClient,
schema_name: str,
Expand Down Expand Up @@ -257,8 +285,11 @@ async def _sqlite_wipe_schema(
# NOTE: Copy immune tables to the new database.
master_query = 'SELECT name FROM sqlite_master WHERE type = "table"'
result = await conn.execute_query(master_query)
for name in result[1]:
if name not in immune_tables: # type: ignore[comparison-overlap]
for row in result[1]:
name = row[0]
if name == 'sqlite_sequence':
continue
if name not in immune_tables:
continue

expr = f'CREATE TABLE {namespace}.{name} AS SELECT * FROM {name}'
Expand Down
17 changes: 5 additions & 12 deletions src/dipdup/hasura.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
from dipdup.config import HttpConfig
from dipdup.config import PostgresDatabaseConfig
from dipdup.config import ResolvedHttpConfig
from dipdup.database import AsyncpgClient
from dipdup.database import _pg_get_views
from dipdup.database import get_connection
from dipdup.database import iter_models
from dipdup.exceptions import ConfigurationError
Expand Down Expand Up @@ -328,18 +330,9 @@ async def _apply_custom_metadata(self) -> None:

async def _get_views(self) -> list[str]:
conn = get_connection()
views = [
row[0]
for row in (
await conn.execute_query(
"SELECT table_name FROM information_schema.views WHERE table_schema ="
f" '{self._database_config.schema_name}' UNION SELECT matviewname as table_name FROM pg_matviews"
f" WHERE schemaname = '{self._database_config.schema_name}'"
)
)[1]
]
self._logger.info('Found %s regular and materialized views', len(views))
return views
if not isinstance(conn, AsyncpgClient):
raise HasuraError('Hasura integration requires `postgres` database client')
return await _pg_get_views(conn, self._database_config.schema_name)

def _iterate_graphql_queries(self) -> Iterator[tuple[str, str]]:
graphql_path = env.get_package_path(self._package) / 'graphql'
Expand Down
159 changes: 159 additions & 0 deletions src/dipdup/test.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,30 @@
"""This module contains helper functions for testing DipDup projects.
These helpers are not part of the public API and can be changed without prior notice.
"""
import asyncio
import atexit
import os
import tempfile
from collections.abc import AsyncIterator
from contextlib import AsyncExitStack
from contextlib import asynccontextmanager
from pathlib import Path
from shutil import which
from typing import TYPE_CHECKING
from typing import Any

from dipdup.config import DipDupConfig
from dipdup.config import HasuraConfig
from dipdup.config import PostgresDatabaseConfig
from dipdup.dipdup import DipDup
from dipdup.exceptions import FrameworkException
from dipdup.index import Index
from dipdup.project import get_default_answers
from dipdup.yaml import DipDupYAMLConfig

if TYPE_CHECKING:
from docker.client import DockerClient # type: ignore[import-untyped]


async def create_dummy_dipdup(
Expand Down Expand Up @@ -47,3 +67,142 @@ async def spawn_index(dipdup: DipDup, name: str) -> Index[Any, Any, Any]:
index: Index[Any, Any, Any] = await dispatcher._ctx._spawn_index(name)
dispatcher._indexes[name] = dispatcher._ctx._pending_indexes.pop()
return index


def get_docker_client() -> 'DockerClient':
    """Get Docker client instance if socket is available; skip test otherwise."""
    import _pytest.outcomes
    from docker.client import DockerClient

    # Well-known socket locations: Linux default and Docker Desktop on macOS.
    home = Path.home()
    candidates = (
        Path('/var/run/docker.sock'),
        home / 'Library' / 'Containers' / 'com.docker.docker' / 'Data' / 'vms' / '0' / 'docker.sock',
        home / 'Library' / 'Containers' / 'com.docker.docker' / 'Data' / 'docker.sock',
    )
    for sock in candidates:
        if sock.exists():
            return DockerClient(base_url=f'unix://{sock}')

    raise _pytest.outcomes.Skipped(  # pragma: no cover
        'Docker socket not found',
        allow_module_level=True,
    )


async def run_postgres_container() -> PostgresDatabaseConfig:
    """Run Postgres container (destroyed on exit) and return database config with its IP."""
    client = get_docker_client()
    container = client.containers.run(
        image=get_default_answers()['postgres_image'],
        environment={
            'POSTGRES_USER': 'test',
            'POSTGRES_PASSWORD': 'test',
            'POSTGRES_DB': 'test',
        },
        detach=True,
        remove=True,
    )
    # Ensure the container is stopped when the test process exits
    atexit.register(container.stop)
    container.reload()
    container_ip = container.attrs['NetworkSettings']['IPAddress']

    # Poll until the server accepts connections
    while container.exec_run('pg_isready').exit_code != 0:
        await asyncio.sleep(0.1)

    return PostgresDatabaseConfig(
        kind='postgres',
        host=container_ip,
        port=5432,
        user='test',
        database='test',
        password='test',
    )


async def run_hasura_container(postgres_ip: str) -> HasuraConfig:
    """Run Hasura container (destroyed on exit) and return config with its IP."""
    client = get_docker_client()
    container = client.containers.run(
        image=get_default_answers()['hasura_image'],
        environment={
            'HASURA_GRAPHQL_DATABASE_URL': f'postgres://test:test@{postgres_ip}:5432',
        },
        detach=True,
        remove=True,
    )
    # Ensure the container is stopped when the test process exits
    atexit.register(container.stop)
    container.reload()
    container_ip = container.attrs['NetworkSettings']['IPAddress']

    return HasuraConfig(
        url=f'http://{container_ip}:8080',
        source='new_source',
        create_source=True,
    )


@asynccontextmanager
async def tmp_project(
    config_paths: list[Path],
    package: str,
    exists: bool,
    env: dict[str, str] | None = None,
) -> AsyncIterator[tuple[Path, dict[str, str]]]:
    """Create a temporary isolated DipDup project.

    Yields the project root and an environment dict suitable for running
    `dipdup` subprocesses inside it.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_path = Path(tmp_dir)

        # NOTE: Dump config
        config, _ = DipDupYAMLConfig.load(config_paths, environment=False)
        (tmp_path / 'dipdup.yaml').write_text(config.dump())

        # NOTE: Symlink packages and executables
        bin_path = tmp_path / 'bin'
        bin_path.mkdir()
        for executable in ('dipdup', 'datamodel-codegen'):
            executable_path = which(executable)
            if executable_path is None:
                raise FrameworkException(f'Executable `{executable}` not found')  # pragma: no cover
            os.symlink(executable_path, bin_path / executable)

        os.symlink(
            Path(__file__).parent.parent / 'dipdup',
            tmp_path / 'dipdup',
        )

        # NOTE: Ensure that `run` uses existing package and `init` creates a new one
        if exists:
            os.symlink(
                Path(__file__).parent.parent / package,
                tmp_path / package,
            )

        # NOTE: Prepare environment
        full_env = {
            **os.environ,
            **(env or {}),
            'PATH': str(bin_path),
            'PYTHONPATH': str(tmp_path),
            'DIPDUP_TEST': '1',
            'DIPDUP_DEBUG': '1',
        }

        yield tmp_path, full_env


async def run_in_tmp(
    tmp_path: Path,
    env: dict[str, str],
    *args: str,
) -> None:
    """Run DipDup in existing temporary project.

    :param tmp_path: Root of the temporary project (must contain `dipdup.yaml`)
    :param env: Environment variables for the subprocess
    :param args: Extra command-line arguments passed to `dipdup`
    """
    tmp_config_path = Path(tmp_path) / 'dipdup.yaml'

    # NOTE: `create_subprocess_shell` always runs the command through the
    # shell; the previous redundant `shell=True` kwarg was dropped.
    proc = await asyncio.subprocess.create_subprocess_shell(
        f'dipdup -c {tmp_config_path} {" ".join(args)}',
        cwd=tmp_path,
        env=env,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    _, stderr = await proc.communicate()
    # Surface process output on failure instead of a bare AssertionError
    assert proc.returncode == 0, stderr.decode()
4 changes: 1 addition & 3 deletions tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@
env.set_test()


CONFIGS_PATH = Path(__file__).parent / 'configs'
REPLAYS_PATH = Path(__file__).parent / 'replays'
SRC_PATH = Path(__file__).parent.parent / 'src'
TEST_CONFIGS = Path(__file__).parent / 'configs'


@asynccontextmanager
Expand Down
6 changes: 6 additions & 0 deletions tests/configs/test_postgres.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
database:
kind: postgres
host: ${POSTGRES_HOST}
user: test
password: test
database: test
11 changes: 11 additions & 0 deletions tests/configs/test_postgres_immune.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
database:
kind: postgres
host: ${POSTGRES_HOST}
user: test
password: test
database: test

immune_tables:
- tld
- domain
- test
File renamed without changes.
11 changes: 11 additions & 0 deletions tests/configs/test_sqlite_immune.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
database:
kind: sqlite
path: db.sqlite3

immune_tables:
- tld
- domain
- test

advanced:
unsafe_sqlite: true
Loading

0 comments on commit 21ad10f

Please sign in to comment.