Skip to content

Commit

Permalink
feat(testing):[#243] Add requests for batch orders, batches and job requests
Browse files Browse the repository at this point in the history
  • Loading branch information
ds-jhartmann committed Nov 21, 2023
1 parent eeec28a commit 20d4f99
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 51 deletions.
203 changes: 153 additions & 50 deletions local/demo/ess-demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,19 @@
# under the License.
#
# SPDX-License-Identifier: Apache-2.0

import argparse
import json
import logging
import time
from datetime import datetime
from types import SimpleNamespace

import jwt
import requests
from oauthlib.oauth2 import BackendApplicationClient
from requests.adapters import HTTPAdapter, Retry
from requests.auth import HTTPBasicAuth
from requests_oauthlib import OAuth2Session
from oauthlib.oauth2 import BackendApplicationClient


def get_semantic_ids_from_twin(digital_twin_):
Expand Down Expand Up @@ -106,17 +108,17 @@ def filter_for_as_planned_and_bpn(search_bpn_):
def poll_batch_job(batch_url, token_):
header_ = {
"Content-Type": "application/json",
"Authorization": f"Bearer {token_['access_token']}"
"Authorization": f"Bearer {get_or_refresh_oauth_token(token_url, client_id, client_secret, token_)}"
}
while True:
try:
response_ = oauth.get(batch_url, headers=header_)
response_ = session.get(batch_url, headers=header_)
response_json = response_.json()
logging.info(response_json)
print_order(response_json)

state = response_json.get("state")
if state in ("COMPLETED", "ERROR", "CANCELED"):
logging.info(f"Batch completed in state '{state}'")
logging.info(f"ESS Batch Investigation completed in state '{state}'")
return response_json

time.sleep(5)
Expand All @@ -125,72 +127,173 @@ def poll_batch_job(batch_url, token_):
break


def print_order(response_json):
    """Log a condensed, pretty-printed summary of a batch order response.

    Only a fixed set of top-level fields and, per batch entry, a fixed set
    of nested fields are included in the logged output.
    """
    top_level_fields = ("orderId", "state", "batches")
    batch_fields = ("batchId", "batchUrl", "batchProcessingState")
    summary = {field: response_json[field] for field in top_level_fields if field in response_json}
    if "batches" in response_json:
        condensed_batches = []
        for entry in response_json["batches"]:
            condensed_batches.append({field: entry[field] for field in batch_fields if field in entry})
        summary["batches"] = condensed_batches
    # Pretty print the selected fields
    pretty_selected_fields = json.dumps(summary, indent=4)
    logging.info("Polling ESS Batch. Status: " + pretty_selected_fields)


def get_oauth_token(token_url_, client_id_, client_secret_):
    """Fetch a fresh OAuth2 access token via the client-credentials flow.

    :param token_url_: OAuth2 token endpoint URL
    :param client_id_: OAuth2 client id
    :param client_secret_: OAuth2 client secret
    :return: the raw access token string
    """
    client = BackendApplicationClient(client_id=client_id_)
    oauth_ = OAuth2Session(client=client)
    # fetch_token returns the full token response; only the access token is needed.
    token_ = oauth_.fetch_token(token_url=token_url_,
                                auth=HTTPBasicAuth(client_id_, client_secret_))["access_token"]
    # NOTE: an earlier revision returned (oauth_, token_); callers now expect
    # the bare token string only — the stale tuple return was dead code.
    return token_


def get_or_refresh_oauth_token(token_url_, client_id_, client_secret_, token_=None):
    """Return a valid access token, fetching a new one if missing or expired.

    :param token_url_: OAuth2 token endpoint URL
    :param client_id_: OAuth2 client id
    :param client_secret_: OAuth2 client secret
    :param token_: previously fetched access token, or None to force a fresh request
    :return: a non-expired access token string

    Bug fix: the parameter was declared as ``token_: None`` — a type
    annotation of the literal ``None`` with no default value. It is now a
    proper keyword default, which is backward compatible for all callers.
    """
    if token_ is None:
        return get_oauth_token(token_url_, client_id_, client_secret_)
    # Decode only to read the 'exp' claim locally; signature verification is
    # not required to check expiry.
    decoded_token = jwt.decode(token_, options={"verify_signature": False})
    exp_timestamp = datetime.fromtimestamp(decoded_token["exp"])
    current_timestamp = datetime.now()
    if exp_timestamp < current_timestamp:
        logging.info("Token expired. Requesting new.")
        return get_oauth_token(token_url_, client_id_, client_secret_)
    return token_


def start_ess_investigation(irs_ess_url_, incident_bpns_, filtered_twins_, token_):
    """Submit an ESS batch investigation order and return the created batch id.

    :param irs_ess_url_: IRS ESS investigations endpoint
    :param incident_bpns_: list of BPNS where incidents occurred
    :param filtered_twins_: digital-twin keys to investigate
    :param token_: current access token (refreshed if expired)
    :raises Exception: when the service does not answer with HTTP 201
    """
    request_body = {
        "batchSize": 10,
        "batchStrategy": "PRESERVE_BATCH_JOB_ORDER",
        "incidentBPNSs": incident_bpns_,
        "keys": filtered_twins_
    }
    request_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {get_or_refresh_oauth_token(token_url, client_id, client_secret, token_)}"
    }
    logging.info(f"Starting ESS batch investigation with {json.dumps(request_body, indent=4)}")
    response_ = session.post(url=irs_ess_url_, json=request_body, headers=request_headers)

    # 201 Created is the only success status for order creation.
    if response_.status_code == 201:
        batch_id_ = response_.json().get("id")
        logging.info(f"Started ESS Batch Investigation with id {batch_id_}")
        return batch_id_
    logging.error(f"Failed to start ESS Batch Investigation. Status code: {response_.status_code}")
    raise Exception("Failed to start ESS Batch Investigation")


def get_jobs_for_batch(url_, token_):
    """Fetch the list of jobs belonging to a single batch.

    :param url_: batch detail URL as returned by the order endpoint
    :param token_: current access token (refreshed if expired)
    :return: the "jobs" array of the batch response, or None if absent
    """
    bearer = get_or_refresh_oauth_token(token_url, client_id, client_secret, token_)
    request_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {bearer}"
    }
    batch_response = session.get(url_, headers=request_headers)
    return batch_response.json().get("jobs")


def get_job_for_id(url_, token_):
    """Fetch the full detail document of a single IRS job.

    :param url_: job detail URL
    :param token_: current access token (refreshed if expired)
    :return: the parsed JSON body of the job response
    """
    bearer = get_or_refresh_oauth_token(token_url, client_id, client_secret, token_)
    request_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {bearer}"
    }
    job_response = session.get(url_, headers=request_headers)
    return job_response.json()


def prepare_arguments():
    """Parse the command-line options of the ESS Batch Investigation demo.

    :return: parsed options as a plain dict keyed by argument name
    """
    arg_parser = argparse.ArgumentParser(
        description="Script to demonstrate the ESS Batch Investigation flow.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    arg_parser.add_argument("--aas", type=str, required=True,
                            help="AAS registry URL")
    arg_parser.add_argument("--ownBPN", type=str, required=True,
                            help="BPN of the requesting Company")
    arg_parser.add_argument("--searchBPN", type=str, required=True,
                            help="BPN of the Company to search for")
    arg_parser.add_argument("--incidentBPNS", type=str, nargs="*", required=True,
                            help="List of BPNS of the Companies where the incidents occurred")
    arg_parser.add_argument("--irs", type=str, required=True,
                            help="IRS base URL")
    arg_parser.add_argument("--tokenurl", type=str, required=True,
                            help="OAuth2 token URL")
    arg_parser.add_argument("--clientid", type=str, required=True,
                            help="Client ID")
    arg_parser.add_argument("--clientsecret", type=str, required=True,
                            help="Client Secret")
    arg_parser.add_argument("--debug", action='store_true', required=False,
                            help="debug logging")
    return vars(arg_parser.parse_args())

# Demo Cases:
# Case 1 (incident and no issues in tree):
# searchBPN: BPNL00ARBITRARY4
# incidentBPNS: BPNS00ARBITRARY6

# Case 2 (no incident and no issues in tree):
# searchBPN: BPNL00ARBITRARY4
# incidentBPNS: BPNS00ARBITRARY8

# Case 3 (incident and not resolvable path in tree):
# searchBPN: BPNL00ARBITRARY8
# incidentBPNS: BPNS0ARBITRARY10

# Case 4 (no incident and not resolvable path in tree):
# searchBPN: BPNL00ARBITRARY8
# incidentBPNS: BPNS0ARBITRARY12


if __name__ == "__main__":
    # Parse CLI options for the demo run.
    config = prepare_arguments()
    registry_url = config.get("aas")
    own_BPN = config.get("ownBPN")
    search_BPN = config.get("searchBPN")
    incident_BPNSs = config.get("incidentBPNS")
    irs_base_url = config.get("irs")
    token_url = config.get("tokenurl")
    client_id = config.get("clientid")
    client_secret = config.get("clientsecret")
    is_debug = config.get("debug")

    irs_ess_url = f"{irs_base_url}/ess/bpn/investigations"
    irs_ess_batch_url = f"{irs_base_url}/irs/ess/orders"

    # Configure logging exactly once, honouring the --debug flag.
    # (A stale duplicate basicConfig call from the diff residue was removed.)
    logging_level = logging.INFO
    if is_debug:
        logging_level = logging.DEBUG
    logging.basicConfig(level=logging_level, format='%(asctime)s [%(levelname)-5.5s] %(message)s')

    # Shared HTTP session with retries for transient failures.
    retries = Retry(total=3,
                    backoff_factor=0.1)
    session = requests.Session()
    session.mount('https://', HTTPAdapter(max_retries=retries))

    # Fetch all digital Twins from the DTR
    digital_twins = fetch_all_registry_data(registry_url, own_BPN)

    # Filter for bomLifecycle "asPlanned" and the provided BPN
    filtered_twins = filter_for_as_planned_and_bpn(search_BPN)

    logging.info(f"Found {len(filtered_twins)} twin(s) after filtering.")
    logging.info(json.dumps(filtered_twins, indent=4))

    # Authenticate
    token = get_oauth_token(token_url, client_id, client_secret)

    # Start IRS batch job
    batch_id = start_ess_investigation(irs_ess_url, incident_BPNSs, filtered_twins, token)

    # Poll batch until it is completed
    completed_batch = poll_batch_job(f"{irs_ess_batch_url}/{batch_id}", token)
    for batch in completed_batch.get("batches"):
        url = batch.get("batchUrl")
        logging.info(url)

        jobs = get_jobs_for_batch(url, token)
        for job in jobs:
            job_id = job.get("id")

            # Fetch the full job document for each job id of the batch.
            job = get_job_for_id(f"{irs_ess_url}/{job_id}", token)

            submodels = job.get("submodels")
            for submodel in submodels:
                submodel_payload = submodel.get('payload')
                impacted = submodel_payload.get("supplyChainImpacted")
                impacted_suppliers = submodel_payload.get("impactedSuppliersOnFirstTier")
                logging.info(f"Investigation result for Job {job_id} resulted in {impacted}.")
                if impacted_suppliers:
                    logging.info(f"Impacted Suppliers on first level: {impacted_suppliers}.")
3 changes: 2 additions & 1 deletion local/demo/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
requests~=2.27.1
requests_oauthlib~=1.3.1
oauthlib~=3.2.2
PyJWT~=2.6.0

0 comments on commit 20d4f99

Please sign in to comment.