Updates for Databricks User Agent Strings (#725)
* Updates for Databricks User Agent Strings

Signed-off-by: GBBBAS <[email protected]>

* Update for pyodbc

Signed-off-by: GBBBAS <[email protected]>

---------

Signed-off-by: GBBBAS <[email protected]>
GBBBAS authored Apr 19, 2024
1 parent a546ea9 commit 92c2446
Showing 7 changed files with 43 additions and 6 deletions.
10 changes: 9 additions & 1 deletion docs/sdk/pipelines/deploy/databricks.md
@@ -114,4 +114,12 @@ Once a job is deployed to Databricks, it can be executed immediately using the f
```python
# Run/Launch the Job in Databricks
launch_result = databricks_job.launch()
```
```

## Stop

A running job that has been deployed to Databricks can be cancelled using the following code.

```python
# Stop/Cancel the Job in Databricks
stop_result = databricks_job.stop()
```
3 changes: 2 additions & 1 deletion docs/sdk/queries/spark/spark-connect.md
@@ -10,7 +10,8 @@ Please ensure that you have followed the [instructions](https://spark.apache.org

## Example

This
Below is an example of connecting to Spark using Spark Connect.

```python
from rtdip_sdk.connectors import SparkConnection

3 changes: 2 additions & 1 deletion src/sdk/python/rtdip_sdk/connectors/odbc/db_sql_connector.py
Expand Up @@ -23,7 +23,7 @@ class DatabricksSQLConnection(ConnectionInterface):
"""
The Databricks SQL Connector for Python is a Python library that allows you to use Python code to run SQL commands on Databricks clusters and Databricks SQL warehouses.
The connection class represents a connection to a database and uses the Databricks SQL Connector API's for Python to intereact with cluster/jobs.
The connection class represents a connection to a database and uses the Databricks SQL Connector API's for Python to interact with cluster/jobs.
To find details for SQL warehouses server_hostname and http_path, refer to the SQL Warehouse tab in the documentation.
Args:
@@ -46,6 +46,7 @@ def _connect(self):
server_hostname=self.server_hostname,
http_path=self.http_path,
access_token=self.access_token,
_user_agent_entry="RTDIP",
)
except Exception as e:
logging.exception("error while connecting to the endpoint")
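For context, the change above tags the SDK's Databricks SQL connections with an RTDIP user agent entry. Below is a minimal sketch of what the updated `_connect` call amounts to when the Databricks SQL Connector for Python is used directly; the keyword arguments mirror those shown in the diff, while the hostname, HTTP path, and token values are placeholders.

```python
from databricks import sql

# Placeholder connection details -- substitute your own workspace values.
connection = sql.connect(
    server_hostname="adb-1234567890123456.7.azuredatabricks.net",
    http_path="/sql/1.0/warehouses/abc123",
    access_token="dapi-example-token",
    # Added by this change: identifies RTDIP in the user agent string sent to Databricks.
    _user_agent_entry="RTDIP",
)

cursor = connection.cursor()
cursor.execute("SELECT 1")
print(cursor.fetchall())
cursor.close()
connection.close()
```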
Original file line number Diff line number Diff line change
@@ -65,6 +65,7 @@ def _connect(self):
+ "SparkServerType=3;"
+ "AuthMech=11;"
+ "UID=token;"
+ "UserAgentEntry=RTDIP;"
+
#'PWD=' + access_token+ ";" +
"Auth_AccessToken=" + self.access_token + ";"
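The pyodbc-based connector assembles an ODBC connection string instead, and the new `UserAgentEntry=RTDIP;` fragment is appended to it. A rough sketch of the resulting string is below; the fragments visible in the diff (`SparkServerType`, `AuthMech`, `UID`, `UserAgentEntry`, `Auth_AccessToken`) are taken from the source, while the driver name, hostname, HTTP path, and token are assumed placeholders.

```python
import pyodbc

# Placeholder values for illustration only.
access_token = "dapi-example-token"
http_path = "/sql/1.0/warehouses/abc123"

connection_string = (
    "Driver=Simba Spark ODBC Driver;"  # assumed driver name
    + "Host=adb-1234567890123456.7.azuredatabricks.net;"  # placeholder hostname
    + "Port=443;"
    + "HTTPPath=" + http_path + ";"
    + "SSL=1;"
    + "ThriftTransport=2;"
    + "SparkServerType=3;"
    + "AuthMech=11;"
    + "UID=token;"
    + "UserAgentEntry=RTDIP;"  # added by this change
    + "Auth_AccessToken=" + access_token + ";"
)

connection = pyodbc.connect(connection_string, autocommit=True)
```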
Original file line number Diff line number Diff line change
@@ -70,6 +70,7 @@ def _connect(self):
DisableLimitZero=1,
EnableAsyncExec=1,
RowsFetchedPerBlock=os.getenv("RTDIP_ODBC_ROW_BLOCK_SIZE", 500000),
UserAgentEntry="RTDIP",
turbodbc_options=options,
)

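The turbodbc connector passes the equivalent ODBC attributes as keyword arguments to `turbodbc.connect`, so the user agent entry is supplied as another keyword. A hedged sketch under that assumption follows; only the last five keyword arguments appear in the diff, the rest are placeholder connection details.

```python
import os
from turbodbc import connect, make_options

# Assumed options object; the diff only shows that `turbodbc_options=options` is passed in.
options = make_options(autocommit=True)

connection = connect(
    # Assumed driver/host/path/token attributes -- placeholders, not shown in this diff.
    Driver="Simba Spark ODBC Driver",
    Host="adb-1234567890123456.7.azuredatabricks.net",
    HTTPPath="/sql/1.0/warehouses/abc123",
    UID="token",
    Auth_AccessToken="dapi-example-token",
    # Attributes visible in the diff above.
    DisableLimitZero=1,
    EnableAsyncExec=1,
    RowsFetchedPerBlock=os.getenv("RTDIP_ODBC_ROW_BLOCK_SIZE", 500000),
    UserAgentEntry="RTDIP",  # added by this change
    turbodbc_options=options,
)
```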
22 changes: 19 additions & 3 deletions src/sdk/python/rtdip_sdk/pipelines/deploy/databricks.py
@@ -19,6 +19,7 @@
from io import BytesIO

from databricks.sdk import WorkspaceClient
from databricks.sdk.config import Config
from databricks.sdk.service.jobs import CreateJob, JobSettings
from databricks.sdk.service.compute import Library, PythonPyPiLibrary, MavenLibrary
from .interfaces import DeployInterface
@@ -125,7 +126,12 @@ def deploy(self) -> Union[bool, ValueError]:
"""
# Add libraries to Databricks Job
workspace_client = WorkspaceClient(
host=self.host, token=self.token, auth_type="pat"
host=self.host,
token=self.token,
auth_type="pat",
config=Config(
product="RTDIP",
),
)
for task in self.databricks_job.tasks:
if task.notebook_task is None and task.spark_python_task is None:
@@ -263,7 +269,12 @@ def launch(self):
Launches an RTDIP Pipeline Job in Databricks Workflows. This will perform the equivalent of a `Run Now` in Databricks Workflows
"""
workspace_client = WorkspaceClient(
host=self.host, token=self.token, auth_type="pat"
host=self.host,
token=self.token,
auth_type="pat",
config=Config(
product="RTDIP",
),
)
job_found = False
for existing_job in workspace_client.jobs.list(name=self.databricks_job.name):
@@ -281,7 +292,12 @@ def stop(self):
Cancels an RTDIP Pipeline Job in Databricks Workflows. This will perform the equivalent of a `Cancel All Runs` in Databricks Workflows
"""
workspace_client = WorkspaceClient(
host=self.host, token=self.token, auth_type="pat"
host=self.host,
token=self.token,
auth_type="pat",
config=Config(
product="RTDIP",
),
)
job_found = False
for existing_job in workspace_client.jobs.list(name=self.databricks_job.name):
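Each of `deploy`, `launch` and `stop` now builds its `WorkspaceClient` the same way, passing a Databricks SDK `Config` whose `product` field carries the RTDIP identifier into the SDK's user agent string. A minimal sketch of that pattern, with placeholder host and token values, is shown below; the job listing call mirrors the `launch`/`stop` lookups in the diff.

```python
from databricks.sdk import WorkspaceClient
from databricks.sdk.config import Config

# Placeholder workspace URL and personal access token.
workspace_client = WorkspaceClient(
    host="https://adb-1234567890123456.7.azuredatabricks.net",
    token="dapi-example-token",
    auth_type="pat",
    # Added by this change: the product name surfaces in the SDK's user agent string.
    config=Config(product="RTDIP"),
)

# Locate an existing job by name, as launch() and stop() do before acting on it.
for job in workspace_client.jobs.list(name="example-rtdip-pipeline-job"):
    print(job.job_id)
```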
Original file line number Diff line number Diff line change
@@ -94,6 +94,11 @@ def __init__(self):
jobs = DummyJob()


class DummyConfig:
def __init__(self, product=None):
return None


default_version = "0.0.0rc0"
default_list_package = "databricks.sdk.service.jobs.JobsAPI.list"

@@ -161,6 +166,10 @@ def test_pipeline_job_deploy(mocker: MockerFixture):
)
mocker.patch(default_list_package, return_value=[])
mocker.patch("databricks.sdk.service.jobs.JobsAPI.create", return_value=None)
mocker.patch(
"src.sdk.python.rtdip_sdk.pipelines.deploy.databricks.Config",
return_value=DummyConfig(),
)
deploy_result = databricks_job.deploy()
assert deploy_result

