Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(appsync): Add new service AppSync #5589

Merged
merged 3 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
4 changes: 4 additions & 0 deletions prowler/providers/aws/services/appsync/appsync_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from prowler.providers.aws.services.appsync.appsync_service import AppSync
from prowler.providers.common.provider import Provider

appsync_client = AppSync(Provider.get_global_provider())
59 changes: 59 additions & 0 deletions prowler/providers/aws/services/appsync/appsync_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from typing import Optional

from pydantic import BaseModel

from prowler.lib.logger import logger
from prowler.lib.scan_filters.scan_filters import is_resource_filtered
from prowler.providers.aws.lib.service.service import AWSService


class AppSync(AWSService):
def __init__(self, provider):
# Call AWSService's __init__
super().__init__(__class__.__name__, provider)
self.graphql_apis = {}
self.__threading_call__(self._list_graphql_apis)

def _list_graphql_apis(self, regional_client):
logger.info("AppSync - Describing APIs...")
try:
list_graphql_apis_paginator = regional_client.get_paginator(
"list_graphql_apis"
)
for page in list_graphql_apis_paginator.paginate():
for api in page["graphqlApis"]:
api_arn = api["arn"]
if not self.audit_resources or (
is_resource_filtered(
api_arn,
self.audit_resources,
)
):
self.graphql_apis[api_arn] = GraphqlApi(
id=api["apiId"],
name=api["name"],
arn=api_arn,
region=regional_client.region,
type=api.get("apiType", "GRAPHQL"),
field_log_level=api.get("logConfig", {}).get(
"fieldLogLevel", ""
),
authentication_type=api.get("authenticationType", ""),
tags=[api.get("tags", {})],
)

except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)


class GraphqlApi(BaseModel):
id: str
name: str
arn: str
region: str
type: str
field_log_level: str
authentication_type: str
tags: Optional[list] = []
66 changes: 66 additions & 0 deletions tests/providers/aws/services/appsync/appsync_service_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from boto3 import client
from mock import patch
from moto import mock_aws

from prowler.providers.aws.services.appsync.appsync_service import AppSync
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)


def mock_generate_regional_clients(provider, service):
regional_client = provider._session.current_session.client(
service, region_name=AWS_REGION_US_EAST_1
)
regional_client.region = AWS_REGION_US_EAST_1
return {AWS_REGION_US_EAST_1: regional_client}


@patch(
"prowler.providers.aws.aws_provider.AwsProvider.generate_regional_clients",
new=mock_generate_regional_clients,
)
class Test_AppSync_Service:
# Test AppSync Service
def test_service(self):
aws_provider = set_mocked_aws_provider()
appsync = AppSync(aws_provider)
assert appsync.service == "appsync"

# Test AppSync Client
def test_client(self):
aws_provider = set_mocked_aws_provider()
appsync = AppSync(aws_provider)
assert appsync.client.__class__.__name__ == "AppSync"

# Test AppSync Session
def test__get_session__(self):
aws_provider = set_mocked_aws_provider()
appsync = AppSync(aws_provider)
assert appsync.session.__class__.__name__ == "Session"

# Test AppSync Session
def test_audited_account(self):
aws_provider = set_mocked_aws_provider()
appsync = AppSync(aws_provider)
assert appsync.audited_account == AWS_ACCOUNT_NUMBER

# Test AppSync Describe File Systems
@mock_aws
def test_list_graphql_apis(self):
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
appsync = client("appsync", region_name=AWS_REGION_US_EAST_1)
api = appsync.create_graphql_api(
name="test-api",
authenticationType="API_KEY",
logConfig={"fieldLogLevel": "ALL", "cloudWatchLogsRoleArn": "test"},
)
api_arn = api["graphqlApi"]["arn"]
appsync_client = AppSync(aws_provider)

assert appsync_client.graphql_apis[api_arn].name == "test-api"
assert appsync_client.graphql_apis[api_arn].field_log_level == "ALL"
assert appsync_client.graphql_apis[api_arn].authentication_type == "API_KEY"
assert appsync_client.graphql_apis[api_arn].tags == [{}]
Loading