Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue #3899] Task to Add new record into OpportunityVersion table #4061

Open
wants to merge 24 commits into
base: main
Choose a base branch
from

Conversation

babebe
Copy link
Collaborator

@babebe babebe commented Feb 28, 2025

Summary

Fixes #{3899}

Time to review: 15 mins

Changes proposed

StoreOpportunityVersionTask has been implemented to call saved_opportunity_version whenever there’s an update in the OpportunityChangeAudit table for any opportunity since the last task run. Added as a 4th step to the load_transform job.
The saved_opportunity_version utility has been updated to invoke the diff_nested_dicts function. If there’s a difference between the latest saved opportunity version and the current opportunity, a new record is created in the OpportunityVersion table.
Added JobLogFactory
Added tests.

@babebe babebe added the draft Not yet ready for review label Feb 28, 2025
select(Opportunity).where(Opportunity.opportunity_id == opp.opportunity_id)
).scalar_one()
# Get Json
opportunity_v1 = SCHEMA.dump(opportunity)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You made a utility for loading these records into that table, we should be using that as much as possible - I know it's used below, but re-converting something into a schema multiple times is a bit wasteful.

We'd likely need to add logic to that method of "only add if there is a diff", but we should keep that contained in one place.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved the logic to check for detecting diffs to save_opportunity_version function. And the task only checks if there is a change in the OpportunityChangeAudit table.

select(JobLog)
.where(JobLog.job_type == self.cls_name())
.where(
or_(JobLog.job_status == JobStatus.COMPLETED, JobLog.job_status == JobStatus.FAILED)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the job failed, we probably don't want to count it.

latest_time = (
latest_job.created_at
if latest_job
else get_now_us_eastern_datetime() - timedelta(hours=24)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DB values are stored in UTC, use utcnow whenever you do a comparison with now against something in the DB.

I think if there hasn't been a prior job, we should actually just grab everything (ie. pick a datetime of like 1970-01-01) since we want to backfill these records anyways.

Comment on lines 55 to 64
updated_opportunities = self.db_session.scalars(
select(OpportunityChangeAudit).where(OpportunityChangeAudit.updated_at > latest_time)
).all()

for opp in updated_opportunities:
# Get Opportunity object
opportunity = self.db_session.execute(
select(Opportunity).where(Opportunity.opportunity_id == opp.opportunity_id)
).scalar_one()
# Get Json
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to write a query when the relationship already is that:

Suggested change
updated_opportunities = self.db_session.scalars(
select(OpportunityChangeAudit).where(OpportunityChangeAudit.updated_at > latest_time)
).all()
for opp in updated_opportunities:
# Get Opportunity object
opportunity = self.db_session.execute(
select(Opportunity).where(Opportunity.opportunity_id == opp.opportunity_id)
).scalar_one()
# Get Json
opportunity_change_audits = self.db_session.scalars(
select(OpportunityChangeAudit).where(OpportunityChangeAudit.updated_at > latest_time)
).all()
for opp_change_audit in opportunity_change_audits:
opportunity = opp_change_audit.opportunity

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right! Updated!

Comment on lines 68 to 72
latest_versioned_opp = self.db_session.execute(
select(OpportunityVersion).where(
OpportunityVersion.opportunity_id == opp.opportunity_id
)
).scalar_one_or_none()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if there are multiple versions?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ordered by created_at desc

@babebe babebe requested a review from chouinar March 6, 2025 18:35
@babebe babebe removed the draft Not yet ready for review label Mar 6, 2025
Comment on lines 29 to 33
latest_opp_version = db_session.execute(
select(OpportunityVersion)
.where(OpportunityVersion.opportunity_id == opportunity.opportunity_id)
.order_by(OpportunityVersion.created_at.desc())
).scalar_one_or_none()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One additional recommendation:

Suggested change
latest_opp_version = db_session.execute(
select(OpportunityVersion)
.where(OpportunityVersion.opportunity_id == opportunity.opportunity_id)
.order_by(OpportunityVersion.created_at.desc())
).scalar_one_or_none()
latest_opp_version = db_session.execute(
select(OpportunityVersion)
.where(OpportunityVersion.opportunity_id == opportunity.opportunity_id)
.order_by(OpportunityVersion.created_at.desc())
.options(selectinload("*"))
).scalar_one_or_none()

Won't have any obvious affect on how the code works, but makes it so SQLAlchemy will select all parts of an opportunity in one set of queries rather than lazy-loading every relationship which each individually fire a select query off to the database. When we have a job that'll iterate over tens of thousands of records, it'll make it much faster (I tested adding it with set-current-opportunities and I want to say it was 70x faster?)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes indeed! Updated , thanks

Comment on lines 37 to 47
opportunity_existing = latest_opp_version.opportunity_data if latest_opp_version else {}

diffs = diff_nested_dicts(opportunity_new, opportunity_existing)
if diffs:
# Add new OpportunityVersion instance to the database session
opportunity_version = OpportunityVersion(
opportunity_id=opportunity.opportunity_id,
opportunity_data=opportunity_new,
)

db_session.add(opportunity_version)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can put a small optimization/short circuit in the case where there is no existing since there will always be a diff.

Something like:

Suggested change
opportunity_existing = latest_opp_version.opportunity_data if latest_opp_version else {}
diffs = diff_nested_dicts(opportunity_new, opportunity_existing)
if diffs:
# Add new OpportunityVersion instance to the database session
opportunity_version = OpportunityVersion(
opportunity_id=opportunity.opportunity_id,
opportunity_data=opportunity_new,
)
db_session.add(opportunity_version)
diffs = {}
if latest_opp_version:
diffs = diff_nested_dicts(opportunity_new, latest_opp_version.opportunity_data)
if diffs or latest_opp_version is None:
# Add new OpportunityVersion instance to the database session
opportunity_version = OpportunityVersion(
opportunity_id=opportunity.opportunity_id,
opportunity_data=opportunity_new,
)
db_session.add(opportunity_version)

Comment on lines 19 to 25
@task_blueprint.cli.command(
"store-opportunity-version",
help="Store a new opportunity version if an opportunity has been updated",
)
@flask_db.with_db_session()
def store_opportunity_version(db_session: db.Session) -> None:
StoreOpportunityVersionTask(db_session).run()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need a way to run this as a task directly, we want to add it as a 4th step of the existing transform jobs: https://github.com/HHS/simpler-grants-gov/blob/main/api/src/data_migration/command/load_transform.py

Comment on lines 32 to 33
def __init__(self, db_session: db.Session) -> None:
super().__init__(db_session)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're not changing the init function, can exclude it and just use the one from task

Suggested change
def __init__(self, db_session: db.Session) -> None:
super().__init__(db_session)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

Comment on lines 48 to 56
updated_opportunities_change_audit = self.db_session.scalars(
select(OpportunityChangeAudit).where(OpportunityChangeAudit.updated_at > latest_time)
).all()

for oca in updated_opportunities_change_audit:
# Get Opportunity object
opportunity = self.db_session.execute(
select(Opportunity).where(Opportunity.opportunity_id == oca.opportunity_id)
).scalar_one()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to query for the opportunity separately - the change audit object has a relationship to it. Also like I commented elsewhere, performance will be a lot better adding the selectinload bit.

Suggested change
updated_opportunities_change_audit = self.db_session.scalars(
select(OpportunityChangeAudit).where(OpportunityChangeAudit.updated_at > latest_time)
).all()
for oca in updated_opportunities_change_audit:
# Get Opportunity object
opportunity = self.db_session.execute(
select(Opportunity).where(Opportunity.opportunity_id == oca.opportunity_id)
).scalar_one()
updated_opportunities_change_audit = self.db_session.scalars(
select(OpportunityChangeAudit)
.where(OpportunityChangeAudit.updated_at > latest_time)
.options(selectinload("*"))
).all()
for oca in updated_opportunities_change_audit:
opportunity = oca.opportunity

StoreOpportunityVersionTask(db_session).run()


SCHEMA = OpportunityV1Schema()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think this is used here anymore

Suggested change
SCHEMA = OpportunityV1Schema()

model = task_models.JobLog

job_id = Generators.UuidObj
job_type = factory.LazyAttribute(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the sort of field that if you want to use a factory for it, you should pass the value in yourself since a random value is going to be weird with tests (since you usually care a lot about the exact value).

Copy link
Collaborator Author

@babebe babebe Mar 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed. Can optionally do a validation?

job_type = factory.LazyAttribute(
lambda _: random.choice(["StoreOpportunityVersionTask", "SetCurrentOpportunitiesTask"])
)
job_status = factory.Iterator(JobStatus)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd probably say to just make this always the success value since most tests would only care about that (or want to override it to started/failed).

latest_job = self.db_session.scalars(
select(JobLog)
.where(JobLog.job_type == self.cls_name())
.where(or_(JobLog.job_status == JobStatus.COMPLETED))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.where(or_(JobLog.job_status == JobStatus.COMPLETED))
.where(JobLog.job_status == JobStatus.COMPLETED)

@babebe babebe requested a review from chouinar March 7, 2025 15:43
Comment on lines 36 to 38
@click.option(
"--store-version/--no-store-version", default=True, help="run StoreOpportunityVersionTask"
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let us default this to not being enabled, I think we should test the behavior out manually first to be safe.

Comment on lines 21 to 28
@pytest.fixture(autouse=True)
def clear_db(self, db_session):
opportunities = db_session.query(Opportunity).all()
for opp in opportunities:
db_session.delete(opp)

db_session.execute(delete(OpportunityVersion))
db_session.execute(delete(OpportunityChangeAudit))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor nitpick, technically those two deletes do nothing because they had to have been deleted for the opportunity deletes to work (the cascade="all, delete-orphan" on the relationships tells SQLAlchemy to cleanup orphaned records if the opportunity itself is deleted).

Suggested change
@pytest.fixture(autouse=True)
def clear_db(self, db_session):
opportunities = db_session.query(Opportunity).all()
for opp in opportunities:
db_session.delete(opp)
db_session.execute(delete(OpportunityVersion))
db_session.execute(delete(OpportunityChangeAudit))
@pytest.fixture(autouse=True)
def clear_db(self, db_session):
opportunities = db_session.query(Opportunity).all()
for opp in opportunities:
db_session.delete(opp)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thats right ! Deleted

Comment on lines +32 to +36
for opp_change_audit in opportunity_change_audits:
opportunity = opp_change_audit.opportunity

# Store to OpportunityVersion table
save_opportunity_version(self.db_session, opportunity)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something I hadn't thought of until now, but has been briefly mentioned before - if an opportunity is a draft, we don't want to store anything for it yet in the version table.

I think it would make more sense to put that check in the save_opportunity_version function so when we use it elsewhere, we don't need to implement that twice, but I'm not 100% on that behavior.

Should be just a simple if statement, but apologies for not mentioning it sooner.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NP, added check.

@@ -74,5 +74,7 @@ def diff_nested_dicts(dict1: dict, dict2: dict) -> list:

def _convert_iterables_to_set(data: Any) -> Any:
if isinstance(data, (list, tuple)):
if data and isinstance(data[0], (dict, list)): # test with applicant typen
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this comment still be here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope. Removed

@babebe babebe requested a review from chouinar March 7, 2025 19:37
Copy link
Collaborator

@chouinar chouinar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two minor nitpicks, LGTM otherwise

Comment on lines +36 to +38
@click.option(
"--store-version/--no-store-version", default=False, help="run StoreOpportunityVersionTask"
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor nit - put this option between the set-current and insert-chunk-size options. Since these options are big "turn on/off parts of the job" and the other options are configurational.


schema_data = SCHEMA.dump(opportunity)
if not opportunity.is_draft:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit - flip the logic here so the whole function doesn't need an indentation level

Suggested change
if not opportunity.is_draft:
if opportunity.is_draft:
return
...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Create a task that adds a version to the opportunity version table after the transformations run
2 participants