Skip to content

Commit

Permalink
[Spark] Python DeltaTableBuilder API for Identity Columns (#3404)
Browse files Browse the repository at this point in the history
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->
This PR is part of #1959

In this PR, we extend the addColumn interface in DeltaTableBuilder to
allow for Identity Columns creation.

Resolves #1072

## How was this patch tested?

<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->

New tests.

## Does this PR introduce _any_ user-facing changes?

<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
We update the arguments of addColumn method: 
- Support a new data type for parameter `generatedAlwaysAs`. Users can
specify `generatedAlwaysAs` as `IdentityGenerator` to add an identity
column that is GENERATED ALWAYS.

- Add a new parameter `generatedByDefaultAs`. Users can specify
`generatedByDefaultAs` as `IdentityGenerator` to add an identity column
that is GENERATED BY DEFAULT.

- Users can optionally pass in `start` (default = 1) and `step` (default
= 1) values to construct `IdentityGenerator` object, which specify the
start and step value to generate the identity column.


Interface
```
 def addColumn(
        self,
        colName: str,
        dataType: Union[str, DataType],
        nullable: bool = True,
        generatedAlwaysAs: Optional[Union[str, IdentityGenerator]] = None,
        generatedByDefaultAs: Optional[IdentityGenerator] = None,
        comment: Optional[str] = None,
) -> "DeltaTableBuilder"
```
Example Usage

```
 DeltaTable.create()
    .tableName("tableName")
    .addColumn("id", dataType=LongType(), generatedAlwaysAs=IdentityGenerator())
    .execute()

 DeltaTable.create()
    .tableName("tableName")
    .addColumn("id", dataType=LongType(), generatedAlwaysAs=IdentityGenerator(start=1, step=1))
    .execute()

 DeltaTable.create()
    .tableName("tableName")
    .addColumn("id", dataType=LongType(), generatedByDefaultAs=IdentityGenerator())
    .execute()

 DeltaTable.create()
    .tableName("tableName")
    .addColumn("id", dataType=LongType(), generatedByDefaultAs=IdentityGenerator(start=1, step=1))
    .execute()
```

---------

Co-authored-by: Carmen Kwan <[email protected]>
  • Loading branch information
zhipengmao-db and c27kwan authored Aug 8, 2024
1 parent fd75e83 commit 78cdeb0
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 9 deletions.
59 changes: 52 additions & 7 deletions python/delta/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.
#

from dataclasses import dataclass
from typing import (
TYPE_CHECKING, cast, overload, Any, Dict, Iterable, Optional, Union, NoReturn, List, Tuple
)
Expand Down Expand Up @@ -1060,6 +1061,19 @@ def __getNotMatchedBySourceBuilder(
DeltaTable._condition_to_jcolumn(condition))


@dataclass
class IdentityGenerator:
"""
Identity generator specifications for the identity column in the Delta table.
:param start: the start for the identity column. Default is 1.
:type start: int
:param step: the step for the identity column. Default is 1.
:type step: int
"""
start: int = 1
step: int = 1


class DeltaTableBuilder(object):
"""
Builder to specify how to create / replace a Delta table.
Expand Down Expand Up @@ -1108,6 +1122,10 @@ def _raise_type_error(self, msg: str, objs: Iterable[Any]) -> NoReturn:
errorMsg += " Found %s with type %s" % ((str(obj)), str(type(obj)))
raise TypeError(errorMsg)

def _check_identity_column_spec(self, identityGenerator: IdentityGenerator) -> None:
if identityGenerator.step == 0:
raise ValueError("Column identity generation requires step to be non-zero.")

@since(1.0) # type: ignore[arg-type]
def tableName(self, identifier: str) -> "DeltaTableBuilder":
"""
Expand Down Expand Up @@ -1164,7 +1182,8 @@ def addColumn(
colName: str,
dataType: Union[str, DataType],
nullable: bool = True,
generatedAlwaysAs: Optional[str] = None,
generatedAlwaysAs: Optional[Union[str, IdentityGenerator]] = None,
generatedByDefaultAs: Optional[IdentityGenerator] = None,
comment: Optional[str] = None,
) -> "DeltaTableBuilder":
"""
Expand All @@ -1177,9 +1196,15 @@ def addColumn(
:param nullable: whether column is nullable
:type nullable: bool
:param generatedAlwaysAs: a SQL expression if the column is always generated
as a function of other columns.
as a function of other columns;
an IdentityGenerator object if the column is always
generated using identity generator
See online documentation for details on Generated Columns.
:type generatedAlwaysAs: str
:type generatedAlwaysAs: str or delta.tables.IdentityGenerator
:param generatedByDefaultAs: an IdentityGenerator object to generate identity values
if the user does not provide values for the column
See online documentation for details on Generated Columns.
:type generatedByDefaultAs: delta.tables.IdentityGenerator
:param comment: the column comment
:type comment: str
Expand All @@ -1203,11 +1228,31 @@ def addColumn(
if type(nullable) is not bool:
self._raise_type_error("Column nullable must be bool.", [nullable])
_col_jbuilder = _col_jbuilder.nullable(nullable)

if generatedAlwaysAs is not None and generatedByDefaultAs is not None:
raise ValueError(
"generatedByDefaultAs and generatedAlwaysAs cannot both be set.",
[generatedByDefaultAs, generatedAlwaysAs])
if generatedAlwaysAs is not None:
if type(generatedAlwaysAs) is not str:
self._raise_type_error("Column generation expression must be str.",
[generatedAlwaysAs])
_col_jbuilder = _col_jbuilder.generatedAlwaysAs(generatedAlwaysAs)
if type(generatedAlwaysAs) is str:
_col_jbuilder = _col_jbuilder.generatedAlwaysAs(generatedAlwaysAs)
elif isinstance(generatedAlwaysAs, IdentityGenerator):
self._check_identity_column_spec(generatedAlwaysAs)
_col_jbuilder = _col_jbuilder.generatedAlwaysAsIdentity(
generatedAlwaysAs.start, generatedAlwaysAs.step)
else:
self._raise_type_error(
"Generated always as expression must be str or IdentityGenerator.",
[generatedAlwaysAs])
elif generatedByDefaultAs is not None:
if not isinstance(generatedByDefaultAs, IdentityGenerator):
self._raise_type_error(
"Generated by default expression must be IdentityGenerator.",
[generatedByDefaultAs])
self._check_identity_column_spec(generatedByDefaultAs)
_col_jbuilder = _col_jbuilder.generatedByDefaultAsIdentity(
generatedByDefaultAs.start, generatedByDefaultAs.step)

if comment is not None:
if type(comment) is not str:
self._raise_type_error("Column comment must be str.", [comment])
Expand Down
102 changes: 100 additions & 2 deletions python/delta/tests/test_deltatable.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@
from multiprocessing.pool import ThreadPool
from typing import List, Set, Dict, Optional, Any, Callable, Union, Tuple

from pyspark.errors.exceptions.base import UnsupportedOperationException
from pyspark.sql import DataFrame, Row
from pyspark.sql.functions import col, lit, expr, floor
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DataType
from pyspark.sql.utils import AnalysisException, ParseException

from delta.tables import DeltaTable, DeltaTableBuilder, DeltaOptimizeBuilder
from delta.tables import DeltaTable, DeltaTableBuilder, DeltaOptimizeBuilder, IdentityGenerator
from delta.testing.utils import DeltaTestCase


Expand Down Expand Up @@ -638,6 +639,16 @@ def __verify_generated_column(self, tableName: str, deltaTable: DeltaTable) -> N
deltaTable.update(expr("col2 = 11"), {"col1": expr("2")})
self.__checkAnswer(deltaTable.toDF(), [(2, 12)], schema=["col1", "col2"])

def __verify_identity_column(self, tableName: str, deltaTable: DeltaTable) -> None:
for i in range(2):
cmd = "INSERT INTO {table} (val) VALUES ({i})".format(table=tableName, i=i)
self.spark.sql(cmd)
cmd = "INSERT INTO {table} (id3, val) VALUES (8, 2)".format(table=tableName)
self.spark.sql(cmd)
self.__checkAnswer(deltaTable.toDF(),
expectedAnswer=[(1, 2, 2, 0), (2, 3, 4, 1), (3, 4, 8, 2)],
schema=["id1", "id2", "id3", "val"])

def __build_delta_table(self, builder: DeltaTableBuilder) -> DeltaTable:
return builder.addColumn("col1", "int", comment="foo", nullable=False) \
.addColumn("col2", IntegerType(), generatedAlwaysAs="col1 + 10") \
Expand Down Expand Up @@ -941,6 +952,41 @@ def test_verify_paritionedBy_compatibility(self) -> None:
partitioningColumns=["col1"],
tblComment="comment")

def test_create_table_with_identity_column(self) -> None:
for ifNotExists in (False, True):
tableName = "testTable{}".format(ifNotExists)
with self.table(tableName):
try:
self.spark.conf.set("spark.databricks.delta.identityColumn.enabled", "true")
builder = (
DeltaTable.createIfNotExists(self.spark)
if ifNotExists
else DeltaTable.create(self.spark))
builder = builder.tableName(tableName)
builder = (
builder.addColumn(
"id1", LongType(), generatedAlwaysAs=IdentityGenerator())
.addColumn(
"id2",
"BIGINT",
generatedAlwaysAs=IdentityGenerator(start=2))
.addColumn(
"id3",
"bigint",
generatedByDefaultAs=IdentityGenerator(start=2, step=2))
.addColumn("val", "bigint", nullable=False))

deltaTable = builder.execute()
self.__verify_table_schema(
tableName,
deltaTable.toDF().schema,
["id1", "id2", "id3", "val"],
[LongType(), LongType(), LongType(), LongType()],
nullables={"id1", "id2", "id3"})
self.__verify_identity_column(tableName, deltaTable)
finally:
self.spark.conf.unset("spark.databricks.delta.identityColumn.enabled")

def test_delta_table_builder_with_bad_args(self) -> None:
builder = DeltaTable.create(self.spark).location(self.tempFile)

Expand All @@ -964,11 +1010,14 @@ def test_delta_table_builder_with_bad_args(self) -> None:
with self.assertRaises(TypeError):
builder.addColumn("a", 1) # type: ignore[arg-type]

# bad column datatype - can't be pared
# bad column datatype - can't be parsed
with self.assertRaises(ParseException):
builder.addColumn("a", "1")
builder.execute()

# reset the builder
builder = DeltaTable.create(self.spark).location(self.tempFile)

# bad comment
with self.assertRaises(TypeError):
builder.addColumn("a", "int", comment=1) # type: ignore[arg-type]
Expand All @@ -977,6 +1026,55 @@ def test_delta_table_builder_with_bad_args(self) -> None:
with self.assertRaises(TypeError):
builder.addColumn("a", "int", generatedAlwaysAs=1) # type: ignore[arg-type]

# bad generatedAlwaysAs - identity column data type must be Long
with self.assertRaises(UnsupportedOperationException):
builder.addColumn(
"a",
"int",
generatedAlwaysAs=IdentityGenerator()
) # type: ignore[arg-type]

# bad generatedAlwaysAs - step can't be 0
with self.assertRaises(ValueError):
builder.addColumn(
"a",
"bigint",
generatedAlwaysAs=IdentityGenerator(step=0)
) # type: ignore[arg-type]

# bad generatedByDefaultAs - can't be set with generatedAlwaysAs
with self.assertRaises(ValueError):
builder.addColumn(
"a",
"bigint",
generatedAlwaysAs="",
generatedByDefaultAs=IdentityGenerator()
) # type: ignore[arg-type]

# bad generatedByDefaultAs - argument type must be IdentityGenerator
with self.assertRaises(TypeError):
builder.addColumn(
"a",
"bigint",
generatedByDefaultAs=""
) # type: ignore[arg-type]

# bad generatedByDefaultAs - identity column data type must be Long
with self.assertRaises(UnsupportedOperationException):
builder.addColumn(
"a",
"int",
generatedByDefaultAs=IdentityGenerator()
) # type: ignore[arg-type]

# bad generatedByDefaultAs - step can't be 0
with self.assertRaises(ValueError):
builder.addColumn(
"a",
"bigint",
generatedByDefaultAs=IdentityGenerator(step=0)
) # type: ignore[arg-type]

# bad nullable
with self.assertRaises(TypeError):
builder.addColumn("a", "int", nullable=1) # type: ignore[arg-type]
Expand Down

0 comments on commit 78cdeb0

Please sign in to comment.