Skip to content

Commit

Permalink
Add config option for cache stores
Browse files Browse the repository at this point in the history
  • Loading branch information
encalypto committed Nov 23, 2024
1 parent 6b48bfd commit 08323d7
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 8 deletions.
6 changes: 5 additions & 1 deletion chain/substreams/src/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,11 @@ where
logger,
);

state.entity_cache.set(key, entity)?;
state.entity_cache.set(
key,
entity,
Some(&mut state.write_capacity_remaining),
)?;
}
ParsedChanges::Delete(entity_key) => {
let entity_type = entity_key.entity_type.cheap_clone();
Expand Down
2 changes: 1 addition & 1 deletion core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1607,7 +1607,7 @@ async fn update_proof_of_indexing(
data.push((entity_cache.schema.poi_block_time(), block_time));
}
let poi = entity_cache.make_entity(data)?;
entity_cache.set(key, poi)
entity_cache.set(key, poi, None)
}

let _section_guard = stopwatch.start_section("update_proof_of_indexing");
Expand Down
22 changes: 20 additions & 2 deletions graph/src/components/store/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::cheap_clone::CheapClone;
use crate::components::store::write::EntityModification;
use crate::components::store::{self as s, Entity, EntityOperation};
use crate::data::store::{EntityValidationError, Id, IdType, IntoEntityIterator};
use crate::prelude::ENV_VARS;
use crate::prelude::{CacheWeight, ENV_VARS};
use crate::schema::{EntityKey, InputSchema};
use crate::util::intern::Error as InternError;
use crate::util::lfu_cache::{EvictStats, LfuCache};
Expand Down Expand Up @@ -349,10 +349,28 @@ impl EntityCache {
/// with existing data. The entity will be validated against the
/// subgraph schema, and any errors will result in an `Err` being
/// returned.
pub fn set(&mut self, key: EntityKey, entity: Entity) -> Result<(), anyhow::Error> {
pub fn set(
&mut self,
key: EntityKey,
entity: Entity,
write_capacity_remaining: Option<&mut usize>,
) -> Result<(), anyhow::Error> {
// check the validate for derived fields
let is_valid = entity.validate(&key).is_ok();

if let Some(write_capacity_remaining) = write_capacity_remaining {
let weight = entity.weight();

if !self.current.contains_key(&key) && weight > *write_capacity_remaining {
return Err(anyhow!(
"exceeded block write limit when writing entity `{}`",
key.entity_id,
));
}

*write_capacity_remaining -= weight;
}

self.entity_op(key.clone(), EntityOp::Update(entity));

// The updates we were given are not valid by themselves; force a
Expand Down
8 changes: 7 additions & 1 deletion graph/src/components/subgraph/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ pub struct BlockState {
in_handler: bool,

pub metrics: BlockStateMetrics,

pub write_capacity_remaining: usize,
}

impl BlockState {
Expand All @@ -94,6 +96,7 @@ impl BlockState {
processed_data_sources: Vec::new(),
in_handler: false,
metrics: BlockStateMetrics::new(),
write_capacity_remaining: ENV_VARS.block_write_capacity,
}
}
}
Expand All @@ -111,6 +114,7 @@ impl BlockState {
processed_data_sources,
in_handler,
metrics,
write_capacity_remaining,
} = self;

match in_handler {
Expand All @@ -121,7 +125,9 @@ impl BlockState {
entity_cache.extend(other.entity_cache);
processed_data_sources.extend(other.processed_data_sources);
persisted_data_sources.extend(other.persisted_data_sources);
metrics.extend(other.metrics)
metrics.extend(other.metrics);
*write_capacity_remaining =
write_capacity_remaining.saturating_sub(other.write_capacity_remaining);
}

pub fn has_errors(&self) -> bool {
Expand Down
5 changes: 5 additions & 0 deletions graph/src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ pub struct EnvVars {
///
/// Defaults to an empty list, which means that this feature is enabled for all chains;
pub firehose_disable_extended_blocks_for_chains: Vec<String>,

pub block_write_capacity: usize,
}

impl EnvVars {
Expand Down Expand Up @@ -327,6 +329,7 @@ impl EnvVars {
Self::firehose_disable_extended_blocks_for_chains(
inner.firehose_disable_extended_blocks_for_chains,
),
block_write_capacity: inner.block_write_capacity.0,
})
}

Expand Down Expand Up @@ -488,6 +491,8 @@ struct Inner {
graphman_server_auth_token: Option<String>,
#[envconfig(from = "GRAPH_NODE_FIREHOSE_DISABLE_EXTENDED_BLOCKS_FOR_CHAINS")]
firehose_disable_extended_blocks_for_chains: Option<String>,
#[envconfig(from = "GRAPH_NODE_BLOCK_WRITE_CAPACITY", default = "4_000_000_000")]
block_write_capacity: NoUnderscores<usize>,
}

#[derive(Clone, Debug)]
Expand Down
4 changes: 3 additions & 1 deletion runtime/wasm/src/host_exports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,9 @@ impl HostExports {

state.metrics.track_entity_write(&entity_type, &entity);

state.entity_cache.set(key, entity)?;
state
.entity_cache
.set(key, entity, Some(&mut state.write_capacity_remaining))?;

Ok(())
}
Expand Down
3 changes: 1 addition & 2 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -874,8 +874,7 @@ impl DeploymentStore {
}
}

/// Methods that back the trait `graph::components::Store`, but have small
/// variations in their signatures
/// Methods that back the trait `WritableStore`, but have small variations in their signatures
impl DeploymentStore {
pub(crate) async fn block_ptr(&self, site: Arc<Site>) -> Result<Option<BlockPtr>, StoreError> {
let site = site.cheap_clone();
Expand Down

0 comments on commit 08323d7

Please sign in to comment.