Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use SessionContext to parse Expr protobuf #2024

Merged
merged 4 commits into from
Mar 20, 2022
Merged

Conversation

thinkharderdev
Copy link
Contributor

@thinkharderdev thinkharderdev commented Mar 16, 2022

Which issue does this PR close?

Partially addresses #1882

Rationale for this change

Add serialization of UDF/UDAF to Ballista and datafusion-proto. We serialize the UDF/UDAF by name and then use the functions registered in the SessionContext to deserialize.

Users can either initialize Ballista instances with an SessionContext registered with the relevant functions or use the plugin mechanism being built as part of #1881.

What changes are included in this PR?

Parsing logical Expr now requires an SessionContext. Additionally, extension codecs in Ballista now need an ExecutionContext.

Are there any user-facing changes?

Yes, the signatures for PhysicalExtensionCodec.try_decode and LogicalExtensionCodec.try_decode now take a reference to an SessionContext.

# Conflicts:
#	ballista/rust/core/src/serde/physical_plan/mod.rs
#	datafusion-proto/src/from_proto.rs
@thinkharderdev thinkharderdev changed the title Use ExecutionContext to parse Expr protobuf Use SessionContext to parse Expr protobuf Mar 16, 2022
@yjshen
Copy link
Member

yjshen commented Mar 16, 2022

cc @yahoNanJing @mingmwang

@EricJoy2048
Copy link
Member

image

Does this place need to be modified ?

@thinkharderdev
Copy link
Contributor Author

image

Does this place need to be modified ?

No. The default AsLogicalPlan instance was updated to pass the context when deserializing.

Copy link
Member

@yjshen yjshen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, Thanks @thinkharderdev!

@alamb alamb merged commit c1f6269 into apache:master Mar 20, 2022
@mingmwang
Copy link
Contributor

mingmwang commented Mar 23, 2022

Sorry for the late response. Looks like this PR is a little problematic and it has conflicts with the multiple tenancy SessionContexts. The major problem is in the Executor side, Executor side also need to run those methods to parse the Expr protobuf to physical plan, but Executor side should not have SessionContexts, the optimizers, planner and physical planner do not make sense in Executor side.

And for the pluggable UDFs/UDAFs, I think we should have global level UDFs/UDAFs and session level UDFs/UDAFs.
For example all the build-in UDFs/UDAFs should be globally available. Different users can also upload/add his own functions
to his SessionContext.

@mingmwang
Copy link
Contributor

mingmwang commented Mar 23, 2022

Hi, @thinkharderdev

I think this PR doesn't resolve the issue. One of the key problem is, users register the UDF/UDFS with SessionContext.register_udf(), but how does the SessionContexts (ExecutionContext) in all the Executors know the registered UDF? The information is not propagate to executor side.

For DataFusion, SessionContext.register_udf() is not a problem. But for Baliista, there are three different kind of SessionContext:

  1. the context created in Ballista client and uses the BallistaQueryPlanner to send logical plans to Scheduler, see the method create_df_ctx_with_ballista_query_planner()

  2. the context created in Scheduler

  3. the context created in Executor

I'm going to remove the context created in Executor because I think it does not make sense to have Executor hold a global SessionContext.

Hi, @yjshen @alamb

Please share your thoughts.

@thinkharderdev
Copy link
Contributor Author

Hi, @thinkharderdev

I think this PR doesn't resolve the issue. One of the key problem is, users register the UDF/UDFS with SessionContext.register_udf(), but how does the SessionContexts (ExecutionContext) in all the Executors know the registered UDF? The information is not propagate to executor side.

For DataFusion, SessionContext.register_udf() is not a problem. But for Baliista, there are three different kind of SessionContext:

  1. the context created in Ballista client and uses the BallistaQueryPlanner to send logical plans to Scheduler, see the method create_df_ctx_with_ballista_query_planner()
  2. the context created in Scheduler
  3. the context created in Executor

I'm going to remove the context created in Executor because I think it does not make sense to have Executor hold a global SessionContext.

Hi, @yjshen @alamb

Please share your thoughts.

I think the idea is that there are two ways to register the UDF/UDAF in the executors:

  1. Build your own executor where you basically write your own entrypoint and setup the SessionContext with any extensions (codecs, udfs, udafs, optimizers, planners, etc) registered at startup.
  2. Use the mechanism being developed in add udf/udaf plugin #1881 to load udf/udaf as plugins which are put in a plugin directory and dynamically linked into the out of the box scheduler/executor binary.

Based on the discussion in that PR we were thinking that the plugin mechanism could use the SessionContext to do the registration of plugins at application start so internally we could use the same mechanism for both approaches. That is, you can either use plugins or roll your own main method but you still end up with extensions registered in the SessionContext and we use that for deserializing udfs/udafs in the plan.

Ultimately I think we do need some sort of context in the executor if we are going to support all the extension points that DataFusion provides in Ballista.

@mingmwang
Copy link
Contributor

mingmwang commented Mar 23, 2022

Hi, @thinkharderdev
I think this PR doesn't resolve the issue. One of the key problem is, users register the UDF/UDFS with SessionContext.register_udf(), but how does the SessionContexts (ExecutionContext) in all the Executors know the registered UDF? The information is not propagate to executor side.
For DataFusion, SessionContext.register_udf() is not a problem. But for Baliista, there are three different kind of SessionContext:

  1. the context created in Ballista client and uses the BallistaQueryPlanner to send logical plans to Scheduler, see the method create_df_ctx_with_ballista_query_planner()
  2. the context created in Scheduler
  3. the context created in Executor

I'm going to remove the context created in Executor because I think it does not make sense to have Executor hold a global SessionContext.
Hi, @yjshen @alamb
Please share your thoughts.

I think the idea is that there are two ways to register the UDF/UDAF in the executors:

  1. Build your own executor where you basically write your own entrypoint and setup the SessionContext with any extensions (codecs, udfs, udafs, optimizers, planners, etc) registered at startup.
  2. Use the mechanism being developed in add udf/udaf plugin #1881 to load udf/udaf as plugins which are put in a plugin directory and dynamically linked into the out of the box scheduler/executor binary.

Based on the discussion in that PR we were thinking that the plugin mechanism could use the SessionContext to do the registration of plugins at application start so internally we could use the same mechanism for both approaches. That is, you can either use plugins or roll your own main method but you still end up with extensions registered in the SessionContext and we use that for deserializing udfs/udafs in the plan.

Ultimately I think we do need some sort of context in the executor if we are going to support all the extension points that DataFusion provides in Ballista.

Hi, @thinkharderdev

I'm going to remove the SessionContext in Executor code and make the Executor itself implement FunctionRegistry trait
and pass the Arc<dyn FunctionRegistry> down to the try_into_physical_plan()and other required methods.
The major reason is Executor side does not need optimizers, planner, actually Executor side should not run any planning logic. The planner logic should only runs in Scheduler side. And to support multiple tenancy SessionContext, it is weird to keep a global SessionContext in Executor side.

For udf/udaf registration in Executor side, I did not get a chance to scan the code in #1881. The two approaches you just mentioned should both work. And to support session level udf/udaf(temporary functions), we also need a way to propagate the temp functions' meta data and lib url or lib files from Scheduler side to Executor side.

@thinkharderdev
Copy link
Contributor Author

Hi, @thinkharderdev
I think this PR doesn't resolve the issue. One of the key problem is, users register the UDF/UDFS with SessionContext.register_udf(), but how does the SessionContexts (ExecutionContext) in all the Executors know the registered UDF? The information is not propagate to executor side.
For DataFusion, SessionContext.register_udf() is not a problem. But for Baliista, there are three different kind of SessionContext:

  1. the context created in Ballista client and uses the BallistaQueryPlanner to send logical plans to Scheduler, see the method create_df_ctx_with_ballista_query_planner()
  2. the context created in Scheduler
  3. the context created in Executor

I'm going to remove the context created in Executor because I think it does not make sense to have Executor hold a global SessionContext.
Hi, @yjshen @alamb
Please share your thoughts.

I think the idea is that there are two ways to register the UDF/UDAF in the executors:

  1. Build your own executor where you basically write your own entrypoint and setup the SessionContext with any extensions (codecs, udfs, udafs, optimizers, planners, etc) registered at startup.
  2. Use the mechanism being developed in add udf/udaf plugin #1881 to load udf/udaf as plugins which are put in a plugin directory and dynamically linked into the out of the box scheduler/executor binary.

Based on the discussion in that PR we were thinking that the plugin mechanism could use the SessionContext to do the registration of plugins at application start so internally we could use the same mechanism for both approaches. That is, you can either use plugins or roll your own main method but you still end up with extensions registered in the SessionContext and we use that for deserializing udfs/udafs in the plan.
Ultimately I think we do need some sort of context in the executor if we are going to support all the extension points that DataFusion provides in Ballista.

Hi, @thinkharderdev

I'm going to remove the SessionContext in Executor code and make the Executor itself implement FunctionRegistry trait and pass the Arc<dyn FunctionRegistry> down to the try_into_physical_plan()and other required methods. The major reason is Executor side does not need optimizers, planner, actually Executor side should not run any planning logic. The planner logic should only runs in Scheduler side. And to support multiple tenancy SessionContext, it is weird to keep a global SessionContext in Executor side.

For udf/udaf registration in Executor side, I did not get a chance to scan the code in #1881. The two approaches you just mentioned should both work. And to support session level udf/udaf(temporary functions), we also need a way to propagate the temp functions' meta data and lib url or lib files from Scheduler side to Executor side.

Can I suggest using TaskContext instead. We can add the FunctionRegistry to the TaskContext and pass that in. We would still need the ObjectStoreRegistry to decode the physical plan protobuf and I think it is a good idea to have a more general way of adding extension points into the executor.

@alamb
Copy link
Contributor

alamb commented Mar 23, 2022

I don't have anything substantial to add to this conversation -- it sounds like the challenges are well understood and there are good ideas of how to do the mapping from "function name" to "UDF implementation" on the executor side.

It is great discussion to see.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants