-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Config Cleanup: Remove TaskProperties and KV structure, keep key=value serialization #4382
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1810,22 +1810,14 @@ impl FunctionRegistry for SessionState { | |
} | ||
} | ||
|
||
/// Task Context Properties | ||
pub enum TaskProperties { | ||
///SessionConfig | ||
SessionConfig(SessionConfig), | ||
/// Name-value pairs of task properties | ||
KVPairs(HashMap<String, String>), | ||
} | ||
|
||
/// Task Execution Context | ||
pub struct TaskContext { | ||
/// Session Id | ||
session_id: String, | ||
/// Optional Task Identify | ||
task_id: Option<String>, | ||
/// Task properties | ||
properties: TaskProperties, | ||
/// Session configuration | ||
session_config: SessionConfig, | ||
/// Scalar functions associated with this task context | ||
scalar_functions: HashMap<String, Arc<ScalarUDF>>, | ||
/// Aggregate functions associated with this task context | ||
|
@@ -1844,55 +1836,52 @@ impl TaskContext { | |
aggregate_functions: HashMap<String, Arc<AggregateUDF>>, | ||
runtime: Arc<RuntimeEnv>, | ||
) -> Self { | ||
let session_config = if task_props.is_empty() { | ||
SessionConfig::new() | ||
} else { | ||
SessionConfig::new() | ||
.with_batch_size(task_props.get(OPT_BATCH_SIZE).unwrap().parse().unwrap()) | ||
.with_target_partitions( | ||
task_props.get(TARGET_PARTITIONS).unwrap().parse().unwrap(), | ||
) | ||
.with_repartition_joins( | ||
task_props.get(REPARTITION_JOINS).unwrap().parse().unwrap(), | ||
) | ||
.with_repartition_aggregations( | ||
task_props | ||
.get(REPARTITION_AGGREGATIONS) | ||
.unwrap() | ||
.parse() | ||
.unwrap(), | ||
) | ||
.with_repartition_windows( | ||
task_props | ||
.get(REPARTITION_WINDOWS) | ||
.unwrap() | ||
.parse() | ||
.unwrap(), | ||
) | ||
.with_parquet_pruning( | ||
task_props.get(PARQUET_PRUNING).unwrap().parse().unwrap(), | ||
) | ||
.with_collect_statistics( | ||
task_props.get(COLLECT_STATISTICS).unwrap().parse().unwrap(), | ||
) | ||
}; | ||
|
||
Self { | ||
task_id: Some(task_id), | ||
session_id, | ||
properties: TaskProperties::KVPairs(task_props), | ||
session_config, | ||
scalar_functions, | ||
aggregate_functions, | ||
runtime, | ||
} | ||
} | ||
|
||
/// Return the SessionConfig associated with the Task | ||
pub fn session_config(&self) -> SessionConfig { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this conversion from string/value properties to SessionConfig is moved to |
||
let task_props = &self.properties; | ||
match task_props { | ||
TaskProperties::KVPairs(props) => { | ||
let session_config = SessionConfig::new(); | ||
if props.is_empty() { | ||
session_config | ||
} else { | ||
session_config | ||
.with_batch_size( | ||
props.get(OPT_BATCH_SIZE).unwrap().parse().unwrap(), | ||
) | ||
.with_target_partitions( | ||
props.get(TARGET_PARTITIONS).unwrap().parse().unwrap(), | ||
) | ||
.with_repartition_joins( | ||
props.get(REPARTITION_JOINS).unwrap().parse().unwrap(), | ||
) | ||
.with_repartition_aggregations( | ||
props | ||
.get(REPARTITION_AGGREGATIONS) | ||
.unwrap() | ||
.parse() | ||
.unwrap(), | ||
) | ||
.with_repartition_windows( | ||
props.get(REPARTITION_WINDOWS).unwrap().parse().unwrap(), | ||
) | ||
.with_parquet_pruning( | ||
props.get(PARQUET_PRUNING).unwrap().parse().unwrap(), | ||
) | ||
.with_collect_statistics( | ||
props.get(COLLECT_STATISTICS).unwrap().parse().unwrap(), | ||
) | ||
} | ||
} | ||
TaskProperties::SessionConfig(session_config) => session_config.clone(), | ||
} | ||
pub fn session_config(&self) -> &SessionConfig { | ||
&self.session_config | ||
} | ||
|
||
/// Return the session_id of this [TaskContext] | ||
|
@@ -1914,39 +1903,22 @@ impl TaskContext { | |
/// Create a new task context instance from SessionContext | ||
impl From<&SessionContext> for TaskContext { | ||
fn from(session: &SessionContext) -> Self { | ||
let session_id = session.session_id.clone(); | ||
let (config, scalar_functions, aggregate_functions) = { | ||
let session_state = session.state.read(); | ||
( | ||
session_state.config.clone(), | ||
session_state.scalar_functions.clone(), | ||
session_state.aggregate_functions.clone(), | ||
) | ||
}; | ||
let runtime = session.runtime_env(); | ||
Self { | ||
task_id: None, | ||
session_id, | ||
properties: TaskProperties::SessionConfig(config), | ||
scalar_functions, | ||
aggregate_functions, | ||
runtime, | ||
} | ||
TaskContext::from(&*session.state.read()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is a drive by clean up as the code was replicated in |
||
} | ||
} | ||
|
||
/// Create a new task context instance from SessionState | ||
impl From<&SessionState> for TaskContext { | ||
fn from(state: &SessionState) -> Self { | ||
let session_id = state.session_id.clone(); | ||
let config = state.config.clone(); | ||
let session_config = state.config.clone(); | ||
let scalar_functions = state.scalar_functions.clone(); | ||
let aggregate_functions = state.aggregate_functions.clone(); | ||
let runtime = state.runtime_env.clone(); | ||
Self { | ||
task_id: None, | ||
session_id, | ||
properties: TaskProperties::SessionConfig(config), | ||
session_config, | ||
scalar_functions, | ||
aggregate_functions, | ||
runtime, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The core of the change is to just use
SessionConfig
everywhere, creating it in the constructor ofTaskContext
if needed