Error using tracer with aioboto3 #10

Closed
marcioemiranda opened this issue Mar 24, 2020 · 7 comments

@marcioemiranda

Hello Heitor,

I have a Lambda function that uses the tracer from aws_lambda_powertools together with the asyncio module. Everything runs fine with boto3, but if I use aioboto3 I get the following error:

Traceback (most recent call last):
  File "/opt/python/aws_lambda_powertools/tracing/tracer.py", line 152, in decorate
    raise err
  File "/opt/python/aws_lambda_powertools/tracing/tracer.py", line 144, in decorate
    response = lambda_handler(event, context)
  File "/opt/python/aws_lambda_powertools/logging/logger.py", line 157, in decorate
    return lambda_handler(event, context)
  File "/var/task/onExecuteCampaign/lambda_function.py", line 228, in lambda_handler
    raise e
  File "/var/task/onExecuteCampaign/lambda_function.py", line 225, in lambda_handler
    loop.run_until_complete(main(event, context))
  File "/var/lang/lib/python3.6/asyncio/base_events.py", line 488, in run_until_complete
    return future.result()
  File "/var/task/onExecuteCampaign/lambda_function.py", line 200, in main
    raise e
  File "/var/task/onExecuteCampaign/lambda_function.py", line 196, in main
    count_list = await asyncio.gather(*(process_segment(campaignId, segment) for segment in segments))
  File "/var/task/onExecuteCampaign/lambda_function.py", line 161, in process_segment
    raise e
  File "/var/task/onExecuteCampaign/lambda_function.py", line 158, in process_segment
    count_list  = await asyncio.gather(*(process_segment_partition(campaignId, segment,partition) for partition in range(num_partitions)))
  File "/var/task/onExecuteCampaign/lambda_function.py", line 140, in process_segment_partition
    raise e
  File "/var/task/onExecuteCampaign/lambda_function.py", line 119, in process_segment_partition
    Limit = pageSize
  File "/opt/python/aioboto3/resources.py", line 299, in do_action
    response = await action.async_call(self, *args, **kwargs)
  File "/opt/python/aioboto3/resources.py", line 67, in async_call
    response = await getattr(parent.meta.client, operation_name)(**params)
  File "/opt/python/aws_xray_sdk/ext/aiobotocore/patch.py", line 36, in _xray_traced_aiobotocore
    meta_processor=aws_meta_processor,
  File "/opt/python/aws_xray_sdk/core/async_recorder.py", line 101, in record_subsegment_async
    stack=stack,
  File "/opt/python/aws_xray_sdk/ext/boto_utils.py", line 57, in aws_meta_processor
    resp_meta.get('HTTPStatusCode'))
  File "/opt/python/aws_xray_sdk/core/models/entity.py", line 102, in put_http_meta
    self._check_ended()
  File "/opt/python/aws_xray_sdk/core/models/entity.py", line 283, in _check_ended
    raise AlreadyEndedException("Already ended segment and subsegment cannot be modified.")
aws_xray_sdk.core.exceptions.exceptions.AlreadyEndedException: Already ended segment and subsegment cannot be modified.

Here is the relevant snippet of code using aioboto3 (on Python 3.6):

@tracer.capture_method
async def process_segment_partition(campaignId, segment, partition):
  ...
  async with aioboto3.resource('dynamodb') as dynamo_resource:

            # async table resource
            async_table = dynamo_resource.Table(environ['TABLE_NAME'])
            # query first page
            response = await async_table.query(
                KeyConditionExpression=Key('pk').eq(segmentPartitionId),
                Limit = pageSize
            )
  ...
  

async def process_segment(campaignId, segment):
    ...   
    await asyncio.gather(*(process_segment_partition(campaignId, segment,partition) for partition in range(num_partitions)))
    ...

# main loop implementation
async def main(event, context):
  ...
  await asyncio.gather(*(process_segment(campaignId, segment) for segment in segments))
  ...

@tracer.capture_lambda_handler
@logger_inject_lambda_context
def lambda_handler(event, context):
    
    try:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main(event, context))
    except Exception as e:
        logger.exception(e)
        raise e
    finally:    
        loop.close()  

The code above raises the error mentioned.

If I replace the aioboto3 call in process_segment_partition with a plain boto3 call, it runs fine:

Using boto3:

@tracer.capture_method
async def process_segment_partition(campaignId, segment, partition):
    
    ...    
    response = table.query(
        KeyConditionExpression=Key('pk').eq(segmentPartitionId),
        Limit = pageSize
    )
    ...

Any ideas?

@heitorlessa
Contributor

I haven't added support for async functions, as I hadn't seen many in the wild. This proves me wrong :)

https://docs.aws.amazon.com/xray/latest/devguide/xray-sdk-python-patching.html#xray-sdk-python-patching-async

For this to work, we'd need either A) a signal that the decorated function is async, or B) a way to detect whether there's an event loop, so that X-Ray's async context is used.

I haven't done any proper research on this yet, but I will create a board with features to ensure this is done before GA.
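Option A can be sketched with the standard library alone. The following is a minimal illustration, not the Powertools implementation: the decorator name `capture` and the `traced_as` attribute are hypothetical, and the comments only mark where a tracer would open a subsegment.

```python
import asyncio
import functools
import inspect


def capture(func):
    """Hypothetical decorator that picks a wrapper based on whether func is async."""
    if inspect.iscoroutinefunction(func):

        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            # A tracer would open an async-aware subsegment here.
            return await func(*args, **kwargs)

        async_wrapper.traced_as = "async"  # illustrative marker only
        return async_wrapper

    @functools.wraps(func)
    def sync_wrapper(*args, **kwargs):
        # A tracer would open a regular subsegment here.
        return func(*args, **kwargs)

    sync_wrapper.traced_as = "sync"  # illustrative marker only
    return sync_wrapper


@capture
async def fetch(x):
    await asyncio.sleep(0)
    return x + 1


@capture
def plain(x):
    return x * 2
```

Because the check happens once at decoration time, the wrapper keeps the same calling convention as the function it wraps: `fetch` must still be awaited, while `plain` is called directly.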

@heitorlessa
Contributor

We're nearly there with full support for async: #29

It's currently waiting for code review, and hopefully it will be available in 0.9.0. This does not add support for the Lambda handler itself being async, as that brings all sorts of issues. I've also spotted an edge case when running concurrent async calls (asyncio.gather) with X-Ray, which the team is investigating.
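With the handler staying synchronous, one common pattern is to keep it as a thin wrapper that drives an async entry point. This is a minimal sketch, assuming Python 3.7+ for `asyncio.run`; the body of `main` and its return value are placeholders, not code from this thread.

```python
import asyncio


async def main(event, context):
    # Placeholder async entry point; real work (for example aioboto3 calls)
    # would be awaited here.
    await asyncio.sleep(0)
    return {"processed": len(event.get("segments", []))}


def lambda_handler(event, context):
    # The handler itself is a normal function. asyncio.run creates a new
    # event loop, runs main to completion, and closes the loop again.
    return asyncio.run(main(event, context))
```

On Python 3.6, where `asyncio.run` is not available, the equivalent is the `get_event_loop()` / `run_until_complete()` pair used in the original report.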

@marcioemiranda
Author

marcioemiranda commented May 11, 2020 via email

@heitorlessa
Contributor

Resolving this, as it's now live in the 0.9.1 release. One caveat, though: concurrent async calls via asyncio.gather() cannot be traced at the same time due to a known bug in X-Ray. You can either trace only one of the functions (capture_method), or use the new escape hatch mechanism, following the example I added to the docs.

aiohttp is also supported now ;)
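To make the caveat concrete, here is a toy model in plain asyncio of how concurrent tasks can trip an "already ended" check. It is not X-Ray SDK code: it assumes, purely for illustration, that the recorder keeps one stack of open subsegments shared by every task, and all class and function names are hypothetical.

```python
import asyncio


class AlreadyEnded(Exception):
    """Toy stand-in for an 'already ended' error (hypothetical class)."""


class Subsegment:
    def __init__(self, name):
        self.name = name
        self.ended = False

    def put_meta(self, key, value):
        # Mirrors the idea that a closed subsegment can no longer be modified.
        if self.ended:
            raise AlreadyEnded(f"{self.name} already ended")


class SharedStackRecorder:
    """Toy recorder: assumes one stack of open subsegments shared by all tasks."""

    def __init__(self):
        self.stack = []

    def begin(self, name):
        sub = Subsegment(name)
        self.stack.append(sub)
        return sub

    def end(self):
        # Ends whatever is on top of the stack, even if another task opened it.
        self.stack.pop().ended = True


async def traced_call(recorder, name, delay):
    sub = recorder.begin(name)
    await asyncio.sleep(delay)   # other tasks get to run here
    sub.put_meta("status", 200)  # raises if another task already ended `sub`
    recorder.end()
    return name


async def run_concurrently():
    recorder = SharedStackRecorder()
    # Task "a" finishes first and ends the top of the stack, which is "b".
    return await asyncio.gather(
        traced_call(recorder, "a", 0.01),
        traced_call(recorder, "b", 0.02),
        return_exceptions=True,
    )


async def run_sequentially():
    recorder = SharedStackRecorder()
    return [
        await traced_call(recorder, "a", 0.01),
        await traced_call(recorder, "b", 0.02),
    ]
```

In this model the sequential run completes cleanly, while the concurrent run returns the result for "a" and an `AlreadyEnded` error for "b". That matches the shape of the advice above: tracing only one of the concurrent functions keeps a single task pushing to and popping from the stack.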

@sevetseh28

@heitorlessa I've tried using the escape hatch as described in the docs here: https://docs.powertools.aws.dev/lambda/python/1.20.2/core/tracer/

However, I'm still getting the same exception when using asyncio.gather. How did you create the event loop?

@heitorlessa
Contributor

heitorlessa commented Jul 22, 2024 via email

@sevetseh28

sevetseh28 commented Jul 22, 2024

Hey @heitorlessa, thanks for your quick reply. I didn't use any decorator on the functions called within gather, but the issue remained. I did exactly what the docs say: I used the context manager to instrument the trace manually.

The versions I'm using are the latest as of now:

  • aws-lambda-powertools==2.41.0
  • aioboto3==13.1.1
  • aws-xray-sdk==2.14.0

I can provide more versions of other deps if needed.

The snippets below show the important parts of the code (I had to adapt them because I can't share the source code):

Main app module initialisation:

# Resolver
app = APIGatewayRestResolver(enable_validation=True, cors=cors_config)

# These are defined here so that they are reused by subsequent calls
app.dynamodb_client = None 
aioboto3_session = aioboto3.Session()


async def get_dynamodb_client():
    async with tracer.provider.in_subsegment_async("## get_dynamodb_client"):
        if app.dynamodb_client is None:
            app.dynamodb_client = await aioboto3_session.resource(
                "dynamodb"
            ).__aenter__()
        return app.dynamodb_client

def run_in_asyncio_event_loop(func):
    """
    This decorator is used to run a function in the asyncio event loop.
    This allows the Lambda function to run async code and reuse the same event loop between
    executions, as well as resources defined in the global scope such as boto3 clients defined above.
    """

    @wraps(func)
    def async_to_sync_wrapper(*args, **kwargs):
        loop = asyncio.get_event_loop()
        return loop.run_until_complete(func(*args, **kwargs))

    return async_to_sync_wrapper

This is an example of querying DynamoDB using aioboto3:

async def query_url(
    id: str, table_name: str, dynamodb_client
) -> str | None:
    async with tracer.provider.in_subsegment_async("## query_url"):
        table = await dynamodb_client.Table(table_name)
        response = await table.query(
            KeyConditionExpression=Key("Identifier").eq(id),
        )
        if response["Items"] and "URL" in response["Items"][0]:
            return response["Items"][0]["URL"]

        return None

Now an example endpoint using asyncio.gather:

@app.get("/concurrentTest")
@run_in_asyncio_event_loop # I tried moving this decorator up and down - same behaviour
@tracer.capture_method # I tried with and without this decorator here - same behaviour
async def concurrent_test():
    dynamo_db_client = await get_dynamodb_client()

    dynamo_response_1, dynamo_response_2 = await asyncio.gather(
        query_url(123, "testTable", dynamo_db_client), query_url(123, "testTable", dynamo_db_client)
    ) 
