Recipe for preventing SSRF? #10224
Replies: 3 comments 13 replies
-
I'm thinking of making a custom subclass of |
Beta Was this translation helpful? Give feedback.
-
here's my temporary solution: (it's fragile since it relies on private method import ipaddress
from aiohttp import TCPConnector, ClientSession
import asyncio
# not a public API, at least check if it exists
assert hasattr(TCPConnector, "_convert_hosts_to_addr_infos")
class SSRFException(ValueError):
pass
class SSRFProtectedTCPConnector(TCPConnector):
def _convert_hosts_to_addr_infos(self, hosts):
for host in hosts:
if ipaddress.ip_address(host["host"]).is_private:
raise SSRFException(
"Can't connect to private IP: " + host["host"]
)
return super()._convert_hosts_to_addr_infos(hosts)
async def main():
async with ClientSession(connector=SSRFProtectedTCPConnector()) as client:
# normal requests work
async with client.get("http://example.com/") as resp:
assert resp.ok
# private IPs are blocked
for testcase in [
"http://192.168.0.1/",
"http://localhost/",
"https://localhost/",
"http://127.0.0.1/",
"https://tinyurl.com/localssrftest", # works for redirects, too!
]:
try:
async with client.get(testcase) as resp:
print(resp.status)
assert False, "should have raised an exception"
except SSRFException as e:
print(repr(e))
if __name__ == "__main__":
asyncio.run(main()) |
Beta Was this translation helpful? Give feedback.
-
I came up with a slightly cleaner method, which is to provide a custom DNS resolver (wrapping the default one) which filters its responses. However, it requires a monkeypatch to stop Lines 908 to 915 in 45de81d import ipaddress
from aiohttp import TCPConnector, ClientSession
import aiohttp.connector
from aiohttp.resolver import DefaultResolver, AbstractResolver
import asyncio
# XXX: monkeypatch to force all hosts to go through the resolver
aiohttp.connector.is_ip_address = lambda _: False
class SSRFException(ValueError):
pass
class SSRFSafeResolverWrapper(AbstractResolver):
def __init__(self, resolver: AbstractResolver):
self.resolver = resolver
async def resolve(self, host: str, port: int, family: int):
result = await self.resolver.resolve(host, port, family)
for host in result:
if ipaddress.ip_address(host["host"]).is_private:
raise SSRFException("Can't connect to private IP: " + host["host"])
return result
async def close(self) -> None:
await self.resolver.close()
async def main():
resolver = SSRFSafeResolverWrapper(DefaultResolver())
connector = TCPConnector(resolver=resolver)
async with ClientSession(connector=connector) as client:
# normal requests work
async with client.get("http://example.com/") as resp:
assert resp.ok
# private IPs are blocked
for testcase in [
"http://localhost/",
"http://192.168.0.1/",
"https://localhost/",
"http://127.0.0.1/",
"https://tinyurl.com/localssrftest", # works for redirects, too!
]:
try:
async with client.get(testcase) as resp:
print(resp.status)
assert False, "should have raised an exception"
except SSRFException as e:
print(repr(e))
if __name__ == "__main__":
asyncio.run(main()) |
Beta Was this translation helpful? Give feedback.
-
I'm writing a web application that accepts user-provided URLs, and queries them using an aiohttp client.
I want to restrict this client from being able to access "internal" network resources, including but not limited to "localhost". I want it to remain restricted even if it follows HTTP redirects.
Some more background on why this matters: https://portswigger.net/web-security/ssrf
Does anyone have some ideas for how to approach this?
Beta Was this translation helpful? Give feedback.
All reactions