Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create BackfillProcessorStatus Table and Write to it when backfill_alias is set #6

Merged
merged 13 commits into from
Oct 10, 2024

Conversation

dermanyang
Copy link
Contributor

@dermanyang dermanyang commented Oct 7, 2024

dependent on #8
cherry picked from PR in old repo structure

Purpose

Repoint backfill processors to write to the new backfill_processor_status table instead of the processor_status table so that backfill processors can run concurrently as head processors without having their checkpoints overwritten. Read more in design here.

Technical Overview

BackfillConfig

BackfillConfig.backfill_alias is introduced to IndexerProcessorConfig. This optional field introduced a new logic branch in LatestVersionTracker (saved as field backfill_mode) which writes to the new table when backfill_alias is Some (equivalently when backfill_mode is true).

The backfill_alias field is similarly used in the get_starting_version logic to determine where to read from.

ProcessorVersionSaverEnum

Created a ProcessorStatusSaverEnum for the DefaultProcessorStatusSaver and BackfillProcessorStatusSaver. This makes it easier to init the VersionTracker without exposing too much logic. See usage:

let version_tracker = VersionTrackerStep::new(
            get_processor_status_saver(self.db_pool.clone(), self.config.clone()),
            DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
        );

Where

pub fn get_processor_status_saver(
    conn_pool: ArcDbPool,
    config: IndexerProcessorConfig,
) -> ProcessorStatusSaverEnum {
    if let Some(backfill_config) = config.backfill_config {
        let txn_stream_cfg = config.transaction_stream_config;
        let backfill_start_version = txn_stream_cfg.starting_version;
        let backfill_end_version = txn_stream_cfg.request_ending_version;
        let backfill_alias = backfill_config.backfill_alias.clone();
        ProcessorStatusSaverEnum::Backfill {
            conn_pool,
            backfill_alias,
            backfill_start_version,
            backfill_end_version,
        }
    } else {
        let processor_name = config.processor_config.name().to_string();
        ProcessorStatusSaverEnum::Default {
            conn_pool,
            processor_name,
        }
    }
}

Testing

Ran a few manual tests validating existing and new behavior. Screenshots below:

Regular processor working as expected. No backfillconfig set.
image

Backfill processor starting from genesis and writing to new table. Backfillconfig.backfill_alias set to events_processor_backfill_1:
image

Backfill processor picking up where it left off (see bottom):
image

Backfill processor starting from starting_version
image

@dermanyang dermanyang changed the title Sdy/backfill table Create BackfillProcessorStatus Table and Write to it when backfill_alias is set Oct 7, 2024
@dermanyang dermanyang changed the title Create BackfillProcessorStatus Table and Write to it when backfill_alias is set [WIP] Create BackfillProcessorStatus Table and Write to it when backfill_alias is set Oct 9, 2024
@dermanyang dermanyang requested a review from rtso October 10, 2024 16:41
@dermanyang dermanyang changed the title [WIP] Create BackfillProcessorStatus Table and Write to it when backfill_alias is set Create BackfillProcessorStatus Table and Write to it when backfill_alias is set Oct 10, 2024
@dermanyang dermanyang requested a review from a team October 10, 2024 21:20
pub struct DefaultProcessorStatusSaver {
pub conn_pool: ArcDbPool,
pub processor_type: String,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we call this processor_name in other places, so let's rename it here too

)
.await?
{
Some(status) => Ok(Some(status.last_success_version as u64 + 1)),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're already here, we should probably move the +1 to a higher level, this function seems like the wrong place for it.

Copy link
Contributor Author

@dermanyang dermanyang Oct 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call. it disagrees with the name of the function. We now have:

get_starting_version(): 
  return get_latest_processed_version_from_db() + 1

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants