Skip to content

Commit

Permalink
fix(hubble): various fixes (#2449)
Browse files Browse the repository at this point in the history
Various fixes allowing us to bootstrap hubble on a new machine.
  • Loading branch information
KaiserKarel authored Jul 16, 2024
2 parents fbb770a + ea1a271 commit ebfad7a
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 19 deletions.
5 changes: 2 additions & 3 deletions hubble/hubble.nix
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,13 @@
text =
let
indexersJson = builtins.toJSON cfg.indexers;
datastore = if cfg.datastore-method == "hasura" then ''--hasura-admin-secret "$(head -n 1 ${cfg.api-key-file})" --url ${cfg.url}'' else ''--database-url "$(head -n 1 ${cfg.api-key-file})"'';
in
''
${pkgs.lib.getExe cfg.package} \
${datastore} \
--database-url "$(head -n 1 ${cfg.api-key-file})" \
--log-format ${cfg.log-format} \
--metrics-addr ${cfg.metrics-addr} \
--indexers '${indexersJson}'
--indexers '${indexersJson}'
'';
};
in
Expand Down
9 changes: 5 additions & 4 deletions hubble/src/beacon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ use crate::consensus::{Indexer, Querier};
#[derive(Clone, Debug, serde::Deserialize)]
pub struct Config {
pub label: String,
pub url: url::Url,
pub urls: Vec<url::Url>,
pub chain_id: String,
pub start_height: Option<i64>,
}

impl Config {
Expand All @@ -31,9 +32,9 @@ impl Config {
.retry(&ExponentialBuilder::default())
.await?;

let querier = Beacon::new(self.url, reqwest::Client::new());
let querier = Beacon::new(self.urls[0].clone(), reqwest::Client::new());

Ok(Indexer::new(chain_id, db, querier))
Ok(Indexer::new(chain_id, db, querier, self.start_height))
}
}

Expand Down Expand Up @@ -184,7 +185,7 @@ mod block {
#[serde(rename = "voluntary_exits")]
pub voluntary_exits: Vec<Value>,
#[serde(rename = "sync_aggregate")]
pub sync_aggregate: SyncAggregate,
pub sync_aggregate: Option<SyncAggregate>,
#[serde(rename = "execution_payload")]
pub execution_payload: ExecutionPayload,
#[serde(rename = "bls_to_execution_changes")]
Expand Down
15 changes: 10 additions & 5 deletions hubble/src/bera.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ use crate::consensus::{Indexer, Querier};
#[derive(Clone, Debug, serde::Deserialize)]
pub struct Config {
pub label: String,
url: url::Url,
chain_id: String,
pub urls: Vec<url::Url>,
pub chain_id: String,
pub start_height: Option<i64>,
}

impl Config {
Expand All @@ -32,10 +33,10 @@ impl Config {
.retry(&ExponentialBuilder::default())
.await?;

let client = Client::new(self.url.as_str()).await?;
let client = Client::new(self.urls[0].as_str()).await?;
let querier = Bera::new(client);

Ok(Indexer::new(chain_id, db, querier))
Ok(Indexer::new(chain_id, db, querier, self.start_height))
}
}

Expand All @@ -60,7 +61,11 @@ impl Bera {
.abci_query(
"store/beacon/key",
data,
Some((slot - 1).try_into().unwrap()),
Some(
(slot - 1)
.try_into()
.expect("converting slot to abci_query slot"),
),
prove,
)
.await?;
Expand Down
11 changes: 7 additions & 4 deletions hubble/src/chain_id_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,13 @@ pub async fn tx(db: PgPool, indexers: Indexers) {
.unwrap()
.unwrap();

let msg = <contracts::ibc_handler::CreateClientCall as ethers::abi::AbiDecode>::decode(
&tx.input,
)
.unwrap();
let msg = match <contracts::ibc_handler::CreateClientCall as ethers::abi::AbiDecode>::decode(&tx.input) {
Ok(msg) => msg,
Err(err) => {
warn!("could not decode CreateClientCall, most likely due to ABI change: {}", err);
continue
}
};

match &*msg.0.client_type {
"cometbls" => {
Expand Down
17 changes: 14 additions & 3 deletions hubble/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,35 @@ pub struct Indexer<T: Querier + Send + Sync> {
chain_id: ChainId,
pool: sqlx::PgPool,
querier: T,
start_height: Option<i64>,
}

pub trait Querier {
async fn get_execution_height(&self, height: i64) -> Result<(i64, i64)>;
}

impl<T: Querier + Send + Sync> Indexer<T> {
pub fn new(chain_id: ChainId, pool: sqlx::PgPool, querier: T) -> Self {
pub fn new(
chain_id: ChainId,
pool: sqlx::PgPool,
querier: T,
start_height: Option<i64>,
) -> Self {
Self {
chain_id,
pool,
querier,
start_height,
}
}

pub async fn index(&self) -> Result<()> {
let mut consensus_height =
crate::postgres::get_max_consensus_height(&self.pool, self.chain_id).await? + 1;
let start_height = self.start_height.unwrap_or_default();
let mut consensus_height = std::cmp::max(
crate::postgres::get_max_consensus_height(&self.pool, self.chain_id).await? + 1,
start_height,
);

loop {
info!("mapping consensus height {consensus_height}");

Expand Down

0 comments on commit ebfad7a

Please sign in to comment.