An unresponsive service can be worse than a down one. It can tie up your entire system if not handled properly. All network requests should have a timeout.
Here’s how to add timeouts for popular Python packages. All have been tested. The default is no timeout, unless otherwise specified. Enjoy!
Also available for Ruby, Node, Go, and Rust
Standard library
PyPI
- aiohttp
- asyncpg
- boto3
- cassandra-driver
- elasticsearch
- httpx
- influxdb
- mongoengine
- mysqlclient
- opensearch-py
- psycopg
- psycopg2
- pymemcache
- pymongo
- redis
- requests
- scs
- SQLAlchemy
- trino
- typesense
- urllib3
FTP(host, timeout=1)
Raises socket.timeout
HTTPConnection(host, port, timeout=1)
Raises socket.timeout
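As a minimal sketch of handling this in practice, the helper below wraps a GET request and turns a timeout into a None return value. The name fetch_status is hypothetical and not part of http.client; the timeout passed to HTTPConnection applies to the connection attempt and to later blocking reads on the socket.

```python
import http.client
import socket


def fetch_status(host, port, timeout=1):
    """Return the HTTP status for GET /, or None if the request timed out."""
    conn = http.client.HTTPConnection(host, port, timeout=timeout)
    try:
        conn.request("GET", "/")
        return conn.getresponse().status
    except socket.timeout:  # alias of TimeoutError on Python 3.10+
        return None
    finally:
        conn.close()
```

Returning None is only one option; re-raising or retrying with backoff may suit a given caller better.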
IMAP4(host, timeout=1)
Raises socket.timeout
Note: Requires Python 3.9+
NNTP(host, timeout=1)
Raises socket.timeout
POP3(host, timeout=1)
Raises socket.timeout
SMTP(host, timeout=1)
Raises
- socket.timeout on connect timeout
- smtplib.SMTPServerDisconnected on read timeout
sock.settimeout(1)
Raises socket.timeout
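A short sketch of the raw socket case follows, assuming a plain TCP connection. The helper name recv_or_none is hypothetical; the point is that settimeout is called before connect, so the same limit covers both the connection attempt and the read.

```python
import socket


def recv_or_none(address, timeout=1):
    """Connect to address and read up to 1024 bytes; return None on timeout."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)  # applies to connect() and recv() alike
    try:
        sock.connect(address)
        return sock.recv(1024)
    except socket.timeout:  # alias of TimeoutError on Python 3.10+
        return None
    finally:
        sock.close()
```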
subprocess.run(cmd, timeout=1)
Raises subprocess.TimeoutExpired
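For illustration, here is one way to wrap the call so that a slow command does not propagate an exception. The helper name run_with_timeout is hypothetical. When the timeout expires, subprocess.run kills the child process before raising.

```python
import subprocess
import sys


def run_with_timeout(cmd, timeout=1):
    """Run cmd and return the CompletedProcess, or None if it timed out."""
    try:
        return subprocess.run(cmd, timeout=timeout, capture_output=True, text=True)
    except subprocess.TimeoutExpired:
        # subprocess.run has already killed the child at this point
        return None


if __name__ == "__main__":
    # A child that sleeps longer than the timeout allows
    slow = [sys.executable, "-c", "import time; time.sleep(5)"]
    print(run_with_timeout(slow, timeout=0.5))
```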
Telnet(host, timeout=1)
Raises socket.timeout
urlopen(url, timeout=1)
Raises
- urllib.error.URLError on connect timeout
- socket.timeout on read timeout
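Because the two failure modes raise different exceptions, a caller usually has to catch both. The sketch below is one way to tell them apart; the helper name classify_timeout is hypothetical. As an assumption made for the demo, it uses an opener with an empty ProxyHandler instead of calling urlopen directly, so that a proxy configured in the environment does not intercept the request; opener.open accepts the same timeout argument.

```python
import socket
import urllib.error
import urllib.request


def classify_timeout(url, timeout=1):
    """Return 'ok', 'connect timeout', or 'read timeout' for a GET of url."""
    # Empty ProxyHandler: ignore any proxy settings from the environment
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    try:
        with opener.open(url, timeout=timeout):
            return "ok"
    except urllib.error.URLError as err:
        # Timeouts while connecting or sending are wrapped in URLError
        if isinstance(err.reason, socket.timeout):
            return "connect timeout"
        raise
    except socket.timeout:
        # Timeouts while waiting for the response are raised directly
        return "read timeout"
```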
timeout = aiohttp.ClientTimeout(total=1)
async with aiohttp.ClientSession(timeout=timeout) as session:
    # ...
Raises asyncio.exceptions.TimeoutError
asyncpg.connect(timeout=1)
Default: 60s
Raises asyncio.exceptions.TimeoutError
boto3.client('s3', config=Config(connect_timeout=1, read_timeout=1))
Raises
- botocore.exceptions.ConnectTimeoutError on connect timeout
- botocore.exceptions.ReadTimeoutError on read timeout
Cluster([host], connect_timeout=1)
Raises cassandra.cluster.NoHostAvailable on connect timeout
Elasticsearch(request_timeout=1)
Raises elastic_transport.ConnectionTimeout
httpx.get(url, timeout=1)
# or
httpx.Client(timeout=1)
Raises
- httpx.ConnectTimeout on connect timeout
- httpx.ReadTimeout on read timeout
InfluxDBClient(timeout=1)
Raises
- requests.exceptions.ConnectTimeout on connect timeout
- requests.exceptions.ReadTimeout on read timeout
connect(connectTimeoutMS=1000, socketTimeoutMS=1000, serverSelectionTimeoutMS=1000)
Raises pymongo.errors.ServerSelectionTimeoutError
MySQLdb.connect(connect_timeout=1)
Raises MySQLdb._exceptions.OperationalError
OpenSearch(timeout=1)
Raises opensearchpy.exceptions.ConnectionError
psycopg.connect(connect_timeout=1)
Raises psycopg.OperationalError
psycopg2.connect(connect_timeout=1)
Raises psycopg2.OperationalError
Client(host, connect_timeout=1, timeout=1)
Raises socket.timeout
MongoClient(connectTimeoutMS=1000, socketTimeoutMS=1000, serverSelectionTimeoutMS=1000)
Default: 20s connect timeout, 30s server selection timeout
Raises pymongo.errors.ServerSelectionTimeoutError
Redis(socket_connect_timeout=1, socket_timeout=1)
Raises redis.exceptions.TimeoutError
requests.get(url, timeout=1)
Raises
- requests.exceptions.ConnectTimeout on connect timeout
- requests.exceptions.ReadTimeout on read timeout
sol = scs.solve(data, cone, time_limit_secs=1)
Does not raise. A timeout is indicated by a sol['info']['status'] of solved (inaccurate - reached time_limit_secs)
create_engine(url, connect_args={'connect_timeout': 1})
Raises sqlalchemy.exc.OperationalError
trino.dbapi.connect(request_timeout=1)
# or
trino.dbapi.connect(request_timeout=(1, 1)) # (connect, read)
Raises trino.exceptions.TrinoConnectionError
Client({'connection_timeout_seconds': 1})
Raises
- requests.exceptions.ConnectTimeout on connect timeout
- requests.exceptions.ReadTimeout on read timeout
http = urllib3.PoolManager(timeout=urllib3.Timeout(connect=1, read=1))
# or
http.request('GET', url, timeout=urllib3.Timeout(connect=1, read=1))
Raises urllib3.exceptions.MaxRetryError
Don’t see a library you use? Let us know. Even better, create a pull request for it.
git clone https://github.com/ankane/python-timeouts.git
cd python-timeouts
pip install -r requirements.txt
To run all tests, use:
pytest
To run individual tests, use:
pytest tests/test_redis.py