Skip to content

Commit

Permalink
Use a Weak pointer to break the catalog list <> info schema cyclic reference (#681)
Browse files Browse the repository at this point in the history

Fixes #680.
  • Loading branch information
crepererum authored Jul 7, 2021
1 parent 0368f59 commit 3664766
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 8 deletions.
16 changes: 10 additions & 6 deletions datafusion/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
//!
//! [Information Schema](https://en.wikipedia.org/wiki/Information_schema)
use std::{any, sync::Arc};
use std::{
any,
sync::{Arc, Weak},
};

use arrow::{
array::{StringBuilder, UInt64Builder},
Expand All @@ -41,14 +44,14 @@ const COLUMNS: &str = "columns";
/// Wraps another [`CatalogProvider`] and adds an "information_schema"
/// schema that can introspect the tables in the catalog_list
pub(crate) struct CatalogWithInformationSchema {
catalog_list: Arc<dyn CatalogList>,
catalog_list: Weak<dyn CatalogList>,
/// wrapped provider
inner: Arc<dyn CatalogProvider>,
}

impl CatalogWithInformationSchema {
pub(crate) fn new(
catalog_list: Arc<dyn CatalogList>,
catalog_list: Weak<dyn CatalogList>,
inner: Arc<dyn CatalogProvider>,
) -> Self {
Self {
Expand All @@ -73,9 +76,10 @@ impl CatalogProvider for CatalogWithInformationSchema {

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
if name.eq_ignore_ascii_case(INFORMATION_SCHEMA) {
Some(Arc::new(InformationSchemaProvider {
catalog_list: self.catalog_list.clone(),
}))
Weak::upgrade(&self.catalog_list).map(|catalog_list| {
Arc::new(InformationSchemaProvider { catalog_list })
as Arc<dyn SchemaProvider>
})
} else {
self.inner.schema(name)
}
Expand Down
28 changes: 26 additions & 2 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ impl ExecutionContext {

let default_catalog: Arc<dyn CatalogProvider> = if config.information_schema {
Arc::new(CatalogWithInformationSchema::new(
catalog_list.clone(),
Arc::downgrade(&catalog_list),
Arc::new(default_catalog),
))
} else {
Expand Down Expand Up @@ -346,7 +346,7 @@ impl ExecutionContext {
let state = self.state.lock().unwrap();
let catalog = if state.config.information_schema {
Arc::new(CatalogWithInformationSchema::new(
state.catalog_list.clone(),
Arc::downgrade(&state.catalog_list),
catalog,
))
} else {
Expand Down Expand Up @@ -924,6 +924,7 @@ mod tests {
use arrow::datatypes::*;
use arrow::record_batch::RecordBatch;
use std::fs::File;
use std::sync::Weak;
use std::thread::{self, JoinHandle};
use std::{io::prelude::*, sync::Mutex};
use tempfile::TempDir;
Expand Down Expand Up @@ -3364,6 +3365,29 @@ mod tests {
assert_batches_sorted_eq!(expected, &result);
}

#[tokio::test]
async fn catalogs_not_leaked() {
// the information schema used to introduce cyclic Arcs
let ctx = ExecutionContext::with_config(
ExecutionConfig::new().with_information_schema(true),
);

// register a single catalog
let catalog = Arc::new(MemoryCatalogProvider::new());
let catalog_weak = Arc::downgrade(&catalog);
ctx.register_catalog("my_catalog", catalog);

let catalog_list_weak = {
let state = ctx.state.lock().unwrap();
Arc::downgrade(&state.catalog_list)
};

drop(ctx);

assert_eq!(Weak::strong_count(&catalog_list_weak), 0);
assert_eq!(Weak::strong_count(&catalog_weak), 0);
}

struct MyPhysicalPlanner {}

impl PhysicalPlanner for MyPhysicalPlanner {
Expand Down

0 comments on commit 3664766

Please sign in to comment.