Rename ExecutionContext to SessionContext, ExecutionContextState to SessionState, add TaskContext to support multi-tenancy configurations - Part 1 #1987

Merged · 1 commit · Mar 16, 2022
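At the API surface the rename is mechanical: `ExecutionContext` becomes `SessionContext` and `ExecutionConfig` becomes `SessionConfig`, with the constructors keeping their shapes. A minimal sketch of the new spelling (the `with_information_schema(true)` setting is illustrative only, not something this diff flips):

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Formerly ExecutionContext::with_config(ExecutionConfig::new()...);
    // only the type names change, the builder methods carry over unchanged.
    let ctx = SessionContext::with_config(
        SessionConfig::new().with_information_schema(true),
    );
    let _ = ctx;
}
```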
6 changes: 3 additions & 3 deletions ballista/rust/client/src/context.rs

@@ -34,7 +34,7 @@ use datafusion::datasource::TableProvider;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::logical_plan::{CreateExternalTable, LogicalPlan, TableScan};
 use datafusion::prelude::{
-    AvroReadOptions, CsvReadOptions, ExecutionConfig, ExecutionContext,
+    AvroReadOptions, CsvReadOptions, SessionConfig, SessionContext,
 };
 use datafusion::sql::parser::{DFParser, FileType, Statement as DFStatement};

@@ -304,8 +304,8 @@ impl BallistaContext {
         // the show tables、 show columns sql can not run at scheduler because the tables is store at client
         if is_show {
             let state = self.state.lock();
-            ctx = ExecutionContext::with_config(
-                ExecutionConfig::new().with_information_schema(
+            ctx = SessionContext::with_config(
+                SessionConfig::new().with_information_schema(
                     state.config.default_with_information_schema(),
                 ),
             );
4 changes: 2 additions & 2 deletions ballista/rust/core/src/execution_plans/distributed_query.rs

@@ -42,7 +42,7 @@ use datafusion::physical_plan::{

 use crate::serde::{AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec};
 use async_trait::async_trait;
-use datafusion::execution::runtime_env::RuntimeEnv;
+use datafusion::execution::context::TaskContext;
 use futures::future;
 use futures::StreamExt;
 use log::{error, info};

@@ -150,7 +150,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
     async fn execute(
         &self,
         partition: usize,
-        _runtime: Arc<RuntimeEnv>,
+        _context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         assert_eq!(0, partition);
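The same two-line substitution repeats across every operator in this PR: the `TaskContext` import replaces the `RuntimeEnv` import, and the `execute` parameter changes type. Sketched below is the resulting method shape that the `ExecutionPlan` implementations here conform to (paraphrased from the diffs for illustration; the real trait carries additional methods):

```rust
use std::sync::Arc;
use async_trait::async_trait;
use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::SendableRecordBatchStream;

/// The execute half of the ExecutionPlan trait after this change,
/// extracted here purely for illustration.
#[async_trait]
trait ExecuteShape {
    async fn execute(
        &self,
        partition: usize,
        // Previously Arc<RuntimeEnv>; TaskContext layers per-task state
        // on top of the overall runtime environment.
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream>;
}
```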
4 changes: 2 additions & 2 deletions ballista/rust/core/src/execution_plans/shuffle_reader.rs

@@ -25,7 +25,6 @@ use crate::utils::WrappedStream;
 use async_trait::async_trait;
 use datafusion::arrow::datatypes::SchemaRef;

-use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::metrics::{
     ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,

@@ -39,6 +38,7 @@ use datafusion::{
 };
 use futures::{future, StreamExt};

+use datafusion::execution::context::TaskContext;
 use log::info;

 /// ShuffleReaderExec reads partitions that have already been materialized by a ShuffleWriterExec

@@ -106,7 +106,7 @@ impl ExecutionPlan for ShuffleReaderExec {
     async fn execute(
         &self,
         partition: usize,
-        _runtime: Arc<RuntimeEnv>,
+        _context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         info!("ShuffleReaderExec::execute({})", partition);
21 changes: 12 additions & 9 deletions ballista/rust/core/src/execution_plans/shuffle_writer.rs

@@ -42,7 +42,6 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};

 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result};
-use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::physical_plan::common::IPCWriter;
 use datafusion::physical_plan::hash_utils::create_hashes;
 use datafusion::physical_plan::memory::MemoryStream;

@@ -55,6 +54,7 @@ use datafusion::physical_plan::{
 };
 use futures::StreamExt;

+use datafusion::execution::context::TaskContext;
 use log::{debug, info};

 /// ShuffleWriterExec represents a section of a query plan that has consistent partitioning and

@@ -138,11 +138,11 @@ impl ShuffleWriterExec {
     pub async fn execute_shuffle_write(
         &self,
         input_partition: usize,
-        runtime: Arc<RuntimeEnv>,
+        context: Arc<TaskContext>,
     ) -> Result<Vec<ShuffleWritePartition>> {
         let now = Instant::now();

-        let mut stream = self.plan.execute(input_partition, runtime).await?;
+        let mut stream = self.plan.execute(input_partition, context).await?;

         let mut path = PathBuf::from(&self.work_dir);
         path.push(&self.job_id);

[Review comment, Contributor] I think the idea of having a TaskContext that can have per-plan / per-task state (in addition to overall RuntimeEnv) is a significant improvement.

@@ -358,9 +358,9 @@ impl ExecutionPlan for ShuffleWriterExec {
     async fn execute(
         &self,
         partition: usize,
-        runtime: Arc<RuntimeEnv>,
+        context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        let part_loc = self.execute_shuffle_write(partition, runtime).await?;
+        let part_loc = self.execute_shuffle_write(partition, context).await?;

         // build metadata result batch
         let num_writers = part_loc.len();

@@ -448,11 +448,13 @@ mod tests {
     use datafusion::physical_plan::expressions::Column;

     use datafusion::physical_plan::memory::MemoryExec;
+    use datafusion::prelude::SessionContext;
     use tempfile::TempDir;

     #[tokio::test]
     async fn test() -> Result<()> {
-        let runtime = Arc::new(RuntimeEnv::default());
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();

         let input_plan = Arc::new(CoalescePartitionsExec::new(create_input_plan()?));
         let work_dir = TempDir::new()?;

@@ -463,7 +465,7 @@ mod tests {
             work_dir.into_path().to_str().unwrap().to_owned(),
             Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 2)),
         )?;
-        let mut stream = query_stage.execute(0, runtime).await?;
+        let mut stream = query_stage.execute(0, task_ctx).await?;
         let batches = utils::collect_stream(&mut stream)
             .await
             .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;

@@ -506,7 +508,8 @@ mod tests {

     #[tokio::test]
     async fn test_partitioned() -> Result<()> {
-        let runtime = Arc::new(RuntimeEnv::default());
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();

         let input_plan = create_input_plan()?;
         let work_dir = TempDir::new()?;

@@ -517,7 +520,7 @@ mod tests {
             work_dir.into_path().to_str().unwrap().to_owned(),
             Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 2)),
         )?;
-        let mut stream = query_stage.execute(0, runtime).await?;
+        let mut stream = query_stage.execute(0, task_ctx).await?;
         let batches = utils::collect_stream(&mut stream)
             .await
             .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
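The test changes above are the pattern any caller follows after this PR: instead of constructing an `Arc<RuntimeEnv>` directly, build a `SessionContext` and derive the task context from it. A minimal sketch of that setup, with the plan construction elided:

```rust
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // One session per test; task_ctx() hands back the Arc<TaskContext>
    // that ExecutionPlan::execute now expects.
    let session_ctx = SessionContext::new();
    let task_ctx = session_ctx.task_ctx();

    // With an ExecutionPlan in hand, the call site becomes:
    // let mut stream = plan.execute(0, task_ctx).await?;
    let _ = task_ctx;
    Ok(())
}
```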
4 changes: 2 additions & 2 deletions ballista/rust/core/src/execution_plans/unresolved_shuffle.rs

@@ -21,7 +21,7 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::error::{DataFusionError, Result};
-use datafusion::execution::runtime_env::RuntimeEnv;
+use datafusion::execution::context::TaskContext;
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::{
     DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,

[Review comment, Contributor] BTW nice job catching all instances of `ExecutionContext`:

    -*- mode: grep; default-directory: "~/Software/arrow-datafusion/" -*-
    Grep started at Sat Mar 12 06:43:13

    rg -n -H --no-heading -e 'ExecutionContext' $(git rev-parse --show-toplevel || pwd)

    Grep finished with no matches found at Sat Mar 12 06:43:13

@@ -104,7 +104,7 @@ impl ExecutionPlan for UnresolvedShuffleExec {
     async fn execute(
         &self,
         _partition: usize,
-        _runtime: Arc<RuntimeEnv>,
+        _context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         Err(DataFusionError::Plan(
             "Ballista UnresolvedShuffleExec does not support execution".to_owned(),
8 changes: 4 additions & 4 deletions ballista/rust/core/src/serde/logical_plan/mod.rs

@@ -36,7 +36,7 @@ use datafusion::logical_plan::{
     Column, CreateExternalTable, CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan,
     LogicalPlanBuilder, Repartition, TableScan, Values,
 };
-use datafusion::prelude::ExecutionContext;
+use datafusion::prelude::SessionContext;

 use prost::bytes::BufMut;
 use prost::Message;

@@ -70,7 +70,7 @@ impl AsLogicalPlan for LogicalPlanNode {

     fn try_into_logical_plan(
         &self,
-        ctx: &ExecutionContext,
+        ctx: &SessionContext,
         extension_codec: &dyn LogicalExtensionCodec,
     ) -> Result<LogicalPlan, BallistaError> {
         let plan = self.logical_plan_type.as_ref().ok_or_else(|| {

@@ -920,7 +920,7 @@ mod roundtrip_tests {
             roundtrip_test!($initial_struct, protobuf::LogicalPlanNode, $struct_type);
         };
         ($initial_struct:ident) => {
-            let ctx = ExecutionContext::new();
+            let ctx = SessionContext::new();
             let codec: BallistaCodec<
                 protobuf::LogicalPlanNode,
                 protobuf::PhysicalPlanNode,

@@ -1252,7 +1252,7 @@ mod roundtrip_tests {

     #[tokio::test]
     async fn roundtrip_logical_plan_custom_ctx() -> Result<()> {
-        let ctx = ExecutionContext::new();
+        let ctx = SessionContext::new();
         let codec: BallistaCodec<protobuf::LogicalPlanNode, protobuf::PhysicalPlanNode> =
             BallistaCodec::default();
         let custom_object_store = Arc::new(TestObjectStore {});
23 changes: 11 additions & 12 deletions ballista/rust/core/src/serde/mod.rs

@@ -30,7 +30,7 @@ use crate::{error::BallistaError, serde::scheduler::Action as BallistaAction};

 use datafusion::logical_plan::plan::Extension;
 use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::ExecutionContext;
+use datafusion::prelude::SessionContext;
 use prost::Message;

 // include the generated protobuf source as a submodule

@@ -67,7 +67,7 @@ pub trait AsLogicalPlan: Debug + Send + Sync + Clone {

     fn try_into_logical_plan(
         &self,
-        ctx: &ExecutionContext,
+        ctx: &SessionContext,
         extension_codec: &dyn LogicalExtensionCodec,
     ) -> Result<LogicalPlan, BallistaError>;

@@ -130,7 +130,7 @@ pub trait AsExecutionPlan: Debug + Send + Sync + Clone {

     fn try_into_physical_plan(
         &self,
-        ctx: &ExecutionContext,
+        ctx: &SessionContext,
         extension_codec: &dyn PhysicalExtensionCodec,
     ) -> Result<Arc<dyn ExecutionPlan>, BallistaError>;

@@ -345,8 +345,7 @@ mod tests {
     use datafusion::arrow::datatypes::SchemaRef;
     use datafusion::datasource::object_store::local::LocalFileSystem;
     use datafusion::error::DataFusionError;
-    use datafusion::execution::context::{ExecutionContextState, QueryPlanner};
-    use datafusion::execution::runtime_env::RuntimeEnv;
+    use datafusion::execution::context::{QueryPlanner, SessionState, TaskContext};
     use datafusion::logical_plan::plan::Extension;
     use datafusion::logical_plan::{
         col, DFSchemaRef, Expr, LogicalPlan, LogicalPlanBuilder, UserDefinedLogicalNode,

@@ -357,7 +356,7 @@
         DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalPlanner,
         SendableRecordBatchStream, Statistics,
     };
-    use datafusion::prelude::{CsvReadOptions, ExecutionConfig, ExecutionContext};
+    use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext};
     use prost::Message;
     use std::any::Any;

@@ -512,7 +511,7 @@
         async fn execute(
             &self,
             _partition: usize,
-            _runtime: Arc<RuntimeEnv>,
+            _context: Arc<TaskContext>,
         ) -> datafusion::error::Result<SendableRecordBatchStream> {
             Err(DataFusionError::NotImplemented(
                 "not implemented".to_string(),

@@ -548,7 +547,7 @@
             node: &dyn UserDefinedLogicalNode,
             logical_inputs: &[&LogicalPlan],
             physical_inputs: &[Arc<dyn ExecutionPlan>],
-            _ctx_state: &ExecutionContextState,
+            _session_state: &SessionState,
         ) -> datafusion::error::Result<Option<Arc<dyn ExecutionPlan>>> {
             Ok(
                 if let Some(topk_node) = node.as_any().downcast_ref::<TopKPlanNode>() {

@@ -575,7 +574,7 @@
         async fn create_physical_plan(
             &self,
             logical_plan: &LogicalPlan,
-            ctx_state: &ExecutionContextState,
+            session_state: &SessionState,
         ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
             // Teach the default physical planner how to plan TopK nodes.
             let physical_planner =

@@ -584,7 +583,7 @@
                 )]);
             // Delegate most work of physical planning to the default physical planner
             physical_planner
-                .create_physical_plan(logical_plan, ctx_state)
+                .create_physical_plan(logical_plan, session_state)
                 .await
         }
     }

@@ -694,9 +693,9 @@
     async fn test_extension_plan() -> crate::error::Result<()> {
         let store = Arc::new(LocalFileSystem {});
         let config =
-            ExecutionConfig::new().with_query_planner(Arc::new(TopKQueryPlanner {}));
+            SessionConfig::new().with_query_planner(Arc::new(TopKQueryPlanner {}));

-        let ctx = ExecutionContext::with_config(config);
+        let ctx = SessionContext::with_config(config);

         let scan = LogicalPlanBuilder::scan_csv(
             store,
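The `test_extension_plan` changes also show how custom planning hooks move over: `with_query_planner` now lives on `SessionConfig`, and a `QueryPlanner` receives a `&SessionState`. Below is a condensed, self-contained sketch of that wiring; `PassThroughPlanner` is a hypothetical stand-in for the test's `TopKQueryPlanner` that simply delegates to the default planner:

```rust
use std::sync::Arc;
use async_trait::async_trait;
use datafusion::error::Result;
use datafusion::execution::context::{QueryPlanner, SessionState};
use datafusion::logical_plan::LogicalPlan;
use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
use datafusion::physical_plan::{ExecutionPlan, PhysicalPlanner};
use datafusion::prelude::{SessionConfig, SessionContext};

/// Hypothetical planner that adds no custom rules and just delegates,
/// standing in for the test's TopKQueryPlanner.
struct PassThroughPlanner;

#[async_trait]
impl QueryPlanner for PassThroughPlanner {
    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
        session_state: &SessionState,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Delegate all planning to the default physical planner.
        DefaultPhysicalPlanner::default()
            .create_physical_plan(logical_plan, session_state)
            .await
    }
}

fn main() {
    let config =
        SessionConfig::new().with_query_planner(Arc::new(PassThroughPlanner));
    let _ctx = SessionContext::with_config(config);
}
```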
24 changes: 5 additions & 19 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs

@@ -26,14 +26,10 @@ use crate::serde::{from_proto_binary_op, proto_error, protobuf};
 use crate::{convert_box_required, convert_required};
 use chrono::{TimeZone, Utc};

-use datafusion::catalog::catalog::{CatalogList, MemoryCatalogList};
 use datafusion::datasource::object_store::local::LocalFileSystem;
-use datafusion::datasource::object_store::{FileMeta, ObjectStoreRegistry, SizedFile};
+use datafusion::datasource::object_store::{FileMeta, SizedFile};
 use datafusion::datasource::PartitionedFile;
-use datafusion::execution::context::{
-    ExecutionConfig, ExecutionContextState, ExecutionProps,
-};
-use datafusion::execution::runtime_env::RuntimeEnv;
+use datafusion::execution::context::SessionState;

 use datafusion::physical_plan::file_format::FileScanConfig;

@@ -157,22 +153,12 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {
                     .map(|x| x.try_into())
                     .collect::<Result<Vec<_>, _>>()?;

-                let catalog_list =
-                    Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;
-
-                let ctx_state = ExecutionContextState {
-                    catalog_list,
-                    scalar_functions: Default::default(),
-                    aggregate_functions: Default::default(),
-                    config: ExecutionConfig::new(),
-                    execution_props: ExecutionProps::new(),
-                    object_store_registry: Arc::new(ObjectStoreRegistry::new()),
-                    runtime_env: Arc::new(RuntimeEnv::default()),
-                };
+                // TODO Do not create new the SessionState
+                let session_state = SessionState::new();

                 let fun_expr = functions::create_physical_fun(
                     &(&scalar_function).into(),
-                    &ctx_state.execution_props,
+                    &session_state.execution_props,
                 )?;

                 Arc::new(ScalarFunctionExpr::new(
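The net effect in this file is that roughly a dozen lines of hand-assembled state (catalog list, function registries, config, execution props, object store registry, runtime) collapse into a single constructor call. A sketch of the new shape; as the TODO in the diff notes, building a fresh `SessionState` inside expression decoding is an acknowledged stopgap:

```rust
use datafusion::execution::context::SessionState;

fn main() {
    // Replaces the field-by-field ExecutionContextState literal; the
    // execution_props needed by create_physical_fun now ride inside it.
    let session_state = SessionState::new();
    let _props = &session_state.execution_props;
}
```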
10 changes: 5 additions & 5 deletions ballista/rust/core/src/serde/physical_plan/mod.rs

@@ -56,7 +56,7 @@ use datafusion::physical_plan::windows::{create_window_expr, WindowAggExec};
 use datafusion::physical_plan::{
     AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, WindowExpr,
 };
-use datafusion::prelude::ExecutionContext;
+use datafusion::prelude::SessionContext;
 use prost::bytes::BufMut;
 use prost::Message;
 use std::convert::TryInto;

@@ -87,7 +87,7 @@ impl AsExecutionPlan for PhysicalPlanNode {

     fn try_into_physical_plan(
         &self,
-        ctx: &ExecutionContext,
+        ctx: &SessionContext,
         extension_codec: &dyn PhysicalExtensionCodec,
     ) -> Result<Arc<dyn ExecutionPlan>, BallistaError> {
         let plan = self.physical_plan_type.as_ref().ok_or_else(|| {

@@ -883,7 +883,7 @@ impl AsExecutionPlan for PhysicalPlanNode {

 fn decode_scan_config(
     proto: &protobuf::FileScanExecConf,
-    ctx: &ExecutionContext,
+    ctx: &SessionContext,
 ) -> Result<FileScanConfig, BallistaError> {
     let schema = Arc::new(convert_required!(proto.schema)?);
     let projection = proto

@@ -940,7 +940,7 @@ mod roundtrip_tests {
     use datafusion::datasource::object_store::local::LocalFileSystem;
     use datafusion::datasource::PartitionedFile;
     use datafusion::physical_plan::sorts::sort::SortExec;
-    use datafusion::prelude::ExecutionContext;
+    use datafusion::prelude::SessionContext;
     use datafusion::{
         arrow::{
             compute::kernels::sort::SortOptions,

@@ -969,7 +969,7 @@ mod roundtrip_tests {
     use crate::serde::protobuf::{LogicalPlanNode, PhysicalPlanNode};

     fn roundtrip_test(exec_plan: Arc<dyn ExecutionPlan>) -> Result<()> {
-        let ctx = ExecutionContext::new();
+        let ctx = SessionContext::new();
         let codec: BallistaCodec<LogicalPlanNode, PhysicalPlanNode> =
             BallistaCodec::default();
         let proto: protobuf::PhysicalPlanNode =