[Spark] Add Delta Connect Update/Delete Server and Scala Client (#3545)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Adds support for `update` and `delete` to the Delta Connect server and Scala client.

## How was this patch tested?
Added unit tests.

## Does this PR introduce _any_ user-facing changes?
No.

---------

Co-authored-by: Dhruv Arya <[email protected]>
Co-authored-by: Qianru Lao <[email protected]>
Co-authored-by: Zihao Xu <[email protected]>
Co-authored-by: Lukas Rupprecht <[email protected]>
Co-authored-by: Venki Korukanti <[email protected]>
Co-authored-by: jackierwzhang <[email protected]>
7 people authored Aug 19, 2024
1 parent d2a6fa0 commit c67180b
Showing 8 changed files with 685 additions and 19 deletions.
38 changes: 23 additions & 15 deletions python/delta/connect/proto/relations_pb2.py
@@ -28,30 +28,38 @@


from delta.connect.proto import base_pb2 as delta_dot_connect_dot_base__pb2
from spark.connect import expressions_pb2 as spark_dot_connect_dot_expressions__pb2
from spark.connect import relations_pb2 as spark_dot_connect_dot_relations__pb2
from spark.connect import types_pb2 as spark_dot_connect_dot_types__pb2


DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n\x1d\x64\x65lta/connect/relations.proto\x12\rdelta.connect\x1a\x18\x64\x65lta/connect/base.proto\x1a\x19spark/connect/types.proto"\xb6\x03\n\rDeltaRelation\x12)\n\x04scan\x18\x01 \x01(\x0b\x32\x13.delta.connect.ScanH\x00R\x04scan\x12K\n\x10\x64\x65scribe_history\x18\x02 \x01(\x0b\x32\x1e.delta.connect.DescribeHistoryH\x00R\x0f\x64\x65scribeHistory\x12H\n\x0f\x64\x65scribe_detail\x18\x03 \x01(\x0b\x32\x1d.delta.connect.DescribeDetailH\x00R\x0e\x64\x65scribeDetail\x12I\n\x10\x63onvert_to_delta\x18\x04 \x01(\x0b\x32\x1d.delta.connect.ConvertToDeltaH\x00R\x0e\x63onvertToDelta\x12\x42\n\rrestore_table\x18\x05 \x01(\x0b\x32\x1b.delta.connect.RestoreTableH\x00R\x0crestoreTable\x12\x43\n\x0eis_delta_table\x18\x06 \x01(\x0b\x32\x1b.delta.connect.IsDeltaTableH\x00R\x0cisDeltaTableB\x0f\n\rrelation_type"7\n\x04Scan\x12/\n\x05table\x18\x01 \x01(\x0b\x32\x19.delta.connect.DeltaTableR\x05table"B\n\x0f\x44\x65scribeHistory\x12/\n\x05table\x18\x01 \x01(\x0b\x32\x19.delta.connect.DeltaTableR\x05table"A\n\x0e\x44\x65scribeDetail\x12/\n\x05table\x18\x01 \x01(\x0b\x32\x19.delta.connect.DeltaTableR\x05table"\xd1\x01\n\x0e\x43onvertToDelta\x12\x1e\n\nidentifier\x18\x01 \x01(\tR\nidentifier\x12\x38\n\x17partition_schema_string\x18\x02 \x01(\tH\x00R\x15partitionSchemaString\x12Q\n\x17partition_schema_struct\x18\x03 \x01(\x0b\x32\x17.spark.connect.DataTypeH\x00R\x15partitionSchemaStructB\x12\n\x10partition_schema"\x93\x01\n\x0cRestoreTable\x12/\n\x05table\x18\x01 \x01(\x0b\x32\x19.delta.connect.DeltaTableR\x05table\x12\x1a\n\x07version\x18\x02 \x01(\x03H\x00R\x07version\x12\x1e\n\ttimestamp\x18\x03 \x01(\tH\x00R\ttimestampB\x16\n\x14version_or_timestamp""\n\x0cIsDeltaTable\x12\x12\n\x04path\x18\x01 \x01(\tR\x04pathB\x1a\n\x16io.delta.connect.protoP\x01\x62\x06proto3'
b'\n\x1d\x64\x65lta/connect/relations.proto\x12\rdelta.connect\x1a\x18\x64\x65lta/connect/base.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xc5\x04\n\rDeltaRelation\x12)\n\x04scan\x18\x01 \x01(\x0b\x32\x13.delta.connect.ScanH\x00R\x04scan\x12K\n\x10\x64\x65scribe_history\x18\x02 \x01(\x0b\x32\x1e.delta.connect.DescribeHistoryH\x00R\x0f\x64\x65scribeHistory\x12H\n\x0f\x64\x65scribe_detail\x18\x03 \x01(\x0b\x32\x1d.delta.connect.DescribeDetailH\x00R\x0e\x64\x65scribeDetail\x12I\n\x10\x63onvert_to_delta\x18\x04 \x01(\x0b\x32\x1d.delta.connect.ConvertToDeltaH\x00R\x0e\x63onvertToDelta\x12\x42\n\rrestore_table\x18\x05 \x01(\x0b\x32\x1b.delta.connect.RestoreTableH\x00R\x0crestoreTable\x12\x43\n\x0eis_delta_table\x18\x06 \x01(\x0b\x32\x1b.delta.connect.IsDeltaTableH\x00R\x0cisDeltaTable\x12L\n\x11\x64\x65lete_from_table\x18\x07 \x01(\x0b\x32\x1e.delta.connect.DeleteFromTableH\x00R\x0f\x64\x65leteFromTable\x12?\n\x0cupdate_table\x18\x08 \x01(\x0b\x32\x1a.delta.connect.UpdateTableH\x00R\x0bupdateTableB\x0f\n\rrelation_type"7\n\x04Scan\x12/\n\x05table\x18\x01 \x01(\x0b\x32\x19.delta.connect.DeltaTableR\x05table"B\n\x0f\x44\x65scribeHistory\x12/\n\x05table\x18\x01 \x01(\x0b\x32\x19.delta.connect.DeltaTableR\x05table"A\n\x0e\x44\x65scribeDetail\x12/\n\x05table\x18\x01 \x01(\x0b\x32\x19.delta.connect.DeltaTableR\x05table"\xd1\x01\n\x0e\x43onvertToDelta\x12\x1e\n\nidentifier\x18\x01 \x01(\tR\nidentifier\x12\x38\n\x17partition_schema_string\x18\x02 \x01(\tH\x00R\x15partitionSchemaString\x12Q\n\x17partition_schema_struct\x18\x03 \x01(\x0b\x32\x17.spark.connect.DataTypeH\x00R\x15partitionSchemaStructB\x12\n\x10partition_schema"\x93\x01\n\x0cRestoreTable\x12/\n\x05table\x18\x01 \x01(\x0b\x32\x19.delta.connect.DeltaTableR\x05table\x12\x1a\n\x07version\x18\x02 \x01(\x03H\x00R\x07version\x12\x1e\n\ttimestamp\x18\x03 \x01(\tH\x00R\ttimestampB\x16\n\x14version_or_timestamp""\n\x0cIsDeltaTable\x12\x12\n\x04path\x18\x01 \x01(\tR\x04path"{\n\x0f\x44\x65leteFromTable\x12/\n\x06target\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x06target\x12\x37\n\tcondition\x18\x02 \x01(\x0b\x32\x19.spark.connect.ExpressionR\tcondition"\xb4\x01\n\x0bUpdateTable\x12/\n\x06target\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x06target\x12\x37\n\tcondition\x18\x02 \x01(\x0b\x32\x19.spark.connect.ExpressionR\tcondition\x12;\n\x0b\x61ssignments\x18\x03 \x03(\x0b\x32\x19.delta.connect.AssignmentR\x0b\x61ssignments"n\n\nAssignment\x12/\n\x05\x66ield\x18\x01 \x01(\x0b\x32\x19.spark.connect.ExpressionR\x05\x66ield\x12/\n\x05value\x18\x02 \x01(\x0b\x32\x19.spark.connect.ExpressionR\x05valueB\x1a\n\x16io.delta.connect.protoP\x01\x62\x06proto3'
)

_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "delta.connect.proto.relations_pb2", globals())
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
DESCRIPTOR._serialized_options = b"\n\026io.delta.connect.protoP\001"
_DELTARELATION._serialized_start = 102
_DELTARELATION._serialized_end = 540
_SCAN._serialized_start = 542
_SCAN._serialized_end = 597
_DESCRIBEHISTORY._serialized_start = 599
_DESCRIBEHISTORY._serialized_end = 665
_DESCRIBEDETAIL._serialized_start = 667
_DESCRIBEDETAIL._serialized_end = 732
_CONVERTTODELTA._serialized_start = 735
_CONVERTTODELTA._serialized_end = 944
_RESTORETABLE._serialized_start = 947
_RESTORETABLE._serialized_end = 1094
_ISDELTATABLE._serialized_start = 1096
_ISDELTATABLE._serialized_end = 1130
_DELTARELATION._serialized_start = 166
_DELTARELATION._serialized_end = 747
_SCAN._serialized_start = 749
_SCAN._serialized_end = 804
_DESCRIBEHISTORY._serialized_start = 806
_DESCRIBEHISTORY._serialized_end = 872
_DESCRIBEDETAIL._serialized_start = 874
_DESCRIBEDETAIL._serialized_end = 939
_CONVERTTODELTA._serialized_start = 942
_CONVERTTODELTA._serialized_end = 1151
_RESTORETABLE._serialized_start = 1154
_RESTORETABLE._serialized_end = 1301
_ISDELTATABLE._serialized_start = 1303
_ISDELTATABLE._serialized_end = 1337
_DELETEFROMTABLE._serialized_start = 1339
_DELETEFROMTABLE._serialized_end = 1462
_UPDATETABLE._serialized_start = 1465
_UPDATETABLE._serialized_end = 1645
_ASSIGNMENT._serialized_start = 1647
_ASSIGNMENT._serialized_end = 1757
# @@protoc_insertion_point(module_scope)
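For reference, here is a minimal sketch of how a client could populate the two new `relation_type` variants using these regenerated stubs. The empty target relation and the column name are illustrative placeholders, not code from this PR; in practice the target wraps a DeltaRelation `Scan` (see the field documentation in the `.pyi` below).

```python
from delta.connect.proto import relations_pb2 as delta_pb2
from pyspark.sql.connect.proto import expressions_pb2, relations_pb2

# Placeholder target; in practice this wraps a DeltaRelation Scan
# (or a SubqueryAlias over one), per the field docs in the .pyi below.
target = relations_pb2.Relation()

# Condition referencing a hypothetical boolean column "is_stale".
condition = expressions_pb2.Expression(
    unresolved_attribute=expressions_pb2.Expression.UnresolvedAttribute(
        unparsed_identifier="is_stale"
    )
)

# DeleteFromTable and UpdateTable are the new oneof variants (fields 7 and 8).
rel = delta_pb2.DeltaRelation(
    delete_from_table=delta_pb2.DeleteFromTable(target=target, condition=condition)
)
assert rel.WhichOneof("relation_type") == "delete_from_table"
```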
129 changes: 129 additions & 0 deletions python/delta/connect/proto/relations_pb2.pyi
@@ -32,9 +32,13 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
import builtins
import collections.abc
import delta.connect.proto.base_pb2
import google.protobuf.descriptor
import google.protobuf.internal.containers
import google.protobuf.message
import pyspark.sql.connect.proto.expressions_pb2
import pyspark.sql.connect.proto.relations_pb2
import pyspark.sql.connect.proto.types_pb2
import sys

@@ -56,6 +60,8 @@ class DeltaRelation(google.protobuf.message.Message):
CONVERT_TO_DELTA_FIELD_NUMBER: builtins.int
RESTORE_TABLE_FIELD_NUMBER: builtins.int
IS_DELTA_TABLE_FIELD_NUMBER: builtins.int
DELETE_FROM_TABLE_FIELD_NUMBER: builtins.int
UPDATE_TABLE_FIELD_NUMBER: builtins.int
@property
def scan(self) -> global___Scan: ...
@property
@@ -68,6 +74,10 @@
def restore_table(self) -> global___RestoreTable: ...
@property
def is_delta_table(self) -> global___IsDeltaTable: ...
@property
def delete_from_table(self) -> global___DeleteFromTable: ...
@property
def update_table(self) -> global___UpdateTable: ...
def __init__(
self,
*,
@@ -77,12 +87,16 @@
convert_to_delta: global___ConvertToDelta | None = ...,
restore_table: global___RestoreTable | None = ...,
is_delta_table: global___IsDeltaTable | None = ...,
delete_from_table: global___DeleteFromTable | None = ...,
update_table: global___UpdateTable | None = ...,
) -> None: ...
def HasField(
self,
field_name: typing_extensions.Literal[
"convert_to_delta",
b"convert_to_delta",
"delete_from_table",
b"delete_from_table",
"describe_detail",
b"describe_detail",
"describe_history",
@@ -95,13 +109,17 @@
b"restore_table",
"scan",
b"scan",
"update_table",
b"update_table",
],
) -> builtins.bool: ...
def ClearField(
self,
field_name: typing_extensions.Literal[
"convert_to_delta",
b"convert_to_delta",
"delete_from_table",
b"delete_from_table",
"describe_detail",
b"describe_detail",
"describe_history",
@@ -114,6 +132,8 @@
b"restore_table",
"scan",
b"scan",
"update_table",
b"update_table",
],
) -> None: ...
def WhichOneof(
@@ -126,6 +146,8 @@
"convert_to_delta",
"restore_table",
"is_delta_table",
"delete_from_table",
"update_table",
]
| None
): ...
@@ -331,3 +353,110 @@
def ClearField(self, field_name: typing_extensions.Literal["path", b"path"]) -> None: ...

global___IsDeltaTable = IsDeltaTable

class DeleteFromTable(google.protobuf.message.Message):
"""Command that deletes data from the target table that matches the given condition.
Needs to be a Relation, as it returns a row containing the execution metrics.
"""

DESCRIPTOR: google.protobuf.descriptor.Descriptor

TARGET_FIELD_NUMBER: builtins.int
CONDITION_FIELD_NUMBER: builtins.int
@property
def target(self) -> pyspark.sql.connect.proto.relations_pb2.Relation:
"""(Required) Target table to delete data from. Must either be a DeltaRelation containing a Scan
or a SubqueryAlias with a DeltaRelation containing a Scan as its input.
"""
@property
def condition(self) -> pyspark.sql.connect.proto.expressions_pb2.Expression:
"""(Optional) Expression returning a boolean."""
def __init__(
self,
*,
target: pyspark.sql.connect.proto.relations_pb2.Relation | None = ...,
condition: pyspark.sql.connect.proto.expressions_pb2.Expression | None = ...,
) -> None: ...
def HasField(
self, field_name: typing_extensions.Literal["condition", b"condition", "target", b"target"]
) -> builtins.bool: ...
def ClearField(
self, field_name: typing_extensions.Literal["condition", b"condition", "target", b"target"]
) -> None: ...

global___DeleteFromTable = DeleteFromTable
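As a quick sketch of how this message composes: `condition` is optional, so an unconditional delete simply leaves it unset. The target relation and column name below are illustrative placeholders.

```python
from delta.connect.proto import relations_pb2 as delta_pb2
from pyspark.sql.connect.proto import expressions_pb2, relations_pb2

# Unconditional delete: only the (placeholder) target is set.
delete_all = delta_pb2.DeleteFromTable(target=relations_pb2.Relation())
assert not delete_all.HasField("condition")

# Conditional delete on a hypothetical boolean column "is_expired".
delete_some = delta_pb2.DeleteFromTable(
    target=relations_pb2.Relation(),
    condition=expressions_pb2.Expression(
        unresolved_attribute=expressions_pb2.Expression.UnresolvedAttribute(
            unparsed_identifier="is_expired"
        )
    ),
)
assert delete_some.HasField("condition")
```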

class UpdateTable(google.protobuf.message.Message):
"""Command that updates data in the target table using the given assignments for rows that matches
the given condition.
Needs to be a Relation, as it returns a row containing the execution metrics.
"""

DESCRIPTOR: google.protobuf.descriptor.Descriptor

TARGET_FIELD_NUMBER: builtins.int
CONDITION_FIELD_NUMBER: builtins.int
ASSIGNMENTS_FIELD_NUMBER: builtins.int
@property
def target(self) -> pyspark.sql.connect.proto.relations_pb2.Relation:
"""(Required) Target table to delete data from. Must either be a DeltaRelation containing a Scan
or a SubqueryAlias with a DeltaRelation containing a Scan as its input.
"""
@property
def condition(self) -> pyspark.sql.connect.proto.expressions_pb2.Expression:
"""(Optional) Condition that determines which rows must be updated.
Must be an expression returning a boolean.
"""
@property
def assignments(
self,
) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___Assignment]:
"""(Optional) Set of assignments to apply to the rows matching the condition."""
def __init__(
self,
*,
target: pyspark.sql.connect.proto.relations_pb2.Relation | None = ...,
condition: pyspark.sql.connect.proto.expressions_pb2.Expression | None = ...,
assignments: collections.abc.Iterable[global___Assignment] | None = ...,
) -> None: ...
def HasField(
self, field_name: typing_extensions.Literal["condition", b"condition", "target", b"target"]
) -> builtins.bool: ...
def ClearField(
self,
field_name: typing_extensions.Literal[
"assignments", b"assignments", "condition", b"condition", "target", b"target"
],
) -> None: ...

global___UpdateTable = UpdateTable
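Likewise, a hedged sketch of an `UpdateTable` payload with two assignments; the column names and literal values are hypothetical, and the target is a placeholder for a relation wrapping a Delta `Scan`.

```python
from delta.connect.proto import relations_pb2 as delta_pb2
from pyspark.sql.connect.proto import expressions_pb2, relations_pb2

def attr(name: str) -> expressions_pb2.Expression:
    # Unresolved column reference; names used below are hypothetical.
    return expressions_pb2.Expression(
        unresolved_attribute=expressions_pb2.Expression.UnresolvedAttribute(
            unparsed_identifier=name
        )
    )

update = delta_pb2.UpdateTable(
    target=relations_pb2.Relation(),  # placeholder target
    condition=attr("needs_refresh"),
    assignments=[
        delta_pb2.Assignment(
            field=attr("status"),
            value=expressions_pb2.Expression(
                literal=expressions_pb2.Expression.Literal(string="done")
            ),
        ),
        delta_pb2.Assignment(
            field=attr("retries"),
            value=expressions_pb2.Expression(
                literal=expressions_pb2.Expression.Literal(integer=0)
            ),
        ),
    ],
)
assert len(update.assignments) == 2
```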

class Assignment(google.protobuf.message.Message):
"""Represents an assignment of a value to a field."""

DESCRIPTOR: google.protobuf.descriptor.Descriptor

FIELD_FIELD_NUMBER: builtins.int
VALUE_FIELD_NUMBER: builtins.int
@property
def field(self) -> pyspark.sql.connect.proto.expressions_pb2.Expression:
"""(Required) Expression identifying the (struct) field that is assigned a new value."""
@property
def value(self) -> pyspark.sql.connect.proto.expressions_pb2.Expression:
"""(Required) Expression that produces the value to assign to the field."""
def __init__(
self,
*,
field: pyspark.sql.connect.proto.expressions_pb2.Expression | None = ...,
value: pyspark.sql.connect.proto.expressions_pb2.Expression | None = ...,
) -> None: ...
def HasField(
self, field_name: typing_extensions.Literal["field", b"field", "value", b"value"]
) -> builtins.bool: ...
def ClearField(
self, field_name: typing_extensions.Literal["field", b"field", "value", b"value"]
) -> None: ...

global___Assignment = Assignment
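Because `field` is itself an expression, an assignment can also identify a nested struct field. A small sketch, with a hypothetical dotted path and value:

```python
from delta.connect.proto import relations_pb2 as delta_pb2
from pyspark.sql.connect.proto import expressions_pb2

# Assign a literal to a hypothetical nested field "address.city".
assignment = delta_pb2.Assignment(
    field=expressions_pb2.Expression(
        unresolved_attribute=expressions_pb2.Expression.UnresolvedAttribute(
            unparsed_identifier="address.city"
        )
    ),
    value=expressions_pb2.Expression(
        literal=expressions_pb2.Expression.Literal(string="Amsterdam")
    ),
)
```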