Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPIKE] implement job scheduler in flask app #4683

Closed
2 tasks
rshewitt opened this issue Apr 4, 2024 · 17 comments
Closed
2 tasks

[SPIKE] implement job scheduler in flask app #4683

rshewitt opened this issue Apr 4, 2024 · 17 comments
Assignees
Labels
H2.0/Harvest-Runner Harvest Source Processing for Harvesting 2.0

Comments

@rshewitt
Copy link
Contributor

rshewitt commented Apr 4, 2024

User Story

In order to schedule harvest jobs, datagov wants to integrate a task scheduler to our existing flask app

Acceptance Criteria

[ACs should be clearly demoable/verifiable whenever possible. Try specifying them using BDD.]

Manual

  • GIVEN our current flask app
    AND the integration of the flask scheduler library
    WHEN an http request occurs on the scheduling route
    THEN a job will be added to the job table
    AND a harvest task will be started in cf

Automated

  • GIVEN our current flask app
    AND the integration of the flask scheduler library
    WHEN a certain date/time period has been reached
    THEN a job will be added to the job table
    AND a harvest task will be started in cf

Background

[Any helpful contextual notes or links to artifacts/evidence, if needed]

Security Considerations (required)

[Any security concerns that might be implicated in the change. "None" is OK, just be explicit here!]

Sketch

[Notes or a checklist reflecting our understanding of the selected approach]

@rshewitt rshewitt added the H2.0/Harvest-Runner Harvest Source Processing for Harvesting 2.0 label Apr 4, 2024
@jbrown-xentity
Copy link
Contributor

https://pypi.org/project/Flask-APScheduler/ should be what we use to implement this...

@gujral-rei gujral-rei moved this to 📔 Product Backlog in data.gov team board Apr 4, 2024
@gujral-rei gujral-rei moved this from 📔 Product Backlog to 🏗 In Progress [8] in data.gov team board Apr 4, 2024
@rshewitt
Copy link
Contributor Author

rshewitt commented Apr 5, 2024

sqlachemy job store for apscheduler tables. seems redundant to include our existing job table. looks like custom metadata for jobs and tasks will be supported in v4.0. v4.0 progress track. looks like they have a couple of v4 pre-releases

@rshewitt
Copy link
Contributor Author

rshewitt commented Apr 5, 2024

maybe we can store our job results in the return value here?

@rshewitt
Copy link
Contributor Author

rshewitt commented Apr 5, 2024

running jobs immediately

@rshewitt
Copy link
Contributor Author

rshewitt commented Apr 5, 2024

# a way to start a job now
scheduler_object.get_job(job_id ="my_job_id").modify(next_run_time=datetime.datetime.now())

@rshewitt
Copy link
Contributor Author

rshewitt commented Apr 5, 2024

we're anticipating running the harvests as tasks in cloudfoundry. how does flask-apscheduler know when a job is complete if cf is doing the work? do we stream the cf task logs inside the job until the cf task status changes telling flask-apscheduler the task is complete? the executor isn't actually doing the work so what exactly is being multiprocessing/multithreaded?

@rshewitt
Copy link
Contributor Author

rshewitt commented Apr 5, 2024

here's a mockup of a flask-apscheduler job...

cf_handler = CFHandler() # cf interface
app = create_app() # flask app
scheduler = create_scheduler(app)


@scheduler.task("interval", id="do_job_1", seconds=60)
def job1(source_data):
    app_guuid = "0192301923"
    cf_handler.start_task( app_guuid, "python harvest.py {source_data}", "test-job" )
    # ^ this will return the submitted task when it's been registered by cf.
    # this job would most likely complete unless told to wait until something happens... 

# keep the job open until the task status has changed to "SUCCEEDED". basic polling.
@scheduler.task("interval", id="do_job_2", seconds=60)
def job2(source_data):
    app_guuid = "0192301923"
    task = cf_handler.start_task( app_guuid, "python harvest.py {source_data}", "test-job" )
    while task["status"] != "SUCCEEDED":
        task = cf_handler.get_task( task["guuid"] )
        sleep(15)

would we want to implement our own scheduling event?

@rshewitt
Copy link
Contributor Author

rshewitt commented Apr 5, 2024

implementing a custom executor

@jbrown-xentity
Copy link
Contributor

we're anticipating running the harvests as tasks in cloudfoundry. how does flask-apscheduler know when a job is complete if cf is doing the work? do we stream the cf task logs inside the job until the cf task status changes telling flask-apscheduler the task is complete? the executor isn't actually doing the work so what exactly is being multiprocessing/multithreaded?

It would be on the "job" task processor to send a post request back to the flask API, letting it know that it was done processing a job. After accepting this, other things might occur: email notification, kicking off new jobs, etc. That does require an API route being defined; that should be considered a follow up addition/feature. Tagging @btylerburton for awareness or weighing in...

@btylerburton
Copy link
Contributor

Routes and order subject to change, but it's defined in Steps 23 & 24 here
Screenshot 2024-04-08 at 6 36 55 PM

https://raw.githubusercontent.com/GSA/datagov-harvesting-logic/main/docs/diagrams/mermaid/dest/etl_pipeline-1.svg

@rshewitt
Copy link
Contributor Author

rshewitt commented Apr 9, 2024

job processing workflow in apscheduler

  • after the scheduler has determined that a job needs to be run the job is submitted to the thread/process pool (source)
  • run_job runs the callable assigned to the job (source) and returns events to be dispatched afterwards.
  • assuming no exception occurs during job processing, after the job has completed a JobExecutionEvent(EVENT_JOB_EXECUTED, ...) is added to the return event list (source). the job is done WOOHOO!
  • within the context of the thread/process pool, this job is a future. when the callable of the job is done (which by this time is true) the future callback is run. this is when the events are dispatched to the scheduler (source and source)

what we want to do

  • run harvest.py in cloud foundry as a task.
  • when that task completes we want to post to a route in our flask app indicating the job is done. what does this look like?
harvest_source = HarvestSource( data...) 
harvest_source.get_records_changes()
harvest_source.synchronize_records()

harvest_source.submit_job_complete() # as an example

#...
def submit_job_complete(self, data...):
  requests.post(url, body=data) # post to our flask app


#...meanwhile, in our flask app
@mod.route('/harvest_jobs/result', methods=['POST'])
def harvest_job_result():
    # here's where the job processing workflow from before comes into play...
    # we need to ensure a JobExecutionEvent(EVENT_JOB_EXECUTED, ...) event is dispatched to the proper listener(s).
    # this is where I have pause and things start to breakdown in a way where we're starting to call the things apscheduler is supposed to handle

my conclusion, i want to have apscheduler be responsible for its things. i don't want to call things we're not supposed to call. i could be thinking of this wrong though! any thoughts would be appreciated.

@btylerburton
Copy link
Contributor

#this is where I have pause and things start to breakdown in a way where we're starting to call the things apscheduler is supposed to handle

If I'm following your question, the flask app would call the DB here for info that the task has logged. Step 27 & 28 above.

@rshewitt
Copy link
Contributor Author

rshewitt commented Apr 9, 2024

apscheduler has a datastore to keep track of tasks, jobs, and schedules. would this not replace our job table in the harvest db?

@btylerburton
Copy link
Contributor

IMO the Job Table needs to exit as persistent storage for historical metrics.

Given the above questions, it feels like a good time to ask exactly what we're planning to use the scheduler for.

I have:

  • tasks scheduling queue
  • cron functions to trigger insertion to scheduling queue

We can delegate simple queuing to a redis instance, but it's worth listing all the things we want appscheduler to be responsible for first before we write it off.

@rshewitt
Copy link
Contributor Author

rshewitt commented Apr 9, 2024

apscheduler supports sqlalchemy for persistent storage in postgres. (as an example). i'm not sure what level of persistence this supports though ( e.g. does it retain historic job runs )

@rshewitt
Copy link
Contributor Author

draft pr. so far i haven't been able to get the scheduler to start on container start in a non-hacky way. at this point, i've been able to get it to work as a healthcheck.

@btylerburton
Copy link
Contributor

we can park this work for the time being since we're not moving forward with apscheduler.

@rshewitt rshewitt moved this from 🏗 In Progress [8] to 👀 Needs Review [2] in data.gov team board Apr 22, 2024
@rshewitt rshewitt moved this from 👀 Needs Review [2] to ✔ Done in data.gov team board Apr 24, 2024
@rshewitt rshewitt changed the title implement job scheduler in flask app [SPIKE] implement job scheduler in flask app Apr 30, 2024
@hkdctol hkdctol moved this from ✔ Done to 🗄 Closed in data.gov team board May 2, 2024
@gujral-rei gujral-rei mentioned this issue May 28, 2024
1 task
@github-project-automation github-project-automation bot moved this from 🗄 Closed to ✔ Done in data.gov team board Sep 3, 2024
@btylerburton btylerburton moved this from ✔ Done to 🗄 Closed in data.gov team board Sep 3, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
H2.0/Harvest-Runner Harvest Source Processing for Harvesting 2.0
Projects
Archived in project
Development

No branches or pull requests

3 participants