Skip to content

Commit

Permalink
Update docs, add comments, reorganize code into smaller files.
Browse files Browse the repository at this point in the history
  • Loading branch information
peterallenwebb committed Jul 16, 2024
1 parent f8ad09e commit 18d8f62
Show file tree
Hide file tree
Showing 11 changed files with 274 additions and 220 deletions.
206 changes: 0 additions & 206 deletions dbt/adapters/record.py

This file was deleted.

2 changes: 2 additions & 0 deletions dbt/adapters/record/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from dbt.adapters.record.handle import RecordReplayHandle
from dbt.adapters.record.cursor.cursor import RecordReplayCursor
54 changes: 54 additions & 0 deletions dbt/adapters/record/cursor/cursor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from typing import Any, Optional

from dbt_common.record import record_function

from dbt.adapters.contracts.connection import Connection
from dbt.adapters.record.cursor.description import CursorGetDescriptionRecord
from dbt.adapters.record.cursor.execute import CursorExecuteRecord
from dbt.adapters.record.cursor.fetchone import CursorFetchOneRecord
from dbt.adapters.record.cursor.fetchmany import CursorFetchManyRecord
from dbt.adapters.record.cursor.fetchall import CursorFetchAllRecord
from dbt.adapters.record.cursor.rowcount import CursorGetRowCountRecord


class RecordReplayCursor:
"""A proxy object used to wrap native database cursors under record/replay
modes. In record mode, this proxy notes the parameters and return values
of the methods and properties it implements, which closely match the Python
DB API 2.0 cursor methods used by many dbt adapters to interact with the
database or DWH. In replay mode, it mocks out those calls using previously
recorded calls, so that no interaction with a database actually occurs."""

def __init__(self, native_cursor: Any, connection: Connection) -> None:
self.native_cursor = native_cursor
self.connection = connection

@record_function(CursorExecuteRecord, method=True, id_field_name="connection_name")
def execute(self, operation, parameters=None) -> None:
self.native_cursor.execute(operation, parameters)

@record_function(CursorFetchOneRecord, method=True, id_field_name="connection_name")
def fetchone(self) -> Any:
return self.native_cursor.fetchone()

@record_function(CursorFetchManyRecord, method=True, id_field_name="connection_name")
def fetchmany(self, size: int) -> Any:
return self.native_cursor.fetchmany(size)

@record_function(CursorFetchAllRecord, method=True, id_field_name="connection_name")
def fetchall(self) -> Any:
return self.native_cursor.fetchall()

@property
def connection_name(self) -> Optional[str]:
return self.connection.name

@property
@record_function(CursorGetRowCountRecord, method=True, id_field_name="connection_name")
def rowcount(self) -> int:
return self.native_cursor.rowcount

@property
@record_function(CursorGetDescriptionRecord, method=True, id_field_name="connection_name")
def description(self) -> str:
return self.native_cursor.description
37 changes: 37 additions & 0 deletions dbt/adapters/record/cursor/description.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import dataclasses
from typing import Any, Iterable, Mapping

from dbt_common.record import Record, Recorder


@dataclasses.dataclass
class CursorGetDescriptionParams:
connection_name: str


@dataclasses.dataclass
class CursorGetDescriptionResult:
columns: Iterable[Any]

def _to_dict(self) -> Any:
column_dicts = []
for c in self.columns:
# This captures the mandatory column information, but we might need
# more for some adapters.
# See https://peps.python.org/pep-0249/#description
column_dicts.append((c[0], c[1]))

return {"columns": column_dicts}

@classmethod
def _from_dict(cls, dct: Mapping) -> "CursorGetDescriptionResult":
return CursorGetDescriptionResult(columns=dct["columns"])


@Recorder.register_record_type
class CursorGetDescriptionRecord(Record):
"""Implements record/replay support for the cursor.description property."""

params_cls = CursorGetDescriptionParams
result_cls = CursorGetDescriptionResult
group = "Database"
20 changes: 20 additions & 0 deletions dbt/adapters/record/cursor/execute.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import dataclasses
from typing import Any, Iterable, Union, Mapping

from dbt_common.record import Record, Recorder


@dataclasses.dataclass
class CursorExecuteParams:
connection_name: str
operation: str
parameters: Union[Iterable[Any], Mapping[str, Any]]


@Recorder.register_record_type
class CursorExecuteRecord(Record):
"""Implements record/replay support for the cursor.execute() method."""

params_cls = CursorExecuteParams
result_cls = None
group = "Database"
66 changes: 66 additions & 0 deletions dbt/adapters/record/cursor/fetchall.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import dataclasses
import datetime
from typing import Any, Dict, List, Mapping

from dbt_common.record import Record, Recorder


@dataclasses.dataclass
class CursorFetchAllParams:
connection_name: str


@dataclasses.dataclass
class CursorFetchAllResult:
results: List[Any]

def _to_dict(self) -> Dict[str, Any]:
processed_results = []
for result in self.results:
result = tuple(map(self._process_value, result))
processed_results.append(result)

return {"results": processed_results}

@classmethod
def _from_dict(cls, dct: Mapping) -> "CursorFetchAllResult":
unprocessed_results = []
for result in dct["results"]:
result = tuple(map(cls._unprocess_value, result))
unprocessed_results.append(result)

return CursorFetchAllResult(unprocessed_results)

@classmethod
def _process_value(cls, value: Any) -> Any:
if type(value) is datetime.date:
return {"type": "date", "value": value.isoformat()}
elif type(value) is datetime.datetime:
return {"type": "datetime", "value": value.isoformat()}
else:
return value

@classmethod
def _unprocess_value(cls, value: Any) -> Any:
if type(value) is dict:
value_type = value.get("type")
if value_type == "date":
date_string = value.get("value")
assert isinstance(date_string, str)
return datetime.date.fromisoformat(date_string)
elif value_type == "datetime":
date_string = value.get("value")
assert isinstance(date_string, str)
return datetime.datetime.fromisoformat(date_string)
return value
else:
return value


@Recorder.register_record_type
class CursorFetchAllRecord(Record):
"""Implements record/replay support for the cursor.fetchall() method."""

params_cls = CursorFetchAllParams
result_cls = CursorFetchAllResult
group = "Database"
Loading

0 comments on commit 18d8f62

Please sign in to comment.