Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Tags v3 in Expanderv1 #27111

Merged
merged 4 commits into from
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 46 additions & 20 deletions Packs/ExpanseV2/Integrations/ExpanseV2/ExpanseV2.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@
'Domain': 'domains',
'Certificate': 'certificates',
'CloudResource': 'cloud-resources',
'IpRange': 'ip-range'
'IpRange': 'ip-range',
'ResponsiveIP': 'responsive-ip',
'Network': 'network',
'Device': 'device'
}

ASSET_TAG_OPERATIONS = ['ASSIGN', 'UNASSIGN']
Expand Down Expand Up @@ -383,34 +386,57 @@ def get_asset_details(self, asset_type: str, asset_id: str,

def manage_asset_tags(self, asset_type: str, operation_type: str, asset_id: str,
tag_ids: List[str]) -> Dict[str, Any]:
endpoint_base = asset_type if asset_type == "ip-range" else f"assets/{asset_type}"
# Only custom ranges need to use the v2 APIs, otherwise we should always use v3
if asset_type == "ip-range":
tag_url = f'/v2/{asset_type}/tag-assignments/bulk'
data = {"operations": [{
'operationType': operation_type,
'tagIds': tag_ids,
'assetId': asset_id
}]}

data: Dict = {"operations": [{
'operationType': operation_type,
'tagIds': tag_ids,
'assetId': asset_id
else:
tag_url = '/v3/assets/assets/annotations'
data = {"operations": [
{
"operationType": operation_type,
"annotationType": "TAG",
"annotationIds": tag_ids,
"assetId": asset_id
}]}

}]}
return self._http_request(
method='POST',
url_suffix=f'/v2/{endpoint_base}/tag-assignments/bulk',
url_suffix=tag_url,
json_data=data,
retries=3
)

def manage_asset_pocs(self, asset_type: str, operation_type: str, asset_id: str, poc_ids: List[str]) -> Dict[str, Any]:
endpoint_base = asset_type if asset_type == "ip-range" else f"assets/{asset_type}"
# Only custom ranges need to use the v2 APIs, otherwise we should always use v3
if asset_type == "ip-range":
poc_url = f'/v2/{asset_type}/contact-assignments/bulk'
data = {"operations": [{
'operationType': operation_type,
'contactIds': poc_ids,
'assetId': asset_id
}]}

data: Dict = {"operations": [{
'operationType': operation_type,
'contactIds': poc_ids,
'assetId': asset_id
else:
poc_url = '/v3/assets/assets/annotations'
data = {"operations": [
{
"operationType": operation_type,
"annotationType": "CONTACT",
"annotationIds": poc_ids,
"assetId": asset_id
}]}

}]}
return self._http_request(
method='POST',
url_suffix=f'/v2/{endpoint_base}/contact-assignments/bulk',
json_data=data
url_suffix=poc_url,
json_data=data,
retries=3
)

def update_issue(self, issue_id: str, update_type: str, value: str) -> Dict[str, Any]:
Expand Down Expand Up @@ -585,8 +611,8 @@ def parse_asset_data(self, issue: Dict[str, Any],
and (re := rri[0].get('registryEntities'))
and isinstance(re, list)
):
ml_feature_list.extend(set(r['formattedName']
for r in re if 'formattedName' in r)) # pylint: disable=E1133
ml_feature_list.extend({r['formattedName']
for r in re if 'formattedName' in r}) # pylint: disable=E1133

elif a.get('assetType') == "Certificate":
# for Certificate collect issuerOrg, issuerName,
Expand Down Expand Up @@ -2474,11 +2500,11 @@ def cidr_command(client: Client, args: Dict[str, Any]) -> List[CommandResults]:


def list_risk_rules_command(client: Client, args: Dict[str, Any]):
raise DeprecatedCommandException()
raise DeprecatedCommandException


def get_risky_flows_command(client: Client, args: Dict[str, Any]):
raise DeprecatedCommandException()
raise DeprecatedCommandException


def domains_for_certificate_command(client: Client, args: Dict[str, Any]) -> CommandResults:
Expand Down
14 changes: 13 additions & 1 deletion Packs/ExpanseV2/Integrations/ExpanseV2/ExpanseV2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2490,6 +2490,9 @@ script:
- Certificate
- Domain
- CloudResource
- Network
- Device
- ResponsiveIP
required: true
secret: false
- default: false
Expand Down Expand Up @@ -2525,6 +2528,9 @@ script:
- Certificate
- Domain
- CloudResource
- Network
- Device
- ResponsiveIP
required: true
secret: false
- default: false
Expand Down Expand Up @@ -2731,6 +2737,9 @@ script:
- Certificate
- Domain
- CloudResource
- Network
- Device
- ResponsiveIP
required: true
secret: false
- default: false
Expand Down Expand Up @@ -2766,6 +2775,9 @@ script:
- Certificate
- Domain
- CloudResource
- Network
- Device
- ResponsiveIP
required: true
secret: false
- default: false
Expand Down Expand Up @@ -6110,7 +6122,7 @@ script:
- contextPath: Expanse.IPDomains.DomainList
description: An array of domain objects. This is truncated at 50.
type: Unknown
dockerimage: demisto/python3:3.10.11.54132
dockerimage: demisto/python3:3.10.11.61265
feed: false
isfetch: true
longRunning: false
Expand Down
107 changes: 107 additions & 0 deletions Packs/ExpanseV2/Integrations/ExpanseV2/ExpanseV2_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2051,3 +2051,110 @@ def test_format_domain_data_empty_domainStatuses():

results = format_domain_data([mock_domain_data])
assert results

@pytest.mark.parametrize(
"args, expected_url, expected_data",
[
(
{
"asset_type": "ip-range",
"operation_type": "UNASSIGN",
"asset_id": "c871feab-7d38-4cc5-9d36-5dad76f6b389",
"tag_ids": ["tag1", "tag2"]
},
"https://example.com/api/v2/ip-range/tag-assignments/bulk",
{
"operations": [{
'operationType': "UNASSIGN",
'tagIds': ["tag1", "tag2"],
'assetId': "c871feab-7d38-4cc5-9d36-5dad76f6b389"
}]
}
),
(
{
"asset_type": "other-asset", # one of : Certificate, Domain, CloudResource, Network, Device, ResponsiveIP
"operation_type": "UNASSIGN",
"asset_id": "c871feab-7d38-4cc5-9d36-5dad76f6b389",
"tag_ids": ["tag3", "tag4"]
},
"https://example.com/api/v3/assets/assets/annotations",
{
"operations": [{
"operationType": "UNASSIGN",
"annotationType": "TAG",
"annotationIds": ["tag3", "tag4"],
"assetId": "c871feab-7d38-4cc5-9d36-5dad76f6b389"
}]
}
)
]
)
def test_manage_asset_tags(requests_mock, args, expected_url, expected_data):
"""
Given:
- Different asset types
When
- Calling the `manage_asset_tags` method with the given `args`.
Then
- Assert that the HTTP request was made with the correct parameters
"""
from ExpanseV2 import Client
client = Client(api_key="key", base_url="https://example.com/api", verify=True, proxy=False)
requests_mock.post(expected_url, json=expected_data)

client.manage_asset_tags(**args)


@pytest.mark.parametrize(
"args, expected_url, expected_data",
[
(
{
"asset_type": "ip-range",
"operation_type": "UNASSIGN",
"asset_id": "f491b7ef-a7b9-4644-af90-36dc0a6b2000",
"poc_ids": ["f491b7ef-a7b9-4644-af90-36dc0a6b2000"]
},
"https://example.com/api/v2/ip-range/contact-assignments/bulk",
{
"operations": [{
'operationType': "UNASSIGN",
'contactIds': ["f491b7ef-a7b9-4644-af90-36dc0a6b2000"],
'assetId': "f491b7ef-a7b9-4644-af90-36dc0a6b2000"
}]
}
),
(
{
"asset_type": "other-asset", # one of : Certificate, Domain, CloudResource, Network, Device, ResponsiveIP
"operation_type": "UNASSIGN",
"asset_id": "f491b7ef-a7b9-4644-af90-36dc0a6b2000",
"poc_ids": ["f491b7ef-a7b9-4644-af90-36dc0a6b2000"]
},
"https://example.com/api/v3/assets/assets/annotations",
{
"operations": [{
"operationType": "UNASSIGN",
"annotationType": "CONTACT",
"annotationIds": ["f491b7ef-a7b9-4644-af90-36dc0a6b2000"],
"assetId": "f491b7ef-a7b9-4644-af90-36dc0a6b2000"
}]
}
)
]
)
def test_manage_asset_pocs(requests_mock, args, expected_url, expected_data):
"""
Given:
- Different asset types
When
- Calling the `manage_asset_pocs` method with the given `args`.
Then
- Assert that the HTTP request was made with the correct parameters
"""
from ExpanseV2 import Client
client = Client(api_key="key", base_url="https://example.com/api", verify=True, proxy=False)
requests_mock.post(expected_url, json=expected_data)

client.manage_asset_pocs(**args)
Loading