Skip to content

Commit

Permalink
feat: adds StorageDescriptor and tests (#2109)
Browse files Browse the repository at this point in the history
* feat: adds StorageDescriptor and tests

* updates attr names, corrects type hinting
  • Loading branch information
chalmerlowe authored Jan 14, 2025
1 parent 62960f2 commit 6be0272
Show file tree
Hide file tree
Showing 2 changed files with 246 additions and 0 deletions.
118 changes: 118 additions & 0 deletions google/cloud/bigquery/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,3 +644,121 @@ def from_api_repr(cls, api_repr: dict) -> SerDeInfo:
config = cls("PLACEHOLDER")
config._properties = api_repr
return config


class StorageDescriptor:
    """Describes how a table's data is stored and accessed by open source
    query engines.

    All values are kept in a single dictionary keyed by the API field names
    (``inputFormat``, ``locationUri``, ``outputFormat`` and ``serDeInfo``).

    Args:
        input_format (Optional[str]): Fully qualified class name of the
            InputFormat (e.g.
            "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"). The maximum
            length is 128 characters.
        location_uri (Optional[str]): Physical location of the table (e.g.
            'gs://spark-dataproc-data/pangea-data/case_sensitive/' or
            'gs://spark-dataproc-data/pangea-data/'). The maximum length is
            2056 bytes.
        output_format (Optional[str]): Fully qualified class name of the
            OutputFormat (e.g.
            "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"). The maximum
            length is 128 characters.
        serde_info (Union[SerDeInfo, dict, None]): Serializer and
            deserializer information, either as a ``SerDeInfo`` instance or
            as its API (dict) representation.
    """

    def __init__(
        self,
        input_format: Optional[str] = None,
        location_uri: Optional[str] = None,
        output_format: Optional[str] = None,
        serde_info: Union[SerDeInfo, dict, None] = None,
    ):
        self._properties: Dict[str, Any] = {}

        # Every argument goes through its property setter, so each one is
        # type-checked and stored under its API key (even when it is None).
        self.input_format = input_format
        self.location_uri = location_uri
        self.output_format = output_format

        # typing.cast() is needed because mypy cannot wrap its head around a
        # property whose setter accepts Union[SerDeInfo, dict, None] while its
        # getter only ever returns Optional[SerDeInfo].
        self.serde_info = typing.cast(Optional[SerDeInfo], serde_info)

    @property
    def input_format(self) -> Optional[str]:
        """Optional[str]: Fully qualified class name of the InputFormat
        (e.g. "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"). The maximum
        length is 128 characters.

        Read from the ``inputFormat`` key; ``None`` when that key is absent.
        """
        return self._properties.get("inputFormat")

    @input_format.setter
    def input_format(self, value: Optional[str]):
        # Only ``str`` or ``None`` is accepted; the helper rejects the rest.
        self._properties["inputFormat"] = _helpers._isinstance_or_raise(
            value, str, none_allowed=True
        )

    @property
    def location_uri(self) -> Optional[str]:
        """Optional[str]: Physical location of the table (e.g.
        'gs://spark-dataproc-data/pangea-data/case_sensitive/' or
        'gs://spark-dataproc-data/pangea-data/'). The maximum length is
        2056 bytes.

        Read from the ``locationUri`` key; ``None`` when that key is absent.
        """
        return self._properties.get("locationUri")

    @location_uri.setter
    def location_uri(self, value: Optional[str]):
        # Only ``str`` or ``None`` is accepted; the helper rejects the rest.
        self._properties["locationUri"] = _helpers._isinstance_or_raise(
            value, str, none_allowed=True
        )

    @property
    def output_format(self) -> Optional[str]:
        """Optional[str]: Fully qualified class name of the OutputFormat
        (e.g. "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"). The maximum
        length is 128 characters.

        Read from the ``outputFormat`` key; ``None`` when that key is absent.
        """
        return self._properties.get("outputFormat")

    @output_format.setter
    def output_format(self, value: Optional[str]):
        # Only ``str`` or ``None`` is accepted; the helper rejects the rest.
        self._properties["outputFormat"] = _helpers._isinstance_or_raise(
            value, str, none_allowed=True
        )

    @property
    def serde_info(self) -> Optional[SerDeInfo]:
        """Optional[SerDeInfo]: Serializer and deserializer information.

        The value is stored in API (dict) form under ``serDeInfo``; each read
        wraps it in a new ``SerDeInfo`` via ``SerDeInfo.from_api_repr()``.
        """
        stored = _helpers._get_sub_prop(self._properties, ["serDeInfo"])
        if stored is None:
            return None
        return typing.cast(SerDeInfo, SerDeInfo.from_api_repr(stored))

    @serde_info.setter
    def serde_info(self, value: Union[SerDeInfo, dict, None]):
        checked = _helpers._isinstance_or_raise(
            value, (SerDeInfo, dict), none_allowed=True
        )

        # A SerDeInfo is flattened to its API representation; a dict or None
        # is stored exactly as received.
        self._properties["serDeInfo"] = (
            checked.to_api_repr() if isinstance(checked, SerDeInfo) else checked
        )

    def to_api_repr(self) -> dict:
        """Build an API representation of this object.

        Returns:
            Dict[str, Any]:
                A dictionary in the format used by the BigQuery API. This is
                the instance's own property dictionary, not a copy.
        """
        return self._properties

    @classmethod
    def from_api_repr(cls, resource: dict) -> StorageDescriptor:
        """Factory: construct an instance of the class (cls) from its API
        representation.

        Args:
            resource (Dict[str, Any]):
                API representation of the object to be instantiated. It is
                adopted as the new instance's property dictionary as-is.

        Returns:
            An instance of the class initialized with data from 'resource'.
        """
        instance = cls()
        instance._properties = resource
        return instance
128 changes: 128 additions & 0 deletions tests/unit/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -1213,3 +1213,131 @@ def test_from_api_repr(self):
# We convert both to dict format because these classes do not have a
# __eq__() method to facilitate direct equality comparisons.
assert result.to_api_repr() == expected.to_api_repr()


class TestStorageDescriptor:
    """Tests for the StorageDescriptor class."""

    @staticmethod
    def _get_target_class():
        """Return the class under test."""
        return schema.StorageDescriptor

    def _make_one(self, *args, **kwargs):
        """Instantiate the class under test with the given arguments."""
        return self._get_target_class()(*args, **kwargs)

    # SerDeInfo in API (dict) form. Keys are camelCase, like the other API
    # resources used in these tests.
    serdeinfo_resource = {
        "serializationLibrary": "testpath.to.LazySimpleSerDe",
        "name": "serde_lib_name",
        "parameters": {"key": "value"},
    }

    # The same SerDeInfo in class form. from_api_repr() is a classmethod, so
    # it is called on the class rather than on a placeholder instance.
    SERDEINFO = schema.SerDeInfo.from_api_repr(serdeinfo_resource)

    # StorageDescriptor in API (dict) form.
    STORAGEDESCRIPTOR = {
        "inputFormat": "testpath.to.OrcInputFormat",
        "locationUri": "gs://test/path/",
        "outputFormat": "testpath.to.OrcOutputFormat",
        "serDeInfo": SERDEINFO.to_api_repr(),
    }

    @pytest.mark.parametrize(
        "input_format,location_uri,output_format,serde_info",
        [
            (None, None, None, None),
            ("testpath.to.OrcInputFormat", None, None, None),
            (None, "gs://test/path/", None, None),
            (None, None, "testpath.to.OrcOutputFormat", None),
            (None, None, None, SERDEINFO),
            (
                "testpath.to.OrcInputFormat",
                "gs://test/path/",
                "testpath.to.OrcOutputFormat",
                SERDEINFO,  # uses SERDEINFO class format
            ),
            (
                "testpath.to.OrcInputFormat",
                "gs://test/path/",
                "testpath.to.OrcOutputFormat",
                serdeinfo_resource,  # uses api resource format (dict)
            ),
        ],
    )
    def test_ctor_valid_input(
        self, input_format, location_uri, output_format, serde_info
    ):
        """Each valid constructor argument is readable back via its property."""
        storage_descriptor = self._make_one(
            input_format=input_format,
            location_uri=location_uri,
            output_format=output_format,
            serde_info=serde_info,
        )
        assert storage_descriptor.input_format == input_format
        assert storage_descriptor.location_uri == location_uri
        assert storage_descriptor.output_format == output_format

        # The serde_info getter always hands back a SerDeInfo (or None), so
        # the comparison is made on the dict representation.
        if isinstance(serde_info, schema.SerDeInfo):
            assert (
                storage_descriptor.serde_info.to_api_repr() == serde_info.to_api_repr()
            )
        elif isinstance(serde_info, dict):
            assert storage_descriptor.serde_info.to_api_repr() == serde_info
        else:
            assert storage_descriptor.serde_info is None

    @pytest.mark.parametrize(
        "input_format,location_uri,output_format,serde_info",
        [
            (123, None, None, None),
            (None, 123, None, None),
            (None, None, 123, None),
            (None, None, None, 123),
        ],
    )
    def test_ctor_invalid_input(
        self, input_format, location_uri, output_format, serde_info
    ):
        """A wrongly typed value for any single argument raises TypeError."""
        with pytest.raises(TypeError) as exc_info:
            self._make_one(
                input_format=input_format,
                location_uri=location_uri,
                output_format=output_format,
                serde_info=serde_info,
            )

        # Looking for the first word from the string "Pass <variable> as..."
        assert "Pass " in str(exc_info.value)

    def test_to_api_repr(self):
        """to_api_repr() emits the API keys with the values that were set."""
        storage_descriptor = self._make_one(
            input_format="input_format",
            location_uri="location_uri",
            output_format="output_format",
            serde_info=self.SERDEINFO,
        )
        expected_repr = {
            "inputFormat": "input_format",
            "locationUri": "location_uri",
            "outputFormat": "output_format",
            "serDeInfo": self.SERDEINFO.to_api_repr(),
        }
        assert storage_descriptor.to_api_repr() == expected_repr

    def test_from_api_repr(self):
        """GIVEN an api representation of a StorageDescriptor (i.e. STORAGEDESCRIPTOR)
        WHEN converted into a StorageDescriptor using from_api_repr() and
        displayed as a dict
        THEN it will have the same representation as a StorageDescriptor
        created directly (via the _make_one() func) and displayed as a dict.
        """

        # generate via STORAGEDESCRIPTOR
        resource = self.STORAGEDESCRIPTOR
        result = self._get_target_class().from_api_repr(resource)

        expected = self._make_one(
            input_format="testpath.to.OrcInputFormat",
            location_uri="gs://test/path/",
            output_format="testpath.to.OrcOutputFormat",
            serde_info=self.SERDEINFO,
        )
        assert result.to_api_repr() == expected.to_api_repr()

0 comments on commit 6be0272

Please sign in to comment.