
Support Databricks Agent API 2.1 and Console URL #1935

Merged
pingsutw merged 18 commits into flyteorg:master on Dec 12, 2023

Conversation

@Future-Outlier (Member) commented Nov 4, 2023

TL;DR

Currently, we only support Databricks API 2.0, and only the new_cluster key in the databricks config.
However, some users want to use Databricks API 2.1 and to specify existing_cluster_id in the databricks config instead.
This PR adds support for both; a sketch follows below.
[Screenshot: Databricks API docs showing the difference between existing_cluster_id and new_cluster]
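With API 2.1, a task can reuse a running cluster instead of provisioning a new one. A minimal sketch of such a task config, assuming the same plugin fields as the full example further down; the cluster ID is a hypothetical placeholder:

from flytekit import task
from flytekitplugins.spark import Databricks

@task(
    task_config=Databricks(
        spark_conf={"spark.executor.instances": "1"},
        databricks_conf={
            "run_name": "flytekit databricks plugin example",
            # Reuse a running cluster; this replaces the new_cluster block.
            # The ID below is a hypothetical placeholder.
            "existing_cluster_id": "1234-567890-abcdefgh",
            "timeout_seconds": 3600,
            "max_retries": 1,
        },
        databricks_instance="dbc-2889d011-7c0b.cloud.databricks.com",
    ),
)
def hello_existing_cluster() -> str:
    return "ran on an existing cluster"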

Type

  • Bug Fix
  • Feature
  • Plugin

Are all requirements met?

  • Code completed
  • Smoke tested
  • Unit tests added
  • Code documentation added
  • Any pending items have an associated Issue

Complete description

Config YAML in dev mode

tasks:
  task-plugins:
    enabled-plugins:
      - agent-service
      - container
      - sidecar
      - K8S-ARRAY
    default-for-task-types:
      spark: agent-service
      custom_task: agent-service
      container: container
      container_array: K8S-ARRAY

plugins:
  agent-service:
    supportedTaskTypes:
      - spark
      - default_task
      - custom_task
    # By default, all requests will be sent to the default agent.
    defaultAgent:
      endpoint: "dns:///localhost:8000"
      insecure: true
      timeouts:
        GetTask: 100s
      defaultTimeout: 100s
    agents:
      custom_agent:
        endpoint: "dns:///localhost:7777"
        insecure: false
        defaultServiceConfig: '{"loadBalancingConfig": [{"round_robin":{}}]}'
        timeouts:
          DoTask: 100s
          GetTask: 100s
        defaultTimeout: 100s
    agentForTaskTypes:
      # This overrides the default agent for custom_task, so propeller will send requests for that task type to custom_agent.
      - custom_task: custom_agent

Example Code

import datetime
import random
from operator import add

import flytekit
from flytekit import ImageSpec, Resources, task, workflow
from flytekitplugins.spark import Databricks

image = ImageSpec(base_image="pingsutw/databricks:v4", registry="pingsutw")

@task(
    task_config=Databricks(
        # this configuration is applied to the spark cluster
        spark_conf={
            "spark.driver.memory": "600M",
            "spark.executor.memory": "600M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "1",
            "spark.driver.cores": "1",
        },
        executor_path="/databricks/python3/bin/python",
        applications_path="local:///usr/local/bin/entrypoint.py",
        databricks_conf={
            "run_name": "flytekit databricks plugin example",
            "new_cluster": {
                "spark_version": "13.3.x-scala2.12",
                "node_type_id": "m6i.large",  # TODO: test m6i.large, i3.xlarge
                "num_workers": 3,
                "aws_attributes": {
                    "availability": "ON_DEMAND",
                    "instance_profile_arn": "arn:aws:iam::479331373192:instance-profile/databricks-agent",
                    "ebs_volume_type": "GENERAL_PURPOSE_SSD",
                    "ebs_volume_count": 1,
                    "ebs_volume_size": 100,
                },
            },
            "timeout_seconds": 3600,
            "max_retries": 1,
        },
        databricks_instance="dbc-2889d011-7c0b.cloud.databricks.com",
    ),
    limits=Resources(mem="2000M"),
    # container_image=image,
    container_image="pingsutw/databricks:v7"
)
def hello_spark(partitions: int) -> float:
    print("Starting Spark with Partitions: {}".format(partitions))

    n = 100000 * partitions
    sess = flytekit.current_context().spark_session
    count = (
        sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    )
    pi_val = 4.0 * count / n
    print("Pi val is :{}".format(pi_val))
    return pi_val


def f(_):
    x = random.random() * 2 - 1
    y = random.random() * 2 - 1
    return 1 if x**2 + y**2 <= 1 else 0


@task(cache_version="1")
def print_every_time(value_to_print: float, date_triggered: datetime.datetime) -> int:
    print("My printed value: {} @ {}".format(value_to_print, date_triggered))
    return 1


@workflow
def wf(
    triggered_date: datetime.datetime = datetime.datetime.now(),
) -> float:
    """
    Using the workflow is still as any other workflow. As image is a property of the task, the workflow does not care
    about how the image is configured.
    """
    pi = hello_spark(partitions=50)
    print_every_time(value_to_print=pi, date_triggered=triggered_date)
    return pi


if __name__ == "__main__":
    print(f"Running {__file__} main...")
    print(
        f"Running a Spark job on Databricks: wf(triggered_date=datetime.datetime.now()) = "
        f"{wf(triggered_date=datetime.datetime.now())}"
    )

Test it

pyflyte register spark_example.py --version API-V2
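Once registered, the workflow can be launched from the Flyte console, or triggered directly from the CLI; a minimal example, assuming the file and workflow names above:

pyflyte run --remote spark_example.py wf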

Screenshots

Create a job

{
  "run_id": 273988794563328
}
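For context, the agent's create call corresponds to the Databricks Jobs API 2.1 runs/submit endpoint. A minimal sketch of the equivalent raw request; the host and token are placeholders and the body is trimmed to the essentials, using the multi-task request shape from the 2.1 docs:

import requests

DATABRICKS_HOST = "https://dbc-2889d011-7c0b.cloud.databricks.com"  # placeholder
TOKEN = "<databricks-personal-access-token>"  # placeholder

# Submit a one-time run via Jobs API 2.1 (2.1 uses a tasks array).
resp = requests.post(
    f"{DATABRICKS_HOST}/api/2.1/jobs/runs/submit",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={
        "run_name": "flytekit databricks plugin example",
        "tasks": [
            {
                "task_key": "flytekit_databricks_plugin_example",
                "new_cluster": {
                    "spark_version": "13.3.x-scala2.12",
                    "node_type_id": "m6i.large",
                    "num_workers": 3,
                },
                "spark_python_task": {"python_file": "dbfs:/entrypoint.py", "parameters": []},
            }
        ],
    },
)
print(resp.json())  # -> {"run_id": ...}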

Get the job (PENDING State)

{
  "job_id": 135287260562039,
  "run_id": 273988794563328,
  "creator_user_name": "[email protected]",
  "number_in_job": 273988794563328,
  "state": {
    "life_cycle_state": "PENDING",
    "state_message": "Waiting for cluster",
    "user_cancelled_or_timedout": false
  },
  "start_time": 1699085986939,
  "setup_duration": 0,
  "execution_duration": 0,
  "cleanup_duration": 0,
  "end_time": 0,
  "run_name": "flytekit databricks plugin example",
  "run_page_url": "https://359440087090042.2.gcp.databricks.com/?o=359440087090042#job/135287260562039/run/273988794563328",
  "run_type": "SUBMIT_RUN",
  "tasks": [
    {
      "run_id": 273988794563328,
      "task_key": "flytekit_databricks_plugin_example",
      "spark_python_task": {
        "python_file": "dbfs:/entrypoint.py",
        "parameters": [
          "pyflyte-fast-execute",
          "--additional-distribution",
          "s3://my-s3-bucket/flytesnacks/development/3JXD7OHFEARICCE4MLFDLE5DZU======/fastcb52e4c3f1d4e3c59089536098505a95.tar.gz",
          "--dest-dir",
          "/root",
          "--",
          "pyflyte-execute",
          "--inputs",
          "s3://my-s3-bucket/metadata/propeller/flytesnacks-development-avqvhx759fw9kqt6mh6v/n0/data/inputs.pb",
          "--output-prefix",
          "s3://my-s3-bucket/metadata/propeller/flytesnacks-development-avqvhx759fw9kqt6mh6v/n0/data/0",
          "--raw-output-data-prefix",
          "s3://my-s3-bucket/data/3d/avqvhx759fw9kqt6mh6v-n0-0",
          "--checkpoint-path",
          "s3://my-s3-bucket/data/3d/avqvhx759fw9kqt6mh6v-n0-0/_flytecheckpoints",
          "--prev-checkpoint",
          "\"\"",
          "--resolver",
          "flytekit.core.python_auto_container.default_task_resolver",
          "--",
          "task-module",
          "spark_local_example",
          "task-name",
          "hello_spark"
        ]
      },
      "new_cluster": {
        "spark_version": "12.2.x-scala2.12",
        "spark_conf": {
          "spark.driver.memory": "1000M",
          "spark.executor.instances": "2",
          "spark.driver.cores": "1",
          "spark.executor.memory": "1000M",
          "spark.executor.cores": "1"
        },
        "gcp_attributes": {
          "use_preemptible_executors": false
        },
        "node_type_id": "n2-highmem-4",
        "enable_elastic_disk": false,
        "num_workers": 1
      },
      "state": {
        "life_cycle_state": "PENDING",
        "state_message": "Waiting for cluster",
        "user_cancelled_or_timedout": false
      },
      "run_page_url": "https://359440087090042.2.gcp.databricks.com/?o=359440087090042#job/135287260562039/run/273988794563328",
      "start_time": 1699085986939,
      "setup_duration": 0,
      "execution_duration": 0,
      "cleanup_duration": 0,
      "end_time": 0,
      "cluster_instance": {
        "cluster_id": "1104-081950-xrvluew0"
      },
      "attempt_number": 0
    }
  ],
  "format": "MULTI_TASK"
}
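The agent's get call maps to the Jobs API 2.1 runs/get endpoint; a minimal polling sketch, with the same placeholder host and token as above:

import requests

resp = requests.get(
    "https://dbc-2889d011-7c0b.cloud.databricks.com/api/2.1/jobs/runs/get",
    headers={"Authorization": "Bearer <databricks-personal-access-token>"},
    params={"run_id": 273988794563328},
)
# The top-level state matches the response shown above.
print(resp.json()["state"]["life_cycle_state"])  # e.g. "PENDING"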

Delete the job

{ }
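Deleting the job maps to the Jobs API 2.1 runs/cancel endpoint; a minimal sketch with placeholder host and token. A successful cancel returns an empty JSON object, as shown above:

import requests

requests.post(
    "https://dbc-2889d011-7c0b.cloud.databricks.com/api/2.1/jobs/runs/cancel",
    headers={"Authorization": "Bearer <databricks-personal-access-token>"},
    json={"run_id": 273988794563328},
)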

Get the job (TERMINATED State)

{
  "job_id": 135287260562039,
  "run_id": 273988794563328,
  "creator_user_name": "[email protected]",
  "number_in_job": 273988794563328,
  "state": {
    "life_cycle_state": "TERMINATED",
    "result_state": "CANCELED",
    "state_message": "Run cancelled.",
    "user_cancelled_or_timedout": true
  },
  "start_time": 1699085986939,
  "setup_duration": 0,
  "execution_duration": 0,
  "cleanup_duration": 23000,
  "end_time": 1699086010369,
  "run_name": "flytekit databricks plugin example",
  "run_page_url": "https://359440087090042.2.gcp.databricks.com/?o=359440087090042#job/135287260562039/run/273988794563328",
  "run_type": "SUBMIT_RUN",
  "tasks": [
    {
      "run_id": 273988794563328,
      "task_key": "flytekit_databricks_plugin_example",
      "spark_python_task": {
        "python_file": "dbfs:/entrypoint.py",
        "parameters": [
          "pyflyte-fast-execute",
          "--additional-distribution",
          "s3://my-s3-bucket/flytesnacks/development/3JXD7OHFEARICCE4MLFDLE5DZU======/fastcb52e4c3f1d4e3c59089536098505a95.tar.gz",
          "--dest-dir",
          "/root",
          "--",
          "pyflyte-execute",
          "--inputs",
          "s3://my-s3-bucket/metadata/propeller/flytesnacks-development-avqvhx759fw9kqt6mh6v/n0/data/inputs.pb",
          "--output-prefix",
          "s3://my-s3-bucket/metadata/propeller/flytesnacks-development-avqvhx759fw9kqt6mh6v/n0/data/0",
          "--raw-output-data-prefix",
          "s3://my-s3-bucket/data/3d/avqvhx759fw9kqt6mh6v-n0-0",
          "--checkpoint-path",
          "s3://my-s3-bucket/data/3d/avqvhx759fw9kqt6mh6v-n0-0/_flytecheckpoints",
          "--prev-checkpoint",
          "\"\"",
          "--resolver",
          "flytekit.core.python_auto_container.default_task_resolver",
          "--",
          "task-module",
          "spark_local_example",
          "task-name",
          "hello_spark"
        ]
      },
      "new_cluster": {
        "spark_version": "12.2.x-scala2.12",
        "spark_conf": {
          "spark.driver.memory": "1000M",
          "spark.executor.instances": "2",
          "spark.driver.cores": "1",
          "spark.executor.memory": "1000M",
          "spark.executor.cores": "1"
        },
        "gcp_attributes": {
          "use_preemptible_executors": false
        },
        "node_type_id": "n2-highmem-4",
        "enable_elastic_disk": false,
        "num_workers": 1
      },
      "state": {
        "life_cycle_state": "TERMINATED",
        "result_state": "CANCELED",
        "state_message": "Run cancelled.",
        "user_cancelled_or_timedout": true
      },
      "run_page_url": "https://359440087090042.2.gcp.databricks.com/?o=359440087090042#job/135287260562039/run/273988794563328",
      "start_time": 1699085986939,
      "setup_duration": 0,
      "execution_duration": 0,
      "cleanup_duration": 23000,
      "end_time": 1699086010369,
      "cluster_instance": {
        "cluster_id": "1104-081950-xrvluew0"
      },
      "attempt_number": 0
    }
  ],
  "format": "MULTI_TASK"
}

Test the TaskLog proto that exposes the Databricks console URL.
[Screenshot: task log entry linking to the Databricks console]
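For reference, the console link follows the run_page_url pattern visible in the Jobs API responses above. A minimal sketch of assembling such a URL; the helper name is hypothetical, not the plugin's actual function, and the real URL also carries a workspace ?o= parameter:

def databricks_console_url(databricks_instance: str, job_id: int, run_id: int) -> str:
    # Hypothetical helper: mirrors the run_page_url pattern
    # https://<instance>/#job/<job_id>/run/<run_id> seen in the responses above.
    return f"https://{databricks_instance}/#job/{job_id}/run/{run_id}"

print(databricks_console_url("dbc-2889d011-7c0b.cloud.databricks.com", 135287260562039, 273988794563328))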

Tracking Issue

flyteorg/flyte#3936
flyteorg/flyte#4362

Related PRs

#1797
#2021

Signed-off-by: Future Outlier <[email protected]>
Future Outlier added 2 commits November 4, 2023 15:25
Signed-off-by: Future Outlier <[email protected]>
@Future-Outlier Future-Outlier marked this pull request as ready for review November 4, 2023 08:37
@Future-Outlier Future-Outlier changed the title Support Databricks Agent Api 2.1 version Support Databricks Agent Api 2.1 version and Support existing_cluster_id and new_cluster to create a Job Nov 4, 2023
@Future-Outlier Future-Outlier changed the title Support Databricks Agent Api 2.1 version and Support existing_cluster_id and new_cluster to create a Job Support Databricks Agent Api 2.1 version and Support existing_cluster_id and new_cluster options to create a Job Nov 4, 2023

codecov bot commented Nov 5, 2023

Codecov Report

Attention: 1 line in your changes is missing coverage. Please review.

Comparison is base (f0d4f13) 85.84% compared to head (0f745d8) 85.86%.

Files Patch % Lines
flytekit/models/core/execution.py 80.00% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #1935      +/-   ##
==========================================
+ Coverage   85.84%   85.86%   +0.01%     
==========================================
  Files         306      306              
  Lines       22897    22907      +10     
  Branches     3470     3471       +1     
==========================================
+ Hits        19657    19668      +11     
+ Misses       2645     2644       -1     
  Partials      595      595              


Future Outlier added 11 commits November 5, 2023 14:23
Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
@Future-Outlier Future-Outlier changed the title Support Databricks Agent Api 2.1 version and Support existing_cluster_id and new_cluster options to create a Job Support Databricks Agent Api 2.1 Dec 5, 2023
@Future-Outlier Future-Outlier changed the title Support Databricks Agent Api 2.1 Support Databricks Agent API 2.1 Version Dec 5, 2023
@Future-Outlier Future-Outlier marked this pull request as draft December 5, 2023 06:47
…nto databricks-update-api-version

Signed-off-by: Future Outlier <[email protected]>
@Future-Outlier Future-Outlier changed the title Support Databricks Agent API 2.1 Version Support Databricks Agent API 2.1 and TaskLogs Dec 10, 2023
@Future-Outlier Future-Outlier changed the title Support Databricks Agent API 2.1 and TaskLogs Support Databricks Agent API 2.1 and Console URL Dec 10, 2023
Signed-off-by: Future Outlier <[email protected]>
@Future-Outlier Future-Outlier marked this pull request as ready for review December 11, 2023 08:55
@Future-Outlier Future-Outlier marked this pull request as draft December 11, 2023 08:56
@Future-Outlier Future-Outlier marked this pull request as ready for review December 11, 2023 08:59
@pingsutw pingsutw merged commit 935778f into flyteorg:master Dec 12, 2023
75 checks passed
RRap0so pushed a commit to RRap0so/flytekit that referenced this pull request Dec 15, 2023
Signed-off-by: Future Outlier <[email protected]>
Co-authored-by: Future Outlier <[email protected]>
Signed-off-by: Rafael Raposo <[email protected]>