Skip to content

Commit

Permalink
Update price and change orderCount calcs. (#862)
Browse files Browse the repository at this point in the history
* Update price and change orderCount calcs.
* Make price a float.
* Intermediary work for FRE Creations event processing.
* Adds ExchangeRateChanged handling.
* Adds dispenser created handling.
* Update pricing to an object.
* Fix graphql tests.
  • Loading branch information
calina-c authored Aug 12, 2022
1 parent 25a87ce commit 6e1e880
Show file tree
Hide file tree
Showing 9 changed files with 280 additions and 57 deletions.
10 changes: 5 additions & 5 deletions aquarius/app/es_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,24 +136,24 @@
},
"price": {
"properties": {
"datatoken": {
"value": {
"type": "double",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"ocean": {
"type": "double",
"tokenAddress": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"value": {
"type": "double",
"tokenSymbol": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
Expand Down
3 changes: 3 additions & 0 deletions aquarius/events/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ class EventTypes(SimpleEnum):
EVENT_METADATA_STATE = "MetadataState"
EVENT_ORDER_STARTED = "OrderStarted"
EVENT_TOKEN_URI_UPDATE = "TokenURIUpdate"
EVENT_EXCHANGE_CREATED = "ExchangeCreated"
EVENT_EXCHANGE_RATE_CHANGED = "ExchangeRateChanged"
EVENT_DISPENSER_CREATED = "DispenserCreated"


class MetadataStates(IntEnum):
Expand Down
92 changes: 68 additions & 24 deletions aquarius/events/events_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
from aquarius.events.util import (
get_metadata_start_block,
get_defined_block,
get_fre,
get_dispenser,
)
from artifacts import ERC20Template, ERC721Template
from web3.logs import DISCARD
Expand Down Expand Up @@ -187,7 +189,7 @@ def process_block_range(self, from_block, to_block):
to_block,
)

self.handle_order_started(from_block, to_block)
self.handle_price_change(from_block, to_block)
self.handle_token_uri_update(from_block, to_block)

self.store_last_processed_block(to_block)
Expand Down Expand Up @@ -232,32 +234,68 @@ def handle_regular_event_processor(
f"event={event}"
)

def handle_order_started(self, from_block, to_block):
events = self.get_event_logs(
EventTypes.EVENT_ORDER_STARTED, from_block, to_block
)

for event in events:
erc20_contract = self._web3.eth.contract(
abi=ERC20Template.abi,
address=self._web3.toChecksumAddress(event.address),
)

logger.debug(f"OrderStarted detected on ERC20 contract {event.address}.")

try:
event_processor = OrderStartedProcessor(
erc20_contract.caller.getERC721Address(),
self._es_instance,
to_block,
self._chain_id,
def handle_price_change(self, from_block, to_block):
fre = get_fre(self._web3, self._chain_id)
dispenser = get_dispenser(self._web3, self._chain_id)

for event_name in [
EventTypes.EVENT_ORDER_STARTED,
EventTypes.EVENT_EXCHANGE_CREATED,
EventTypes.EVENT_EXCHANGE_RATE_CHANGED,
EventTypes.EVENT_DISPENSER_CREATED,
]:
events = self.get_event_logs(event_name, from_block, to_block)

for event in events:
if event_name == EventTypes.EVENT_EXCHANGE_CREATED:
receipt = self._web3.eth.get_transaction_receipt(
event.transactionHash.hex()
)
erc20_address = receipt.to
elif event_name == EventTypes.EVENT_EXCHANGE_RATE_CHANGED:
receipt = self._web3.eth.get_transaction_receipt(
event.transactionHash.hex()
)
exchange_id = (
fre.events.ExchangeRateChanged()
.processReceipt(receipt)[0]
.args.exchangeId
)
erc20_address = fre.caller.getExchange(exchange_id)[1]
elif event_name == EventTypes.EVENT_DISPENSER_CREATED:
receipt = self._web3.eth.get_transaction_receipt(
event.transactionHash.hex()
)
erc20_address = (
dispenser.events.DispenserCreated()
.processReceipt(receipt)[0]
.args.datatokenAddress
)
else:
erc20_address = event.address

erc20_contract = self._web3.eth.contract(
abi=ERC20Template.abi,
address=self._web3.toChecksumAddress(erc20_address),
)
event_processor.process()
except Exception as e:
logger.error(
f"Error processing order started event: {e}\n" f"event={event}"

logger.debug(
f"{event_name} detected on ERC20 contract {event.address}."
)

try:
event_processor = OrderStartedProcessor(
erc20_contract.caller.getERC721Address(),
self._es_instance,
to_block,
self._chain_id,
)
event_processor.process()
except Exception as e:
logger.error(
f"Error processing {event_name} event: {e}\n" f"event={event}"
)

def handle_token_uri_update(self, from_block, to_block):
events = self.get_event_logs(
EventTypes.EVENT_TOKEN_URI_UPDATE, from_block, to_block
Expand Down Expand Up @@ -386,6 +424,12 @@ def get_event_logs(self, event_name, from_block, to_block, chunk_size=1000):
hash_text = "MetadataState(address,uint8,uint256,uint256)"
elif event_name == EventTypes.EVENT_TOKEN_URI_UPDATE:
hash_text = "TokenURIUpdate(address,string,uint256,uint256,uint256)"
elif event_name == EventTypes.EVENT_EXCHANGE_CREATED:
hash_text = "ExchangeCreated(bytes32,address,address,address,uint256)"
elif event_name == EventTypes.EVENT_EXCHANGE_RATE_CHANGED:
hash_text = "ExchangeRateChanged(bytes32,address,uint256)"
elif event_name == EventTypes.EVENT_DISPENSER_CREATED:
hash_text = "DispenserCreated(address,address,uint256,uint256,address)"
else:
hash_text = (
"OrderStarted(address,address,uint256,uint256,uint256,address,uint256)"
Expand Down
14 changes: 7 additions & 7 deletions aquarius/events/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from aquarius.events.decryptor import decrypt_ddo
from aquarius.events.proof_checker import check_metadata_proofs
from aquarius.events.util import make_did, get_dt_factory
from aquarius.graphql import get_number_orders
from aquarius.graphql import get_number_orders_price
from aquarius.rbac import RBAC
from artifacts import ERC20Template, ERC721Template
from web3.logs import DISCARD
Expand Down Expand Up @@ -91,11 +91,10 @@ def add_aqua_data(self, record):

record[AquariusCustomDDOFields.DATATOKENS] = self.get_tokens_info(record)

record[AquariusCustomDDOFields.STATS] = {
"orders": get_number_orders(
self.dt_contract.address, self.block, self._chain_id
)
}
order_count, price = get_number_orders_price(
self.dt_contract.address, self.block, self._chain_id
)
record[AquariusCustomDDOFields.STATS] = {"orders": order_count, "price": price}

return record, block_time

Expand Down Expand Up @@ -427,10 +426,11 @@ def process(self):
return

logger.debug(f"Retrieving number of orders for {self.token_address}.")
number_orders = get_number_orders(
number_orders, price = get_number_orders_price(
self.token_address, self.last_sync_block, self.chain_id
)
self.asset["stats"]["orders"] = number_orders
self.asset["stats"]["price"] = price

logger.debug(f"Updating number of orders to {number_orders} for {self.did}.")
self.es_instance.update(self.asset, self.did)
Expand Down
33 changes: 27 additions & 6 deletions aquarius/events/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from addresses import address as contract_addresses
from aquarius.app.util import get_bool_env_value
from aquarius.events.http_provider import get_web3_connection_provider
from artifacts import ERC721Factory
from artifacts import ERC721Factory, FixedRateExchange, Dispenser
from web3.logs import DISCARD


Expand Down Expand Up @@ -122,28 +122,49 @@ def deploy_datatoken(w3, account, name, symbol):
raise Exception(f"tx not found: {tx_hash.hex()}")


def get_dt_factory(web3, chain_id=None):
def get_address_of_type(web3, chain_id=None, address_type=None):
chain_id = chain_id if chain_id else web3.eth.chain_id
address_file = get_address_file()

with open(address_file) as f:
address_json = json.load(f)

correspondence = {
elem["chainId"]: elem["ERC721Factory"]
elem["chainId"]: elem[address_type]
for elem in address_json.values()
if "chainId" in elem and "ERC721Factory" in elem
if "chainId" in elem and address_type in elem
}

if chain_id not in correspondence:
raise Exception("No ERC721Factory factory configured for chain id")
raise Exception(b"No {address_type} factory configured for chain id")

return correspondence[chain_id]

address = correspondence[chain_id]

def get_dt_factory(web3, chain_id=None):
chain_id = chain_id if chain_id else web3.eth.chain_id
address = get_address_of_type(web3, chain_id, "ERC721Factory")
abi = ERC721Factory.abi

return web3.eth.contract(address=web3.toChecksumAddress(address), abi=abi)


def get_fre(web3, chain_id=None):
chain_id = chain_id if chain_id else web3.eth.chain_id
address = get_address_of_type(web3, chain_id, "FixedPrice")
abi = FixedRateExchange.abi

return web3.eth.contract(address=web3.toChecksumAddress(address), abi=abi)


def get_dispenser(web3, chain_id=None):
chain_id = chain_id if chain_id else web3.eth.chain_id
address = get_address_of_type(web3, chain_id, "Dispenser")
abi = Dispenser.abi

return web3.eth.contract(address=web3.toChecksumAddress(address), abi=abi)


def get_address_file():
"""Returns Path to the address.json file
Checks envvar first, fallback to address.json included with ocean-contracts.
Expand Down
53 changes: 46 additions & 7 deletions aquarius/graphql.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import os
import time
from web3.main import Web3

from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport
Expand All @@ -17,7 +18,25 @@
aiohttp_logger.setLevel(logging.WARNING)


def get_number_orders(token_address, last_sync_block, chain_id):
class Price:
def __init__(self, value):
self.value = float(value)
self.token_address = None
self.token_symbol = None

def as_dict(self):
result = {"value": self.value}

if self.token_address:
result["tokenAddress"] = self.token_address

if self.token_symbol:
result["tokenSymbol"] = self.token_symbol

return result


def get_number_orders_price(token_address, last_sync_block, chain_id):
try:
client = get_client(chain_id)

Expand All @@ -29,16 +48,36 @@ def get_number_orders(token_address, last_sync_block, chain_id):
last_block = get_last_block(client)
time.sleep(2)

did_query = gql('{ nft(id: "' + token_address.lower() + '") { orderCount } }')
result = client.execute(did_query)

logger.debug(f"Got result for did query: {result}.")
return int(result["nft"]["orderCount"])
query = gql(
'{tokens(where:{nft:"'
+ token_address.lower()
+ '"}){orderCount, fixedRateExchanges{ price, baseToken {symbol, address} }, dispensers{id}}}'
)
tokens_result = client.execute(query)
logger.debug(f"Got result for did query: {tokens_result}.")

order_count = tokens_result["tokens"][0]["orderCount"]
price = None
fres = tokens_result["tokens"][0].get("fixedRateExchanges", None)
dispensers = tokens_result["tokens"][0].get("dispensers", None)
if fres and "price" in fres[0]:
price = Price(fres[0]["price"])
if "baseToken" in fres[0]:
price.token_address = Web3.toChecksumAddress(
fres[0]["baseToken"].get("address")
)
price.token_symbol = fres[0]["baseToken"].get("symbol")
elif dispensers:
price = Price(0)

price_obj = price.as_dict() if price else {}

return int(order_count), price_obj
except Exception:
logger.exception(
f"Can not get number of orders for subgraph {get_network_name()} token address {token_address}"
)
return -1
return -1, {}


def get_transport(chain_id):
Expand Down
Loading

0 comments on commit 6e1e880

Please sign in to comment.