diff --git a/.github/workflows/integration_test.yml b/.github/workflows/integration_test.yml index 29763320..9a00b695 100644 --- a/.github/workflows/integration_test.yml +++ b/.github/workflows/integration_test.yml @@ -1,5 +1,18 @@ +--- name: Integration tests +permissions: + actions: none + checks: none + contents: read + deployments: none + issues: none + packages: none + pages: none + pull-requests: none + repository-projects: none + security-events: none + on: [pull_request] jobs: @@ -9,10 +22,13 @@ jobs: strategy: max-parallel: 4 matrix: - python-version: ['3.10', '3.11'] + python-version: ["3.10", "3.11"] steps: - - name: Checkout source - uses: actions/checkout@v3 + - name: Checkout + uses: actions/checkout@v4 + with: + ref: ${{ github.event.pull_request.head.sha }} + fetch-depth: 0 - name: Run integration tests run: | diff --git a/aws/logs_monitoring/caching/base_tags_cache.py b/aws/logs_monitoring/caching/base_tags_cache.py index e4d75bb1..1818cd0e 100644 --- a/aws/logs_monitoring/caching/base_tags_cache.py +++ b/aws/logs_monitoring/caching/base_tags_cache.py @@ -1,16 +1,18 @@ -import os -import logging import json -from time import time +import logging +import os from random import randint +from time import time + import boto3 from botocore.exceptions import ClientError + +from caching.common import convert_last_modified_time, get_last_modified_time from settings import ( DD_S3_BUCKET_NAME, - DD_TAGS_CACHE_TTL_SECONDS, DD_S3_CACHE_LOCK_TTL_SECONDS, + DD_TAGS_CACHE_TTL_SECONDS, ) -from caching.common import get_last_modified_time from telemetry import send_forwarder_internal_metrics JITTER_MIN = 1 @@ -61,23 +63,32 @@ def write_cache_to_s3(self, data): self.logger.debug("Unable to write new cache to S3", exc_info=True) def acquire_s3_cache_lock(self): + key = self.get_cache_lock_with_prefix() + """Acquire cache lock""" - cache_lock_object = self.s3_client.Object( - DD_S3_BUCKET_NAME, self.get_cache_lock_with_prefix() - ) try: - file_content = cache_lock_object.get() + response = self.s3_client.list_objects_v2( + Bucket=DD_S3_BUCKET_NAME, Prefix=key + ) + for content in response.get("Contents", []): + if content["Key"] != key: + continue + + # check lock file expiration + last_modified_unix_time = convert_last_modified_time( + content["LastModified"] + ) + if last_modified_unix_time + DD_S3_CACHE_LOCK_TTL_SECONDS >= time(): + return False - # check lock file expiration - last_modified_unix_time = get_last_modified_time(file_content) - if last_modified_unix_time + DD_S3_CACHE_LOCK_TTL_SECONDS >= time(): - return False except Exception: self.logger.debug("Unable to get cache lock file") # lock file doesn't exist, create file to acquire lock try: - cache_lock_object.put(Body=(bytes("lock".encode("UTF-8")))) + self.s3_client.Object(DD_S3_BUCKET_NAME, key).put( + Body=(bytes("lock".encode("UTF-8"))) + ) send_forwarder_internal_metrics("s3_cache_lock_acquired") self.logger.debug("S3 cache lock acquired") except ClientError: diff --git a/aws/logs_monitoring/caching/cloudwatch_log_group_cache.py b/aws/logs_monitoring/caching/cloudwatch_log_group_cache.py index f14fc28b..9ad12cec 100644 --- a/aws/logs_monitoring/caching/cloudwatch_log_group_cache.py +++ b/aws/logs_monitoring/caching/cloudwatch_log_group_cache.py @@ -6,6 +6,7 @@ import boto3 from botocore.config import Config + from caching.common import sanitize_aws_tag_string from settings import ( DD_S3_BUCKET_NAME, diff --git a/aws/logs_monitoring/caching/common.py b/aws/logs_monitoring/caching/common.py index 7d7db881..26999a40 100644 --- a/aws/logs_monitoring/caching/common.py +++ b/aws/logs_monitoring/caching/common.py @@ -1,6 +1,6 @@ -import os import datetime import logging +import os import re from collections import defaultdict @@ -19,8 +19,11 @@ def get_last_modified_time(s3_file): last_modified_date = datetime.datetime.strptime( last_modified_str, "%a, %d %b %Y %H:%M:%S %Z" ) - last_modified_unix_time = int(last_modified_date.strftime("%s")) - return last_modified_unix_time + return convert_last_modified_time(last_modified_date) + + +def convert_last_modified_time(last_modified_time): + return int(last_modified_time.strftime("%s")) def parse_get_resources_response_for_tags_by_arn(get_resources_page): diff --git a/aws/logs_monitoring/lambda_function.py b/aws/logs_monitoring/lambda_function.py index b27f5c80..26ebeddd 100644 --- a/aws/logs_monitoring/lambda_function.py +++ b/aws/logs_monitoring/lambda_function.py @@ -4,29 +4,30 @@ # Copyright 2021 Datadog, Inc. import json +import logging import os +from hashlib import sha1 + import boto3 -import logging import requests -from hashlib import sha1 -from datadog_lambda.wrapper import datadog_lambda_wrapper from datadog import api -from enhanced_lambda_metrics import parse_and_submit_enhanced_metrics -from steps.parsing import parse -from steps.enrichment import enrich -from steps.transformation import transform -from steps.splitting import split +from datadog_lambda.wrapper import datadog_lambda_wrapper + from caching.cache_layer import CacheLayer +from enhanced_lambda_metrics import parse_and_submit_enhanced_metrics from forwarder import Forwarder from settings import ( + DD_ADDITIONAL_TARGET_LAMBDAS, DD_API_KEY, - DD_SKIP_SSL_VALIDATION, DD_API_URL, DD_FORWARDER_VERSION, - DD_ADDITIONAL_TARGET_LAMBDAS, DD_RETRY_KEYWORD, + DD_SKIP_SSL_VALIDATION, ) - +from steps.enrichment import enrich +from steps.parsing import parse +from steps.splitting import split +from steps.transformation import transform logger = logging.getLogger() logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper())) diff --git a/aws/logs_monitoring/template.yaml b/aws/logs_monitoring/template.yaml index 4a752966..779ae4b5 100644 --- a/aws/logs_monitoring/template.yaml +++ b/aws/logs_monitoring/template.yaml @@ -606,8 +606,9 @@ Resources: - !Sub "${DdApiKeySecretArn}*" Effect: Allow # Fetch Lambda resource tags for data enrichment + # Fetch Step Functions resource tags for data enrichment - !If - - SetDdFetchLambdaTags + - ShouldDdFetchTags - Action: - tag:GetResources Resource: "*" @@ -621,14 +622,6 @@ Resources: Resource: "*" Effect: Allow - !Ref AWS::NoValue - # Fetch Step Functions resource tags for data enrichment - - !If - - SetDdFetchStepFunctionsTags - - Action: - - tag:GetResources - Resource: "*" - Effect: Allow - - !Ref AWS::NoValue # Required for Lambda deployed in VPC - !If - UseVPC