Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Create initial Coordinator V2 service #444

Merged
merged 46 commits into from
Jan 11, 2024
Merged
Changes from 1 commit
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
8e6e95d
feat: Create crate for coordinator control service
morgsmccauley Dec 12, 2023
c2f857b
feat: Fetch and print registry contract
morgsmccauley Dec 12, 2023
dd26099
feat: Used shared registry types
morgsmccauley Dec 14, 2023
025b315
refactor: Extract registry fetch to own mod
morgsmccauley Dec 14, 2023
3e05d9a
refactor: Use structured logging
morgsmccauley Dec 17, 2023
4ba5296
feat: Start all streams with stubbed data
morgsmccauley Dec 18, 2023
126296d
feat: Start stream with filter specified in registry
morgsmccauley Dec 18, 2023
64d835b
feat: Add `update_at`/`created_at` fields to registry
morgsmccauley Dec 18, 2023
c91f814
chore: Temporarily mock registry
morgsmccauley Dec 18, 2023
b19c607
feat: Write `last_indexed_block` per block stream
morgsmccauley Dec 18, 2023
0db45b0
feat: Start block stream from desired height
morgsmccauley Dec 18, 2023
a849a1a
refactor: Abstract block stream handling away
morgsmccauley Dec 18, 2023
32ca6b7
refactor: Allow mocking of `Registry`
morgsmccauley Dec 18, 2023
34b21f7
refactor: Allow mocking of `BlockStreamHandler`
morgsmccauley Dec 18, 2023
574b629
test: Add unit test for control loop
morgsmccauley Dec 18, 2023
b8cbcee
refactor: Rename for clarity
morgsmccauley Dec 18, 2023
19fe056
test: Test starting of block streams
morgsmccauley Dec 18, 2023
dd4f587
feat: Add `version` to block streams
morgsmccauley Dec 18, 2023
d011a22
feat: Write block stream version from coordinator
morgsmccauley Dec 18, 2023
bb157e7
feat: Stop streams not in registry
morgsmccauley Dec 19, 2023
fbc71d5
feat: Ignores streams with matching versions
morgsmccauley Dec 19, 2023
b803a03
feat: Restart streams with mismatched versions
morgsmccauley Dec 19, 2023
e0c3714
refactor: Avoid `.clone()`
morgsmccauley Dec 21, 2023
09cca2b
feat: Add support for `ActionFunctionCallRule`
morgsmccauley Dec 21, 2023
124cec9
feat: Log block stream requests
morgsmccauley Dec 21, 2023
a0568ed
fix: Map correct status values
morgsmccauley Dec 21, 2023
b70b25d
feat: Skip historical/delta lake processing for function/event rules
morgsmccauley Dec 21, 2023
4a81033
chore: Pin `near-lake-framework` to `0.7.5`
morgsmccauley Dec 21, 2023
dbdc1df
feat: Continuously loop registry config synchronization
morgsmccauley Dec 21, 2023
b2d8720
chore: Remove stubbed registry contract
morgsmccauley Dec 21, 2023
6e89697
chore: Use `near-primitives` feature for `registry-types`
morgsmccauley Jan 3, 2024
67e60fd
refactor: Rename `synchronise_registry_config` -> `synchronise_block_…
morgsmccauley Jan 7, 2024
83a2d92
feat: Start executors
morgsmccauley Jan 7, 2024
2977df0
feat: Add error context
morgsmccauley Jan 7, 2024
20b0fb4
feat: Restart executors with mismatched versions
morgsmccauley Jan 8, 2024
73609d8
feat: Stop executors not in registry
morgsmccauley Jan 8, 2024
a9cf247
refactor: Configure block stream redis stream from coordinator
morgsmccauley Jan 8, 2024
240ba3d
fix: Enable (hack) mocking by pinning lake framework version
morgsmccauley Jan 8, 2024
165111f
ci: Add checks for Coordinator
morgsmccauley Jan 8, 2024
62a3334
refactor: Rename to `last_published_block` as it has not been executed
morgsmccauley Jan 8, 2024
d7aa296
chore: Remove "historical" name references from logging
morgsmccauley Jan 11, 2024
e373d4c
fix: Publish blocks to correct redis stream
morgsmccauley Jan 11, 2024
abc7272
chore: Fix spelling
morgsmccauley Jan 11, 2024
762a078
fixup! fix: Publish blocks to correct redis stream
morgsmccauley Jan 11, 2024
d250f1f
refactor: Rename for clarity
morgsmccauley Jan 11, 2024
46745d2
fix: Fix rate limiting on protoc setup
morgsmccauley Jan 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: Write last_indexed_block per block stream
morgsmccauley committed Jan 8, 2024
commit b19c607547add8fd26185cfb797319e46c570a2a
5 changes: 2 additions & 3 deletions block-streamer/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion block-streamer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@ wildmatch = "2.1.1"

registry-types = { path = "../registry/types", features = ["near-primitives"] }

near-lake-framework = "0.7.4"
near-lake-framework = { git = "https://github.com/near/near-lake-framework-rs", branch = "aws-logging" }

[build-dependencies]
tonic-build = "0.10"
22 changes: 21 additions & 1 deletion block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
@@ -155,13 +155,21 @@ pub(crate) async fn start_block_stream(
);

for block in &blocks_from_index {
let block = block.to_owned();
redis_client
.xadd(
crate::redis::generate_historical_stream_key(&indexer.get_full_name()),
&[("block_height".to_string(), block.to_owned())],
&[("block_height".to_string(), block)],
)
.await
.context("Failed to add block to Redis Stream")?;
redis_client
.set(
format!("{}:last_indexed_block", indexer.get_full_name()),
block,
)
.await
.context("Failed to set last_indexed_block")?;
}

let mut last_indexed_block =
@@ -194,6 +202,14 @@ pub(crate) async fn start_block_stream(
let block_height = streamer_message.block.header.height;
last_indexed_block = block_height;

redis_client
.set(
format!("{}:last_indexed_block", indexer.get_full_name()),
last_indexed_block,
)
.await
.context("Failed to set last_indexed_block")?;

let matches = crate::rules::reduce_indexer_rule_matches(
&indexer.indexer_rule,
&streamer_message,
@@ -252,6 +268,10 @@ mod tests {
.expect_xadd::<String, u64>()
.returning(|_, _| Ok(()))
.times(expected_matching_block_height_count);
mock_redis_client
.expect_set::<String, u64>()
.returning(|_, _| Ok(()))
.times(4);

let indexer_config = crate::indexer_config::IndexerConfig {
account_id: near_indexer_primitives::types::AccountId::try_from(
16 changes: 15 additions & 1 deletion block-streamer/src/redis.rs
Original file line number Diff line number Diff line change
@@ -18,7 +18,6 @@ pub struct RedisClientImpl {
#[cfg_attr(test, mockall::automock)]
impl RedisClientImpl {
pub async fn connect(redis_connection_str: &str) -> Result<Self, RedisError> {
println!("called this");
let connection = redis::Client::open(redis_connection_str)?
.get_tokio_connection_manager()
.await?;
@@ -44,4 +43,19 @@ impl RedisClientImpl {

Ok(())
}

pub async fn set<T, U>(&self, key: T, value: U) -> Result<(), RedisError>
where
T: ToRedisArgs + Debug + Send + Sync + 'static,
U: ToRedisArgs + Debug + Send + Sync + 'static,
{
tracing::debug!("SET: {:?}, {:?}", key, value);

let mut cmd = redis::cmd("SET");
cmd.arg(key).arg(value);

cmd.query_async(&mut self.connection.clone()).await?;

Ok(())
}
}