Refactor SessionContext, SessionState and SessionConfig to support multi-tenancy configurations - Part 2 #2029
parquet_pruning: bool,
/// Runtime configurations such as memory threshold and local disk for spill
pub runtime: RuntimeConfig,
pub parquet_pruning: bool,
I feel more and more strongly that we should use a hashmap or an existing config lib to store the growing number of configurations.
We could provide "helper" methods to fast read/write typed confs, and for the existing ones:
pub fn get_string(&self, key: &str) -> Result<String> {
    self.get(key).and_then(Value::into_string)
}
pub fn get_usize(&self, key: &str) -> Result<usize> {
    self.get(key).and_then(Value::into_usize)
}
pub fn get_bool(&self, key: &str) -> Result<bool> {
    self.get(key).and_then(Value::into_bool)
}
pub fn batch_size(&self) -> usize {
    // Fall back to the default when the key is unset or has the wrong type.
    self.get_usize(BATCH_SIZE).unwrap_or(DEFAULT_BATCH_SIZE)
}
This would also give DataFusion users more flexibility to pass customized configurations down into PhysicalExec or UDFs.
We could also unify TaskProperties:
pub enum TaskProperties {
    /// SessionConfig
    SessionConfig(SessionConfig),
    /// Name-value pairs of task properties
    KVPairs(HashMap<String, String>),
}
@yjshen I agree that the interface with getters would be good:
pub fn get_bool(&self, key: &str) -> Result<bool> {
    self.get(key).and_then(Value::into_bool)
}
Among other things, that would allow us to change the implementation (e.g. to properties or typed fields) without issue.
I do think typed fields have some nice properties (like possibly earlier error detection), but in order to support extensions / customized properties we definitely need to support arbitrary key/values as well.
In terms of TaskProperties, I personally prefer something like this:
pub struct TaskProperties {
    /// SessionConfig
    config: SessionConfig,
    /// Additional name-value pairs of task properties
    properties: HashMap<String, String>,
}
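To make the struct variant concrete, here is a minimal sketch of how typed fields and free-form properties could sit side by side. The two-field SessionConfig stand-in, the constructor, and the with_property / property helpers are assumptions for illustration, not DataFusion's actual API.

```rust
use std::collections::HashMap;

/// Stand-in for the real SessionConfig (assumption: only two fields shown).
#[derive(Clone, Debug)]
pub struct SessionConfig {
    pub batch_size: usize,
    pub parquet_pruning: bool,
}

/// Typed session config plus arbitrary name-value pairs for extensions.
pub struct TaskProperties {
    config: SessionConfig,
    properties: HashMap<String, String>,
}

impl TaskProperties {
    /// Hypothetical constructor: starts with no extra properties.
    pub fn new(config: SessionConfig) -> Self {
        Self { config, properties: HashMap::new() }
    }

    /// Hypothetical builder-style setter for a customized property.
    pub fn with_property(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.properties.insert(key.into(), value.into());
        self
    }

    /// Typed settings are read through the config, with compile-time checked fields.
    pub fn config(&self) -> &SessionConfig {
        &self.config
    }

    /// Extension settings are looked up by name and may be absent.
    pub fn property(&self, key: &str) -> Option<&str> {
        self.properties.get(key).map(String::as_str)
    }
}
```

With this shape, a known setting such as batch_size stays a typed field, while a UDF-specific key (for example a made-up "my_udf.threshold") goes through property() and has to be parsed by whoever reads it.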
How about this one, to avoid having a SessionConfig and a HashMap.
#[derive(Clone, Debug)]
enum Value {
    USIZE(usize),
    STRING(String),
    BOOL(bool),
}
impl Value {
    fn into_usize(&self) -> Result<usize> {
        if let Value::USIZE(u) = self {
            Ok(*u)
        } else {
            Err(DataFusionError::Internal(format!("{:?} not a usize conf", self)))
        }
    }
    fn into_string(&self) -> Result<String> {
        if let Value::STRING(s) = self {
            Ok(s.to_owned())
        } else {
            Err(DataFusionError::Internal(format!("{:?} not a string conf", self)))
        }
    }
}
struct TaskProperties {
    pub inner: HashMap<String, Value>,
}
impl TaskProperties {
    /// setters
    fn set_usize(&mut self, key: impl Into<String>, value: usize) {
        self.inner.insert(key.into(), Value::USIZE(value));
    }
    fn set_bool(&mut self, key: impl Into<String>, value: bool) {
        self.inner.insert(key.into(), Value::BOOL(value));
    }
    /// getters
    fn get_usize(&self, key: &str, default: usize) -> usize {
        self.inner.get(key).and_then(|x| x.into_usize().ok()).unwrap_or(default)
    }
    /// known conf fast passes
    fn batch_size(&self) -> usize {
        self.get_usize("target_batch_size", 10240)
    }
}
> How about this one, to avoid having a SessionConfig and a HashMap.
I think it is overkill at this point, to be honest. If we want dynamically typed values, perhaps we can use ScalarValue.
Starting with a single hash map with strings and well-documented getter/setter functions would be fine, and we can optimize performance / error handling later.
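As a rough sketch of the "single hash map with strings" option: values are stored as strings and parsed on read. The StringConfig name, the BATCH_SIZE key, and the 8192 default are placeholders chosen for this example, not values taken from DataFusion.

```rust
use std::collections::HashMap;

/// Hypothetical key and default, named here only for illustration.
pub const BATCH_SIZE: &str = "batch_size";
pub const DEFAULT_BATCH_SIZE: usize = 8192;

/// Sketch of a string-keyed, string-valued configuration store.
#[derive(Clone, Debug, Default)]
pub struct StringConfig {
    settings: HashMap<String, String>,
}

impl StringConfig {
    /// Store any value that can be rendered as a string under `key`.
    pub fn set(&mut self, key: &str, value: impl ToString) {
        self.settings.insert(key.to_owned(), value.to_string());
    }

    /// Raw lookup; `None` if the key was never set.
    pub fn get(&self, key: &str) -> Option<&str> {
        self.settings.get(key).map(String::as_str)
    }

    /// Parse the value as usize; use `default` if it is missing or unparsable.
    pub fn get_usize(&self, key: &str, default: usize) -> usize {
        self.get(key).and_then(|v| v.parse::<usize>().ok()).unwrap_or(default)
    }

    /// Parse the value as bool ("true" / "false"); use `default` otherwise.
    pub fn get_bool(&self, key: &str, default: bool) -> bool {
        self.get(key).and_then(|v| v.parse::<bool>().ok()).unwrap_or(default)
    }

    /// Convenience accessor for a well-known setting.
    pub fn batch_size(&self) -> usize {
        self.get_usize(BATCH_SIZE, DEFAULT_BATCH_SIZE)
    }
}
```

The trade-off raised in the thread shows up here: a malformed value is only noticed at read time and silently falls back to the default, which is the error-handling part that could be tightened later.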
> a single hash map with strings
Makes sense to me.
I also prefer a hash map or another more extensible structure. But can I leave this change for another PR?
datafusion/src/execution/context.rs
Outdated
pub const REPARTITION_AGGREGATIONS: &str = "repartition_aggregations";
/// Session Configuration entry name
pub const REPARTITION_WINDOWS: &str = "repartition_windows";
/// Session Configuration entry name
Please update these docs
@@ -208,6 +186,11 @@ impl SessionContext {
        self.state.lock().runtime_env.clone()
    }

    /// Return a copied version of config for this Session
    pub fn copied_config(&self) -> SessionConfig {
The copied_ prefix is unnecessary.
I think a more standard interface would be
pub fn config(&self) -> &SessionConfig {
    ..
}
And then the caller can choose to do state.config().clone() if they want a copy.
However, I see the reason for doing it this way is that SessionConfig is wrapped in a Mutex.
I wonder if we can remove the Mutex in some future PR and maybe use
#[derive(Clone)]
pub struct SessionContext {
    ...
    /// Shared session state for the session
    pub state: Arc<SessionState>,
}
And then handle cloning / copying internal to SessionContext when it is mutated. 🤔
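One way to read "handle cloning / copying internal to SessionContext when it is mutated" is copy-on-write via std's Arc::make_mut. The sketch below assumes that reading; the single-field SessionConfig and the set_target_partitions setter are simplified stand-ins, not the real types.

```rust
use std::sync::Arc;

/// Simplified stand-in for the real config (assumption: one field).
#[derive(Clone, Debug, PartialEq)]
pub struct SessionConfig {
    pub target_partitions: usize,
}

/// Simplified stand-in for the real session state.
#[derive(Clone, Debug)]
pub struct SessionState {
    pub config: SessionConfig,
}

#[derive(Clone)]
pub struct SessionContext {
    /// Shared session state, with no Mutex around it.
    state: Arc<SessionState>,
}

impl SessionContext {
    pub fn new(config: SessionConfig) -> Self {
        Self { state: Arc::new(SessionState { config }) }
    }

    /// With no lock in the way, a plain reference can be returned.
    pub fn config(&self) -> &SessionConfig {
        &self.state.config
    }

    /// Hypothetical setter: Arc::make_mut clones the state only if it is
    /// currently shared with another SessionContext, then mutates the copy.
    pub fn set_target_partitions(&mut self, n: usize) {
        Arc::make_mut(&mut self.state).config.target_partitions = n;
    }
}
```

Note the semantic shift this implies: a cloned context no longer observes later changes made through the original, which differs from sharing one locked state between clones.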
The reason I named the method 'copied_config' is to emphasize that the configuration inside the SessionContext can change at any time and that this method returns a copy. I cannot return a reference because the config is wrapped in a Mutex.
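The constraint described here can be sketched with std::sync::Mutex (the diff's lock() without unwrap() suggests the PR uses a different mutex type, so this is an approximation). A reference into the guarded state cannot outlive the lock guard, so the method clones while holding the lock and returns the snapshot. The types are reduced stand-ins.

```rust
use std::sync::{Arc, Mutex};

/// Simplified stand-in for the real config (assumption: one field).
#[derive(Clone, Debug, PartialEq)]
pub struct SessionConfig {
    pub batch_size: usize,
}

pub struct SessionState {
    pub config: SessionConfig,
}

pub struct SessionContext {
    pub state: Arc<Mutex<SessionState>>,
}

impl SessionContext {
    pub fn new(config: SessionConfig) -> Self {
        Self { state: Arc::new(Mutex::new(SessionState { config })) }
    }

    /// Returns a snapshot. A `&SessionConfig` would borrow from the guard,
    /// which is dropped when this function returns, so a clone is needed.
    pub fn copied_config(&self) -> SessionConfig {
        self.state.lock().unwrap().config.clone()
    }
}
```

A snapshot taken this way does not change when the shared state is modified afterwards, which is the behaviour the method name is meant to signal.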
👍 makes sense
datafusion/src/execution/context.rs
Outdated

/// Return the SessionConfig associated with the Task
pub fn session_config(&self) -> SessionConfig {
    let task_settings = &self.task_settings;
Rename task_settings to properties, as suggested in the previous PR.
Sure
Will the multi-tenancy setup be enabled for DataFusion as well as Ballista? My use case is that I'm looking to add multiple SQL editor / execution tabs to datafusion-tui that would enable executing multiple queries at once (regardless of whether the backend is DataFusion or Ballista).
That is my understanding @matthewmturner -- we already run multiple queries in IOx concurrently, but they know almost nothing about each other yet. I see this work helping to make them cooperate much more easily.
Thanks @mingmwang
I went over this PR carefully and it looks like a great step forward to me. Thank you.
The use of Mutex around the SessionState feels unnecessary to me, but I also think we can remove it in a follow-on PR.
    session_ctx: Arc<SessionContext>,
    config: &BallistaConfig,
) -> Arc<SessionContext> {
    session_ctx.state.lock().config.target_partitions =
Is the plan over time to copy more fields from Ballista config into SessionState?
Yes, I will cover Ballista-related changes in part 3.
/// Runtime memory management
pub memory_manager: Arc<MemoryManager>,
/// Manage temporary files during query execution
pub disk_manager: Arc<DiskManager>,
/// Object Store Registry
pub object_store_registry: Arc<ObjectStoreRegistry>,
I think this is a good change 👍
@@ -297,7 +297,7 @@ mod tests {
     ) -> Result<()> {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
-        let conf = session_ctx.state.lock().clone().config;
+        let conf = session_ctx.copied_config();
As mentioned elsewhere, I think most Rust programmers would expect this code to look like
- let conf = session_ctx.copied_config();
+ let conf = session_ctx.config().clone();
datafusion/src/execution/context.rs
Outdated
        } else {
            self.optimize_internal(plan, |_, _| {})
        }
        self.state.lock().optimize(plan)
I think this lock will effectively serialize all optimizer runs, so the optimizer cannot run in multiple tasks / threads at once.
How about a RW lock?
BTW, I didn't modify the lock-related behavior. Originally, the optimize_internal() method was guarded by the same state lock, so it serialized the optimization rule runs too.
I just changed the Mutex to an RwLock.
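To illustrate why a read-write lock helps here, the sketch below uses std::sync::RwLock: any number of threads may hold the read guard at once, so read-only optimizer runs no longer wait on each other. The string-based optimize method, the optimizer_rules field, and optimize_concurrently are toy stand-ins invented for this example.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Toy stand-in for the session state (assumption: rules are just names).
pub struct SessionState {
    pub optimizer_rules: Vec<String>,
}

impl SessionState {
    /// Toy "optimizer": wraps the plan text once per rule. It only needs
    /// `&self`, so a shared (read) lock is enough to call it.
    pub fn optimize(&self, plan: &str) -> String {
        self.optimizer_rules
            .iter()
            .fold(plan.to_owned(), |p, rule| format!("{rule}({p})"))
    }
}

/// Optimizes each plan on its own thread; results keep the input order.
pub fn optimize_concurrently(
    state: Arc<RwLock<SessionState>>,
    plans: Vec<String>,
) -> Vec<String> {
    let handles: Vec<_> = plans
        .into_iter()
        .map(|plan| {
            let state = Arc::clone(&state);
            thread::spawn(move || {
                // Multiple threads can hold this read guard simultaneously.
                let guard = state.read().unwrap();
                guard.optimize(&plan)
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

Writers (for example, changing a setting) would still take state.write() and exclude everyone else for the duration, so this mainly pays off when reads dominate.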
> BTW, I didn't modify the lock related behavior, originally the optimize_internal() method was guarded by the same state lock and serialize the optimization rules running too.
Yeah, it is somewhat of a mess -- it will be nice to clean this up in a subsequent PR.
Hi, @alamb The use of
Yes, once all parts of this work are finished, it will be possible to execute multiple queries concurrently with a single SessionContext.
@mingmwang since this PR had several conflicts, I took the liberty of merging from master and resolving the conflicts in 832b43b and a2bedc7. I was worried that it would otherwise hang out longer and accumulate additional conflicts, which I preferred to avoid.
Thanks a lot!
Which issue does this PR close?
Partially Closes #1862.
This PR covers the part 2:
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?