Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add cancel and status queries to server-side async execution #192

Merged
merged 93 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from 87 commits
Commits
Show all changes
93 commits
Select commit Hold shift + click to select a range
cfd856b
Added async_execution to async_db/cursor and db/cursor. Added an erro…
ericf-firebolt Jul 8, 2022
321b021
Removed ignore C901 from flake8 settings in setup.cfg.
ericf-firebolt Jul 8, 2022
370e5cf
Merge branch 'main' into async_queries
ericf-firebolt Jul 8, 2022
6ad3be4
Fixed a couple of missing arguments in async_db/cursor.py on execute …
ericf-firebolt Jul 8, 2022
1bea472
Merge branch 'async_queries' of https://github.com/firebolt-db/firebo…
ericf-firebolt Jul 8, 2022
bf7f1f1
mypy and black cleanup.
ericf-firebolt Jul 8, 2022
4196204
Removed set_parameters argument from all(?) functions. Started adding…
ericf-firebolt Jul 15, 2022
7ff71a7
Added a bunch of callbacks to cursor tests.
ericf-firebolt Jul 18, 2022
0bace5b
Merge branch 'main' into async_queries
ericf-firebolt Jul 19, 2022
c69f585
Pulled out a couple more set_parameters variables from function signa…
ericf-firebolt Jul 20, 2022
15df8cc
Added, and commented out, server_side_async_url to unit/conftests.py.
ericf-firebolt Jul 20, 2022
bec0804
Removed test_set_parameters() from tests/async/cursor.py.
ericf-firebolt Jul 20, 2022
37f2373
Removed test_set_parameters() from tests/async/cursor.py.
ericf-firebolt Jul 20, 2022
cb04815
Added ability to see which exectute is failing (execute or executeman…
ericf-firebolt Jul 20, 2022
fc7d116
Added more explicit error messages to async and sync test_cursor.py m…
ericf-firebolt Jul 20, 2022
ec0fd55
Added some periods.
ericf-firebolt Jul 20, 2022
f7b9d09
Replaced cursor reset call in async _do_execute().
ericf-firebolt Jul 20, 2022
d2ae886
Updated query/message tuple decomposition in test_cursor to be more h…
ericf-firebolt Jul 21, 2022
f77ba35
Fixed a typo and function signature for db/test_cursor_server_side_as…
ericf-firebolt Jul 21, 2022
8cae1d2
Removed second hard-coding of query_id in server-side async id callback.
ericf-firebolt Jul 21, 2022
95f5e4d
Needed to add an await to an _api_request() call.
ericf-firebolt Jul 21, 2022
9d1f1ff
Used InternalError to error out on no response to async server-side q…
ericf-firebolt Jul 21, 2022
442c420
Added additional checks on rowcount and description in test_cursor_se…
ericf-firebolt Jul 21, 2022
e76a545
Added QueryResponse class.
ericf-firebolt Jul 22, 2022
224f7c4
Minor changes requested on PR.
ericf-firebolt Jul 22, 2022
9d20bba
Added an OperationalError is asynchronous query response is missing q…
ericf-firebolt Jul 22, 2022
f403673
Had a typo.
ericf-firebolt Jul 22, 2022
16ed40d
Added a warning if asyc_execution is set via a SET parameter rather t…
ericf-firebolt Jul 22, 2022
b58f881
Started adding test_cursor_async_execute_error().
ericf-firebolt Jul 25, 2022
8753a99
Updated query_url argument in test_cursor_async_execute_error().
ericf-firebolt Jul 25, 2022
3d1c5b2
Added AsyncExecutionUnavailableError on server-side async query execu…
ericf-firebolt Jul 26, 2022
1abaaa4
Seem to have dealt with auth issues in test_cursor_async_execute_erro…
ericf-firebolt Jul 27, 2022
9d4ffe8
Added all necessary set params to url string in test_cursor_async_exe…
ericf-firebolt Jul 27, 2022
c9e8947
Cleaned up string input in test_cursor_async_execute_error().
ericf-firebolt Jul 27, 2022
2ba105f
Now no token error.
ericf-firebolt Jul 27, 2022
52bd72c
Multi-statement queries now error out correctly.
ericf-firebolt Jul 28, 2022
22e4540
Reworked a string to try to get commit/push to work.
ericf-firebolt Jul 28, 2022
3320495
Had to add an extra auth callback to get all cursor.execute() calls t…
ericf-firebolt Jul 28, 2022
aef7dff
Removed some parameters from various fns in unit/async_db/test_cursor…
ericf-firebolt Jul 28, 2022
65b61da
Added error check for missing query_id on async_execution. A little b…
ericf-firebolt Jul 28, 2022
3a4655d
Merge branch 'main' into async_queries
ericf-firebolt Jul 28, 2022
ba6fd52
Finished merge by hand.
ericf-firebolt Jul 29, 2022
e9c70f8
Merge branch 'main' into async_queries
ericf-firebolt Jul 29, 2022
7671506
Merge branch 'async_queries' into async_queries_add_cancel_status
ericf-firebolt Jul 29, 2022
c3a02af
Fixed error for empty response.json on asynch execution. Also changed…
ericf-firebolt Jul 29, 2022
2bc20db
Merge branch 'main' into async_queries
ericf-firebolt Jul 29, 2022
d482caf
Fixed error for empty response.json on asynch execution. Also changed…
ericf-firebolt Jul 29, 2022
04a9e2f
Fixed error for empty response.json on asynch execution. Also changed…
ericf-firebolt Jul 29, 2022
cc749a3
Fixed failed auto merge.
ericf-firebolt Jul 29, 2022
3650755
Added a test to check that an server-side asynchronous execution retu…
ericf-firebolt Jul 29, 2022
3a33790
Added an integration test to check that an server-side asynchronous e…
ericf-firebolt Jul 29, 2022
e373cf0
Finished merge by hand.
ericf-firebolt Jul 29, 2022
18d8fbe
Added cancel() to async/cursor.py. Also fixed an error where I was ge…
ericf-firebolt Aug 1, 2022
36d0bbe
Forgot that I'd commented out most of test_cursor.py.
ericf-firebolt Aug 1, 2022
50b89f9
Trying to get rid of coroutine 'BaseCursor.execute' was never awaited…
ericf-firebolt Aug 2, 2022
66604c0
Added unit tests for cancel and cancel errors.
ericf-firebolt Aug 2, 2022
bdb1063
Fixed a mistake that would have failed the cancel() integration test.
ericf-firebolt Aug 2, 2022
4888d3f
Completed auto-merge fail.
ericf-firebolt Aug 2, 2022
a37d250
Fixed several imports that had disappeared (maybe during a merge?). A…
ericf-firebolt Aug 2, 2022
6939e28
get_status() and two unit tests are added. Integration test is failin…
ericf-firebolt Aug 2, 2022
634541d
Added a new QueryStatus, NOT_AVAILABLE, because checking status will …
ericf-firebolt Aug 3, 2022
3278e94
Added a comment.
ericf-firebolt Aug 3, 2022
f7dc846
Updated a comment.
ericf-firebolt Aug 3, 2022
4072793
Added stub fn for async execution fetch.
ericf-firebolt Aug 4, 2022
8830361
Keep forgetting to uncomment test code and the pre-commit checks are …
ericf-firebolt Aug 4, 2022
df3709d
Removed some extraneous testing code.
ericf-firebolt Aug 4, 2022
0e2c553
Updated test_ss_async_execution_get_status() after Yoni pointed out t…
ericf-firebolt Aug 5, 2022
7a798be
Had to comment out test_ss_async_execution_get_status(), as it basica…
ericf-firebolt Aug 5, 2022
b428ba9
Added ability to specify output_format in _api_request(), as status r…
ericf-firebolt Aug 9, 2022
3a98c18
First set of requested changes on PR.
ericf-firebolt Aug 9, 2022
e9f2fc4
Removed noqa on _do_execute().
ericf-firebolt Aug 9, 2022
98c3252
Renamed _find_async_problems() to _validate_ss_async_settings(). Remo…
ericf-firebolt Aug 10, 2022
c840433
Moved call to _validate_ss_async_settings() into try.
ericf-firebolt Aug 10, 2022
58807a2
Added asyncio_mode=auto to pytest config in config.cfg, because I was…
ericf-firebolt Aug 10, 2022
99540ed
Changed long query in test_queries_async integration tests. Paused ex…
ericf-firebolt Aug 11, 2022
d8d6d8f
Updated all unit tests that test SET parameters to not have output_fo…
ericf-firebolt Aug 11, 2022
0ab70cc
Changed query_loop() in integration tests/async/test_queries to check…
ericf-firebolt Aug 11, 2022
3704b9a
Merge branch 'main' into async_queries_add_cancel_status
ericf-firebolt Aug 11, 2022
18d7c83
Noticed that test_anyio_backend_import_issue() was commented out in s…
ericf-firebolt Aug 11, 2022
8d4ce91
Added query tests to integration/dbapi/sync/test_queries.py. Changed …
ericf-firebolt Aug 12, 2022
c93bc08
Added query tests to integration/dbapi/sync/test_queries.py. Changed …
ericf-firebolt Aug 12, 2022
ef3b668
Merge branch 'async_queries_add_cancel_status' of https://github.com/…
ericf-firebolt Aug 12, 2022
7ad25c9
Now errors out when use_standard_sql=0 rather than when it equals 1. …
ericf-firebolt Aug 12, 2022
b5efbf4
Changed order of synchronous unit tests to move all server-side async…
ericf-firebolt Aug 15, 2022
143b0bd
Changed order of asynchronous cursor unit tests to move all server-si…
ericf-firebolt Aug 15, 2022
3090dc1
Reordered integration and unit test modules to move all server-side a…
ericf-firebolt Aug 15, 2022
b24a263
Merge branch 'main' into async_queries_add_cancel_status
ericf-firebolt Aug 16, 2022
923d931
Moving JSON_OUTPUT_FORMAT outside of _api_request (#196)
ptiurin Aug 19, 2022
316f50a
Updated docs to include information on server-side async query execut…
ericf-firebolt Aug 21, 2022
ad18500
Merge branch 'main' into async_queries_add_cancel_status
ericf-firebolt Aug 23, 2022
51fb204
Updated external table mention in comments and removed sentence in do…
ericf-firebolt Aug 23, 2022
0e816a7
Made a change to server-side execution explanation for clarity and to…
ericf-firebolt Aug 23, 2022
5398f77
Renamed a function and moved table create and drop out of test_querie…
ericf-firebolt Aug 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 144 additions & 59 deletions src/firebolt/async_db/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,19 @@ class CursorState(Enum):
CLOSED = 4


class QueryStatus(Enum):
"""Enumeration of query responses on server-side async queries."""

RUNNING = 1
ENDED_SUCCESSFULLY = 2
ENDED_UNSUCCESSFULLY = 3
NOT_READY = 4
STARTED_EXECUTION = 5
PARSE_ERROR = 6
CANCELED_EXECUTION = 7
EXECUTION_ERROR = 8


class Statistics(BaseModel):
"""
Class for query execution statistics.
Expand Down Expand Up @@ -273,16 +286,6 @@ def _reset(self) -> None:
self._next_set_idx = 0
self._query_id = ""

def _query_id_from_response_async(self, response: Response) -> str:
if response.headers.get("content-length", "") == "0":
raise OperationalError("No response to asynchronous query.")
query_data = response.json()
if "query_id" not in query_data:
raise OperationalError(
"Invalid response to asynchronous query: missing query_id."
)
return query_data["query_id"]

def _row_set_from_response(
self, response: Response
) -> Tuple[
Expand Down Expand Up @@ -328,27 +331,43 @@ def _append_row_set(

async def _api_request(
self,
query: str,
set_parameters: Optional[dict] = None,
query: Optional[str] = "",
parameters: Optional[dict] = {},
path: Optional[str] = "",
use_set_parameters: Optional[bool] = True,
) -> Response:
"""
Query API, return Response object.

Args:
query (str): SQL query
parameters (Optional[Sequence[ParameterType]]): A sequence of substitution
parameters. Used to replace '?' placeholders inside a query with
actual values. Note: In order to "output_format" dict value, it
must be an empty string. If no value not specified,
JSON_OUTPUT_FORMAT will be used.
path (str): endpoint suffix, for example "cancel" or "status"
"""
if not parameters:
parameters = {"output_format": JSON_OUTPUT_FORMAT}
elif "output_format" not in parameters:
parameters["output_format"] = JSON_OUTPUT_FORMAT
elif parameters["output_format"] == "":
# In the case that output_format should be missing, send in "";
# we remove it here so the API endpoint string won't be malformed.
del parameters["output_format"]
if use_set_parameters:
parameters = {**self._set_parameters, **parameters}
return await self._client.request(
url="/",
url=f"/{path}",
method="POST",
params={
"database": self.connection.database,
"output_format": JSON_OUTPUT_FORMAT,
**self._set_parameters,
**(set_parameters or dict()),
**parameters,
},
content=query,
)

async def _async_execution_api_request(self, query: str) -> Response:
"""Do query request using SET async_execution=1."""
return await self._api_request(
query, {"async_execution": 1, "advanced_mode": 1}
)

async def _validate_set_parameter(self, parameter: SetParameter) -> None:
"""Validate parameter by executing simple query with it."""
if parameter.name == "async_execution":
Expand All @@ -357,7 +376,9 @@ async def _validate_set_parameter(self, parameter: SetParameter) -> None:
"Instead, pass it as an argument to the execute() or "
"executemany() function."
)
resp = await self._api_request("select 1", {parameter.name: parameter.value})
resp = await self._api_request(
"select 1", {parameter.name: parameter.value, "output_format": ""}
)
# Handle invalid set parameter
if resp.status_code == codes.BAD_REQUEST:
raise OperationalError(resp.text)
Expand All @@ -366,6 +387,35 @@ async def _validate_set_parameter(self, parameter: SetParameter) -> None:
# set parameter passed validation
self._set_parameters[parameter.name] = parameter.value

def _validate_ss_async_settings(
ericf-firebolt marked this conversation as resolved.
Show resolved Hide resolved
self,
parameters: Sequence[Sequence[ParameterType]],
queries: List[Union[SetParameter, str]],
skip_parsing: bool = False,
async_execution: Optional[bool] = False,
) -> None:
if async_execution and self._set_parameters.get("use_standard_sql", "1") == "0":
ericf-firebolt marked this conversation as resolved.
Show resolved Hide resolved
raise AsyncExecutionUnavailableError(
"It is not possible to execute queries asynchronously if "
"use_standard_sql=0."
)
if parameters and skip_parsing:
logger.warning(
"Query formatting parameters are provided but skip_parsing "
"is specified. They will be ignored."
)

# Allow users to manually skip parsing for performance improvement.
ericf-firebolt marked this conversation as resolved.
Show resolved Hide resolved
non_set_queries = 0
for query in queries:
if type(query) is not SetParameter:
non_set_queries += 1
if non_set_queries > 1 and async_execution:
raise AsyncExecutionUnavailableError(
"It is not possible to execute multi-statement "
"queries asynchronously."
)

async def _do_execute(
self,
raw_query: str,
Expand All @@ -374,34 +424,14 @@ async def _do_execute(
async_execution: Optional[bool] = False,
) -> None:
self._reset()
queries: List[Union[SetParameter, str]] = (
[raw_query] if skip_parsing else split_format_sql(raw_query, parameters)
)
try:
if (
async_execution
and self._set_parameters.get("use_standard_sql", "0") == "1"
):
raise AsyncExecutionUnavailableError(
"It is not possible to execute queries asynchronously if "
"use_standard_sql is in use."
)
if parameters and skip_parsing:
logger.warning(
"Query formatting parameters are provided with skip_parsing."
" They will be ignored."
)

# Allow users to manually skip parsing for performance improvement
queries: List[Union[SetParameter, str]] = (
[raw_query] if skip_parsing else split_format_sql(raw_query, parameters)
)
if len(queries) > 1 and async_execution:
raise AsyncExecutionUnavailableError(
"It is not possible to execute multi-statement "
"queries asynchronously."
)
for query in queries:

start_time = time.time()
# our CREATE EXTERNAL TABLE queries currently require credentials,
# Our CREATE EXTERNAL TABLE queries currently require credentials,
# so we will skip logging those queries.
# https://docs.firebolt.io/sql-reference/commands/ddl-commands#create-external-table
ericf-firebolt marked this conversation as resolved.
Show resolved Hide resolved
if isinstance(query, SetParameter) or not re.search(
Expand All @@ -419,9 +449,24 @@ async def _do_execute(
if isinstance(query, SetParameter):
await self._validate_set_parameter(query)
elif async_execution:
resp = await self._async_execution_api_request(query)
await self._raise_if_error(resp)
self._query_id = self._query_id_from_response_async(resp)
self._validate_ss_async_settings(
parameters,
queries,
skip_parsing,
async_execution,
)
response = await self._api_request(
query, {"async_execution": 1, "advanced_mode": 1}
)
await self._raise_if_error(response)
if response.headers.get("content-length", "") == "0":
ericf-firebolt marked this conversation as resolved.
Show resolved Hide resolved
raise OperationalError("No response to asynchronous query.")
resp = response.json()
if "query_id" not in resp or resp["query_id"] == "":
raise OperationalError(
"Invalid response to asynchronous query: missing query_id."
)
self._query_id = resp["query_id"]
else:
resp = await self._api_request(query)
await self._raise_if_error(resp)
Expand All @@ -431,7 +476,7 @@ async def _do_execute(

logger.info(
f"Query fetched {self.rowcount} rows in"
f" {time.time() - start_time} seconds"
f" {time.time() - start_time} seconds."
)

self._state = CursorState.DONE
Expand Down Expand Up @@ -584,6 +629,44 @@ def setinputsizes(self, sizes: List[int]) -> None:
def setoutputsize(self, size: int, column: Optional[int] = None) -> None:
"""Set a column buffer size for fetches of large columns (does nothing)."""

@check_not_closed
async def cancel(self, query_id: str) -> None:
"""Cancel a server-side async query."""
await self._api_request(
parameters={"query_id": query_id, "output_format": ""},
path="cancel",
use_set_parameters=False,
)

@check_not_closed
async def get_status(self, query_id: str) -> QueryStatus:
"""Get status of a server-side async query. Return the state of the query."""
try:
resp = await self._api_request(
# output_format must be empty for status to work correctly.
# And set parameters will cause 400 errors.
parameters={"query_id": query_id, "output_format": ""},
ericf-firebolt marked this conversation as resolved.
Show resolved Hide resolved
path="status",
use_set_parameters=False,
)
if resp.status_code == codes.BAD_REQUEST:
raise OperationalError(
f"Asynchronous query {query_id} status check failed: "
f"{resp.status_code}."
)
resp_json = resp.json()
if "status" not in resp_json:
raise OperationalError(
f"Invalid response to asynchronous query: missing status."
)
except Exception:
self._state = CursorState.ERROR
raise
# Remember that query_id might be empty.
if resp_json["status"] == "":
return QueryStatus.NOT_READY
return QueryStatus[resp_json["status"]]

# Context manager support
@check_not_closed
def __enter__(self) -> BaseCursor:
Expand Down Expand Up @@ -636,31 +719,33 @@ async def executemany(
parameters_seq: Sequence[Sequence[ParameterType]],
async_execution: Optional[bool] = False,
) -> int:
async with self._async_query_lock.writer:
return await super().executemany(query, parameters_seq, async_execution)
"""
Prepare and execute a database query against all parameter
sequences provided.
Prepare and execute a database query against all parameter
sequences provided.
"""
async with self._async_query_lock.writer:
return await super().executemany(query, parameters_seq, async_execution)

@wraps(BaseCursor.fetchone)
async def fetchone(self) -> Optional[List[ColType]]:
async with self._async_query_lock.reader:
"""Fetch the next row of a query result set."""
return super().fetchone()
"""Fetch the next row of a query result set."""

@wraps(BaseCursor.fetchmany)
async def fetchmany(self, size: Optional[int] = None) -> List[List[ColType]]:
async with self._async_query_lock.reader:
"""
Fetch the next set of rows of a query result;
size is cursor.arraysize by default.
"""
return super().fetchmany(size)
"""Fetch the next set of rows of a query result;
size is cursor.arraysize by default."""

@wraps(BaseCursor.fetchall)
async def fetchall(self) -> List[List[ColType]]:
async with self._async_query_lock.reader:
"""Fetch all remaining rows of a query result."""
return super().fetchall()
"""Fetch all remaining rows of a query result."""

@wraps(BaseCursor.nextset)
async def nextset(self) -> None:
Expand Down
11 changes: 11 additions & 0 deletions src/firebolt/db/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from firebolt.async_db.cursor import BaseCursor as AsyncBaseCursor
from firebolt.async_db.cursor import (
ParameterType,
QueryStatus,
check_not_closed,
check_query_executed,
)
Expand Down Expand Up @@ -101,3 +102,13 @@ def __iter__(self) -> Generator[List[ColType], None, None]:
if row is None:
return
yield row

@wraps(AsyncBaseCursor.get_status)
def get_status(self, query_id: str) -> QueryStatus:
with self._query_lock.gen_rlock():
return async_to_sync(super().get_status, self._async_job_thread)(query_id)

@wraps(AsyncBaseCursor.cancel)
def cancel(self, query_id: str) -> None:
with self._query_lock.gen_rlock():
return async_to_sync(super().cancel, self._async_job_thread)(query_id)
Loading