Add support for async I/O #1203

Closed
30 commits
1044939
Initial commit adding async support
sethmlarson Apr 16, 2020
74e683b
Add unasync-ing of _async gen code
sethmlarson Apr 17, 2020
0d82870
lint
sethmlarson Apr 17, 2020
d504b7f
Address review comments
sethmlarson Apr 20, 2020
754ea31
Move _normalize_hosts into utils
sethmlarson Apr 20, 2020
75218cc
Merge branch 'master' into add-async
sethmlarson Apr 20, 2020
56d49a3
Add AsyncTransport sniffing
sethmlarson Apr 22, 2020
e703d33
Merge branch 'add-async' of ssh://github.com/elastic/elasticsearch-py…
sethmlarson Apr 22, 2020
e7228fb
Merge branch 'master' of ssh://github.com/elastic/elasticsearch-py in…
sethmlarson Apr 29, 2020
aaed0d4
Add tests for AIOHttpConnection
sethmlarson Apr 29, 2020
020cc6f
Add tests for AsyncTransport, async helpers
sethmlarson May 5, 2020
262c1bb
checkpoint
sethmlarson May 7, 2020
490d69c
Fixed bug in async_streaming_bulk()
sethmlarson May 7, 2020
ee597ce
All async helper tests passing
sethmlarson May 7, 2020
1742803
Add [async] deps to dev-requirements.txt
sethmlarson May 8, 2020
d0ad40d
Add AsyncConnectionPool tests
sethmlarson May 8, 2020
01037c2
Add pytest for async REST API tests
sethmlarson May 8, 2020
2ed861a
Skip proper tests for async REST API
sethmlarson May 11, 2020
252c0fc
Add documentation for async helpers and API
sethmlarson May 11, 2020
30c6976
Always use AIOHttpConnection for async tests
sethmlarson May 11, 2020
82531f1
Update sync tests to use pytest setup/teardown
sethmlarson May 11, 2020
5a81989
Update API to latest master, fix order of methods to reduce PR diff
sethmlarson May 11, 2020
96c7d83
Skip v2 index template issues, fix ML cleanup
sethmlarson May 11, 2020
e19674a
Fix async client to use AsyncTransport
sethmlarson May 12, 2020
4d1a9ba
Skip some more tests
sethmlarson May 12, 2020
334c040
Only run REST API tests once per execution
sethmlarson May 12, 2020
b75d420
Also delete index v2 templates
sethmlarson May 13, 2020
e73b6a0
Also delete index aliases?
sethmlarson May 13, 2020
3a88306
Oh yeah we skip that on sync, skip on async too
sethmlarson May 13, 2020
3fbcdda
Use pytest skip marker
sethmlarson May 13, 2020
6 changes: 5 additions & 1 deletion dev-requirements.txt
@@ -3,7 +3,6 @@ pytest
pytest-cov
coverage
mock
nosexcover
sphinx<1.7
sphinx_rtd_theme
jinja2
@@ -15,3 +14,8 @@ pandas
pyyaml<5.3

black; python_version>="3.6"

# Async dependencies
unasync; python_version>="3.6"
aiohttp; python_version>="3.6"
pytest-asyncio; python_version>="3.6"
207 changes: 207 additions & 0 deletions docs/async.rst
@@ -0,0 +1,207 @@
Using Asyncio with Elasticsearch
================================

.. py:module:: elasticsearch

Starting in ``elasticsearch-py`` v7.8.0, the ``elasticsearch`` package supports async/await with
`asyncio <https://docs.python.org/3/library/asyncio.html>`_ on Python 3.6+. Install the package with the ``async``
extra to pull in the ``aiohttp`` HTTP client and the other dependencies required for async support:

.. code-block:: bash

    $ python -m pip install "elasticsearch[async]>=7.8.0"

The version specifiers used to track Elastic Stack releases also apply to
the ``async`` extra::

    # Elasticsearch 7.x
    $ python -m pip install "elasticsearch[async]>=7,<8"

After installation all async API endpoints are available via :class:`~elasticsearch.AsyncElasticsearch`
and are used in the same way as other APIs, just with an extra ``await``:

.. code-block:: python

    import asyncio
    from elasticsearch import AsyncElasticsearch

    es = AsyncElasticsearch()

    async def main():
        resp = await es.search(
            index="documents",
            body={"query": {"match_all": {}}},
            size=20,
        )
        print(resp)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

.. note::

    Previously asyncio was supported via the `elasticsearch-async <https://github.com/elastic/elasticsearch-py-async>`_ package.
    ``elasticsearch-async`` has been deprecated in favor of the async support built into ``elasticsearch``.
    For Elasticsearch 7.x and later you must install
    ``elasticsearch[async]`` and use ``elasticsearch.AsyncElasticsearch()``.

.. note::

    Async is not supported on Python 3.5 or earlier. Upgrade to Python 3.6
    or later to use it.

Async Helpers
-------------

Async variants of all helpers are available in ``elasticsearch.helpers``
and are all prefixed with ``async_*``. Their APIs mirror the ones
described in the sync :ref:`helpers` documentation.

All async helpers that accept an iterator or generator also accept async iterators
and async generators.

.. code-block:: python

    from elasticsearch import AsyncElasticsearch
    from elasticsearch.helpers import (
        async_bulk,
        async_scan,
        async_streaming_bulk,
        async_reindex,
    )

    es = AsyncElasticsearch()

    async def gendata():  # Async generator
        mywords = ['foo', 'bar', 'baz']
        for word in mywords:
            yield {
                "_index": "mywords",
                "doc": {"word": word},
            }

    async def main():
        await async_bulk(es, gendata())

        # Index names here are illustrative
        await async_reindex(es, "mywords", "mywords-copy")
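One way a helper can accept both kinds of input is to normalize them into a single async iterator before consuming them. The sketch below is illustrative only: ``as_async_iter`` is a hypothetical name, not the library's implementation, and it uses only the standard library so it runs without a cluster.

```python
import asyncio


async def as_async_iter(iterable):
    """Hypothetical helper: yield items from a sync or an async iterable."""
    if hasattr(iterable, "__aiter__"):
        # Async iterators and async generators are consumed with "async for".
        async for item in iterable:
            yield item
    else:
        # Plain iterators and generators are consumed with a regular "for".
        for item in iterable:
            yield item


def sync_actions():
    for word in ["foo", "bar"]:
        yield {"_index": "mywords", "doc": {"word": word}}


async def async_actions():
    for word in ["baz"]:
        yield {"_index": "mywords", "doc": {"word": word}}


async def collect(iterable):
    # Both input kinds go through the same async code path.
    return [action async for action in as_async_iter(iterable)]


async def main():
    return await collect(sync_actions()), await collect(async_actions())


loop = asyncio.new_event_loop()
try:
    from_sync, from_async = loop.run_until_complete(main())
finally:
    loop.close()

print(len(from_sync), len(from_async))
```

With a wrapper of this shape, the consuming code only ever needs a single ``async for`` loop, whichever kind of iterable the caller passed in.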


.. py:module:: elasticsearch.helpers

Bulk and Streaming Bulk
~~~~~~~~~~~~~~~~~~~~~~~

.. autofunction:: async_bulk

.. code-block:: python

    import asyncio
    from elasticsearch import AsyncElasticsearch
    from elasticsearch.helpers import async_bulk

    es = AsyncElasticsearch()

    async def gendata():
        mywords = ['foo', 'bar', 'baz']
        for word in mywords:
            yield {
                "_index": "mywords",
                "doc": {"word": word},
            }

    async def main():
        await async_bulk(es, gendata())

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

.. autofunction:: async_streaming_bulk

.. code-block:: python

    import asyncio
    from elasticsearch import AsyncElasticsearch
    from elasticsearch.helpers import async_streaming_bulk

    es = AsyncElasticsearch()

    async def gendata():
        mywords = ['foo', 'bar', 'baz']
        for word in mywords:
            yield {
                "_index": "mywords",
                "doc": {"word": word},
            }

    async def main():
        async for ok, result in async_streaming_bulk(es, gendata()):
            # Each result maps the action type to its response item
            action, result = result.popitem()
            if not ok:
                print("failed to %s document %s" % (action, result["_id"]))

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Scan
~~~~

.. autofunction:: async_scan

.. code-block:: python

    import asyncio
    from elasticsearch import AsyncElasticsearch
    from elasticsearch.helpers import async_scan

    es = AsyncElasticsearch()

    async def main():
        async for doc in async_scan(
            client=es,
            query={"query": {"match": {"title": "python"}}},
            index="orders-*"
        ):
            print(doc)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Reindex
~~~~~~~

.. autofunction:: async_reindex


API Reference
-------------

.. py:module:: elasticsearch

The API of :class:`~elasticsearch.AsyncElasticsearch` is nearly identical
to the API of :class:`~elasticsearch.Elasticsearch` with the exception that
every API call like :py:func:`~elasticsearch.AsyncElasticsearch.search` is
an ``async`` function and requires an ``await`` to properly return the response
body.
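
A minimal sketch of what a missing ``await`` looks like in practice. ``FakeAsyncClient`` is a hypothetical stand-in that makes no network calls and returns a made-up response; only the coroutine behavior is the point here.

```python
import asyncio


class FakeAsyncClient:
    """Hypothetical stand-in for AsyncElasticsearch; makes no network calls."""

    async def search(self, index, body=None):
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        return {"hits": {"hits": []}, "index": index}


async def main():
    es = FakeAsyncClient()

    # Without "await" the call returns a coroutine object, not a response.
    pending = es.search(index="documents")
    is_coroutine = asyncio.iscoroutine(pending)

    # Awaiting the coroutine runs it and produces the response body.
    resp = await pending
    return is_coroutine, resp


loop = asyncio.new_event_loop()
try:
    is_coroutine, resp = loop.run_until_complete(main())
finally:
    loop.close()

print(is_coroutine, resp["hits"]["hits"])
```

Forgetting the ``await`` typically shows up later as a ``TypeError`` when the code tries to index into the coroutine object, or as a "coroutine was never awaited" warning.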

AsyncTransport
~~~~~~~~~~~~~~

.. autoclass:: AsyncTransport
   :members:

AIOHttpConnection
~~~~~~~~~~~~~~~~~

.. autoclass:: AIOHttpConnection
   :members:

AsyncElasticsearch
~~~~~~~~~~~~~~~~~~

.. note::

    To reference Elasticsearch APIs that are namespaced like ``.indices.create()``
    refer to the sync API reference. These APIs are identical between sync and async.

.. autoclass:: AsyncElasticsearch
   :members:
14 changes: 12 additions & 2 deletions docs/index.rst
@@ -53,7 +53,14 @@ Installation
Install the ``elasticsearch`` package with `pip
<https://pypi.python.org/pypi/elasticsearch>`_::

pip install elasticsearch
$ python -m pip install elasticsearch

If your application uses async/await in Python you can install with
the ``async`` extra::

$ python -m pip install elasticsearch[async]

Read more about :doc:`how to use asyncio with this project <async>`.

Example Usage
-------------
@@ -251,12 +258,14 @@ or the port value encoded within ``cloud_id``. Using Cloud ID also disables sni
http_auth=("elastic", "<password>"),
)

APIKey Authentication
API Key Authentication
~~~~~~~~~~~~~~~~~~~~~~

You can configure the client to use Elasticsearch's `API Key`_ for connecting to your cluster.
Note that this authentication method was introduced in Elasticsearch ``6.7.0``.

.. code-block:: python

from elasticsearch import Elasticsearch

# you can use the api key tuple
@@ -374,6 +383,7 @@ Contents
api
xpack
exceptions
async
connection
transports
helpers
24 changes: 24 additions & 0 deletions elasticsearch/__init__.py
@@ -9,6 +9,7 @@
__version__ = VERSION
__versionstr__ = ".".join(map(str, VERSION))

import sys
import logging
import warnings

@@ -64,3 +65,26 @@
    "AuthorizationException",
    "ElasticsearchDeprecationWarning",
]

try:
    # Async is only supported on Python 3.6+
    if sys.version_info < (3, 6):
        raise ImportError()

    from ._async import (
        AsyncElasticsearch,
        AsyncTransport,
        AIOHttpConnection,
        AsyncConnectionPool,
        AsyncDummyConnectionPool,
    )

    __all__ += [
        "AsyncElasticsearch",
        "AsyncTransport",
        "AIOHttpConnection",
        "AsyncConnectionPool",
        "AsyncDummyConnectionPool",
    ]
except (ImportError, SyntaxError):
    pass
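
For application code, a guard like this means the async names may simply be absent from the package. Below is a sketch of feature detection on the caller's side, assuming (as the block above suggests) that a missing name surfaces as ``ImportError``. ``HAS_ASYNC`` and ``require_async`` are hypothetical names, not part of the package.

```python
# Caller-side feature detection. If the package was installed without the
# "async" extra, or runs on an unsupported Python, the import below fails.
try:
    from elasticsearch import AsyncElasticsearch
    HAS_ASYNC = True
except ImportError:
    AsyncElasticsearch = None  # placeholder so the name always exists
    HAS_ASYNC = False


def require_async():
    """Hypothetical helper: return the async client class or fail with a hint."""
    if not HAS_ASYNC:
        raise RuntimeError(
            "Async support is unavailable; install 'elasticsearch[async]' "
            "on Python 3.6 or later."
        )
    return AsyncElasticsearch


print("async support available:", HAS_ASYNC)
```

Failing with an explicit message at the call site tends to be easier to diagnose than an ``AttributeError`` or ``NameError`` raised deep inside application code.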
28 changes: 28 additions & 0 deletions elasticsearch/_async/__init__.py
@@ -0,0 +1,28 @@
# Licensed to Elasticsearch B.V under one or more agreements.
# Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
# See the LICENSE file in the project root for more information

from .client import Elasticsearch
from .connection_pool import AsyncConnectionPool, AsyncDummyConnectionPool
from .transport import AsyncTransport
from .http_aiohttp import AIOHttpConnection


class AsyncElasticsearch(Elasticsearch):
    # This class def is for both the name 'AsyncElasticsearch'
    # and all async-only additions to the class.
    async def __aenter__(self):
        await self.transport._async_call()
        return self


AsyncElasticsearch.__doc__ = Elasticsearch.__doc__


__all__ = [
    "AsyncElasticsearch",
    "AsyncConnectionPool",
    "AsyncDummyConnectionPool",
    "AsyncTransport",
    "AIOHttpConnection",
]