
add udf/udaf plugin #1881

Closed

Conversation

EricJoy2048
Member

@EricJoy2048 EricJoy2048 commented Feb 25, 2022

closes #1882

In this PR, I have implemented the UDF plugin. In the next PR, I will complete the serialization and deserialization of UDF/UDAF in Ballista, relying on the UDF plugin.
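For readers skimming the conversation, here is a minimal std-only sketch of the plugin surface this PR proposes. The real trait lives in the PR diff; `ScalarUdf`, `UdfPlugin`, and `MyPlugin` below are illustrative stand-ins for DataFusion's actual types, not the merged API:

```rust
use std::collections::HashMap;

// Stand-in for DataFusion's ScalarUDF type.
#[derive(Clone, Debug, PartialEq)]
pub struct ScalarUdf {
    pub name: String,
}

// Hypothetical plugin trait: a dynamic library exports an implementation,
// and Ballista discovers UDFs through it by name.
pub trait UdfPlugin {
    /// Names of all UDFs provided by this plugin.
    fn udf_names(&self) -> Vec<String>;
    /// Look up a UDF by name, as Ballista would when deserializing a plan.
    fn get_scalar_udf_by_name(&self, fun_name: &str) -> Option<ScalarUdf>;
}

pub struct MyPlugin {
    udfs: HashMap<String, ScalarUdf>,
}

impl MyPlugin {
    pub fn new() -> Self {
        let mut udfs = HashMap::new();
        udfs.insert(
            "my_add".to_string(),
            ScalarUdf { name: "my_add".to_string() },
        );
        Self { udfs }
    }
}

impl UdfPlugin for MyPlugin {
    fn udf_names(&self) -> Vec<String> {
        self.udfs.keys().cloned().collect()
    }
    fn get_scalar_udf_by_name(&self, fun_name: &str) -> Option<ScalarUdf> {
        self.udfs.get(fun_name).cloned()
    }
}
```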

@github-actions github-actions bot added the datafusion Changes in the datafusion crate label Feb 25, 2022
@EricJoy2048
Member Author

EricJoy2048 commented Feb 25, 2022

The clippy check failed because of the fuzz-utils package, but I didn't modify that package.

[screenshot of the clippy failure]

@Ted-Jiang
Member

I have met the same issue. Thanks for your work!

@Ted-Jiang
Member

> The clippy check failed because package fuzz-utils. But I don't modify this package.

Fixed in #1880

@Ted-Jiang
Member

retest this please

Review comment on datafusion/build.rs (outdated, resolved)
@EricJoy2048 EricJoy2048 mentioned this pull request Feb 27, 2022
Contributor

@alamb alamb left a comment


Thank you for this contribution @gaojun2048 . I haven't had a chance to go through this entire PR yet, but I do wonder whether we need a dynamic plugin manager to support UDFs in Ballista.

There are at least two distinct use cases:

  1. You are compiling a custom version of ballista and need to use udfs
  2. You want to use an unmodified version of ballista and register udfs that you compiled into your own shared library

A plugin manager is required for the second use case but not the first. I wonder if you need the flexibility of the second use case, or if we could get away with a smaller change if you are building your own version of ballista.

If we need a plugin manager, I would like to see it more fully integrated so that it is covered by the existing tests (as well as the new ones). This would mean removing the lists of scalar_functions and aggregate_functions on ExecutionContext and replacing them with a plugin manager that is always present.

Review comment on datafusion/build.rs (outdated, resolved)
@EricJoy2048
Member Author

EricJoy2048 commented Mar 2, 2022

Thank you for your advice @alamb .
Yes, the udf plugin is designed for those who use Ballista as a computing engine, but do not want to modify the source code of ballista. We use ballista in production and we need ballista to be able to use our custom udf. As a user of ballista, I am reluctant to modify the source code of ballista directly, because it means that I need to recompile ballista myself, and in the future, when I want to upgrade ballista to the latest version of the community, I need to do more merges work. If I use the udf plugin, I only need to maintain the custom udf code. When I upgrade the version of ballista, I only need to modify the version number of the datafusion dependency in the code, and then recompile these udf dynamic libraries. I believe this is a more friendly way for those who actually use ballista as a computing engine.

In my opinion, people who use datafusion and people who use ballista are different people, and the udf plugin is more suitable for ballista than datafusion.

  1. People who use DataFusion generally develop their own computing engines on top of it. In that case they usually do not need UDF plugins: they put the UDF code directly into their own engine and decide for themselves when to call register_udf to register UDFs with DataFusion. If needed, they can handle serialization and deserialization of custom UDFs in their own engine to achieve distributed scheduling.
  2. People who use Ballista generally use it only as a computing engine. They often do not have a deep understanding of the DataFusion source code, so directly modifying the source of Ballista and DataFusion is very difficult for them. They may update Ballista frequently, and modifying its source means that each upgrade requires merging code and recompiling, which is a heavy burden. In particular, there is currently no way for UDFs to work in Ballista at all, because serializing and deserializing a UDF requires knowing its concrete implementation, which cannot be done without modifying the source of Ballista and DataFusion. The value of the UDF plugin here is obvious: users only maintain their own UDF code and do not need to track code changes in Ballista or DataFusion. In Ballista, we can serialize a UDF by its name, and then deserialize it by looking the name up via get_scalar_udf_by_name(&self, fun_name: &str). These operations go through the trait UDFPlugin, so Ballista does not need to know who implemented the UDF plugin.
  3. I don't think scalar_functions and aggregate_functions in ExecutionContext need to be modified, as these are for people who use DataFusion but not Ballista. So I think I should modify the code and move the plugin mod into the ballista crate instead of leaving it in datafusion.
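The name-based round trip described in point 2 can be modeled in a few lines. This is a toy sketch: `UdfImpl` and `PluginRegistry` are hypothetical stand-ins, not Ballista's real types; the point is only that the plan carries the UDF's name and the executor resolves it through the plugin at deserialization time:

```rust
use std::collections::HashMap;

// Stand-in for a scalar UDF implementation.
type UdfImpl = fn(i64) -> i64;

struct PluginRegistry {
    udfs: HashMap<String, UdfImpl>,
}

impl PluginRegistry {
    fn serialize_udf(&self, name: &str) -> String {
        // Only the name crosses the wire; the implementation stays
        // inside the plugin on each node.
        name.to_string()
    }

    fn deserialize_udf(&self, serialized: &str) -> Option<UdfImpl> {
        // Resolve the name back to an implementation via the registry,
        // analogous to get_scalar_udf_by_name.
        self.udfs.get(serialized).copied()
    }
}
```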

Thanks a lot, can you give me more advice on these?

@EricJoy2048
Member Author

> Can you explain the need for getting the rustc version?
> #1881 (comment)

Yes. From rust-lang/rfcs#600 I see that Rust doesn't have a stable ABI, meaning different compiler versions can generate incompatible code. For this reason, the UDF plugin must be compiled using the same version of rustc as datafusion.
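The check this implies can be as simple as comparing the rustc version baked into the plugin at build time against the host's. A sketch; how the versions get captured (e.g. a build.rs running `rustc --version` and exporting it) is an assumption about the mechanism, not the PR's exact code:

```rust
// Because Rust has no stable ABI, a loader can refuse plugins built with a
// different rustc than the host binary. The version strings are assumed to
// be captured at build time (e.g. by a build.rs) on both sides.
fn abi_compatible(host_rustc: &str, plugin_rustc: &str) -> Result<(), String> {
    if host_rustc == plugin_rustc {
        Ok(())
    } else {
        Err(format!(
            "plugin built with `{}` but host uses `{}`; rebuild the plugin",
            plugin_rustc, host_rustc
        ))
    }
}
```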

@EricJoy2048
Member Author

> Thank you for your advice @alamb . Yes, the udf plugin is designed for those who use Ballista as a computing engine, but do not want to modify the source code of ballista. ... Thanks a lot, can you give me more advice on these?

I see in PR #1887 that scalar_function and aggregate_function serialization and deserialization are moved to datafusion-serialization. In datafusion-serialization we would need the UDF plugin to serialize and deserialize UDFs. So, where should we put the plugin mod?

@alamb
Contributor

alamb commented Mar 2, 2022

> Yes. From rust-lang/rfcs#600 I see that Rust doesn't have a stable ABI, meaning different compiler versions can generate incompatible code. ... the UDF plug-in must be compiled using the same version of rustc as datafusion.

That makes sense -- it might help to add a comment to the source code explaining that rationale (so that future readers understand as well)

@alamb
Contributor

alamb commented Mar 2, 2022

> Yes, the udf plugin is designed for those who use Ballista as a computing engine, but do not want to modify the source code of ballista. ... I believe this is a more friendly way for those who actually use ballista as a computing engine.

I agree and this makes sense

> In my opinion, people who use datafusion and people who use ballista are different people, and the udf plugin is more suitable for ballista than datafusion.

💯 agree here too

> I don't think scalar_functions and aggregate_functions in ExecutionContext need to be modified as these are for those who use datafusion but not ballista. So I think I should modify the code and migrate the plugin mod into the ballista crate instead of staying in datafusion.

I think the idea of moving the plugin module into ballista makes a lot of sense to me

> Thanks a lot, can you give me more advice on these?

Thank you for your clear explanation and justification 👍

Contributor

@alamb alamb left a comment


Thank you for your efforts @gaojun2048 . I skimmed through the code again and I think if we move the plugin manager to ballista it would be good to go from my perspective 👍 .

cc @andygrove @thinkharderdev @edrevo @matthewmturner @liukun4515 and @realno (I am sorry if you work together or already know about this work)

use std::any::Any;
use std::sync::Arc;

/// this examples show how to implements a udf plugin
Contributor


Suggested change
/// this examples show how to implements a udf plugin
/// this examples show how to implements a udf plugin for Ballista

@thinkharderdev
Contributor

> Thank you for your advice @alamb . Yes, the udf plugin is designed for those who use Ballista as a computing engine, but do not want to modify the source code of ballista. ... Thanks a lot, can you give me more advice on these?

For what it's worth, with the changes in #1677 you wouldn't actually have to build Ballista from source or modify the Ballista source. You can just use the ballista crate as a dependency and define your own main function which registers the desired UDF/UDAFs in the global execution context.
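A toy model of that approach might look like the following. This is std-only: `Context` is a stand-in for the real ExecutionContext, and in real code the registration would go through DataFusion's register_udf API rather than a plain function pointer:

```rust
use std::collections::HashMap;

// Stand-in for ExecutionContext: holds registered scalar functions by name.
struct Context {
    scalar_functions: HashMap<String, fn(i64) -> i64>,
}

impl Context {
    fn new() -> Self {
        Self { scalar_functions: HashMap::new() }
    }

    // Analogous to DataFusion's register_udf.
    fn register_udf(&mut self, name: &str, f: fn(i64) -> i64) {
        self.scalar_functions.insert(name.to_string(), f);
    }

    fn call(&self, name: &str, arg: i64) -> Option<i64> {
        self.scalar_functions.get(name).map(|f| f(arg))
    }
}

// Your own "main": set up the context with custom UDFs before handing it
// to the engine, instead of patching the engine's source.
fn main_like() -> Context {
    let mut ctx = Context::new();
    ctx.register_udf("square", |x| x * x);
    ctx
}
```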

@EricJoy2048
Member Author

EricJoy2048 commented Mar 3, 2022

> Thank you for your advice @alamb . Yes, the udf plugin is designed for those who use Ballista as a computing engine, but do not want to modify the source code of ballista. ... Thanks a lot, can you give me more advice on these?

> For what it's worth, with the changes in #1677 you wouldn't actually have to build Ballista from source or modify the ballista source. You can just use the ballista crate dependency and define your own main function which registers desired UDF/UDAF in the global execution context.

Ugh, I always thought that Ballista was an out-of-the-box computing engine, like Presto/Impala, not a computing library, so I didn't realize that using Ballista also means depending on the ballista crate and defining your own main function. Of course, for those who want to develop their own computing engine based on Ballista, this is indeed a good way. It means the UDF plugin does not need to be placed in the ballista crate, because they can maintain the UDF plugin in their own projects, load UDF plugins in their own main function, and then register them in the global ExecutionContext. When serializing and deserializing a LogicalPlan, the implementation of a UDF can be found through the passed-in ExecutionContext.


fn try_into_logical_plan(
    &self,
    ctx: &ExecutionContext,
    extension_codec: &dyn LogicalExtensionCodec,
) -> Result<LogicalPlan, BallistaError>;

But I'm still not quite sure: is Ballista an out-of-the-box compute engine like Presto/Impala, or is it just a library for someone else to implement their own computing engine, like DataFusion?

@liukun4515
Contributor

liukun4515 commented Mar 3, 2022

> Ugh, I always thought that ballista was an out-of-the-box computing engine, like presto/impala, not a computing library ... But I'm still not quite sure, ballista is an out-of-the-box compute engine like presto/impala. Or is it just a dependent library for someone else to implement their own computing engine like datafusion?

Agree with your opinion.
Ballista is a distributed compute engine like Spark and others.
Users who want to use UDFs shouldn't need to recompile the code.

@liukun4515
Contributor

> Thank you for your efforts @gaojun2048 . I skimmed through the code again and I think if we move the plugin manager to ballista it would be good to go from my perspective 👍 .

Maybe I need to take time to look at this.
I can finish reviewing this today.

@realno
Contributor

realno commented Mar 3, 2022

> In my opinion, people who use datafusion and people who use ballista are different people, and the udf plugin is more suitable for ballista than datafusion.
>
> 💯 agree here too

@alamb @gaojun2048 this is an interesting point, could you explain a bit more?

I feel ideally they should use the same programming interface (SQL or DataFrame): DataFusion provides computation on a single node and Ballista adds a distributed layer. With this assumption, since DF is the compute core, wouldn't it make sense to have UDF support in DF?

@EricJoy2048
Member Author

> I feel ideally they should use the same programing interface (SQL or DataFrame), DataFusion provide computation on a single node and Ballista add a distributed layer. With this assumption, DF is the compute core wouldn't it make sense to have udf support in DF?

I don't know if my understanding is wrong. I have always thought of DF as just a computing library, which cannot be deployed in production directly. Those who use DF take it as a project dependency and develop their own computing engine on top of it; for example, Ballista is a distributed computing engine developed on DF. Ballista is a mature computing engine, just like Presto/Spark. People who use Ballista only need to download and deploy it to their machines to start the Ballista service. They rarely care how Ballista is implemented, so a UDF plugin that supports dynamic loading lets these people define their own UDF functions without modifying Ballista's source code.

> I feel ideally they should use the same programing interface (SQL or DataFrame) ... wouldn't it make sense to have udf support in DF?

Yes, it is important and required for DF to support UDFs. But for those who use DF directly, a plugin that dynamically loads UDFs is not necessary, because they use DF as a dependency to develop their own calculation engine, such as Ballista. Imagine that Ballista and DF were not in the same repository but were two separate projects. As a Ballista developer needing custom UDFs for special analysis needs, what I would most likely do is manage the UDFs myself: either write their implementations directly in the Ballista crate, or add a UDF plugin to Ballista, as in this PR, that supports dynamically loading UDFs developed by Ballista users (not Ballista developers). Then I decide when to call DF's register_udf method to register these UDFs in the ExecutionContext so that DF can use them for computation. Of course, we could put the UDF plugin directly in DF, but this feature is not necessary for DF, and doing so would make the register_udf method look redundant and make the design of DF's UDFs harder to understand.

So I would say that the people who need the UDF plugin the most are those who use Ballista as a full-fledged computing engine: they just download and deploy Ballista. They don't modify the source code of Ballista and DF, because that would require a deeper understanding of both, and once the source is modified, each Ballista upgrade costs extra merge and build effort. But today, if a user just downloads and deploys Ballista, there is no way to register a custom UDF into DF. The core goal of the UDF plugin is to give UDFs that were not compiled into the project a chance to be discovered and registered in DF.

Finally, if we define Ballista's goal as a distributed implementation of datafusion, a library to be used as a dependency of other projects rather than a distributed computing engine (like Presto/Spark) that can be downloaded, deployed, and used directly, then the UDF plugin does not seem necessary to me, because its core goal is to give UDFs that were not compiled into the project a chance to be discovered and registered in DF. Projects that use Ballista as a dependency can manage their own UDFs and decide when to register them into DF.

@EricJoy2048
Member Author

@alamb CI got stuck. Can you help me retry?

@alamb
Contributor

alamb commented Mar 25, 2022

Hi @gaojun2048 -- I think github has been having some issues: https://www.githubstatus.com/history

I re-kicked off the jobs here: https://github.com/apache/arrow-datafusion/actions/runs/2040619251

Hopefully they will complete this time

@EricJoy2048 EricJoy2048 requested review from alamb and jimexist March 26, 2022 10:53
@EricJoy2048
Member Author

@liukun4515 @alamb @thinkharderdev Everything is OK, and the UDAF test succeeds now.

Contributor

@alamb alamb left a comment


The datafusion changes look good to me. Thank you very much @gaojun2048. Can someone please review the Ballista changes?

Perhaps @liukun4515 @mingmwang @thinkharderdev has the time and expertise?

@thinkharderdev
Contributor

> The datafusion changes look good to me. Thank you very much @gaojun2048. Can someone please review the Ballista changes?
>
> Perhaps @liukun4515 @mingmwang @thinkharderdev has the time and expertise?

I can review today

Contributor

@thinkharderdev thinkharderdev left a comment


Awesome!

@EricJoy2048
Member Author

> Awesome!

Today I tried to resolve the conflicts, but I found it very difficult. In AsExecutionPlan::try_into_physical_plan, SessionContext has been removed, so I can no longer serialize and deserialize UDFs with SessionContext. I will update my code to serialize and deserialize UDFs with the udf_plugin instead.

@thinkharderdev
Contributor

> Today I tried to resolve the conflict, but I found it very difficult. In AsExecutionPlan.try_into_physical_plan SessionContext is removed. ... So I will update my code and serialization and deserialization UDF with udf_plugin.

It should take a FunctionRegistry now (which will be a TaskContext) at runtime. I think we should use that, since we can set up the TaskContext with any preloaded functions.

@EricJoy2048
Member Author

> It should take a FunctionRegistry now (which will be a TaskContext) at runtime. I think we should use that since we can set up the TaskContext with any preloaded functions

OK. I will update the code and use TaskContext to serialize and deserialize UDFs.

@thinkharderdev
Contributor

> Ok. I will update code and use TaskContext to serialize and deserialize UDF

Hi @gaojun2048. I had to implement this on our fork for our project, so I went ahead and PR'd it here: #2130. Hope that helps!

@EricJoy2048
Member Author

> #2130

OK. Can I submit the plugin-related code first, regardless of the UDF serialization and deserialization parts?

@EricJoy2048
Member Author

@thinkharderdev I pushed a sub-PR of this PR. Please help me review it.

#2131

@mingmwang
Contributor

It should take a FunctionRegistry now (which will be a TaskContext) at runtime. I think we should use that, since we can set up the TaskContext with any preloaded functions.

Ok. I will update my code and use the TaskContext to serialize and deserialize UDFs.

Yes, there have been several changes to SessionContext recently. The Executor no longer has a global SessionContext.
You can have your UDF Plugin Manager load all the dynamic UDFs/UDAFs into the Executor's members. I have added a TODO note:

impl Executor {
    /// Create a new executor instance
    pub fn new(
        metadata: ExecutorRegistration,
        work_dir: &str,
        runtime: Arc<RuntimeEnv>,
    ) -> Self {
        Self {
            metadata,
            work_dir: work_dir.to_owned(),
            // TODO add logic to dynamically load UDF/UDAFs libs from files
            scalar_functions: HashMap::new(),
            aggregate_functions: HashMap::new(),
            runtime,
        }
    }
}

On the Ballista Scheduler side there is no global SessionContext either; a SessionContext is created per user request.
You can add the UDF Plugin Manager to the Ballista SchedulerServer, and when a new session context is created,
register the UDFs/UDAFs on that context.

/// Create a DataFusion session context that is compatible with Ballista Configuration
pub fn create_datafusion_context(
    config: &BallistaConfig,
    session_builder: SessionBuilder,
) -> Arc<SessionContext> {
    let config = SessionConfig::new()
        .with_target_partitions(config.default_shuffle_partitions())
        .with_batch_size(config.default_batch_size())
        .with_repartition_joins(config.repartition_joins())
        .with_repartition_aggregations(config.repartition_aggregations())
        .with_repartition_windows(config.repartition_windows())
        .with_parquet_pruning(config.parquet_pruning());
    let session_state = session_builder(config);
    // TODO: add logic to register UDFs/UDAFs on the context here
    Arc::new(SessionContext::with_state(session_state))
}
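The scheduler-side hook described above — copy every plugin-provided UDF into each freshly created session context — reduces to a small registration loop. The following std-only sketch models that flow; `UdfPluginManager`, `SessionCtx`, and the signatures are illustrative stand-ins, not the real Ballista or DataFusion types:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical scalar UDF shape, as in the registry sketch above.
type ScalarUdf = Arc<dyn Fn(&[i64]) -> i64 + Send + Sync>;

// Stand-in for the plugin manager after it has loaded UDFs from dynamic libraries.
struct UdfPluginManager {
    scalar_udfs: HashMap<String, ScalarUdf>,
}

// Stand-in for the per-request session context's function registry.
#[derive(Default)]
struct SessionCtx {
    scalar_functions: HashMap<String, ScalarUdf>,
}

impl SessionCtx {
    fn register_udf(&mut self, name: String, f: ScalarUdf) {
        self.scalar_functions.insert(name, f);
    }
}

// The hook a context factory would invoke after building the context:
// every plugin-provided UDF is cloned (cheaply, via Arc) into the new session.
fn register_plugin_udfs(ctx: &mut SessionCtx, plugins: &UdfPluginManager) {
    for (name, f) in &plugins.scalar_udfs {
        ctx.register_udf(name.clone(), Arc::clone(f));
    }
}

fn main() {
    let mut plugins = UdfPluginManager { scalar_udfs: HashMap::new() };
    plugins
        .scalar_udfs
        .insert("abs_i64".to_string(), Arc::new(|a: &[i64]| a[0].abs()));

    let mut ctx = SessionCtx::default();
    register_plugin_udfs(&mut ctx, &plugins);
    let f = ctx.scalar_functions.get("abs_i64").unwrap();
    println!("{}", f(&[-5])); // prints 5
}
```

Because the plugin manager is loaded once at process start while contexts are created per request, this keeps the per-request cost to a handful of Arc clones.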

@EricJoy2048
Member Author


From #2130 I saw that the serialization code is still changing, so I pushed a sub-PR, #2131, which includes only the plugin manager.


@alamb
Contributor

alamb commented Apr 15, 2022

I believe this feature was added in #2131, so this PR is no longer needed and I am closing it. Please let me know if I got that wrong and reopen it.

Labels
datafusion Changes in the datafusion crate
Development

Successfully merging this pull request may close these issues.

UDF/UDAF plugin
9 participants