Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor main loop #928

Merged
merged 63 commits into from
Jan 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
f90e69e
refactor main loop
alexcos20 Oct 20, 2022
7b4bf41
optimize more
alexcos20 Oct 21, 2022
3af3955
fix test
alexcos20 Oct 21, 2022
46e839d
optimize more & better logic for failures
alexcos20 Oct 22, 2022
dabb388
better comment
alexcos20 Oct 22, 2022
459b117
remove log
alexcos20 Oct 22, 2022
ad362f4
use exists to speed up transferOwnership
alexcos20 Oct 27, 2022
946c937
force pytest versions
alexcos20 Oct 27, 2022
10a3420
Merge branch 'main' into bug/refactor_main_loop
alexcos20 Oct 27, 2022
5074ec1
fix tests
alexcos20 Oct 27, 2022
f1d7425
Merge branch 'bug/refactor_main_loop' of https://github.com/oceanprot…
alexcos20 Oct 27, 2022
76615f4
optimize more
alexcos20 Oct 28, 2022
a55a463
optimize more
alexcos20 Oct 28, 2022
9f4a94b
fix test
alexcos20 Oct 28, 2022
4a32c27
lint
alexcos20 Oct 28, 2022
859a4ec
fix test logic
alexcos20 Oct 28, 2022
6d324a1
fix test logic
alexcos20 Oct 28, 2022
b3d9bfa
log level change
alexcos20 Oct 28, 2022
919260d
add logs
alexcos20 Oct 28, 2022
696b654
more logs
alexcos20 Oct 28, 2022
823fa77
clean up receipt warnings
alexcos20 Oct 29, 2022
607dfb1
add more comments, optimize code
alexcos20 Nov 8, 2022
6ef8945
use es.exists instead of es.read
alexcos20 Nov 8, 2022
63692fa
revert 607dfb1
alexcos20 Nov 8, 2022
da5ef9d
split logs for one block flow
alexcos20 Nov 9, 2022
40a02f4
fix typo
alexcos20 Nov 9, 2022
71115b0
make function names more descriptive
alexcos20 Nov 9, 2022
cd5f57c
new retry queue (#940)
alexcos20 Nov 22, 2022
da01348
Feature/new transfer ownership (#943)
alexcos20 Nov 28, 2022
af4f91d
Merge branch 'main' into bug/refactor_main_loop
alexcos20 Nov 28, 2022
4216334
use ve_allocate_realtime instead of ve_allocate (#939)
alexcos20 Nov 28, 2022
2a91a7a
Upgrade to ES 8.5.1 (#944)
calina-c Nov 29, 2022
da03b51
Remove config object. (#945)
calina-c Nov 29, 2022
686be97
Fix multiple issues (#932 , #924 , #933) (#951)
alexcos20 Dec 8, 2022
b04d925
add minLenghts (#952)
alexcos20 Dec 8, 2022
c36e792
Merge branch 'main' into bug/refactor_main_loop
alexcos20 Dec 9, 2022
a87e66a
fix es 8.5
alexcos20 Dec 9, 2022
2bf54b5
make chunk_size 5k by default
alexcos20 Dec 9, 2022
19c1769
remove es, since we have it in barge
alexcos20 Dec 10, 2022
d0055ee
use proper schema
alexcos20 Dec 10, 2022
824f9a8
do things in parallel
alexcos20 Dec 10, 2022
930f405
customize EVENTS_MONITOR_SLEEP_TIME
alexcos20 Dec 10, 2022
6ee5159
add multiple timers
alexcos20 Dec 10, 2022
566bf58
update loglevel
alexcos20 Dec 10, 2022
98f7af4
handle non-existant metadatastate changes
alexcos20 Dec 10, 2022
ba6c1a1
add timers info
alexcos20 Dec 10, 2022
79d7e0e
fix test
alexcos20 Dec 10, 2022
f4f26e2
fix log statement
alexcos20 Dec 10, 2022
ea156e4
fix search
alexcos20 Dec 10, 2022
7e4005a
fix query
alexcos20 Dec 11, 2022
1893c77
more es8.5 updates
alexcos20 Dec 11, 2022
b16b0a4
fix TokenURIUpdatedProcessor
alexcos20 Dec 11, 2022
60fba37
lint
alexcos20 Dec 11, 2022
bab3e2f
proper use of timeout for requests
alexcos20 Dec 12, 2022
4f2d023
use timeouts for requests
alexcos20 Dec 12, 2022
6ece5fc
more logs for error queue
alexcos20 Dec 14, 2022
393e5de
make sure that did is using checksum addrs
alexcos20 Dec 14, 2022
e433c90
fix failing code by removing it :)
alexcos20 Dec 14, 2022
cc04325
fix tests
alexcos20 Dec 14, 2022
63e6fee
fix tests
alexcos20 Dec 14, 2022
537d5ae
Feature/retry queue max hold (#960)
alexcos20 Dec 15, 2022
1d7c0aa
add ddo states (#959)
alexcos20 Dec 16, 2022
f0f56d6
wait for es when deleting elements
alexcos20 Dec 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions aquarius/events/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,53 @@ class EventTypes(SimpleEnum):
EVENT_EXCHANGE_RATE_CHANGED = "ExchangeRateChanged"
EVENT_DISPENSER_CREATED = "DispenserCreated"
EVENT_TRANSFER = "Transfer"
# Static lookup table mapping each supported contract event to:
#   "type" -- the EventTypes constant used throughout the indexer,
#   "text" -- the Solidity event signature string,
#   "hash" -- the keccak-256 hash of that signature, i.e. the value that
#             appears as topics[0] in an eth_getLogs entry.
# The hashes are kept as pre-computed literals so no web3/keccak call is
# needed at import time.  (The Transfer entry is the standard ERC-20/ERC-721
# Transfer topic, 0xddf252ad....)
# NOTE(review): the attribute name shadows the `list` builtin inside this
# class body; renaming would break external `EventTypes.list` lookups, so it
# is documented rather than changed here.
list = [
{
"type": EVENT_METADATA_CREATED,
"text": "MetadataCreated(address,uint8,string,bytes,bytes,bytes32,uint256,uint256)",
"hash": "0x5463569dcc320958360074a9ab27e809e8a6942c394fb151d139b5f7b4ecb1bd",
},
{
"type": EVENT_METADATA_UPDATED,
"text": "MetadataUpdated(address,uint8,string,bytes,bytes,bytes32,uint256,uint256)",
"hash": "0xe5c4cf86b1815151e6f453e1e133d4454ae3b0b07145db39f2e0178685deac84",
},
{
"type": EVENT_METADATA_STATE,
"text": "MetadataState(address,uint8,uint256,uint256)",
"hash": "0xa8336411cc72db0e5bdc4dff989eeb35879bafaceffb59b54b37645c3395adb9",
},
{
"type": EVENT_ORDER_STARTED,
"text": "OrderStarted(address,address,uint256,uint256,uint256,address,uint256)",
"hash": "0xe1c4fa794edfa8f619b8257a077398950357b9c6398528f94480307352f9afcc",
},
{
"type": EVENT_TOKEN_URI_UPDATE,
"text": "TokenURIUpdate(address,string,uint256,uint256,uint256)",
"hash": "0x6de6cd3982065cbd31e789e3109106f4d76d1c8a46e85262045cf947fb3fd4ed",
},
{
"type": EVENT_EXCHANGE_CREATED,
"text": "ExchangeCreated(bytes32,address,address,address,uint256)",
"hash": "0xeb7a353641f7d3cc54b497ef1553fdc292b64d9cc3be8587c23dfba01f310b19",
},
{
"type": EVENT_EXCHANGE_RATE_CHANGED,
"text": "ExchangeRateChanged(bytes32,address,uint256)",
"hash": "0xe50f9919fdc524004a4ee0cb934f4734f144bec0713a52e5483b753f5de0f08c",
},
{
"type": EVENT_DISPENSER_CREATED,
"text": "DispenserCreated(address,address,uint256,uint256,address)",
"hash": "0x7d0aa581e6eb87e15f58588ff20c39ff6622fc796ec9bb664df6ed3eb02442c9",
},
{
"type": EVENT_TRANSFER,
"text": "Transfer(address,address,uint256)",
"hash": "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
},
]


class MetadataStates(IntEnum):
Expand Down
193 changes: 66 additions & 127 deletions aquarius/events/events_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,100 +234,29 @@ def process_block_range(self, from_block, to_block):
if from_block > to_block:
return

processor_args = [
self._es_instance,
self._web3,
self._allowed_publishers,
self.purgatory,
self._chain_id,
]

# event retrieval
all_events = self.get_all_events(from_block, to_block)

regular_event_processors = {
EventTypes.EVENT_METADATA_CREATED: MetadataCreatedProcessor,
EventTypes.EVENT_METADATA_UPDATED: MetadataUpdatedProcessor,
EventTypes.EVENT_METADATA_STATE: MetadataStateProcessor,
}

# event handling
for event_name, events_to_process in all_events.items():
if event_name == EventTypes.EVENT_TRANSFER:
logger.debug(
f"Starting handle_transfer_ownership for {len(events_to_process)} events"
)
self.handle_transfer_ownership(events_to_process)
elif event_name in regular_event_processors.keys():
logger.debug(
f"Starting handle_regular_event_processor for {len(events_to_process)} events"
)
self.handle_regular_event_processor(
event_name,
regular_event_processors[event_name],
processor_args,
events_to_process,
)
elif event_name in [
EventTypes.EVENT_ORDER_STARTED,
EventTypes.EVENT_EXCHANGE_CREATED,
EventTypes.EVENT_EXCHANGE_RATE_CHANGED,
EventTypes.EVENT_DISPENSER_CREATED,
]:
logger.debug(
f"Starting handle_price_change for {len(events_to_process)} events"
)
self.handle_price_change(event_name, events_to_process, to_block)
elif event_name == EventTypes.EVENT_TOKEN_URI_UPDATE:
logger.debug(
f"Starting handle_token_uri_update for {len(events_to_process)} events"
)
self.handle_token_uri_update(events_to_process)
else:
logger.warning(f"Unknown event {event_name}")
logger.warning(events_to_process)
self.store_last_processed_block(to_block)
self.get_all_events(from_block, to_block)

def get_all_events(self, from_block, to_block):
logger.debug(
f"**************Getting all events between {from_block} to {to_block}"
)
all_events = {
EventTypes.EVENT_TRANSFER: [],
# "regular" events
EventTypes.EVENT_METADATA_CREATED: [],
EventTypes.EVENT_METADATA_UPDATED: [],
EventTypes.EVENT_METADATA_STATE: [],
# price changed events
EventTypes.EVENT_ORDER_STARTED: [],
EventTypes.EVENT_EXCHANGE_CREATED: [],
EventTypes.EVENT_EXCHANGE_RATE_CHANGED: [],
EventTypes.EVENT_DISPENSER_CREATED: [],
#
EventTypes.EVENT_TOKEN_URI_UPDATE: [],
}

if from_block >= to_block:
alexcos20 marked this conversation as resolved.
Show resolved Hide resolved
return all_events
return

try:
for event_name in all_events.keys():
all_events[event_name] = self.get_event_logs(
event_name, from_block, to_block
)
self.get_event_logs(from_block, to_block)

return all_events
except Exception:
logger.info(f"Failed to get events from {from_block} to {to_block}")
middle = int((from_block + to_block) / 2)
middle_plus = middle + 1
logger.info(
f"Splitting in two: {from_block} -> {middle} and {middle_plus} to {to_block}"
)
return merge_list_dictionary(
self.get_all_events(from_block, middle),
self.get_all_events(middle_plus, to_block),
)
self.get_all_events(from_block, middle)
self.get_all_events(middle_plus, to_block)
return

def handle_regular_event_processor(
self, event_name, processor, processor_args, events
Expand Down Expand Up @@ -574,70 +503,80 @@ def get_assets_in_chain(self):

return object_list

def get_event_logs(self, event_name, from_block, to_block, chunk_size=1000):
if event_name not in EventTypes.get_all_values():
return []

if event_name == EventTypes.EVENT_METADATA_CREATED:
hash_text = "MetadataCreated(address,uint8,string,bytes,bytes,bytes32,uint256,uint256)"
elif event_name == EventTypes.EVENT_METADATA_UPDATED:
hash_text = "MetadataUpdated(address,uint8,string,bytes,bytes,bytes32,uint256,uint256)"
elif event_name == EventTypes.EVENT_METADATA_STATE:
hash_text = "MetadataState(address,uint8,uint256,uint256)"
elif event_name == EventTypes.EVENT_TOKEN_URI_UPDATE:
hash_text = "TokenURIUpdate(address,string,uint256,uint256,uint256)"
elif event_name == EventTypes.EVENT_EXCHANGE_CREATED:
hash_text = "ExchangeCreated(bytes32,address,address,address,uint256)"
elif event_name == EventTypes.EVENT_EXCHANGE_RATE_CHANGED:
hash_text = "ExchangeRateChanged(bytes32,address,uint256)"
elif event_name == EventTypes.EVENT_DISPENSER_CREATED:
hash_text = "DispenserCreated(address,address,uint256,uint256,address)"
elif event_name == EventTypes.EVENT_TRANSFER:
hash_text = "Transfer(address,address,uint256)"
else:
hash_text = (
"OrderStarted(address,address,uint256,uint256,uint256,address,uint256)"
)

event_signature_hash = self._web3.keccak(text=hash_text).hex()

def get_event_logs(self, from_block, to_block, chunk_size=1000):
_from = from_block
_to = min(_from + chunk_size - 1, to_block)

logger.debug(
f"Searching for {event_name} events on chain {self._chain_id} "
f"Searching for events events on chain {self._chain_id} "
f"in blocks {from_block} to {to_block}."
)

filter_params = {
"topics": [event_signature_hash],
"topics": [[d["hash"] for d in EventTypes.list]],
"fromBlock": _from,
"toBlock": _to,
}

all_logs = []
while _from <= to_block:
# Search current chunk
logs = self._web3.eth.get_logs(filter_params)
all_logs.extend(logs)
if (_from - from_block) % 1000 == 0:
logger.debug(
f"Searched blocks {_from} to {_to} on chain {self._chain_id}; "
f"{len(all_logs)} {event_name} events detected so far."
)

# Prepare for next chunk
_from = _to + 1
_to = min(_from + chunk_size - 1, to_block)
filter_params.update({"fromBlock": _from, "toBlock": _to})
processor_args = [
self._es_instance,
self._web3,
self._allowed_publishers,
self.purgatory,
self._chain_id,
]

logger.info(
f"Finished searching for {event_name} events on chain {self._chain_id} "
f"in blocks {from_block} to {to_block}. "
f"{len(all_logs)} {event_name} events detected."
)
logs = self._web3.eth.get_logs(filter_params)
logger.debug(f"{len(logs)} events detected so far.")
# event handling
for event in logs:
match = next(
(
item
alexcos20 marked this conversation as resolved.
Show resolved Hide resolved
for item in EventTypes.list
if item["hash"] == event.topics[0].hex()
),
None,
)
if match is None:
logger.warning(f"Unknown event ")
logger.warning([event])
continue
if match["type"] == EventTypes.EVENT_TRANSFER:
self.handle_transfer_ownership([event])
elif match["type"] == EventTypes.EVENT_METADATA_CREATED:
calina-c marked this conversation as resolved.
Show resolved Hide resolved
self.handle_regular_event_processor(
match["type"],
MetadataCreatedProcessor,
processor_args,
[event],
)
elif match["type"] == EventTypes.EVENT_METADATA_UPDATED:
self.handle_regular_event_processor(
match["type"],
MetadataUpdatedProcessor,
processor_args,
[event],
)
elif match["type"] == EventTypes.EVENT_METADATA_STATE:
self.handle_regular_event_processor(
match["type"],
MetadataStateProcessor,
processor_args,
[event],
)
elif match["type"] in [
EventTypes.EVENT_ORDER_STARTED,
EventTypes.EVENT_EXCHANGE_CREATED,
EventTypes.EVENT_EXCHANGE_RATE_CHANGED,
EventTypes.EVENT_DISPENSER_CREATED,
]:
self.handle_price_change(match["type"], [event], to_block)
elif match["type"] == EventTypes.EVENT_TOKEN_URI_UPDATE:
self.handle_token_uri_update([event])

return all_logs
self.store_last_processed_block(_to)
return


def merge_list_dictionary(dict_1, dict_2):
Expand Down
4 changes: 0 additions & 4 deletions tests/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,6 @@ def test_add_chain_id_to_chains_list(events_object):
assert events_object.add_chain_id_to_chains_list() is None


def test_get_event_logs(events_object):
assert events_object.get_event_logs("NonExistentEvent", 0, 10) == []


def test_order_started(events_object, client, base_ddo_url):
web3 = events_object._web3 # get_web3()
block = web3.eth.block_number
Expand Down