-
Notifications
You must be signed in to change notification settings - Fork 14.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove JobRunners back reference from Job #30376
Remove JobRunners back reference from Job #30376
Conversation
4bc3d55
to
b10344c
Compare
airflow/dag_processing/manager.py
Outdated
@@ -348,951 +295,3 @@ def end(self): | |||
self._process.join(timeout=1.0) | |||
reap_process_group(self._process.pid, logger=self.log) | |||
self._parent_signal_conn.close() | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Merged this one into DagProcessorJobRunner - this way it is consistent with other Jobs - which "do something" rather than create another object and "run" it.
2bf14da
to
71920bf
Compare
71920bf
to
a858a1e
Compare
a858a1e
to
2514572
Compare
OK. I got it green finally @uranusjr -> this is the target of the refactor, I would love to merge - if possible - the 4 dependent PRs before we cut-off 2.6.0 branch to make cherry-picking easier for 2.6 branch (cc: @jedcunningham, @ephraimbuddy ) |
88b4146
to
79b62f2
Compare
68815f8
to
8e2bb8b
Compare
8e2bb8b
to
bdca1ef
Compare
8889b44
to
727f3af
Compare
727f3af
to
7ee67a6
Compare
1a251d8
to
1ce3d6a
Compare
This is the final step of decoupling of the job runner from ORM based BaseJob. After this change, finally we rich the state that the BaseJob is just a state of the Job being run, but all the logic is kept in separate "JobRunner" entity which just keeps the reference to the job. Also it makes sure that job in each runner is defined as appropriate for each job type: * SchedulerJobRunner, BackfillJobRunner can only use BaseJob * DagProcessorJobRunner, TriggererJobRunner and especially the LocalTaskJobRunner can keep both BaseJob and it's Pydantic BaseJobPydantic representation - for AIP-44 usage. The highlights of this change: * Job does not have job_runner reference any more * Job is a mandatory parameter when creating each JobRunner * run_job method takes as parameter the job (i.e. where the state of the job is called) and executor_callable - i.e. the method to run when the job gets executed * heartbeat callback is also passed a generic callable in order to execute the post-heartbeat operation of each of the job type * there is no more need to specify job_type when you create BaseJob, the job gets its type by a simply creating a runner with the job This is the final stage of refactoring that was split into reviewable stages: apache#30255 -> apache#30302 -> apache#30308 -> this PR. Closes: apache#30325
1ce3d6a
to
18b6b5b
Compare
This is the final step of decoupling of the job runner from ORM
based BaseJob. After this change, finally we rich the state that
the BaseJob is just a state of the Job being run, but all
the logic is kept in separate "JobRunner" entity which just
keeps the reference to the job. Also it makes sure that
job in each runner is defined as appropriate for each job type:
LocalTaskJobRunner can keep both BaseJob and it's Pydantic
BaseJobPydantic representation - for AIP-44 usage.
The highlights of this change:
of the job is called) and executor_callable - i.e. the method
to run when the job gets executed
to execute the post-heartbeat operation of each of the job
type
BaseJob, the job gets its type by a simply creating a runner
with the job
The JobRuner was essentially calling the processormanager that
created processors. In order to make it consistent with other
Runners - all of it has been moved into runner, so that it
starts doing "something" - similar as other runners - rather than
creating processor manager and running it.
This is the final stage of refactoring that was split into
reviewable stages: #30255 -> #30302 -> #30308 -> this PR.
Please check only the last commit.
Closes: #30325
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in newsfragments.