Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Many small performance tweaks #241

Merged
merged 6 commits into from
Nov 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,20 @@ New features:
- Enabled by default for databases that support it (mysql and postgres) with a minimum pool size of 1, and a maximum of 5
- Not supported by sqlite
- Can be changed by passing the ``minsize`` and ``maxsize`` connection parameters
- Many small performance tweaks:
- Overhead of query generation has been reduced by about 6%
- Bulk inserts are ensured to be wrapped in a transaction for >50% speedup
- PostgreSQL prepared queries now use an LRU cache for significant >2x speedup on inserts/updates/deletes

Deprecations:
^^^^^^^^^^^^^
- ``start_transaction`` is deprecated, please use ``@atomic()`` or ``async with in_transaction():`` instead.
- **This release brings with it the deprecation of Python 3.6 / PyPy-3.6:**

This is due to small differences in how the backported ``aiocontextvars`` package behaves
in comparison to the built-in ``contextvars`` module in Python 3.7+.

There is a known context confusion, specifically regarding nested transactions.

0.14.2
------
Expand Down
4 changes: 2 additions & 2 deletions tests/test_two_databases.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async def test_two_databases(self):
await self.db.execute_query("SELECT * FROM eventtwo")

results = await self.second_db.execute_query("SELECT * FROM eventtwo")
self.assertEqual(dict(results[0].items()), {"id": 1, "name": "Event", "tournament_id": 1})
self.assertEqual(dict(results[0]), {"id": 1, "name": "Event", "tournament_id": 1})

async def test_two_databases_relation(self):
tournament = await Tournament.create(name="Tournament")
Expand All @@ -41,7 +41,7 @@ async def test_two_databases_relation(self):
await self.db.execute_query("SELECT * FROM eventtwo")

results = await self.second_db.execute_query("SELECT * FROM eventtwo")
self.assertEqual(dict(results[0].items()), {"id": 1, "name": "Event", "tournament_id": 1})
self.assertEqual(dict(results[0]), {"id": 1, "name": "Event", "tournament_id": 1})

teams = []
for i in range(2):
Expand Down
3 changes: 3 additions & 0 deletions tortoise/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from inspect import isclass
from typing import Any, Coroutine, Dict, List, Optional, Tuple, Type, Union, cast

from pypika import Table

from tortoise import fields
from tortoise.backends.base.client import BaseDBAsyncClient
from tortoise.backends.base.config_generator import expand_db_url, generate_config
Expand Down Expand Up @@ -466,6 +468,7 @@ def _build_initial_querysets(cls) -> None:
for app in cls.apps.values():
for model in app.values():
model._meta.finalise_model()
model._meta.basetable = Table(model._meta.table)
model._meta.basequery = model._meta.db.query_class.from_(model._meta.table)
model._meta.basequery_all_fields = model._meta.basequery.select(
*model._meta.db_fields
Expand Down
77 changes: 39 additions & 38 deletions tortoise/backends/asyncpg/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,43 +31,35 @@ def retry_connection(func):
@wraps(func)
async def retry_connection_(self, *args):
try:
return await func(self, *args)
except (
asyncpg.PostgresConnectionError,
asyncpg.ConnectionDoesNotExistError,
asyncpg.ConnectionFailureError,
asyncpg.InterfaceError,
):
# Here we assume that a connection error has happened
# Re-create connection and re-try the function call once only.
if getattr(self, "transaction", None):
self._finalized = True
raise TransactionManagementError("Connection gone away during transaction")
logging.info("Attempting reconnect")
try:
async with self.acquire_connection():
logging.info("Reconnected")
except Exception as e:
raise DBConnectionError(f"Failed to reconnect: {str(e)}")

return await func(self, *args)

return retry_connection_


def translate_exceptions(func):
@wraps(func)
async def translate_exceptions_(self, *args):
try:
return await func(self, *args)
return await func(self, *args)
except (
asyncpg.PostgresConnectionError,
asyncpg.ConnectionDoesNotExistError,
asyncpg.ConnectionFailureError,
asyncpg.InterfaceError,
):
# Here we assume that a connection error has happened
# Re-create connection and re-try the function call once only.
if getattr(self, "transaction", None):
self._finalized = True
raise TransactionManagementError("Connection gone away during transaction")
logging.info("Attempting reconnect")
try:
async with self.acquire_connection():
logging.info("Reconnected")
except Exception as e:
raise DBConnectionError(f"Failed to reconnect: {str(e)}")

return await func(self, *args)
except asyncpg.SyntaxOrAccessError as exc:
raise OperationalError(exc)
except asyncpg.IntegrityConstraintViolationError as exc:
raise IntegrityError(exc)
except asyncpg.InvalidTransactionStateError as exc:
raise TransactionManagementError(exc)

return translate_exceptions_
return retry_connection_


class AsyncpgDBClient(BaseDBAsyncClient):
Expand Down Expand Up @@ -151,35 +143,37 @@ def acquire_connection(self) -> Union["PoolConnectionWrapper", "ConnectionWrappe
def _in_transaction(self) -> "TransactionContext":
return TransactionContextPooled(TransactionWrapper(self))

@translate_exceptions
@retry_connection
async def execute_insert(self, query: str, values: list) -> Optional[asyncpg.Record]:
async with self.acquire_connection() as connection:
self.log.debug("%s: %s", query, values)
# TODO: Cache prepared statement
stmt = await connection.prepare(query)
return await stmt.fetchrow(*values)
return await connection.fetchrow(query, *values)

@translate_exceptions
@retry_connection
async def execute_many(self, query: str, values: list) -> None:
async with self.acquire_connection() as connection:
self.log.debug("%s: %s", query, values)
# TODO: Consider using copy_records_to_table instead
await connection.executemany(query, values)
transaction = connection.transaction()
await transaction.start()
try:
await connection.executemany(query, values)
except Exception:
await transaction.rollback()
raise
else:
await transaction.commit()

@translate_exceptions
@retry_connection
async def execute_query(self, query: str, values: Optional[list] = None) -> List[dict]:
async with self.acquire_connection() as connection:
self.log.debug("%s: %s", query, values)
if values:
# TODO: Cache prepared statement
stmt = await connection.prepare(query)
return await stmt.fetch(*values)
return await connection.fetch(query, *values)
return await connection.fetch(query)

@translate_exceptions
@retry_connection
async def execute_script(self, query: str) -> None:
async with self.acquire_connection() as connection:
Expand All @@ -206,6 +200,13 @@ async def create_connection(self, with_db: bool) -> None:
def acquire_connection(self) -> "ConnectionWrapper":
return ConnectionWrapper(self._connection, self._lock)

@retry_connection
async def execute_many(self, query: str, values: list) -> None:
    """Execute *query* once per parameter set in *values* on this wrapper's connection.

    Runs via asyncpg's ``Connection.executemany``; unlike the pooled client's
    version, no explicit transaction is opened here — presumably because this
    wrapper already runs inside an active transaction (TODO confirm against
    the enclosing TransactionWrapper class).
    """
    async with self.acquire_connection() as connection:
        # Lazy %-style args keep log formatting off the hot path.
        self.log.debug("%s: %s", query, values)
        # TODO: Consider using copy_records_to_table instead
        await connection.executemany(query, values)

@retry_connection
async def start(self) -> None:
self.transaction = self._connection.transaction()
Expand Down
4 changes: 2 additions & 2 deletions tortoise/backends/asyncpg/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import List, Optional

import asyncpg
from pypika import Parameter, Table
from pypika import Parameter

from tortoise import Model
from tortoise.backends.base.executor import BaseExecutor
Expand All @@ -17,7 +17,7 @@ def Parameter(self, pos: int) -> Parameter:

def _prepare_insert_statement(self, columns: List[str]) -> str:
query = (
self.db.query_class.into(Table(self.model._meta.table))
self.db.query_class.into(self.model._meta.basetable)
.columns(*columns)
.insert(*[self.Parameter(i) for i in range(len(columns))])
)
Expand Down
8 changes: 4 additions & 4 deletions tortoise/backends/base/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(
else:
self.column_map[column] = field_object.to_db_value

table = Table(self.model._meta.table)
table = self.model._meta.basetable
self.delete_query = str(
self.model._meta.basequery.where(
table[self.model._meta.db_pk_field] == self.Parameter(0)
Expand Down Expand Up @@ -110,7 +110,7 @@ def _prepare_insert_statement(self, columns: List[str]) -> str:
# Each db has it's own methods for it, so each implementation should
# go to descendant executors
return str(
self.db.query_class.into(Table(self.model._meta.table))
self.db.query_class.into(self.model._meta.basetable)
.columns(*columns)
.insert(*[self.Parameter(i) for i in range(len(columns))])
)
Expand Down Expand Up @@ -148,7 +148,7 @@ def get_update_sql(self, update_fields: Optional[List[str]]) -> str:
if key in self.update_cache:
return self.update_cache[key]

table = Table(self.model._meta.table)
table = self.model._meta.basetable
query = self.db.query_class.update(table)
count = 0
for field in update_fields or self.model._meta.fields_db_projection.keys():
Expand Down Expand Up @@ -224,7 +224,7 @@ async def _prefetch_m2m_relation(self, instance_list: list, field: str, related_
.where(through_table[field_object.backward_key].isin(instance_id_set))
)

related_query_table = Table(related_query.model._meta.table)
related_query_table = related_query.model._meta.basetable
related_pk_field = related_query.model._meta.db_pk_field
query = (
related_query.query.join(subquery)
Expand Down
85 changes: 46 additions & 39 deletions tortoise/backends/mysql/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,35 +31,27 @@ def retry_connection(func):
@wraps(func)
async def retry_connection_(self, *args):
try:
return await func(self, *args)
except (
RuntimeError,
pymysql.err.OperationalError,
pymysql.err.InternalError,
pymysql.err.InterfaceError,
):
# Here we assume that a connection error has happened
# Re-create connection and re-try the function call once only.
if getattr(self, "_finalized", None) is False:
raise TransactionManagementError("Connection gone away during transaction")
logging.info("Attempting reconnect")
try:
async with self.acquire_connection() as connection:
await connection.ping()
logging.info("Reconnected")
except Exception as e:
raise DBConnectionError("Failed to reconnect: %s", str(e))

return await func(self, *args)

return retry_connection_


def translate_exceptions(func):
@wraps(func)
async def translate_exceptions_(self, *args):
try:
return await func(self, *args)
return await func(self, *args)
except (
RuntimeError,
pymysql.err.OperationalError,
pymysql.err.InternalError,
pymysql.err.InterfaceError,
):
# Here we assume that a connection error has happened
# Re-create connection and re-try the function call once only.
if getattr(self, "_finalized", None) is False:
raise TransactionManagementError("Connection gone away during transaction")
logging.info("Attempting reconnect")
try:
async with self.acquire_connection() as connection:
await connection.ping()
logging.info("Reconnected")
except Exception as e:
raise DBConnectionError("Failed to reconnect: %s", str(e))

return await func(self, *args)
except (
pymysql.err.OperationalError,
pymysql.err.ProgrammingError,
Expand All @@ -71,7 +63,7 @@ async def translate_exceptions_(self, *args):
except pymysql.err.IntegrityError as exc:
raise IntegrityError(exc)

return translate_exceptions_
return retry_connection_


class MySQLClient(BaseDBAsyncClient):
Expand Down Expand Up @@ -163,7 +155,6 @@ def acquire_connection(self) -> Union["ConnectionWrapper", "PoolConnectionWrappe
def _in_transaction(self) -> "TransactionContext":
return TransactionContextPooled(TransactionWrapper(self))

@translate_exceptions
@retry_connection
async def execute_insert(self, query: str, values: list) -> int:
async with self.acquire_connection() as connection:
Expand All @@ -172,26 +163,35 @@ async def execute_insert(self, query: str, values: list) -> int:
await cursor.execute(query, values)
return cursor.lastrowid # return auto-generated id

@translate_exceptions
@retry_connection
async def execute_many(self, query: str, values: list) -> None:
async with self.acquire_connection() as connection:
self.log.debug("%s: %s", query, values)
async with connection.cursor() as cursor:
await cursor.executemany(query, values)
if self.capabilities.supports_transactions:
await connection.begin()
try:
await cursor.executemany(query, values)
except Exception:
await connection.rollback()
raise
else:
await connection.commit()
else:
await cursor.executemany(query, values)

@translate_exceptions
@retry_connection
async def execute_query(
self, query: str, values: Optional[list] = None
) -> List[aiomysql.DictCursor]:
async def execute_query(self, query: str, values: Optional[list] = None) -> List[dict]:
async with self.acquire_connection() as connection:
self.log.debug("%s: %s", query, values)
async with connection.cursor(aiomysql.DictCursor) as cursor:
async with connection.cursor() as cursor:
await cursor.execute(query, values)
return await cursor.fetchall()
rows = await cursor.fetchall()
if rows:
fields = [f.name for f in cursor._result.fields]
return [dict(zip(fields, row)) for row in rows]
return []

@translate_exceptions
@retry_connection
async def execute_script(self, query: str) -> None:
async with self.acquire_connection() as connection:
Expand Down Expand Up @@ -219,6 +219,13 @@ async def create_connection(self, with_db: bool) -> None:
def acquire_connection(self) -> ConnectionWrapper:
return ConnectionWrapper(self._connection, self._lock)

@retry_connection
async def execute_many(self, query: str, values: list) -> None:
    """Execute *query* once per parameter set in *values* on this wrapper's connection.

    Uses a MySQL cursor's ``executemany``; no explicit begin/commit here —
    presumably because this wrapper already runs inside an active transaction
    (TODO confirm against the enclosing TransactionWrapper class), unlike the
    client-level version which wraps the batch in its own transaction.
    """
    async with self.acquire_connection() as connection:
        # Lazy %-style args keep log formatting off the hot path.
        self.log.debug("%s: %s", query, values)
        async with connection.cursor() as cursor:
            await cursor.executemany(query, values)

@retry_connection
async def start(self) -> None:
await self._connection.begin()
Expand Down
Loading