Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Hasura aiohttp session, fix in_global_transaction wrapper, make TemporaryState abstract #66

Merged
merged 2 commits
Jun 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/dipdup/codegen.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ async def generate_types(self) -> None:
if root.split('/')[-1] == 'parameter':
name += '_parameter'

name = snake_to_pascal(name)
self._logger.info('Generating type `%s`', name)
args = [
'datamodel-codegen',
Expand All @@ -232,7 +233,7 @@ async def generate_types(self) -> None:
'--output',
output_path,
'--class-name',
snake_to_pascal(name),
name,
'--disable-timestamp',
'--use-default',
]
Expand Down
4 changes: 3 additions & 1 deletion src/dipdup/datasources/tzkt/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,9 @@ async def get_originations(self, addresses: Set[str], offset: int, first_level:
originations.append(self.convert_operation(op))
return originations

async def get_transactions(self, field: str, addresses: Set[str], offset: int, first_level: int, last_level: int) -> List[OperationData]:
async def get_transactions(
self, field: str, addresses: Set[str], offset: int, first_level: int, last_level: int
) -> List[OperationData]:
raw_transactions = await self._proxy.http_request(
'get',
url=f'{self._url}/v1/operations/transactions',
Expand Down
102 changes: 50 additions & 52 deletions src/dipdup/hasura.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

def _is_model_class(obj) -> bool:
    """Return True for concrete subclasses of ``tortoise.Model``.

    Excludes the ``Model`` base class itself and abstract models (those whose
    ``Meta`` declares ``abstract = True``), since abstract models have no
    backing table to expose.
    """
    # Defect fixed: the block contained two stacked `return` statements (a diff
    # artifact merging the pre- and post-change lines), making the second —
    # updated — check unreachable. Keep only the updated check.
    # NOTE(review): relies on `obj.Meta` existing on every Model subclass;
    # the getattr default only guards a missing `abstract` flag — confirm
    # against the tortoise base Model, which defines a default Meta.
    return (
        isinstance(obj, type)
        and issubclass(obj, Model)
        and obj is not Model
        and not getattr(obj.Meta, 'abstract', False)
    )


def _format_array_relationship(
Expand Down Expand Up @@ -148,60 +148,58 @@ async def configure_hasura(config: DipDupConfig):
raise ConfigurationError('`hasura` config section missing')

_logger.info('Configuring Hasura')

session = aiohttp.ClientSession()
url = config.hasura.url.rstrip("/")
hasura_metadata = await generate_hasura_metadata(config)

_logger.info('Waiting for Hasura instance to be healthy')
for _ in range(60):
with suppress(ClientConnectorError, ClientOSError):
async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession() as session:
_logger.info('Waiting for Hasura instance to be healthy')
for _ in range(60):
with suppress(ClientConnectorError, ClientOSError):
await session.get(f'{url}/healthz')
break
await asyncio.sleep(1)
else:
_logger.error('Hasura instance not responding for 60 seconds')
return

headers = {}
if config.hasura.admin_secret:
headers['X-Hasura-Admin-Secret'] = config.hasura.admin_secret

_logger.info('Fetching existing metadata')
existing_hasura_metadata = await http_request(
session,
'post',
url=f'{url}/v1/query',
data=json.dumps(
{
"type": "export_metadata",
"args": hasura_metadata,
},
),
headers=headers,
)

_logger.info('Merging existing metadata')
hasura_metadata_tables = [table['table'] for table in hasura_metadata['tables']]
for table in existing_hasura_metadata['tables']:
if table['table'] not in hasura_metadata_tables:
hasura_metadata['tables'].append(table)

_logger.info('Sending replace metadata request')
result = await http_request(
session,
'post',
url=f'{url}/v1/query',
data=json.dumps(
{
"type": "replace_metadata",
"args": hasura_metadata,
},
),
headers=headers,
)
if not result.get('message') == 'success':
_logger.error('Can\'t configure Hasura instance: %s', result)
await asyncio.sleep(1)
else:
_logger.error('Hasura instance not responding for 60 seconds')
return

headers = {}
if config.hasura.admin_secret:
headers['X-Hasura-Admin-Secret'] = config.hasura.admin_secret

_logger.info('Fetching existing metadata')
existing_hasura_metadata = await http_request(
session,
'post',
url=f'{url}/v1/query',
data=json.dumps(
{
"type": "export_metadata",
"args": hasura_metadata,
},
),
headers=headers,
)

await session.close()
_logger.info('Merging existing metadata')
hasura_metadata_tables = [table['table'] for table in hasura_metadata['tables']]
for table in existing_hasura_metadata['tables']:
if table['table'] not in hasura_metadata_tables:
hasura_metadata['tables'].append(table)

_logger.info('Sending replace metadata request')
result = await http_request(
session,
'post',
url=f'{url}/v1/query',
data=json.dumps(
{
"type": "replace_metadata",
"args": hasura_metadata,
},
),
headers=headers,
)
if not result.get('message') == 'success':
_logger.error('Can\'t configure Hasura instance: %s', result)
else:
_logger.info('Hasura instance has been configured')
3 changes: 3 additions & 0 deletions src/dipdup/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ class TemporaryState(State):
async def save(self, using_db=None, update_fields=None, force_create=False, force_update=False) -> None:
    """No-op override of ``save``: a temporary state is deliberately never
    persisted, so all persistence arguments are accepted and ignored.
    """
    pass

class Meta:
    # NOTE(review): marks the model abstract — presumably so the ORM creates
    # no table for TemporaryState and schema tooling skips it; confirm
    # against the tortoise-orm Meta documentation.
    abstract = True


@dataclass
class OperationData:
Expand Down
23 changes: 18 additions & 5 deletions src/dipdup/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
import time
from contextlib import asynccontextmanager
from logging import Logger
from typing import Any, AsyncIterator, Iterator, List, Optional, Sequence
from typing import Any, AsyncIterator, Iterator, List, Optional

import aiohttp
from tortoise import Tortoise
from tortoise.backends.asyncpg.client import AsyncpgDBClient
from tortoise.backends.base.client import TransactionContext
from tortoise.backends.sqlite.client import SqliteClient
from tortoise.transactions import in_transaction

from dipdup import __version__
Expand Down Expand Up @@ -78,14 +80,25 @@ async def in_global_transaction():
"""Enforce using transaction for all queries inside wrapped block. Works for a single DB only."""
if list(Tortoise._connections.keys()) != ['default']:
raise RuntimeError('`in_global_transaction` wrapper works only with a single DB connection')
async with in_transaction() as conn:
# NOTE: SQLite hacks
conn.filename = ''
conn.pragmas = {}

async with in_transaction() as conn:
conn: TransactionContext
original_conn = Tortoise._connections['default']
Tortoise._connections['default'] = conn

if isinstance(original_conn, SqliteClient):
conn.filename = original_conn.filename
conn.pragmas = original_conn.pragmas
elif isinstance(original_conn, AsyncpgDBClient):
conn._pool = original_conn._pool
conn._template = original_conn._template
else:
raise NotImplementedError(
'`in_global_transaction` wrapper was not tested with database backends other then aiosqlite and asyncpg'
)

yield

Tortoise._connections['default'] = original_conn


Expand Down