Skip to content

Commit

Permalink
Deprecate RuntimeConfig, update code to use new builder style
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Dec 4, 2024
1 parent dc4ae20 commit 32113c9
Show file tree
Hide file tree
Showing 11 changed files with 80 additions and 75 deletions.
12 changes: 9 additions & 3 deletions benchmarks/src/bin/external_aggr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::Result;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::memory_pool::{human_readable_size, units};
use datafusion::execution::runtime_env::RuntimeConfig;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;
Expand Down Expand Up @@ -195,10 +196,15 @@ impl ExternalAggrConfig {
let query_name =
format!("Q{query_id}({})", human_readable_size(mem_limit as usize));
let config = self.common.config();
let runtime_config = RuntimeConfig::new()
let runtime_env = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(FairSpillPool::new(mem_limit as usize)))
.build_arc()?;
let ctx = SessionContext::new_with_config_rt(config, runtime_config);
let state = SessionStateBuilder::new()
.with_config(config)
.with_runtime_env(runtime_env)
.with_default_features()
.build();
let ctx = SessionContext::from(state);

// register tables
self.register_tables(&ctx).await?;
Expand Down
10 changes: 6 additions & 4 deletions benchmarks/src/sort_tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use datafusion::datasource::listing::{
};
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::Result;
use datafusion::execution::runtime_env::RuntimeConfig;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{displayable, execute_stream};
use datafusion::prelude::*;
Expand Down Expand Up @@ -188,9 +188,11 @@ impl RunOpt {
/// Benchmark query `query_id` in `SORT_QUERIES`
async fn benchmark_query(&self, query_id: usize) -> Result<Vec<QueryResult>> {
let config = self.common.config();

let runtime_config = RuntimeConfig::new().build_arc()?;
let ctx = SessionContext::new_with_config_rt(config, runtime_config);
let state = SessionStateBuilder::new()
.with_config(config)
.with_default_features()
.build();
let ctx = SessionContext::from(state);

// register tables
self.register_tables(&ctx).await?;
Expand Down
34 changes: 13 additions & 21 deletions datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use std::sync::{Arc, OnceLock};

use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionConfig;
use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool};
use datafusion::execution::runtime_env::{RuntimeEnvBuilder};
use datafusion::prelude::SessionContext;
use datafusion_cli::catalog::DynamicObjectStoreCatalog;
use datafusion_cli::functions::ParquetMetadataFunc;
Expand Down Expand Up @@ -156,26 +156,22 @@ async fn main_inner() -> Result<()> {
session_config = session_config.with_batch_size(batch_size);
};

let rt_config = RuntimeConfig::new();
let rt_config =
// set memory pool size
if let Some(memory_limit) = args.memory_limit {
// set memory pool type
match args.mem_pool_type {
PoolType::Fair => rt_config
.with_memory_pool(Arc::new(FairSpillPool::new(memory_limit))),
PoolType::Greedy => rt_config
.with_memory_pool(Arc::new(GreedyMemoryPool::new(memory_limit)))
}
} else {
rt_config
let mut rt_builder = RuntimeEnvBuilder::new();
// set memory pool size
if let Some(memory_limit) = args.memory_limit {
// set memory pool type
let pool: Arc<dyn MemoryPool> = match args.mem_pool_type {
PoolType::Fair => Arc::new(FairSpillPool::new(memory_limit)),
PoolType::Greedy => Arc::new(GreedyMemoryPool::new(memory_limit)),
};
rt_builder = rt_builder.with_memory_pool(pool)
}

let runtime_env = create_runtime_env(rt_config.clone())?;
let runtime_env = rt_builder.build_arc()?;

// enable dynamic file query
let ctx =
SessionContext::new_with_config_rt(session_config.clone(), Arc::new(runtime_env))
SessionContext::new_with_config_rt(session_config.clone(), runtime_env)
.enable_url_table();
ctx.refresh_catalogs().await?;
// install dynamic catalog provider that can register required object stores
Expand Down Expand Up @@ -231,10 +227,6 @@ async fn main_inner() -> Result<()> {
Ok(())
}

fn create_runtime_env(rn_config: RuntimeConfig) -> Result<RuntimeEnv> {
RuntimeEnv::try_new(rn_config)
}

fn parse_valid_file(dir: &str) -> Result<String, String> {
if Path::new(dir).is_file() {
Ok(dir.to_string())
Expand Down
3 changes: 0 additions & 3 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,6 @@ mod tests {
use datafusion_common::cast::as_string_array;
use datafusion_common::internal_err;
use datafusion_common::stats::Precision;
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_expr::{col, lit};

use chrono::DateTime;
Expand Down Expand Up @@ -984,12 +983,10 @@ mod tests {
async fn query_compress_data(
file_compression_type: FileCompressionType,
) -> Result<()> {
let runtime = Arc::new(RuntimeEnvBuilder::new().build()?);
let mut cfg = SessionConfig::new();
cfg.options_mut().catalog.has_header = true;
let session_state = SessionStateBuilder::new()
.with_config(cfg)
.with_runtime_env(runtime)
.with_default_features()
.build();
let integration = LocalFileSystem::new_with_prefix(arrow_test_data()).unwrap();
Expand Down
3 changes: 0 additions & 3 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1792,7 +1792,6 @@ mod tests {
use super::{super::options::CsvReadOptions, *};
use crate::assert_batches_eq;
use crate::execution::memory_pool::MemoryConsumer;
use crate::execution::runtime_env::RuntimeEnvBuilder;
use crate::test;
use crate::test_util::{plan_and_collect, populate_csv_partitions};
use arrow_schema::{DataType, TimeUnit};
Expand Down Expand Up @@ -1932,14 +1931,12 @@ mod tests {
let path = path.join("tests/tpch-csv");
let url = format!("file://{}", path.display());

let runtime = RuntimeEnvBuilder::new().build_arc()?;
let cfg = SessionConfig::new()
.set_str("datafusion.catalog.location", url.as_str())
.set_str("datafusion.catalog.format", "CSV")
.set_str("datafusion.catalog.has_header", "true");
let session_state = SessionStateBuilder::new()
.with_config(cfg)
.with_runtime_env(runtime)
.with_default_features()
.build();
let ctx = SessionContext::new_with_state(session_state);
Expand Down
3 changes: 1 addition & 2 deletions datafusion/execution/src/disk_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! Manages files generated during query execution, files are
//! hashed among the directories listed in RuntimeConfig::local_dirs.
//! [`DiskManager`]: Manages files generated during query execution
use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
use log::debug;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/execution/src/memory_pool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub struct GreedyMemoryPool {
}

impl GreedyMemoryPool {
/// Allocate up to `limit` bytes
/// Create a new pool that can allocate up to `pool_size` bytes
pub fn new(pool_size: usize) -> Self {
debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
Self {
Expand Down
72 changes: 44 additions & 28 deletions datafusion/execution/src/runtime_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,32 @@ use url::Url;
/// Execution runtime environment that manages system resources such
/// as memory, disk, cache and storage.
///
/// A [`RuntimeEnv`] is created from a [`RuntimeEnvBuilder`] and has the
/// A [`RuntimeEnv`] can be created using [`RuntimeEnvBuilder`] and has the
/// following resource management functionality:
///
/// * [`MemoryPool`]: Manage memory
/// * [`DiskManager`]: Manage temporary files on local disk
/// * [`CacheManager`]: Manage temporary cache data during the session lifetime
/// * [`ObjectStoreRegistry`]: Manage mapping URLs to object store instances
///
/// # Example: Create default `RuntimeEnv`
/// ```
/// # use datafusion_execution::runtime_env::RuntimeEnv;
/// let runtime_env = RuntimeEnv::default();
/// ```
///
/// # Example: Create a `RuntimeEnv` from [`RuntimeEnvBuilder`] with a new memory pool
/// ```
/// # use std::sync::Arc;
/// # use datafusion_execution::memory_pool::GreedyMemoryPool;
/// # use datafusion_execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
/// // restrict to using at most 100MB of memory
/// let pool_size = 100 * 1024 * 1024;
/// let runtime_env = RuntimeEnvBuilder::new()
/// .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
/// .build()
/// .unwrap();
/// ```
pub struct RuntimeEnv {
/// Runtime memory management
pub memory_pool: Arc<dyn MemoryPool>,
Expand All @@ -66,28 +85,16 @@ impl Debug for RuntimeEnv {
}

impl RuntimeEnv {
#[deprecated(since = "43.0.0", note = "please use `try_new` instead")]
#[deprecated(since = "43.0.0", note = "please use `RuntimeEnvBuilder` instead")]
#[allow(deprecated)]
pub fn new(config: RuntimeConfig) -> Result<Self> {
Self::try_new(config)
}
/// Create env based on configuration
#[deprecated(since = "44.0.0", note = "please use `RuntimeEnvBuilder` instead")]
#[allow(deprecated)]
pub fn try_new(config: RuntimeConfig) -> Result<Self> {
let RuntimeConfig {
memory_pool,
disk_manager,
cache_manager,
object_store_registry,
} = config;

let memory_pool =
memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));

Ok(Self {
memory_pool,
disk_manager: DiskManager::try_new(disk_manager)?,
cache_manager: CacheManager::try_new(&cache_manager)?,
object_store_registry,
})
config.build()
}

/// Registers a custom `ObjectStore` to be used with a specific url.
Expand All @@ -104,7 +111,7 @@ impl RuntimeEnv {
/// # use std::sync::Arc;
/// # use url::Url;
/// # use datafusion_execution::runtime_env::RuntimeEnv;
/// # let runtime_env = RuntimeEnv::try_new(Default::default()).unwrap();
/// # let runtime_env = RuntimeEnv::default();
/// let url = Url::try_from("file://").unwrap();
/// let object_store = object_store::local::LocalFileSystem::new();
/// // register the object store with the runtime environment
Expand All @@ -119,11 +126,12 @@ impl RuntimeEnv {
/// # use std::sync::Arc;
/// # use url::Url;
/// # use datafusion_execution::runtime_env::RuntimeEnv;
/// # let runtime_env = RuntimeEnv::try_new(Default::default()).unwrap();
/// # let runtime_env = RuntimeEnv::default();
/// # // use local store for example as http feature is not enabled
/// # let http_store = object_store::local::LocalFileSystem::new();
/// // create a new object store via object_store::http::HttpBuilder;
/// let base_url = Url::parse("https://github.com").unwrap();
/// // (note this example can't depend on the http feature)
/// // let http_store = HttpBuilder::new()
/// // .with_url(base_url.clone())
/// // .build()
Expand Down Expand Up @@ -155,12 +163,15 @@ impl Default for RuntimeEnv {
}
}

/// Please see: <https://github.com/apache/datafusion/issues/12156>
/// Please see: <https://github.com/apache/datafusion/issues/12156>
/// This is a type alias for backwards compatibility.
#[deprecated(since = "43.0.0", note = "please use `RuntimeEnvBuilder` instead")]
pub type RuntimeConfig = RuntimeEnvBuilder;

#[derive(Clone)]
/// Execution runtime configuration
/// Execution runtime configuration builder.
///
/// See example on [`RuntimeEnv`]
pub struct RuntimeEnvBuilder {
/// DiskManager to manage temporary disk file usage
pub disk_manager: DiskManagerConfig,
Expand Down Expand Up @@ -239,15 +250,20 @@ impl RuntimeEnvBuilder {

/// Build a RuntimeEnv
pub fn build(self) -> Result<RuntimeEnv> {
let memory_pool = self
.memory_pool
.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
let Self {
disk_manager,
memory_pool,
cache_manager,
object_store_registry,
} = self;
let memory_pool =
memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));

Ok(RuntimeEnv {
memory_pool,
disk_manager: DiskManager::try_new(self.disk_manager)?,
cache_manager: CacheManager::try_new(&self.cache_manager)?,
object_store_registry: self.object_store_registry,
disk_manager: DiskManager::try_new(disk_manager)?,
cache_manager: CacheManager::try_new(&cache_manager)?,
object_store_registry,
})
}

Expand Down
10 changes: 3 additions & 7 deletions datafusion/execution/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@
// under the License.

use crate::{
config::SessionConfig,
memory_pool::MemoryPool,
registry::FunctionRegistry,
runtime_env::{RuntimeEnv, RuntimeEnvBuilder},
config::SessionConfig, memory_pool::MemoryPool, registry::FunctionRegistry,
runtime_env::RuntimeEnv,
};
use datafusion_common::{plan_datafusion_err, DataFusionError, Result};
use datafusion_expr::planner::ExprPlanner;
Expand Down Expand Up @@ -54,9 +52,7 @@ pub struct TaskContext {

impl Default for TaskContext {
fn default() -> Self {
let runtime = RuntimeEnvBuilder::new()
.build_arc()
.expect("default runtime created successfully");
let runtime = Arc::new(RuntimeEnv::default());

// Create a default task context, mostly useful for testing
Self {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1429,7 +1429,7 @@ mod tests {

fn new_spill_ctx(batch_size: usize, max_memory: usize) -> Arc<TaskContext> {
let session_config = SessionConfig::new().with_batch_size(batch_size);
let runtime = RuntimeEnvBuilder::default()
let runtime = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(FairSpillPool::new(max_memory)))
.build_arc()
.unwrap();
Expand Down Expand Up @@ -1914,7 +1914,7 @@ mod tests {
let input: Arc<dyn ExecutionPlan> = Arc::new(TestYieldingExec::new(true));
let input_schema = input.schema();

let runtime = RuntimeEnvBuilder::default()
let runtime = RuntimeEnvBuilder::new()
.with_memory_limit(1, 1.0)
.build_arc()?;
let task_ctx = TaskContext::default().with_runtime(runtime);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
|| matches!(tz_info, TimezoneInfo::WithTimeZone)
{
// Timestamp With Time Zone
// INPUT : [SQLDataType] TimestampTz + [RuntimeConfig] Time Zone
// INPUT : [SQLDataType] TimestampTz + [Config] Time Zone
// OUTPUT: [ArrowDataType] Timestamp<TimeUnit, Some(Time Zone)>
self.context_provider.options().execution.time_zone.clone()
} else {
Expand Down

0 comments on commit 32113c9

Please sign in to comment.