Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use pyarrow-unity to convert pyarrow schemas to UC-compatible ones #6

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 3 additions & 107 deletions dbt/adapters/duckdb/plugins/unity.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from __future__ import annotations

import json
from enum import Enum
from typing import Any
from typing import Dict
from typing import Literal

import pyarrow as pa
from pyarrow_unity.model import model_unity_schema
from pyarrow_unity.model import UCSupportedFormatLiteral
from unitycatalog import Unitycatalog
from unitycatalog.types import GenerateTemporaryTableCredentialResponse
from unitycatalog.types.table_create_params import Column
Expand Down Expand Up @@ -76,110 +76,6 @@ def uc_get_storage_credentials(
return {}


# Closed set of type-name strings used in this module for Unity Catalog
# columns. The member order is kept stable so ``typing.get_args`` is unchanged.
UCSupportedTypeLiteral = Literal[
    # Boolean and fixed-width numeric types
    "BOOLEAN", "BYTE", "SHORT", "INT", "LONG", "FLOAT", "DOUBLE",
    # Temporal types
    "DATE", "TIMESTAMP", "TIMESTAMP_NTZ",
    # Text, bytes, exact numerics and intervals
    "STRING", "BINARY", "DECIMAL", "INTERVAL",
    # Nested / container types
    "ARRAY", "STRUCT", "MAP",
    # Remaining names
    "CHAR", "NULL", "USER_DEFINED_TYPE", "TABLE_TYPE",
]

# Closed set of table format names used in this module.
UCSupportedFormatLiteral = Literal[
    "DELTA",
    "CSV",
    "JSON",
    "AVRO",
    "PARQUET",
    "ORC",
    "TEXT",
]


def pyarrow_type_to_supported_uc_json_type(data_type: pa.DataType) -> UCSupportedTypeLiteral:
    """Convert a PyArrow data type to a supported Unitycatalog JSON type.

    The 64-bit-offset ("large") variants of string, binary and list, as well
    as ``date64``, are mapped to the same name as their regular counterparts
    instead of being rejected.

    Args:
        data_type: The PyArrow type of a single schema field.

    Returns:
        The Unity Catalog type name corresponding to ``data_type``.

    Raises:
        NotImplementedError: If ``data_type`` has no mapping here (for
            example unsigned integers or dictionary-encoded types).
    """
    types = pa.types

    if types.is_boolean(data_type):
        return "BOOLEAN"
    if types.is_int8(data_type):
        return "BYTE"
    if types.is_int16(data_type):
        return "SHORT"
    if types.is_int32(data_type):
        return "INT"
    if types.is_int64(data_type):
        return "LONG"
    if types.is_float32(data_type):
        return "FLOAT"
    if types.is_float64(data_type):
        return "DOUBLE"
    # ``is_date`` covers both date32 and date64.
    if types.is_date(data_type):
        return "DATE"
    # NOTE(review): timezone-naive timestamps also map to "TIMESTAMP" rather
    # than "TIMESTAMP_NTZ" — confirm this is the intended catalog type.
    if types.is_timestamp(data_type):
        return "TIMESTAMP"
    if types.is_string(data_type) or types.is_large_string(data_type):
        return "STRING"
    if types.is_binary(data_type) or types.is_large_binary(data_type):
        return "BINARY"
    if types.is_decimal(data_type):
        return "DECIMAL"
    if types.is_duration(data_type):
        return "INTERVAL"
    if types.is_list(data_type) or types.is_large_list(data_type):
        return "ARRAY"
    if types.is_struct(data_type):
        return "STRUCT"
    if types.is_map(data_type):
        return "MAP"
    if types.is_null(data_type):
        return "NULL"

    raise NotImplementedError(f"Type {data_type} not supported")


def pyarrow_schema_to_columns(schema: pa.Schema) -> list[Column]:
    """Convert a PyArrow schema to a list of Unitycatalog Column objects.

    Args:
        schema: The PyArrow schema whose fields are converted, in order.

    Returns:
        One ``Column`` per field; ``position`` is the field's index in
        ``schema``.

    Raises:
        NotImplementedError: If a field's type has no Unity Catalog mapping
            (raised by ``pyarrow_type_to_supported_uc_json_type``).
    """
    columns = []

    for position, field in enumerate(schema):
        data_type = field.type
        json_type = pyarrow_type_to_supported_uc_json_type(data_type)

        # Precision and scale are only read from decimal types; every other
        # type reports 0 for both.
        is_decimal = pa.types.is_decimal(data_type)

        type_json = json.dumps(
            {
                "name": field.name,
                "type": json_type,
                "nullable": field.nullable,
                # PyArrow exposes field metadata with bytes keys/values, which
                # json.dumps rejects, so decode it to text first.
                "metadata": _field_metadata_as_text(field.metadata),
            }
        )

        columns.append(
            Column(
                name=field.name,
                type_name=json_type,
                nullable=field.nullable,
                comment=f"Field {field.name}",  # Generic comment, modify as needed
                position=position,
                type_json=type_json,
                type_precision=data_type.precision if is_decimal else 0,
                type_scale=data_type.scale if is_decimal else 0,
                type_text=json_type,
            )
        )

    return columns


def _field_metadata_as_text(metadata: dict | None) -> dict:
    """Return ``metadata`` with any ``bytes`` keys/values decoded to ``str``.

    ``None`` or an empty mapping yields ``{}``.
    """
    if not metadata:
        return {}

    def _as_text(item):
        # Undecodable bytes are replaced rather than raised so a single odd
        # metadata value cannot abort the whole schema conversion.
        if isinstance(item, bytes):
            return item.decode("utf-8", errors="replace")
        return item

    return {_as_text(key): _as_text(value) for key, value in metadata.items()}


def create_table_if_not_exists(
uc_client: Unitycatalog,
table_name: str,
Expand Down Expand Up @@ -283,7 +179,7 @@ def store(self, target_config: TargetConfig, df: pa.lib.Table = None):
storage_format = self.plugin_config.get("format", self.default_format)

# Convert the pa schema to columns
converted_schema = pyarrow_schema_to_columns(schema=df.schema)
converted_schema = model_unity_schema(schema=df.schema)

# Create the table in the Unitycatalog if it does not exist
create_table_if_not_exists(
Expand Down
1 change: 1 addition & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ twine
wheel
deltalake
unitycatalog
pyarrow-unity
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ unity =
unitycatalog==0.1.1
deltalake==0.18.2
pyarrow==17.0.0
pyarrow-unity==0.0.1
delta =
deltalake==0.18.2
pyarrow==17.0.0
Expand Down
Loading