Add an asynchronous method so DNS queries can be run asynchronously
JoshData committed Feb 27, 2024
1 parent ea52546 commit 00050ab
Showing 12 changed files with 461 additions and 67 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,8 @@
In Development
--------------

* The library now includes an asynchronous version of the main method, named validate_email_async, which is called with await and runs DNS-based deliverability checks asynchronously.

2.1.1 (February 26, 2024)
-------------------------

28 changes: 28 additions & 0 deletions README.md
@@ -18,6 +18,7 @@ Key features:
can display to end-users.
* Checks deliverability (optional): Does the domain name resolve?
(You can override the default DNS resolver to add query caching.)
* Can be called asynchronously with `await`.
* Supports internationalized domain names and internationalized local parts.
* Rejects addresses with unsafe Unicode characters, obsolete email address
syntax that you'd find unexpected, special use domain names like
@@ -83,6 +84,9 @@ This validates the address and gives you its normalized form. You should
checking if an address is in your database. When using this in a login form,
set `check_deliverability` to `False` to avoid unnecessary DNS queries.

See below for examples for caching DNS queries and calling the library
asynchronously with `await`.

Usage
-----

@@ -161,6 +165,30 @@ while True:
validate_email(email, dns_resolver=resolver)
```

### Asynchronous call

The library has an alternative, asynchronous method named `validate_email_async` which must be called with `await`. This method uses an [asynchronous DNS resolver](https://dnspython.readthedocs.io/en/latest/async.html) so that multiple DNS-based deliverability checks can be performed in parallel.

Here is how to use it. In this example, `import ... as` is used to alias the async method to the usual method name `validate_email`.

```python
from email_validator import validate_email_async as validate_email, \
    EmailNotValidError, caching_async_resolver

resolver = caching_async_resolver(timeout=10)

email = "[email protected]"
try:
    # Pass the caching resolver so that repeated lookups can reuse cached answers.
    emailinfo = await validate_email(email, dns_resolver=resolver)
    email = emailinfo.normalized
except EmailNotValidError as e:
    print(str(e))
```

Note that to create a caching asynchronous resolver, use `caching_async_resolver`. As with the synchronous version, creating a resolver is optional.

When processing batches of email addresses, I found that chunking around 25 email addresses at a time (using e.g. `asyncio.gather()`) resulted in the highest performance. I tested on a residential Internet connection with valid addresses.
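The chunking approach described above can be sketched as follows. This is a minimal illustration rather than the library's own code: `chunked` and `validate_batch` are hypothetical helper names, and `validate` stands for any coroutine function that raises an exception for a bad address.

```python
import asyncio
from itertools import islice
from typing import Awaitable, Callable, Iterable, Iterator, List, Tuple


def chunked(items: Iterable[str], size: int) -> Iterator[List[str]]:
    """Yield successive lists of at most `size` items."""
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk


async def validate_batch(
    addresses: Iterable[str],
    validate: Callable[[str], Awaitable[object]],  # hypothetical: any async validator
    chunk_size: int = 25,
) -> List[Tuple[str, Exception]]:
    """Validate addresses one chunk at a time; return (address, error) pairs."""
    failures: List[Tuple[str, Exception]] = []
    for chunk in chunked(addresses, chunk_size):
        # Run the whole chunk concurrently and collect exceptions as results
        # instead of letting the first failure abort the chunk.
        results = await asyncio.gather(
            *(validate(address) for address in chunk), return_exceptions=True
        )
        for address, result in zip(chunk, results):
            if isinstance(result, Exception):
                failures.append((address, result))
    return failures
```

With this library, `validate` could plausibly be `lambda a: validate_email_async(a, dns_resolver=resolver)`, so that every address in the batch shares one caching resolver.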

### Test addresses

This library rejects email addresses that use the [Special Use Domain Names](https://www.iana.org/assignments/special-use-domain-names/special-use-domain-names.xhtml) `invalid`, `localhost`, `test`, and some others by raising `EmailSyntaxError`. This is to protect your system from abuse: You probably don't want a user to be able to cause an email to be sent to `localhost` (although they might be able to still do so via a malicious MX record). However, in your non-production test environments you may want to use `@test` or `@myname.test` email addresses. There are three ways you can allow this:
14 changes: 11 additions & 3 deletions email_validator/__init__.py
@@ -1,13 +1,14 @@
# Export the main method, helper methods, and the public data types.
from .exceptions_types import ValidatedEmail, EmailNotValidError, \
EmailSyntaxError, EmailUndeliverableError
from .validate_email import validate_email
from .validate_email import validate_email_sync as validate_email, validate_email_async
from .version import __version__

__all__ = ["validate_email",
__all__ = ["validate_email", "validate_email_async",
"ValidatedEmail", "EmailNotValidError",
"EmailSyntaxError", "EmailUndeliverableError",
"caching_resolver", "__version__"]
"caching_resolver", "caching_async_resolver",
"__version__"]


def caching_resolver(*args, **kwargs):
@@ -17,6 +18,13 @@ def caching_resolver(*args, **kwargs):
return caching_resolver(*args, **kwargs)


def caching_async_resolver(*args, **kwargs):
# Lazy load `deliverability` as it is slow to import (due to dns.resolver)
from .deliverability import caching_async_resolver

return caching_async_resolver(*args, **kwargs)


# These global attributes are a part of the library's API and can be
# changed by library users.

100 changes: 76 additions & 24 deletions email_validator/__main__.py
@@ -5,24 +5,86 @@
# python -m email_validator [email protected]
# python -m email_validator < LIST_OF_ADDRESSES.TXT
#
# Provide email addresses to validate either as a command-line argument
# or in STDIN separated by newlines. Validation errors will be printed for
# invalid email addresses. When passing an email address on the command
# line, if the email address is valid, information about it will be printed.
# When using STDIN, no output will be given for valid email addresses.
# Provide email addresses to validate either as a single command-line argument
# or on STDIN separated by newlines.
#
# When passing an email address on the command line, if the email address
# is valid, information about it will be printed to STDOUT. If the email
# address is invalid, an error message will be printed to STDOUT and
# the exit code will be set to 1.
#
# When passing email addresses on STDIN, validation errors will be printed
# for invalid email addresses. No output is given for valid email addresses.
# Validation errors are preceded by the email address that failed and a tab
# character. It is the user's responsibility to ensure email addresses
# do not contain tab or newline characters.
#
# Keyword arguments to validate_email can be set in environment variables
# of the same name but uppercase (see below).

import json
import os
import sys
import itertools

from .validate_email import validate_email
from .deliverability import caching_resolver
from .deliverability import caching_async_resolver
from .exceptions_types import EmailNotValidError


def main_command_line(email_address, options, dns_resolver):
    # Validate the email address passed on the command line.

    from . import validate_email

    try:
        result = validate_email(email_address, dns_resolver=dns_resolver, **options)
        print(json.dumps(result.as_dict(), indent=2, sort_keys=True, ensure_ascii=False))
        return True
    except EmailNotValidError as e:
        print(e)
        return False


async def main_stdin(options, dns_resolver):
    # Validate the email addresses passed line-by-line on STDIN.
    # Chunk the addresses and call the async version of validate_email
    # for all the addresses in the chunk, and wait for the chunk
    # to complete.

    import asyncio

    from . import validate_email_async as validate_email

    dns_resolver = dns_resolver or caching_async_resolver()

    # https://stackoverflow.com/a/312467
    def split_seq(iterable, size):
        it = iter(iterable)
        item = list(itertools.islice(it, size))
        while item:
            yield item
            item = list(itertools.islice(it, size))

    CHUNK_SIZE = 25

    async def process_line(line):
        email = line.strip()
        try:
            await validate_email(email, dns_resolver=dns_resolver, **options)
            # If the email was valid, do nothing.
            return None
        except EmailNotValidError as e:
            return (email, e)

    chunks = split_seq(sys.stdin, CHUNK_SIZE)
    for chunk in chunks:
        awaitables = [process_line(line) for line in chunk]
        errors = await asyncio.gather(*awaitables)
        for error in errors:
            if error is not None:
                print(*error, sep='\t')


def main(dns_resolver=None):
# The dns_resolver argument is for tests.

@@ -36,24 +98,14 @@ def main(dns_resolver=None):
if varname in os.environ:
options[varname.lower()] = float(os.environ[varname])

if len(sys.argv) == 1:
# Validate the email addresses pased line-by-line on STDIN.
dns_resolver = dns_resolver or caching_resolver()
for line in sys.stdin:
email = line.strip()
try:
validate_email(email, dns_resolver=dns_resolver, **options)
except EmailNotValidError as e:
print(f"{email} {e}")
if len(sys.argv) == 2:
return main_command_line(sys.argv[1], options, dns_resolver)
else:
# Validate the email address passed on the command line.
email = sys.argv[1]
try:
result = validate_email(email, dns_resolver=dns_resolver, **options)
print(json.dumps(result.as_dict(), indent=2, sort_keys=True, ensure_ascii=False))
except EmailNotValidError as e:
print(e)
import asyncio
asyncio.run(main_stdin(options, dns_resolver))
return True


if __name__ == "__main__":
main()
if not main():
sys.exit(1)
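
The header comment of `__main__.py` says that, in STDIN mode, each validation error is printed as the failed address, a tab character, and then the message. A downstream script could read that output roughly as sketched below; `parse_error_lines` is a hypothetical helper for illustration, not part of the library.

```python
from typing import Dict, Iterable


def parse_error_lines(lines: Iterable[str]) -> Dict[str, str]:
    """Map each failed address to its error message."""
    errors: Dict[str, str] = {}
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue
        # Split on the first tab only, in case the message itself contains tabs.
        address, _, message = line.partition("\t")
        errors[address] = message
    return errors
```

For example, the output of `python -m email_validator < LIST_OF_ADDRESSES.TXT` could be piped into a script that passes `sys.stdin` to this function. As the header comment notes, this only works if the addresses themselves contain no tab or newline characters.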
61 changes: 53 additions & 8 deletions email_validator/deliverability.py
@@ -3,6 +3,7 @@
from .exceptions_types import EmailUndeliverableError

import dns.resolver
import dns.asyncresolver
import dns.exception


@@ -16,30 +17,74 @@ def caching_resolver(*, timeout: Optional[int] = None, cache=None, dns_resolver=
return resolver


def validate_email_deliverability(domain: str, domain_i18n: str, timeout: Optional[int] = None, dns_resolver=None):
def caching_async_resolver(*, timeout: Optional[int] = None, cache=None, dns_resolver=None):
    if timeout is None:
        from . import DEFAULT_TIMEOUT
        timeout = DEFAULT_TIMEOUT
    resolver = dns_resolver or dns.asyncresolver.Resolver()
    resolver.cache = cache or dns.resolver.LRUCache()  # type: ignore
    resolver.lifetime = timeout  # type: ignore # timeout, in seconds
    return resolver


async def validate_email_deliverability(
domain: str,
domain_i18n: str,
timeout: Optional[int] = None,
dns_resolver=None,
async_loop: Optional[bool] = None
) -> Dict[str, Any]:

# Check that the domain resolves to an MX record. If there is no MX record,
# try an A or AAAA record which is a deprecated fallback for deliverability.
# Raises an EmailUndeliverableError on failure. On success, returns a dict
# with deliverability information.

# When async_loop is None, the caller drives the coroutine manually to get
# the result synchronously, and consequently this call must not yield execution.
# It can use 'await' so long as the callee does not yield execution either.
# Otherwise, if async_loop is not None, there is no restriction on 'await' calls.

# If no dns.resolver.Resolver was given, get dnspython's default resolver.
# Override the default resolver's timeout. This may affect other uses of
# dnspython in this process.
# Use the asyncresolver if async_loop is not None.
if dns_resolver is None:
if not async_loop:
dns_resolver = dns.resolver.get_default_resolver()
else:
dns_resolver = dns.asyncresolver.get_default_resolver()

# Override the default resolver's timeout. This may affect other uses of
# dnspython in this process.
from . import DEFAULT_TIMEOUT
if timeout is None:
timeout = DEFAULT_TIMEOUT
dns_resolver = dns.resolver.get_default_resolver()
dns_resolver.lifetime = timeout

elif timeout is not None:
raise ValueError("It's not valid to pass both timeout and dns_resolver.")

    # Define a resolve function that works with a regular or
    # asynchronous dns.resolver.Resolver instance.
    async def resolve(qname, rtype):
        # When called non-asynchronously, expect a regular
        # resolver that returns synchronously. Or if async_loop
        # is not None but the caller didn't pass a
        # dns.asyncresolver.Resolver, call it synchronously.
        if not async_loop or not isinstance(dns_resolver, dns.asyncresolver.Resolver):
            return dns_resolver.resolve(qname, rtype)

        # When async_loop is not None and if given a
        # dns.asyncresolver.Resolver, call it asynchronously.
        else:
            return await dns_resolver.resolve(qname, rtype)

# Collect successful deliverability information here.
deliverability_info: Dict[str, Any] = {}

try:
try:
# Try resolving for MX records (RFC 5321 Section 5).
response = dns_resolver.resolve(domain, "MX")
response = await resolve(domain, "MX")

# For reporting, put them in priority order and remove the trailing dot in the qnames.
mtas = sorted([(r.preference, str(r.exchange).rstrip('.')) for r in response])
@@ -59,7 +104,7 @@ def validate_email_deliverability(domain: str, domain_i18n: str, timeout: Option
except dns.resolver.NoAnswer:
# If there was no MX record, fall back to an A record. (RFC 5321 Section 5)
try:
response = dns_resolver.resolve(domain, "A")
response = await resolve(domain, "A")
deliverability_info["mx"] = [(0, str(r)) for r in response]
deliverability_info["mx_fallback_type"] = "A"

@@ -68,7 +113,7 @@ def validate_email_deliverability(domain: str, domain_i18n: str, timeout: Option
# If there was no A record, fall back to an AAAA record.
# (It's unclear if SMTP servers actually do this.)
try:
response = dns_resolver.resolve(domain, "AAAA")
response = await resolve(domain, "AAAA")
deliverability_info["mx"] = [(0, str(r)) for r in response]
deliverability_info["mx_fallback_type"] = "AAAA"

@@ -85,7 +130,7 @@ def validate_email_deliverability(domain: str, domain_i18n: str, timeout: Option
# absence of an MX record, this is probably a good sign that the
# domain is not used for email.
try:
response = dns_resolver.resolve(domain, "TXT")
response = await resolve(domain, "TXT")
for rec in response:
value = b"".join(rec.strings)
if value.startswith(b"v=spf1 "):
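
The comments in `validate_email_deliverability` say that, when `async_loop` is None, the caller drives the coroutine manually and the coroutine must not yield execution. The code that does this is not shown in this part of the diff, so the following is only a sketch of the general technique under that assumption; `run_without_loop` and `lookup` are hypothetical names.

```python
from typing import Any, Callable, Coroutine


def run_without_loop(coro: Coroutine[Any, Any, Any]) -> Any:
    """Drive a coroutine that never suspends and return its result."""
    try:
        coro.send(None)  # Runs until the first suspension or until it returns.
    except StopIteration as stop:
        return stop.value  # The coroutine returned without ever suspending.
    # It suspended at a real await; without an event loop it cannot be resumed.
    coro.close()
    raise RuntimeError("coroutine yielded execution and cannot be driven synchronously")


async def lookup(resolve: Callable[[str], Any], name: str, use_async: bool) -> Any:
    # Hypothetical stand-in for the `resolve` helper in the diff: await only
    # when an async resolver is in use, otherwise call the resolver directly.
    if use_async:
        return await resolve(name)
    return resolve(name)
```

Calling `run_without_loop(lookup(str.upper, "example.org", False))` returns the result directly because the synchronous branch never suspends. This is why the diff's `resolve` helper falls back to a plain call whenever `async_loop` is not set or the resolver is not a `dns.asyncresolver.Resolver`.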