Refactor ExecutionContext and related conf to support multi-tenancy configurations. #1862

Closed
mingmwang opened this issue Feb 18, 2022 · 8 comments · Fixed by #1987, #2029 or #2091
Labels
enhancement New feature or request

Comments

@mingmwang
Contributor

mingmwang commented Feb 18, 2022

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

Fixes 1848
Addresses 138 and 682


Describe the solution you'd like
To support more extensible, multi-tenant configurations, we need to introduce a session-related context that isolates user-specific configurations. These configurations should be correctly propagated to the executor side along with tasks/plans.

Here is the proposal:

  1. Rename ExecutionContext to SessionContext. SessionContext remains the main interface for executing queries with DataFusion; it represents a connection/session between a user and the DataFusion/Ballista cluster.
  2. Rename ExecutionContextState to SessionState, which holds session-specific state such as registered functions, the catalog list, optimizer rules, and SQL-related configurations.
  3. Rename ExecutionConfig to SQLConfig, which holds all SQL-related configuration entries, for example the target partition count, batch_size, etc.
  4. Move RuntimeConfig and RuntimeEnv out of ExecutionConfig, and use them to hold non-user-specific (static) configuration and environment settings. Each executor/scheduler instance has exactly one RuntimeConfig and one RuntimeEnv, and neither can be changed once created.
  5. Add createSession and closeSession gRPC methods to SchedulerServer. A unique ID represents the current session, and subsequent ExecuteQueryParams requests include that session ID.
  6. Add a getSQLConf method to the ExecutionPlan trait. Each ExecutionPlan holds a reference to the SQLConfig of the user session, so it can access that configuration at planning time or at execution time (in a task).
  7. Users can set or change SQL configurations with SQL commands, or by issuing execute_query with only configuration-setting items.
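To make the split between static runtime settings (point 4) and per-session settings (points 1 to 3 and 7) concrete, here is a minimal std-only Rust sketch. `RuntimeEnv`, `SessionConfig`, and `SessionContext` are simplified stand-ins rather than DataFusion's actual types, and the `set` method, the config keys, and the default values are hypothetical:

```rust
use std::sync::Arc;

/// Static, process-wide settings (point 4). Created once and never mutated.
/// Hypothetical fields; not DataFusion's real RuntimeConfig/RuntimeEnv.
#[derive(Debug)]
pub struct RuntimeEnv {
    pub memory_limit_bytes: usize,
    pub temp_dir: String,
}

/// Per-session, user-tunable settings (points 3 and 7).
#[derive(Debug, Clone, PartialEq)]
pub struct SessionConfig {
    pub target_partitions: usize,
    pub batch_size: usize,
}

impl Default for SessionConfig {
    fn default() -> Self {
        // Hypothetical defaults, chosen only for illustration.
        Self { target_partitions: 8, batch_size: 8192 }
    }
}

impl SessionConfig {
    /// Apply one `key = value` setting, e.g. parsed from a `SET` statement.
    pub fn set(&mut self, key: &str, value: &str) -> Result<(), String> {
        let parsed: usize = value
            .trim()
            .parse()
            .map_err(|e| format!("invalid value for {}: {}", key, e))?;
        match key.trim() {
            "target_partitions" => self.target_partitions = parsed,
            "batch_size" => self.batch_size = parsed,
            other => return Err(format!("unknown config key: {}", other)),
        }
        Ok(())
    }
}

/// One user session: a private SessionConfig plus a shared RuntimeEnv.
pub struct SessionContext {
    pub session_id: String,
    pub config: SessionConfig,
    pub runtime: Arc<RuntimeEnv>,
}

impl SessionContext {
    pub fn new(session_id: &str, runtime: Arc<RuntimeEnv>) -> Self {
        Self {
            session_id: session_id.to_string(),
            config: SessionConfig::default(),
            runtime,
        }
    }
}

fn main() {
    let runtime = Arc::new(RuntimeEnv {
        memory_limit_bytes: 1 << 30,
        temp_dir: "/tmp".to_string(),
    });
    let mut a = SessionContext::new("session-a", Arc::clone(&runtime));
    let b = SessionContext::new("session-b", Arc::clone(&runtime));

    // Only session A is affected by its own SET.
    a.config.set("batch_size", "1024").expect("valid setting");
    println!("{}: {:?}", a.session_id, a.config);
    println!("{}: {:?}", b.session_id, b.config);
    println!("shared runtime: {:?}", a.runtime);
}
```

Both sessions hold an `Arc` to the same immutable `RuntimeEnv`, so changing one session's `batch_size` leaves the other session untouched.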

I can work on the PRs. There will be two or three PRs to cover different parts.

@mingmwang mingmwang added the enhancement New feature or request label Feb 18, 2022
@mingmwang
Contributor Author

@houqp
Member

houqp commented Feb 18, 2022

The change makes sense to me overall 👍 My only nitpick is that I think SQLConfig is not a good name, for two reasons: 1) target partition count and batch_size are not SQL-specific configs; 2) I think it would be better to design the system so that SQL is just a subset of the features supported by Ballista. In the long run, I consider Ballista a distributed compute framework that can execute arbitrary relational queries defined by users, even with custom UDFs. Perhaps we can keep the ExecutionConfig name? Or maybe rename it to SessionConfig.

@thinkharderdev
Contributor

thinkharderdev commented Feb 18, 2022

I think we need a way to allow the registration of custom optimizers, UDFs, etc. at startup. Part of the rationale for #1677 was to create a global ExecutionContext at startup that could serve as a hook to register extension object stores, planners, optimizers, and UDFs/UDAFs (i.e., so that we can use all the extensible features of DataFusion with Ballista).

Maybe we can still create the scheduler with an ExecutionContext / SessionContext and call it session_template or something like that. New sessions would then be cloned from that template (with any session-specific settings applied)?
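One way to read the `session_template` idea: extension objects are registered once on a template state at startup, and each new session is a cheap clone with its own setting overrides. This std-only sketch is an illustration only; `SessionState`, `fork`, and the `ScalarUdf` alias are hypothetical and much simpler than DataFusion's real UDF and optimizer types:

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical scalar UDF type: a shareable function on i64.
pub type ScalarUdf = Arc<dyn Fn(i64) -> i64 + Send + Sync>;

/// Simplified session state: registered UDFs, optimizer rule names, settings.
#[derive(Clone)]
pub struct SessionState {
    pub udfs: HashMap<String, ScalarUdf>,
    pub optimizer_rules: Vec<String>,
    pub settings: HashMap<String, String>,
}

impl SessionState {
    /// Create a new session from this template, applying per-session overrides.
    /// The template itself is left unchanged.
    pub fn fork(&self, overrides: &[(&str, &str)]) -> SessionState {
        let mut state = self.clone();
        for (key, value) in overrides {
            state.settings.insert(key.to_string(), value.to_string());
        }
        state
    }
}

fn main() {
    // Built once at scheduler startup: extensions are registered here.
    let mut session_template = SessionState {
        udfs: HashMap::new(),
        optimizer_rules: vec!["my_custom_rule".to_string()],
        settings: HashMap::new(),
    };
    let double: ScalarUdf = Arc::new(|x: i64| x * 2);
    session_template.udfs.insert("double".to_string(), double);
    session_template
        .settings
        .insert("batch_size".to_string(), "8192".to_string());

    // Per session: clone the template and apply this user's settings.
    let session = session_template.fork(&[("batch_size", "1024")]);
    let udf = &session.udfs["double"];
    println!("double(21) = {}", udf(21));
    println!(
        "template batch_size = {}, session batch_size = {}",
        session_template.settings["batch_size"], session.settings["batch_size"]
    );
}
```

Because UDFs are stored behind `Arc`, cloning the template copies pointers rather than the functions themselves, so per-session creation stays cheap.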

@mingmwang
Contributor Author

I'm working on it now; the PR is quite large, since execution plans need to be made session-config aware.
Below are some sample code snippets:

In the ExecutionPlan trait, I added two methods, and each ExecutionPlan needs to store the session_id as a member.

```rust
pub trait ExecutionPlan: Debug + Send + Sync {
    // ... existing methods ...

    /// Return the session id associated with the execution plan.
    fn session_id(&self) -> String;

    /// Return the session configuration associated with the execution plan.
    fn session_config(&self) -> Arc<SessionConfig> {
        let session_id = self.session_id();
        let runtime = RuntimeEnv::global();
        runtime.lookup_session_config(session_id)
    }
}
```

RuntimeEnv is also changed into a global singleton. It is created at the very beginning, when the Executor or Scheduler is first initialized in the main() method, so it no longer needs to be passed down the plan's execution path.

```rust
use once_cell::sync::OnceCell;

/// Process-wide singleton, initialized once in main().
pub static RUNTIME_ENV: OnceCell<RuntimeEnv> = OnceCell::new();

/// Execution runtime environment. This structure is a singleton for each Scheduler/Executor instance.
pub struct RuntimeEnv {
    /// Executor id (`None` on the Scheduler and in local mode)
    pub executor_id: Option<String>,
    /// Whether this is a local (single-process) environment
    pub is_local: bool,
    /// Runtime memory management
    pub memory_manager: Arc<MemoryManager>,
    /// Manages temporary files during query execution
    pub disk_manager: Arc<DiskManager>,
    /// Object stores registered within the Scheduler's or Executor's runtime
    pub object_store_registry: Arc<ObjectStoreRegistry>,
    /// DataFusion task contexts registered within the Executor's runtime
    pub task_context_registry: Option<Arc<TaskContextRegistry>>,
    /// DataFusion session contexts registered within the Scheduler's runtime
    pub session_context_registry: Option<Arc<SessionContextRegistry>>,
}

impl RuntimeEnv {
    /// Create an executor env based on configuration
    pub fn new_executor_env(config: RuntimeConfig, executor_id: String) -> Result<Self> {
        let RuntimeConfig {
            memory_manager,
            disk_manager,
        } = config;
        Ok(Self {
            executor_id: Some(executor_id),
            is_local: false,
            memory_manager: MemoryManager::new(memory_manager),
            disk_manager: DiskManager::try_new(disk_manager)?,
            object_store_registry: Arc::new(ObjectStoreRegistry::new()),
            task_context_registry: Some(Arc::new(TaskContextRegistry::new())),
            session_context_registry: None,
        })
    }

    /// Create a scheduler env based on configuration
    pub fn new_scheduler_env(config: RuntimeConfig) -> Result<Self> {
        let RuntimeConfig {
            memory_manager,
            disk_manager,
        } = config;
        Ok(Self {
            executor_id: None,
            is_local: false,
            memory_manager: MemoryManager::new(memory_manager),
            disk_manager: DiskManager::try_new(disk_manager)?,
            object_store_registry: Arc::new(ObjectStoreRegistry::new()),
            task_context_registry: None,
            session_context_registry: Some(Arc::new(SessionContextRegistry::new())),
        })
    }

    /// Create a local env based on configuration
    pub fn new_local_env(config: RuntimeConfig) -> Result<Self> {
        let RuntimeConfig {
            memory_manager,
            disk_manager,
        } = config;
        Ok(Self {
            executor_id: None,
            is_local: true,
            memory_manager: MemoryManager::new(memory_manager),
            disk_manager: DiskManager::try_new(disk_manager)?,
            object_store_registry: Arc::new(ObjectStoreRegistry::new()),
            task_context_registry: None,
            session_context_registry: Some(Arc::new(SessionContextRegistry::new())),
        })
    }

    /// Return the global RuntimeEnv; panics if it has not been initialized yet
    pub fn global() -> &'static RuntimeEnv {
        RUNTIME_ENV.get().expect("RuntimeEnv is not initialized")
    }

    /// True when running as the Scheduler (not local and no executor id)
    pub fn is_scheduler(&self) -> bool {
        !self.is_local && self.executor_id.is_none()
    }

    /// Retrieves a `SessionConfig` by session_id
    pub fn lookup_session_config(&self, session_id: String) -> Arc<SessionConfig> {
        if self.is_scheduler() {
            // Clone only the config, not the whole SessionState
            let session_conf = self
                .lookup_session(session_id.as_str())
                .expect("SessionContext doesn't exist")
                .state
                .lock()
                .config
                .clone();
            Arc::new(session_conf)
        } else {
            self.config_from_task_context(session_id)
        }
    }
}
```
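The singleton lookup used by the default `session_config` method can be sketched with the standard library's `OnceLock` (stable since Rust 1.70) in place of the `once_cell` crate. The registry field, `init_global`, `register_session`, and `ScanExec` are hypothetical simplifications; unlike the snippet above, this version returns `Option` instead of panicking when a session id is unknown:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};

#[derive(Debug, Clone, PartialEq)]
pub struct SessionConfig {
    pub batch_size: usize,
}

/// Simplified runtime: only the session-config registry is modeled.
pub struct RuntimeEnv {
    sessions: Mutex<HashMap<String, Arc<SessionConfig>>>,
}

/// Process-wide singleton, set once from main().
static RUNTIME_ENV: OnceLock<RuntimeEnv> = OnceLock::new();

impl RuntimeEnv {
    pub fn new() -> Self {
        Self { sessions: Mutex::new(HashMap::new()) }
    }

    /// Install the global env; fails if one was already installed.
    pub fn init_global(env: RuntimeEnv) -> Result<(), String> {
        RUNTIME_ENV
            .set(env)
            .map_err(|_| "RuntimeEnv already initialized".to_string())
    }

    /// Access the global env; panics if main() has not initialized it yet.
    pub fn global() -> &'static RuntimeEnv {
        RUNTIME_ENV.get().expect("RuntimeEnv is not initialized")
    }

    pub fn register_session(&self, session_id: &str, config: SessionConfig) {
        self.sessions
            .lock()
            .unwrap()
            .insert(session_id.to_string(), Arc::new(config));
    }

    pub fn lookup_session_config(&self, session_id: &str) -> Option<Arc<SessionConfig>> {
        self.sessions.lock().unwrap().get(session_id).cloned()
    }
}

/// Plans carry only a session id and resolve their config through the singleton.
pub trait ExecutionPlan {
    fn session_id(&self) -> String;

    fn session_config(&self) -> Option<Arc<SessionConfig>> {
        RuntimeEnv::global().lookup_session_config(&self.session_id())
    }
}

/// Hypothetical leaf plan used only for this example.
pub struct ScanExec {
    pub session_id: String,
}

impl ExecutionPlan for ScanExec {
    fn session_id(&self) -> String {
        self.session_id.clone()
    }
}

fn main() {
    RuntimeEnv::init_global(RuntimeEnv::new()).expect("first initialization succeeds");
    RuntimeEnv::global().register_session("s1", SessionConfig { batch_size: 1024 });

    let plan = ScanExec { session_id: "s1".to_string() };
    println!("config for s1: {:?}", plan.session_config());

    // A second initialization is rejected; the first env stays in place.
    assert!(RuntimeEnv::init_global(RuntimeEnv::new()).is_err());
}
```

Because `RUNTIME_ENV` is process-wide, every test running in the same process shares one registry and can only initialize it once, which is the testing concern raised later in this thread.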

New SessionContexts are registered in the Scheduler's RuntimeEnv, and task contexts are registered in the Executors' RuntimeEnv. A SessionContext contains the internal SessionState with the planners, optimizers, UDFs/UDAFs, SessionConfig, etc.
SessionContext exists only in the Scheduler, because it is related to planning and optimization.
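The Scheduler-side bookkeeping for point 5 (`createSession` / `closeSession` with a unique id) might look roughly like the following std-only sketch. `SessionContextRegistry` appears in the RuntimeEnv snippet above, but its `create_session` and `close_session` methods, the `session-N` id format, and the simplified `SessionContext` here are assumptions; a real scheduler would more likely use UUIDs than a per-process counter:

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

#[derive(Debug, Clone, Default, PartialEq)]
pub struct SessionConfig {
    pub batch_size: usize,
}

/// Simplified scheduler-side session: its id plus its config.
#[derive(Debug)]
pub struct SessionContext {
    pub session_id: String,
    pub config: SessionConfig,
}

/// Registry of live sessions, keyed by session id.
#[derive(Default)]
pub struct SessionContextRegistry {
    next_id: AtomicU64,
    sessions: RwLock<HashMap<String, Arc<SessionContext>>>,
}

impl SessionContextRegistry {
    /// Handle a createSession call: allocate an id and register the session.
    pub fn create_session(&self, config: SessionConfig) -> String {
        let id = format!("session-{}", self.next_id.fetch_add(1, Ordering::Relaxed));
        let ctx = Arc::new(SessionContext { session_id: id.clone(), config });
        self.sessions.write().unwrap().insert(id.clone(), ctx);
        id
    }

    /// Used when an ExecuteQueryParams request arrives carrying a session id.
    pub fn lookup_session(&self, session_id: &str) -> Option<Arc<SessionContext>> {
        self.sessions.read().unwrap().get(session_id).cloned()
    }

    /// Handle a closeSession call; returns false if the id was unknown.
    pub fn close_session(&self, session_id: &str) -> bool {
        self.sessions.write().unwrap().remove(session_id).is_some()
    }
}

fn main() {
    let registry = SessionContextRegistry::default();
    let a = registry.create_session(SessionConfig { batch_size: 1024 });
    let b = registry.create_session(SessionConfig::default());
    println!("created {} and {}", a, b);
    println!("{:?}", registry.lookup_session(&a));
    registry.close_session(&a);
    println!("after close: {:?}", registry.lookup_session(&a));
}
```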

In the TaskDefinition proto, add session_id and props (name-value pairs) so that the session ID and configurations can propagate to the executor side, where the physical ExecutionPlan is recreated.

```protobuf
message TaskDefinition {
  PartitionId task_id = 1;
  bytes plan = 2;
  // Output partition for shuffle writer
  PhysicalHashRepartition output_partitioning = 3;
  // Session id and session config props
  string session_id = 4;
  repeated KeyValuePair props = 5;
}
```
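On the executor side, the `props` field has to be turned back into a session configuration before the physical plan is rebuilt. A minimal sketch, assuming the proto's `KeyValuePair` has `key` and `value` string fields, and using hypothetical `to_props` / `from_props` helpers, config keys, and defaults:

```rust
/// Mirrors the proto `KeyValuePair` message (assumed fields: key, value).
#[derive(Debug, Clone, PartialEq)]
pub struct KeyValuePair {
    pub key: String,
    pub value: String,
}

#[derive(Debug, Clone, PartialEq)]
pub struct SessionConfig {
    pub target_partitions: usize,
    pub batch_size: usize,
}

impl Default for SessionConfig {
    fn default() -> Self {
        // Hypothetical defaults for illustration.
        Self { target_partitions: 8, batch_size: 8192 }
    }
}

impl SessionConfig {
    /// Scheduler side: flatten the config into TaskDefinition.props.
    pub fn to_props(&self) -> Vec<KeyValuePair> {
        vec![
            KeyValuePair {
                key: "target_partitions".to_string(),
                value: self.target_partitions.to_string(),
            },
            KeyValuePair {
                key: "batch_size".to_string(),
                value: self.batch_size.to_string(),
            },
        ]
    }

    /// Executor side: rebuild the config, starting from defaults so that
    /// missing keys fall back to their default values.
    pub fn from_props(props: &[KeyValuePair]) -> Result<Self, String> {
        let mut config = SessionConfig::default();
        for kv in props {
            let parsed: usize = kv
                .value
                .parse()
                .map_err(|e| format!("invalid value for {}: {}", kv.key, e))?;
            match kv.key.as_str() {
                "target_partitions" => config.target_partitions = parsed,
                "batch_size" => config.batch_size = parsed,
                other => return Err(format!("unknown config key: {}", other)),
            }
        }
        Ok(config)
    }
}

fn main() {
    let scheduler_side = SessionConfig { target_partitions: 4, batch_size: 1024 };
    let props = scheduler_side.to_props();
    let executor_side = SessionConfig::from_props(&props).expect("valid props");
    println!("{:?}", executor_side);
    assert_eq!(executor_side, scheduler_side);
}
```

Rejecting unknown keys catches typos early, but it also means an executor older than the scheduler fails on newly added settings; silently ignoring unknown keys is the opposite trade-off.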

@alamb
Contributor

alamb commented Mar 1, 2022

I am sorry for the late review.

So we have something similar in IOx: a context we want to use and pass down when we run plans using DataFusion, and it supports multi-tenancy quite well. The approach we went with is an Executor struct (code) that contains the DiskManager and MemoryManager and creates IOxExecutionContexts, which wrap the DataFusion ExecutionContext (and also register UDFs, etc.).

I am a little worried about introducing a "global singleton" into DataFusion. It may save plumbing work in the short term, but I think it makes tests hard to write and can cause other hard-to-diagnose problems. I think it is more maintainable in the longer term to explicitly provide all required contexts.
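For comparison, the explicit-context approach can be sketched as a plan whose `execute` method receives an `Arc<TaskContext>` carrying the session config and the shared runtime, so no global lookup is needed. All types below are simplified, hypothetical stand-ins (later DataFusion releases do pass an `Arc<TaskContext>` to `ExecutionPlan::execute`, but with a different, richer signature):

```rust
use std::sync::Arc;

#[derive(Debug, Clone, PartialEq)]
pub struct SessionConfig {
    pub batch_size: usize,
}

/// Stand-in for shared resources such as MemoryManager and DiskManager.
#[derive(Debug)]
pub struct RuntimeEnv {
    pub memory_limit_bytes: usize,
}

/// Everything a plan needs at execution time, handed in by the caller.
#[derive(Debug, Clone)]
pub struct TaskContext {
    pub session_id: String,
    pub config: Arc<SessionConfig>,
    pub runtime: Arc<RuntimeEnv>,
}

pub trait ExecutionPlan {
    /// Toy result: the sizes of the batches produced for `partition`.
    fn execute(&self, partition: usize, ctx: Arc<TaskContext>) -> Vec<usize>;
}

/// Hypothetical plan that emits `total_rows` rows in `batch_size` chunks.
pub struct RangeExec {
    pub total_rows: usize,
}

impl ExecutionPlan for RangeExec {
    fn execute(&self, _partition: usize, ctx: Arc<TaskContext>) -> Vec<usize> {
        let batch_size = ctx.config.batch_size.max(1); // guard against 0
        let mut sizes = Vec::new();
        let mut remaining = self.total_rows;
        while remaining > 0 {
            let n = remaining.min(batch_size);
            sizes.push(n);
            remaining -= n;
        }
        sizes
    }
}

fn main() {
    // One runtime shared by all tenants; one TaskContext per session.
    let runtime = Arc::new(RuntimeEnv { memory_limit_bytes: 1 << 30 });
    let make_ctx = |id: &str, batch_size: usize| {
        Arc::new(TaskContext {
            session_id: id.to_string(),
            config: Arc::new(SessionConfig { batch_size }),
            runtime: Arc::clone(&runtime),
        })
    };

    let plan = RangeExec { total_rows: 2500 };
    println!("tenant-a: {:?}", plan.execute(0, make_ctx("tenant-a", 1000)));
    println!("tenant-b: {:?}", plan.execute(0, make_ctx("tenant-b", 2048)));
}
```

Tests can build as many independent `TaskContext`s as they need in one process, since nothing is stored in a static.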

If you are describing a change to Ballista (adding a global singleton to Ballista), I think that makes much more sense, as I expect Ballista to be used standalone rather than as a library for others to build upon.

@alamb
Contributor

alamb commented Mar 1, 2022

Note that IOx uses custom optimizers and UDFs too.

@mingmwang
Contributor Author


@mingmwang
Contributor Author

Closing this issue. We can continue the discussion in the pull request:
#1924
