Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat: merkle-maker + MerkleFeed mixin #178

Merged
merged 40 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
8213375
feat(merkle_feed_mixin): MerkleFeedMixin init
akhercha Jul 29, 2024
f5e4dca
feat(merkle_feed_mixin): Stuff
akhercha Jul 30, 2024
4c88bcb
Merge branch 'master' into feat/merkle_feed_mixin
akhercha Jul 30, 2024
8619a35
feat(merkle_feed_mixin): Fixes
akhercha Jul 31, 2024
63b2551
Merge branch 'master' into feat/merkle_feed_mixin
akhercha Jul 31, 2024
933e472
feat(merkle_feed_mixin): Fixed merge stuff
akhercha Jul 31, 2024
9b13043
feat(merkle_feed_mixin): no any return
akhercha Jul 31, 2024
bb553f5
feat(merkle_feed_mixin): Network prefix for Redis
akhercha Aug 1, 2024
76c1278
feat(merkle_feed_mixin): get key util
akhercha Aug 1, 2024
3d36015
feat(merkle_feed_mixin): Var name
akhercha Aug 1, 2024
a65fd54
feat(merkle_feed_mixin): Fixed offchain serialisation
akhercha Aug 1, 2024
0106351
feat(merkle_feed_mixin): better merkle maker
akhercha Aug 3, 2024
60e3300
feat(merkle_feed_mixin): oops pragma oracle
akhercha Aug 3, 2024
51413b5
feat(merkle_feed_mixin): 🧹 brooming
akhercha Aug 3, 2024
7b1221d
feat(merkle_feed_mixin): New redis storage
akhercha Aug 3, 2024
1c0d255
feat(merkle_feed_mixin): unused constant
akhercha Aug 3, 2024
c7b3873
feat(merkle_feed_mixin): Fixed ABIs
akhercha Aug 3, 2024
8bb1157
feat(merkle_feed_mixin): Testing generic + remove exception handling
akhercha Aug 3, 2024
af5a8d8
feat(merkle_feed_mixin): BigDecimal stuff
akhercha Aug 4, 2024
6ed9f2c
feat(merkle_feed_mixin): Storing merkle tree root hash as hexa str
akhercha Aug 4, 2024
cc1eab3
feat(merkle_feed_mixin): Storing merkle tree as hex strings
akhercha Aug 4, 2024
9feccb1
feat(merkle_feed_mixin): __hash__ method fucked me 😹😹😹😹😹😹😹
akhercha Aug 4, 2024
36b96a6
feat(merkle_feed_mixin): Fixed test 😀😀😀😀
akhercha Aug 4, 2024
b5c5b7f
feat(merkle_feed_mixin): Fixes from review
akhercha Aug 5, 2024
9a63397
feat(merkle_feed_mixin): Devnet option
akhercha Aug 5, 2024
1b32687
feat(merkle_feed_mixin): Check block processing
akhercha Aug 5, 2024
a03b10e
fix: add set expiry
EvolveArt Aug 5, 2024
265f16b
feat(merkle_feed_mixin): Storing latest block published
akhercha Aug 6, 2024
47e64c5
feat(merkle_feed_mixin): Docs
akhercha Aug 6, 2024
bd5e8d9
feat(merkle_feed_mixin): Store block as bytes not json
akhercha Aug 6, 2024
c2ce288
fix: typo
EvolveArt Aug 7, 2024
ecafa97
feat(merkle_feed_mixin): Fixed MerkleFeedMixin + Docs
akhercha Aug 7, 2024
16e50df
feat(merkle_feed_mixin): Fixed conversion
akhercha Aug 7, 2024
7101736
chore: update contract
EvolveArt Aug 7, 2024
4748732
Merge branch 'master' into feat/merkle_feed_mixin
EvolveArt Aug 7, 2024
d7a0ec9
feat: merkle feed mixin tests
EvolveArt Aug 7, 2024
38b083a
ci: add test
EvolveArt Aug 7, 2024
b4c9af0
fix: mypy
EvolveArt Aug 7, 2024
050de99
fix: ruff
EvolveArt Aug 7, 2024
3140c56
Update tests.yml
EvolveArt Aug 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
poetry run poe test_client
poetry run poe test_vrf
poetry run poe test_fetchers
poetry run poe test_merkle_feed
poetry run poe test_unit
else
poetry run poe test
Expand Down
66 changes: 12 additions & 54 deletions merkle-maker/merkle_maker/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,28 @@
import logging

from pydantic import HttpUrl
from typing import Optional, Literal, Never
from typing import Optional

from pragma_sdk.common.types.pair import Pair
from pragma_sdk.common.fetchers.fetcher_client import FetcherClient
from pragma_sdk.common.fetchers.generic_fetchers import DeribitOptionsFetcher
from pragma_sdk.common.fetchers.generic_fetchers.deribit.fetcher import DeribitOptionsFetcher

from pragma_sdk.onchain.types.types import PrivateKey
from pragma_sdk.onchain.types.types import PrivateKey, NetworkName
from pragma_sdk.onchain.client import PragmaOnChainClient

from pragma_utils.logger import setup_logging
from pragma_utils.cli import load_private_key_from_cli_arg

from merkle_maker.redis import RedisManager
from merkle_maker.publisher import MerkleFeedPublisher

logger = logging.getLogger(__name__)

TIME_TO_WAIT_BETWEEN_BLOCK_NUMBER_POLLING = 1


async def main(
network: Literal["mainnet", "sepolia"],
network: NetworkName,
redis_host: str,
publisher_name: str,
publisher_address: str,
Expand Down Expand Up @@ -54,59 +55,16 @@ async def main(
)
fetcher_client.add_fetcher(deribit_fetcher)

logger.info("🧩 Starting the Merkle Maker...\n")
await _publish_merkle_feeds_forever(
publisher = MerkleFeedPublisher(
network=network,
pragma_client=pragma_client,
fetcher_client=fetcher_client,
redis_manager=redis_manager,
block_interval=block_interval,
time_to_wait_between_block_number_polling=TIME_TO_WAIT_BETWEEN_BLOCK_NUMBER_POLLING,
)


async def _publish_merkle_feeds_forever(
pragma_client: PragmaOnChainClient,
fetcher_client: FetcherClient,
redis_manager: RedisManager,
block_interval: int,
) -> Never:
"""
Publish a new Merkle Feed on chain every [block_interval] block(s) forever.
We store the merkle tree and the options used to generate the merkle root
to a Redis database that will get consumed by our Rust service.
"""
deribit_fetcher: DeribitOptionsFetcher = fetcher_client.fetchers[0] # type: ignore[assignment]
while True:
# TODO: In case of a restart, check if a merkle feed already exists for the
# current block.
current_block = await pragma_client.get_block_number()
logger.info(f"Current block: {current_block}")

logger.info("🔍 Fetching the deribit options...")
entries = await fetcher_client.fetch()

logger.info("🎣 Publishing the merkle root onchain...")
try:
await pragma_client.publish_entries(entries) # type: ignore[arg-type]
logger.info("... done!")
except Exception:
# TODO: remove this part when the contract has been updated
logger.warning("Could not publish! Contract not yet updated.")

logger.info("🏭 Storing the merkle tree & options in Redis...")
success_store = redis_manager.store_latest_data(deribit_fetcher.latest_data)
if not success_store:
raise RuntimeError("Could not store the latest data to the Redis instance.")

logger.info(f"✅ Block {current_block} done!\n")
next_block = current_block + block_interval
logger.info(f"⏳ Waiting for block {next_block}...")

while True:
await asyncio.sleep(TIME_TO_WAIT_BETWEEN_BLOCK_NUMBER_POLLING)
new_block = await pragma_client.get_block_number()
if new_block >= next_block:
logger.info(f"⌛ ... reached block {new_block}!\n")
break
logger.info(f"🧩 Starting the Merkle Maker for {network}...\n")
await publisher.publish_forever()


@click.command()
Expand All @@ -125,7 +83,7 @@ async def _publish_merkle_feeds_forever(
required=True,
default="sepolia",
type=click.Choice(
["sepolia", "mainnet"],
["sepolia", "mainnet", "devnet"],
case_sensitive=False,
),
help="On which networks the checkpoints will be set.",
Expand Down Expand Up @@ -179,7 +137,7 @@ async def _publish_merkle_feeds_forever(
)
def cli_entrypoint(
log_level: str,
network: Literal["mainnet", "sepolia"],
network: NetworkName,
redis_host: str,
rpc_url: Optional[HttpUrl],
publisher_name: str,
Expand Down
147 changes: 147 additions & 0 deletions merkle-maker/merkle_maker/publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import asyncio
import logging

from typing import Never, List
from starknet_py.contract import InvokeResult

from pragma_sdk.common.fetchers.fetcher_client import FetcherClient
from pragma_sdk.common.fetchers.generic_fetchers.deribit.fetcher import DeribitOptionsFetcher

from pragma_sdk.onchain.client import PragmaOnChainClient
from pragma_sdk.onchain.types.types import NetworkName


from merkle_maker.redis import RedisManager

logger = logging.getLogger(__name__)

TIME_TO_SLEEP_BETWEEN_RETRIES = 3


class MerkleFeedPublisher:
    """
    Class responsible for querying the latest options, publishing them on-chain
    and in our Redis database.

    TODO: Implement automatic cleanup so we only keep the latest 100/1000 blocks?
    """

    network: NetworkName
    pragma_client: PragmaOnChainClient
    fetcher_client: FetcherClient
    redis_manager: RedisManager
    block_interval: int
    time_to_wait_between_block_number_polling: int

    def __init__(
        self,
        network: NetworkName,
        pragma_client: PragmaOnChainClient,
        fetcher_client: FetcherClient,
        redis_manager: RedisManager,
        block_interval: int = 1,
        time_to_wait_between_block_number_polling: int = 1,
    ):
        # The publisher is built around exactly one Deribit options fetcher;
        # the `deribit_fetcher` property below relies on these invariants.
        assert len(fetcher_client.fetchers) == 1
        assert isinstance(fetcher_client.fetchers[0], DeribitOptionsFetcher)

        self.network = network
        self.pragma_client = pragma_client
        self.fetcher_client = fetcher_client
        self.redis_manager = redis_manager
        self.block_interval = block_interval
        self.time_to_wait_between_block_number_polling = time_to_wait_between_block_number_polling

    @property
    def deribit_fetcher(self) -> DeribitOptionsFetcher:
        """The single configured fetcher, guaranteed by the `__init__` assertions."""
        # We know for sure that fetchers[0] is DeribitOptionsFetcher, see assertions above.
        return self.fetcher_client.fetchers[0]  # type: ignore[return-value]

    async def publish_forever(self) -> Never:
        """
        Publish a new Merkle Feed on chain every [self.block_interval] block(s)
        forever.
        We store the merkle tree and the options used to generate the merkle root
        to a Redis database that will get consumed by our Rust services.
        """
        current_block = await self.pragma_client.get_block_number()
        while True:
            logger.info(f"Current block: {current_block}")

            if self._current_block_is_not_processed(current_block):
                try:
                    await self._publish_and_store(current_block=current_block)
                except Exception:
                    # logger.exception keeps the traceback, so the actual
                    # failure cause is visible before we retry the same block.
                    logger.exception(
                        f"⛔ Publishing for block {current_block} failed. "
                        f"Retrying in {TIME_TO_SLEEP_BETWEEN_RETRIES} seconds...\n"
                    )
                    await asyncio.sleep(TIME_TO_SLEEP_BETWEEN_RETRIES)
                    continue
            else:
                logger.info(f"🫷 Block {current_block} is already processed!\n")

            next_block = current_block + self.block_interval
            logger.info(f"⏳ Waiting for block {next_block}...")

            # Poll the chain until it reaches (at least) the next target block.
            while True:
                await asyncio.sleep(self.time_to_wait_between_block_number_polling)
                new_block = await self.pragma_client.get_block_number()
                if new_block >= next_block:
                    logger.info(f"⌛ ... reached block {new_block}!\n")
                    current_block = new_block
                    break

    async def _publish_and_store(
        self,
        current_block: int,
    ) -> None:
        """
        Retrieves the options from Deribit, publish the merkle root on-chain for
        the network and block_number.
        When done, store to Redis the merkle tree and the options used.

        Raises:
            RuntimeError: if the fetched data could not be stored in Redis.
        """
        logger.info("🔍 Fetching the deribit options...")
        entries = await self.fetcher_client.fetch()
        logger.info("... fetched!")

        logger.info("🎣 Publishing the merkle root onchain...")
        invocations = await self.pragma_client.publish_many(entries)  # type: ignore[arg-type]
        await self._wait_for_txs_acceptance(invocations)
        logger.info("... published!")

        logger.info("🏭 Storing the merkle tree & options in Redis...")
        latest_data = self.deribit_fetcher.latest_data
        success_store = self.redis_manager.store_block_data(
            self.network, current_block, latest_data
        )
        if not success_store:
            raise RuntimeError(f"Could not store data for block {current_block} to Redis.")
        else:
            logger.info("... stored!")

        logger.info(f"✅ Block {current_block} done!\n")

    async def _wait_for_txs_acceptance(self, invocations: List[InvokeResult]) -> None:
        """
        Wait for all the transactions in the passed list to be accepted on-chain.
        Raises an error if one transaction is not accepted.
        """
        for invocation in invocations:
            nonce = invocation.invoke_transaction.nonce
            logger.info(
                f"  ⏳ waiting for TX {hex(invocation.hash)} (nonce={nonce}) to be accepted..."
            )
            await invocation.wait_for_acceptance(check_interval=1)

    def _current_block_is_not_processed(
        self,
        block_number: int,
    ) -> bool:
        """
        Return True when `block_number` has not been published yet for this
        network: either nothing was ever published, or `block_number` is
        strictly greater than the latest published block stored in Redis.
        """
        latest_published_block = self.redis_manager.get_latest_published_block(self.network)
        if latest_published_block is None:
            return True
        return block_number > latest_published_block
Loading
Loading