
Rename ExecutionContext to SessionContext, ExecutionContextState to SessionState, add TaskContext to support multi-tenancy configurations - Part 1 #1987

Merged
merged 1 commit into from
Mar 16, 2022

Conversation

@mingmwang (Contributor) commented Mar 11, 2022

Which issue does this PR close?

Closes #1862.

This PR covers part 1, which contains the trivial changes and UT fixes:

  1. Rename ExecutionContext to SessionContext and ExecutionContextState to SessionState, add TaskContext,
    wrap the RuntimeEnv into TaskContext, pass TaskContext down into ExecutionPlan's execute() method, and fix all the trivial UTs.
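The signature change described above can be sketched roughly as follows. The types here are simplified stand-ins for illustration only; the real DataFusion structs carry much more state (config, memory manager, object stores, etc.):

```rust
use std::sync::Arc;

// Simplified stand-ins for the real DataFusion types.
pub struct RuntimeEnv;

pub struct TaskContext {
    pub task_id: Option<String>,
    pub session_id: String,
    // RuntimeEnv is now carried inside TaskContext instead of being
    // passed to execute() directly.
    pub runtime: Arc<RuntimeEnv>,
}

pub trait ExecutionPlan {
    // Before: fn execute(&self, partition: usize, runtime: Arc<RuntimeEnv>)
    // After:  the task-scoped context is passed down instead.
    fn execute(&self, partition: usize, context: Arc<TaskContext>) -> String;
}

struct DummyExec;

impl ExecutionPlan for DummyExec {
    fn execute(&self, partition: usize, context: Arc<TaskContext>) -> String {
        format!("session={} partition={}", context.session_id, partition)
    }
}

fn main() {
    let ctx = Arc::new(TaskContext {
        task_id: None,
        session_id: "s1".to_owned(),
        runtime: Arc::new(RuntimeEnv),
    });
    println!("{}", DummyExec.execute(0, ctx));
}
```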

Rationale for this change

See #1862 as well as the discussion on a PR that includes more context for the other parts: #1924

What changes are included in this PR?

See above

Are there any user-facing changes?

Yes

@github-actions github-actions bot added ballista datafusion Changes in the datafusion crate labels Mar 11, 2022
@mingmwang (Contributor, Author):

@alamb @yjshen @houqp

Please help to take a look.

@mingmwang (Contributor, Author):

@andygrove

@yahoNanJing (Contributor):

It looks good to introduce the TaskContext wrapping the RuntimeEnv for the ExecutionPlan execute interface.

Renaming ExecutionContext to SessionContext is also much more meaningful for future multi-tenancy configurations.

If it's not misleading, how about changing TaskContext to ExecutionContext, which may be better for consistency with ExecutionPlan? Then the change would be:

  • ExecutionContext -----> SessionContext
  • RuntimeEnv -----> ExecutionContext wrapping RuntimeEnv

@alamb alamb changed the title Refactor ExecutionContext and related conf to support multi-tenancy configurations - Part 1 Rename ExecutionContext to SessionContext, ExecutionContextState to SessionState, add TaskContext to support multi-tenancy configurations - Part 1 Mar 12, 2022
@alamb (Contributor) left a comment:

Thank you @mingmwang

While this PR will likely cause some non-trivial churn and pain for downstream users of DataFusion, I believe it is a necessary refactor to improve the overall situation and set DataFusion up to support limited production multi-tenancy (running multiple plans concurrently in the same process). The current ExecutionContext has grown organically over time and has become quite haphazard. The naming in this PR is much clearer, I think.

Thank you for pushing this forward.

I'll try and get a POC up for one such downstream project (IOx) to validate that this approach works for at least one non-trivial user of DataFusion.

I also recommend we leave this open for a few days so anyone with an opinion can weigh in. I'll also send a mailing list / Slack message to get some more eyes on it.

cc @andygrove @rdettai @xudong963 @liukun4515 @houqp @jimexist @matthewmturner @yjshen

@@ -138,11 +138,11 @@ impl ShuffleWriterExec {
pub async fn execute_shuffle_write(
&self,
input_partition: usize,
runtime: Arc<RuntimeEnv>,
context: Arc<TaskContext>,
Contributor:

I think the idea of having a TaskContext that can have per-plan / per-task state (in addition to overall RuntimeEnv) is a significant improvement

@@ -21,7 +21,7 @@ use std::sync::Arc;
use async_trait::async_trait;
Contributor:

BTW nice job catching all instances of `ExecutionContext`:

-*- mode: grep; default-directory: "~/Software/arrow-datafusion/" -*-
Grep started at Sat Mar 12 06:43:13

rg -n -H --no-heading -e 'ExecutionContext' $(git rev-parse --show-toplevel || pwd)

Grep finished with no matches found at Sat Mar 12 06:43:13

@@ -177,6 +179,20 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
);
info!("Start to run task {}", task_id_log);

let runtime = self.executor.ctx.runtime_env();

//TODO get session_id from TaskDefinition
Contributor:

is this a TODO for a future PR?

Contributor (Author):

Yes, will cover this in a future PR.

let sql = get_query_sql(query)?;
ctx.create_logical_plan(&sql)
}

async fn execute_query(
ctx: &mut ExecutionContext,
ctx: &SessionContext,
Contributor:

it is a nice improvement to remove some of this mut 👍

@@ -56,7 +56,7 @@
- Add `approx_quantile` support [\#1538](https://github.com/apache/arrow-datafusion/issues/1538)
- support sorting decimal data type [\#1522](https://github.com/apache/arrow-datafusion/issues/1522)
- Keep all datafusion's packages up to date with Dependabot [\#1472](https://github.com/apache/arrow-datafusion/issues/1472)
- ExecutionContext support init ExecutionContextState with `new(state: Arc<Mutex<ExecutionContextState>>)` method [\#1439](https://github.com/apache/arrow-datafusion/issues/1439)
- SessionContext support init SessionState with `new(state: Arc<Mutex<SessionState>>)` method [\#1439](https://github.com/apache/arrow-datafusion/issues/1439)
Contributor:

We probably should revert changes to the CHANGELOG for past releases

Member:

Second this, since they are auto-generated.

@@ -419,7 +423,8 @@ mod tests {

#[tokio::test]
async fn read_alltypes_plain_parquet() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::default());
let session_ctx = SessionContext::new();
let task_ctx = Arc::new(TaskContext::from(&session_ctx));
Contributor:

Since this is such a common pattern (create an `Arc<TaskContext>`), I wonder if it would make sense to create a function for it?

like

impl SessionContext {
  /// Get a new TaskContext to run in this session
  pub fn task_ctx(&self) -> Arc<TaskContext> {
    ...
  }
}

Contributor (Author):

Since this is such a common pattern (create an `Arc<TaskContext>`), I wonder if it would make sense to create a function for it?

like

impl SessionContext {
  /// Get a new TaskContext to run in this session
  pub fn task_ctx(&self) -> Arc<TaskContext> {
    ...
  }
}

Done.
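For illustration, a minimal sketch of how such a `task_ctx` helper might look (simplified stand-in types; the real method derives the TaskContext from the session's full state):

```rust
use std::sync::Arc;

// Illustrative stand-ins for the real DataFusion types.
pub struct TaskContext {
    pub session_id: String,
}

pub struct SessionContext {
    session_id: String,
}

impl SessionContext {
    pub fn new() -> Self {
        Self { session_id: "session-1".to_owned() }
    }

    /// Get a new TaskContext to run in this session, replacing the
    /// repeated Arc::new(TaskContext::from(&session_ctx)) pattern.
    pub fn task_ctx(&self) -> Arc<TaskContext> {
        Arc::new(TaskContext { session_id: self.session_id.clone() })
    }
}

fn main() {
    let session_ctx = SessionContext::new();
    let task_ctx = session_ctx.task_ctx();
    // The task context inherits the session's id.
    assert_eq!(task_ctx.session_id, "session-1");
}
```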

@@ -1220,6 +1233,74 @@ impl FunctionRegistry for ExecutionContextState {
}
}

/// Task Context Properties
pub enum TaskProperties {
Contributor:

It seems like TaskProperties might want both the session config as well as possibly key/value pairs (rather than either/or).

What about something like

pub struct TaskProperties {
  config: SessionConfig,
  kv_pairs: Option<HashMap<String, String>>
}

?

Contributor:

I think this change is important (though we could do it as a follow on PR)

Otherwise I was forced to use the following code in https://github.com/influxdata/influxdb_iox/pull/4023 to get the batch size, which was quite messy:

    async fn execute(
        &self,
        _partition: usize,
        context: Arc<TaskContext>,
    ) -> DataFusionResult<SendableRecordBatchStream> {
        let batch_size = {
            if let TaskProperties::SessionConfig(config) = &context.task_settings {
                config.runtime.batch_size
            } else {
                todo!("Need to always have properties")
            }
        };

@yjshen (Member) commented Mar 13, 2022:

I think this depends on how TaskProperties will be populated and later used. If we are asking for the ability to adjust configs on a per-task basis, then a combination is needed.

Otherwise, we could do branching with TaskProperties like:

impl TaskProperties {
    fn conf(&self, conf_key: impl Into<String>) -> String {
        "".to_owned()
    }

    fn typed_conf<T>(&self, conf_key: impl Into<String>) -> T {
        T::new()
    }

    fn batch_size(&self) -> usize {
        self.typed_conf("batch_size")
    }
}

Member:

Maybe in a follow-up PR, as the number of configurations grows, we should probably make the confs in SessionConfig a HashMap as well?

Contributor (Author):

I will cover this in the following PR. For batch_size, it will be moved out of RuntimeEnv into SessionConfig, and SessionConfig will then include just the configuration entries, with no RuntimeEnv anymore. In TaskContext, there will be a method to get the current SessionConfig:

impl TaskContext {
    /// Return the SessionConfig associated with the Task
    pub fn session_config(&self) -> SessionConfig {
        let task_settings = &self.task_settings;
        match task_settings {
            TaskProperties::KVPairs(props) => {
                let session_config = SessionConfig::new();
                session_config
                    .with_batch_size(props.get(BATCH_SIZE).unwrap().parse().unwrap())
                    .with_target_partitions(
                        props.get(TARGET_PARTITIONS).unwrap().parse().unwrap(),
                    )
                    .with_repartition_joins(
                        props.get(REPARTITION_JOINS).unwrap().parse().unwrap(),
                    )
                    .with_repartition_aggregations(
                        props
                            .get(REPARTITION_AGGREGATIONS)
                            .unwrap()
                            .parse()
                            .unwrap(),
                    )
                    .with_repartition_windows(
                        props.get(REPARTITION_WINDOWS).unwrap().parse().unwrap(),
                    )
                    .with_parquet_pruning(
                        props.get(PARQUET_PRUNING).unwrap().parse().unwrap(),
                    )
            }
            TaskProperties::SessionConfig(session_config) => session_config.clone(),
        }
    }
}

In this PR, to retrieve the batch size we can currently use the code below, but this will change soon in the following PR:

 let batch_size = context.runtime.batch_size();

@alamb (Contributor) left a comment:

Here is a PR porting IOx to use this change. It took a few minutes, but it was pretty much all mechanical: https://github.com/influxdata/influxdb_iox/pull/4023 👍

/// Session Id
pub session_id: String,
/// Task settings
pub task_settings: TaskProperties,
Contributor:

Suggested change
pub task_settings: TaskProperties,
pub properties: TaskProperties,

Maybe to be consistent with the type of the struct?

@yjshen (Member) left a comment:

Thanks @mingmwang again for splitting the original huge PR into parts for easier review! The mechanical changes make sense to me. And the TaskContext name seems reasonable given that it includes the task identifier.

Currently, my main concern is with the execute() method after this PR is merged, since we are introducing a Task term that does not correspond to any entity inside DataFusion, and there will be two partition terms for the execute() method.

A minor one: there are two remaining avro tests that still use RuntimeEnv and need to be adjusted within this PR: read_f64_alltypes_plain_avro and read_binary_alltypes_plain_avro.



/// Optional Task Identifier
pub task_id: Option<String>,
/// Session Id
pub session_id: String,
Member:

nit: move session_id above task_id?

Contributor (Author):

nit: move session_id above task_id?

Done.

/// Task Execution Context
pub struct TaskContext {
/// Optional Task Identifier
pub task_id: Option<String>,
Member:

Probably we could make task_id a struct instead of a String? And we could replace the usage of task_id_log with simply an impl Display.
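For illustration, a hypothetical TaskId struct along those lines, borrowing the fields of Ballista's PartitionId and deriving the log string from an impl Display (the names and the format string are assumptions, not the merged code):

```rust
use std::fmt;

// Hypothetical TaskId; field names mirror Ballista's PartitionId.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TaskId {
    pub job_id: String,
    pub stage_id: usize,
    pub partition_id: usize,
}

impl fmt::Display for TaskId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Replaces ad hoc task_id_log string building at call sites.
        write!(f, "{}/{}/{}", self.job_id, self.stage_id, self.partition_id)
    }
}

fn main() {
    let id = TaskId {
        job_id: "job-1".to_owned(),
        stage_id: 2,
        partition_id: 3,
    };
    println!("Start to run task {id}");
}
```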

let task_props = HashMap::new();

let task_context = Arc::new(TaskContext::new(
task_id_log.clone(),
Member:

The _log suffix is a little bit weird. See comments above.

}

/// Task Execution Context
pub struct TaskContext {
Member:

Task is a vague term in the scope of DataFusion, I think. In the physical optimization phase, we repartition plans based on config.target_partition when possible. Therefore, does the task represent the initial partition of the DataSources?

A plausible solution might require a major change to the current framework by introducing the Stage term into the DataFusion core. Then we could:

  • Partition the input data set based on a conf like max_bytes_per_partition.
  • Process data with physical operators serially for each input partition until we meet a "synchronization barrier" required by operators such as sort or aggregate.
  • Add an exchange operator (or repartition), and continue the computation in another task from the successor stage.

And by introducing Stages into the DataFusion core as well, we could make task_id required and make task_context the only parameter for execute.

For the current PR, I think we should articulate what Task means for DataFusion.

Member:

+1. I'm a little confused, too.

Member:

+1 for introducing Stage term in DataFusion. Ballista currently has job_id, stage_id, and partition_id (which perhaps could have been task_id instead).

) -> Result<SendableRecordBatchStream> {
Ok(Box::pin(CoalesceBatchesStream {
input: self.input.execute(partition, runtime).await?,
input: self.input.execute(partition, context).await?,
Member:

Calling execute with partition as well as context.task_id.partition_id will be vague after this PR.

Contributor (Author):

Agree. That's why I'm using a string task_id in TaskContext instead of a struct.
Maybe we should use another uuid to uniquely represent a task for a Ballista Task.
And currently in the system, we have task_id and partition_id used interchangeably.

In Ballista proto

#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TaskDefinition {
    #[prost(message, optional, tag = "1")]
    pub task_id: ::core::option::Option<PartitionId>,
    #[prost(bytes = "vec", tag = "2")]
    pub plan: ::prost::alloc::vec::Vec<u8>,
    /// Output partition for shuffle writer
    #[prost(message, optional, tag = "3")]
    pub output_partitioning: ::core::option::Option<PhysicalHashRepartition>,
}

In scheduler/mod.rs

/// Unique identifier for the output partition of an operator.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PartitionId {
    pub job_id: String,
    pub stage_id: usize,
    pub partition_id: usize,
}

And I agree that in DataFusion the Task is a vague term. As @yahoNanJing mentioned earlier, the TaskContext is actually the execute() method's context. To avoid confusion with the original ExecutionContext, I did not use ExecutionContext but TaskContext. Actually, DuckDB calls it ExecutionContext. I'm open on the naming; if everyone agrees to use ExecutionContext, I can change it to avoid introducing the vague Task term to DataFusion.

DuckDB code

OperatorResultType Execute(ExecutionContext &context, DataChunk &input, DataChunk &chunk,
	                           OperatorState &state) const override;

Contributor:

Since ExecutionContext is currently used for something quite different in DataFusion (what this PR renames to SessionContext), defining it to be something else may be quite confusing.

Perhaps something like RunContext or RuntimeContext would be less vague than TaskContext while also not redefining the ExecutionContext term?

Comment on lines 146 to 149
pub session_id: String,
/// Internal state for the context
pub state: Arc<Mutex<ExecutionContextState>>,
pub state: Arc<Mutex<SessionState>>,
Member:

It looks like there is an invariant between session_id and state.session_id. If so, it's better to keep session_id private so it is always in sync with the id in the state. We can add a session_id method to expose the id to users.

Contributor (Author):

It looks like there is an invariant between session_id and state.session_id. If so, it's better to keep session_id private so it is always in sync with the id in the state. We can add a session_id method to expose the id to users.

Done.
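A minimal sketch of the invariant being discussed, with the field kept private and a read-only accessor (simplified stand-in types; the real SessionState holds far more than the id):

```rust
use std::sync::{Arc, Mutex};

pub struct SessionState {
    pub session_id: String,
}

pub struct SessionContext {
    // Private so callers cannot let it drift out of sync with
    // state.session_id.
    session_id: String,
    state: Arc<Mutex<SessionState>>,
}

impl SessionContext {
    pub fn new() -> Self {
        let id = "session-1".to_owned();
        Self {
            session_id: id.clone(),
            state: Arc::new(Mutex::new(SessionState { session_id: id })),
        }
    }

    /// Read-only accessor that exposes the id to users.
    pub fn session_id(&self) -> String {
        self.session_id.clone()
    }
}

fn main() {
    let ctx = SessionContext::new();
    // The two ids stay in sync because only SessionContext can set them.
    assert_eq!(ctx.session_id(), ctx.state.lock().unwrap().session_id);
}
```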

@mingmwang (Contributor, Author):

There are two remaining avro tests that still use RuntimeEnv and need to be adjusted within this PR: read_f64_alltypes_plain_avro and read_binary_alltypes_plain_avro.

Fixed this.

@alamb (Contributor) commented Mar 15, 2022:

It seems like most / all comments on this PR have been addressed. Are there any objections to clearing up the remaining conflicts, merging this one in, and refining in subsequent PRs?

@yjshen (Member) commented Mar 16, 2022:

@mingmwang would you please clean up the conflicts to get this merged?

@alamb (Contributor) commented Mar 16, 2022:

Yeah, the sooner we can get this merged the better so that it doesn't pick up more conflicts and so that downstream crates can start making the required changes

@mingmwang (Contributor, Author):

I will merge the multiple commits in this PR into one and rebase onto master to resolve the conflicts.

@mingmwang (Contributor, Author) commented Mar 16, 2022:

@alamb @yjshen Conflicts resolved; please take a look and approve the merge.

@yjshen (Member) left a comment:

LGTM! Thanks @mingmwang!

@yjshen yjshen merged commit 4994eda into apache:master Mar 16, 2022
@yjshen (Member) commented Mar 16, 2022:

Thank you @mingmwang for being persistent and driving through this big change step by step.

@liukun4515 (Contributor):

@yjshen please add an api-change tag for this PR or issue. Thanks.

@yjshen yjshen added enhancement New feature or request api change Changes the API exposed to users of the crate labels Mar 16, 2022
@alamb (Contributor) commented Mar 16, 2022:

Thank you @mingmwang !
