Skip to content

Commit

Permalink
Many small performance tweaks (#241)
Browse files Browse the repository at this point in the history
* Various small performance tweaks
* Don't re-create Table object unnecessarily
* Merge decorators to remove a function call for PostgreSQL/MySQL
* Bulk inserts are ensured to be wrapped in a transaction for >50% speedup
* PostgreSQL prepared queries now use an LRU cache for significant >2x speedup on inserts/updates/deletes
* Lighter-weight MySQL cursor-to-dict implementation
* Warn about py3.6 issue, and mark as deprecated (pending)
  • Loading branch information
grigi authored Nov 20, 2019
1 parent d6e9fd9 commit 1d53abe
Show file tree
Hide file tree
Showing 12 changed files with 159 additions and 112 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,20 @@ New features:
- Enabled by default for databases that support it (mysql and postgres) with a minimum pool size of 1, and a maximum of 5
- Not supported by sqlite
- Can be changed by passing the ``minsize`` and ``maxsize`` connection parameters
- Many small performance tweaks:
- Overhead of query generation has been reduced by about 6%
- Bulk inserts are ensured to be wrapped in a transaction for >50% speedup
- PostgreSQL prepared queries now use an LRU cache for significant >2x speedup on inserts/updates/deletes

Deprecations:
^^^^^^^^^^^^^
- ``start_transaction`` is deprecated, please use ``@atomic()`` or ``async with in_transaction():`` instead.
- **This release brings with it, deprecation of Python 3.6 / PyPy-3.6:**

  This is due to small differences in how the backported ``aiocontextvars`` behaves
  in comparison to the built-in ``contextvars`` module in Python 3.7+.

There is a known context confusion, specifically regarding nested transactions.

0.14.2
------
Expand Down
4 changes: 2 additions & 2 deletions tests/test_two_databases.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async def test_two_databases(self):
await self.db.execute_query("SELECT * FROM eventtwo")

results = await self.second_db.execute_query("SELECT * FROM eventtwo")
self.assertEqual(dict(results[0].items()), {"id": 1, "name": "Event", "tournament_id": 1})
self.assertEqual(dict(results[0]), {"id": 1, "name": "Event", "tournament_id": 1})

async def test_two_databases_relation(self):
tournament = await Tournament.create(name="Tournament")
Expand All @@ -41,7 +41,7 @@ async def test_two_databases_relation(self):
await self.db.execute_query("SELECT * FROM eventtwo")

results = await self.second_db.execute_query("SELECT * FROM eventtwo")
self.assertEqual(dict(results[0].items()), {"id": 1, "name": "Event", "tournament_id": 1})
self.assertEqual(dict(results[0]), {"id": 1, "name": "Event", "tournament_id": 1})

teams = []
for i in range(2):
Expand Down
3 changes: 3 additions & 0 deletions tortoise/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from inspect import isclass
from typing import Any, Coroutine, Dict, List, Optional, Tuple, Type, Union, cast

from pypika import Table

from tortoise import fields
from tortoise.backends.base.client import BaseDBAsyncClient
from tortoise.backends.base.config_generator import expand_db_url, generate_config
Expand Down Expand Up @@ -466,6 +468,7 @@ def _build_initial_querysets(cls) -> None:
for app in cls.apps.values():
for model in app.values():
model._meta.finalise_model()
model._meta.basetable = Table(model._meta.table)
model._meta.basequery = model._meta.db.query_class.from_(model._meta.table)
model._meta.basequery_all_fields = model._meta.basequery.select(
*model._meta.db_fields
Expand Down
77 changes: 39 additions & 38 deletions tortoise/backends/asyncpg/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,43 +31,35 @@ def retry_connection(func):
@wraps(func)
async def retry_connection_(self, *args):
try:
return await func(self, *args)
except (
asyncpg.PostgresConnectionError,
asyncpg.ConnectionDoesNotExistError,
asyncpg.ConnectionFailureError,
asyncpg.InterfaceError,
):
# Here we assume that a connection error has happened
# Re-create connection and re-try the function call once only.
if getattr(self, "transaction", None):
self._finalized = True
raise TransactionManagementError("Connection gone away during transaction")
logging.info("Attempting reconnect")
try:
async with self.acquire_connection():
logging.info("Reconnected")
except Exception as e:
raise DBConnectionError(f"Failed to reconnect: {str(e)}")

return await func(self, *args)

return retry_connection_


def translate_exceptions(func):
@wraps(func)
async def translate_exceptions_(self, *args):
try:
return await func(self, *args)
return await func(self, *args)
except (
asyncpg.PostgresConnectionError,
asyncpg.ConnectionDoesNotExistError,
asyncpg.ConnectionFailureError,
asyncpg.InterfaceError,
):
# Here we assume that a connection error has happened
# Re-create connection and re-try the function call once only.
if getattr(self, "transaction", None):
self._finalized = True
raise TransactionManagementError("Connection gone away during transaction")
logging.info("Attempting reconnect")
try:
async with self.acquire_connection():
logging.info("Reconnected")
except Exception as e:
raise DBConnectionError(f"Failed to reconnect: {str(e)}")

return await func(self, *args)
except asyncpg.SyntaxOrAccessError as exc:
raise OperationalError(exc)
except asyncpg.IntegrityConstraintViolationError as exc:
raise IntegrityError(exc)
except asyncpg.InvalidTransactionStateError as exc:
raise TransactionManagementError(exc)

return translate_exceptions_
return retry_connection_


class AsyncpgDBClient(BaseDBAsyncClient):
Expand Down Expand Up @@ -151,35 +143,37 @@ def acquire_connection(self) -> Union["PoolConnectionWrapper", "ConnectionWrappe
def _in_transaction(self) -> "TransactionContext":
return TransactionContextPooled(TransactionWrapper(self))

@translate_exceptions
@retry_connection
async def execute_insert(self, query: str, values: list) -> Optional[asyncpg.Record]:
async with self.acquire_connection() as connection:
self.log.debug("%s: %s", query, values)
# TODO: Cache prepared statement
stmt = await connection.prepare(query)
return await stmt.fetchrow(*values)
return await connection.fetchrow(query, *values)

@translate_exceptions
@retry_connection
async def execute_many(self, query: str, values: list) -> None:
async with self.acquire_connection() as connection:
self.log.debug("%s: %s", query, values)
# TODO: Consider using copy_records_to_table instead
await connection.executemany(query, values)
transaction = connection.transaction()
await transaction.start()
try:
await connection.executemany(query, values)
except Exception:
await transaction.rollback()
raise
else:
await transaction.commit()

@translate_exceptions
@retry_connection
async def execute_query(self, query: str, values: Optional[list] = None) -> List[dict]:
async with self.acquire_connection() as connection:
self.log.debug("%s: %s", query, values)
if values:
# TODO: Cache prepared statement
stmt = await connection.prepare(query)
return await stmt.fetch(*values)
return await connection.fetch(query, *values)
return await connection.fetch(query)

@translate_exceptions
@retry_connection
async def execute_script(self, query: str) -> None:
async with self.acquire_connection() as connection:
Expand All @@ -206,6 +200,13 @@ async def create_connection(self, with_db: bool) -> None:
def acquire_connection(self) -> "ConnectionWrapper":
return ConnectionWrapper(self._connection, self._lock)

@retry_connection
async def execute_many(self, query: str, values: list) -> None:
async with self.acquire_connection() as connection:
self.log.debug("%s: %s", query, values)
# TODO: Consider using copy_records_to_table instead
await connection.executemany(query, values)

@retry_connection
async def start(self) -> None:
self.transaction = self._connection.transaction()
Expand Down
4 changes: 2 additions & 2 deletions tortoise/backends/asyncpg/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import List, Optional

import asyncpg
from pypika import Parameter, Table
from pypika import Parameter

from tortoise import Model
from tortoise.backends.base.executor import BaseExecutor
Expand All @@ -17,7 +17,7 @@ def Parameter(self, pos: int) -> Parameter:

def _prepare_insert_statement(self, columns: List[str]) -> str:
query = (
self.db.query_class.into(Table(self.model._meta.table))
self.db.query_class.into(self.model._meta.basetable)
.columns(*columns)
.insert(*[self.Parameter(i) for i in range(len(columns))])
)
Expand Down
8 changes: 4 additions & 4 deletions tortoise/backends/base/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(
else:
self.column_map[column] = field_object.to_db_value

table = Table(self.model._meta.table)
table = self.model._meta.basetable
self.delete_query = str(
self.model._meta.basequery.where(
table[self.model._meta.db_pk_field] == self.Parameter(0)
Expand Down Expand Up @@ -110,7 +110,7 @@ def _prepare_insert_statement(self, columns: List[str]) -> str:
# Each db has it's own methods for it, so each implementation should
# go to descendant executors
return str(
self.db.query_class.into(Table(self.model._meta.table))
self.db.query_class.into(self.model._meta.basetable)
.columns(*columns)
.insert(*[self.Parameter(i) for i in range(len(columns))])
)
Expand Down Expand Up @@ -148,7 +148,7 @@ def get_update_sql(self, update_fields: Optional[List[str]]) -> str:
if key in self.update_cache:
return self.update_cache[key]

table = Table(self.model._meta.table)
table = self.model._meta.basetable
query = self.db.query_class.update(table)
count = 0
for field in update_fields or self.model._meta.fields_db_projection.keys():
Expand Down Expand Up @@ -224,7 +224,7 @@ async def _prefetch_m2m_relation(self, instance_list: list, field: str, related_
.where(through_table[field_object.backward_key].isin(instance_id_set))
)

related_query_table = Table(related_query.model._meta.table)
related_query_table = related_query.model._meta.basetable
related_pk_field = related_query.model._meta.db_pk_field
query = (
related_query.query.join(subquery)
Expand Down
85 changes: 46 additions & 39 deletions tortoise/backends/mysql/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,35 +31,27 @@ def retry_connection(func):
@wraps(func)
async def retry_connection_(self, *args):
try:
return await func(self, *args)
except (
RuntimeError,
pymysql.err.OperationalError,
pymysql.err.InternalError,
pymysql.err.InterfaceError,
):
# Here we assume that a connection error has happened
# Re-create connection and re-try the function call once only.
if getattr(self, "_finalized", None) is False:
raise TransactionManagementError("Connection gone away during transaction")
logging.info("Attempting reconnect")
try:
async with self.acquire_connection() as connection:
await connection.ping()
logging.info("Reconnected")
except Exception as e:
raise DBConnectionError("Failed to reconnect: %s", str(e))

return await func(self, *args)

return retry_connection_


def translate_exceptions(func):
@wraps(func)
async def translate_exceptions_(self, *args):
try:
return await func(self, *args)
return await func(self, *args)
except (
RuntimeError,
pymysql.err.OperationalError,
pymysql.err.InternalError,
pymysql.err.InterfaceError,
):
# Here we assume that a connection error has happened
# Re-create connection and re-try the function call once only.
if getattr(self, "_finalized", None) is False:
raise TransactionManagementError("Connection gone away during transaction")
logging.info("Attempting reconnect")
try:
async with self.acquire_connection() as connection:
await connection.ping()
logging.info("Reconnected")
except Exception as e:
raise DBConnectionError("Failed to reconnect: %s", str(e))

return await func(self, *args)
except (
pymysql.err.OperationalError,
pymysql.err.ProgrammingError,
Expand All @@ -71,7 +63,7 @@ async def translate_exceptions_(self, *args):
except pymysql.err.IntegrityError as exc:
raise IntegrityError(exc)

return translate_exceptions_
return retry_connection_


class MySQLClient(BaseDBAsyncClient):
Expand Down Expand Up @@ -163,7 +155,6 @@ def acquire_connection(self) -> Union["ConnectionWrapper", "PoolConnectionWrappe
def _in_transaction(self) -> "TransactionContext":
return TransactionContextPooled(TransactionWrapper(self))

@translate_exceptions
@retry_connection
async def execute_insert(self, query: str, values: list) -> int:
async with self.acquire_connection() as connection:
Expand All @@ -172,26 +163,35 @@ async def execute_insert(self, query: str, values: list) -> int:
await cursor.execute(query, values)
return cursor.lastrowid # return auto-generated id

@translate_exceptions
@retry_connection
async def execute_many(self, query: str, values: list) -> None:
async with self.acquire_connection() as connection:
self.log.debug("%s: %s", query, values)
async with connection.cursor() as cursor:
await cursor.executemany(query, values)
if self.capabilities.supports_transactions:
await connection.begin()
try:
await cursor.executemany(query, values)
except Exception:
await connection.rollback()
raise
else:
await connection.commit()
else:
await cursor.executemany(query, values)

@translate_exceptions
@retry_connection
async def execute_query(
self, query: str, values: Optional[list] = None
) -> List[aiomysql.DictCursor]:
async def execute_query(self, query: str, values: Optional[list] = None) -> List[dict]:
async with self.acquire_connection() as connection:
self.log.debug("%s: %s", query, values)
async with connection.cursor(aiomysql.DictCursor) as cursor:
async with connection.cursor() as cursor:
await cursor.execute(query, values)
return await cursor.fetchall()
rows = await cursor.fetchall()
if rows:
fields = [f.name for f in cursor._result.fields]
return [dict(zip(fields, row)) for row in rows]
return []

@translate_exceptions
@retry_connection
async def execute_script(self, query: str) -> None:
async with self.acquire_connection() as connection:
Expand Down Expand Up @@ -219,6 +219,13 @@ async def create_connection(self, with_db: bool) -> None:
def acquire_connection(self) -> ConnectionWrapper:
return ConnectionWrapper(self._connection, self._lock)

@retry_connection
async def execute_many(self, query: str, values: list) -> None:
async with self.acquire_connection() as connection:
self.log.debug("%s: %s", query, values)
async with connection.cursor() as cursor:
await cursor.executemany(query, values)

@retry_connection
async def start(self) -> None:
await self._connection.begin()
Expand Down
Loading

0 comments on commit 1d53abe

Please sign in to comment.