Skip to content

Commit

Permalink
feat: Implement load table api. (#89)
Browse files Browse the repository at this point in the history
* Init commit

* Some comments

* Done

* Fix format

* fix clippy

* Fix fmt

* sort

* Use error response

* Fix

* Fix comments

* fmt

* fmt

* Fix

* Rename
  • Loading branch information
liurenjie1024 authored Nov 8, 2023
1 parent e398b5a commit e26bda3
Show file tree
Hide file tree
Showing 12 changed files with 636 additions and 138 deletions.
2 changes: 2 additions & 0 deletions crates/catalog/rest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ keywords = ["iceberg", "rest", "catalog"]

[dependencies]
async-trait = "0.1"
chrono = "0.4"
iceberg = { path = "../../iceberg" }
reqwest = { version = "^0.11", features = ["json"] }
serde = { version = "^1.0", features = ["rc"] }
serde_derive = "^1.0"
serde_json = "^1.0"
typed-builder = "^0.18"
urlencoding = "2"
uuid = { version = "1.5.0", features = ["v4"] }

[dev-dependencies]
mockito = "^1"
Expand Down
238 changes: 222 additions & 16 deletions crates/catalog/rest/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,17 @@ use serde::de::DeserializeOwned;
use typed_builder::TypedBuilder;
use urlencoding::encode;

use crate::catalog::_serde::LoadTableResponse;
use iceberg::io::{FileIO, FileIOBuilder};
use iceberg::table::Table;
use iceberg::Result;
use iceberg::{
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent,
};

use self::_serde::{
CatalogConfig, ErrorModel, ErrorResponse, ListNamespaceResponse, ListTableResponse,
NamespaceSerde, RenameTableRequest, NO_CONTENT, OK,
CatalogConfig, ErrorResponse, ListNamespaceResponse, ListTableResponse, NamespaceSerde,
RenameTableRequest, NO_CONTENT, OK,
};

const ICEBERG_REST_SPEC_VERSION: &str = "1.14";
Expand Down Expand Up @@ -195,7 +197,7 @@ impl Catalog for RestCatalog {

let resp = self
.client
.query::<ListNamespaceResponse, ErrorModel, OK>(request.build()?)
.query::<ListNamespaceResponse, ErrorResponse, OK>(request.build()?)
.await?;

resp.namespaces
Expand All @@ -222,7 +224,7 @@ impl Catalog for RestCatalog {

let resp = self
.client
.query::<NamespaceSerde, ErrorModel, OK>(request)
.query::<NamespaceSerde, ErrorResponse, OK>(request)
.await?;

Namespace::try_from(resp)
Expand All @@ -238,7 +240,7 @@ impl Catalog for RestCatalog {

let resp = self
.client
.query::<NamespaceSerde, ErrorModel, OK>(request)
.query::<NamespaceSerde, ErrorResponse, OK>(request)
.await?;
Namespace::try_from(resp)
}
Expand Down Expand Up @@ -267,7 +269,7 @@ impl Catalog for RestCatalog {
.build()?;

self.client
.execute::<ErrorModel, NO_CONTENT>(request)
.execute::<ErrorResponse, NO_CONTENT>(request)
.await
.map(|_| true)
}
Expand All @@ -280,7 +282,9 @@ impl Catalog for RestCatalog {
.delete(self.config.namespace_endpoint(namespace))
.build()?;

self.client.execute::<ErrorModel, NO_CONTENT>(request).await
self.client
.execute::<ErrorResponse, NO_CONTENT>(request)
.await
}

/// List tables from namespace.
Expand All @@ -293,7 +297,7 @@ impl Catalog for RestCatalog {

let resp = self
.client
.query::<ListTableResponse, ErrorModel, OK>(request)
.query::<ListTableResponse, ErrorResponse, OK>(request)
.await?;

Ok(resp.identifiers)
Expand All @@ -312,11 +316,43 @@ impl Catalog for RestCatalog {
}

/// Load table from the catalog.
async fn load_table(&self, _table: &TableIdent) -> Result<Table> {
Err(Error::new(
ErrorKind::FeatureUnsupported,
"Creating table not supported yet!",
))
async fn load_table(&self, table: &TableIdent) -> Result<Table> {
let request = self
.client
.0
.get(self.config.table_endpoint(table))
.build()?;

let resp = self
.client
.query::<LoadTableResponse, ErrorResponse, OK>(request)
.await?;

let mut props = self.config.props.clone();
if let Some(config) = resp.config {
props.extend(config);
}

let file_io = match self
.config
.warehouse
.as_ref()
.or_else(|| resp.metadata_location.as_ref())
{
Some(url) => FileIO::from_path(url)?.with_props(props).build()?,
None => FileIOBuilder::new("s3").with_props(props).build()?,
};

let table_builder = Table::builder()
.identifier(table.clone())
.file_io(file_io)
.metadata(resp.metadata);

if let Some(metadata_location) = resp.metadata_location {
Ok(table_builder.metadata_location(metadata_location).build())
} else {
Ok(table_builder.build())
}
}

/// Drop a table from the catalog.
Expand All @@ -327,7 +363,9 @@ impl Catalog for RestCatalog {
.delete(self.config.table_endpoint(table))
.build()?;

self.client.execute::<ErrorModel, NO_CONTENT>(request).await
self.client
.execute::<ErrorResponse, NO_CONTENT>(request)
.await
}

/// Check if a table exists in the catalog.
Expand All @@ -339,7 +377,7 @@ impl Catalog for RestCatalog {
.build()?;

self.client
.execute::<ErrorModel, NO_CONTENT>(request)
.execute::<ErrorResponse, NO_CONTENT>(request)
.await
.map(|_| true)
}
Expand All @@ -356,7 +394,9 @@ impl Catalog for RestCatalog {
})
.build()?;

self.client.execute::<ErrorModel, NO_CONTENT>(request).await
self.client
.execute::<ErrorResponse, NO_CONTENT>(request)
.await
}

/// Update a table to the catalog.
Expand Down Expand Up @@ -412,10 +452,12 @@ mod _serde {

use serde_derive::{Deserialize, Serialize};

use iceberg::spec::TableMetadata;
use iceberg::{Error, ErrorKind, Namespace, TableIdent};

pub(super) const OK: u16 = 200u16;
pub(super) const NO_CONTENT: u16 = 204u16;

#[derive(Clone, Debug, Serialize, Deserialize)]
pub(super) struct CatalogConfig {
pub(super) overrides: HashMap<String, String>,
Expand Down Expand Up @@ -534,11 +576,26 @@ mod _serde {
pub(super) source: TableIdent,
pub(super) destination: TableIdent,
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub(super) struct LoadTableResponse {
pub(super) metadata_location: Option<String>,
pub(super) metadata: TableMetadata,
pub(super) config: Option<HashMap<String, String>>,
}
}

#[cfg(test)]
mod tests {
use iceberg::spec::ManifestListLocation::ManifestListFile;
use iceberg::spec::{
FormatVersion, NestedField, Operation, PrimitiveType, Schema, Snapshot, SnapshotLog,
SortOrder, Summary, Type,
};
use mockito::{Mock, Server, ServerGuard};
use std::sync::Arc;
use uuid::uuid;

use super::*;

Expand Down Expand Up @@ -884,4 +941,153 @@ mod tests {
config_mock.assert_async().await;
rename_table_mock.assert_async().await;
}

#[tokio::test]
async fn test_load_table() {
let mut server = Server::new_async().await;

let config_mock = create_config_mock(&mut server).await;

let rename_table_mock = server
.mock("GET", "/v1/namespaces/ns1/tables/test1")
.with_status(200)
.with_body_from_file(format!(
"{}/testdata/{}",
env!("CARGO_MANIFEST_DIR"),
"load_table_response.json"
))
.create_async()
.await;

let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
.await
.unwrap();

let table = catalog
.load_table(&TableIdent::new(
NamespaceIdent::new("ns1".to_string()),
"test1".to_string(),
))
.await
.unwrap();

assert_eq!(
&TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
table.identifier()
);
assert_eq!("s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json", table.metadata_location().unwrap());
assert_eq!(FormatVersion::V1, table.metadata().format_version());
assert_eq!("s3://warehouse/database/table", table.metadata().location());
assert_eq!(
uuid!("b55d9dda-6561-423a-8bfc-787980ce421f"),
table.metadata().uuid()
);
assert_eq!(1646787054459, table.metadata().last_updated_ms());
assert_eq!(
vec![&Arc::new(
Schema::builder()
.with_fields(vec![
NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::optional(2, "data", Type::Primitive(PrimitiveType::String))
.into(),
])
.build()
.unwrap()
)],
table.metadata().schemas_iter().collect::<Vec<_>>()
);
assert_eq!(
&HashMap::from([
("owner".to_string(), "bryan".to_string()),
(
"write.metadata.compression-codec".to_string(),
"gzip".to_string()
)
]),
table.metadata().properties()
);
assert_eq!(vec![&Arc::new(Snapshot::builder()
.with_snapshot_id(3497810964824022504)
.with_timestamp_ms(1646787054459)
.with_manifest_list(ManifestListFile("s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro".to_string()))
.with_sequence_number(0)
.with_schema_id(0)
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::from_iter([
("spark.app.id", "local-1646787004168"),
("added-data-files", "1"),
("added-records", "1"),
("added-files-size", "697"),
("changed-partition-count", "1"),
("total-records", "1"),
("total-files-size", "697"),
("total-data-files", "1"),
("total-delete-files", "0"),
("total-position-deletes", "0"),
("total-equality-deletes", "0")
].iter().map(|p|(p.0.to_string(), p.1.to_string())))
}).build().unwrap()
)], table.metadata().snapshots().collect::<Vec<_>>());
assert_eq!(
&[SnapshotLog {
timestamp_ms: 1646787054459,
snapshot_id: 3497810964824022504
}],
table.metadata().history()
);
assert_eq!(
vec![&Arc::new(SortOrder {
order_id: 0,
fields: vec![]
})],
table.metadata().sort_orders_iter().collect::<Vec<_>>()
);

config_mock.assert_async().await;
rename_table_mock.assert_async().await;
}

#[tokio::test]
async fn test_load_table_404() {
let mut server = Server::new_async().await;

let config_mock = create_config_mock(&mut server).await;

let rename_table_mock = server
.mock("GET", "/v1/namespaces/ns1/tables/test1")
.with_status(404)
.with_body(r#"
{
"error": {
"message": "Table does not exist: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
"type": "NoSuchNamespaceErrorException",
"code": 404
}
}
"#)
.create_async()
.await;

let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
.await
.unwrap();

let table = catalog
.load_table(&TableIdent::new(
NamespaceIdent::new("ns1".to_string()),
"test1".to_string(),
))
.await;

assert!(table.is_err());
assert!(table
.err()
.unwrap()
.message()
.contains("Table does not exist"));

config_mock.assert_async().await;
rename_table_mock.assert_async().await;
}
}
Loading

0 comments on commit e26bda3

Please sign in to comment.