Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cache for unsupported transactions when parsing raw mempool #2855

Merged
merged 4 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions counterparty-core/counterpartycore/lib/follow.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import logging
import os
import struct
import threading
import time
Expand Down Expand Up @@ -190,7 +191,8 @@
# parse mempool block if needed
if self.need_to_parse_mempool_block():
# parse mempool block
mempool.parse_mempool_transactions(self.db, self.mempool_block)
not_supported = mempool.parse_mempool_transactions(self.db, self.mempool_block)
NotSupportedTransactionsCache().add(not_supported)
self.last_mempool_parsing_time = time.time()
# reset mempool block
self.mempool_block = []
Expand Down Expand Up @@ -258,9 +260,10 @@
logger.trace(
f"Processing {len(mempool_block)} transaction(s) from the raw mempool..."
)
mempool.parse_mempool_transactions(
not_supported_tx_hashes = mempool.parse_mempool_transactions(
self.db, mempool_block, timestamps=self.mempool_parser.timestamps
)
NotSupportedTransactionsCache().add(not_supported_tx_hashes)
else:
# sequence topic
await self.receive_multipart(self.zmq_sub_socket_sequence, "sequence")
Expand Down Expand Up @@ -319,6 +322,8 @@
cursor = db.cursor()
txhash_list = []
for txid, tx_info in raw_mempool.items():
if NotSupportedTransactionsCache().is_not_supported(txid):
continue
existing_tx_in_mempool = cursor.execute(
"SELECT * FROM mempool WHERE tx_hash = ? LIMIT 1", (txid,)
).fetchone()
Expand Down Expand Up @@ -362,3 +367,39 @@
logger.debug("Stopping RawMempoolParser...")
self.stop_event.set()
self.join()


class NotSupportedTransactionsCache(metaclass=util.SingletonMeta):
def __init__(self):
self.not_suppported_txs = []
self.cache_path = os.path.join(
config.CACHE_DIR, f"not_supported_tx_cache.{config.NETWORK_NAME}.txt"
)
self.restore()

def restore(self):
if os.path.exists(self.cache_path):
with open(self.cache_path, "r") as f:

Check warning

Code scanning / pylint

Using open without explicitly specifying an encoding.

Using open without explicitly specifying an encoding.
self.not_suppported_txs = [line.strip() for line in f]
logger.debug(
f"Restored {len(self.not_suppported_txs)} not supported transactions from cache"
)

def backup(self):
with open(self.cache_path, "w") as f:
Fixed Show fixed Hide fixed

Check warning

Code scanning / pylint

Using open without explicitly specifying an encoding.

Using open without explicitly specifying an encoding.
f.write("\n".join(self.not_suppported_txs[-200000:])) # limit to 200k txs
logger.trace(
f"Backed up {len(self.not_suppported_txs)} not supported transactions to cache"
)

def clear(self):
self.not_suppported_txs = []
if os.path.exists(self.cache_path):
os.remove(self.cache_path)

def add(self, more_not_supported_txs):
self.not_suppported_txs += more_not_supported_txs
self.backup()

def is_not_supported(self, tx_hash):
return tx_hash in self.not_suppported_txs
6 changes: 6 additions & 0 deletions counterparty-core/counterpartycore/lib/mempool.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def parse_mempool_transactions(db, raw_tx_list, timestamps=None):
now = time.time()
transaction_events = []
cursor = db.cursor()
not_supported_txs = []
try:
with db:
# insert fake block
Expand Down Expand Up @@ -46,6 +47,7 @@ def parse_mempool_transactions(db, raw_tx_list, timestamps=None):
for raw_tx in raw_tx_list:
decoded_tx = deserialize.deserialize_tx(raw_tx, use_txid=True)
existing_tx = ledger.get_transaction(db, decoded_tx["tx_hash"])
not_supported_txs.append(decoded_tx["tx_hash"])
if existing_tx:
logger.trace(f"Transaction {decoded_tx['tx_hash']} already in the database")
continue
Expand Down Expand Up @@ -82,6 +84,9 @@ def parse_mempool_transactions(db, raw_tx_list, timestamps=None):
except exceptions.MempoolError:
# save events in the mempool table
for event in transaction_events:
if event["tx_hash"] in not_supported_txs:
not_supported_txs.remove(event["tx_hash"])

if timestamps:
event["timestamp"] = timestamps.get(event["tx_hash"], now)
else:
Expand All @@ -105,6 +110,7 @@ def parse_mempool_transactions(db, raw_tx_list, timestamps=None):
)
logger.trace("Mempool transaction parsed successfully.")
util.PARSING_MEMPOOL = False
return not_supported_txs


def clean_transaction_events(db, tx_hash):
Expand Down
5 changes: 5 additions & 0 deletions counterparty-core/counterpartycore/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ def initialise_config(
os.makedirs(data_dir, mode=0o755)
config.DATA_DIR = data_dir

config.CACHE_DIR = appdirs.user_cache_dir(appauthor=config.XCP_NAME, appname=config.APP_NAME)
if not os.path.isdir(config.CACHE_DIR):
os.makedirs(config.CACHE_DIR, mode=0o755)

# testnet
if testnet:
config.TESTNET = testnet
Expand Down Expand Up @@ -888,6 +892,7 @@ def rollback(block_index=None):
try:
blocks.rollback(ledger_db, block_index=block_index)
dbbuilder.rollback_state_db(state_db, block_index)
follow.NotSupportedTransactionsCache().clear()
finally:
database.optimize(ledger_db)
database.optimize(state_db)
Expand Down
1 change: 1 addition & 0 deletions release-notes/release-notes-v10.9.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

- Refactor raw mempool parsing; Don't block following
- Add a timeout to parse mempool transaction from ZMQ
- Add cache for unsupported transactions when parsing raw mempool

## API

Expand Down
Loading