
[FEAT] add driver/executor pod in Spark #3016

Open

machichima wants to merge 7 commits into master from 4105-spark-driver-executor-podtemplate

Conversation

@machichima (Contributor) commented Dec 20, 2024

Tracking issue

Related to flyteorg/flyte#4105

Why are the changes needed?

This PR updates the flytekit-spark package to allow configuring the driver pod and executor pod separately using PodTemplate. It also enables setting a primary_container_name for the driver/executor pods independently of the task-level pod template.

What changes were proposed in this pull request?

Add driver_pod and executor_pod fields of type PodTemplate to SparkJob.
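
For orientation, a minimal sketch of how the new fields might sit on the plugin's Spark task config (driver_pod and executor_pod are from this PR; the surrounding fields are abbreviated from the existing plugin and may not match the source exactly):

from dataclasses import dataclass
from typing import Dict, Optional

from flytekit import PodTemplate


@dataclass
class Spark:
    # existing configuration forwarded to the Spark cluster (abbreviated)
    spark_conf: Optional[Dict[str, str]] = None
    hadoop_conf: Optional[Dict[str, str]] = None
    # new in this PR: per-role pod templates, each carrying its own
    # primary_container_name, labels, annotations, and pod spec
    driver_pod: Optional[PodTemplate] = None
    executor_pod: Optional[PodTemplate] = None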

How was this patch tested?

  1. Added the unit test test_spark_driver_executor_podSpec.
  2. Modified the @task decorator of the hello_spark function in the my_spark example as follows to set driver_pod and executor_pod.
# imports needed by the snippet below
from flytekit import ImageSpec, PodTemplate, task
from flytekitplugins.spark import Spark
from kubernetes.client import V1Container, V1EnvVar, V1PodSpec, V1Toleration

custom_image = ImageSpec(
    python_version="3.9",
    registry="ghcr.io/machichima",
    packages=["flytekitplugins-spark"],
)


driver_pod = PodTemplate(
    labels={"lKeyA_d": "lValA", "lKeyB_d": "lValB"},
    annotations={"aKeyA_d": "aValA", "aKeyB_d": "aValB"},
    primary_container_name="driver-primary",
    pod_spec=V1PodSpec(
        containers=[
            V1Container(
                name="driver-primary",
                image="ghcr.io/machichima",
                command=["echo"],
                args=["wow"],
                env=[V1EnvVar(name="x/custom-driver", value="driver")],
            ),
            V1Container(
                name="driver-primary",
                image="ghcr.io/machichima",
                command=["echo"],
                args=["wow"],
                # resources=V1ResourceRequirements(requests={"memory": "1000M"}),
                env=[V1EnvVar(name="x/custom-driver-1", value="driver-1")],
            ),
            V1Container(
                name="not-primary",
                command=["echo"],
                args=["not_primary"],
            ),
        ],
        tolerations=[
            V1Toleration(
                key="x/custom-driver",
                operator="Equal",
                value="foo-driver",
                effect="NoSchedule",
            ),
        ],
    ),
)

executor_pod = PodTemplate(
    labels={"lKeyA_e": "lValA", "lKeyB_e": "lValB"},
    annotations={"aKeyA_e": "aValA", "aKeyB_e": "aValB"},
    primary_container_name="executor-primary",
    pod_spec=V1PodSpec(
        containers=[
            V1Container(
                name="executor-primary",
                image="ghcr.io/machichima",
                command=["echo"],
                args=["wow"],
                env=[V1EnvVar(name="x/custom-executor", value="executor")],
            ),
            V1Container(
                name="not-primary",
                command=["echo"],
                args=["not_primary"],
            ),
        ],
        tolerations=[
            V1Toleration(
                key="x/custom-executor",
                operator="Equal",
                value="foo-executor",
                effect="NoSchedule",
            ),
        ],
    ),
)

@task(
    cache_version="0.1.0",
    task_config=Spark(
        # This configuration is applied to the Spark cluster
        spark_conf={
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
            "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar",
        },
        driver_pod=driver_pod,
        executor_pod=executor_pod,
    ),
    # limits=Resources(cpu="50m", mem="2000M"),
    container_image=custom_image,
    pod_template=PodTemplate(
        labels={"lKeyA": "lValA", "lKeyB": "lValB"},
        annotations={"aKeyA": "aValA", "aKeyB": "aValB"},
        pod_spec=V1PodSpec(
            containers=[
                V1Container(
                    name="primary",
                    image=custom_image,
                    command="echo",
                    args=["wow"],
                ),
            ],
            tolerations=[
                V1Toleration(
                    key="x/custom",
                    operator="Equal",
                    value="foo",
                    effect="NoSchedule",
                ),
            ],
        ),
    ),
)
def hello_spark(partitions: int) -> float:
    # signature as in the flytesnacks my_spark example; body omitted here
    ...

  3. Verified that the pods have the expected tolerations and environment variables set:

❯ kubectl describe sparkapplications.sparkoperator.k8s.io -n flytesnacks-development acjqns8twhn6sg5l6dnj-n0-0 | grep "Tolerations:" -A 8
    Tolerations:
      Effect:    NoSchedule
      Key:       x/custom
      Operator:  Equal
      Value:     foo
      Effect:    NoSchedule
      Key:       x/custom-driver
      Operator:  Equal
      Value:     foo-driver
--
    Tolerations:
      Effect:             NoSchedule
      Key:                x/custom
      Operator:           Equal
      Value:              foo
      Effect:             NoSchedule
      Key:                x/custom-executor
      Operator:           Equal
      Value:              foo-executor
❯ kubectl describe sparkapplications.sparkoperator.k8s.io -n flytesnacks-development acjqns8twhn6sg5l6dnj-n0-0 | grep "Name:        x/custom-driver" -A 1
      Name:        x/custom-driver
      Value:       driver
❯ kubectl describe sparkapplications.sparkoperator.k8s.io -n flytesnacks-development acjqns8twhn6sg5l6dnj-n0-0 | grep "Name:        x/custom-executor" -A 1
      Name:        x/custom-executor
      Value:       executor

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

flyteorg/flyte#6085

Summary by Bito

Enhanced the flytekit-spark package by implementing configurable driver and executor pod support through PodTemplate. Added driver_pod and executor_pod fields to the SparkJob model, with a primary_only flag for pod spec serialization. The implementation includes type hint updates from K8sPod to PodTemplate, parameter order modifications, and improved SparkSession cleanup in tests. This enables granular control over labels, annotations, containers, and tolerations for both driver and executor pods.

Unit tests added: True

Estimated effort to review (1-5, lower is better): 2
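
As a rough illustration of the primary_only flag mentioned in the summary above, here is a hedged sketch of the serialization idea (the helper name pod_template_to_k8s_pod is hypothetical; the actual logic lives in flytekit/core/utils.py and may differ): when a driver/executor PodTemplate is converted into the K8sPod model, only the container matching primary_container_name would be kept.

from typing import Optional

from flytekit.core.pod_template import PodTemplate
from flytekit.models.task import K8sObjectMetadata, K8sPod


def pod_template_to_k8s_pod(
    pod_template: Optional[PodTemplate], primary_only: bool = False
) -> Optional[K8sPod]:
    # Hypothetical helper: convert a PodTemplate into the K8sPod IDL model,
    # optionally dropping every container except the primary one.
    if pod_template is None or pod_template.pod_spec is None:
        return None
    pod_spec = pod_template.pod_spec
    if primary_only:
        pod_spec.containers = [
            c
            for c in pod_spec.containers
            if c.name == pod_template.primary_container_name
        ][:1]
    return K8sPod(
        metadata=K8sObjectMetadata(
            labels=pod_template.labels,
            annotations=pod_template.annotations,
        ),
        pod_spec=pod_spec.to_dict(),
    )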


welcome bot commented Dec 20, 2024

Thank you for opening this pull request! 🙌

These tips will help get your PR across the finish line:

  • Most of the repos have a PR template; if not, fill it out to the best of your knowledge.
  • Sign off your commits (Reference: DCO Guide).

@machichima machichima force-pushed the 4105-spark-driver-executor-podtemplate branch from af03383 to 7793398 Compare January 18, 2025 13:04
@flyte-bot (Contributor) commented Jan 18, 2025

Code Review Agent Run #3c7587

Actionable Suggestions - 2
  • plugins/flytekit-spark/flytekitplugins/spark/models.py - 1
    • Missing pod config in overrides method · Line 79-80
  • plugins/flytekit-spark/flytekitplugins/spark/task.py - 1
    • Consider adding return type hints
Additional Suggestions - 1
  • plugins/flytekit-spark/tests/test_spark_task.py - 1
Review Details
  • Files reviewed - 3 · Commit Range: 8cc081d..7793398
    • plugins/flytekit-spark/flytekitplugins/spark/models.py
    • plugins/flytekit-spark/flytekitplugins/spark/task.py
    • plugins/flytekit-spark/tests/test_spark_task.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

AI Code Review powered by Bito

@flyte-bot (Contributor) commented Jan 18, 2025

Changelist by Bito

This pull request implements the following key changes.

Key Change Files Impacted
Feature Improvement - Enhanced Spark Pod Configuration Support

utils.py - Added Spark-specific pod template serialization logic

models.py - Added driver_pod and executor_pod fields to SparkJob model

task.py - Implemented driver and executor pod configuration in Spark task

test_spark_task.py - Added comprehensive tests for driver/executor pod configuration
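
Tying the entries above together, a sketch of how task.py could thread the converted pods into the SparkJob model during serialization (a hypothetical helper; signatures inferred from the diff snippets later in this thread, and the placeholder paths are illustrative, not the PR's values):

from typing import Any, Dict, Optional

from google.protobuf.json_format import MessageToDict

from flytekit.models.task import K8sPod
from flytekitplugins.spark.models import SparkJob, SparkType


def build_spark_job_custom(
    spark_conf: Dict[str, str],
    driver_pod: Optional[K8sPod],
    executor_pod: Optional[K8sPod],
) -> Dict[str, Any]:
    # Hypothetical helper mirroring get_custom: build the SparkJob IDL
    # object, now including the per-role pods, and serialize it to a dict.
    job = SparkJob(
        spark_type=SparkType.PYTHON,
        application_file="local:///usr/bin/entrypoint.py",  # placeholder
        executor_path="/usr/bin/python3",  # placeholder
        main_class="",
        spark_conf=spark_conf,
        hadoop_conf={},
        driver_pod=driver_pod,
        executor_pod=executor_pod,
    )
    return MessageToDict(job.to_flyte_idl())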

Comment on lines +79 to +80
driver_pod=self.driver_pod,
executor_pod=self.executor_pod,

Missing pod config in overrides method

Consider adding driver_pod and executor_pod to the with_overrides method to ensure consistent pod configuration overrides.

Code suggestion
Check the AI-generated fix before applying
 @@ -56,6 +56,8 @@ def with_overrides(
          new_spark_conf: Optional[Dict[str, str]] = None,
          new_hadoop_conf: Optional[Dict[str, str]] = None,
          new_databricks_conf: Optional[Dict[str, Dict]] = None,
 +        new_driver_pod: Optional[K8sPod] = None,
 +        new_executor_pod: Optional[K8sPod] = None,
      ) -> "SparkJob":
          if not new_spark_conf:
              new_spark_conf = self.spark_conf
 @@ -66,6 +68,12 @@ def with_overrides(
          if not new_databricks_conf:
              new_databricks_conf = self.databricks_conf
 
 +        if not new_driver_pod:
 +            new_driver_pod = self.driver_pod
 +
 +        if not new_executor_pod:
 +            new_executor_pod = self.executor_pod
 +
          return SparkJob(
              spark_type=self.spark_type,
              application_file=self.application_file,
 @@ -74,6 +82,8 @@ def with_overrides(
              hadoop_conf=new_hadoop_conf,
              databricks_conf=new_databricks_conf,
              databricks_instance=self.databricks_instance,
 +            driver_pod=new_driver_pod,
 +            executor_pod=new_executor_pod,
              executor_path=self.executor_path,
          )

Code Review Run #3c7587



@@ -176,6 +185,22 @@ def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:

return MessageToDict(job.to_flyte_idl())

def to_k8s_pod(self, pod_template: PodTemplate | None, settings: SerializationSettings) -> K8sPod | None:

Consider adding return type hints

Consider adding type hints for the return value of _get_container() in the to_k8s_pod() method. The method appears to use this internal method but its return type is not clearly specified in the type hints.

Code suggestion
Check the AI-generated fix before applying
Suggested change
def to_k8s_pod(self, pod_template: PodTemplate | None, settings: SerializationSettings) -> K8sPod | None:
def to_k8s_pod(self, pod_template: PodTemplate | None, settings: SerializationSettings) -> K8sPod | None:
from flytekit.models import task as _task_model
_get_container: Callable[..., _task_model.Container] = self._get_container

Code Review Run #3c7587



Take the container with the name set in the driver/executor podTemplate primary_container_name

Signed-off-by: machichima <[email protected]>

codecov bot commented Jan 19, 2025

Codecov Report

Attention: Patch coverage is 57.14286% with 3 lines in your changes missing coverage. Please review.

Project coverage is 76.84%. Comparing base (edbf992) to head (167a390).
Report is 28 commits behind head on master.

Files with missing lines    Patch %   Lines
flytekit/core/utils.py      57.14%    1 Missing and 2 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #3016      +/-   ##
==========================================
- Coverage   78.19%   76.84%   -1.35%     
==========================================
  Files         201      205       +4     
  Lines       21274    21621     +347     
  Branches     2733     2762      +29     
==========================================
- Hits        16635    16615      -20     
- Misses       3843     4242     +399     
+ Partials      796      764      -32     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@flyte-bot (Contributor) commented Jan 19, 2025

Code Review Agent Run #f512d4

Actionable Suggestions - 0
Review Details
  • Files reviewed - 2 · Commit Range: 7793398..b21d1e3
    • flytekit/core/utils.py
    • plugins/flytekit-spark/flytekitplugins/spark/task.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

AI Code Review powered by Bito

Exclude those in the podTemplate of spark driver/executor pod

Signed-off-by: machichima <[email protected]>
@flyte-bot (Contributor) commented Jan 19, 2025

Code Review Agent Run #27c6ae

Actionable Suggestions - 2
  • flytekit/core/utils.py - 1
    • Consider extracting Spark container logic · Line 181-184
  • plugins/flytekit-spark/tests/test_spark_task.py - 1
    • Consider proper typing instead of ignoring · Line 370-378
Review Details
  • Files reviewed - 3 · Commit Range: b21d1e3..1b7c1c9
    • flytekit/core/utils.py
    • plugins/flytekit-spark/flytekitplugins/spark/task.py
    • plugins/flytekit-spark/tests/test_spark_task.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

AI Code Review powered by Bito

Comment on lines +181 to +184
if task_type != "spark":
# for spark driver/executor, do not use the command and args from task podTemplate
container.command = primary_container.command
container.args = primary_container.args

Consider extracting Spark container logic

Consider extracting the Spark-specific container command/args logic into a separate helper function to improve code organization and readability. The current nested if condition makes the code harder to follow.

Code suggestion
Check the AI-generated fix before applying
 -            if task_type != "spark":
 -                # for spark driver/executor, do not use the command and args from task podTemplate
 -                container.command = primary_container.command
 -                container.args = primary_container.args
 +            if _should_copy_container_command_args(task_type):
 +                container.command = primary_container.command
 +                container.args = primary_container.args
 +
 +def _should_copy_container_command_args(task_type: str) -> bool:
 +    # for spark driver/executor, do not use the command and args from task podTemplate
 +    return task_type != "spark"

Code Review Run #27c6ae



Comment on lines +370 to +378
pod_spec=driver_pod_spec_dict_remove_None, # type: ignore
)

target_executor_k8sPod = K8sPod(
metadata=K8sObjectMetadata(
labels={"lKeyA_e": "lValA", "lKeyB_e": "lValB"},
annotations={"aKeyA_e": "aValA", "aKeyB_e": "aValB"},
),
pod_spec=executor_pod_spec_dict_remove_None, # type: ignore

Consider proper typing instead of ignoring

Consider removing the # type: ignore comments and properly typing the pod_spec parameter to match the expected type.

Code suggestion
Check the AI-generated fix before applying
 -        pod_spec=driver_pod_spec_dict_remove_None,  # type: ignore
 +        pod_spec=V1PodSpec(**driver_pod_spec_dict_remove_None),
 @@ -378,1 +378,1 @@
 -        pod_spec=executor_pod_spec_dict_remove_None,  # type: ignore
 +        pod_spec=V1PodSpec(**executor_pod_spec_dict_remove_None),

Code Review Run #27c6ae



Signed-off-by: machichima <[email protected]>
Signed-off-by: machichima <[email protected]>
@flyte-bot (Contributor) commented Jan 19, 2025

Code Review Agent Run #41dd0b

Actionable Suggestions - 1
  • plugins/flytekit-spark/tests/test_spark_task.py - 1
    • Consider extracting duplicate SparkSession cleanup code · Line 53-59
Review Details
  • Files reviewed - 2 · Commit Range: 1b7c1c9..167a390
    • plugins/flytekit-spark/flytekitplugins/spark/task.py
    • plugins/flytekit-spark/tests/test_spark_task.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

AI Code Review powered by Bito

Comment on lines +53 to +59
if SparkSession._instantiatedSession:
SparkSession.builder.getOrCreate().stop()
SparkSession._instantiatedSession = None
yield
pyspark.sql.SparkSession.builder.getOrCreate().stop()

if SparkSession._instantiatedSession:
SparkSession.builder.getOrCreate().stop()
SparkSession._instantiatedSession = None

Consider extracting duplicate SparkSession cleanup code

Consider extracting the duplicate code block for stopping SparkSession into a helper function to improve maintainability and reduce duplication.

Code suggestion
Check the AI-generated fix before applying
 -    if SparkSession._instantiatedSession:
 -        SparkSession.builder.getOrCreate().stop()
 -        SparkSession._instantiatedSession = None
 +    def _cleanup_spark_session():
 +        if SparkSession._instantiatedSession:
 +            SparkSession.builder.getOrCreate().stop()
 +            SparkSession._instantiatedSession = None
 +
 +    _cleanup_spark_session()
      yield
 -    if SparkSession._instantiatedSession:
 -        SparkSession.builder.getOrCreate().stop()
 -        SparkSession._instantiatedSession = None
 +    _cleanup_spark_session()

Code Review Run #41dd0b


