Skip to content

Commit

Permalink
fix: lock attach table's snapshot location in talbe instance
Browse files Browse the repository at this point in the history
  • Loading branch information
dantengsky committed Nov 26, 2024
1 parent b1304a5 commit 371d2a2
Show file tree
Hide file tree
Showing 17 changed files with 64 additions and 76 deletions.
4 changes: 4 additions & 0 deletions src/meta/app/src/schema/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,10 @@ impl TableInfo {
&self.meta.options
}

pub fn options_mut(&mut self) -> &mut BTreeMap<String, String> {
&mut self.meta.options
}

pub fn catalog(&self) -> &str {
&self.catalog_info.name_ident.catalog_name
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/ee/src/fail_safe/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl Amender {
Ok(None) => (),
Err(e) => {
if e.code() == ErrorCode::STORAGE_NOT_FOUND {
let snapshot_location = table.snapshot_loc().await?.unwrap();
let snapshot_location = table.snapshot_loc().unwrap();
self.recover_object(&snapshot_location).await?;
let snapshot = table.read_table_snapshot().await?;
let schema = table.schema();
Expand Down
2 changes: 1 addition & 1 deletion src/query/ee/src/storages/fuse/operations/vacuum_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub async fn get_snapshot_referenced_files(
ctx: &Arc<dyn TableContext>,
) -> Result<Option<SnapshotReferencedFiles>> {
// 1. Read the root snapshot.
let root_snapshot_location_op = fuse_table.snapshot_loc().await?;
let root_snapshot_location_op = fuse_table.snapshot_loc()?;
if root_snapshot_location_op.is_none() {
return Ok(None);
}
Expand Down
4 changes: 2 additions & 2 deletions src/query/service/src/interpreters/common/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub async fn dml_build_update_stream_req(
let table_version = inner_fuse.get_table_info().ident.seq;
let mut options = stream.options().clone();
options.insert(OPT_KEY_TABLE_VER.to_string(), table_version.to_string());
if let Some(snapshot_loc) = inner_fuse.snapshot_loc().await? {
if let Some(snapshot_loc) = inner_fuse.snapshot_loc() {
options.insert(OPT_KEY_SNAPSHOT_LOCATION.to_string(), snapshot_loc);
}

Expand Down Expand Up @@ -147,7 +147,7 @@ pub async fn query_build_update_stream_req(
let table_version = inner_fuse.get_table_info().ident.seq;
let mut options = stream.options().clone();
options.insert(OPT_KEY_TABLE_VER.to_string(), table_version.to_string());
if let Some(snapshot_loc) = inner_fuse.snapshot_loc().await? {
if let Some(snapshot_loc) = inner_fuse.snapshot_loc() {
options.insert(OPT_KEY_SNAPSHOT_LOCATION.to_string(), snapshot_loc);
}
let mut new_table_meta = stream_info.meta.clone();
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/test_kits/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ pub async fn check_data_dir(
if check_last_snapshot.is_some() {
let table = fixture.latest_default_table().await?;
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
let snapshot_loc = fuse_table.snapshot_loc().await?;
let snapshot_loc = fuse_table.snapshot_loc();
let snapshot_loc = snapshot_loc.unwrap();
assert!(last_snapshot_loc.contains(&snapshot_loc));
assert_eq!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ async fn test_last_snapshot_hint() -> Result<()> {
// check last snapshot hit file
let table = fixture.latest_default_table().await?;
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
let last_snapshot_location = fuse_table.snapshot_loc().await?.unwrap();
let last_snapshot_location = fuse_table.snapshot_loc().unwrap();
let operator = fuse_table.get_operator();
let location = fuse_table
.meta_location_generator()
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/tests/it/storages/fuse/operations/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ async fn test_fuse_purge_older_version() -> Result<()> {
let fuse_table = FuseTable::try_from_table(latest_table.as_ref())?;
let snapshot_files = fuse_table.list_snapshot_files().await?;
let time_point = now - Duration::hours(12);
let snapshot_loc = fuse_table.snapshot_loc().await?.unwrap();
let snapshot_loc = fuse_table.snapshot_loc().unwrap();
let table = fuse_table
.navigate_to_time_point(snapshot_loc, time_point, ctx.clone().get_abort_checker())
.await?;
Expand Down
11 changes: 3 additions & 8 deletions src/query/service/tests/it/storages/fuse/operations/navigate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ async fn test_fuse_navigate() -> Result<()> {
let table = fixture.latest_default_table().await?;
let first_snapshot = FuseTable::try_from_table(table.as_ref())?
.snapshot_loc()
.await?
.unwrap();

// take a nap
Expand All @@ -73,15 +72,14 @@ async fn test_fuse_navigate() -> Result<()> {
let table = fixture.latest_default_table().await?;
let second_snapshot = FuseTable::try_from_table(table.as_ref())?
.snapshot_loc()
.await?
.unwrap();
assert_ne!(second_snapshot, first_snapshot);

// 2. grab the history
let table = fixture.latest_default_table().await?;
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
let reader = MetaReaders::table_snapshot_reader(fuse_table.get_operator());
let loc = fuse_table.snapshot_loc().await?.unwrap();
let loc = fuse_table.snapshot_loc().unwrap();
assert_eq!(second_snapshot, loc);
let version = TableMetaLocationGenerator::snapshot_version(loc.as_str());
let snapshots: Vec<_> = reader
Expand Down Expand Up @@ -111,7 +109,7 @@ async fn test_fuse_navigate() -> Result<()> {
.await?;

// check we got the snapshot of the first insertion
assert_eq!(first_snapshot, tbl.snapshot_loc().await?.unwrap());
assert_eq!(first_snapshot, tbl.snapshot_loc().unwrap());

// 4. navigate beyond the first snapshot
let (first_insertion, _ver) = &snapshots[1];
Expand Down Expand Up @@ -170,7 +168,6 @@ async fn test_navigate_for_purge() -> Result<()> {
let table = fixture.latest_default_table().await?;
let _first_snapshot = FuseTable::try_from_table(table.as_ref())?
.snapshot_loc()
.await?
.unwrap();

// take a nap
Expand All @@ -187,7 +184,6 @@ async fn test_navigate_for_purge() -> Result<()> {
let table = fixture.latest_default_table().await?;
let second_snapshot = FuseTable::try_from_table(table.as_ref())?
.snapshot_loc()
.await?
.unwrap();

// take a nap
Expand All @@ -203,14 +199,13 @@ async fn test_navigate_for_purge() -> Result<()> {
let table = fixture.latest_default_table().await?;
let third_snapshot = FuseTable::try_from_table(table.as_ref())?
.snapshot_loc()
.await?
.unwrap();

// 2. grab the history
let table = fixture.latest_default_table().await?;
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
let reader = MetaReaders::table_snapshot_reader(fuse_table.get_operator());
let loc = fuse_table.snapshot_loc().await?.unwrap();
let loc = fuse_table.snapshot_loc().unwrap();
assert_eq!(third_snapshot, loc);
let version = TableMetaLocationGenerator::snapshot_version(loc.as_str());
let snapshots: Vec<_> = reader
Expand Down
87 changes: 38 additions & 49 deletions src/query/storages/fuse/src/fuse_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::sync::Arc;

use chrono::Duration;
use chrono::TimeDelta;
use databend_common_base::base::tokio;
use databend_common_base::runtime::GlobalIORuntime;
use databend_common_catalog::catalog::StorageDescription;
use databend_common_catalog::plan::DataSourcePlan;
use databend_common_catalog::plan::PartStatistics;
Expand Down Expand Up @@ -83,7 +83,6 @@ use databend_storages_common_table_meta::table::OPT_KEY_CLUSTER_TYPE;
use databend_storages_common_table_meta::table::OPT_KEY_LEGACY_SNAPSHOT_LOC;
use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION;
use databend_storages_common_table_meta::table::OPT_KEY_STORAGE_FORMAT;
use databend_storages_common_table_meta::table::OPT_KEY_STORAGE_PREFIX;
use databend_storages_common_table_meta::table::OPT_KEY_TABLE_ATTACHED_DATA_URI;
use databend_storages_common_table_meta::table::OPT_KEY_TABLE_COMPRESSION;
use log::info;
Expand Down Expand Up @@ -133,9 +132,6 @@ pub struct FuseTable {

// If this is set, reading from fuse_table should only return the increment blocks
pub(crate) changes_desc: Option<ChangesDesc>,

// A table instance level cache of snapshot_location, if this table is attaching to someone else.
attached_table_location: tokio::sync::OnceCell<String>,
}

impl FuseTable {
Expand Down Expand Up @@ -172,7 +168,7 @@ impl FuseTable {
}
}

pub fn do_create(table_info: TableInfo) -> Result<Box<FuseTable>> {
pub fn do_create(mut table_info: TableInfo) -> Result<Box<FuseTable>> {
let storage_prefix = Self::parse_storage_prefix_from_table_info(&table_info)?;
let cluster_key_meta = table_info.meta.cluster_key();

Expand All @@ -184,13 +180,35 @@ impl FuseTable {
Some(sp) => {
let table_meta_options = &table_info.meta.options;

let operator = init_operator(&sp)?;

// Extract snapshot location from last snapshot hit file.
let table_type = if Self::is_table_attached(table_meta_options) {
let location = GlobalIORuntime::instance().block_on(async {
let hint =
format!("{}/{}", storage_prefix, FUSE_TBL_LAST_SNAPSHOT_HINT);
let hint_content = operator.read(&hint).await?.to_vec();
let snapshot_full_path = String::from_utf8(hint_content)?;
let operator_info = operator.info();
Ok::<_, ErrorCode>(
snapshot_full_path[operator_info.root().len()..].to_string(),
)
})?;

info!(
"extracted snapshot location {} of table {}, with id {:?} from the last snapshot hint file.",
location, table_info.desc, table_info.ident
);

// Adjust snapshot location to the values extracted from the last snapshot hint
table_info
.options_mut()
.insert(OPT_KEY_SNAPSHOT_LOCATION.to_string(), location);
FuseTableType::Attached
} else {
FuseTableType::External
};

let operator = init_operator(&sp)?;
(operator, table_type)
}
// Normal table.
Expand Down Expand Up @@ -242,7 +260,6 @@ impl FuseTable {
table_compression: table_compression.as_str().try_into()?,
table_type,
changes_desc: None,
attached_table_location: Default::default(),
}))
}

Expand Down Expand Up @@ -338,8 +355,8 @@ impl FuseTable {
&self,
reader: TableSnapshotReader,
) -> Result<Option<Arc<TableSnapshot>>> {
if let Some(loc) = self.snapshot_loc().await? {
let ver = self.snapshot_format_version(Some(loc.clone())).await?;
if let Some(loc) = self.snapshot_loc() {
let ver = self.snapshot_format_version(Some(loc.clone()))?;
let params = LoadParams {
location: loc,
len_hint: None,
Expand All @@ -353,11 +370,11 @@ impl FuseTable {
}

#[async_backtrace::framed]
pub async fn snapshot_format_version(&self, location_opt: Option<String>) -> Result<u64> {
pub fn snapshot_format_version(&self, location_opt: Option<String>) -> Result<u64> {
let location_opt = if location_opt.is_some() {
location_opt
} else {
self.snapshot_loc().await?
self.snapshot_loc()
};
// If no snapshot location here, indicates that there are no data of this table yet
// in this case, we just return the current snapshot version
Expand All @@ -366,41 +383,13 @@ impl FuseTable {
}))
}

#[async_backtrace::framed]
pub async fn snapshot_loc(&self) -> Result<Option<String>> {
match self.table_info.db_type {
DatabaseType::NormalDB => {
let options = self.table_info.options();

if let Some(storage_prefix) = options.get(OPT_KEY_STORAGE_PREFIX) {
// If the table is attaching to someone else,
// parse the snapshot location from the hint file.
//
// The snapshot location is allowed
// to be fetched from the table level instance cache.
let snapshot_location = self
.attached_table_location
.get_or_try_init(|| async {
let hint =
format!("{}/{}", storage_prefix, FUSE_TBL_LAST_SNAPSHOT_HINT);
let hint_content = self.operator.read(&hint).await?.to_vec();
let snapshot_full_path = String::from_utf8(hint_content)?;
let operator_info = self.operator.info();
Ok::<_, ErrorCode>(
snapshot_full_path[operator_info.root().len()..].to_string(),
)
})
.await?;
Ok(Some(snapshot_location.to_owned()))
} else {
Ok(options
.get(OPT_KEY_SNAPSHOT_LOCATION)
// for backward compatibility, we check the legacy table option
.or_else(|| options.get(OPT_KEY_LEGACY_SNAPSHOT_LOC))
.cloned())
}
}
}
pub fn snapshot_loc(&self) -> Option<String> {
let options = self.table_info.options();
options
.get(OPT_KEY_SNAPSHOT_LOCATION)
// for backward compatibility, we check the legacy table option
.or_else(|| options.get(OPT_KEY_LEGACY_SNAPSHOT_LOC))
.cloned()
}

pub fn get_operator(&self) -> Operator {
Expand Down Expand Up @@ -601,7 +590,7 @@ impl Table for FuseTable {
let schema = self.schema().as_ref().clone();

let prev = self.read_table_snapshot().await?;
let prev_version = self.snapshot_format_version(None).await?;
let prev_version = self.snapshot_format_version(None)?;
let prev_timestamp = prev.as_ref().and_then(|v| v.timestamp);
let prev_snapshot_id = prev.as_ref().map(|v| (v.snapshot_id, prev_version));
let prev_statistics_location = prev
Expand Down Expand Up @@ -655,7 +644,7 @@ impl Table for FuseTable {
let schema = self.schema().as_ref().clone();

let prev = self.read_table_snapshot().await?;
let prev_version = self.snapshot_format_version(None).await?;
let prev_version = self.snapshot_format_version(None)?;
let prev_timestamp = prev.as_ref().and_then(|v| v.timestamp);
let prev_statistics_location = prev
.as_ref()
Expand Down
4 changes: 2 additions & 2 deletions src/query/storages/fuse/src/operations/changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl FuseTable {
} else {
self.clone()
};
let location = source.snapshot_loc().await?;
let location = source.snapshot_loc();
let seq = match navigation {
Some(NavigationPoint::StreamInfo(info)) => info
.options()
Expand Down Expand Up @@ -338,7 +338,7 @@ impl FuseTable {
ctx: Arc<dyn TableContext>,
base: &Option<String>,
) -> Result<(Vec<Arc<BlockMeta>>, Vec<Arc<BlockMeta>>)> {
let latest = self.snapshot_loc().await?;
let latest = self.snapshot_loc();

let latest_segments = if let Some(snapshot) = latest {
let (sn, _) =
Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/fuse/src/operations/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ impl FuseTable {
ctx: &Arc<dyn TableContext>,
put_cache: bool,
) -> Result<Option<RootSnapshotInfo>> {
let root_snapshot_location_op = self.snapshot_loc().await?;
let root_snapshot_location_op = self.snapshot_loc();
if root_snapshot_location_op.is_none() {
return Ok(None);
}
Expand Down
6 changes: 3 additions & 3 deletions src/query/storages/fuse/src/operations/navigate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl FuseTable {
.await
}
NavigationPoint::TimePoint(time_point) => {
let Some(location) = self.snapshot_loc().await? else {
let Some(location) = self.snapshot_loc() else {
return Err(ErrorCode::TableHistoricalDataNotFound(
"Empty Table has no historical data",
));
Expand Down Expand Up @@ -114,7 +114,7 @@ impl FuseTable {
snapshot_id: &str,
abort_checker: AbortChecker,
) -> Result<Arc<FuseTable>> {
let Some(location) = self.snapshot_loc().await? else {
let Some(location) = self.snapshot_loc() else {
return Err(ErrorCode::TableHistoricalDataNotFound(
"Empty Table has no historical data",
));
Expand Down Expand Up @@ -273,7 +273,7 @@ impl FuseTable {
));
}

let Some(location) = self.snapshot_loc().await? else {
let Some(location) = self.snapshot_loc() else {
return Err(ErrorCode::TableHistoricalDataNotFound("No historical data"));
};

Expand Down
4 changes: 2 additions & 2 deletions src/query/storages/fuse/src/operations/revert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl FuseTable {
let table_info = table_reverting_to.get_table_info();

// shortcut. if reverting to the same point, just return ok
if self.snapshot_loc().await? == table_reverting_to.snapshot_loc().await? {
if self.snapshot_loc() == table_reverting_to.snapshot_loc() {
return Ok(());
}

Expand All @@ -64,7 +64,7 @@ impl FuseTable {
let reply = catalog.update_single_table_meta(req, table_info).await;
if reply.is_ok() {
// try keeping the snapshot hit
let snapshot_location = table_reverting_to.snapshot_loc().await?.ok_or_else(|| {
let snapshot_location = table_reverting_to.snapshot_loc().ok_or_else(|| {
ErrorCode::Internal("internal error, fuse table which navigated to given point has no snapshot location")
})?;
Self::write_last_snapshot_hint(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async fn location_snapshot(
if let Some(snapshot) = maybe_snapshot {
if let Some(snapshot_id) = snapshot_id {
// prepare the stream of snapshot
let snapshot_version = tbl.snapshot_format_version(None).await?;
let snapshot_version = tbl.snapshot_format_version(None)?;
let snapshot_location = tbl
.meta_location_generator
.snapshot_location_from_uuid(&snapshot.snapshot_id, snapshot_version)?;
Expand Down
Loading

0 comments on commit 371d2a2

Please sign in to comment.