Commit

Merge branch 'blockchain-etl:master' into master
katiebk authored Jul 10, 2024
2 parents 1ed0376 + 6aeeb57 commit d46b58f
Showing 5 changed files with 217 additions and 1 deletion.
20 changes: 20 additions & 0 deletions .envrc
@@ -0,0 +1,20 @@
set -e

# Ensure pyenv is initialized
if command -v pyenv >/dev/null; then
  eval "$(pyenv init -)"
fi
# Set the local Python version
if ! pyenv local $(cat .python-version); then
  log_error "Failed to activate pyenv version $(cat .python-version)."
  log_error "Please ensure pyenv works and has the required python version:"
  log_error "# pyenv install $(cat .python-version) && direnv allow"
  exit 1
fi
# Create a virtual environment if it doesn't exist
if [ ! -d ".venv" ]; then
  python -m venv .venv
  echo "Created virtual environment in .venv"
fi
# Activate the virtual environment
source .venv/bin/activate
2 changes: 1 addition & 1 deletion .gitignore
@@ -56,4 +56,4 @@ voxometadata/secrets
# Env vars
.env

-.python-version
+.venv
1 change: 1 addition & 0 deletions .python-version
@@ -0,0 +1 @@
3.8.12
28 changes: 28 additions & 0 deletions README.md
@@ -2,6 +2,15 @@

Read this article: https://cloud.google.com/blog/products/data-analytics/ethereum-bigquery-how-we-built-dataset

## Local Development Prerequisites

- direnv
- pyenv


We use direnv to automatically set up and load the correct Python version via pyenv. It also creates a virtual
environment in `.venv` at the project root, which is activated automatically when you enter the project folder.
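
On first use (or whenever the pinned Python version is missing), you can run the fix that the `.envrc` itself suggests:

```
pyenv install $(cat .python-version) && direnv allow
```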

## Setting up Airflow DAGs using Google Cloud Composer

### Create BigQuery Datasets
@@ -118,8 +127,27 @@ The instance requires the `CLOUDSDK_CORE_PROJECT` environment variable to be set

Read this article: https://medium.com/@medvedev1088/query-ens-and-0x-events-with-sql-in-google-bigquery-4d197206e644

### Debugging Table Definition Files

The repository includes `generate_parse_sql.py`, a utility script for debugging and verifying contract parsing in the Ethereum data processing pipelines. Simply run

```
python3 generate_parse_sql.py <path_to_table_definition_file> <date>
```

This will output example SQL that can be used to verify that the JSON table definition files for the contract parser are correct.
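
For reference, the script only reads a handful of fields from the table definition JSON (`parser.type`, `parser.abi`, `parser.contract_address`, `parser.field_mapping`, and `table`). A minimal sketch of the expected shape, with illustrative placeholder values rather than a real definition:

```
{
  "parser": {
    "type": "log",
    "contract_address": "0x...",
    "abi": {
      "name": "Transfer",
      "type": "event",
      "inputs": [
        {"name": "from", "type": "address"},
        {"name": "to", "type": "address"},
        {"name": "value", "type": "uint256"}
      ]
    },
    "field_mapping": {}
  },
  "table": {}
}
```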

NOTE: some table definition files do not specify the `contract_address` field as a literal address (for example, [ERC20Pool_event_TransferLP](dags/resources/stages/parse/table_definitions/ajna_v2/ERC20Pool_event_TransferLP.json)) but use a select statement on another table instead. For these you can pass the contract address yourself:

```
python3 generate_parse_sql.py <path_to_table_definition_file> <date> --contract_address <contract_address>
```

### More Information

You can follow the instructions for Polygon DAGs at https://github.com/blockchain-etl/polygon-etl. The architecture
there is very similar to Ethereum, so in most cases substituting `ethereum` for `polygon` will work. Contributions
to this README file that port documentation from Polygon to Ethereum are welcome.

167 changes: 167 additions & 0 deletions generate_parse_sql.py
@@ -0,0 +1,167 @@
import json
import sys
import argparse
from datetime import datetime
from web3 import Web3

def validate_and_checksum_address(address):
    if address is None:
        return None
    try:
        return Web3.toChecksumAddress(address)
    except ValueError:
        print(f"Error: Invalid contract address: {address}")
        sys.exit(1)

def calculate_signature(abi):
    if isinstance(abi, str):
        abi = json.loads(abi)

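    # For an event, the signature is the full 32-byte keccak hash of
    # "Name(type1,type2,...)", which appears as topics[0] of matching logs.
    # For a function, it is the 4-byte selector: the first 10 hex characters
    # of that hash, including the "0x" prefix.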
    if abi['type'] == 'event':
        return Web3.keccak(text=f"{abi['name']}({','.join(arg['type'] for arg in abi['inputs'])})").hex()
    elif abi['type'] == 'function':
        return Web3.keccak(text=f"{abi['name']}({','.join(arg['type'] for arg in abi['inputs'])})").hex()[:10]
    else:
        raise ValueError("ABI must be for an event or function")

def generate_sql_from_json(json_file_path, date, override_contract_address=None):
    with open(json_file_path, 'r') as file:
        data = json.load(file)

    # Use the override_contract_address if provided, otherwise use the one from the JSON file
    contract_address = override_contract_address or data['parser'].get('contract_address')

    # Validate and convert contract address
    if contract_address:
        contract_address = validate_and_checksum_address(contract_address)
        # Update the contract_address in the data
        data['parser']['contract_address'] = contract_address
    else:
        print("Error: No valid contract address provided. Please use --contract_address argument.")
        sys.exit(1)

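    # Escape single quotes so the JSON blob can be embedded in a single-quoted SQL string literal below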
    json_string = json.dumps(data).replace("'", "''")
    signature = calculate_signature(data['parser']['abi'])
    parser_type = data['parser']['type']

    if parser_type == 'log':
        sql = generate_log_sql(json_string, signature, date)
    elif parser_type == 'trace':
        sql = generate_trace_sql(json_string, signature, date)
    else:
        raise ValueError(f"Unsupported parser type: {parser_type}")

    return sql

def generate_log_sql(json_string, signature, date):
    return f"""
WITH
abi AS
(
    SELECT
        JSON_QUERY(json_data, '$.parser.abi') AS abi,
        JSON_QUERY(json_data, '$.parser.field_mapping') AS field_mapping,
        JSON_QUERY(json_data, '$.table') AS table_details,
        JSON_EXTRACT_SCALAR(json_data, '$.parser.contract_address') AS contract_address,
        CAST(JSON_EXTRACT_SCALAR(json_data, '$.parser.type') AS STRING) AS parser_type
    FROM (
        SELECT '{json_string}' AS json_data
    )
),
details AS (
    SELECT
        '{signature}' AS sig,
        abi.*
    FROM abi
),
logs AS (
    SELECT
        l.*,
        a.sig,
        `blockchain-etl-internal.common.parse_log`(
            l.data,
            l.topics,
            REPLACE(a.abi, "'", '"')
        ) AS parsed_log
    FROM
        `bigquery-public-data.crypto_ethereum.logs` AS l
    INNER JOIN
        details AS a
    ON
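        -- a log matches when its first topic (topics[0]) equals the event signature hash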
        IFNULL(l.topics[SAFE_OFFSET(0)], "") = a.sig
    WHERE
        DATE(l.block_timestamp) = DATE("{date}")
        AND (a.contract_address IS NULL OR l.address = LOWER(a.contract_address))
)
SELECT * FROM logs
LIMIT 100
"""

def generate_trace_sql(json_string, signature, date):
    return f"""
WITH
abi AS
(
    SELECT
        JSON_QUERY(json_data, '$.parser.abi') AS abi,
        JSON_QUERY(json_data, '$.parser.field_mapping') AS field_mapping,
        JSON_QUERY(json_data, '$.table') AS table_details,
        JSON_EXTRACT_SCALAR(json_data, '$.parser.contract_address') AS contract_address,
        CAST(JSON_EXTRACT_SCALAR(json_data, '$.parser.type') AS STRING) AS parser_type
    FROM (
        SELECT '{json_string}' AS json_data
    )
),
details AS (
    SELECT
        '{signature}' AS sig,
        abi.*
    FROM abi
),
traces AS (
    SELECT
        t.*,
        a.sig,
        `blockchain-etl-internal.common.parse_trace`(
            t.input,
            REPLACE(a.abi, "'", '"')
        ) AS parsed_trace
    FROM
        `bigquery-public-data.crypto_ethereum.traces` AS t
    INNER JOIN
        details AS a
    ON
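        -- a trace matches when its input starts with the 4-byte function selector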
        STARTS_WITH(t.input, a.sig)
    WHERE
        DATE(t.block_timestamp) = DATE("{date}")
        AND (a.contract_address IS NULL OR t.to_address = LOWER(a.contract_address))
        AND t.status = 1 -- Only successful calls
        AND t.call_type = 'call' -- Only direct calls
)
SELECT * FROM traces
LIMIT 100
"""

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Generate SQL from JSON file")
    parser.add_argument("json_file_path", help="Path to the JSON file")
    parser.add_argument("date", help="Date in YYYY-MM-DD format")
    parser.add_argument("--contract_address", help="Override contract address")

    args = parser.parse_args()

    try:
        # Validate the date format
        datetime.strptime(args.date, '%Y-%m-%d')
    except ValueError:
        print("Error: Date should be in the format YYYY-MM-DD")
        sys.exit(1)

    sql = generate_sql_from_json(args.json_file_path, args.date, args.contract_address)
    print(sql)
