Update functions to manage database schema (#852)
droserasprout authored Oct 9, 2023
1 parent b3e40f4 commit aee2240
Showing 9 changed files with 186 additions and 100 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,16 @@
 All notable changes to this project will be documented in this file.
 
 The format is based on [Keep a Changelog], and this project adheres to [Semantic Versioning].
 
+## [Unreleased]
+
+### Added
+
+- database: Added `dipdup_wipe` and `dipdup_approve` SQL functions to the schema.
+
+### Fixed
+
+- cli: Fixed `schema wipe` command for SQLite databases.
+
 ## [7.0.1] - 2023-09-30
 
 ### Added
14 changes: 7 additions & 7 deletions docs/5.advanced/1.reindexing.md
@@ -7,13 +7,13 @@ description: "In some cases, DipDup can't proceed with indexing without a full w
 
 In some cases, DipDup can't proceed with indexing without a full wipe. Several reasons trigger reindexing:
 
-| reason            | description                                                                                                                |
-| ----------------- | -------------------------------------------------------------------------------------------------------------------------- |
-| `manual`          | Reindexing triggered manually from callback with `ctx.reindex`.                                                            |
-| `migration`       | Applied migration requires reindexing. Check release notes before switching between major DipDup versions to be prepared. |
-| `rollback`        | Reorg message received from TzKT can not be processed.                                                                     |
-| `config_modified` | One of the index configs has been modified.                                                                                |
-| `schema_modified` | Database schema has been modified. Try to avoid manual schema modifications in favor of [sql](../5.advanced/6.sql.md).    |
+| reason            | description                                                                                                                    |
+| ----------------- | ------------------------------------------------------------------------------------------------------------------------------ |
+| `manual`          | Reindexing triggered manually from a callback with `ctx.reindex`.                                                             |
+| `migration`       | Applied migration requires reindexing. Check release notes before switching between major DipDup versions to be prepared.     |
+| `rollback`        | A reorg message received from a datasource cannot be processed.                                                                |
+| `config_modified` | One of the index configs has been modified.                                                                                    |
+| `schema_modified` | Database schema has been modified. Try to avoid manual schema modifications in favor of [SQL scripts](../5.advanced/3.sql.md). |
 
 It is possible to configure the desired action for reindexing triggered by a specific reason.
 
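For context, the `manual` reason corresponds to a hook calling `ctx.reindex`. A minimal sketch of such a hook (the `ReindexingReason` import path is an assumption; verify it against your DipDup version):

```python
from dipdup.context import HookContext
from dipdup.models import ReindexingReason  # assumed location of the enum


async def on_restart(ctx: HookContext) -> None:
    # Trigger a full wipe from a callback; recorded with the `manual` reason
    await ctx.reindex(ReindexingReason.manual)
```
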
39 changes: 33 additions & 6 deletions docs/5.advanced/3.internal-tables.md → docs/5.advanced/3.sql.md
@@ -1,11 +1,13 @@
 ---
-title: "Internal tables"
-description: "This page describes the internal tables used by DipDup. They are created automatically and are not intended to be modified by the user. However, they can be useful for external monitoring and debugging."
+title: "Advanced SQL"
+description: "Put your *.sql scripts into dipdup_indexer/sql. You can run these scripts from any callback with ctx.execute_sql('name'). If name is a directory, each script it contains will be executed."
 ---
 
-# Internal tables
+# Advanced SQL
 
-This page describes the internal tables used by DipDup. They are created automatically and are not intended to be modified by the user. However, they can be useful for external monitoring and debugging.
+## Internal tables
+
+Several tables having the `dipdup_` prefix are created by DipDup automatically and are not intended to be modified by the user. However, they can be useful for external monitoring and debugging.
 
 | table                      | description                                                                                                                               |
 |:-------------------------- |:----------------------------------------------------------------------------------------------------------------------------------------- |
@@ -15,8 +17,8 @@
 | `dipdup_contract`          | Info about contracts used by all indexes, including ones added in runtime.                                                                 |
 | `dipdup_model_update`      | Service table to store model diffs for database rollback. Configured by `advanced.rollback_depth`.                                         |
 | `dipdup_meta`              | Arbitrary key-value storage for DipDup internal use. Survives reindexing. You can use it too, but don't touch keys with `dipdup_` prefix.  |
-| `dipdup_contract_metadata` | See Metadata interface page                                                                                                                |
-| `dipdup_token_metadata`    | See Metadata interface page                                                                                                                |
+| `dipdup_contract_metadata` | See [Metadata interface](/docs/advanced/metadata-interface)                                                                                |
+| `dipdup_token_metadata`    | See [Metadata interface](/docs/advanced/metadata-interface)                                                                                |
 
 See [`dipdup.models` module](https://github.com/dipdup-io/dipdup/blob/next/src/dipdup/models/__init__.py) for exact table definitions.
 
@@ -32,3 +34,28 @@
 SELECT name, status FROM dipdup_index;
 
 -- Get last reindex time
 SELECT created_at FROM dipdup_schema WHERE name = 'public';
 ```
+
+## Scripts
+
+Put your `*.sql` scripts into `{{ project.package }}/sql`. You can run these scripts from any callback with `ctx.execute_sql('name')`. If `name` is a directory, each script it contains will be executed.
+
+Scripts are executed without being wrapped in SQL transactions. It's generally a good idea to avoid touching table data in scripts.
+
+By default, an empty `sql/<hook_name>` directory is generated for every hook in config during init. Remove the `ctx.execute_sql` call from the hook callback to avoid executing them.
+
+```python
+# Execute all scripts in the sql/my_hook directory
+await ctx.execute_sql('my_hook')
+
+# Execute a single script
+await ctx.execute_sql('my_hook/my_script.sql')
+```
+
+## Managing schema
+
+When using PostgreSQL as the database engine, you can use the `dipdup_approve` and `dipdup_wipe` functions to manage schema state from the SQL console if needed:
+
+```sql
+SELECT dipdup_approve('public');
+SELECT dipdup_wipe('public');
+```
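The same calls can be issued from Python rather than the SQL console; a minimal sketch, assuming a connection obtained through DipDup's database helpers:

```python
from dipdup.database import get_connection


async def approve_schema() -> None:
    # Mark the current schema state as approved instead of reindexing
    conn = get_connection()
    await conn.execute_script("SELECT dipdup_approve('public')")
```
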
22 changes: 0 additions & 22 deletions docs/5.advanced/6.sql.md

This file was deleted.

4 changes: 3 additions & 1 deletion src/dipdup/cli.py
@@ -507,7 +507,9 @@ async def schema_wipe(ctx: click.Context, immune: bool, force: bool) -> None:
     conn = get_connection()
     await wipe_schema(
         conn=conn,
-        schema_name=config.database.schema_name,
+        schema_name=config.database.path
+        if isinstance(config.database, SqliteDatabaseConfig)
+        else config.database.schema_name,
         immune_tables=immune_tables,
     )
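The branch above exists because SQLite has no notion of schemas, so the database file path stands in for the schema name. Read as a standalone helper (a hypothetical name, not part of this commit):

```python
from dipdup.config import DipDupConfig, SqliteDatabaseConfig


def resolve_schema_name(config: DipDupConfig) -> str:
    # SQLite has no schemas; the database file path doubles as the identifier
    if isinstance(config.database, SqliteDatabaseConfig):
        return config.database.path
    return config.database.schema_name
```
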
64 changes: 37 additions & 27 deletions src/dipdup/database.py
@@ -194,44 +194,54 @@ async def generate_schema(
     conn: SupportedClient,
     name: str,
 ) -> None:
-    if isinstance(conn, AsyncpgClient):
-        await pg_create_schema(conn, name)
-
-    await Tortoise.generate_schemas()
-
-    if isinstance(conn, AsyncpgClient):
-        # NOTE: Create a view for monitoring head status
-        sql_path = Path(__file__).parent / 'sql' / 'dipdup_head_status.sql'
-        # TODO: Configurable interval
-        await execute_sql(conn, sql_path, HEAD_STATUS_TIMEOUT)
+    if isinstance(conn, SqliteClient):
+        await Tortoise.generate_schemas()
+    elif isinstance(conn, AsyncpgClient):
+        await _pg_create_schema(conn, name)
+        await Tortoise.generate_schemas()
+        await _pg_create_functions(conn)
+        await _pg_create_views(conn)
+    else:
+        raise NotImplementedError
+
+
+async def _pg_create_functions(conn: AsyncpgClient) -> None:
+    for fn in (
+        'dipdup_approve.sql',
+        'dipdup_wipe.sql',
+    ):
+        sql_path = Path(__file__).parent / 'sql' / fn
+        await execute_sql(conn, sql_path)
 
 
-async def _wipe_schema_postgres(
+async def _pg_create_views(conn: AsyncpgClient) -> None:
+    sql_path = Path(__file__).parent / 'sql' / 'dipdup_head_status.sql'
+    # TODO: Configurable interval
+    await execute_sql(conn, sql_path, HEAD_STATUS_TIMEOUT)
+
+
+async def _pg_wipe_schema(
     conn: AsyncpgClient,
     schema_name: str,
     immune_tables: set[str],
 ) -> None:
     immune_schema_name = f'{schema_name}_immune'
 
-    # NOTE: Create a truncate_schema function to trigger cascade deletion
-    sql_path = Path(__file__).parent / 'sql' / 'truncate_schema.sql'
-    await execute_sql(conn, sql_path, schema_name, immune_schema_name)
-
     # NOTE: Move immune tables to a separate schema - it's free!
     if immune_tables:
-        await pg_create_schema(conn, immune_schema_name)
+        await _pg_create_schema(conn, immune_schema_name)
         for table in immune_tables:
-            await pg_move_table(conn, table, schema_name, immune_schema_name)
+            await _pg_move_table(conn, table, schema_name, immune_schema_name)
 
-    await conn.execute_script(f"SELECT truncate_schema('{schema_name}')")
+    await conn.execute_script(f"SELECT dipdup_wipe('{schema_name}')")
 
     if immune_tables:
         for table in immune_tables:
-            await pg_move_table(conn, table, immune_schema_name, schema_name)
-        await pg_drop_schema(conn, immune_schema_name)
+            await _pg_move_table(conn, table, immune_schema_name, schema_name)
+        await _pg_drop_schema(conn, immune_schema_name)
 
 
-async def _wipe_schema_sqlite(
+async def _sqlite_wipe_schema(
     conn: SqliteClient,
     path: str,
     immune_tables: set[str],
@@ -245,10 +255,10 @@ async def _wipe_schema_sqlite(
     await conn.execute_script(f'ATTACH DATABASE "{immune_path}" AS {namespace}')
 
     # NOTE: Copy immune tables to the new database.
-    master_query = 'SELECT name, type FROM sqlite_master'
+    master_query = 'SELECT name FROM sqlite_master WHERE type = "table"'
     result = await conn.execute_query(master_query)
-    for name, type_ in result[1]:
-        if type_ != 'table' or name not in immune_tables:
+    for name in result[1]:
+        if name not in immune_tables:
             continue
 
         expr = f'CREATE TABLE {namespace}.{name} AS SELECT * FROM {name}'
@@ -271,23 +281,23 @@ async def wipe_schema(
     """Truncate schema preserving immune tables. Executes in a transaction"""
     async with conn._in_transaction() as conn:
         if isinstance(conn, SqliteClient):
-            await _wipe_schema_sqlite(conn, schema_name, immune_tables)
+            await _sqlite_wipe_schema(conn, schema_name, immune_tables)
         elif isinstance(conn, AsyncpgClient):
-            await _wipe_schema_postgres(conn, schema_name, immune_tables)
+            await _pg_wipe_schema(conn, schema_name, immune_tables)
         else:
             raise NotImplementedError
 
 
-async def pg_create_schema(conn: AsyncpgClient, name: str) -> None:
+async def _pg_create_schema(conn: AsyncpgClient, name: str) -> None:
     """Create PostgreSQL schema if not exists"""
     await conn.execute_script(f'CREATE SCHEMA IF NOT EXISTS {name}')
 
 
-async def pg_drop_schema(conn: AsyncpgClient, name: str) -> None:
+async def _pg_drop_schema(conn: AsyncpgClient, name: str) -> None:
     await conn.execute_script(f'DROP SCHEMA IF EXISTS {name}')
 
 
-async def pg_move_table(conn: AsyncpgClient, name: str, schema: str, new_schema: str) -> None:
+async def _pg_move_table(conn: AsyncpgClient, name: str, schema: str, new_schema: str) -> None:
     """Move table from one schema to another"""
     await conn.execute_script(f'ALTER TABLE {schema}.{name} SET SCHEMA {new_schema}')
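Driving `wipe_schema` from Python mirrors the CLI path; a minimal sketch, assuming Tortoise has already been initialized by DipDup:

```python
from dipdup.database import get_connection, wipe_schema


async def wipe_keeping(table_name: str) -> None:
    # The named table survives: _pg_wipe_schema moves it to `<schema>_immune`,
    # runs dipdup_wipe('public'), then moves it back and drops the side schema
    conn = get_connection()
    await wipe_schema(conn=conn, schema_name='public', immune_tables={table_name})
```
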
7 changes: 7 additions & 0 deletions src/dipdup/sql/dipdup_approve.sql
@@ -0,0 +1,7 @@
+CREATE OR REPLACE FUNCTION dipdup_approve(schema_name VARCHAR) RETURNS void AS $$
+BEGIN
+    UPDATE dipdup_index SET config_hash = null;
+    UPDATE dipdup_schema SET reindex = null, hash = null;
+    RETURN;
+END;
+$$ LANGUAGE plpgsql;
src/dipdup/sql/truncate_schema.sql → src/dipdup/sql/dipdup_wipe.sql
@@ -1,5 +1,5 @@
 -- source of inspiration: https://stackoverflow.com/a/11462481
-CREATE OR REPLACE FUNCTION truncate_schema(schema_name VARCHAR) RETURNS void AS $$
+CREATE OR REPLACE FUNCTION dipdup_wipe(schema_name VARCHAR) RETURNS void AS $$
 DECLARE
     rec RECORD;
 BEGIN
@@ -63,14 +63,6 @@ BEGIN
     WHEN others THEN END;
     END LOOP;
 
-    -- BEGIN
-    -- CREATE EXTENSION IF NOT EXISTS pgcrypto;
-    -- CREATE EXTENSION IF NOT EXISTS timescaledb;
-    -- EXCEPTION
-    --     WHEN OTHERS THEN
-    --         NULL;
-    -- END;
-
     RETURN;
 END;
 $$ LANGUAGE plpgsql;