
Move DB upserts to its own batch of tasks via channel #288

Open

wants to merge 11 commits into main

Conversation

@CapCap (Contributor) commented Feb 21, 2024

# Worker Flow

  • The application starts by initializing a Worker instance with the necessary configurations such as the processor
    configuration, database connection string, GRPC data service address, and other parameters.

  • The Worker instance then runs migrations on the database to ensure the schema is up to date.

  • The Worker fetches the chain ID from the GRPC service and verifies it against the database.

  • The Worker then starts a fetcher task that continuously fetches transactions from the GRPC stream and writes them into a channel.
    The number of transactions fetched in each batch is determined by the pb_channel_txn_chunk_size parameter.

  • Concurrently, the Worker also starts multiple processor tasks that consume transactions from the channel
    and process them in parallel.
    The number of processor tasks is determined by the number_concurrent_processing_tasks parameter.
    TODO: is this right? The size of the channel is determined by the PB_FETCH_QUEUE_SIZE parameter.

  • Each processor task uses a specific Processor instance to process the transactions.
    The type of Processor used depends on the configuration provided when initializing the Worker.
    Each Processor type corresponds to a different way of processing transactions.

  • After processing the transactions, the processor tasks send the results to a gap detector.
    The gap detector checks for any gaps in the processed transactions and panics if it finds any.
    The maximum batch size for gap detection is determined by the gap_detection_batch_size parameter.

  • The processed transactions are also sent to the DbWriter instance associated with the Processor, via a channel.
    The DbWriter is responsible for writing the processed transactions to the database ("executing" them).
    The size of the channel is determined by the query_executor_channel_size parameter.
    The number of concurrent DB writer tasks is determined by the number_concurrent_db_writer_tasks parameter.

  • The DbWriter sends the transactions to be written to the database in chunks.
    It uses an AsyncSender to send QueryGenerator instances to a DB writer task.
    Each QueryGenerator contains a table name and a DbExecutable instance, which represents the transactions to be written to the database.
    The chunk size for sending queries to the database is determined by the per_table_chunk_sizes parameter;
    this parameter specifies the maximum number of rows to be inserted in a single query. It is a map from table name to chunk size.

  • The DB writer task executes the queries represented by the DbExecutable instances.
    If an error occurs during execution, it logs the error and continues with the next query.

  • This process continues in a loop: the fetcher task fetches transactions,
    the processor tasks process them, the DB writer tasks write them to the database,
    and the gap detector panics if it finds a large gap in the processed transactions
    (a minimal sketch of this pipeline follows the list).
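
As a rough illustration of this pipeline, here is a minimal, self-contained sketch of the channel topology described above. It is not this PR's code: it uses the `async_channel` crate as a stand-in MPMC channel, and the `Transaction`/`QueryGenerator` types, channel sizes, and task bodies are placeholder assumptions.

```rust
use std::sync::Arc;

// Placeholder stand-ins for the real processor types.
#[derive(Debug)]
struct Transaction {
    version: u64,
}

#[derive(Debug)]
struct QueryGenerator {
    table_name: &'static str,
}

#[tokio::main]
async fn main() {
    let number_concurrent_processing_tasks = 4;
    let number_concurrent_db_writer_tasks = 4;

    // Fetcher -> processor channel (size ~ PB_FETCH_QUEUE_SIZE).
    let (txn_tx, txn_rx) = async_channel::bounded::<Arc<Vec<Transaction>>>(100);
    // Processor -> DB writer channel (size ~ query_executor_channel_size).
    let (query_tx, query_rx) = async_channel::bounded::<QueryGenerator>(500);

    // Fetcher task: stream chunks of transactions into the channel.
    tokio::spawn(async move {
        for start in (0..1_000u64).step_by(100) {
            let chunk: Vec<Transaction> =
                (start..start + 100).map(|version| Transaction { version }).collect();
            txn_tx.send(Arc::new(chunk)).await.unwrap();
        }
        // txn_tx is dropped here, which closes the fetcher channel.
    });

    // Processor tasks: consume chunks in parallel, emit queries for the writers.
    for _ in 0..number_concurrent_processing_tasks {
        let (txn_rx, query_tx) = (txn_rx.clone(), query_tx.clone());
        tokio::spawn(async move {
            while let Ok(chunk) = txn_rx.recv().await {
                // ...process the chunk, report to the gap detector, then:
                let _ = chunk.len();
                query_tx.send(QueryGenerator { table_name: "transactions" }).await.unwrap();
            }
        });
    }
    drop(query_tx); // only the processor tasks hold senders now

    // DB writer tasks: "execute" the queued queries against the database.
    let writers: Vec<_> = (0..number_concurrent_db_writer_tasks)
        .map(|_| {
            let query_rx = query_rx.clone();
            tokio::spawn(async move {
                while let Ok(q) = query_rx.recv().await {
                    println!("would insert a chunk into {}", q.table_name);
                }
            })
        })
        .collect();
    futures::future::join_all(writers).await;
}
```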

# Architecture Diagram

 ┌──────────────┐
 │ GRPC Service │
 └─────┬────────┘
       │Stream
 ┌─────▼────────┐ Transaction ┌───────────┐
 │ Fetcher Task ├────Chunk ───► Processor │
 └─────┬────────┘   Channel   │ Tasks     │
       │ Channel              └────┬──────┘
 ┌─────▼────────┐                  │ DbWriter
 │ Gap Detector │                  │ Channel
 └──────────────┘             ┌────▼────────┐
                              │ DB Executor │
                              └────┬────────┘
                                   │
                              ┌────▼─────┐
                              │ Database │
                              └──────────┘

Also adds the ability to tune insert sizes per table, and skip writes to tables entirely 😄
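
For illustration, the per-table tuning could look roughly like this on the config side. The field and method names below are assumptions loosely based on the parameters named above (`per_table_chunk_sizes`), not the PR's exact structs, and the skip-list representation is hypothetical.

```rust
use std::collections::HashMap;

// Hypothetical shape of the per-table write tuning described above.
pub struct DbWriterConfig {
    /// Max rows per insert query, keyed by table name (per_table_chunk_sizes).
    pub per_table_chunk_sizes: HashMap<String, usize>,
    /// Tables whose writes are skipped entirely (hypothetical field).
    pub tables_to_skip: Vec<String>,
    /// Fallback chunk size when a table has no explicit entry.
    pub default_chunk_size: usize,
}

impl DbWriterConfig {
    pub fn chunk_size(&self, table_name: &str) -> usize {
        self.per_table_chunk_sizes
            .get(table_name)
            .copied()
            .unwrap_or(self.default_chunk_size)
    }

    pub fn should_skip(&self, table_name: &str) -> bool {
        self.tables_to_skip.iter().any(|t| t == table_name)
    }
}
```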

@just-in-chang just-in-chang self-requested a review February 22, 2024 19:35
@CapCap CapCap changed the base branch from main to arc_in_processors February 29, 2024 00:54
@CapCap CapCap force-pushed the processor_channel_to_db_writers branch 2 times, most recently from 4aba938 to e38eebd Compare February 29, 2024 01:02
@CapCap CapCap requested a review from rtso February 29, 2024 04:31

let mut res = self.execute_query(conn.clone()).await;

// TODO: HAVE BETTER RETRY LOGIC HERE?
Contributor:

Possible exponential backoff here like we talked about 😁
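
A rough sketch of what that exponential backoff could look like, written against generic placeholders rather than the PR's `execute_query`/`DbExecutable` types (the retry cap, base delay, and names are all illustrative):

```rust
use std::time::Duration;

// Retry an async operation with exponential backoff; illustrative only.
async fn execute_with_backoff<F, Fut, T, E>(mut op: F, max_retries: usize) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    let mut delay = Duration::from_millis(100);
    let mut attempt = 0;
    loop {
        match op().await {
            Ok(v) => return Ok(v),
            // Out of retries: surface the last error.
            Err(e) if attempt >= max_retries => return Err(e),
            Err(_) => {
                tokio::time::sleep(delay).await;
                // Double the delay each attempt, capped at 5s; jitter could be added.
                delay = (delay * 2).min(Duration::from_secs(5));
                attempt += 1;
            },
        }
    }
}
```

The call site above would then become something like `execute_with_backoff(|| self.execute_query(conn.clone()), self.max_retries()).await`, assuming those methods exist on the surrounding type.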

rust/processor/src/db_writer.rs (outdated; resolved)
rust/processor/src/db_writer.rs (outdated; resolved)
@@ -161,19 +163,19 @@ impl ProcessorTrait for NftMetadataProcessor {
.await?;
}

-let db_insertion_duration_in_secs = db_insertion_start.elapsed().as_secs_f64();
+let db_channel_insertion_duration_in_secs = db_insertion_start.elapsed().as_secs_f64();
Contributor:

I'm just using the duration it takes to send all the PubSub messages for this metric since I'm not writing to Postgres at all. Not sure if we should just set this to 0 so it doesn't affect the distribution? Not sure if it matters at all but I can make that change haha

rust/processor/src/processors/stake_processor.rs (outdated; resolved)
@CapCap CapCap force-pushed the arc_in_processors branch 5 times, most recently from d459a44 to 5726900 Compare March 6, 2024 19:50
Base automatically changed from arc_in_processors to main March 6, 2024 20:23
@CapCap CapCap force-pushed the processor_channel_to_db_writers branch 5 times, most recently from 83cbe81 to 56e03ca Compare March 7, 2024 22:03
@ying-w ying-w marked this pull request as ready for review March 8, 2024 02:21
rust/processor/src/config.rs (resolved)
rust/processor/src/db_writer.rs (resolved)
rust/processor/src/db_writer.rs (outdated; resolved)
@CapCap CapCap force-pushed the processor_channel_to_db_writers branch from 7fd40ee to 35c9ef2 Compare March 9, 2024 00:35
CapCap added 3 commits March 11, 2024 12:38
Remove transactions and make migrations not async

lint

fmt

clippy

diesel async TLS sslrootcert

lint

lint

support optional tls

format

more log

parallel inserts

lint

bigger pool

pool size 200

try bigger buffer

try fixed 100 insert size

use ahash + update rust

smaller batches, bigger pool

increase pool size to 800

small refac for readability

increase buffer to 150

try batch size 20

back to 100 buffer

refactor grpc into separate file

lint

try 40mb buffers

insert of 10 again

ARC instead of cloning txns

lint

avoid another clone

try size 50

try 100

try 65

Change threading model for higher parallelism and throughput (#249)

Co-authored-by: jillxuu <[email protected]>

clean

cleanup

try 200 connections

coin processor spawn blocking

sleep well

ARC and consistent parallelism

database parallelism undo

no CDB compat

Use gap detector

Don't panic in gaps

TEMP CHANGE FOR LOAD TEST

send in chunks

gap detector bigger

parallel writes to db

try chunks of 40

5k gap

fix channel length

post load test cleanup

temporary execute in chunks

cleanup and comments

Add config for table chunk size

cleanup

progress

more progress

more trying

temp pause

migrating over

using traits

lint

lint and clean

lint
@CapCap CapCap force-pushed the processor_channel_to_db_writers branch from 35c9ef2 to 4a52342 Compare March 11, 2024 19:45

// A holder struct for processors db writing so we don't need to keep adding new params
#[derive(Clone)]
pub struct DbWriter {
Contributor:

I feel this should be called DbQueryGenerator, QueryGenerator should be called Query, and launch_db_writer_task should be DbWriter.

Contributor Author:

I added DataWithQuery; let me know if you think the current name is more or less clear?

Contributor:

Still feel the QueryGenerator is a weird name, especially when it doesn't have a method called generate.


pub fn launch_db_writer_task(
    query_receiver: AsyncReceiver<QueryGenerator>,
    processor_name: &'static str,
Contributor:

would it be better to pass this per query?

Contributor Author:

I want to move this to a trait or OOP or something instead of all this passing

Contributor:

Sure. I just want to decouple the writer from the processor (especially since this is only needed for logging/metrics), to allow us to potentially run multiple processors together in one process.

let tasks = (0..num_tasks)
    .map(|_| launch_db_writer_task(query_receiver.clone(), processor_name, conn.clone()))
    .collect::<Vec<_>>();
futures::future::try_join_all(tasks)
Contributor:

you probably want a separate runtime for this

Contributor Author:

agreed, I probably do; will do those changes in a follow-up PR
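
A sketch of that "separate runtime" idea for the follow-up: run the DB writer tasks on their own Tokio runtime on a dedicated thread, so slow inserts cannot starve the processing runtime. The wiring below is illustrative and reuses the names from the snippet above; it is not the follow-up PR.

```rust
use std::thread;

// Build a dedicated runtime for DB writers on its own OS thread (sketch).
fn spawn_db_writer_runtime(num_tasks: usize) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(num_tasks)
            .thread_name("db-writer")
            .enable_all()
            .build()
            .expect("failed to build DB writer runtime");
        runtime.block_on(async move {
            // Inside here, launch the writer tasks exactly as in the snippet above:
            // let tasks = (0..num_tasks)
            //     .map(|_| launch_db_writer_task(query_receiver.clone(), processor_name, conn.clone()))
            //     .collect::<Vec<_>>();
            // futures::future::try_join_all(tasks).await.unwrap();
        });
    })
}
```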

@CapCap CapCap force-pushed the processor_channel_to_db_writers branch from aa91bac to fa1a28d Compare March 14, 2024 22:00
@CapCap CapCap force-pushed the processor_channel_to_db_writers branch from fa1a28d to cbd47d6 Compare March 14, 2024 22:23
}
}

/*
Contributor:

nit: remove this?


pub db_executable: Box<dyn DbExecutable>,
}

pub fn launch_db_writer_task(
Contributor:

nit: don't need pub?

Contributor Author:

I will always pub everything, because the number of times something being pub that shouldn't be has hurt me across my career is 0, whereas the number of times I've had to fork things because things aren't public is somewhere around 1000 :-P

Contributor:

my experience was the other way around 🤔 I heavily rely on visibility to reason about the design when reading other people's code.

Contributor Author:

ex: diesel doesn't expose table names; we'd need a proc macro, or to fork, or PR it, etc. Easily avoidable mess 🙃

}
}

pub fn diesel_error_to_metric_str(error: &Error) -> &'static str {
Contributor:

nit: don't need pub?


// A holder struct for processors db writing so we don't need to keep adding new params
#[derive(Clone)]
pub struct DbWriter {
Contributor:

nit: can we use pub(crate) when possible?

Contributor Author:

why?

)
.await;
query_res.expect("Error executing query");
drop(query_generator);
Contributor:

do we need to explicitly drop here?
should we drop it in a background thread?

Contributor Author:

I explicitly drop so that I can be guaranteed it holds across this unsafe block


}
}

pub fn chunk_size<Item: field_count::FieldCount>(&self, table_name: &str) -> usize {
Contributor:

not related to this PR, but one day we probably also need to have a limit on byte size in addition to # of rows.
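
For what that could look like, here is an illustrative chunker that respects both a row cap and an estimated byte budget; `estimated_size` is a hypothetical per-row sizing hook, not something the PR defines:

```rust
// Split items into (start, end) ranges bounded by both row count and bytes.
fn chunk_by_rows_and_bytes<T>(
    items: &[T],
    max_rows: usize,
    max_bytes: usize,
    estimated_size: impl Fn(&T) -> usize,
) -> Vec<(usize, usize)> {
    let mut chunks = Vec::new();
    let (mut start, mut bytes) = (0usize, 0usize);
    for (i, item) in items.iter().enumerate() {
        let size = estimated_size(item);
        // Close the current chunk if adding this row would exceed either budget.
        if i > start && (i - start >= max_rows || bytes + size > max_bytes) {
            chunks.push((start, i));
            start = i;
            bytes = 0;
        }
        bytes += size;
    }
    if start < items.len() {
        chunks.push((start, items.len()));
    }
    chunks
}
```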

let chunk_size = self.chunk_size::<Item>(table_name);
let chunks = get_chunks(items_to_insert.len(), chunk_size);
for (start_ind, end_ind) in chunks {
    let items = items_to_insert[start_ind..end_ind].to_vec();
Contributor:

is it a copy here?

Contributor Author:

it is :-( I think there's likely a way to avoid it (and I didn't want pop or whatever) but I did not do it yet
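
One possible way to avoid the per-chunk copy, sketched against hypothetical types rather than the PR's: share the full `Vec` behind an `Arc` and let each queued insert carry an index range into it. Whether this actually helps depends on whether the insert API can take a slice instead of an owned `Vec`.

```rust
use std::sync::Arc;

// Each queued insert borrows a range of the shared Vec instead of copying it.
struct ChunkedInsert<T> {
    items: Arc<Vec<T>>,
    range: std::ops::Range<usize>,
}

impl<T> ChunkedInsert<T> {
    fn rows(&self) -> &[T] {
        &self.items[self.range.clone()]
    }
}

fn make_chunks<T>(items: Vec<T>, chunk_size: usize) -> Vec<ChunkedInsert<T>> {
    let len = items.len();
    let items = Arc::new(items);
    (0..len)
        .step_by(chunk_size.max(1)) // step_by panics on a step of 0
        .map(|start| ChunkedInsert {
            items: Arc::clone(&items),
            range: start..(start + chunk_size).min(len),
        })
        .collect()
}
```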

@@ -10,7 +10,7 @@ const RUNTIME_WORKER_MULTIPLIER: usize = 2;

fn main() -> Result<()> {
let num_cpus = num_cpus::get();
-let worker_threads = (num_cpus * RUNTIME_WORKER_MULTIPLIER).max(16);
+let worker_threads = num_cpus * RUNTIME_WORKER_MULTIPLIER;
Contributor:

not related to this PR, but why do we need a multiplier here?

Contributor Author:

hyperthreading? IDK aptos-core had this I think

Contributor:

num_cpus::get() actually considers hyperthreading.

Comment on lines +66 to +69
// TODO: make this config/constant?
fn max_retries(&self) -> usize {
    2
}
Contributor:

We have #321 to make num retries and timeout into config params for the 1.10 release, could follow something similar?

Contributor Author (@CapCap, Mar 18, 2024):

yes, I want to wait until we can pipe configs through to processors etc. more easily

Contributor:

Not related to this PR, but in the future we should probably have a system that allows each processor to have a "context" that can be accessed/passed around from wherever... The context would just contain processor-specific stuff like Arcs of channel txs/rxs, in-memory cache, config, etc.

Been thinking of ways to do this, but I think the best is just to have a trait in the SDK 🤣

Contributor Author:

100000%%%%
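
Purely speculative, but a minimal version of that "context" idea could be a trait in the SDK along these lines (none of these names exist in the PR; the sketch only shows the shape):

```rust
use std::sync::Arc;

// Hypothetical bundle of per-processor state: config, channels, caches, etc.
pub trait ProcessorContext: Send + Sync {
    fn processor_name(&self) -> &'static str;
    fn chunk_size_for(&self, table_name: &str) -> usize;
    // Could also expose channel senders/receivers, an in-memory cache, etc.
}

// A processor would then hold one shared handle instead of many parameters.
pub struct ExampleProcessor {
    pub context: Arc<dyn ProcessorContext>,
}
```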
