Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add ddo states #959

Merged
merged 15 commits into from
Dec 16, 2022
72 changes: 50 additions & 22 deletions API.md
Original file line number Diff line number Diff line change
Expand Up @@ -245,28 +245,6 @@

## Chains

### **GET** `/api/aquarius/chains/retryQueue`

- Description

Returns all queue retry logs

- Parameters

| name | description |
|----------------|--------------------------------------|
| `did` | filter for did |
| `chainId` | chain id |
| `nft` | nft |
| `type` | retry event type (tx, event or block)|


- Example
```bash
curl --location --request GET 'https://v4.aquarius.oceanprotocol.com/api/aquarius/chains/retryQueue?chainId=1'
```


### **GET** `/api/aquarius/chains/list`

- Description
Expand Down Expand Up @@ -311,6 +289,56 @@
"version": "4.4.1"}
```

## State

### **GET** `/api/aquarius/state/retryQueue`

- Description

Returns all queue retry logs

- Parameters

| name | description |
|----------------|--------------------------------------|
| `did` | filter for did |
| `chainId` | chain id |
| `nft` | nft |
| `type` | retry event type (tx, event or block)|


- Example
```bash
curl --location --request GET 'https://v4.aquarius.oceanprotocol.com/api/aquarius/state/retryQueue?chainId=1'
```


### **GET** `/api/aquarius/state/ddo`

- Description

Returns ddo(s) state(s)

- Parameters for filtering:

| name | description |
|----------------|--------------------------------------|
| `did` | did |
| `chainId` | chain id |
| `nft` | nft |
| `txId` | tx id |



- Examples
```bash
curl --location --request GET 'https://v4.aquarius.oceanprotocol.com/api/aquarius/state/ddo?did=did:op:9c1235050bcd51c8ec9a7058110102c9595136834911c315b4f739bc9a880b8e
```

```bash
curl --location --request GET 'https://v4.aquarius.oceanprotocol.com/api/aquarius/state/ddo?nft=0xC7ED00725AAb7E679fCB46C9620115fE0B6dD94a
```

## Others


Expand Down
36 changes: 0 additions & 36 deletions aquarius/app/chains.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,39 +74,3 @@ def get_index_status(chain_id):
f"Cannot get index status for chain {chain_id}. Error encountered is: {str(e)}"
)
return jsonify(error=f"Error retrieving chain {chain_id}: {str(e)}."), 404


@chains.route("/retryQueue", methods=["GET"])
def get_retry_queue():
"""Returns the current retry queue for all chains
---
responses:
200:
description: successful operation.
"""
data = request.args
chain_id = data.get("chainId", None)
nft_address = data.get("nft", None)
did = data.get("did", None)
retry_type = data.get("type", None)
if chain_id is None and nft_address is None and did is None and retry_type is None:
q = {"match_all": {}}
else:
conditions = []
if chain_id:
conditions.append({"term": {"chain_id": chain_id}})
if nft_address:
conditions.append({"term": {"nft_address": nft_address}})
if did:
conditions.append({"term": {"did": did}})
if retry_type:
conditions.append({"term": {"type": retry_type}})
q = {"bool": {"filter": conditions}}
try:
result = es_instance.es.search(index=f"{es_instance.db_index}_retries", query=q)
return jsonify(result.body)
except Exception as e:
return (
jsonify(error=f"Encountered error : {str(e)}."),
500,
)
30 changes: 30 additions & 0 deletions aquarius/app/es_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError
from aquarius.events.util import make_did

_DB_INSTANCE = None

Expand Down Expand Up @@ -38,13 +39,15 @@ def __init__(self):
args["client_cert"] = os.getenv("DB_CLIENT_CERT", None)
index = os.getenv("DB_INDEX", "oceandb")
self._index = index
self._did_states_index = f"{self._index}_did_states"
try:
self._es = Elasticsearch(host + ":" + str(port), **args)
while self._es.ping() is False:
logging.info("Trying to connect...")
time.sleep(5)

self._es.indices.create(index=index, ignore=400)
self._es.indices.create(index=self._did_states_index, ignore=400)

except Exception as e:
logging.info(f"Exception trying to connect... {e}")
Expand Down Expand Up @@ -170,3 +173,30 @@ def is_listed(asset):
return False

return True

def update_did_state(self, nft_address, chain_id, txid, valid, error):
"""Updates did state."""
did = make_did(nft_address, chain_id)
obj = {
"nft": nft_address,
"did": did,
"chain_id": chain_id,
"tx_id": txid,
"valid": valid,
"error": error,
}
logger.info(f"Set did state {obj} for {did}")
return self.es.index(
index=self._did_states_index,
id=did,
body=obj,
refresh="wait_for",
)["_id"]

def read_did_state(self, did):
"""Read did index state.
:param did
:return: object value from elasticsearch.
"""
# logger.debug("elasticsearch::read::{}".format(resource_id))
alexcos20 marked this conversation as resolved.
Show resolved Hide resolved
return self.es.get(index=self._did_states_index, id=did)["_source"]
100 changes: 100 additions & 0 deletions aquarius/app/state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#
# Copyright 2021 Ocean Protocol Foundation
# SPDX-License-Identifier: Apache-2.0
#
import logging

import elasticsearch
from flask import Blueprint, jsonify, request

from aquarius.app.es_instance import ElasticsearchInstance
from aquarius.log import setup_logging
from aquarius.myapp import app

setup_logging()
state = Blueprint("state", __name__)
logger = logging.getLogger("aquarius")
es_instance = ElasticsearchInstance()


def get_retry_queue(chain_id, nft_address, did, retry_type):
if chain_id is None and nft_address is None and did is None and retry_type is None:
q = {"match_all": {}}
else:
conditions = []
if chain_id:
conditions.append({"term": {"chain_id": chain_id}})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about adding the match one separately, then doing for k in ["chain_id", "did", "conditions"]: if locals[k]: conditions.append("term": {k: locals()[k]}

easier to read

Ref: https://www.programiz.com/python-programming/methods/built-in/locals

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will not work. for some, you have "term". for some (as nft), you have "match". so we will end up with bunch of ifs inside loop

if nft_address:
conditions.append({"match": {"nft_address": nft_address}})
if did:
conditions.append({"term": {"did": did}})
if retry_type:
conditions.append({"term": {"type": retry_type}})
q = {"bool": {"filter": conditions}}
return es_instance.es.search(
index=f"{es_instance.db_index}_retries", query=q, from_=0, size=10000
)


@state.route("/retryQueue", methods=["GET"])
def route_get_retry_queue():
"""Returns the current retry queue for all chains
---
responses:
200:
description: successful operation.
"""
data = request.args
try:
result = get_retry_queue(
data.get("chainId"), data.get("nft"), data.get("did"), data.get("type")
)
return jsonify(result.body)
except Exception as e:
return (
jsonify(error=f"Encountered error : {str(e)}."),
500,
)


def get_did_state(chain_id, nft_address, tx_id, did):
alexcos20 marked this conversation as resolved.
Show resolved Hide resolved
if chain_id is None and nft_address is None and did is None and tx_id is None:
q = {"match_all": {}}
else:
conditions = []
if chain_id:
conditions.append({"term": {"chain_id": chain_id}})
if nft_address:
conditions.append({"match": {"nft": nft_address}})
if tx_id:
conditions.append({"match": {"tx_id": tx_id}})
if did:
conditions.append({"term": {"_id": did}})
q = {"bool": {"filter": conditions}}
return es_instance.es.search(index=es_instance._did_states_index, query=q)


@state.route("/ddo", methods=["GET"])
def route_get_did_state():
"""Returns the current state for a did
---
responses:
200:
description: successful operation.
"""
data = request.args
if not data.get("nft") and not data.get("txId") and not data.get("did"):
alexcos20 marked this conversation as resolved.
Show resolved Hide resolved
return (
jsonify(error="You need to specify one of: nft, txId, did"),
400,
)
try:
result = get_did_state(
data.get("chainId"), data.get("nft"), data.get("txId"), data.get("did")
)
return jsonify(result.body["hits"]["hits"][0]["_source"])
except Exception as e:
return (
jsonify(error=f"Encountered error : {str(e)}."),
500,
)
1 change: 1 addition & 0 deletions aquarius/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class BaseURLs:
SWAGGER_URL = "/api/docs" # URL for exposing Swagger UI (without trailing '/')
ASSETS_URL = BASE_AQUARIUS_URL + "/assets"
CHAINS_URL = BASE_AQUARIUS_URL + "/chains"
STATE_URL = BASE_AQUARIUS_URL + "/state"
VALIDATION_URL = BASE_AQUARIUS_URL + "/validation"


Expand Down
16 changes: 13 additions & 3 deletions aquarius/events/decryptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
logger = logging.getLogger(__name__)


def decrypt_ddo(w3, provider_url, contract_address, chain_id, txid, hash):
def decrypt_ddo(w3, provider_url, contract_address, chain_id, txid, hash, es_instance):
aquarius_account = get_aquarius_wallet()
nonce = str(int(datetime.utcnow().timestamp()))

Expand All @@ -40,24 +40,34 @@ def decrypt_ddo(w3, provider_url, contract_address, chain_id, txid, hash):

if not hasattr(response, "status_code"):
msg = f"Failed to get a response for decrypt DDO with provider={provider_url}, payload={payload}, response={response}"
if es_instance:
es_instance.update_did_state(contract_address, chain_id, txid, False, msg)
logger.error(msg)
raise Exception(f"in decrypt_ddo: {msg}")

if response.status_code == 201:
if sha256(response.content).hexdigest() != hash.hex():
msg = f"Hash check failed: response={response.content}, encoded response={sha256(response.content).hexdigest()}\n metadata hash={hash.hex()}"
logger.error(msg)
if es_instance:
es_instance.update_did_state(
contract_address, chain_id, txid, False, msg
)
raise Exception(f"in decrypt_ddo: {msg}")
logger.info("Decrypted DDO successfully.")
response_content = response.content.decode("utf-8")

return json.loads(response_content)

if response.status_code == 403:
# unauthorised decrypter
logger.info(f"403, response={response.content}")
msg = f"403, response={response.content}"
if es_instance:
es_instance.update_did_state(contract_address, chain_id, txid, False, msg)
logger.info(msg)
return False

msg = f"Provider exception on decrypt DDO. Status:{response.status_code}, {response.content}\n provider URL={provider_url}, payload={payload}."
if es_instance:
es_instance.update_did_state(contract_address, chain_id, txid, False, msg)
logger.error(msg)
raise Exception(f"in decrypt_ddo: {msg}")
Loading