From f15a81305a86256d50b91fb2424742208a6b61eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Fri, 6 Feb 2026 14:13:53 +0000 Subject: [PATCH 1/5] Move `QueryPlanner` to datafusion session --- .../examples/dataframe/cache_factory.rs | 13 +++++-- .../examples/relation_planner/table_sample.rs | 18 +++++++--- datafusion/core/src/execution/context/mod.rs | 21 +++++------- .../core/src/execution/session_state.rs | 15 +++++--- .../tests/user_defined/user_defined_plan.rs | 15 ++++++-- datafusion/session/src/lib.rs | 2 ++ datafusion/session/src/planner.rs | 34 +++++++++++++++++++ 7 files changed, 94 insertions(+), 24 deletions(-) create mode 100644 datafusion/session/src/planner.rs diff --git a/datafusion-examples/examples/dataframe/cache_factory.rs b/datafusion-examples/examples/dataframe/cache_factory.rs index a92c3dc4ce26a..ee0b2ff2b4546 100644 --- a/datafusion-examples/examples/dataframe/cache_factory.rs +++ b/datafusion-examples/examples/dataframe/cache_factory.rs @@ -23,6 +23,7 @@ use std::sync::{Arc, RwLock}; use arrow::array::RecordBatch; use async_trait::async_trait; +use datafusion::catalog::Session; use datafusion::catalog::memory::MemorySourceConfig; use datafusion::common::DFSchemaRef; use datafusion::error::Result; @@ -37,7 +38,7 @@ use datafusion::physical_planner::{ DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner, }; use datafusion::prelude::*; -use datafusion_common::HashMap; +use datafusion_common::{HashMap, exec_datafusion_err}; use datafusion_examples::utils::{datasets::ExampleDataset, write_csv_to_parquet}; /// This example demonstrates how to leverage [CacheFactory] to implement custom caching strategies for dataframes in DataFusion. @@ -198,8 +199,16 @@ impl QueryPlanner for CacheNodeQueryPlanner { async fn create_physical_plan( &self, logical_plan: &LogicalPlan, - session_state: &SessionState, + session: &dyn Session, ) -> Result> { + let session_state = + session + .as_any() + .downcast_ref::() + .ok_or_else(|| { + exec_datafusion_err!("Failed to downcast Session to SessionState") + })?; + let physical_planner = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new( CacheNodePlanner { diff --git a/datafusion-examples/examples/relation_planner/table_sample.rs b/datafusion-examples/examples/relation_planner/table_sample.rs index 657432ef31362..7841af7c096cb 100644 --- a/datafusion-examples/examples/relation_planner/table_sample.rs +++ b/datafusion-examples/examples/relation_planner/table_sample.rs @@ -101,7 +101,9 @@ use futures::{ use rand::{Rng, SeedableRng, rngs::StdRng}; use tonic::async_trait; -use datafusion::optimizer::simplify_expressions::simplify_literal::parse_literal; +use datafusion::{ + catalog::Session, optimizer::simplify_expressions::simplify_literal::parse_literal, +}; use datafusion::{ execution::{ RecordBatchStream, SendableRecordBatchStream, SessionState, SessionStateBuilder, @@ -116,8 +118,8 @@ use datafusion::{ prelude::*, }; use datafusion_common::{ - DFSchemaRef, DataFusionError, Result, Statistics, internal_err, not_impl_err, - plan_datafusion_err, plan_err, + DFSchemaRef, DataFusionError, Result, Statistics, exec_datafusion_err, internal_err, + not_impl_err, plan_datafusion_err, plan_err, }; use datafusion_expr::{ UserDefinedLogicalNode, UserDefinedLogicalNodeCore, @@ -564,8 +566,16 @@ impl QueryPlanner for TableSampleQueryPlanner { async fn create_physical_plan( &self, logical_plan: &LogicalPlan, - session_state: &SessionState, + session: &dyn Session, ) -> Result> { + let session_state = + session + .as_any() + .downcast_ref::() + .ok_or_else(|| { + exec_datafusion_err!("Failed to downcast Session to SessionState") + })?; + let planner = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new( TableSampleExtensionPlanner, )]); diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index b6c606ff467f9..08b1046736aef 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -56,6 +56,7 @@ use crate::{ // backwards compatibility pub use crate::execution::session_state::SessionState; +pub use datafusion_session::QueryPlanner; use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; @@ -1970,17 +1971,6 @@ impl From for SessionStateBuilder { } } -/// A planner used to add extensions to DataFusion logical and physical plans. -#[async_trait] -pub trait QueryPlanner: Debug { - /// Given a [`LogicalPlan`], create an [`ExecutionPlan`] suitable for execution - async fn create_physical_plan( - &self, - logical_plan: &LogicalPlan, - session_state: &SessionState, - ) -> Result>; -} - /// Interface for handling `CREATE FUNCTION` statements and interacting with /// [SessionState] to create and register functions ([`ScalarUDF`], /// [`AggregateUDF`], [`WindowUDF`], and [`TableFunctionImpl`]) dynamically. @@ -2162,6 +2152,7 @@ mod tests { use crate::test_util::{plan_and_collect, populate_csv_partitions}; use arrow::datatypes::{DataType, TimeUnit}; use datafusion_common::DataFusionError; + use datafusion_session::Session; use std::error::Error; use std::path::PathBuf; @@ -2643,8 +2634,14 @@ mod tests { async fn create_physical_plan( &self, logical_plan: &LogicalPlan, - session_state: &SessionState, + session: &dyn Session, ) -> Result> { + let session_state = session + .as_any() + .downcast_ref::() + .ok_or_else(|| { + exec_datafusion_err!("Failed to downcast Session to SessionState") + })?; let physical_planner = MyPhysicalPlanner {}; physical_planner .create_physical_plan(logical_plan, session_state) diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 9560616c1b6da..16ead50bfe7dc 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -28,7 +28,7 @@ use crate::datasource::file_format::FileFormatFactory; #[cfg(feature = "sql")] use crate::datasource::provider_as_source; use crate::execution::SessionStateDefaults; -use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner}; +use crate::execution::context::{EmptySerializerRegistry, FunctionFactory}; use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; use arrow_schema::{DataType, FieldRef}; use datafusion_catalog::MemoryCatalogProviderList; @@ -44,7 +44,7 @@ use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan}; use datafusion_common::tree_node::TreeNode; use datafusion_common::{ DFSchema, DataFusionError, ResolvedTableReference, TableReference, config_err, - exec_err, plan_datafusion_err, + exec_datafusion_err, exec_err, plan_datafusion_err, }; use datafusion_execution::TaskContext; use datafusion_execution::config::SessionConfig; @@ -68,7 +68,7 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_optimizer::optimizer::PhysicalOptimizer; use datafusion_physical_plan::ExecutionPlan; -use datafusion_session::Session; +use datafusion_session::{QueryPlanner, Session}; #[cfg(feature = "sql")] use datafusion_sql::{ parser::{DFParserBuilder, Statement}, @@ -2115,8 +2115,15 @@ impl QueryPlanner for DefaultQueryPlanner { async fn create_physical_plan( &self, logical_plan: &LogicalPlan, - session_state: &SessionState, + session: &dyn Session, ) -> datafusion_common::Result> { + let session_state = + session + .as_any() + .downcast_ref::() + .ok_or_else(|| { + exec_datafusion_err!("Failed to downcast Session to SessionState") + })?; let planner = DefaultPhysicalPlanner::default(); planner .create_physical_plan(logical_plan, session_state) diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs b/datafusion/core/tests/user_defined/user_defined_plan.rs index d53e076739608..11ddc89c3aaf7 100644 --- a/datafusion/core/tests/user_defined/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined/user_defined_plan.rs @@ -91,7 +91,9 @@ use datafusion::{ }; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::{ScalarValue, assert_eq_or_internal_err, assert_or_internal_err}; +use datafusion_common::{ + ScalarValue, assert_eq_or_internal_err, assert_or_internal_err, exec_datafusion_err, +}; use datafusion_expr::{FetchType, InvariantLevel, Projection, SortExpr}; use datafusion_optimizer::AnalyzerRule; use datafusion_optimizer::optimizer::ApplyOrder; @@ -99,6 +101,7 @@ use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; use async_trait::async_trait; use datafusion_common::cast::as_string_view_array; +use datafusion_session::Session; use futures::{Stream, StreamExt}; /// Execute the specified sql and return the resulting record batches @@ -466,8 +469,16 @@ impl QueryPlanner for TopKQueryPlanner { async fn create_physical_plan( &self, logical_plan: &LogicalPlan, - session_state: &SessionState, + session: &dyn Session, ) -> Result> { + let session_state = + session + .as_any() + .downcast_ref::() + .ok_or_else(|| { + exec_datafusion_err!("Failed to downcast Session to SessionState") + })?; + // Teach the default physical planner how to plan TopK nodes. let physical_planner = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new( diff --git a/datafusion/session/src/lib.rs b/datafusion/session/src/lib.rs index 11f734e757452..7d356586837b1 100644 --- a/datafusion/session/src/lib.rs +++ b/datafusion/session/src/lib.rs @@ -36,6 +36,8 @@ //! * Runtime environment configuration //! * Query state persistence +pub mod planner; pub mod session; +pub use crate::planner::QueryPlanner; pub use crate::session::{Session, SessionStore}; diff --git a/datafusion/session/src/planner.rs b/datafusion/session/src/planner.rs new file mode 100644 index 0000000000000..1268ac0848ec7 --- /dev/null +++ b/datafusion/session/src/planner.rs @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::Session; +use async_trait::async_trait; +use datafusion_expr::LogicalPlan; +use datafusion_physical_plan::ExecutionPlan; +use std::fmt::Debug; +use std::sync::Arc; + +/// A planner used to add extensions to DataFusion logical and physical plans. +#[async_trait] +pub trait QueryPlanner: Debug { + /// Given a [`LogicalPlan`], create an [`ExecutionPlan`] suitable for execution + async fn create_physical_plan( + &self, + logical_plan: &LogicalPlan, + session_state: &dyn Session, + ) -> datafusion_common::Result>; +} From 62cf3d8645fafbe743915540b63e77a5268c28a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Fri, 6 Feb 2026 16:45:34 +0000 Subject: [PATCH 2/5] initial implementation --- datafusion/execution/src/task.rs | 2 +- datafusion/ffi/src/session/mod.rs | 1 + datafusion/ffi/src/session/planner.rs | 287 ++++++++++++++++++++++++++ datafusion/session/src/planner.rs | 2 +- 4 files changed, 290 insertions(+), 2 deletions(-) create mode 100644 datafusion/ffi/src/session/planner.rs diff --git a/datafusion/execution/src/task.rs b/datafusion/execution/src/task.rs index 38f31cf4629eb..823e8f76fd30c 100644 --- a/datafusion/execution/src/task.rs +++ b/datafusion/execution/src/task.rs @@ -212,7 +212,7 @@ impl FunctionRegistry for TaskContext { } /// Produce the [`TaskContext`]. -pub trait TaskContextProvider { +pub trait TaskContextProvider: Sync + Send { fn task_ctx(&self) -> Arc; } diff --git a/datafusion/ffi/src/session/mod.rs b/datafusion/ffi/src/session/mod.rs index aa910abb9149a..df2dd0354f365 100644 --- a/datafusion/ffi/src/session/mod.rs +++ b/datafusion/ffi/src/session/mod.rs @@ -60,6 +60,7 @@ use crate::util::FFIResult; use crate::{df_result, rresult, rresult_return}; pub mod config; +pub mod planner; /// A stable struct for sharing [`Session`] across FFI boundaries. /// diff --git a/datafusion/ffi/src/session/planner.rs b/datafusion/ffi/src/session/planner.rs new file mode 100644 index 0000000000000..f9f120ab216ea --- /dev/null +++ b/datafusion/ffi/src/session/planner.rs @@ -0,0 +1,287 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +use std::{ffi::c_void, sync::Arc}; + +use abi_stable::{StableAbi, std_types::RVec}; +use async_ffi::{FfiFuture, FutureExt}; +use async_trait::async_trait; +use datafusion_execution::{TaskContext, TaskContextProvider}; +use datafusion_expr::LogicalPlan; +use datafusion_physical_plan::ExecutionPlan; +use datafusion_proto::{ + bytes::{ + logical_plan_from_bytes_with_extension_codec, + logical_plan_to_bytes_with_extension_codec, + }, + logical_plan::{DefaultLogicalExtensionCodec, LogicalExtensionCodec}, +}; +use datafusion_session::{QueryPlanner, Session}; + +use crate::{ + df_result, + execution::FFI_TaskContextProvider, + execution_plan::FFI_ExecutionPlan, + proto::logical_extension_codec::FFI_LogicalExtensionCodec, + rresult, rresult_return, + session::{FFI_SessionRef, ForeignSession}, + util::FFIResult, +}; + +#[repr(C)] +#[derive(Debug, StableAbi)] +pub struct FFI_QueryPlanner { + // it would make sense, for ballista, to get access to this method. + // as the plan is going to be decode at the scheduler side. + // + // at the moment,FFI_SessionRef is not public, so we'd need to change it + /// Given a [`LogicalPlan`], create an [`ExecutionPlan`] suitable for execution + create_physical_plan: unsafe extern "C" fn( + &Self, + logical_plan_serialized: RVec, + session: &FFI_SessionRef, + ) + -> FfiFuture>, + + /// Release the memory of the private data when it is no longer being used. + pub release: unsafe extern "C" fn(arg: &mut Self), + + /// Internal data. This is only to be accessed by the planner of the plan. + /// The foreign library should never attempt to access this data. + pub private_data: *mut c_void, + + /// Used to create a clone on the planner . This should + /// only need to be called by the receiver of the plan. + pub clone: unsafe extern "C" fn(plan: &Self) -> Self, + + /// Utility to identify when FFI objects are accessed locally through + /// the foreign interface. See [`crate::get_library_marker_id`] and + /// the crate's `README.md` for more information. + pub library_marker_id: extern "C" fn() -> usize, +} + +unsafe impl Send for FFI_QueryPlanner {} +unsafe impl Sync for FFI_QueryPlanner {} + +struct QueryPlannerPrivateData { + planner: Arc, +} + +impl FFI_QueryPlanner { + fn inner(&self) -> &Arc { + unsafe { + let private_data = self.private_data as *const QueryPlannerPrivateData; + &(*private_data).planner + } + } +} + +unsafe extern "C" fn release_fn_wrapper(ctx: &mut FFI_QueryPlanner) { + unsafe { + let private_data = + Box::from_raw(ctx.private_data as *mut QueryPlannerPrivateData); + drop(private_data); + } +} + +unsafe extern "C" fn create_physical_plan_fn_wrapper( + planner: &FFI_QueryPlanner, + logical_plan_serialized: RVec, + session: &FFI_SessionRef, +) -> FfiFuture> { + unsafe { + let planner = Arc::clone(planner.inner()); + let codec: Arc = (&session.logical_codec).into(); + let runtime = session.runtime().clone(); + let session = ForeignSession::try_from(session); + + async move { + let session = rresult_return!(session); + let task_ctx: Arc = session.task_ctx(); + + let logical_plan = + rresult_return!(logical_plan_from_bytes_with_extension_codec( + logical_plan_serialized.as_slice(), + task_ctx.as_ref(), + codec.as_ref() + )); + + let physical_plan = + planner.create_physical_plan(&logical_plan, &session).await; + + rresult!(physical_plan.map(|plan| FFI_ExecutionPlan::new(plan, runtime))) + } + .into_ffi() + } +} + +unsafe extern "C" fn clone_fn_wrapper(planner: &FFI_QueryPlanner) -> FFI_QueryPlanner { + let planner = Arc::clone(planner.inner()); + planner.into() +} + +impl Drop for FFI_QueryPlanner { + fn drop(&mut self) { + unsafe { (self.release)(self) } + } +} + +impl Clone for FFI_QueryPlanner { + fn clone(&self) -> Self { + unsafe { (self.clone)(self) } + } +} + +#[derive(Debug)] +pub struct ForeignQueryPlanner( + pub FFI_QueryPlanner, + pub Arc, +); + +impl From> for FFI_QueryPlanner { + fn from(planner: Arc) -> Self { + let private_data = Box::new(QueryPlannerPrivateData { planner }); + + FFI_QueryPlanner { + create_physical_plan: create_physical_plan_fn_wrapper, + release: release_fn_wrapper, + private_data: Box::into_raw(private_data) as *mut c_void, + library_marker_id: crate::get_library_marker_id, + clone: clone_fn_wrapper, + } + } +} + +impl From<&FFI_QueryPlanner> for Arc { + fn from(planner: &FFI_QueryPlanner) -> Self { + if (planner.library_marker_id)() == crate::get_library_marker_id() { + Arc::clone(planner.inner()) + } else { + Arc::new(ForeignQueryPlanner( + planner.clone(), + Arc::new(DefaultLogicalExtensionCodec {}), + )) + } + } +} + +impl ForeignQueryPlanner { + pub fn new(planner: FFI_QueryPlanner) -> Self { + Self(planner, Arc::new(DefaultLogicalExtensionCodec {})) + } + + pub fn new_with_logical_codec( + planner: FFI_QueryPlanner, + codec: Arc, + ) -> Self { + Self(planner, codec) + } +} + +#[async_trait] +impl QueryPlanner for ForeignQueryPlanner { + /// Given a [`LogicalPlan`], create an [`ExecutionPlan`] suitable for execution + async fn create_physical_plan( + &self, + logical_plan: &LogicalPlan, + session_state: &dyn Session, + ) -> datafusion_common::Result> { + let logical_plan_buf = + logical_plan_to_bytes_with_extension_codec(logical_plan, self.1.as_ref())?; + let task_ctx = session_state.task_ctx(); + + // I'm not sure if there is better way to extract + // context provider + let task_ctx_provider: Arc = + Arc::new(ConstantContextProvider { ctx: task_ctx }); + + let task_ctx_provider: FFI_TaskContextProvider = (&task_ctx_provider).into(); + + let logical_codec = + FFI_LogicalExtensionCodec::new(Arc::clone(&self.1), None, task_ctx_provider); + + let session_ref = FFI_SessionRef::new(session_state, None, logical_codec); + + let plan = df_result!(unsafe { + (self.0.create_physical_plan)( + &self.0, + logical_plan_buf.as_ref().into(), + &session_ref, + ) + .await + })?; + + Ok((&plan).try_into()?) + } +} + +// this is temporary if there is better way to do this +struct ConstantContextProvider { + ctx: Arc, +} + +impl TaskContextProvider for ConstantContextProvider { + fn task_ctx(&self) -> Arc { + Arc::clone(&self.ctx) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use datafusion_expr::LogicalPlan; + use datafusion_physical_plan::{ExecutionPlan, empty::EmptyExec}; + use datafusion_session::{QueryPlanner, Session}; + + use crate::session::planner::{FFI_QueryPlanner, ForeignQueryPlanner}; + + #[derive(Debug, Default)] + struct DummyPlanner {} + + #[async_trait::async_trait] + impl QueryPlanner for DummyPlanner { + async fn create_physical_plan( + &self, + logical_plan: &LogicalPlan, + _session_state: &dyn Session, + ) -> datafusion_common::Result> { + let schema = logical_plan.schema().as_arrow().clone(); + // will need better test + Ok(Arc::new(EmptyExec::new(Arc::new(schema)))) + } + } + + #[tokio::test] + async fn test_end_to_end() -> datafusion::common::Result<()> { + let (ctx, _) = crate::util::tests::test_session_and_ctx(); + + let df = ctx.sql("select 1 as i").await?; + let logical_plan = df.logical_plan(); + + let planner: Arc = Arc::new(DummyPlanner::default()); + + let ffi_planner: FFI_QueryPlanner = planner.into(); + let foreign_planner: ForeignQueryPlanner = ForeignQueryPlanner::new(ffi_planner); + + let empty_exec = foreign_planner + .create_physical_plan(logical_plan, &ctx.state()) + .await?; + + assert!(empty_exec.as_any().downcast_ref::().is_some()); + Ok(()) + } +} diff --git a/datafusion/session/src/planner.rs b/datafusion/session/src/planner.rs index 1268ac0848ec7..06d7415ce4a4a 100644 --- a/datafusion/session/src/planner.rs +++ b/datafusion/session/src/planner.rs @@ -24,7 +24,7 @@ use std::sync::Arc; /// A planner used to add extensions to DataFusion logical and physical plans. #[async_trait] -pub trait QueryPlanner: Debug { +pub trait QueryPlanner: Debug + Send + Sync { /// Given a [`LogicalPlan`], create an [`ExecutionPlan`] suitable for execution async fn create_physical_plan( &self, From 46015d5f611b36ce7dc927190389bf80a067acb4 Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Tue, 10 Feb 2026 17:13:24 -0500 Subject: [PATCH 3/5] Add recommendations for query planner ffi code (#1) * Add recommendations for query planner ffi code * remove cruft --- datafusion/ffi/src/session/mod.rs | 8 +- datafusion/ffi/src/session/planner.rs | 170 +++++++++++++------------- 2 files changed, 91 insertions(+), 87 deletions(-) diff --git a/datafusion/ffi/src/session/mod.rs b/datafusion/ffi/src/session/mod.rs index df2dd0354f365..0c8169345c4ce 100644 --- a/datafusion/ffi/src/session/mod.rs +++ b/datafusion/ffi/src/session/mod.rs @@ -38,7 +38,9 @@ use datafusion_expr::{ }; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_plan::ExecutionPlan; -use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes}; +use datafusion_proto::bytes::{ + logical_plan_from_bytes, logical_plan_to_bytes_with_extension_codec, +}; use datafusion_proto::logical_plan::LogicalExtensionCodec; use datafusion_proto::logical_plan::from_proto::parse_expr; use datafusion_proto::logical_plan::to_proto::serialize_expr; @@ -468,8 +470,10 @@ impl Session for ForeignSession { &self, logical_plan: &LogicalPlan, ) -> datafusion_common::Result> { + let codec: Arc = (&self.session.logical_codec).into(); unsafe { - let logical_plan = logical_plan_to_bytes(logical_plan)?; + let logical_plan = + logical_plan_to_bytes_with_extension_codec(logical_plan, codec.as_ref())?; let physical_plan = df_result!( (self.session.create_physical_plan)( &self.session, diff --git a/datafusion/ffi/src/session/planner.rs b/datafusion/ffi/src/session/planner.rs index f9f120ab216ea..a056ded15f682 100644 --- a/datafusion/ffi/src/session/planner.rs +++ b/datafusion/ffi/src/session/planner.rs @@ -14,32 +14,33 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -use std::{ffi::c_void, sync::Arc}; +use std::ffi::c_void; +use std::sync::Arc; -use abi_stable::{StableAbi, std_types::RVec}; +use abi_stable::StableAbi; +use abi_stable::std_types::RVec; use async_ffi::{FfiFuture, FutureExt}; use async_trait::async_trait; -use datafusion_execution::{TaskContext, TaskContextProvider}; +use datafusion_common::DataFusionError; +use datafusion_execution::TaskContext; use datafusion_expr::LogicalPlan; use datafusion_physical_plan::ExecutionPlan; -use datafusion_proto::{ - bytes::{ - logical_plan_from_bytes_with_extension_codec, - logical_plan_to_bytes_with_extension_codec, - }, - logical_plan::{DefaultLogicalExtensionCodec, LogicalExtensionCodec}, +use datafusion_proto::bytes::{ + logical_plan_from_bytes_with_extension_codec, + logical_plan_to_bytes_with_extension_codec, +}; +use datafusion_proto::logical_plan::{ + DefaultLogicalExtensionCodec, LogicalExtensionCodec, }; use datafusion_session::{QueryPlanner, Session}; +use tokio::runtime::Handle; -use crate::{ - df_result, - execution::FFI_TaskContextProvider, - execution_plan::FFI_ExecutionPlan, - proto::logical_extension_codec::FFI_LogicalExtensionCodec, - rresult, rresult_return, - session::{FFI_SessionRef, ForeignSession}, - util::FFIResult, -}; +use crate::execution::FFI_TaskContextProvider; +use crate::execution_plan::FFI_ExecutionPlan; +use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec; +use crate::session::{FFI_SessionRef, ForeignSession}; +use crate::util::FFIResult; +use crate::{df_result, rresult, rresult_return}; #[repr(C)] #[derive(Debug, StableAbi)] @@ -52,10 +53,13 @@ pub struct FFI_QueryPlanner { create_physical_plan: unsafe extern "C" fn( &Self, logical_plan_serialized: RVec, - session: &FFI_SessionRef, + session: FFI_SessionRef, ) -> FfiFuture>, + /// Logical codec used to provide encoding and decoding of plans. + pub logical_codec: FFI_LogicalExtensionCodec, + /// Release the memory of the private data when it is no longer being used. pub release: unsafe extern "C" fn(arg: &mut Self), @@ -100,16 +104,25 @@ unsafe extern "C" fn release_fn_wrapper(ctx: &mut FFI_QueryPlanner) { unsafe extern "C" fn create_physical_plan_fn_wrapper( planner: &FFI_QueryPlanner, logical_plan_serialized: RVec, - session: &FFI_SessionRef, + session: FFI_SessionRef, ) -> FfiFuture> { unsafe { let planner = Arc::clone(planner.inner()); let codec: Arc = (&session.logical_codec).into(); let runtime = session.runtime().clone(); - let session = ForeignSession::try_from(session); async move { - let session = rresult_return!(session); + let mut foreign_session = None; + let session = rresult_return!( + session + .as_local() + .map(Ok::<&(dyn Session + Send + Sync), DataFusionError>) + .unwrap_or_else(|| { + foreign_session = Some(ForeignSession::try_from(&session)?); + Ok(foreign_session.as_ref().unwrap()) + }) + ); + let task_ctx: Arc = session.task_ctx(); let logical_plan = @@ -120,7 +133,7 @@ unsafe extern "C" fn create_physical_plan_fn_wrapper( )); let physical_plan = - planner.create_physical_plan(&logical_plan, &session).await; + planner.create_physical_plan(&logical_plan, session).await; rresult!(physical_plan.map(|plan| FFI_ExecutionPlan::new(plan, runtime))) } @@ -129,8 +142,9 @@ unsafe extern "C" fn create_physical_plan_fn_wrapper( } unsafe extern "C" fn clone_fn_wrapper(planner: &FFI_QueryPlanner) -> FFI_QueryPlanner { + let codec = planner.logical_codec.clone(); let planner = Arc::clone(planner.inner()); - planner.into() + FFI_QueryPlanner::new_with_ffi_codec(planner, codec) } impl Drop for FFI_QueryPlanner { @@ -139,58 +153,60 @@ impl Drop for FFI_QueryPlanner { } } -impl Clone for FFI_QueryPlanner { - fn clone(&self) -> Self { - unsafe { (self.clone)(self) } +impl FFI_QueryPlanner { + pub fn new( + planner: Arc, + runtime: Option<&Handle>, + task_ctx_provider: impl Into, + logical_codec: Option>, + ) -> Self { + let logical_codec = + logical_codec.unwrap_or_else(|| Arc::new(DefaultLogicalExtensionCodec {})); + let logical_codec = FFI_LogicalExtensionCodec::new( + logical_codec, + runtime.cloned(), + task_ctx_provider.into(), + ); + + Self::new_with_ffi_codec(planner, logical_codec) } -} - -#[derive(Debug)] -pub struct ForeignQueryPlanner( - pub FFI_QueryPlanner, - pub Arc, -); -impl From> for FFI_QueryPlanner { - fn from(planner: Arc) -> Self { + pub fn new_with_ffi_codec( + planner: Arc, + codec: FFI_LogicalExtensionCodec, + ) -> Self { let private_data = Box::new(QueryPlannerPrivateData { planner }); - FFI_QueryPlanner { + Self { create_physical_plan: create_physical_plan_fn_wrapper, + logical_codec: codec, + clone: clone_fn_wrapper, release: release_fn_wrapper, private_data: Box::into_raw(private_data) as *mut c_void, library_marker_id: crate::get_library_marker_id, - clone: clone_fn_wrapper, } } } +impl Clone for FFI_QueryPlanner { + fn clone(&self) -> Self { + unsafe { (self.clone)(self) } + } +} + +#[derive(Debug)] +pub struct ForeignQueryPlanner(pub FFI_QueryPlanner); + impl From<&FFI_QueryPlanner> for Arc { fn from(planner: &FFI_QueryPlanner) -> Self { if (planner.library_marker_id)() == crate::get_library_marker_id() { Arc::clone(planner.inner()) } else { - Arc::new(ForeignQueryPlanner( - planner.clone(), - Arc::new(DefaultLogicalExtensionCodec {}), - )) + Arc::new(ForeignQueryPlanner(planner.clone())) } } } -impl ForeignQueryPlanner { - pub fn new(planner: FFI_QueryPlanner) -> Self { - Self(planner, Arc::new(DefaultLogicalExtensionCodec {})) - } - - pub fn new_with_logical_codec( - planner: FFI_QueryPlanner, - codec: Arc, - ) -> Self { - Self(planner, codec) - } -} - #[async_trait] impl QueryPlanner for ForeignQueryPlanner { /// Given a [`LogicalPlan`], create an [`ExecutionPlan`] suitable for execution @@ -199,27 +215,18 @@ impl QueryPlanner for ForeignQueryPlanner { logical_plan: &LogicalPlan, session_state: &dyn Session, ) -> datafusion_common::Result> { + let codec: Arc = (&self.0.logical_codec).into(); let logical_plan_buf = - logical_plan_to_bytes_with_extension_codec(logical_plan, self.1.as_ref())?; - let task_ctx = session_state.task_ctx(); + logical_plan_to_bytes_with_extension_codec(logical_plan, codec.as_ref())?; - // I'm not sure if there is better way to extract - // context provider - let task_ctx_provider: Arc = - Arc::new(ConstantContextProvider { ctx: task_ctx }); - - let task_ctx_provider: FFI_TaskContextProvider = (&task_ctx_provider).into(); - - let logical_codec = - FFI_LogicalExtensionCodec::new(Arc::clone(&self.1), None, task_ctx_provider); - - let session_ref = FFI_SessionRef::new(session_state, None, logical_codec); + let session_ref = + FFI_SessionRef::new(session_state, None, self.0.logical_codec.clone()); let plan = df_result!(unsafe { (self.0.create_physical_plan)( &self.0, logical_plan_buf.as_ref().into(), - &session_ref, + session_ref, ) .await })?; @@ -228,26 +235,16 @@ impl QueryPlanner for ForeignQueryPlanner { } } -// this is temporary if there is better way to do this -struct ConstantContextProvider { - ctx: Arc, -} - -impl TaskContextProvider for ConstantContextProvider { - fn task_ctx(&self) -> Arc { - Arc::clone(&self.ctx) - } -} - #[cfg(test)] mod tests { use std::sync::Arc; use datafusion_expr::LogicalPlan; - use datafusion_physical_plan::{ExecutionPlan, empty::EmptyExec}; + use datafusion_physical_plan::ExecutionPlan; + use datafusion_physical_plan::empty::EmptyExec; use datafusion_session::{QueryPlanner, Session}; - use crate::session::planner::{FFI_QueryPlanner, ForeignQueryPlanner}; + use crate::session::planner::FFI_QueryPlanner; #[derive(Debug, Default)] struct DummyPlanner {} @@ -267,15 +264,18 @@ mod tests { #[tokio::test] async fn test_end_to_end() -> datafusion::common::Result<()> { - let (ctx, _) = crate::util::tests::test_session_and_ctx(); + let (ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx(); let df = ctx.sql("select 1 as i").await?; let logical_plan = df.logical_plan(); let planner: Arc = Arc::new(DummyPlanner::default()); - let ffi_planner: FFI_QueryPlanner = planner.into(); - let foreign_planner: ForeignQueryPlanner = ForeignQueryPlanner::new(ffi_planner); + let mut ffi_planner = + FFI_QueryPlanner::new(planner, None, task_ctx_provider, None); + ffi_planner.library_marker_id = crate::mock_foreign_marker_id; + + let foreign_planner: Arc = (&ffi_planner).into(); let empty_exec = foreign_planner .create_physical_plan(logical_plan, &ctx.state()) From 855f902699ab9450707ef1cf642cf75dbb62b10e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Tue, 10 Feb 2026 22:20:55 +0000 Subject: [PATCH 4/5] add missing license header --- datafusion/ffi/src/session/planner.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/datafusion/ffi/src/session/planner.rs b/datafusion/ffi/src/session/planner.rs index a056ded15f682..46dc0358a07ea 100644 --- a/datafusion/ffi/src/session/planner.rs +++ b/datafusion/ffi/src/session/planner.rs @@ -14,6 +14,23 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::ffi::c_void; use std::sync::Arc; From 5af7c18a15a8732a1f5306de6edcae364472ded5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Tue, 10 Feb 2026 22:25:52 +0000 Subject: [PATCH 5/5] add missing license header --- datafusion/ffi/src/session/planner.rs | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/datafusion/ffi/src/session/planner.rs b/datafusion/ffi/src/session/planner.rs index 46dc0358a07ea..a4619126485cc 100644 --- a/datafusion/ffi/src/session/planner.rs +++ b/datafusion/ffi/src/session/planner.rs @@ -14,22 +14,6 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. use std::ffi::c_void; use std::sync::Arc;