From 5064f6c5473951460eaf58d4ca455530de10f475 Mon Sep 17 00:00:00 2001 From: Bruno Wu Date: Mon, 16 Oct 2023 10:36:13 +0100 Subject: [PATCH] Minor fixes to get BQ running --- products/pdt/.gitignore | 4 +--- products/pdt/Cargo.toml | 1 + products/pdt/pdt/src/main.rs | 10 ++++++---- products/pdt/pdtbq/src/meta.rs | 6 +++--- products/pdt/pdtbq/src/tracked.rs | 2 +- products/pdt/pdtparse/src/events.rs | 4 ++++ products/pdt/pdtpsql/src/utils.rs | 1 + 7 files changed, 17 insertions(+), 11 deletions(-) diff --git a/products/pdt/.gitignore b/products/pdt/.gitignore index 130e74d72..629602c99 100644 --- a/products/pdt/.gitignore +++ b/products/pdt/.gitignore @@ -1,6 +1,4 @@ target/ .vscode/ # Target for downloaded files -downloads/ -# Stores download metadata -meta.json +data/ diff --git a/products/pdt/Cargo.toml b/products/pdt/Cargo.toml index d568add41..b5b71951d 100644 --- a/products/pdt/Cargo.toml +++ b/products/pdt/Cargo.toml @@ -1,4 +1,5 @@ [workspace] +resolver = "2" members = [ "pdtlib", "pdtbq", diff --git a/products/pdt/pdt/src/main.rs b/products/pdt/pdt/src/main.rs index c42a6d087..b9297c138 100644 --- a/products/pdt/pdt/src/main.rs +++ b/products/pdt/pdt/src/main.rs @@ -26,7 +26,7 @@ struct Cli { #[arg(long, default_value = "/home/rrw/tmp/unpacked")] unpack_dir: String, - #[arg(long, default_value = "testnet-901")] + #[arg(long, default_value = "testnet-925")] network: String, #[arg(long, value_enum)] @@ -139,7 +139,7 @@ struct ListenOptions { #[arg(long)] dataset_id: String, - #[arg(long, default_value = "rrw-bigquery-test-id")] + #[arg(long, default_value = "prj-c-data-analytics-3xs14wez")] project_id: String, #[arg(long)] @@ -163,8 +163,10 @@ async fn download_persistence( } else { if network_name.starts_with("mainnet-") { &NetworkType::Mainnet - } else { + } else if network_name.starts_with("testnet-") { &NetworkType::Testnet + } else { + panic!("Unexpected network type") } }; let bucket_name = match network_type { @@ -184,7 +186,7 @@ async fn download_persistence( incr.save_meta(max_block)?; println!("Max block {}", max_block); - let render = Renderer::new("testnet-901", download_dir, unpack_dir)?; + let render = Renderer::new(network_name, download_dir, unpack_dir)?; let recovery_points = render.get_recovery_points()?; println!( "persistence blocks: {:?} , state blocks: {:?}", diff --git a/products/pdt/pdtbq/src/meta.rs b/products/pdt/pdtbq/src/meta.rs index e29de7f51..b9d3fb981 100644 --- a/products/pdt/pdtbq/src/meta.rs +++ b/products/pdt/pdtbq/src/meta.rs @@ -99,7 +99,7 @@ impl MetaTable for Meta { start: i64, nr_blks: i64, ) -> Result> { - let query = format!("SELECT start_blk,nr_blks,client_id FROM {} WHERE start_blk <= {} and nr_blks >= {} - start_blk ORDER BY nr_blks DESC LIMIT 1", + let query = format!("SELECT start_blk,nr_blks,client_id FROM `{}` WHERE start_blk <= {} and nr_blks >= {} - start_blk ORDER BY nr_blks DESC LIMIT 1", self.table.get_table_desc(), start, nr_blks + start); let mut result = client .job() @@ -186,7 +186,7 @@ impl MetaTable for Meta { let mut result = client .job() .query(&self.table.dataset.project_id, - QueryRequest::new(format!("SELECT start_blk, nr_blks FROM {} WHERE start_blk >= {} AND nr_blks > 0 ORDER BY start_blk ASC, nr_blks DESC LIMIT 1", + QueryRequest::new(format!("SELECT start_blk, nr_blks FROM `{}` WHERE start_blk >= {} AND nr_blks > 0 ORDER BY start_blk ASC, nr_blks DESC LIMIT 1", self.table.get_table_desc(), blk_to_find))).await?; if result.next_row() { let start_blk = result @@ -211,7 +211,7 @@ impl MetaTable for Meta { let _ = client .job() .query(&self.table.dataset.project_id, - QueryRequest::new(format!("INSERT INTO {} (client_id, event_stamp, start_blk, nr_blks) VALUES (\"{}\",CURRENT_DATETIME(), {}, {})", + QueryRequest::new(format!("INSERT INTO `{}` (client_id, event_stamp, start_blk, nr_blks) VALUES (\"{}\",CURRENT_DATETIME(), {}, {})", self.table.get_table_desc(), self.coords.client_id, range.start, range.end-range.start))) .await?; diff --git a/products/pdt/pdtbq/src/tracked.rs b/products/pdt/pdtbq/src/tracked.rs index 3918016c9..5e0569978 100644 --- a/products/pdt/pdtbq/src/tracked.rs +++ b/products/pdt/pdtbq/src/tracked.rs @@ -57,7 +57,7 @@ impl Trackable for TrackedTable { ) -> Result<(i64, i64)> { let mut result = client.job() .query(&self.location.dataset.project_id, - QueryRequest::new(format!("SELECT block,offset_in_block FROM {} WHERE block >= {} AND block < {} ORDER BY block DESC, offset_in_block DESC LIMIT 1", + QueryRequest::new(format!("SELECT block,offset_in_block FROM `{}` WHERE block >= {} AND block < {} ORDER BY block DESC, offset_in_block DESC LIMIT 1", self.location.get_table_desc(), blks.start, blks.end))).await?; if result.next_row() { // There was one!1 diff --git a/products/pdt/pdtparse/src/events.rs b/products/pdt/pdtparse/src/events.rs index a0543b721..022f36e2f 100644 --- a/products/pdt/pdtparse/src/events.rs +++ b/products/pdt/pdtparse/src/events.rs @@ -19,6 +19,7 @@ pub enum ZRC2Event { pub struct MintedEvent { txn_id: String, block: i64, + #[allow(dead_code)] minter: String, recipient: String, amount: Decimal, @@ -53,6 +54,7 @@ impl LedgerInsertable for MintedEvent { pub struct BurntEvent { txn_id: String, block: i64, + #[allow(dead_code)] burner: String, burn_account: String, amount: Decimal, @@ -124,6 +126,7 @@ impl LedgerInsertable for TransferSuccessEvent { pub struct TransferFromSuccess { txn_id: String, block: i64, + #[allow(dead_code)] initiator: String, sender: String, recipient: String, @@ -165,6 +168,7 @@ impl LedgerInsertable for TransferFromSuccess { pub struct OperatorSendSuccess { txn_id: String, block: i64, + #[allow(dead_code)] initiator: String, sender: String, recipient: String, diff --git a/products/pdt/pdtpsql/src/utils.rs b/products/pdt/pdtpsql/src/utils.rs index b389fdc1a..b59e63aab 100644 --- a/products/pdt/pdtpsql/src/utils.rs +++ b/products/pdt/pdtpsql/src/utils.rs @@ -4,6 +4,7 @@ use anyhow::Result; use sqlx::{postgres::types::PgRange, query_as, PgPool}; #[derive(Debug)] +#[allow(dead_code)] pub struct SchemaColumn { column_name: Option, data_type: Option,