Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: read rows query model class #752

Merged
merged 20 commits into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 194 additions & 12 deletions google/cloud/bigtable/read_rows_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,36 +13,158 @@
# limitations under the License.
#
from __future__ import annotations
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any
from .row_response import row_key
from dataclasses import dataclass
from google.cloud.bigtable.row_filters import RowFilter

if TYPE_CHECKING:
from google.cloud.bigtable.row_filters import RowFilter
from google.cloud.bigtable import RowKeySamples


@dataclass
class _RangePoint:
# model class for a point in a row range
daniel-sanche marked this conversation as resolved.
Show resolved Hide resolved
key: row_key
is_inclusive: bool


class ReadRowsQuery:
"""
Class to encapsulate details of a read row request
"""

def __init__(
self, row_keys: list[str | bytes] | str | bytes | None = None, limit=None
self,
row_keys: list[str | bytes] | str | bytes | None = None,
limit: int | None = None,
row_filter: RowFilter | dict[str, Any] | None = None,
mutianf marked this conversation as resolved.
Show resolved Hide resolved
):
pass
"""
Create a new ReadRowsQuery

def set_limit(self, limit: int) -> ReadRowsQuery:
raise NotImplementedError
Args:
- row_keys: a list of row keys to include in the query
- limit: the maximum number of rows to return. None or 0 means no limit
default: None (no limit)
- row_filter: a RowFilter to apply to the query
"""
self.row_keys: set[bytes] = set()
self.row_ranges: list[tuple[_RangePoint | None, _RangePoint | None]] = []
if row_keys:
self.add_rows(row_keys)
self.limit: int | None = limit
self.filter: RowFilter | dict[str, Any] = row_filter

def set_filter(self, filter: "RowFilter") -> ReadRowsQuery:
raise NotImplementedError
def set_limit(self, new_limit: int | None):
daniel-sanche marked this conversation as resolved.
Show resolved Hide resolved
"""
Set the maximum number of rows to return by this query.

def add_rows(self, row_id_list: list[str]) -> ReadRowsQuery:
raise NotImplementedError
None or 0 means no limit

Args:
- new_limit: the new limit to apply to this query
Returns:
- a reference to this query for chaining
Raises:
- ValueError if new_limit is < 0
"""
if new_limit is not None and new_limit < 0:
raise ValueError("limit must be >= 0")
self._limit = new_limit
return self

def set_filter(
self, row_filter: RowFilter | dict[str, Any] | None
) -> ReadRowsQuery:
"""
Set a RowFilter to apply to this query

Args:
- row_filter: a RowFilter to apply to this query
Can be a RowFilter object or a dict representation
Returns:
- a reference to this query for chaining
"""
if not (
isinstance(row_filter, dict)
or isinstance(row_filter, RowFilter)
or row_filter is None
):
raise ValueError("row_filter must be a RowFilter or dict")
self._filter = row_filter
return self

def add_rows(self, row_keys: list[str | bytes] | str | bytes) -> ReadRowsQuery:
daniel-sanche marked this conversation as resolved.
Show resolved Hide resolved
"""
Add a list of row keys to this query

Args:
- row_keys: a list of row keys to add to this query
Returns:
- a reference to this query for chaining
Raises:
- ValueError if an input is not a string or bytes
"""
if not isinstance(row_keys, list):
row_keys = [row_keys]
update_set = set()
for k in row_keys:
if isinstance(k, str):
k = k.encode()
elif not isinstance(k, bytes):
raise ValueError("row_keys must be strings or bytes")
update_set.add(k)
self.row_keys.update(update_set)
return self

def add_range(
daniel-sanche marked this conversation as resolved.
Show resolved Hide resolved
self, start_key: str | bytes | None = None, end_key: str | bytes | None = None
self,
start_key: str | bytes | None = None,
end_key: str | bytes | None = None,
start_is_inclusive: bool | None = None,
end_is_inclusive: bool | None = None,
) -> ReadRowsQuery:
raise NotImplementedError
"""
Add a range of row keys to this query.

Args:
- start_key: the start of the range
if None, start_key is interpreted as the empty string, inclusive
- end_key: the end of the range
if None, end_key is interpreted as the infinite row key, exclusive
- start_is_inclusive: if True, the start key is included in the range
defaults to True if None. Must not be included if start_key is None
- end_is_inclusive: if True, the end key is included in the range
defaults to False if None. Must not be included if end_key is None
"""
# check for invalid combinations of arguments
if start_is_inclusive is None:
start_is_inclusive = True
elif start_key is None:
raise ValueError("start_is_inclusive must be set with start_key")
if end_is_inclusive is None:
end_is_inclusive = False
elif end_key is None:
raise ValueError("end_is_inclusive must be set with end_key")
# ensure that start_key and end_key are bytes
if isinstance(start_key, str):
start_key = start_key.encode()
elif start_key is not None and not isinstance(start_key, bytes):
raise ValueError("start_key must be a string or bytes")
if isinstance(end_key, str):
end_key = end_key.encode()
elif end_key is not None and not isinstance(end_key, bytes):
raise ValueError("end_key must be a string or bytes")

start_pt = (
_RangePoint(start_key, start_is_inclusive)
if start_key is not None
else None
)
end_pt = _RangePoint(end_key, end_is_inclusive) if end_key is not None else None
self.row_ranges.append((start_pt, end_pt))
return self

def shard(self, shard_keys: "RowKeySamples" | None = None) -> list[ReadRowsQuery]:
"""
Expand All @@ -54,3 +176,63 @@ def shard(self, shard_keys: "RowKeySamples" | None = None) -> list[ReadRowsQuery
query (if possible)
"""
raise NotImplementedError

def to_dict(self) -> dict[str, Any]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please reconsider making dicts the primary api contract for methods.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and if you stick with this, to_dict should probably be a an internal method

"""
Convert this query into a dictionary that can be used to construct a
ReadRowsRequest protobuf
"""
ranges = []
for start, end in self.row_ranges:
new_range = {}
if start is not None:
key = "start_key_closed" if start.is_inclusive else "start_key_open"
new_range[key] = start.key
if end is not None:
key = "end_key_closed" if end.is_inclusive else "end_key_open"
new_range[key] = end.key
ranges.append(new_range)
row_keys = list(self.row_keys)
row_keys.sort()
row_set = {"row_keys": row_keys, "row_ranges": ranges}
final_dict: dict[str, Any] = {
"rows": row_set,
}
dict_filter = (
self.filter.to_dict() if isinstance(self.filter, RowFilter) else self.filter
)
if dict_filter:
final_dict["filter"] = dict_filter
if self.limit is not None:
final_dict["rows_limit"] = self.limit
return final_dict

# Support limit and filter as properties

@property
def limit(self) -> int | None:
"""
Getter implementation for limit property
"""
return self._limit

@limit.setter
def limit(self, new_limit: int | None):
"""
Setter implementation for limit property
"""
self.set_limit(new_limit)

@property
def filter(self):
"""
Getter implemntation for filter property
"""
return self._filter

@filter.setter
def filter(self, row_filter: RowFilter | dict[str, Any] | None):
"""
Setter implementation for filter property
"""
self.set_filter(row_filter)
Loading