Skip to content

Commit

Permalink
feat(python): combine load_version/load_with_datetime into `load_as_v…
Browse files Browse the repository at this point in the history
…ersion` (#1968)

# Description
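Combines `load_version` and `load_with_datetime` into a single `load_as_version` method. The two old methods are kept, but now emit a `DeprecationWarning`.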

# Related Issue(s)
- closes #1910
- closes #1967

---------

Co-authored-by: Robert Pack <[email protected]>
  • Loading branch information
ion-elgreco and roeap authored Dec 19, 2023
1 parent f6d2061 commit a5a4e69
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 13 deletions.
5 changes: 2 additions & 3 deletions crates/deltalake-core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ impl DeltaTable {
.object_store()
.head(&commit_uri_from_version(version))
.await?;
let ts = meta.last_modified.timestamp();
let ts = meta.last_modified.timestamp_millis();
// also cache timestamp for version
self.version_timestamp.insert(version, ts);

Expand Down Expand Up @@ -875,14 +875,13 @@ impl DeltaTable {
let mut min_version = 0;
let mut max_version = self.get_latest_version().await?;
let mut version = min_version;
let target_ts = datetime.timestamp();
let target_ts = datetime.timestamp_millis();

// binary search
while min_version <= max_version {
let pivot = (max_version + min_version) / 2;
version = pivot;
let pts = self.get_version_timestamp(pivot).await?;

match pts.cmp(&target_ts) {
Ordering::Equal => {
break;
Expand Down
6 changes: 5 additions & 1 deletion crates/deltalake-core/tests/command_restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use rand::Rng;
use std::error::Error;
use std::fs;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use tempdir::TempDir;

#[derive(Debug)]
Expand Down Expand Up @@ -42,19 +44,21 @@ async fn setup_test() -> Result<Context, Box<dyn Error>> {
.await?;

let batch = get_record_batch();

thread::sleep(Duration::from_secs(1));
let table = DeltaOps(table)
.write(vec![batch.clone()])
.with_save_mode(SaveMode::Append)
.await
.unwrap();

thread::sleep(Duration::from_secs(1));
let table = DeltaOps(table)
.write(vec![batch.clone()])
.with_save_mode(SaveMode::Overwrite)
.await
.unwrap();

thread::sleep(Duration::from_secs(1));
let table = DeltaOps(table)
.write(vec![batch.clone()])
.with_save_mode(SaveMode::Append)
Expand Down
54 changes: 54 additions & 0 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,20 +452,69 @@ def file_uris(

file_uris.__doc__ = ""

def load_as_version(self, version: Union[int, str, datetime]) -> None:
    """
    Load/time travel a DeltaTable to a specified version number, or to the version of the table
    that was current at a given timestamp. If a string is passed, it must be an RFC 3339 /
    ISO 8601 date and time string that includes a UTC offset.

    A `datetime` object without timezone information is interpreted as local time: the local
    UTC offset is attached before the value is serialised, because an ISO string without an
    offset is not valid RFC 3339.

    Args:
        version: the identifier of the version of the DeltaTable to load

    Raises:
        TypeError: if `version` is not an int, str or datetime.

    Example:
        **Use a version number**
        ```
        dt = DeltaTable("test_table")
        dt.load_as_version(1)
        ```

        **Use a datetime object**
        ```
        dt.load_as_version(datetime(2023,1,1))
        ```

        **Use a datetime in string format**
        ```
        dt.load_as_version("2018-01-26T18:30:09Z")
        dt.load_as_version("2018-12-19T16:39:57-08:00")
        dt.load_as_version("2018-01-26T18:30:09.453+00:00")
        ```
    """
    if isinstance(version, int):
        self._table.load_version(version)
    elif isinstance(version, datetime):
        if version.utcoffset() is None:
            # Naive datetime: isoformat() would omit the offset and produce a
            # string the RFC 3339 parser rejects. Presume system local time.
            version = version.astimezone()
        self._table.load_with_datetime(version.isoformat())
    elif isinstance(version, str):
        self._table.load_with_datetime(version)
    else:
        raise TypeError(
            "Invalid datatype provided for version, only int, str or datetime are accepted."
        )

def load_version(self, version: int) -> None:
    """
    Load the DeltaTable at the given version number.

    !!! warning "Deprecated"
        `load_version` and `load_with_datetime` have been combined into `DeltaTable.load_as_version`.

    Args:
        version: the identifier of the version of the DeltaTable to load
    """
    deprecation_notice = "Call to deprecated method DeltaTable.load_version. Use DeltaTable.load_as_version() instead."
    # stacklevel=2 attributes the warning to the caller's line, not this method.
    warnings.warn(deprecation_notice, DeprecationWarning, stacklevel=2)
    self._table.load_version(version)

def load_with_datetime(self, datetime_string: str) -> None:
"""
Time travel Delta table to the latest version that's created at or before provided `datetime_string` argument.
The `datetime_string` argument should be an RFC 3339 and ISO 8601 date and time string.
!!! warning "Deprecated"
Load_version and load_with_datetime have been combined into `DeltaTable.load_as_version`.
Args:
datetime_string: the identifier of the datetime point of the DeltaTable to load
Expand All @@ -476,6 +525,11 @@ def load_with_datetime(self, datetime_string: str) -> None:
"2018-01-26T18:30:09.453+00:00"
```
"""
warnings.warn(
"Call to deprecated method DeltaTable.load_with_datetime. Use DeltaTable.load_as_version() instead.",
category=DeprecationWarning,
stacklevel=2,
)
self._table.load_with_datetime(datetime_string)

@property
Expand Down
25 changes: 16 additions & 9 deletions python/tests/test_table_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,15 @@ def test_read_simple_table_using_options_to_dict():
assert dt.to_pyarrow_dataset().to_table().to_pydict() == {"value": [1, 2, 3]}


def test_load_with_datetime():
@pytest.mark.parametrize(
["date_value", "expected_version"],
[
("2020-05-01T00:47:31-07:00", 0),
("2020-05-02T22:47:31-07:00", 1),
("2020-05-25T22:47:31-07:00", 4),
],
)
def test_load_as_version_datetime(date_value: str, expected_version):
log_dir = "../crates/deltalake-core/tests/data/simple_table/_delta_log"
log_mtime_pair = [
("00000000000000000000.json", 1588398451.0),
Expand All @@ -78,15 +86,14 @@ def test_load_with_datetime():

table_path = "../crates/deltalake-core/tests/data/simple_table"
dt = DeltaTable(table_path)
dt.load_with_datetime("2020-05-01T00:47:31-07:00")
assert dt.version() == 0
dt.load_with_datetime("2020-05-02T22:47:31-07:00")
assert dt.version() == 1
dt.load_with_datetime("2020-05-25T22:47:31-07:00")
assert dt.version() == 4
dt.load_as_version(date_value)
assert dt.version() == expected_version
dt = DeltaTable(table_path)
dt.load_as_version(datetime.fromisoformat(date_value))
assert dt.version() == expected_version


def test_load_with_datetime_bad_format():
def test_load_as_version_datetime_bad_format():
table_path = "../crates/deltalake-core/tests/data/simple_table"
dt = DeltaTable(table_path)

Expand All @@ -96,7 +103,7 @@ def test_load_with_datetime_bad_format():
"2020-05-01T00:47:31+08",
]:
with pytest.raises(Exception, match="Failed to parse datetime string:"):
dt.load_with_datetime(bad_format)
dt.load_as_version(bad_format)


def test_read_simple_table_update_incremental():
Expand Down

0 comments on commit a5a4e69

Please sign in to comment.