async operations with patched aiobotocore gives error "Already ended segment and subsegment cannot be modified." #164

Open
IvDoorn opened this issue Jul 1, 2019 · 49 comments · Fixed by #340

@IvDoorn

IvDoorn commented Jul 1, 2019

I'm running into some problems with XRay in combination with async functions and aiobotocore.

Packages:

aioboto3==6.4.1
aiobotocore==0.10.2
aws-xray-sdk==2.4.2
boto3==1.9.176
botocore==1.12.176

I've created a very small sample script to demonstrate my issue:

import aioboto3
import asyncio
import logging
import sys

from datetime import datetime

from aws_xray_sdk.core import patch
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core.async_context import AsyncContext

logger = logging.getLogger()
logger.setLevel(logging.INFO)

xray_logger = logging.getLogger('aws_xray_sdk')
xray_logger.setLevel(logging.INFO)

handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
logging.getLogger().addHandler(handler)

xray_recorder.configure(service='Test', context=AsyncContext(), sampling=False)

patch([
    'aiobotocore',
], raise_errors=False)


async def perform_test(gather=False, loop_count=10):
    async_client = aioboto3.client('clouddirectory', region_name='eu-west-1')

    try:
        with xray_recorder.in_segment_async('ASYNC'):
            @xray_recorder.capture_async('execute')
            async def execute(client):
                retval = await client.list_directories(state='ENABLED')
                return retval['Directories']

            results = []
            if gather:
                with xray_recorder.capture_async('gather'):
                    for result in await asyncio.gather(*[execute(async_client) for i in range(loop_count)]):
                        results += result

            else:
                with xray_recorder.capture_async('loop'):
                    for i in range(loop_count):
                        results += await execute(async_client)

            return results
    finally:
        await async_client.close()


def test_gather(loop_count=10):
    try:
        now = datetime.utcnow()
        asyncio.get_event_loop().run_until_complete(perform_test(gather=True, loop_count=loop_count))
        logger.info('GATHER: PASS %s' % str(datetime.utcnow() - now))
    except:
        logger.exception('GATHER: FAIL')


def test_loop(loop_count=10):
    try:
        now = datetime.utcnow()
        asyncio.get_event_loop().run_until_complete(perform_test(gather=False, loop_count=loop_count))
        logger.info('LOOP: PASS %s' % str(datetime.utcnow() - now))
    except:
        logger.exception('LOOP: FAIL')


logger.info('==========================================')
test_gather()
logger.info('==========================================')
logger.info('==========================================')
test_loop()
logger.info('==========================================')

The expected outcome should be something like:

==========================================
Found credentials in shared credentials file: ~/.aws/credentials
GATHER: PASS 0:00:00.251635
==========================================
==========================================
LOOP: PASS 0:00:01.386801
==========================================

The timings are irrelevant, but this outcome means that both approaches of using aiobotocore and XRay work together. However, the actual outcome is:

successfully patched module aiobotocore
==========================================
Found credentials in shared credentials file: ~/.aws/credentials
GATHER: FAIL
Traceback (most recent call last):
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/async_recorder.py", line 82, in record_subsegment_async
    return_value = await wrapped(*args, **kwargs)
  File "/home/idoorn/xray_test.py", line 36, in execute
    retval = await client.list_directories(state='ENABLED')
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/ext/aiobotocore/patch.py", line 36, in _xray_traced_aiobotocore
    meta_processor=aws_meta_processor,
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/async_recorder.py", line 101, in record_subsegment_async
    stack=stack,
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/ext/boto_utils.py", line 57, in aws_meta_processor
    resp_meta.get('HTTPStatusCode'))
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/models/entity.py", line 102, in put_http_meta
    self._check_ended()
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/models/entity.py", line 283, in _check_ended
    raise AlreadyEndedException("Already ended segment and subsegment cannot be modified.")
aws_xray_sdk.core.exceptions.exceptions.AlreadyEndedException: Already ended segment and subsegment cannot be modified.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/idoorn/xray_test.py", line 58, in test_gather
    asyncio.get_event_loop().run_until_complete(perform_test(gather=True, loop_count=loop_count))
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
    return future.result()
  File "/home/idoorn/xray_test.py", line 42, in perform_test
    for result in await asyncio.gather(*[execute(async_client) for i in range(loop_count)]):
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/async_recorder.py", line 33, in __call__
    meta_processor=None,
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/async_recorder.py", line 105, in record_subsegment_async
    subsegment.add_exception(exception, stack)
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/models/entity.py", line 220, in add_exception
    self._check_ended()
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/models/entity.py", line 283, in _check_ended
    raise AlreadyEndedException("Already ended segment and subsegment cannot be modified.")
aws_xray_sdk.core.exceptions.exceptions.AlreadyEndedException: Already ended segment and subsegment cannot be modified.
==========================================
==========================================
LOOP: PASS 0:00:01.989784
==========================================

Using asyncio.gather() doesn't seem to play nicely with XRay: by the time the call to clouddirectory returns, the subsegment that was used to trace the call has already been closed. XRay only works nicely when the async calls are made sequentially, but that obviously defeats the benefits of using async. :)

NOTE: if I remove the lines:

patch([
    'aiobotocore',
], raise_errors=False)

from the above test script, everything works, but obviously no tracing occurs on the clouddirectory calls.

@IvDoorn
Author

IvDoorn commented Jul 1, 2019

Not sure if it is relevant, but I've noticed that when using asyncio.gather() the XRay segments are built up in a deep tree.
For example:
asyncio.gather(*[task() for i in range(5)])

yields segments:

segment = {
  id = 1
  subsegment = [{
    id = 2
    subsegment = [{
      id = 3
      subsegment = [{
        id = 4
        subsegment = [{
          id = 5
          subsegment = [{
            id = 6
          }]
        }]
      }]
    }]
  }]
}

which doesn't really feel correct, as I would expect a similar outcome as when I called them sequentially:

segment = {
  id = 1
  subsegment = [{
    id = 2
  },{
    id = 3
  },{
    id = 4
  },{
    id = 5
  },{
    id = 6
  }]
}

I'm just raising this as an observation, as maybe this construction is the cause of the issue. If not, my comment can be discarded.
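The deep tree is what you would get if every coroutine reads its parent from, and pushes onto, one shared LIFO stack of open entities. Below is a minimal toy simulation under that assumption (it is not the SDK's code; `Entity`, `SharedStackRecorder` and `depth` are hypothetical names): five gathered tasks each open a subsegment before any of them finishes, so each one's parent is the previous task's subsegment.

```python
import asyncio


class Entity:
    """Hypothetical stand-in for an X-Ray (sub)segment."""

    def __init__(self, name, parent=None):
        self.name = name
        self.parent = parent
        self.ended = False


class SharedStackRecorder:
    """Toy recorder with ONE stack of open entities shared by every coroutine."""

    def __init__(self):
        self.stack = []

    def begin(self, name):
        # The "current" entity (top of the stack) becomes the parent.
        parent = self.stack[-1] if self.stack else None
        entity = Entity(name, parent)
        self.stack.append(entity)
        return entity

    def end_top(self):
        # Ends whatever is on top, not necessarily the caller's own entity.
        entity = self.stack.pop()
        entity.ended = True
        return entity


def depth(entity):
    """Number of ancestors above an entity."""
    d = 0
    while entity.parent is not None:
        entity = entity.parent
        d += 1
    return d


async def main():
    recorder = SharedStackRecorder()
    root = recorder.begin("ASYNC")
    mine = {}

    async def task(i):
        mine[i] = recorder.begin(f"execute-{i}")
        await asyncio.sleep(0)  # yield, so every task starts before any finishes
        recorder.end_top()

    await asyncio.gather(*[task(i) for i in range(5)])
    recorder.end_top()  # ends the root segment
    return root, mine


root, mine = asyncio.run(main())
print([depth(mine[i]) for i in range(5)])  # → [1, 2, 3, 4, 5]
```

In the same run, task 0's `end_top()` ends `execute-4` rather than its own entity, and `execute-0` is ended by task 4.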

@IvDoorn
Author

IvDoorn commented Jul 10, 2019

I've added debug prints to AsyncContext that output the (sub)segment id and name.

The output when aiobotocore is not being patched:

==========================================
Found credentials in shared credentials file: ~/.aws/credentials
 put_segment 711fcc470e7cbb6f ASYNC
    put_subsegment 792f4b2aad19f420 gather
    put_subsegment 216f5c6ac6ccd048 execute
    put_subsegment ba38aa8f88d00fb2 execute
    put_subsegment c36a983cea5ab9f6 execute
    put_subsegment 6ee6263fb314a68c execute
    put_subsegment 789485876d6d256b execute
    put_subsegment 31690b346919ac6c execute
    put_subsegment 81a182a0ba306165 execute
    put_subsegment 4d21fdd41115ddae execute
    put_subsegment a63f45b6de4a6e99 execute
    put_subsegment b3b3e4c98a08c13f execute
    end_subsegment b3b3e4c98a08c13f execute
    end_subsegment a63f45b6de4a6e99 execute
    end_subsegment 4d21fdd41115ddae execute
    end_subsegment 81a182a0ba306165 execute
    end_subsegment 31690b346919ac6c execute
    end_subsegment 789485876d6d256b execute
    end_subsegment 6ee6263fb314a68c execute
    end_subsegment c36a983cea5ab9f6 execute
    end_subsegment ba38aa8f88d00fb2 execute
    end_subsegment 216f5c6ac6ccd048 execute
    end_subsegment 792f4b2aad19f420 gather
 end_segment 711fcc470e7cbb6f ASYNC
 clear_trace_entities
GATHER: PASS 0:00:00.297979
==========================================
==========================================
 put_segment 6c96a0ef56cf1b02 ASYNC
    put_subsegment b8ec066ffbcd99cf loop
    put_subsegment 3c3482cda5262ec8 execute
    end_subsegment 3c3482cda5262ec8 execute
    put_subsegment 7ac32e55b3374b38 execute
    end_subsegment 7ac32e55b3374b38 execute
    put_subsegment 982172a693e4a66a execute
    end_subsegment 982172a693e4a66a execute
    put_subsegment 50b4e3b69de27bed execute
    end_subsegment 50b4e3b69de27bed execute
    put_subsegment f57d1221c5dfb563 execute
    end_subsegment f57d1221c5dfb563 execute
    put_subsegment 2366ef95317cd3af execute
    end_subsegment 2366ef95317cd3af execute
    put_subsegment 3e026e0c86b8271b execute
    end_subsegment 3e026e0c86b8271b execute
    put_subsegment 943026938a58dd59 execute
    end_subsegment 943026938a58dd59 execute
    put_subsegment 54bbf67d63f2fe2f execute
    end_subsegment 54bbf67d63f2fe2f execute
    put_subsegment 65264211d9e192b6 execute
    end_subsegment 65264211d9e192b6 execute
    end_subsegment b8ec066ffbcd99cf loop
 end_segment 6c96a0ef56cf1b02 ASYNC
 clear_trace_entities
LOOP: PASS 0:00:00.580935

The output when aiobotocore is being patched:

successfully patched module aiobotocore
==========================================
Found credentials in shared credentials file: ~/.aws/credentials
 put_segment 78e4d90abc45a819 ASYNC
    put_subsegment a082565910e4592a gather
    put_subsegment 14b465d5c688d2d0 execute
    put_subsegment 7b191e9abbb0d7bf clouddirectory
    put_subsegment 323647a6f040e268 execute
    put_subsegment ba22149f76c4fc29 clouddirectory
    put_subsegment d14964c4e833cc61 execute
    put_subsegment d2556484dd044914 clouddirectory
    put_subsegment ab770ada232edea0 execute
    put_subsegment ac084b5cfa75188f clouddirectory
    put_subsegment bbc0eb69fb076fc6 execute
    put_subsegment 568672914d3e285d clouddirectory
    put_subsegment 3c7be0ff1506e0f1 execute
    put_subsegment d2222a649d43b44b clouddirectory
    put_subsegment 2723da4417455290 execute
    put_subsegment dc8bbbc0562f5a94 clouddirectory
    put_subsegment 0ace3622760b52d1 execute
    put_subsegment 77883c8fc76a8a54 clouddirectory
    put_subsegment 53b97b82d0e67aee execute
    put_subsegment c37d884576651c9d clouddirectory
    put_subsegment bb9e01f8106a9197 execute
    put_subsegment 6a818f5c10a7d7f4 clouddirectory
    end_subsegment 6a818f5c10a7d7f4 clouddirectory
    end_subsegment bb9e01f8106a9197 execute
    end_subsegment c37d884576651c9d clouddirectory
    end_subsegment 53b97b82d0e67aee execute
    end_subsegment 77883c8fc76a8a54 clouddirectory
    end_subsegment 0ace3622760b52d1 execute
    end_subsegment dc8bbbc0562f5a94 clouddirectory
    end_subsegment 2723da4417455290 execute
    end_subsegment d2222a649d43b44b clouddirectory
    end_subsegment 3c7be0ff1506e0f1 execute
    end_subsegment 568672914d3e285d clouddirectory
    end_subsegment bbc0eb69fb076fc6 execute
    end_subsegment ac084b5cfa75188f clouddirectory
    end_subsegment ab770ada232edea0 execute
    end_subsegment d2556484dd044914 clouddirectory
 end_segment 78e4d90abc45a819 ASYNC STILL HAS OPENED SUBSEGMENTS
 end_subsegment d14964c4e833cc61 execute
 end_subsegment ba22149f76c4fc29 clouddirectory
GATHER: FAIL
Traceback (most recent call last):
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/async_recorder.py", line 82, in record_subsegment_async
    return_value = await wrapped(*args, **kwargs)
  File "xray_test.py", line 79, in execute
    retval = await client.list_directories(state='ENABLED')
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/ext/aiobotocore/patch.py", line 36, in _xray_traced_aiobotocore
    meta_processor=aws_meta_processor,
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/async_recorder.py", line 101, in record_subsegment_async
    stack=stack,
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/ext/boto_utils.py", line 57, in aws_meta_processor
    resp_meta.get('HTTPStatusCode'))
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/models/entity.py", line 102, in put_http_meta
    self._check_ended()
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/models/entity.py", line 283, in _check_ended
    raise AlreadyEndedException("Already ended segment and subsegment cannot be modified.")
aws_xray_sdk.core.exceptions.exceptions.AlreadyEndedException: Already ended segment and subsegment cannot be modified.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "xray_test.py", line 101, in test_gather
    asyncio.get_event_loop().run_until_complete(perform_test(gather=True, loop_count=loop_count))
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
    return future.result()
  File "xray_test.py", line 85, in perform_test
    for result in await asyncio.gather(*[execute(async_client) for i in range(loop_count)]):
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/async_recorder.py", line 33, in __call__
    meta_processor=None,
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/async_recorder.py", line 105, in record_subsegment_async
    subsegment.add_exception(exception, stack)
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/models/entity.py", line 220, in add_exception
    self._check_ended()
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/models/entity.py", line 283, in _check_ended
    raise AlreadyEndedException("Already ended segment and subsegment cannot be modified.")
aws_xray_sdk.core.exceptions.exceptions.AlreadyEndedException: Already ended segment and subsegment cannot be modified.
==========================================
==========================================
 put_segment b6fedb006c67a028 ASYNC
    put_subsegment 671b3fabef5c1d20 loop
    put_subsegment 8ea469a8e76f7e8d execute
    put_subsegment cfc5e9c02e445749 clouddirectory
    end_subsegment cfc5e9c02e445749 clouddirectory
    end_subsegment 8ea469a8e76f7e8d execute
    put_subsegment bf7ef80b842e0860 execute
    put_subsegment 68b3ecec109207b9 clouddirectory
    end_subsegment 68b3ecec109207b9 clouddirectory
    end_subsegment bf7ef80b842e0860 execute
    put_subsegment 7f80392a32abc60a execute
    put_subsegment f8f32b915be7a8f8 clouddirectory
    end_subsegment f8f32b915be7a8f8 clouddirectory
    end_subsegment 7f80392a32abc60a execute
    put_subsegment 238af8551c63c191 execute
    put_subsegment b36f43bb28a0d133 clouddirectory
    end_subsegment b36f43bb28a0d133 clouddirectory
    end_subsegment 238af8551c63c191 execute
    put_subsegment 8a30e5f2397d5e38 execute
    put_subsegment 006d078cbf530831 clouddirectory
    end_subsegment 006d078cbf530831 clouddirectory
    end_subsegment 8a30e5f2397d5e38 execute
    put_subsegment 48391cd8290590fc execute
    put_subsegment d3cedf188da9f5d6 clouddirectory
    end_subsegment d3cedf188da9f5d6 clouddirectory
    end_subsegment 48391cd8290590fc execute
    put_subsegment 99bd19c4174e0484 execute
    put_subsegment 33fd5a68dd3901c3 clouddirectory
    end_subsegment 33fd5a68dd3901c3 clouddirectory
    end_subsegment 99bd19c4174e0484 execute
    put_subsegment e0da9caf1d677cff execute
    put_subsegment a3315fa97484ff42 clouddirectory
    end_subsegment a3315fa97484ff42 clouddirectory
    end_subsegment e0da9caf1d677cff execute
    put_subsegment fbfbf5e350e94293 execute
    put_subsegment 74dc916aa8d9d322 clouddirectory
    end_subsegment 74dc916aa8d9d322 clouddirectory
    end_subsegment fbfbf5e350e94293 execute
    put_subsegment c16594d7542ccc5c execute
    put_subsegment 17ef29ec0b79aee0 clouddirectory
    end_subsegment 17ef29ec0b79aee0 clouddirectory
    end_subsegment c16594d7542ccc5c execute
    end_subsegment 671b3fabef5c1d20 loop
 end_segment b6fedb006c67a028 ASYNC
 clear_trace_entities
LOOP: PASS 0:00:00.614093
==========================================

The number of subsegments which are being closed before the segment is being closed varies between runs.

@IvDoorn
Author

IvDoorn commented Jul 10, 2019

The problem seems to be that aws_xray_sdk keeps a single list of open traces, and it has problems when segments are closed out of order (which is a normal pattern when asyncio.gather() is used, and I think also common for the async pattern in general).

With some debugging I found that async_recorder.py::record_subsegment_async() keeps a reference to the subsegment it creates. As soon as it arrives in the finally block, it adds the required annotations to that subsegment. But when it calls end_subsegment, it doesn't close the subsegment it holds a reference to; it closes the subsegment on top of the entity stack, which might be a completely different subsegment.
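To make the difference concrete, here is a self-contained toy (hypothetical names throughout, not the real `record_subsegment_async`) in which each traced call keeps a reference to its entity, writes metadata to it in `finally`, and then ends either the top of a shared stack (`end_top`, the behaviour described above) or exactly its own entity (`end_own`). With three gathered calls that finish in the order they started, only the last one hits the "already ended" error under `end_top`.

```python
import asyncio


class AlreadyEndedError(Exception):
    """Hypothetical stand-in for the SDK's AlreadyEndedException."""


class Entity:
    def __init__(self, name):
        self.name = name
        self.ended = False
        self.metadata = {}

    def put_metadata(self, key, value):
        if self.ended:
            raise AlreadyEndedError(f"{self.name} already ended")
        self.metadata[key] = value


stack = []  # a single stack shared by all coroutines


def end_top(_entity):
    # Behaviour described above: ignore the held reference, end whatever is on top.
    stack.pop().ended = True


def end_own(entity):
    # Alternative: end exactly the entity this call created.
    stack.remove(entity)
    entity.ended = True


async def traced(i, end):
    entity = Entity(f"execute-{i}")
    stack.append(entity)
    try:
        await asyncio.sleep(0)  # yield so all three tasks start before any finishes
    finally:
        entity.put_metadata("done", True)  # uses the held reference...
        end(entity)                        # ...then ends some entity


async def run(end):
    stack.clear()
    results = await asyncio.gather(
        *[traced(i, end) for i in range(3)], return_exceptions=True
    )
    return [type(r).__name__ if isinstance(r, Exception) else "ok" for r in results]


print(asyncio.run(run(end_top)))  # → ['ok', 'ok', 'AlreadyEndedError']
print(asyncio.run(run(end_own)))  # → ['ok', 'ok', 'ok']
```

Ending the held reference removes the exception in this toy, but with a single shared stack the parent/child links would still be wrong, so it is at best a partial fix.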

@oba11

oba11 commented Jul 31, 2019

I'm experiencing this as well. This happens if I have an async function that uses await asyncio.gather(*[foo(x) for x in list]).

Once it's switched to for x in list: await foo(x), it works fine.

@chanchiem
Contributor

Hey,

Thank you for reporting the issue you are having and giving a detailed explanation of it. This will definitely help us determine the root cause and find a solution. It sounds like the asynchronous tasks aren't holding the appropriate references to their parents, so when the tasks finish and the subsegments are closed, only the top-most entries in the context entity list are referenced. We'll have to investigate this and see what changes are necessary to provide a fix. Please stay tuned.

Thanks again for reporting this issue!

@dangtrinhnt-mmt

dangtrinhnt-mmt commented Oct 1, 2019

Any update on this issue? I'm experiencing a similar problem, both with and without patching aiobotocore.

Traceback (most recent call last):
  File '/path/to/app/tasks.py', line 276, in work
    await self._fire_medium_messages(medium, client)
  File '/path/to/app/tasks.py', line 243, in _fire_medium_messages
    await self._execute_fire_tasks(tasks, message_ids, is_bulk_test=bulk_test, template=msg_state_template)
  File '/path/to/app/tasks.py', line 200, in _execute_fire_tasks
    results = await asyncio.gather(*tasks)
  File '/path/to/app/mediums/message.py', line 51, in fire
    await client.send_message(**args)
  File '/usr/local/lib/python3.6/site-packages/aws_xray_sdk/ext/aiobotocore/patch.py', line 36, in _xray_traced_aiobotocore
    meta_processor=aws_meta_processor,
  File '/usr/local/lib/python3.6/site-packages/aws_xray_sdk/core/async_recorder.py', line 101, in record_subsegment_async
    stack=stack,
  File '/usr/local/lib/python3.6/site-packages/aws_xray_sdk/ext/boto_utils.py', line 57, in aws_meta_processor
    resp_meta.get('HTTPStatusCode'))
  File '/usr/local/lib/python3.6/site-packages/aws_xray_sdk/core/models/entity.py', line 102, in put_http_meta
    self._check_ended()
  File '/usr/local/lib/python3.6/site-packages/aws_xray_sdk/core/models/entity.py', line 283, in _check_ended
    raise AlreadyEndedException('Already ended segment and subsegment cannot be modified.')
aws_xray_sdk.core.exceptions.exceptions.AlreadyEndedException: Already ended segment and subsegment cannot be modified.

This is how I use aws_xray_sdk:

import aiobotocore
from aws_xray_sdk.core import patch, xray_recorder
from aws_xray_sdk.core.async_context import AsyncContext
xray_recorder.configure(service='sqs', context=AsyncContext())
patch(modules_to_patch=['aiobotocore'])

These are the modules I'm using:

aiobotocore==0.10.3
aiohttp==3.3.2
aioredlock==0.2.0
aiomysql==0.0.19
aioelasticsearch
asyncpgsa==0.22.0
aws-xray-sdk==2.4.2
boto3==1.9.99

@chanchiem
Contributor

Hey,

Sorry for the delayed response. We're still investigating this internally and are really breaking it down so that we can isolate whether the issue is truly coming from the way we are patching aiobotocore or if it's a bug with how we propagate the context information using TaskLocalStorage.

@dangtrinhnt-mmt You mentioned that you are experiencing the problem with or without patching aiobotocore. Is there a way you can provide a minimal reproduction code sample to us?

@IvDoorn
Author

IvDoorn commented Oct 10, 2019

Hey,

I'm afraid you are focusing on aiobotocore, but that is not where the problem is. As mentioned in my first post, the problem is in asyncio.gather(). Also please see my comment here: #164 (comment)

Here is my updated script, which still demonstrates the issue, but does not use aiobotocore.

import asyncio
import logging
import sys

from datetime import datetime

from aws_xray_sdk.core import patch
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core.async_context import AsyncContext

logger = logging.getLogger()
logger.setLevel(logging.INFO)

xray_logger = logging.getLogger('aws_xray_sdk')
xray_logger.setLevel(logging.INFO)

handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
logging.getLogger().addHandler(handler)

xray_recorder.configure(service='Test', context=AsyncContext(), sampling=False)

async def perform_test(gather=False, loop_count=10):

    with xray_recorder.in_segment_async('ASYNC'):
        @xray_recorder.capture_async('execute')
        async def execute(i, maximum):
            segment = xray_recorder.current_subsegment()
            segment.put_annotation('before', True)
            await asyncio.sleep(maximum - 1)
            segment.put_annotation('after', True)

        if gather:
            with xray_recorder.capture_async('gather'):
                await asyncio.gather(*[execute(i, loop_count) for i in range(loop_count)])
        else:
            with xray_recorder.capture_async('loop'):
                for i in range(loop_count):
                    await execute(i, loop_count)


def test_gather(loop_count=10):
    try:
        now = datetime.utcnow()
        asyncio.get_event_loop().run_until_complete(perform_test(gather=True, loop_count=loop_count))
        logger.info('GATHER: PASS %s' % str(datetime.utcnow() - now))
    except:
        logger.exception('GATHER: FAIL')


def test_loop(loop_count=10):
    try:
        now = datetime.utcnow()
        asyncio.get_event_loop().run_until_complete(perform_test(gather=False, loop_count=loop_count))
        logger.info('LOOP: PASS %s' % str(datetime.utcnow() - now))
    except:
        logger.exception('LOOP: FAIL')


logger.info('==========================================')
test_gather(5)
logger.info('==========================================')
logger.info('==========================================')
test_loop(5)
logger.info('==========================================')

The problem is this:

1) -> start Segment
2) -> Run asyncio.gather([range(5)])
3) -> Handler 1 starts
4) -> Create Subsegment 1, grab current segment as parent (which is Segment)
5) -> Perform an IO operation (go to sleep)
6) -> Handler 2 starts
7) -> Create Subsegment 2, grab current segment as parent (which is Subsegment 1)
8) -> Perform an IO operation (go to sleep)
9) -> Handler 1 IO operation completes
10) -> Handler 1 closes the current segment (which is Subsegment 2)
11) -> Handler 2 IO operation completes
12) -> Handler 2 closes the current segment (which is Subsegment 1)

As you can see, steps 7, 10 and 12 all perform bad operations. If the handlers tried to put an annotation on their own segment, handler 2 would discover that its segment had already been closed.

The XRay code assumes that the handlers passed to asyncio.gather() complete in exactly the reverse order in which they were passed to the asyncio.gather() call. There is no such guarantee: this is async code, so one handler might be much faster than another.

I guess what needs to happen is:

  • Patch asyncio.gather() to ensure that each handler passed to it gets an environment where the "current segment" points at the correct segment (the segment that was current at the time asyncio.gather() was called).
  • Update every place in the code that treats the stack of segments as a LIFO queue, so that it always interacts with the correct segment.

@chanchiem
Contributor

chanchiem commented Oct 17, 2019

Thanks for confirming that this is not related to aiobotocore. In our async tasks, we store the context information in task-local storage here. It makes me wonder why this mechanism breaks inside the asyncio.gather() call. By design, each context should be confined to the particular task that is running the code.

It seems asyncio.gather() might spin off its own tasks, which might be why we're seeing nested subsegments instead. I think we have a good starting point to work with here.
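asyncio.gather() does wrap each coroutine in its own Task (through `loop.create_task`, which is where a task factory is invoked), so task-local storage alone should keep the gathered coroutines apart. One way it can still fail is sketched below as an assumption, not as a description of what AsyncContext actually does: if the task factory seeds a child Task by handing it a reference to the parent's list instead of a copy, every gathered task mutates the same stack. `make_factory`, `trace_stack` and `work` are hypothetical names.

```python
import asyncio


def make_factory(copy_stack):
    """Hypothetical task factory: seeds each new Task's stack from the parent Task."""

    def factory(loop, coro, **kwargs):
        task = asyncio.Task(coro, loop=loop, **kwargs)
        parent = asyncio.current_task(loop=loop)
        if parent is not None and hasattr(parent, "trace_stack"):
            # Either share the parent's list object, or give the child its own copy.
            task.trace_stack = list(parent.trace_stack) if copy_stack else parent.trace_stack
        return task

    return factory


async def work(i):
    stack = asyncio.current_task().trace_stack
    parent = stack[-1]              # the "current segment" when this task starts
    stack.append(f"execute-{i}")
    await asyncio.sleep(0)          # let the other gathered tasks start
    stack.pop()
    return parent


async def main(copy_stack):
    asyncio.get_running_loop().set_task_factory(make_factory(copy_stack))
    asyncio.current_task().trace_stack = ["ASYNC"]
    return await asyncio.gather(*[work(i) for i in range(3)])


print(asyncio.run(main(copy_stack=False)))  # → ['ASYNC', 'execute-0', 'execute-1']
print(asyncio.run(main(copy_stack=True)))   # → ['ASYNC', 'ASYNC', 'ASYNC']
```

With a shared list the parents chain into the nested tree reported above; with a per-task copy each gathered task starts from the segment that was current when gather() was called. Whether the SDK's task factory shares or copies is worth checking in aws_xray_sdk/core/async_context.py.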

For reference, here is where asyncio.gather() is implemented.

@dangtrinhnt-mmt

Oh sorry, I missed your comment. It's a bit complicated for me to provide a minimal reproduction code sample, because the error comes from many of the services in our microservice stack. Let me see if I can do something. Thanks for looking into this.

@jmgamboa

jmgamboa commented Jan 9, 2020

Hi! I'm having a similar issue, but not with asyncio: it's with the segmentio library, which uses a background thread to send HTTP requests. I am using patch_all, and essentially my background threads are failing and not performing their task.

@awssandra awssandra added the bug label Jan 10, 2020
@cejohnson-bw

I'm seeing this when using gevent as well, with the regular, non-aio versions of boto3 and botocore. Since gevent doesn't use asyncio (for anyone unfamiliar with it, it's a greenlet-based concurrency alternative to asyncio), I would guess it's running into the issue @IvDoorn mentioned, where the open segments are assumed to form a stack, but I can't say for sure.

@srprash
Contributor

srprash commented Mar 3, 2020

Hi all,
Apologies for the delay on diving into this issue. I'll try to get this prioritized on our backlog. Thank you for being patient.

@xjia1

xjia1 commented Mar 7, 2020

#203 might be relevant. Please see if that's an easier repro.

@jonathan-kosgei

I'm running into this as well, any use of asyncio.gather causes the same error.

@srprash
Contributor

srprash commented May 17, 2020

@stfairy Thanks for the simpler code. Looking at it, it doesn't use asyncio.gather to run the coroutines concurrently. After removing this line and changing these lines to use asyncio.gather(), I was able to run the code without any errors. However, there were two issues. First, as @willarmiros mentioned in the last line of the comment, removing AsyncContext may cause issues in more complex use cases. Second, the subsegments appear nested within each other (since they are opened nearly concurrently, before the previous one closes) instead of being at the same level.

The issue with @IvDoorn's repro code is that it holds a reference to the subsegment in order to put an annotation after the sleep. When the subsegments are closed out of order, that reference goes stale, because the subsegment has already been closed by another coroutine.

This is clearly an issue and we are working on finding an efficient way to address it.

@srprash
Copy link
Contributor

srprash commented May 18, 2020

Hi @jonathan-kosgei
Can you provide a simple repro of your code? We are trying to understand different use cases for this issue. Thanks!

@jonathan-kosgei

jonathan-kosgei commented May 18, 2020

@srprash Thanks, here is a simplified version of my code;

lambda_handler.py

import asyncio
from xray import *

@xray_profile()
def lambda_handler(event, context):
  asyncio.run(worker())
  return "bleh"

async def worker():
  await asyncio.gather(asyncio.sleep(3), asyncio.sleep(2), asyncio.sleep(1))

xray.py

import os

xray = os.environ.get("XRAY")

# Xray Monitoring Block
def is_lambda():
    if os.environ.get('AWS_REGION') and not os.environ.get('CODEBUILD_AGENT_NAME'):
        return True
    return False

if is_lambda() and xray:
    from aws_xray_sdk.core import xray_recorder, patch_all
    patch_all()

class xray_profile(object):
    def __call__(self, func):
        if not is_lambda() or not xray:
            # Return the function unchanged, not decorated.
            return func
        return xray_recorder.capture(func.__name__)(func)

class xray_profile_async(object):
    def __call__(self, func):
        if not is_lambda() or not xray:
            # Return the function unchanged, not decorated.
            return func
        return xray_recorder.capture_async(func.__name__)(func)

@cbuschka

Hello, any progress on the issue? Would you mind sharing information and thoughts so that others can join in :)

@willarmiros
Contributor

Hi @cbuschka,

Sorry we haven't been able to figure this one out yet. I spent quite a bit of time trying to fix this issue myself, to no avail. I think the root cause might lie in our use of a stack here, stored in each task (when using AsyncContext), to record all in-flight (sub)segments. There might be a race condition when retrieving entities from the stack that causes this issue.

Unfortunately I spent a bit too much time trying to fix our current setup instead of trying to replace it altogether. It might be worth looking into using the contextvars module in Python to store the stack, instead of just storing it as a field on the task. According to the Python docs, contextvars is the right tool for async context use cases.
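A minimal sketch of that idea, assuming the stack is stored as an immutable tuple in a `ContextVar` (the names `_entities`, `begin`, `end` and `Entity` are hypothetical, not SDK API). It relies on the documented asyncio behaviour (Python 3.7+) that each Task runs in a copy of the context that was current when the Task was created, so siblings started by gather() all see the same parent and never pop each other's entries, even when the first-started task finishes first.

```python
import asyncio
import contextvars

# Hypothetical: the stack of open entities, stored as an immutable tuple.
_entities = contextvars.ContextVar("entities", default=())


class Entity:
    def __init__(self, name, parent):
        self.name = name
        self.parent = parent
        self.ended = False


def begin(name):
    stack = _entities.get()
    entity = Entity(name, stack[-1] if stack else None)
    _entities.set(stack + (entity,))  # new tuple, visible only in this context
    return entity


def end(entity):
    stack = _entities.get()
    if not stack or stack[-1] is not entity:
        raise RuntimeError(f"{entity.name} is not the innermost entity in this task")
    entity.ended = True
    _entities.set(stack[:-1])


async def execute(i, created):
    entity = begin(f"execute-{i}")
    created.append(entity)
    await asyncio.sleep(0.01 * (i + 1))  # the first-started task finishes first
    end(entity)


async def main():
    root = begin("ASYNC")
    created = []
    await asyncio.gather(*[execute(i, created) for i in range(3)])
    end(root)
    return root, created


root, created = asyncio.run(main())
print([e.parent.name for e in created])  # → ['ASYNC', 'ASYNC', 'ASYNC']
print(all(e.ended for e in created))      # → True
```

Using a tuple rather than a mutable list matters here: `set()` gives each task its own new value, whereas appending to a list held in the ContextVar would mutate one shared object and leak changes across tasks again.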

@cbuschka

cbuschka commented Sep 30, 2020

Thank you for sharing your findings.

I agree that contextvars is the way to go, although we have to check backward compatibility. IMHO the problem is caused by the add/remove order mentioned by IvDoorn in comment #164 (comment). I hope I can find some time over the weekend to investigate a rewrite of AsyncContext.

I'll come back to you when I have something to share.

@cbuschka

cbuschka commented Nov 21, 2020

I have started working on it, but unfortunately I am currently too busy to make any real progress, so I wanted to share what I have so far. You can find it in the repo https://github.com/cbuschka/aws-async-xray-issue.

IMHO the general strategy for handling the segments in async context should be:

If someone is interested in working on it and has the time, feel free to reuse the code if you find it useful. I may have time to work on it in the last weeks of the year. Feel free to contact me if you have questions about the code.

@Padwicker

Hey, any progress on this issue?

@srprash
Contributor

srprash commented Jan 20, 2021

Hi @Padwicker
We still have this task in our backlog and will try to prioritize it as soon as possible. We would appreciate a contribution based on the investigation in this issue, if you'd like to make one.
Thanks!

@cbuschka

Hi, I am sorry for not responding in a timely fashion. I am currently very busy at work and have had no time to dig into it yet, so do not expect progress on this from my side in the near future.

@bigswede74

Hello, any updates on this issue?

@abivolmv

abivolmv commented Jul 8, 2021

I tried with asyncio.wait() and got the same error.

@jack-burridge-cfh

Surely something like this would work.

from contextvars import ContextVar

# ContextVars should be created at module level. default=None makes .get()
# return None when unset instead of raising LookupError.
entities = ContextVar("entities", default=None)
segment = ContextVar("segment", default=None)


class ContextProperty:
    def __init__(self, context_var):
        self.context_var = context_var

    def __set__(self, instance, value):
        self.context_var.set(value)

    def __get__(self, instance, owner):
        # Returns None when unset, so getattr(storage, name, None) still works
        return self.context_var.get()


class ContextVarStorage(object):
    entities = ContextProperty(entities)
    segment = ContextProperty(segment)

    def clear(self):
        entities.set(None)
        segment.set(None)

The issue is that it's not generic like the other storage implementations (threading.local() and the custom TaskLocalStorage): every attribute needs its own ContextVar, so if new attributes are added it may be difficult to test.
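One way around the genericity concern, sketched under assumptions: keep every attribute in a single ContextVar holding a dict that is replaced on each write rather than mutated, so new attributes need no new ContextVar. `ContextVarLocal` and `_storage` are hypothetical names, not SDK API. Values stored in it (such as an entities list) are still shared by reference between tasks, so they would also need to be copied or treated as immutable.

```python
import asyncio
from contextvars import ContextVar

# A single ContextVar holds a dict of all attributes. The dict is replaced on
# every write (copy-on-write), never mutated in place.
_storage = ContextVar("xray_storage", default=None)


class ContextVarLocal:
    """Hypothetical threading.local()-style storage backed by one ContextVar."""

    def __getattr__(self, name):
        # Only called when normal lookup fails. Raise AttributeError (not
        # KeyError/LookupError) so getattr(obj, name, default) still works.
        data = _storage.get() or {}
        try:
            return data[name]
        except KeyError:
            raise AttributeError(name) from None

    def __setattr__(self, name, value):
        data = dict(_storage.get() or {})
        data[name] = value
        _storage.set(data)

    def clear(self):
        _storage.set(None)


local = ContextVarLocal()


async def worker(name):
    local.segment = name  # only visible inside this task
    await asyncio.sleep(0)
    return local.segment


async def main():
    local.segment = "parent"
    results = await asyncio.gather(worker("a"), worker("b"))
    return local.segment, results


parent_segment, worker_segments = asyncio.run(main())
print(parent_segment, worker_segments)  # parent ['a', 'b']
```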

@garyd203
Contributor

garyd203 commented Dec 23, 2021

I encountered OP's original error, and solved it by patching AsyncContext. It's in a library of other X-Ray patches and extensions called xraysink. See that library's documentation for how to patch asyncio tasks.

Detailed Explanation

I agree with @IvDoorn's analysis that this problem is to do with asyncio usage rather than any specific library, and that it additionally shows up as endlessly nested subsegments (rather than parallel subsegments). Specifically, every asyncio Task uses a shared segment stack (see task_factory), which means subsegments are added and removed by parallel processing, whereas the segment stack is designed for sequential processing only.
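A minimal simulation of a sequential-only stack being used by parallel tasks (plain asyncio, no SDK code; `shared_stack`, `traced` and the delays are illustrative assumptions). The first task finishes while its sibling is still running and pops the sibling's entry, which mirrors one subsegment being ended by the wrong task and the other later being modified after it has already ended.

```python
import asyncio

# One list shared by every task, standing in for a segment stack that is
# only designed for sequential use (a simplification, not SDK code).
shared_stack = ["segment"]
parents = {}     # subsegment name -> entity that was on top when it began
wrong_pops = []  # (task that ended, entry it actually removed)


async def traced(name, delay):
    # "Begin subsegment": the parent is whatever is on top of the stack now.
    parents[name] = shared_stack[-1]
    shared_stack.append(name)
    await asyncio.sleep(delay)
    # "End subsegment": pop the top entry, which may belong to a sibling.
    popped = shared_stack.pop()
    if popped != name:
        wrong_pops.append((name, popped))


async def main():
    # sub-a starts first and also finishes first, while sub-b is still running.
    await asyncio.gather(traced("sub-a", 0.01), traced("sub-b", 0.02))


asyncio.run(main())
print(parents)     # {'sub-a': 'segment', 'sub-b': 'sub-a'}
print(wrong_pops)  # [('sub-a', 'sub-b'), ('sub-b', 'sub-a')]
```

Note that sub-b is also recorded as a child of sub-a, which matches the "nested instead of parallel" symptom.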

My solution was to propagate a copy of the segment stack to child asyncio Tasks. This means that any subsegments created by the Task will have the correct parent. It was over a year ago, so I don't remember exactly what happens to the unused top part of the segment stack in the child Task, but IIRC it gets ignored and just works out in the wash.
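The copy-on-spawn idea can be sketched with a custom task factory. This illustrates the approach described above under assumptions and is not the code from xraysink or #340: the stack is kept as an `entity_stack` attribute on each Task (a hypothetical name), and the factory gives each child its own copy of the parent's list.

```python
import asyncio

STACK_ATTR = "entity_stack"  # hypothetical attribute name on each Task


def copying_task_factory(loop, coro, **kwargs):
    # Forward name=/context= when the running Python version passes them.
    task = asyncio.Task(coro, loop=loop, **kwargs)
    # Called from inside the parent coroutine, so this is the parent task
    # (or None if the task is created outside any task).
    parent = asyncio.current_task(loop=loop)
    # Give the child its own copy of the parent's stack: the child's pushes
    # and pops can no longer affect the parent's or a sibling's list.
    setattr(task, STACK_ATTR, list(getattr(parent, STACK_ATTR, [])))
    return task


async def traced(name, delay, parents):
    stack = getattr(asyncio.current_task(), STACK_ATTR)
    parents[name] = stack[-1]
    stack.append(name)
    await asyncio.sleep(delay)
    return stack.pop()


async def main():
    asyncio.get_running_loop().set_task_factory(copying_task_factory)
    me = asyncio.current_task()
    setattr(me, STACK_ATTR, ["segment"])
    parents = {}
    popped = await asyncio.gather(
        traced("sub-a", 0.01, parents), traced("sub-b", 0.02, parents)
    )
    return parents, popped, getattr(me, STACK_ATTR)


parents, popped, parent_stack = asyncio.run(main())
print(parents)       # {'sub-a': 'segment', 'sub-b': 'segment'}
print(popped)        # ['sub-a', 'sub-b']
print(parent_stack)  # ['segment']
```

With the same timings that break a shared stack, both subsegments now get the segment as their parent and each task pops only its own entry.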

aws-xray-sdk maintainers (@willarmiros @srprash): The xraysink library is Apache licensed, so feel free to migrate any of that code into your main library.

@willarmiros
Contributor

Hi @garyd203,

Thank you for developing this library & proposing your solution. We will add it to our backlog to investigate using your AsyncContext solution in the SDK here. Alternatively, if you were able to make a pull request we would be happy to test it with the initial problematic sample app to verify it resolves the bug.

@jall

jall commented Jun 28, 2023

@NathanielRN I don't think this issue should have been closed.

I have updated to a version that includes #340 and am still seeing the original issue

aws_xray_sdk.core.exceptions.exceptions.AlreadyEndedException: Already ended segment and subsegment cannot be modified.

My package versions are:

aioboto3==11.2.0
aiobotocore==2.5.0
aws-xray-sdk==2.12.0
aws-lambda-powertools==2.18.0
boto3==1.26.76
botocore==1.29.76

I'm planning to try @garyd203's library next to see if it resolves the issue.

@bkblanton

I'm experiencing the same issue and I have the same versions as @jall.

@carolabadeer
Contributor

Reopening this issue as there are multiple reports of it persisting even after PR #340

@carolabadeer carolabadeer reopened this Aug 23, 2023
@carolabadeer
Contributor

@jall can you please share your findings using @garyd203's patch library and if it resolves the issue? If it does, @garyd203 are you open to creating a PR that we can review to fix the issue at the root?

@jall

jall commented Aug 24, 2023

I did try it and hit this error:

cannot find the current segment/subsegment, please make sure you have a segment open

which I couldn't resolve with some brief debugging. IIRC it seemed like the patch library could've been out of date?

I ended up removing aioboto3 and aiobotocore from the patched libraries until this is fixed:

from aws_lambda_powertools import Tracer
from aws_xray_sdk.core.patcher import SUPPORTED_MODULES

tracer = Tracer(service=service_name, auto_patch=False)
modules_to_patch = [
    m for m in SUPPORTED_MODULES if m not in {"aioboto3", "aiobotocore"}
]
tracer.provider.patch(modules_to_patch, raise_errors=False)

@garyd203
Contributor

garyd203 commented Sep 1, 2023

@jall do you happen to have some code that reproduces your error? Context propagation depends on the actual calling code, and a propagation failure is what cannot find the current segment/subsegment generally means, so a reproduction might help us track down the specific problem.

If you don't know where the error comes from, IIRC you can change the config in your dev environment (with something like xray_recorder.configure(context_missing="RUNTIME_ERROR")) to raise an exception, and then hopefully follow the traceback to figure it out.

@garyd203
Contributor

garyd203 commented Sep 1, 2023

@carolabadeer note that there are multiple similar-looking issues here.

  1. The OP had Already ended segment and subsegment cannot be modified in conjunction with parallel subsegments being incorrectly nested in a strange order. This was caused by the buggy AsyncContext implementation in aws_xray_sdk v2.9 and earlier. The fix in aws_xray_sdk v2.10 (see Bug Fix: Defensively copy context entities #340) is substantially the same as the fix in xraysink, so this problem should no longer occur.
  2. A lot of async web frameworks have support for transparently executing non-async functions as part of a request handler. I believe the context will not be propagated into the non-async code, which will result in a cannot find the current segment/subsegment error like jall encountered. A solution for this will involve integrating with each individual framework's context-propagation system.
  3. A background task started outside of a request handler needs to be explicitly instrumented with something like xray_task_async. Otherwise any outgoing requests from that background task will also cause a cannot find the current segment/subsegment error.
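For point 2, the general mechanism frameworks rely on is contextvars. A sketch, where `current_entity` is a hypothetical variable and not the SDK's storage: `asyncio.to_thread` runs the function in a copy of the caller's context, while `loop.run_in_executor` does not copy it by itself, so the context has to be passed explicitly through `contextvars.copy_context().run`. The SDK's AsyncContext stores its stack on the asyncio Task rather than in a ContextVar (as noted earlier in the thread), so this only shows the general mechanism, not a fix for the SDK.

```python
import asyncio
import contextvars

# Hypothetical variable standing in for "the current trace entity".
current_entity = contextvars.ContextVar("current_entity", default=None)


def blocking_call():
    # Non-async code (e.g. a blocking DB driver) reading the current context.
    return current_entity.get()


async def handler():
    current_entity.set("subsegment-1")
    # asyncio.to_thread (Python 3.9+) copies the caller's context into the
    # worker thread, so the value is visible there.
    via_to_thread = await asyncio.to_thread(blocking_call)
    # loop.run_in_executor does not copy the context by itself, so pass the
    # function through copy_context().run to propagate it explicitly.
    loop = asyncio.get_running_loop()
    ctx = contextvars.copy_context()
    via_executor = await loop.run_in_executor(None, ctx.run, blocking_call)
    return via_to_thread, via_executor


result = asyncio.run(handler())
print(result)  # ('subsegment-1', 'subsegment-1')
```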

@mdnorman

mdnorman commented Sep 11, 2023

I'm still getting the original error as well when using aws-xray-sdk==2.12.0, and when testing the xraysink, it gave the same cannot find the current segment/subsegment error, along with a warning: No segment found, cannot begin subsegment ecs. The new errors may be because I'm using an older version of psycopg that doesn't support async operations, and I'm calling it from an async function.

In any case, I don't think that this error should throw an exception out to the caller. Logging shouldn't ever cause my system to break, and xray is effectively a logger. So, similar to the context_missing config, I would expect that the original error should just log and move on and let me do something with the logs (or not).
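A minimal sketch of the fail-open behaviour being asked for, using a hypothetical `best_effort` decorator (not part of aws-xray-sdk) around tracing metadata code. It is similar in spirit to the try/except around `put_http_meta` in the httpx workaround later in this thread: the exception is logged and swallowed, so the request path continues.

```python
import functools
import logging

log = logging.getLogger("tracing")


def best_effort(func):
    """Hypothetical decorator: run tracing code, but never let it raise."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # Log the failure and let the caller carry on.
            log.warning("tracing hook %s failed", func.__name__, exc_info=True)
            return None

    return wrapper


class AlreadyEndedError(Exception):
    """Hypothetical stand-in for the SDK's AlreadyEndedException."""


@best_effort
def record_status(status_code):
    # Simulates writing metadata to a subsegment that was already ended.
    raise AlreadyEndedError("Already ended segment and subsegment cannot be modified.")


print(record_status(200))  # None; a warning is logged instead of raising
```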

If it helps, I'm using strawberry-graphql==0.193.1 which makes heavy use of asyncio.

@garyd203
Contributor

@mdnorman can you confirm my suspicion that your strawberry resolver functions are not async (as a consequence of using non-async psycopg)?

@mdnorman

My resolver functions are asynchronous, because they do other things besides the Postgres calls.

@garyd203
Contributor

Fair enough. How do you call the non-async postgres functions?

@mdnorman

I'm not at my computer right now, but I don't do anything special. It's just a query on what is most likely an already-open connection. I use a connection pool, so it's not exactly straightforward, but I'm also not explicitly passing any additional context down to it.

@garyd203
Contributor

Oh, I mean how do you execute the non-async function from within an async function (like the resolver)? Do you call it directly, put it in a thread pool, or use some other approach?

No rush to respond, I'm just trying to figure out how to support your use case. Ta!

@mdnorman

The async function is in a class, and the sync function is in the same class, so it's something like this:

import asyncio


class MyClass:
    async def do_some_async_work(self, params: list[str]):
        value_ids = await asyncio.gather(*[self.get_a_value(param) for param in params])
        return self.get_some_results(value_ids)

    async def get_a_value(self, param: str) -> str:
        response = await call_some_service(param)
        return response["id"]

    def get_some_results(self, value_ids: list[str]) -> list:
        # Synchronous DB call made directly from the async caller above
        with self.get_db_connection() as conn:
            results = conn.execute("select something from somewhere where value_id in %s", tuple(value_ids)).fetchall()

        return [result[0] for result in results]

The conn.execute is synchronous (as is the get_db_connection).

No thread-pools for this right now.

@garyd203
Contributor

Thanks @mdnorman , it's helpful to know how people are using xray.

In your case, I suspect the problem is that the instrumented sync code for psycopg doesn't know where to find the information on the current xray trace, since sync and async code store the trace data differently. I don't know of any solution for this yet - "someone" will have to code something up :-)

Cheers,
gary

@georgebv

I'm having this error when patching httpx, which sends requests asynchronously inside a task group.

My solution was to patch the httpx patcher (pun intended). As you can see, the error happens after the response is received: for some reason the subsegment gets closed before the context manager exits.

import httpx
from aws_xray_sdk.core import patch, xray_recorder
from aws_xray_sdk.core.exceptions.exceptions import AlreadyEndedException
from aws_xray_sdk.core.models import http
from aws_xray_sdk.ext.httpx.patch import AsyncInstrumentedTransport
from aws_xray_sdk.ext.util import get_hostname, inject_trace_header


def configure_xray():
    xray_recorder.configure(
        context_missing="IGNORE_ERROR",
    )

    async def handle_async_request(
        self: AsyncInstrumentedTransport,
        request: httpx.Request,
    ) -> httpx.Response:
        async with xray_recorder.in_subsegment_async(
            get_hostname(str(request.url)), namespace="remote"
        ) as subsegment:
            if subsegment is not None:
                subsegment.put_http_meta(http.METHOD, request.method)
                subsegment.put_http_meta(
                    http.URL,
                    str(
                        request.url.copy_with(password=None, query=None, fragment=None)
                    ),
                )
                inject_trace_header(request.headers, subsegment)

            response = await self._wrapped_transport.handle_async_request(request)
            if subsegment is not None:
                # Added try-except here
                try:
                    subsegment.put_http_meta(http.STATUS, response.status_code)
                except AlreadyEndedException:
                    pass
            return response

    AsyncInstrumentedTransport.handle_async_request = handle_async_request

    patch(
        [
            "aioboto3",
            "aiobotocore",
            "boto3",
            "botocore",
            "httpx",
        ],
        raise_errors=False,
    )

@ryancausey

I'm getting this while using aioboto3 with aws-xray-sdk = "2.13.0". Any progress on a fix?

@sevetseh28

sevetseh28 commented Jul 21, 2024

Same here: I'm getting this error when using asyncio.gather to make concurrent calls to different AWS services. The whole point of looking at traces is to see whether our code is efficient and making concurrent calls, so I'm a bit surprised this hasn't gotten the attention it deserves in five years!

I'm using AWS Lambda Powertools + aioboto3.

Looking forward to a much needed fix soon,
thanks! 🙏

@ShedPlant

ShedPlant commented Dec 13, 2024

I have seen this when trying to enable X-Ray for a Lambda web app using aws-powertools. It fails when making multiple simultaneous aiobotocore calls with gather.

aws-lambda-powertools 3.3.0                           Powertools for AWS Lambda (Python) is a develo...
aws-xray-sdk          2.14.0                          The AWS X-Ray SDK for Python (the SDK) enables...

Snippets to demonstrate (incomplete, not runnable as is):

async def get_ssm_param(ssm_client, param_name: str):
    response = await ssm_client.get_parameter(
        Name=param_name,
        WithDecryption=True,
    )

    return response["Parameter"]["Value"]

async def create_web_app() -> App:
    app = App(routes=create_routes())
    aio_session = get_session()
    async with aio_session.create_client("ssm") as ssm:
        username, password = await gather(
            get_ssm_param(ssm, DB_USERNAME),
            get_ssm_param(ssm, DB_PASSWORD),
        )


@metrics.log_metrics(capture_cold_start_metric=True)
@tracer.capture_lambda_handler()
@logger.inject_lambda_context(
    correlation_id_path=correlation_paths.API_GATEWAY_REST,
    log_event=True,
    clear_state=True,
)
def main(event, context):
    global app
    global mangum

    if mangum is ...:
        app = loop.run_until_complete(create_web_app())
        mangum = Mangum(app, lifespan="off")

    return mangum(event, context)

  File "/var/task/my_app/lambda_handler.py", line 35, in main
    app = loop.run_until_complete(create_web_app())
  File "/var/lang/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/var/task/my_app/web_app/create_web_app.py", line 57, in create_web_app
    username, password = await gather(
  File "/var/task/my_app/web_app/create_web_app.py", line 86, in get_ssm_param
    response = await ssm_client.get_parameter(
  File "/opt/python/lib/python3.9/site-packages/aws_xray_sdk/ext/aiobotocore/patch.py", line 32, in _xray_traced_aiobotocore
    result = await xray_recorder.record_subsegment_async(
  File "/opt/python/lib/python3.9/site-packages/aws_xray_sdk/core/async_recorder.py", line 93, in record_subsegment_async
    meta_processor(
  File "/opt/python/lib/python3.9/site-packages/aws_xray_sdk/ext/boto_utils.py", line 53, in aws_meta_processor
    subsegment.put_http_meta(http.STATUS,
  File "/opt/python/lib/python3.9/site-packages/aws_xray_sdk/core/models/entity.py", line 110, in put_http_meta
    self._check_ended()
  File "/opt/python/lib/python3.9/site-packages/aws_xray_sdk/core/models/entity.py", line 306, in _check_ended
    raise AlreadyEndedException("Already ended segment and subsegment cannot be modified.")

I agree with above posters that:

  • using gather should work
  • if xray crashes it should not interrupt the main execution
