Migrate Events Processor to Use New Version Tracker Impl #560
inserted_at -> Timestamp,
is_token_v2 -> Nullable<Bool>,
This needed to be changed around; otherwise, whenever the migrations are re-run, the field ordering of the generated struct's constructor changes, which causes some issues.
To add a bit of context: this is reverting the change I made manually. The order should always be the same unless we change it in the SQL file.
impl ToSql<Text, Pg> for BackfillStatus {
    fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> serialize::Result {
        match *self {
            BackfillStatus::InProgress => out.write_all(b"in_progress")?,
nit: should we have the bytes in a constant var?
done
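For readers following the nit above, here is a minimal sketch of what pulling the status bytes into constants could look like. It is not the code from this PR: the constant names, the `Complete` variant's "complete" value, and the `as_bytes`, `from_bytes`, and `write_to` helpers are assumptions for illustration, and `write_to` only stands in for the body of `to_sql` so the example runs without Diesel.

```rust
use std::io::{self, Write};

// Hypothetical constant names; only the "in_progress" value appears in the diff.
const IN_PROGRESS_STATUS: &[u8] = b"in_progress";
// Assumed value for the second variant.
const COMPLETE_STATUS: &[u8] = b"complete";

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackfillStatus {
    InProgress,
    Complete,
}

impl BackfillStatus {
    /// Bytes written to the database for this status.
    pub fn as_bytes(&self) -> &'static [u8] {
        match self {
            BackfillStatus::InProgress => IN_PROGRESS_STATUS,
            BackfillStatus::Complete => COMPLETE_STATUS,
        }
    }

    /// Inverse mapping; it reuses the same constants so the two
    /// directions cannot drift apart.
    pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
        match bytes {
            b if b == IN_PROGRESS_STATUS => Some(BackfillStatus::InProgress),
            b if b == COMPLETE_STATUS => Some(BackfillStatus::Complete),
            _ => None,
        }
    }

    /// Stand-in for the body of `ToSql::to_sql`: writes the status bytes
    /// to any `Write` sink.
    pub fn write_to<W: Write>(&self, out: &mut W) -> io::Result<()> {
        out.write_all(self.as_bytes())
    }
}
```

Sharing one set of constants between the serialize and deserialize paths is the main benefit; a typo in either direction then shows up as a compile error or a failed round trip rather than a silent mismatch.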
let order_step = OrderByVersionStep::new(
    starting_version,
    Duration::from_secs(UPDATE_PROCESSOR_STATUS_SECS),
    Duration::from_secs(DEFAULT_UPDATE_PROCESSOR_STATUS_SECS),
);
Could we actually remove the OrderByVersionStep for the events and fa processor? They're not needed since there's no parallel processing.
removed.
backfill_end_version: backfill_end_version
    .unwrap_or(last_success_batch.metadata.end_version)
    as i64,
If backfill_end_version isn't set, should the table just store it as null to indicate it's unset?
Good callout. This way, it'll be clear that the BFP will run forever, rather than incorrectly showing last_processed_version equal to backfill_end_version while the status is still in_progress.
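A rough sketch of the nullable approach discussed above, assuming a trimmed-down row type. `BackfillStatusRow`, `build_row`, and `is_unbounded` are hypothetical names, not the PR's model; the point is only that `Option<i64>` carries "unset" through to a NULL column instead of substituting the last batch's end version.

```rust
/// Hypothetical, trimmed-down row for the backfill status table.
#[derive(Debug, PartialEq)]
pub struct BackfillStatusRow {
    pub last_success_version: i64,
    /// `None` would be stored as SQL NULL: no end version was configured.
    pub backfill_end_version: Option<i64>,
}

/// Builds the row without falling back to the batch's end version
/// when no backfill end version is configured.
pub fn build_row(
    last_success_version: u64,
    backfill_end_version: Option<u64>,
) -> BackfillStatusRow {
    BackfillStatusRow {
        last_success_version: last_success_version as i64,
        backfill_end_version: backfill_end_version.map(|v| v as i64),
    }
}

/// True when the backfill has no configured end and will keep running.
pub fn is_unbounded(row: &BackfillStatusRow) -> bool {
    row.backfill_end_version.is_none()
}
```

With the earlier `unwrap_or(last_success_batch.metadata.end_version)` fallback, an unbounded backfill would always show its end version equal to its last processed version, which is the misleading state the reply describes.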
// Otherwise, return the checkpointed version + 1.
return Ok(
    backfill_status.and_then(|status| match status.backfill_status {
        BackfillStatus::InProgress => Some(status.last_success_version as u64 + 1),
nit: let's move the + 1's out of this function and into get_starting_version, since this function is solely responsible for get_latest_processed_version_from_db
I have to leave in the +1s unfortunately, since you can't factor them out from the different logical branches. I renamed the function to get_starting_version_from_db to correct the misnomer.
Ok(latest_processed_version.unwrap_or(
    indexer_processor_config
        .transaction_stream_config
        .starting_version
        .unwrap_or(0),
))
This changes the existing priority of the flags for regular processors, which can make things hard if you're developing locally and want to start from a specific version. Could we keep the order for the regular processor?
You mean the use case where you want starting_version to have higher precedence than last_processed_version? That's a trade-off we have to make to have better restart behavior. Otherwise, the regular processor will always restart at the starting_version if that's ever set.
@rtso I could add the old starting_version_override config option lol, but rename it to starting_version_dev_override to make it clear it's for development purposes, so local development stays easy. Wdyt?
This way we can prioritize production behavior while having a secondary, but still convenient, way to get the local development behavior you described.
What if we keep the priority the same for regular processors to match with existing devex, and establish a different priority only for backfill processors?
Once we have separate backfill jobs, head processors in prod should never need to use starting_version in the config. I'd imagine it'd only be used for development purposes.
"head processors in prod should never need to use starting_version in the config"
How come? If we made a logic change, wouldn't the steps be something like:
1. create a backfill processor from 0 to HEAD
2. create a regular processor from HEAD onwards
To do #2, wouldn't we use starting_version=HEAD, where HEAD is just some recent version?
Good point! I was thinking about the current processors already running in prod, but if you're a new processor and you set starting_version=HEAD, then going forward it should only need to refer to the latest version from the DB.
And to my point about developing locally, they can also use the backfill_processor config to start from an arbitrary version!
awesome. thanks for the discussion! yeah, or even run diesel reset to wipe the state
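To summarize the precedence the thread settled on for regular processors, here is a small sketch. It mirrors the `unwrap_or` chain in the reviewed snippet, but the function names and the flattened `Option<u64>` parameters are assumptions for illustration, not the PR's actual signatures.

```rust
/// Version to resume from, given the last checkpointed version
/// (hypothetical helper mirroring the `+ 1` in the reviewed code).
pub fn next_after_checkpoint(last_success_version: i64) -> u64 {
    last_success_version as u64 + 1
}

/// Picks the starting version for a regular processor:
/// 1. the version derived from the DB checkpoint, if one exists;
/// 2. otherwise the configured `starting_version`;
/// 3. otherwise 0.
pub fn resolve_starting_version(
    starting_version_from_db: Option<u64>,
    config_starting_version: Option<u64>,
) -> u64 {
    starting_version_from_db.unwrap_or(config_starting_version.unwrap_or(0))
}
```

Under this ordering, a configured starting_version only matters on the first run, before any checkpoint exists. That is the restart behavior argued for above, and it is also why wiping the stored state or using the backfill config comes up as the way to start from an arbitrary version locally.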
Purpose
Copied changes
Apply recent changes improving restart behavior from the aptos-indexer-processor-example repo. Namely:
- Migrate events_processor to use the VersionTracker step instead of the LatestVersionProcessedTracker object (original PR)
- [...] BackfillProcessorStatus table and write to it when backfill_alias is set in the config (original PR)
- [...] BackfillStatus, and improve starting_version selection on restart (original PR)
See original PRs for more details and discussion. Design [here](https://www.notion.so/aptoslabs/Improve-Processor-Restart-and-Gap-Detection-Behavior-9e884917f4df4f5690d09f8a59581aab).
Other changes
- [...] rust/processor rather than rust/sdk-processor. But added the model under rust/sdk-processor/db/common/models.
- [...] execute...() functions to return Result<usize, ProcessorError> instead of QueryResult<usize>. A change that has been in the example repo for a while.
Remaining Work
- [...] LatestVersionProcessedTracker
- [...] Complete when the indexer reaches the end_version instead of crash looping lol
Testing
New Cargo Tests for Starting Version
Regular Indexer Start
Starts from last checkpointed version:
Starts from max(last checkpointed, starting_version):
Backfill Indexer
Non-SDK Default Processor Sanity Check