
Write the dag relationship in batch #4304

Draft · wants to merge 36 commits into base: dag-master

Changes from 35 commits (36 commits total)
a6403bc
do not store the sync dag block
jackzhhuang Nov 4, 2024
c173559
add rm sync in kube file
jackzhhuang Nov 4, 2024
48c9bac
use ghostdata to verify the dag block
jackzhhuang Nov 6, 2024
c8b259a
use ghostdata if the verification is not correct
jackzhhuang Nov 6, 2024
1a6c851
use ghostdata instead of verify
jackzhhuang Nov 6, 2024
264a5d6
add verification code for discovering the blue blocks error
jackzhhuang Nov 6, 2024
3b2a145
verify and, if it fails, use ghostdata
jackzhhuang Nov 7, 2024
197bc4f
bail out if the ghost data checking is wrong
jackzhhuang Nov 7, 2024
adfcb23
stop printing red blocks to avoid excessive log data
jackzhhuang Nov 8, 2024
e06797d
test dag verification
jackzhhuang Nov 11, 2024
86ee296
add log to trace
jackzhhuang Nov 11, 2024
8250159
add reachability data with parents
jackzhhuang Nov 11, 2024
b0a9dde
add worker scheduler for stopping the worker gracefully
jackzhhuang Nov 7, 2024
f5b729c
tell the worker to stop before a new sync process starts
jackzhhuang Nov 7, 2024
39aaa26
add test and log
jackzhhuang Nov 8, 2024
5b75651
add drop helper for scheduler
jackzhhuang Nov 8, 2024
573bc89
fix fmt
jackzhhuang Nov 13, 2024
e3dca70
use unordered_mergeset_without_selected_parent when storing into the …
jackzhhuang Nov 13, 2024
248b7fb
rm commented codes
jackzhhuang Nov 14, 2024
e4eff6c
add uncle test in reachability
jackzhhuang Nov 14, 2024
a516332
add worker scheduler for stopping the worker gracefully
jackzhhuang Nov 7, 2024
289c05b
tell the worker to stop before a new sync process starts
jackzhhuang Nov 7, 2024
7000a99
add test and log
jackzhhuang Nov 8, 2024
57f73af
add drop helper for scheduler
jackzhhuang Nov 8, 2024
3ff4ab4
cancel when sync is interrupted
jackzhhuang Nov 19, 2024
71c8f01
Merge branch 'loop-break-sync' of github.com:starcoinorg/starcoin int…
jackzhhuang Nov 19, 2024
ed41063
add batch write
jackzhhuang Nov 27, 2024
d19e726
add cache and db test case
jackzhhuang Nov 27, 2024
4f15309
add batch write parent and children in dag
jackzhhuang Nov 27, 2024
63ab93a
push the new child into the parent's vec
jackzhhuang Nov 27, 2024
0685111
fix fmt
jackzhhuang Nov 27, 2024
327649c
1, fix encode bug
jackzhhuang Nov 28, 2024
216d7b4
1, flush cache if batch writing done
jackzhhuang Nov 28, 2024
aa0c54d
do not use sync writing
jackzhhuang Nov 28, 2024
018e02d
fix sync_block_process
jackzhhuang Nov 29, 2024
799f99d
revert the mergeset code
jackzhhuang Nov 29, 2024
1 change: 1 addition & 0 deletions commons/stream-task/src/event.rs
@@ -278,6 +278,7 @@ impl TaskEventHandle for TaskEventCounterHandle {
}

fn on_error(&self) {
println!("jacktest: on_error()");
⚠️ Potential issue

Remove temporary debug print statement.

This appears to be accidentally committed debugging code. The println! with "jacktest" prefix is inconsistent with the codebase's logging practices and bypasses the logging framework. Additionally, this change seems unrelated to the PR's objective of implementing batch DAG relationship writing.

If additional error visibility is needed, consider using the existing logging framework:

-println!("jacktest: on_error()");
+log::debug!("Task error occurred");  // If debug-level logging is needed
// or
+log::error!("Task error occurred");  // If error-level logging is needed

Committable suggestion skipped: line range outside the PR's diff.

if let Some(counter) = self.current_counter.lock().as_ref() {
counter.error_counter.fetch_add(1, Ordering::Release);
}
14 changes: 8 additions & 6 deletions flexidag/src/blockdag.rs
@@ -223,10 +223,12 @@ impl BlockDAG {
);
let reachability_store = self.storage.reachability_store.clone();

-        let mut merge_set = ghostdata
-            .unordered_mergeset_without_selected_parent()
-            .filter(|hash| self.storage.reachability_store.read().has(*hash).unwrap())
-            .collect::<Vec<_>>()
+        let mut merge_set = self
+            .ghost_dag_manager()
+            .unordered_mergeset_without_selected_parent(
+                ghostdata.selected_parent,
+                &header.parents(),
+            )
             .into_iter();
let add_block_result = {
let mut reachability_writer = reachability_store.write();
@@ -482,8 +484,8 @@
let dag_state = self.get_dag_state(previous_pruning_point)?;
let next_ghostdata = self.ghostdata(&dag_state.tips)?;
info!(
-            "start to calculate the mergeset and tips for tips: {:?}, and last pruning point: {:?} and next ghostdata: {:?}",
-            dag_state.tips, previous_pruning_point, next_ghostdata,
+            "start to calculate the mergeset and tips for tips: {:?}, and last pruning point: {:?} and next ghostdata's selected parents: {:?} and blues set are {:?}",
+            dag_state.tips, previous_pruning_point, next_ghostdata.selected_parent, next_ghostdata.mergeset_blues,
);
let next_pruning_point = self.pruning_point_manager().next_pruning_point(
previous_pruning_point,
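The change in `blockdag.rs` swaps a reachability-store filter for the ghostdag manager's own mergeset computation, driven by the selected parent and the block's direct parents. As a rough intuition only (not the project's code; `mergeset_seeds` is a hypothetical name): the mergeset minus the selected parent is seeded by the other direct parents.

```rust
type Hash = u64;

// Toy seed step for `unordered_mergeset_without_selected_parent`: the direct
// parents other than the selected one start the mergeset. The real manager
// then walks their past cones; this sketch stops at the seeds.
fn mergeset_seeds(selected_parent: Hash, parents: &[Hash]) -> Vec<Hash> {
    parents
        .iter()
        .copied()
        .filter(|p| *p != selected_parent)
        .collect()
}
```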
7 changes: 7 additions & 0 deletions flexidag/src/consensusdb/access.rs
@@ -108,6 +108,13 @@ where
Ok(())
}

pub fn flush_cache(&self, data: &[(S::Key, S::Value)]) -> Result<(), StoreError> {
for (key, value) in data {
self.cache.insert(key.clone(), value.clone());
}
Ok(())
}

/// Write directly from an iterator and do not cache any data. NOTE: this action also clears the cache
pub fn write_many_without_cache(
&self,
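The new `flush_cache` helper refreshes the in-memory cache after a batch has landed in the backing store, so readers see the batch without a DB round-trip. A minimal sketch of that write-then-refresh pattern (the `Store` type and `HashMap` backing are stand-ins, not the project's `CachedDbAccess`):

```rust
use std::collections::HashMap;

/// Stand-in for a cached store: `db` plays the role of RocksDB,
/// `cache` the read cache that must be refreshed after batch writes.
struct Store {
    db: HashMap<String, Vec<u64>>,
    cache: HashMap<String, Vec<u64>>,
}

impl Store {
    fn new() -> Self {
        Store { db: HashMap::new(), cache: HashMap::new() }
    }

    /// Write a batch to the backing store only; the cache is stale afterwards.
    fn write_batch(&mut self, data: &[(String, Vec<u64>)]) {
        for (k, v) in data {
            self.db.insert(k.clone(), v.clone());
        }
    }

    /// Mirror of `flush_cache`: copy the freshly written rows into the cache.
    fn flush_cache(&mut self, data: &[(String, Vec<u64>)]) {
        for (k, v) in data {
            self.cache.insert(k.clone(), v.clone());
        }
    }

    fn get_cached(&self, key: &str) -> Option<&Vec<u64>> {
        self.cache.get(key)
    }
}
```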
114 changes: 57 additions & 57 deletions flexidag/src/consensusdb/consensus_relations.rs
@@ -1,14 +1,15 @@
 use super::schema::{KeyCodec, ValueCodec};
 use super::{
     db::DBStorage,
-    prelude::{BatchDbWriter, CachedDbAccess, DirectDbWriter, StoreError},
+    prelude::{CachedDbAccess, StoreError},
 };
 use crate::define_schema;
-use rocksdb::WriteBatch;
 use starcoin_crypto::HashValue as Hash;
+use starcoin_storage::batch::{WriteBatch, WriteBatchData, WriteBatchWithColumn};
+use starcoin_storage::storage::{InnerStore, WriteOp};
 use starcoin_types::blockhash::{BlockHashes, BlockLevel};
+use std::collections::HashMap;
 use std::sync::Arc;

/// Reader API for `RelationsStore`.
pub trait RelationsStoreReader {
fn get_parents(&self, hash: Hash) -> Result<BlockHashes, StoreError>;
@@ -90,41 +91,6 @@ impl DbRelationsStore {
pub fn clone_with_new_cache(&self, cache_size: usize) -> Self {
Self::new(Arc::clone(&self.db), self.level, cache_size)
}

-    pub fn insert_batch(
-        &mut self,
-        batch: &mut WriteBatch,
-        hash: Hash,
-        parents: BlockHashes,
-    ) -> Result<(), StoreError> {
-        if self.has(hash)? {
-            return Err(StoreError::KeyAlreadyExists(hash.to_string()));
-        }
-
-        // Insert a new entry for `hash`
-        self.parents_access
-            .write(BatchDbWriter::new(batch, &self.db), hash, parents.clone())?;
-
-        // The new hash has no children yet
-        self.children_access.write(
-            BatchDbWriter::new(batch, &self.db),
-            hash,
-            BlockHashes::new(Vec::new()),
-        )?;
-
-        // Update `children` for each parent
-        for parent in parents.iter().cloned() {
-            let mut children = (*self.get_children(parent)?).clone();
-            children.push(hash);
-            self.children_access.write(
-                BatchDbWriter::new(batch, &self.db),
-                parent,
-                BlockHashes::new(children),
-            )?;
-        }
-
-        Ok(())
-    }
 }

impl RelationsStoreReader for DbRelationsStore {
@@ -147,35 +113,69 @@ impl RelationsStoreReader for DbRelationsStore {
}

 impl RelationsStore for DbRelationsStore {
-    /// See `insert_batch` as well
-    /// TODO: use one function with DbWriter for both this function and insert_batch
     fn insert(&self, hash: Hash, parents: BlockHashes) -> Result<(), StoreError> {
         if self.has(hash)? {
             return Err(StoreError::KeyAlreadyExists(hash.to_string()));
         }
 
-        // Insert a new entry for `hash`
-        self.parents_access
-            .write(DirectDbWriter::new(&self.db), hash, parents.clone())?;
+        let mut parent_to_children = HashMap::new();
+        parent_to_children.insert(hash, vec![]);
 
-        // The new hash has no children yet
-        self.children_access.write(
-            DirectDbWriter::new(&self.db),
-            hash,
-            BlockHashes::new(Vec::new()),
-        )?;
-
         // Update `children` for each parent
         for parent in parents.iter().cloned() {
-            let mut children = (*self.get_children(parent)?).clone();
+            let mut children = match self.get_children(parent) {
+                Ok(children) => (*children).clone(),
+                Err(e) => match e {
+                    StoreError::KeyNotFound(_) => vec![],
+                    _ => return std::result::Result::Err(e),
+                },
+            };
             children.push(hash);
-            self.children_access.write(
-                DirectDbWriter::new(&self.db),
-                parent,
-                BlockHashes::new(children),
-            )?;
+            parent_to_children.insert(parent, children);
         }
 
+        let batch = WriteBatchWithColumn {
+            data: vec![
+                WriteBatchData {
+                    column: PARENTS_CF.to_string(),
+                    row_data: WriteBatch::new_with_rows(vec![(
+                        hash.to_vec(),
+                        WriteOp::Value(
+                            <Arc<Vec<Hash>> as ValueCodec<RelationParent>>::encode_value(&parents)?,
+                        ),
+                    )]),
+                },
+                WriteBatchData {
+                    column: CHILDREN_CF.to_string(),
+                    row_data: WriteBatch::new_with_rows(
+                        parent_to_children
+                            .iter()
+                            .map(|(key, value)| {
+                                std::result::Result::Ok((
+                                    key.to_vec(),
+                                    WriteOp::Value(<Arc<Vec<Hash>> as ValueCodec<
+                                        RelationChildren,
+                                    >>::encode_value(
+                                        &Arc::new(value.clone())
+                                    )?),
+                                ))
+                            })
+                            .collect::<std::result::Result<Vec<_>, StoreError>>()?,
+                    ),
+                },
+            ],
+        };
+        self.db
+            .write_batch_with_column(batch)
+            .map_err(|e| StoreError::DBIoError(e.to_string()))?;
Comment on lines +167 to +169
🛠️ Refactor suggestion

Enhance error handling for database operations

The current error handling converts all database errors to a generic string. Consider preserving more error context.

-           .map_err(|e| StoreError::DBIoError(e.to_string()))?;
+           .map_err(|e| StoreError::DBIoError(format!("Failed to write batch: {}", e)))?;

Also consider adding:

  1. Transaction rollback capability in case of partial failures
  2. Logging of the specific operations that failed

Committable suggestion skipped: line range outside the PR's diff.


+        self.parents_access.flush_cache(&[(hash, parents)])?;
+        self.children_access.flush_cache(
+            &parent_to_children
+                .into_iter()
+                .map(|(key, value)| (key, BlockHashes::new(value)))
+                .collect::<Vec<_>>(),
+        )?;
 
         Ok(())
     }
 }
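The rewritten `insert` first builds the full parent-to-children map in memory (treating a missing children entry as empty) and only then issues one batched write. That map-building step can be sketched on its own (`children_updates` is an illustrative helper, with a plain `HashMap` standing in for the children column family):

```rust
use std::collections::HashMap;

type Hash = u64;

/// Build the children updates for a new block `hash` with the given `parents`,
/// defaulting a missing children entry to an empty list, as the rewritten
/// `insert` does for `StoreError::KeyNotFound`.
fn children_updates(
    existing: &HashMap<Hash, Vec<Hash>>,
    hash: Hash,
    parents: &[Hash],
) -> HashMap<Hash, Vec<Hash>> {
    let mut parent_to_children: HashMap<Hash, Vec<Hash>> = HashMap::new();
    // The new block starts with no children.
    parent_to_children.insert(hash, vec![]);
    // Each parent gains `hash` as a child.
    for &parent in parents {
        let mut children = existing.get(&parent).cloned().unwrap_or_default();
        children.push(hash);
        parent_to_children.insert(parent, children);
    }
    parent_to_children
}
```

Collecting all updates first is what makes the single `WriteBatchWithColumn` write (and the subsequent `flush_cache`) possible.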
35 changes: 27 additions & 8 deletions flexidag/src/ghostdag/protocol.rs
@@ -210,22 +210,29 @@ impl<
}
}

+        let remote_blue_set = blue_blocks
+            .iter()
+            .map(|header| header.id())
+            .collect::<HashSet<_>>();
         if new_block_data
             .mergeset_blues
             .iter()
             .skip(1)
             .cloned()
             .collect::<HashSet<_>>()
-            != blue_blocks
-                .iter()
-                .map(|header| header.id())
-                .collect::<HashSet<_>>()
+            != remote_blue_set
         {
-            if header.number() < 10000000 {
-                // no bail before 10000000
-                warn!("The data of blue set is not equal when executing the block: {:?}, for {:?}, checking data: {:?}", header.id(), blue_blocks.iter().map(|header| header.id()).collect::<Vec<_>>(), new_block_data.mergeset_blues);
-            } else {
-                bail!("The data of blue set is not equal when executing the block: {:?}, for {:?}, checking data: {:?}", header.id(), blue_blocks.iter().map(|header| header.id()).collect::<Vec<_>>(), new_block_data.mergeset_blues);
-            }
+            warn!("The data of blue set is not equal when executing the block: {:?}, for {:?}, checking data: {:?}", header.id(), blue_blocks.iter().map(|header| header.id()).collect::<Vec<_>>(), new_block_data.mergeset_blues);
+            let ghostdata = self.ghostdag(&header.parents_hash())?;
+            if ghostdata
+                .mergeset_blues
+                .iter()
+                .skip(1)
+                .cloned()
+                .collect::<HashSet<_>>()
+                != remote_blue_set
+            {
+                bail!("The ghost data of blue set is not equal when executing the block: {:?}, for {:?}, checking data: {:?}", header.id(), blue_blocks.iter().map(|header| header.id()).collect::<Vec<_>>(), ghostdata.mergeset_blues);
+            }
         }

@@ -377,12 +384,20 @@ impl<
*candidate_blue_anticone_size = (*candidate_blue_anticone_size).checked_add(1).unwrap();
if *candidate_blue_anticone_size > self.k {
// k-cluster violation: The candidate's blue anticone exceeded k
info!(
"Checking blue candidate: {} failed, blue anticone exceeded k",
blue_candidate
);
return Ok(ColoringState::Red);
}

if *candidate_blues_anticone_sizes.get(&block).unwrap() == self.k {
// k-cluster violation: A block in candidate's blue anticone already
// has k blue blocks in its own anticone
info!(
"Checking blue candidate: {} failed, block {} has k blue blocks in its anticone",
blue_candidate, block
);
return Ok(ColoringState::Red);
}

@@ -431,6 +446,10 @@
// The maximum length of new_block_data.mergeset_blues can be K+1 because
// it contains the selected parent.
if new_block_data.mergeset_blues.len() as KType == self.k.checked_add(1).unwrap() {
info!(
"Checking blue candidate: {} failed, mergeset blues size is K+1",
blue_candidate
);
return Ok(ColoringOutput::Red);
}

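The verification in `protocol.rs` compares the locally computed blue mergeset (minus its first entry, the selected parent) against the blue set shipped with the block, recomputing ghostdata once before bailing. The set comparison itself can be sketched like this (`blues_match` is a hypothetical helper, not the project's API):

```rust
use std::collections::HashSet;

type Hash = u64;

// Compare a locally computed blue mergeset against the remote blue set.
// Index 0 of `mergeset_blues` holds the selected parent, so it is skipped,
// mirroring the `.skip(1)` in the verification code.
fn blues_match(local_mergeset_blues: &[Hash], remote_blues: &[Hash]) -> bool {
    let local: HashSet<Hash> = local_mergeset_blues.iter().skip(1).copied().collect();
    let remote: HashSet<Hash> = remote_blues.iter().copied().collect();
    local == remote
}
```

Using sets makes the check order-insensitive, which matters because the mergeset has no canonical ordering beyond the selected parent coming first.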
5 changes: 5 additions & 0 deletions flexidag/src/prune/pruning_point_manager.rs
@@ -75,6 +75,11 @@ impl<T: ReachabilityStoreReader + Clone> PruningPointManagerT<T> {
min_required_blue_score_for_next_pruning_point
);

debug!("previous_pruning_point: {:?}, previous_ghostdata: {:?}, next_ghostdata: {:?}, pruning_depth: {:?}, pruning_finality: {:?}",
previous_pruning_point, previous_ghostdata, next_ghostdata,
pruning_depth, pruning_finality,
);

let mut latest_pruning_ghost_data = previous_ghostdata.to_compact();
if min_required_blue_score_for_next_pruning_point + pruning_depth
<= next_ghostdata.blue_score
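The surrounding logic in `next_pruning_point` only advances the pruning point once the next ghostdata's blue score is deep enough past the candidate. The guard being logged above reduces to a single comparison (sketch; `depth_reached` is an illustrative name, and the saturating add is a defensive choice of this sketch, not necessarily the project's):

```rust
// A candidate pruning point qualifies once the chain's blue score has moved
// at least `pruning_depth` past the candidate's minimum required blue score,
// mirroring the `min_required + pruning_depth <= blue_score` check.
fn depth_reached(min_required_blue_score: u64, pruning_depth: u64, next_blue_score: u64) -> bool {
    min_required_blue_score.saturating_add(pruning_depth) <= next_blue_score
}
```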