Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Many small performance tweaks #241

Merged
merged 6 commits into from
Nov 20, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ New features:
- Enabled by default for databases that support it (mysql and postgres) with a minimum pool size of 1, and a maximum of 5
- Not supported by sqlite
- Can be changed by passing the ``minsize`` and ``maxsize`` connection parameters
- Many small performance tweaks reducing filter query generation overhead by about ~5%
- Bulk inserts are ensured to be wrapped in a transaction for >50% speedup
- PostgreSQL prepared queries now use an LRU cache for a significant >2x speedup on inserts/updates/deletes
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this a significant speedup over mysql?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to do a similar thing with MySQL, but the PyMySQL driver doesn't actually support parametrized queries natively, as it re-escapes everything and submits a raw SQL for each query...

At least it encodes everything as binary, so should be injection immune, but it just seems non-ideal.

Currently PyMySQL is the only driver supporting asyncio via aiomysql.
Right now I'm going to keep this here, and after we do version 1.0, look at seeing if we can speed up the driver or do an asyncio wrapper around mysqlclient which is apparently notably faster.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that "should be injection immune" doesn't sound very reassuring, I'll let loose some injection patterns on it later just in case to make sure

it's already pretty fast but any speedups to MySQL would be much appreciated as that's what I personally use the most :P let me know if I can help with that bit

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be appreciated.


Deprecations:
^^^^^^^^^^^^^
Expand Down
4 changes: 2 additions & 2 deletions tests/test_two_databases.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async def test_two_databases(self):
await self.db.execute_query("SELECT * FROM eventtwo")

results = await self.second_db.execute_query("SELECT * FROM eventtwo")
self.assertEqual(dict(results[0].items()), {"id": 1, "name": "Event", "tournament_id": 1})
self.assertEqual(dict(results[0]), {"id": 1, "name": "Event", "tournament_id": 1})

async def test_two_databases_relation(self):
tournament = await Tournament.create(name="Tournament")
Expand All @@ -41,7 +41,7 @@ async def test_two_databases_relation(self):
await self.db.execute_query("SELECT * FROM eventtwo")

results = await self.second_db.execute_query("SELECT * FROM eventtwo")
self.assertEqual(dict(results[0].items()), {"id": 1, "name": "Event", "tournament_id": 1})
self.assertEqual(dict(results[0]), {"id": 1, "name": "Event", "tournament_id": 1})

teams = []
for i in range(2):
Expand Down
3 changes: 3 additions & 0 deletions tortoise/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from inspect import isclass
from typing import Any, Coroutine, Dict, List, Optional, Tuple, Type, Union, cast

from pypika import Table

from tortoise import fields
from tortoise.backends.base.client import BaseDBAsyncClient
from tortoise.backends.base.config_generator import expand_db_url, generate_config
Expand Down Expand Up @@ -466,6 +468,7 @@ def _build_initial_querysets(cls) -> None:
for app in cls.apps.values():
for model in app.values():
model._meta.finalise_model()
model._meta.basetable = Table(model._meta.table)
model._meta.basequery = model._meta.db.query_class.from_(model._meta.table)
model._meta.basequery_all_fields = model._meta.basequery.select(
*model._meta.db_fields
Expand Down
77 changes: 39 additions & 38 deletions tortoise/backends/asyncpg/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,43 +31,35 @@ def retry_connection(func):
@wraps(func)
async def retry_connection_(self, *args):
try:
return await func(self, *args)
except (
asyncpg.PostgresConnectionError,
asyncpg.ConnectionDoesNotExistError,
asyncpg.ConnectionFailureError,
asyncpg.InterfaceError,
):
# Here we assume that a connection error has happened
# Re-create connection and re-try the function call once only.
if getattr(self, "transaction", None):
self._finalized = True
raise TransactionManagementError("Connection gone away during transaction")
logging.info("Attempting reconnect")
try:
async with self.acquire_connection():
logging.info("Reconnected")
except Exception as e:
raise DBConnectionError(f"Failed to reconnect: {str(e)}")

return await func(self, *args)

return retry_connection_


def translate_exceptions(func):
@wraps(func)
async def translate_exceptions_(self, *args):
try:
return await func(self, *args)
return await func(self, *args)
except (
asyncpg.PostgresConnectionError,
asyncpg.ConnectionDoesNotExistError,
asyncpg.ConnectionFailureError,
asyncpg.InterfaceError,
):
# Here we assume that a connection error has happened
# Re-create connection and re-try the function call once only.
if getattr(self, "transaction", None):
self._finalized = True
raise TransactionManagementError("Connection gone away during transaction")
logging.info("Attempting reconnect")
try:
async with self.acquire_connection():
logging.info("Reconnected")
except Exception as e:
raise DBConnectionError(f"Failed to reconnect: {str(e)}")

return await func(self, *args)
except asyncpg.SyntaxOrAccessError as exc:
raise OperationalError(exc)
except asyncpg.IntegrityConstraintViolationError as exc:
raise IntegrityError(exc)
except asyncpg.InvalidTransactionStateError as exc:
raise TransactionManagementError(exc)

return translate_exceptions_
return retry_connection_


class AsyncpgDBClient(BaseDBAsyncClient):
Expand Down Expand Up @@ -151,35 +143,37 @@ def acquire_connection(self) -> Union["PoolConnectionWrapper", "ConnectionWrappe
def _in_transaction(self) -> "TransactionContext":
return TransactionContextPooled(TransactionWrapper(self))

@translate_exceptions
@retry_connection
async def execute_insert(self, query: str, values: list) -> Optional[asyncpg.Record]:
async with self.acquire_connection() as connection:
self.log.debug("%s: %s", query, values)
# TODO: Cache prepared statement
stmt = await connection.prepare(query)
return await stmt.fetchrow(*values)
return await connection.fetchrow(query, *values)

@translate_exceptions
@retry_connection
async def execute_many(self, query: str, values: list) -> None:
async with self.acquire_connection() as connection:
self.log.debug("%s: %s", query, values)
# TODO: Consider using copy_records_to_table instead
await connection.executemany(query, values)
transaction = connection.transaction()
await transaction.start()
try:
await connection.executemany(query, values)
except Exception:
await transaction.rollback()
raise
else:
await transaction.commit()

@translate_exceptions
@retry_connection
async def execute_query(self, query: str, values: Optional[list] = None) -> List[dict]:
async with self.acquire_connection() as connection:
self.log.debug("%s: %s", query, values)
if values:
# TODO: Cache prepared statement
stmt = await connection.prepare(query)
return await stmt.fetch(*values)
return await connection.fetch(query, *values)
return await connection.fetch(query)

@translate_exceptions
@retry_connection
async def execute_script(self, query: str) -> None:
async with self.acquire_connection() as connection:
Expand All @@ -206,6 +200,13 @@ async def create_connection(self, with_db: bool) -> None:
def acquire_connection(self) -> "ConnectionWrapper":
return ConnectionWrapper(self._connection, self._lock)

@retry_connection
async def execute_many(self, query: str, values: list) -> None:
    """Execute *query* once for every parameter row in *values* on the wrapped connection."""
    # TODO: Consider using copy_records_to_table instead
    async with self.acquire_connection() as conn:
        self.log.debug("%s: %s", query, values)
        await conn.executemany(query, values)

@retry_connection
async def start(self) -> None:
self.transaction = self._connection.transaction()
Expand Down
4 changes: 2 additions & 2 deletions tortoise/backends/asyncpg/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import List, Optional

import asyncpg
from pypika import Parameter, Table
from pypika import Parameter

from tortoise import Model
from tortoise.backends.base.executor import BaseExecutor
Expand All @@ -17,7 +17,7 @@ def Parameter(self, pos: int) -> Parameter:

def _prepare_insert_statement(self, columns: List[str]) -> str:
query = (
self.db.query_class.into(Table(self.model._meta.table))
self.db.query_class.into(self.model._meta.basetable)
.columns(*columns)
.insert(*[self.Parameter(i) for i in range(len(columns))])
)
Expand Down
8 changes: 4 additions & 4 deletions tortoise/backends/base/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(
else:
self.column_map[column] = field_object.to_db_value

table = Table(self.model._meta.table)
table = self.model._meta.basetable
self.delete_query = str(
self.model._meta.basequery.where(
table[self.model._meta.db_pk_field] == self.Parameter(0)
Expand Down Expand Up @@ -110,7 +110,7 @@ def _prepare_insert_statement(self, columns: List[str]) -> str:
# Each db has it's own methods for it, so each implementation should
# go to descendant executors
return str(
self.db.query_class.into(Table(self.model._meta.table))
self.db.query_class.into(self.model._meta.basetable)
.columns(*columns)
.insert(*[self.Parameter(i) for i in range(len(columns))])
)
Expand Down Expand Up @@ -148,7 +148,7 @@ def get_update_sql(self, update_fields: Optional[List[str]]) -> str:
if key in self.update_cache:
return self.update_cache[key]

table = Table(self.model._meta.table)
table = self.model._meta.basetable
query = self.db.query_class.update(table)
count = 0
for field in update_fields or self.model._meta.fields_db_projection.keys():
Expand Down Expand Up @@ -224,7 +224,7 @@ async def _prefetch_m2m_relation(self, instance_list: list, field: str, related_
.where(through_table[field_object.backward_key].isin(instance_id_set))
)

related_query_table = Table(related_query.model._meta.table)
related_query_table = related_query.model._meta.basetable
related_pk_field = related_query.model._meta.db_pk_field
query = (
related_query.query.join(subquery)
Expand Down
85 changes: 46 additions & 39 deletions tortoise/backends/mysql/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,35 +31,27 @@ def retry_connection(func):
@wraps(func)
async def retry_connection_(self, *args):
try:
return await func(self, *args)
except (
RuntimeError,
pymysql.err.OperationalError,
pymysql.err.InternalError,
pymysql.err.InterfaceError,
):
# Here we assume that a connection error has happened
# Re-create connection and re-try the function call once only.
if getattr(self, "_finalized", None) is False:
raise TransactionManagementError("Connection gone away during transaction")
logging.info("Attempting reconnect")
try:
async with self.acquire_connection() as connection:
await connection.ping()
logging.info("Reconnected")
except Exception as e:
raise DBConnectionError("Failed to reconnect: %s", str(e))

return await func(self, *args)

return retry_connection_


def translate_exceptions(func):
@wraps(func)
async def translate_exceptions_(self, *args):
try:
return await func(self, *args)
return await func(self, *args)
except (
RuntimeError,
pymysql.err.OperationalError,
pymysql.err.InternalError,
pymysql.err.InterfaceError,
):
# Here we assume that a connection error has happened
# Re-create connection and re-try the function call once only.
if getattr(self, "_finalized", None) is False:
raise TransactionManagementError("Connection gone away during transaction")
logging.info("Attempting reconnect")
try:
async with self.acquire_connection() as connection:
await connection.ping()
logging.info("Reconnected")
except Exception as e:
raise DBConnectionError("Failed to reconnect: %s", str(e))

return await func(self, *args)
except (
pymysql.err.OperationalError,
pymysql.err.ProgrammingError,
Expand All @@ -71,7 +63,7 @@ async def translate_exceptions_(self, *args):
except pymysql.err.IntegrityError as exc:
raise IntegrityError(exc)

return translate_exceptions_
return retry_connection_


class MySQLClient(BaseDBAsyncClient):
Expand Down Expand Up @@ -163,7 +155,6 @@ def acquire_connection(self) -> Union["ConnectionWrapper", "PoolConnectionWrappe
def _in_transaction(self) -> "TransactionContext":
return TransactionContextPooled(TransactionWrapper(self))

@translate_exceptions
@retry_connection
async def execute_insert(self, query: str, values: list) -> int:
async with self.acquire_connection() as connection:
Expand All @@ -172,26 +163,35 @@ async def execute_insert(self, query: str, values: list) -> int:
await cursor.execute(query, values)
return cursor.lastrowid # return auto-generated id

@translate_exceptions
@retry_connection
async def execute_many(self, query: str, values: list) -> None:
async with self.acquire_connection() as connection:
self.log.debug("%s: %s", query, values)
async with connection.cursor() as cursor:
await cursor.executemany(query, values)
if self.capabilities.supports_transactions:
await connection.begin()
try:
await cursor.executemany(query, values)
except Exception:
await connection.rollback()
raise
else:
await connection.commit()
else:
await cursor.executemany(query, values)

@translate_exceptions
@retry_connection
async def execute_query(
self, query: str, values: Optional[list] = None
) -> List[aiomysql.DictCursor]:
async def execute_query(self, query: str, values: Optional[list] = None) -> List[dict]:
async with self.acquire_connection() as connection:
self.log.debug("%s: %s", query, values)
async with connection.cursor(aiomysql.DictCursor) as cursor:
async with connection.cursor() as cursor:
await cursor.execute(query, values)
return await cursor.fetchall()
rows = await cursor.fetchall()
if rows:
fields = [f.name for f in cursor._result.fields]
return [dict(zip(fields, row)) for row in rows]
return []

@translate_exceptions
@retry_connection
async def execute_script(self, query: str) -> None:
async with self.acquire_connection() as connection:
Expand Down Expand Up @@ -219,6 +219,13 @@ async def create_connection(self, with_db: bool) -> None:
def acquire_connection(self) -> ConnectionWrapper:
return ConnectionWrapper(self._connection, self._lock)

@retry_connection
async def execute_many(self, query: str, values: list) -> None:
    """Execute *query* for every parameter row in *values* via a fresh cursor."""
    async with self.acquire_connection() as conn:
        self.log.debug("%s: %s", query, values)
        async with conn.cursor() as cur:
            await cur.executemany(query, values)

@retry_connection
async def start(self) -> None:
await self._connection.begin()
Expand Down
21 changes: 17 additions & 4 deletions tortoise/backends/sqlite/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,21 @@ async def execute_insert(self, query: str, values: list) -> int:
async def execute_many(self, query: str, values: List[list]) -> None:
async with self.acquire_connection() as connection:
self.log.debug("%s: %s", query, values)
# TODO: Ensure that this is wrapped by a transaction, will provide a big speedup
await connection.executemany(query, values)
# This code is only ever called in AUTOCOMMIT mode
await connection.execute("BEGIN")
try:
await connection.executemany(query, values)
except Exception:
await connection.rollback()
raise
else:
await connection.commit()

@translate_exceptions
async def execute_query(self, query: str, values: Optional[list] = None) -> List[dict]:
async with self.acquire_connection() as connection:
self.log.debug("%s: %s", query, values)
res = [dict(row) for row in await connection.execute_fetchall(query, values)]
return res
return await connection.execute_fetchall(query, values)

@translate_exceptions
async def execute_script(self, query: str) -> None:
Expand All @@ -131,6 +137,13 @@ def __init__(self, connection: SqliteClient) -> None:
def _in_transaction(self) -> "TransactionContext":
return NestedTransactionContext(self)

@translate_exceptions
async def execute_many(self, query: str, values: List[list]) -> None:
    """Execute *query* once per parameter row in *values*."""
    async with self.acquire_connection() as conn:
        self.log.debug("%s: %s", query, values)
        # Already within transaction, so ideal for performance
        await conn.executemany(query, values)

async def start(self) -> None:
try:
await self._connection.commit()
Expand Down
Loading