chore(tiger): Raise a different timeout exception from tiger clusters #2680

Merged
merged 4 commits into from May 10, 2022
4 changes: 4 additions & 0 deletions snuba/state/cache/abstract.py
@@ -15,6 +15,10 @@ class ExecutionTimeoutError(ExecutionError):
     pass


+class TigerExecutionTimeoutError(ExecutionError):
+    pass
+
+
 class Cache(Generic[TValue], ABC):
     @abstractmethod
     def get(self, key: str) -> Optional[TValue]:
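Note for readers of this hunk: TigerExecutionTimeoutError subclasses ExecutionError directly, not ExecutionTimeoutError, so existing isinstance or except checks that name ExecutionTimeoutError will not catch it; that is why db_query.py below adds it to the isinstance tuple explicitly. A minimal sketch of that relationship (illustration only, assuming ExecutionError is exposed by the same module, as the class definitions above suggest):

from snuba.state.cache.abstract import (
    ExecutionError,
    ExecutionTimeoutError,
    TigerExecutionTimeoutError,
)

# The new exception is a sibling of ExecutionTimeoutError, not a subclass.
err = TigerExecutionTimeoutError("result not available before execution deadline")
assert isinstance(err, ExecutionError)
assert not isinstance(err, ExecutionTimeoutError)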
6 changes: 4 additions & 2 deletions snuba/state/cache/redis/backend.py
@@ -2,7 +2,7 @@
 import logging
 import uuid
 from concurrent.futures import ThreadPoolExecutor
-from typing import Callable, Optional
+from typing import Callable, Optional, Type

 from pkg_resources import resource_string
@@ -34,11 +34,13 @@ def __init__(
         prefix: str,
         codec: ExceptionAwareCodec[bytes, TValue],
         executor: ThreadPoolExecutor,
+        timeout_exception: Type[Exception] = ExecutionTimeoutError,
     ) -> None:
         self.__client = client
         self.__prefix = prefix
         self.__codec = codec
         self.__executor = executor
+        self.__timeout_exception = timeout_exception

         # TODO: This should probably be lazily instantiated, rather than
         # automatically happening at startup.
@@ -257,7 +259,7 @@ def build_notify_queue_key(task_ident: str) -> str:
                 # If the effective timeout was the remaining task timeout,
                 # this means that the client responsible for generating the
                 # cache value didn't do so before it promised to.
-                raise ExecutionTimeoutError(
+                raise self.__timeout_exception(
                     "result not available before execution deadline"
                 )
             else:
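For illustration (not part of the diff), a minimal sketch of how a caller opts into the new constructor argument; the partition prefix below is hypothetical:

from concurrent.futures import ThreadPoolExecutor

from snuba.redis import redis_client
from snuba.state.cache.abstract import TigerExecutionTimeoutError
from snuba.state.cache.redis.backend import RedisCache
from snuba.web.db_query import ResultCacheCodec  # codec shown in db_query.py below

# Readthrough waits on this cache that outlive the task deadline now surface
# as TigerExecutionTimeoutError rather than the generic ExecutionTimeoutError.
tiger_cache = RedisCache(
    redis_client,
    "snuba-query-cache:tiger-example:",  # hypothetical partition prefix
    ResultCacheCodec(),
    ThreadPoolExecutor(),
    TigerExecutionTimeoutError,
)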
22 changes: 19 additions & 3 deletions snuba/web/db_query.py
@@ -33,7 +33,11 @@
 from snuba.reader import Reader, Result
 from snuba.redis import redis_client
 from snuba.request.request_settings import RequestSettings
-from snuba.state.cache.abstract import Cache, ExecutionTimeoutError
+from snuba.state.cache.abstract import (
+    Cache,
+    ExecutionTimeoutError,
+    TigerExecutionTimeoutError,
+)
 from snuba.state.cache.redis.backend import RESULT_VALUE, RESULT_WAIT, RedisCache
 from snuba.state.rate_limit import (
     GLOBAL_RATE_LIMIT_NAME,
@@ -79,7 +83,10 @@ def encode_exception(self, value: SerializableException) -> bytes:
 # reader when running a query.
 cache_partitions: MutableMapping[str, Cache[Result]] = {
     DEFAULT_CACHE_PARTITION_ID: RedisCache(
-        redis_client, "snuba-query-cache:", ResultCacheCodec(), ThreadPoolExecutor()
+        redis_client,
+        "snuba-query-cache:",
+        ResultCacheCodec(),
+        ThreadPoolExecutor(),
     )
 }
 # This lock prevents us from initializing the cache twice. The cache is initialized
@@ -362,11 +369,17 @@ def _get_cache_partition(reader: Reader) -> Cache[Result]:
     # during the first query. So, for the vast majority of queries, the overhead
     # of acquiring the lock is not needed.
     if partition_id not in cache_partitions:
+        exception = (
+            TigerExecutionTimeoutError
+            if "tiger" in partition_id
+            else ExecutionTimeoutError
+        )
         cache_partitions[partition_id] = RedisCache(
             redis_client,
             f"snuba-query-cache:{partition_id}:",
             ResultCacheCodec(),
             ThreadPoolExecutor(),
+            exception,
         )

     return cache_partitions[
@@ -587,7 +600,10 @@ def raw_query(
                 errors.ErrorCodes.NETWORK_ERROR,
             ):
                 sentry_sdk.set_tag("timeout", "network")
-            elif isinstance(cause, (TimeoutError, ExecutionTimeoutError)):
+            elif isinstance(
+                cause,
+                (TimeoutError, ExecutionTimeoutError, TigerExecutionTimeoutError),
+            ):
                 if scope.span:
                     sentry_sdk.set_tag("timeout", "cache_timeout")
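A quick illustration (not part of the diff) of the substring check used in _get_cache_partition above; the helper function and partition ids here are hypothetical:

from snuba.state.cache.abstract import ExecutionTimeoutError, TigerExecutionTimeoutError


def pick_timeout_exception(partition_id: str) -> type:
    # Mirrors the selection above: any partition whose id contains "tiger"
    # gets the tiger-specific timeout exception; everything else keeps the
    # generic one.
    return TigerExecutionTimeoutError if "tiger" in partition_id else ExecutionTimeoutError


assert pick_timeout_exception("tiger_errors") is TigerExecutionTimeoutError
assert pick_timeout_exception("errors_ro") is ExecutionTimeoutError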
33 changes: 30 additions & 3 deletions tests/state/test_cache.py
@@ -12,7 +12,11 @@
 import rapidjson

 from snuba.redis import RedisClientType, redis_client
-from snuba.state.cache.abstract import Cache, ExecutionTimeoutError
+from snuba.state.cache.abstract import (
+    Cache,
+    ExecutionTimeoutError,
+    TigerExecutionTimeoutError,
+)
 from snuba.state.cache.redis.backend import RedisCache
 from snuba.utils.codecs import ExceptionAwareCodec
 from snuba.utils.serializable_exception import SerializableException
@@ -163,7 +167,30 @@ def worker() -> bytes:
     assert raised_exc


-def test_get_readthrough_set_wait_timeout(backend: Cache[bytes]) -> None:
+@pytest.mark.parametrize(
+    "backend, exc",
+    [
+        pytest.param(
+            RedisCache(redis_client, "test", PassthroughCodec(), ThreadPoolExecutor()),
+            ExecutionTimeoutError,
+            id="regular cluster",
+        ),
+        pytest.param(
+            RedisCache(
+                redis_client,
+                "test",
+                PassthroughCodec(),
+                ThreadPoolExecutor(),
+                TigerExecutionTimeoutError,
+            ),
+            TigerExecutionTimeoutError,
+            id="tiger cluster",
+        ),
+    ],
+)
+def test_get_readthrough_set_wait_timeout(
+    backend: Cache[bytes], exc: Exception
+) -> None:
     key = "key"
     value = b"value"
@@ -186,7 +213,7 @@ def worker(timeout: int) -> bytes:
     with pytest.raises(TimeoutError):
         waiter_fast.result()

-    with pytest.raises(ExecutionTimeoutError):
+    with pytest.raises(exc):
         waiter_slow.result()