diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index abdcb6e1..0b22e1c9 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -57,6 +57,8 @@ jobs: docker image rm alpine:3.16 docker image rm alpine:3.17 docker image rm alpine:3.18 + docker image rm ubuntu:18.04 + docker image rm ubuntu:20.04 - name: Wait for contracts deployment and C2D cluster to be ready working-directory: ${{ github.workspace }}/barge run: | diff --git a/ocean_provider/routes/consume.py b/ocean_provider/routes/consume.py index 91b6860a..dc593891 100644 --- a/ocean_provider/routes/consume.py +++ b/ocean_provider/routes/consume.py @@ -18,7 +18,12 @@ check_asset_consumable, get_asset_from_metadatastore, ) -from ocean_provider.utils.basics import get_metadata_url, get_provider_wallet, get_web3 +from ocean_provider.utils.basics import ( + get_metadata_url, + get_provider_wallet, + get_web3, + get_network_name, +) from ocean_provider.utils.datatoken import validate_order from ocean_provider.utils.error_responses import error_response from ocean_provider.utils.proof import send_proof @@ -66,8 +71,8 @@ def nonce(): if request.method.upper() in BaseURLs.NOT_ALLOWED_METHODS: return error_response("Method Not Allowed", 405, logger) - logger.info("nonce endpoint called") data = get_request_data(request) + logger.info(f"nonce endpoint called with data: {data}") address = data.get("userAddress") nonce = get_nonce(address) @@ -112,7 +117,7 @@ def fileinfo(): return: list of file info (index, valid, contentLength, contentType) """ data = get_request_data(request) - logger.debug(f"fileinfo called. arguments = {data}") + logger.info(f"fileinfo called. arguments = {data}") did = data.get("did") service_id = data.get("serviceId") @@ -124,17 +129,24 @@ def fileinfo(): 400, logger, ) + + network_name = get_network_name(asset.chain_id) + logger.info( + f"Provider {network_name}: Retrieved asset {did} from Metadata store." + ) service = asset.get_service_by_id(service_id) if not service: return error_response( - "Invalid serviceId.", + f"Provider {network_name}: Invalid serviceId.", 400, logger, ) provider_wallet = get_provider_wallet(asset.chain_id) files_list = get_service_files_list(service, provider_wallet, asset) if not files_list: - return error_response("Unable to get dataset files", 400, logger) + return error_response( + f"Provider {network_name}: Unable to get dataset files", 400, logger + ) else: files_list = [data] @@ -193,7 +205,7 @@ def initialize(): ``` """ data = get_request_data(request) - logger.info(f"initialize called. arguments = {data}") + logger.info(f"initialize endpoint called. arguments = {data}") did = data.get("documentId") consumer_address = data.get("consumerAddress") @@ -205,21 +217,24 @@ def initialize(): 400, logger, ) + network_name = get_network_name(asset.chain_id) + logger.info(f"Provider {network_name}: Retrieved asset {did} from Metadata store.") + consumable, message = check_asset_consumable(asset, consumer_address, logger) if not consumable: - return error_response(message, 400, logger) + return error_response(f"Provider {network_name}: " + message, 400, logger) service_id = data.get("serviceId") service = asset.get_service_by_id(service_id) if not service: return error_response( - "Invalid serviceId.", + f"Provider {network_name}: Invalid serviceId.", 400, logger, ) if service.type == "compute": return error_response( - "Use the initializeCompute endpoint to initialize compute jobs.", + f"Provider {network_name}: Use the initializeCompute endpoint to initialize compute jobs.", 400, logger, ) @@ -236,8 +251,10 @@ def initialize(): allow_expired_provider_fees=True, ) return {"validOrder": _order_log.transactionHash.hex()}, 200 - except Exception: - pass + except Exception as e: + logger.error( + f"Provider {network_name}: Received error when validation order: {e}" + ) token_address = service.datatoken_address @@ -249,13 +266,13 @@ def initialize(): url_object["userdata"] = data.get("userdata") valid, message = FilesTypeFactory.validate_and_create(url_object) if not valid: - return error_response(message, 400, logger) + return error_response(f"Provider {network_name}: " + message, 400, logger) file_instance = message valid, url_details = file_instance.check_details(with_checksum=False) if not valid or not url_details: return error_response( - f"Error: Asset URL not found, not available or invalid. \n" + f"Provider {network_name}: Asset URL not found, not available or invalid. \n" f"Payload was: {data}", 400, logger, @@ -278,7 +295,7 @@ def initialize(): approve_params["validOrder"] = valid_order response = jsonify(approve_params), 200 - logger.info(f"initialize response = {response}") + logger.info(f"Provider {network_name}: initialize response = {response}") return response @@ -336,10 +353,18 @@ def download(): # grab asset for did from the metadatastore associated with # the datatoken address asset = get_asset_from_metadatastore(get_metadata_url(), did) + if not asset: + return error_response( + "Cannot resolve DID", + 400, + logger, + ) + network_name = get_network_name(asset.chain_id) + logger.info(f"Provider {network_name}: Retrieved asset {did} from Metadata store.") consumable, message = check_asset_consumable(asset, consumer_address, logger) if not consumable: - return error_response(message, 400, logger) + return error_response(f"Provider {network_name}: " + message, 400, logger) service = asset.get_service_by_id(service_id) @@ -358,12 +383,14 @@ def download(): if not is_c2d_consumer_address: return error_response( - f"Service with index={service_id} is not an access service.", + f"Provider {network_name}: Service with index={service_id} is not an access service.", 400, logger, ) - logger.info("validate_order called from download endpoint.") + logger.info( + f"Provider {network_name}: validate_order called from download endpoint." + ) try: _tx, _order_log, _, _ = validate_order( @@ -371,7 +398,7 @@ def download(): ) except Exception as e: return error_response( - f"=Order with tx_id {tx_id} could not be validated due to error: {e}", + f"Provider {network_name}: Order with tx_id {tx_id} could not be validated due to error: {e}", 400, logger, ) @@ -380,28 +407,30 @@ def download(): provider_wallet = get_provider_wallet(asset.chain_id) files_list = get_service_files_list(service, provider_wallet, asset) if file_index > len(files_list): - return error_response(f"No such fileIndex {file_index}", 400, logger) + return error_response( + f"Provider {network_name}: No such fileIndex {file_index}", 400, logger + ) url_object = files_list[file_index] url_object["userdata"] = data.get("userdata") url_valid, message = FilesTypeFactory.validate_and_create(url_object) if not url_valid: - return error_response(message, 400, logger) + return error_response(f"Provider {network_name}: " + message, 400, logger) file_instance = message valid, details = file_instance.check_details(with_checksum=True) if not valid: - return error_response(details, 400, logger) + return error_response(f"Provider {network_name}: " + details, 400, logger) logger.debug( - f"Done processing consume request for asset {did}, " + f"Provider {network_name}: Done processing consume request for asset {did}, " f" url {file_instance.get_download_url()}" ) update_nonce(consumer_address, data.get("nonce")) response = file_instance.build_download_response(request) - logger.info(f"download response = {response}") + logger.info(f"Provider {network_name}: download response = {response}") provider_proof_data = json.dumps( { @@ -424,5 +453,6 @@ def download(): consumer_address=consumer_address, datatoken_address=service.datatoken_address, ) + logger.info(f"Provider {network_name}: proof approved") return response diff --git a/ocean_provider/utils/basics.py b/ocean_provider/utils/basics.py index 2e91ecc1..aee11f0a 100644 --- a/ocean_provider/utils/basics.py +++ b/ocean_provider/utils/basics.py @@ -5,6 +5,9 @@ import json import logging import os +from pathlib import Path + +import addresses from datetime import datetime, timezone from distutils.util import strtobool from json.decoder import JSONDecodeError @@ -161,6 +164,26 @@ def get_web3_connection_provider( raise AssertionError(msg) +def get_network_name(chain_id: int) -> str: + if not chain_id: + logger.error("Chain ID is missing") + + if chain_id == 8996: + return "Ganache" + + address_path = Path(os.path.join(addresses.__file__, "..", "address.json")) + + address_file = address_path.expanduser().resolve() + with open(address_file) as f: + addresses_json = json.load(f) + + for k, v in addresses_json.items(): + if v["chainId"] == chain_id: + return k + + return "Unknown" + + def send_ether(web3, from_wallet: Account, to_address: str, amount: int): """Sends ether from wallet to the address.""" if not Web3.isChecksumAddress(to_address): diff --git a/ocean_provider/utils/datatoken.py b/ocean_provider/utils/datatoken.py index 78e628dc..d6a74e47 100644 --- a/ocean_provider/utils/datatoken.py +++ b/ocean_provider/utils/datatoken.py @@ -9,7 +9,7 @@ from eth_typing.evm import HexAddress from hexbytes import HexBytes from ocean_provider.utils.address import get_contract_definition -from ocean_provider.utils.basics import get_provider_wallet +from ocean_provider.utils.basics import get_provider_wallet, get_network_name from ocean_provider.utils.currency import to_wei from ocean_provider.utils.data_nft import get_data_nft_contract from ocean_provider.utils.services import Service @@ -55,13 +55,15 @@ def verify_order_tx( except ConnectionClosed: # try again in this case tx_receipt = _get_tx_receipt(web3, tx_id) + + network_name = get_network_name(web3.eth.chain_id) if tx_receipt is None: raise AssertionError( - "Failed to get tx receipt for the `startOrder` transaction.." + f"Provider {network_name}: Failed to get tx receipt for the `startOrder` transaction.." ) if tx_receipt.status == 0: - raise AssertionError("order transaction failed.") + raise AssertionError(f"Provider {network_name}: order transaction failed.") # check provider fees datatoken_contract = get_datatoken_contract(web3, datatoken_address) @@ -86,7 +88,7 @@ def verify_order_tx( if not provider_fee_order_log: raise AssertionError( - f"Cannot find the event for the provider fee in tx id {tx_id}." + f"Provider {network_name}: Cannot find the event for the provider fee in tx id {tx_id}." ) provider_initialize_timestamp = 0 @@ -94,7 +96,7 @@ def verify_order_tx( provider_data = json.loads(provider_fee_order_log.args.providerData) if extra_data["environment"] != provider_data["environment"]: raise AssertionError( - "Mismatch between ordered c2d environment and selected one." + f"Provider {network_name}: Mismatch between ordered c2d environment and selected one." ) provider_initialize_timestamp = provider_data["timestamp"] @@ -102,7 +104,7 @@ def verify_order_tx( provider_fee_order_log.args.providerFeeAddress ) != Web3.toChecksumAddress(provider_wallet.address): raise AssertionError( - f"The providerFeeAddress {provider_fee_order_log.args.providerFeeAddress} in the event does " + f"Provider {network_name}: The providerFeeAddress {provider_fee_order_log.args.providerFeeAddress} in the event does " f"not match the provider address {provider_wallet.address}\n" ) @@ -132,7 +134,7 @@ def verify_order_tx( pk = keys.PrivateKey(provider_wallet.key) if not keys.ecdsa_verify(signable_hash, signature, pk.public_key): raise AssertionError( - "Provider was not able to check the signed message in ProviderFees event\n" + f"Provider {network_name}: Provider was not able to check the signed message in ProviderFees event\n" ) timestamp_now = datetime.now(timezone.utc).timestamp() @@ -147,7 +149,9 @@ def verify_order_tx( >= provider_fee_order_log.args.validUntil + (block.timestamp - provider_initialize_timestamp + 90) ): - raise AssertionError("Ordered c2d time was exceeded, check validUntil.") + raise AssertionError( + f"Provider {network_name}: Ordered c2d time was exceeded, check validUntil." + ) # end check provider fees # check if we have an OrderReused event. If so, get orderTxId and switch next checks to use that @@ -157,8 +161,10 @@ def verify_order_tx( tx_receipt, errors=DISCARD ) except Exception as e: - logger.error(e) - logger.debug(f"Got events log when searching for ReuseOrder : {event_logs}") + logger.error(f"Provider {network_name}: {e}") + logger.debug( + f"Provider {network_name}: Got events log when searching for ReuseOrder : {event_logs}" + ) log_timestamp = None order_log = event_logs[0] if event_logs else None if order_log and order_log.args.orderTxId: @@ -169,11 +175,17 @@ def verify_order_tx( # try again in this case tx_receipt = _get_tx_receipt(web3, order_log.args.orderTxId) if tx_receipt is None: - raise AssertionError("Failed to get tx receipt referenced in OrderReused..") + raise AssertionError( + f"Provider {network_name}: Failed to get tx receipt referenced in OrderReused.." + ) if tx_receipt.status == 0: - raise AssertionError("order referenced in OrderReused failed.") + raise AssertionError( + f"Provider {network_name}: order referenced in OrderReused failed." + ) - logger.debug(f"Search for orderStarted in tx_receipt : {tx_receipt}") + logger.debug( + f"Provider {network_name}: Search for orderStarted in tx_receipt : {tx_receipt}" + ) # this has changed now if the original original_tx was a reuseOrder start_order_tx_id = tx_receipt.transactionHash @@ -182,8 +194,10 @@ def verify_order_tx( tx_receipt, errors=DISCARD ) except Exception as e: - logger.error(e) - logger.debug(f"Got events log when searching for OrderStarted : {event_logs}") + logger.error(f"Provider {network_name}: {e}") + logger.debug( + f"Provider {network_name}: Got events log when searching for OrderStarted : {event_logs}" + ) order_log = None # search in all startOrder events until we have a match. if not, we don't have a valid event for log in event_logs: @@ -192,12 +206,12 @@ def verify_order_tx( if not order_log: raise AssertionError( - f"Cannot find the event for the order transaction with tx id {tx_id}." + f"Provider {network_name}: Cannot find the event for the order transaction with tx id {tx_id}." ) if order_log.args.serviceIndex != service.index: raise AssertionError( - f"The service id in the event does " + f"Provider {network_name}: The service id in the event does " f"not match the requested asset. \n" f"requested: serviceIndex={service.index}\n" f"event: serviceIndex={order_log.args.serviceIndex}" @@ -205,7 +219,7 @@ def verify_order_tx( if order_log.args.amount < amount: raise ValueError( - f"The amount in the event is less than the amount requested. \n" + f"Provider {network_name}: The amount in the event is less than the amount requested. \n" f"requested: amount={amount}\n" f"event: amount={order_log.args.amount}" ) @@ -218,12 +232,12 @@ def verify_order_tx( ) timestamp_delta = timestamp_now - log_timestamp logger.debug( - f"verify_order_tx: service timeout = {service.timeout}, timestamp delta = {timestamp_delta}" + f"Provider {network_name}: verify_order_tx: service timeout = {service.timeout}, timestamp delta = {timestamp_delta}" ) if service.timeout != 0: if timestamp_delta > service.timeout: raise ValueError( - f"The order has expired. \n" + f"Provider {network_name}: The order has expired. \n" f"current timestamp={timestamp_now}\n" f"order timestamp={log_timestamp}\n" f"timestamp delta={timestamp_delta}\n" @@ -234,7 +248,9 @@ def verify_order_tx( web3.toChecksumAddress(order_log.args.consumer), web3.toChecksumAddress(order_log.args.payer), ]: - raise ValueError("sender of order transaction is not the consumer/payer.") + raise ValueError( + f"Provider {network_name}: sender of order transaction is not the consumer/payer." + ) tx = web3.eth.get_transaction(HexBytes(tx_id)) @@ -253,9 +269,9 @@ def validate_order( did = asset.did token_address = web3.toChecksumAddress(service.datatoken_address) num_tokens = 1 - + network_name = get_network_name(asset.chain_id) logger.debug( - f"validate_order: did={did}, service_id={service.id}, tx_id={tx_id}, " + f"Provider {network_name}: validate_order: did={did}, service_id={service.id}, tx_id={tx_id}, " f"sender={sender}, num_tokens={num_tokens}, token_address={token_address}" ) @@ -266,7 +282,9 @@ def validate_order( num_tries = 3 i = 0 while i < num_tries: - logger.debug(f"validate_order is on trial {i + 1} in {num_tries}.") + logger.debug( + f"Provider {network_name}: validate_order is on trial {i + 1} in {num_tries}." + ) i += 1 try: tx, order_event, provider_fees_event, start_order_tx_id = verify_order_tx( @@ -280,17 +298,19 @@ def validate_order( allow_expired_provider_fees, ) logger.debug( - f"validate_order succeeded for: did={did}, service_id={service.id}, tx_id={tx_id}, " + f"Provider {network_name}: validate_order succeeded for: did={did}, service_id={service.id}, tx_id={tx_id}, " f"sender={sender}, num_tokens={num_tokens}, token_address={token_address}. " f"result is: tx={tx}, order_event={order_event}." ) return tx, order_event, provider_fees_event, start_order_tx_id except ConnectionClosed: - logger.debug("got ConnectionClosed error on validate_order.") + logger.debug( + f"Provider {network_name}: got ConnectionClosed error on validate_order." + ) if i == num_tries: logger.debug( - "reached max no. of tries, raise ConnectionClosed in validate_order." + f"Provider {network_name}: reached max no. of tries, raise ConnectionClosed in validate_order." ) raise except Exception: diff --git a/ocean_provider/utils/provider_fees.py b/ocean_provider/utils/provider_fees.py index 249fbb41..74b0697d 100644 --- a/ocean_provider/utils/provider_fees.py +++ b/ocean_provider/utils/provider_fees.py @@ -9,7 +9,12 @@ from ocean_provider.requests_session import get_requests_session from ocean_provider.utils.address import get_provider_fee_token from ocean_provider.utils.asset import get_asset_from_metadatastore -from ocean_provider.utils.basics import get_metadata_url, get_provider_wallet, get_web3 +from ocean_provider.utils.basics import ( + get_metadata_url, + get_provider_wallet, + get_web3, + get_network_name, +) from ocean_provider.utils.compute_environments import ( get_c2d_environments, get_environment, @@ -84,7 +89,8 @@ def get_provider_fees( "s": Web3.toHex(Web3.toBytes(signed.s).rjust(32, b"\0")), "validUntil": valid_until, } - logger.debug(f"Returning provider_fees: {provider_fee}") + network_name = get_network_name(asset.chain_id) + logger.debug(f"Provider {network_name}: Returning provider_fees: {provider_fee}") return provider_fee diff --git a/ocean_provider/utils/util.py b/ocean_provider/utils/util.py index 9bf3f586..013bf4f2 100644 --- a/ocean_provider/utils/util.py +++ b/ocean_provider/utils/util.py @@ -13,6 +13,7 @@ from eth_keys.backends import NativeECCBackend from eth_typing.encoding import HexStr from ocean_provider.utils.asset import Asset +from ocean_provider.utils.basics import get_network_name from ocean_provider.utils.encryption import do_decrypt from ocean_provider.utils.services import Service from web3 import Web3 @@ -40,6 +41,7 @@ def get_service_files_list( if asset is None or version == "4.0.0": return get_service_files_list_old_structure(service, provider_wallet) + network_name = get_network_name(asset.chain_id) try: files_str = do_decrypt(service.encrypted_files, provider_wallet) if not files_str: @@ -49,29 +51,35 @@ def get_service_files_list( for key in ["datatokenAddress", "nftAddress", "files"]: if key not in files_json: - raise Exception(f"Key {key} not found in files.") + raise Exception( + f"Provider {network_name}: Key {key} not found in files." + ) if Web3.toChecksumAddress( files_json["datatokenAddress"] ) != Web3.toChecksumAddress(service.datatoken_address): raise Exception( - f"Mismatch of datatoken. Got {files_json['datatokenAddress']} vs expected {service.datatoken_address}" + f"Provider {network_name}: Mismatch of datatoken. Got {files_json['datatokenAddress']} vs expected {service.datatoken_address}" ) if Web3.toChecksumAddress(files_json["nftAddress"]) != Web3.toChecksumAddress( asset.nftAddress ): raise Exception( - f"Mismatch of dataNft. Got {files_json['nftAddress']} vs expected {asset.nftAddress}" + f"Provider {network_name}: Mismatch of dataNft. Got {files_json['nftAddress']} vs expected {asset.nftAddress}" ) files_list = files_json["files"] if not isinstance(files_list, list): - raise TypeError(f"Expected a files list, got {type(files_list)}.") + raise TypeError( + f"Provider {network_name}: Expected a files list, got {type(files_list)}." + ) return files_list except Exception as e: - logger.error(f"Error decrypting service files {Service}: {str(e)}") + logger.error( + f"Provider {network_name}: Error decrypting service files {Service}: {str(e)}" + ) return None diff --git a/tests/test_download.py b/tests/test_download.py index e5513ffe..9a5fda06 100644 --- a/tests/test_download.py +++ b/tests/test_download.py @@ -413,7 +413,7 @@ def other_service(_): assert response.status_code == 400, f"{response.data}" assert ( response.json["error"] - == f"Service with index={service.id} is not an access service." + == f"Provider Ganache: Service with index={service.id} is not an access service." ) @@ -524,7 +524,7 @@ def test_consume_algo_with_credentials( assert response.status_code == 400, f"{response.data}" assert ( response.json["error"] - == f"Error: Access to asset {alg_ddo.did} was denied with code: ConsumableCodes.CREDENTIAL_IN_DENY_LIST." + == f"Provider Ganache: Error: Access to asset {alg_ddo.did} was denied with code: ConsumableCodes.CREDENTIAL_IN_DENY_LIST." ) # Use case 2: Consume an algorithm asset by an address which is not in the allowed list @@ -577,7 +577,7 @@ def test_consume_algo_with_credentials( assert response.status_code == 400, f"{response.data}" assert ( response.json["error"] - == f"Error: Access to asset {alg_ddo.did} was denied with code: ConsumableCodes.CREDENTIAL_NOT_IN_ALLOW_LIST." + == f"Provider Ganache: Error: Access to asset {alg_ddo.did} was denied with code: ConsumableCodes.CREDENTIAL_NOT_IN_ALLOW_LIST." ) # Use case 3: Consume asset by allowed address diff --git a/tests/test_initialize.py b/tests/test_initialize.py index 939b07be..226ef17c 100644 --- a/tests/test_initialize.py +++ b/tests/test_initialize.py @@ -77,7 +77,7 @@ def test_initialize_on_disabled_asset(client, publisher_wallet, consumer_wallet, client, asset.did, service, consumer_wallet, raw_response=True ) assert "error" in response.json - assert response.json["error"] == "Asset is not consumable." + assert response.json["error"] == "Provider Ganache: Asset is not consumable." @pytest.mark.integration @@ -109,7 +109,7 @@ def test_initialize_on_asset_with_custom_credentials( assert "error" in response.json assert ( response.json["error"] - == f"Error: Access to asset {asset.did} was denied with code: ConsumableCodes.CREDENTIAL_IN_DENY_LIST." + == f"Provider Ganache: Error: Access to asset {asset.did} was denied with code: ConsumableCodes.CREDENTIAL_IN_DENY_LIST." ) @@ -174,7 +174,7 @@ def test_can_not_initialize_compute_service_with_simple_initialize( assert "error" in response.json assert ( response.json["error"] - == "Use the initializeCompute endpoint to initialize compute jobs." + == "Provider Ganache: Use the initializeCompute endpoint to initialize compute jobs." )