feat: merkle-maker + MerkleFeed mixin (#178)
* feat(merkle_feed_mixin): MerkleFeedMixin init

* feat(merkle_feed_mixin): Stuff

* feat(merkle_feed_mixin): Fixes

* feat(merkle_feed_mixin): Fixed merge stuff

* feat(merkle_feed_mixin): no any return

* feat(merkle_feed_mixin): Network prefix for Redis

* feat(merkle_feed_mixin): get key util

* feat(merkle_feed_mixin): Var name

* feat(merkle_feed_mixin): Fixed offchain serialisation

* feat(merkle_feed_mixin): better merkle maker

* feat(merkle_feed_mixin): oops pragma oracle

* feat(merkle_feed_mixin): 🧹 brooming

* feat(merkle_feed_mixin): New redis storage

* feat(merkle_feed_mixin): unused constant

* feat(merkle_feed_mixin): Fixed ABIs

* feat(merkle_feed_mixin): Testing generic + remove exception handling

* feat(merkle_feed_mixin): BigDecimal stuff

* feat(merkle_feed_mixin): Storing merkle tree root hash as hexa str

* feat(merkle_feed_mixin): Storing merkle tree as hex strings

* feat(merkle_feed_mixin): __hash__ method fucked me 😹😹😹😹😹😹😹

* feat(merkle_feed_mixin): Fixed test 😀😀😀😀

* feat(merkle_feed_mixin): Fixes from review

* feat(merkle_feed_mixin): Devnet option

* feat(merkle_feed_mixin): Check block processing

* fix: add set expiry

* feat(merkle_feed_mixin): Storing latest block published

* feat(merkle_feed_mixin): Docs

* feat(merkle_feed_mixin): Store block as bytes not json

* fix: typo

* feat(merkle_feed_mixin): Fixed MerkleFeedMixin + Docs

* feat(merkle_feed_mixin): Fixed conversion

* chore: update contract

* feat: merkle feed mixin tests

* ci: add test

* fix: mypy

* fix: ruff

* Update tests.yml

---------

Co-authored-by: 0xevolve <[email protected]>
akhercha and EvolveArt authored Aug 7, 2024
1 parent ca901df commit f4003aa
Showing 29 changed files with 762 additions and 315 deletions.
1 change: 1 addition & 0 deletions .github/workflows/tests.yml
@@ -81,6 +81,7 @@
   poetry run poe test_client
   poetry run poe test_vrf
   poetry run poe test_fetchers
+  poetry run poe test_merkle_feed
   poetry run poe test_unit
 else
   poetry run poe test
66 changes: 12 additions & 54 deletions merkle-maker/merkle_maker/main.py
@@ -3,27 +3,28 @@
 import logging
 
 from pydantic import HttpUrl
-from typing import Optional, Literal, Never
+from typing import Optional
 
 from pragma_sdk.common.types.pair import Pair
 from pragma_sdk.common.fetchers.fetcher_client import FetcherClient
-from pragma_sdk.common.fetchers.generic_fetchers import DeribitOptionsFetcher
+from pragma_sdk.common.fetchers.generic_fetchers.deribit.fetcher import DeribitOptionsFetcher
 
-from pragma_sdk.onchain.types.types import PrivateKey
+from pragma_sdk.onchain.types.types import PrivateKey, NetworkName
 from pragma_sdk.onchain.client import PragmaOnChainClient
 
 from pragma_utils.logger import setup_logging
 from pragma_utils.cli import load_private_key_from_cli_arg
 
 from merkle_maker.redis import RedisManager
+from merkle_maker.publisher import MerkleFeedPublisher
 
 logger = logging.getLogger(__name__)
 
 TIME_TO_WAIT_BETWEEN_BLOCK_NUMBER_POLLING = 1
 
 
 async def main(
-    network: Literal["mainnet", "sepolia"],
+    network: NetworkName,
     redis_host: str,
     publisher_name: str,
     publisher_address: str,
@@ -54,59 +55,16 @@ async def main(
     )
     fetcher_client.add_fetcher(deribit_fetcher)
 
-    logger.info("🧩 Starting the Merkle Maker...\n")
-    await _publish_merkle_feeds_forever(
+    publisher = MerkleFeedPublisher(
+        network=network,
         pragma_client=pragma_client,
         fetcher_client=fetcher_client,
         redis_manager=redis_manager,
         block_interval=block_interval,
+        time_to_wait_between_block_number_polling=TIME_TO_WAIT_BETWEEN_BLOCK_NUMBER_POLLING,
     )
 
-
-async def _publish_merkle_feeds_forever(
-    pragma_client: PragmaOnChainClient,
-    fetcher_client: FetcherClient,
-    redis_manager: RedisManager,
-    block_interval: int,
-) -> Never:
-    """
-    Publish a new Merkle Feed on chain every [block_interval] block(s) forever.
-    We store the merkle tree and the options used to generate the merkle root
-    to a Redis database that will get consumed by our Rust service.
-    """
-    deribit_fetcher: DeribitOptionsFetcher = fetcher_client.fetchers[0]  # type: ignore[assignment]
-    while True:
-        # TODO: In case of a restart, check if a merkle feed already exists for the
-        # current block.
-        current_block = await pragma_client.get_block_number()
-        logger.info(f"Current block: {current_block}")
-
-        logger.info("🔍 Fetching the deribit options...")
-        entries = await fetcher_client.fetch()
-
-        logger.info("🎣 Publishing the merkle root onchain...")
-        try:
-            await pragma_client.publish_entries(entries)  # type: ignore[arg-type]
-            logger.info("... done!")
-        except Exception:
-            # TODO: remove this part when the contract has been updated
-            logger.warning("Could not publish! Contract not yet updated.")
-
-        logger.info("🏭 Storing the merkle tree & options in Redis...")
-        success_store = redis_manager.store_latest_data(deribit_fetcher.latest_data)
-        if not success_store:
-            raise RuntimeError("Could not store the latest data to the Redis instance.")
-
-        logger.info(f"✅ Block {current_block} done!\n")
-        next_block = current_block + block_interval
-        logger.info(f"⏳ Waiting for block {next_block}...")
-
-        while True:
-            await asyncio.sleep(TIME_TO_WAIT_BETWEEN_BLOCK_NUMBER_POLLING)
-            new_block = await pragma_client.get_block_number()
-            if new_block >= next_block:
-                logger.info(f"⌛ ... reached block {new_block}!\n")
-                break
+    logger.info(f"🧩 Starting the Merkle Maker for {network}...\n")
+    await publisher.publish_forever()


 @click.command()
@@ -125,7 +83,7 @@
     required=True,
     default="sepolia",
     type=click.Choice(
-        ["sepolia", "mainnet"],
+        ["sepolia", "mainnet", "devnet"],
         case_sensitive=False,
     ),
     help="On which networks the checkpoints will be set.",
@@ -179,7 +137,7 @@
 )
 def cli_entrypoint(
     log_level: str,
-    network: Literal["mainnet", "sepolia"],
+    network: NetworkName,
     redis_host: str,
     rpc_url: Optional[HttpUrl],
     publisher_name: str,
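
For readers tracking the type change above: the inline Literal["mainnet", "sepolia"] annotations are replaced by the NetworkName alias imported from pragma_sdk.onchain.types.types. Its definition is not part of this diff; below is a minimal sketch of the assumed shape, consistent with the new "devnet" CLI choice:

    from typing import Literal

    # Assumed definition of the alias in pragma_sdk.onchain.types.types;
    # it replaces the inline Literal["mainnet", "sepolia"] annotations and,
    # judging by the CLI change above, also covers the new "devnet" option.
    NetworkName = Literal["mainnet", "sepolia", "devnet"]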
147 changes: 147 additions & 0 deletions merkle-maker/merkle_maker/publisher.py
@@ -0,0 +1,147 @@
import asyncio
import logging

from typing import Never, List
from starknet_py.contract import InvokeResult

from pragma_sdk.common.fetchers.fetcher_client import FetcherClient
from pragma_sdk.common.fetchers.generic_fetchers.deribit.fetcher import DeribitOptionsFetcher

from pragma_sdk.onchain.client import PragmaOnChainClient
from pragma_sdk.onchain.types.types import NetworkName


from merkle_maker.redis import RedisManager

logger = logging.getLogger(__name__)

TIME_TO_SLEEP_BETWEEN_RETRIES = 3


class MerkleFeedPublisher:
"""
    Class responsible for querying the latest options, publishing them
    on-chain and storing them in our Redis database.
TODO: Implement automatic cleanup so we only keep the latest 100/1000 blocks?
"""

network: NetworkName
pragma_client: PragmaOnChainClient
fetcher_client: FetcherClient
redis_manager: RedisManager
block_interval: int
time_to_wait_between_block_number_polling: int

def __init__(
self,
network: NetworkName,
pragma_client: PragmaOnChainClient,
fetcher_client: FetcherClient,
redis_manager: RedisManager,
block_interval: int = 1,
time_to_wait_between_block_number_polling: int = 1,
):
assert len(fetcher_client.fetchers) == 1
assert isinstance(fetcher_client.fetchers[0], DeribitOptionsFetcher)

self.network = network
self.pragma_client = pragma_client
self.fetcher_client = fetcher_client
self.redis_manager = redis_manager
self.block_interval = block_interval
self.time_to_wait_between_block_number_polling = time_to_wait_between_block_number_polling

@property
def deribit_fetcher(self) -> DeribitOptionsFetcher:
# We know for sure that fetchers[0] is DeribitOptionsFetcher, see assertions above.
return self.fetcher_client.fetchers[0] # type: ignore[return-value]

async def publish_forever(self) -> Never:
"""
Publish a new Merkle Feed on chain every [self.block_interval] block(s)
forever.
We store the merkle tree and the options used to generate the merkle root
to a Redis database that will get consumed by our Rust services.
"""
current_block = await self.pragma_client.get_block_number()
while True:
logger.info(f"Current block: {current_block}")

if self._current_block_is_not_processed(current_block):
try:
await self._publish_and_store(current_block=current_block)
except Exception:
logger.error(
f"⛔ Publishing for block {current_block} failed. "
f"Retrying in {TIME_TO_SLEEP_BETWEEN_RETRIES} seconds...\n"
)
await asyncio.sleep(TIME_TO_SLEEP_BETWEEN_RETRIES)
continue
else:
logger.info(f"🫷 Block {current_block} is already processed!\n")

next_block = current_block + self.block_interval
logger.info(f"⏳ Waiting for block {next_block}...")

while True:
await asyncio.sleep(self.time_to_wait_between_block_number_polling)
new_block = await self.pragma_client.get_block_number()
if new_block >= next_block:
logger.info(f"⌛ ... reached block {new_block}!\n")
current_block = new_block
break

async def _publish_and_store(
self,
current_block: int,
) -> None:
"""
        Retrieve the options from Deribit and publish the merkle root on-chain
        for the given network and block number.
        When done, store the merkle tree and the options used to Redis.
"""
logger.info("🔍 Fetching the deribit options...")
entries = await self.fetcher_client.fetch()
logger.info("... fetched!")

logger.info("🎣 Publishing the merkle root onchain...")
invocations = await self.pragma_client.publish_many(entries) # type: ignore[arg-type]
await self._wait_for_txs_acceptance(invocations)
logger.info("... published!")

logger.info("🏭 Storing the merkle tree & options in Redis...")
latest_data = self.deribit_fetcher.latest_data
success_store = self.redis_manager.store_block_data(
self.network, current_block, latest_data
)
if not success_store:
raise RuntimeError(f"Could not store data for block {current_block} to Redis.")
else:
logger.info("... stored!")

logger.info(f"✅ Block {current_block} done!\n")

async def _wait_for_txs_acceptance(self, invocations: List[InvokeResult]):
"""
        Wait for all the transactions in the given list to be accepted on-chain.
        Raise an error if any transaction is not accepted.
"""
for invocation in invocations:
nonce = invocation.invoke_transaction.nonce
logger.info(
f" ⏳ waiting for TX {hex(invocation.hash)} (nonce={nonce}) to be accepted..."
)
await invocation.wait_for_acceptance(check_interval=1)

def _current_block_is_not_processed(
self,
block_number: int,
) -> bool:
"""
        Return True if the given block has not been processed yet.
"""
latest_published_block = self.redis_manager.get_latest_published_block(self.network)
if latest_published_block is None:
return True
return block_number > latest_published_block
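
Taken together with the main.py changes above, wiring the publisher up end-to-end looks roughly like the sketch below. Only the MerkleFeedPublisher keyword arguments are taken from this diff; the constructor arguments of PragmaOnChainClient, DeribitOptionsFetcher and RedisManager are illustrative assumptions:

    import asyncio

    from pragma_sdk.common.fetchers.fetcher_client import FetcherClient
    from pragma_sdk.common.fetchers.generic_fetchers.deribit.fetcher import DeribitOptionsFetcher
    from pragma_sdk.onchain.client import PragmaOnChainClient

    from merkle_maker.publisher import MerkleFeedPublisher
    from merkle_maker.redis import RedisManager


    async def run() -> None:
        # Constructor arguments here are assumptions for illustration only.
        pragma_client = PragmaOnChainClient(network="sepolia")
        redis_manager = RedisManager(host="localhost", port="6379")

        # MerkleFeedPublisher asserts that exactly one fetcher is registered
        # and that it is a DeribitOptionsFetcher (see __init__ above).
        fetcher_client = FetcherClient()
        fetcher_client.add_fetcher(DeribitOptionsFetcher(pairs=[], publisher="my-publisher"))

        publisher = MerkleFeedPublisher(
            network="sepolia",
            pragma_client=pragma_client,
            fetcher_client=fetcher_client,
            redis_manager=redis_manager,
            block_interval=1,
            time_to_wait_between_block_number_polling=1,
        )
        # Never returns: publishes a merkle feed every block_interval block(s),
        # retrying a failed block after TIME_TO_SLEEP_BETWEEN_RETRIES seconds.
        await publisher.publish_forever()


    if __name__ == "__main__":
        asyncio.run(run())

This mirrors what the reworked main() does, minus CLI parsing and private-key handling.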