feat(listener): use available cores to help other CUs #2163

Merged: justprosh merged 5 commits into master from assign_additional_core on Mar 13, 2024

@justprosh justprosh commented Mar 12, 2024

  • improve logs in case of subscription failure
  • use released ccp crates
  • all units not involved in deals participate in capacity commitment (CC) calculation

@justprosh justprosh added the e2e Run e2e workflow label Mar 12, 2024
@justprosh justprosh requested a review from folex March 12, 2024 20:44
@folex folex requested a review from mikevoronov March 12, 2024 20:44
persisted_proof_id_dir: PathBuf,

-    unit_activated: Option<Subscription<Log>>,
-    unit_deactivated: Option<Subscription<Log>>,
+    unit_activated: Option<Subscription<JsonValue>>,

Damn. Is there any way to use a specific type there? A subscription to raw JSON doesn't sound very appealing :D

Maybe you can .map it and store the mapped subscription?
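
A minimal, dependency-free sketch of the "map it and store the mapped one" idea. Everything here is an assumption for illustration: a plain iterator stands in for the async `Subscription`, the `Log` struct and its `"<block>:<data>"` text format are hypothetical, and the real code would deserialize JSON instead.

```rust
/// Hypothetical typed event; stands in for the real `Log` type.
#[derive(Debug, PartialEq)]
pub struct Log {
    pub block_number: u64,
    pub data: String,
}

/// Hypothetical parser from an untyped payload ("<block>:<data>") into `Log`.
/// The real code would deserialize a JSON value here.
pub fn parse_log(raw: &str) -> Result<Log, String> {
    let (block, data) = raw
        .split_once(':')
        .ok_or_else(|| format!("malformed event: {}", raw))?;
    let block_number = block
        .parse::<u64>()
        .map_err(|err| format!("bad block number in {}: {}", raw, err))?;
    Ok(Log {
        block_number,
        data: data.to_string(),
    })
}

/// Wraps an untyped source once, so whoever stores the result only ever
/// sees typed items (or a parse error) instead of raw payloads.
pub fn typed_subscription<I>(raw: I) -> impl Iterator<Item = Result<Log, String>>
where
    I: Iterator<Item = String>,
{
    raw.map(|payload| parse_log(&payload))
}
```

With this shape the conversion lives in one place and consumers match on `Result<Log, _>`. Storing such a mapped value in a struct field would likely require boxing it, because the closure type cannot be named.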

Comment on lines 679 to +685
 let event = event.ok_or(eyre!(
     "Failed to process CommitmentActivated event: got None"
-))?;
-let cc_event = parse_log::<CommitmentActivatedData, CommitmentActivated>(event?)?;
+))??;
+let log = serde_json::from_value::<Log>(event.clone()).map_err(|err| {
+    tracing::error!(target: "chain-listener", "Failed to parse CommitmentActivated event: {err}, data: {event}");
+    err
+})?;

DRY?
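
One way the repeated parse-and-log pattern could be factored out, sketched without external crates. The helper name `parse_or_log` and its signature are hypothetical, and `eprintln!` stands in for the `tracing::error!` call used in the PR.

```rust
use std::fmt::Display;

/// Runs `parse` on `raw`. On failure, reports the event name, the error and
/// the payload once, then returns the error unchanged to the caller.
pub fn parse_or_log<T, E, F>(event_name: &str, raw: &str, parse: F) -> Result<T, E>
where
    E: Display,
    F: FnOnce(&str) -> Result<T, E>,
{
    parse(raw).map_err(|err| {
        // The PR logs through `tracing`; `eprintln!` keeps this sketch dependency-free.
        eprintln!("Failed to parse {} event: {}, data: {}", event_name, err, raw);
        err
    })
}
```

Each event handler would then pass only its event name and parser, so the log message format is defined once.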

} else {
// Will be activated on the `start_epoch`
self.pending_compute_units.insert(unit_event.info.into());
}

self.refresh_commitment().await?;

let's add a comment on why you do it here

@@ -786,11 +808,43 @@ impl ChainListener {
tracing::info!(target: "chain-listener", "Global nonce: {}", self.global_nonce);
tracing::info!(target: "chain-listener", "Difficulty: {}", self.difficulty);
if let Some(ref ccp_client) = self.ccp_client {
- let cores = self.acquire_active_units()?;
+ let mut cores = self.acquire_active_units()?;

cores... what? free, busy, to be used?

Comment on lines +828 to +834
for unit in self.active_compute_units.iter().cycle() {
if let Some(core) = available_cores.pop_first() {
cores.insert(core, *unit);
} else {
break;
}
}

what happens here? let's leave comments
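
As far as the excerpt shows, the loop hands every spare core to an active unit, visiting units round-robin. A commented, standalone sketch of that reading follows; `CoreId`, `UnitId` and `assign_spare_cores` are hypothetical stand-ins, not names from the PR.

```rust
use std::collections::{BTreeMap, BTreeSet};

// Hypothetical stand-ins for `PhysicalCoreId` and the compute unit ID type.
pub type CoreId = usize;
pub type UnitId = u32;

/// Assigns every spare core to an active unit, visiting units round-robin.
/// Returns a map from core to the unit that should use it.
pub fn assign_spare_cores(
    active_units: &[UnitId],
    mut available_cores: BTreeSet<CoreId>,
) -> BTreeMap<CoreId, UnitId> {
    let mut cores = BTreeMap::new();
    // `cycle()` restarts the unit list when it is exhausted, so a unit can
    // receive more than one extra core when spare cores outnumber units.
    // With no active units, `cycle()` yields nothing and the loop is skipped.
    for unit in active_units.iter().cycle() {
        // `pop_first()` removes the smallest remaining core ID;
        // `None` means all spare cores have been handed out.
        match available_cores.pop_first() {
            Some(core) => {
                cores.insert(core, *unit);
            }
            None => break,
        }
    }
    cores
}
```

For units `[10, 20]` and spare cores `{0, 1, 2}`, core 0 goes to unit 10, core 1 to unit 20, and core 2 wraps around to unit 10 again.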

Comment on lines +846 to +847
self.global_nonce,
self.difficulty,

indentation

@@ -832,9 +881,11 @@ impl ChainListener {
Ok(())
}

/// Should be called only if the Commitment is Inactive, Failed, Removed, or does not exist

can you add debug_assert on that?
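
A sketch of what such a `debug_assert!` could look like. The enum, the `Option` used to model "does not exist", and the function names are all assumptions; only the status names Inactive, Failed and Removed come from the doc comment above.

```rust
/// Hypothetical status enum; the real one may have more variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommitmentStatus {
    Active,
    Inactive,
    Failed,
    Removed,
}

/// True when the precondition from the doc comment holds: the commitment is
/// Inactive, Failed, Removed, or does not exist (modelled here as `None`).
pub fn can_reset(status: Option<CommitmentStatus>) -> bool {
    matches!(
        status,
        None | Some(
            CommitmentStatus::Inactive | CommitmentStatus::Failed | CommitmentStatus::Removed
        )
    )
}

/// Hypothetical function guarded by the precondition.
pub fn reset_units(status: Option<CommitmentStatus>, units: &mut Vec<u32>) {
    // Checked in debug builds only; compiled out in release builds.
    debug_assert!(
        can_reset(status),
        "reset_units called with unexpected status {:?}",
        status
    );
    units.clear();
}
```

Because `debug_assert!` is removed in release builds, it documents and tests the contract without adding a runtime cost in production.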

Comment on lines 1120 to +1121
}
fn get_available_cores(&self) -> eyre::Result<BTreeSet<PhysicalCoreId>> {

Suggested change
}
fn get_available_cores(&self) -> eyre::Result<BTreeSet<PhysicalCoreId>> {
}
fn get_available_cores(&self) -> eyre::Result<BTreeSet<PhysicalCoreId>> {

self.core_manager.acquire_worker_core(AcquireRequest::new(available_units, WorkType::CapacityCommitment))
.map(|acquired| acquired.physical_core_ids)
.map_err(|err| {
tracing::error!(target: "chain-listener", "Failed to acquire cores for active units: {err}");

are the units active, pending or available? I'm confused:

let available = self.pending
log("failed to acquire for active")

too many names, and I don't see how they can be composed

@@ -1069,4 +1118,13 @@ impl ChainListener {
self.active_deals.remove(deal_id);
Ok(())
}
fn get_available_cores(&self) -> eyre::Result<BTreeSet<PhysicalCoreId>> {

What does this function do?

Name suggests it's effectless, but body suggests it affects CoreManager's state.
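
To illustrate the naming concern, here is a sketch that separates a read-only query from a state-changing acquisition. This `CoreManager` is a hypothetical, heavily simplified model and does not reflect the real type's API.

```rust
use std::collections::BTreeSet;

// Hypothetical stand-in for `PhysicalCoreId`.
pub type CoreId = usize;

/// Hypothetical, much simplified core manager.
#[derive(Debug, Default)]
pub struct CoreManager {
    free: BTreeSet<CoreId>,
    busy: BTreeSet<CoreId>,
}

impl CoreManager {
    pub fn new(cores: impl IntoIterator<Item = CoreId>) -> Self {
        Self {
            free: cores.into_iter().collect(),
            busy: BTreeSet::new(),
        }
    }

    /// Read-only query: takes `&self` and changes nothing.
    pub fn free_cores(&self) -> &BTreeSet<CoreId> {
        &self.free
    }

    /// Read-only query for the cores already handed out.
    pub fn busy_cores(&self) -> &BTreeSet<CoreId> {
        &self.busy
    }

    /// State-changing: moves up to `count` cores from free to busy,
    /// so the name says `acquire` and the receiver is `&mut self`.
    pub fn acquire_cores(&mut self, count: usize) -> BTreeSet<CoreId> {
        let mut acquired = BTreeSet::new();
        while acquired.len() < count {
            match self.free.pop_first() {
                Some(core) => {
                    self.busy.insert(core);
                    acquired.insert(core);
                }
                None => break,
            }
        }
        acquired
    }
}
```

With the effect visible in both the name and the receiver type, a caller cannot mistake an acquisition for a lookup.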

@justprosh (Member, Author) commented:

Comments will be addressed in the next PR.

@justprosh justprosh merged commit fe423ff into master Mar 13, 2024
14 checks passed
@justprosh justprosh deleted the assign_additional_core branch March 13, 2024 15:17