diff --git a/datafusion-examples/examples/dataframe/cache_factory.rs b/datafusion-examples/examples/dataframe/cache_factory.rs
index a92c3dc4ce26a..ee0b2ff2b4546 100644
--- a/datafusion-examples/examples/dataframe/cache_factory.rs
+++ b/datafusion-examples/examples/dataframe/cache_factory.rs
@@ -23,6 +23,7 @@ use std::sync::{Arc, RwLock};
 
 use arrow::array::RecordBatch;
 use async_trait::async_trait;
+use datafusion::catalog::Session;
 use datafusion::catalog::memory::MemorySourceConfig;
 use datafusion::common::DFSchemaRef;
 use datafusion::error::Result;
@@ -37,7 +38,7 @@ use datafusion::physical_planner::{
     DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner,
 };
 use datafusion::prelude::*;
-use datafusion_common::HashMap;
+use datafusion_common::{HashMap, exec_datafusion_err};
 use datafusion_examples::utils::{datasets::ExampleDataset, write_csv_to_parquet};
 
 /// This example demonstrates how to leverage [CacheFactory] to implement custom caching strategies for dataframes in DataFusion.
@@ -198,8 +199,16 @@ impl QueryPlanner for CacheNodeQueryPlanner {
     async fn create_physical_plan(
         &self,
         logical_plan: &LogicalPlan,
-        session_state: &SessionState,
+        session: &dyn Session,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        let session_state =
+            session
+                .as_any()
+                .downcast_ref::<SessionState>()
+                .ok_or_else(|| {
+                    exec_datafusion_err!("Failed to downcast Session to SessionState")
+                })?;
+
         let physical_planner =
             DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
                 CacheNodePlanner {
diff --git a/datafusion-examples/examples/relation_planner/table_sample.rs b/datafusion-examples/examples/relation_planner/table_sample.rs
index 657432ef31362..7841af7c096cb 100644
--- a/datafusion-examples/examples/relation_planner/table_sample.rs
+++ b/datafusion-examples/examples/relation_planner/table_sample.rs
@@ -101,7 +101,9 @@ use futures::{
 use rand::{Rng, SeedableRng, rngs::StdRng};
 use tonic::async_trait;
 
-use datafusion::optimizer::simplify_expressions::simplify_literal::parse_literal;
+use datafusion::{
+    catalog::Session, optimizer::simplify_expressions::simplify_literal::parse_literal,
+};
 use datafusion::{
     execution::{
         RecordBatchStream, SendableRecordBatchStream, SessionState, SessionStateBuilder,
@@ -116,8 +118,8 @@ use datafusion::{
     prelude::*,
 };
 use datafusion_common::{
-    DFSchemaRef, DataFusionError, Result, Statistics, internal_err, not_impl_err,
-    plan_datafusion_err, plan_err,
+    DFSchemaRef, DataFusionError, Result, Statistics, exec_datafusion_err, internal_err,
+    not_impl_err, plan_datafusion_err, plan_err,
 };
 use datafusion_expr::{
     UserDefinedLogicalNode, UserDefinedLogicalNodeCore,
@@ -564,8 +566,16 @@ impl QueryPlanner for TableSampleQueryPlanner {
     async fn create_physical_plan(
         &self,
         logical_plan: &LogicalPlan,
-        session_state: &SessionState,
+        session: &dyn Session,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        let session_state =
+            session
+                .as_any()
+                .downcast_ref::<SessionState>()
+                .ok_or_else(|| {
+                    exec_datafusion_err!("Failed to downcast Session to SessionState")
+                })?;
+
         let planner = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
             TableSampleExtensionPlanner,
         )]);
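Both example planners above repeat the same downcast boilerplate to recover the concrete `SessionState` from the `&dyn Session` the trait now provides. If more call sites appear, that pattern could be factored into a small helper. The sketch below is hypothetical and not part of this patch; the function name `session_state` is invented, and the imports mirror the ones the examples already pull in.

use datafusion::catalog::Session;
use datafusion::execution::SessionState;
use datafusion_common::{Result, exec_datafusion_err};

/// Hypothetical helper: recover the concrete `SessionState` from the
/// `&dyn Session` handed to `QueryPlanner::create_physical_plan`.
fn session_state(session: &dyn Session) -> Result<&SessionState> {
    session
        .as_any()
        .downcast_ref::<SessionState>()
        .ok_or_else(|| exec_datafusion_err!("Failed to downcast Session to SessionState"))
}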
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index b6c606ff467f9..08b1046736aef 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -56,6 +56,7 @@ use crate::{
 
 // backwards compatibility
 pub use crate::execution::session_state::SessionState;
+pub use datafusion_session::QueryPlanner;
 
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
@@ -1970,17 +1971,6 @@ impl From<SessionState> for SessionStateBuilder {
     }
 }
 
-/// A planner used to add extensions to DataFusion logical and physical plans.
-#[async_trait]
-pub trait QueryPlanner: Debug {
-    /// Given a [`LogicalPlan`], create an [`ExecutionPlan`] suitable for execution
-    async fn create_physical_plan(
-        &self,
-        logical_plan: &LogicalPlan,
-        session_state: &SessionState,
-    ) -> Result<Arc<dyn ExecutionPlan>>;
-}
-
 /// Interface for handling `CREATE FUNCTION` statements and interacting with
 /// [SessionState] to create and register functions ([`ScalarUDF`],
 /// [`AggregateUDF`], [`WindowUDF`], and [`TableFunctionImpl`]) dynamically.
@@ -2162,6 +2152,7 @@ mod tests {
     use crate::test_util::{plan_and_collect, populate_csv_partitions};
     use arrow::datatypes::{DataType, TimeUnit};
     use datafusion_common::DataFusionError;
+    use datafusion_session::Session;
     use std::error::Error;
     use std::path::PathBuf;
@@ -2643,8 +2634,14 @@ mod tests {
         async fn create_physical_plan(
             &self,
             logical_plan: &LogicalPlan,
-            session_state: &SessionState,
+            session: &dyn Session,
         ) -> Result<Arc<dyn ExecutionPlan>> {
+            let session_state = session
+                .as_any()
+                .downcast_ref::<SessionState>()
+                .ok_or_else(|| {
+                    exec_datafusion_err!("Failed to downcast Session to SessionState")
+                })?;
             let physical_planner = MyPhysicalPlanner {};
             physical_planner
                 .create_physical_plan(logical_plan, session_state)
diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs
index 9560616c1b6da..16ead50bfe7dc 100644
--- a/datafusion/core/src/execution/session_state.rs
+++ b/datafusion/core/src/execution/session_state.rs
@@ -28,7 +28,7 @@ use crate::datasource::file_format::FileFormatFactory;
 #[cfg(feature = "sql")]
 use crate::datasource::provider_as_source;
 use crate::execution::SessionStateDefaults;
-use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner};
+use crate::execution::context::{EmptySerializerRegistry, FunctionFactory};
 use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
 use arrow_schema::{DataType, FieldRef};
 use datafusion_catalog::MemoryCatalogProviderList;
@@ -44,7 +44,7 @@ use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
 use datafusion_common::tree_node::TreeNode;
 use datafusion_common::{
     DFSchema, DataFusionError, ResolvedTableReference, TableReference, config_err,
-    exec_err, plan_datafusion_err,
+    exec_datafusion_err, exec_err, plan_datafusion_err,
 };
 use datafusion_execution::TaskContext;
 use datafusion_execution::config::SessionConfig;
@@ -68,7 +68,7 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_optimizer::optimizer::PhysicalOptimizer;
 use datafusion_physical_plan::ExecutionPlan;
-use datafusion_session::Session;
+use datafusion_session::{QueryPlanner, Session};
 #[cfg(feature = "sql")]
 use datafusion_sql::{
     parser::{DFParserBuilder, Statement},
@@ -2115,8 +2115,15 @@ impl QueryPlanner for DefaultQueryPlanner {
     async fn create_physical_plan(
         &self,
         logical_plan: &LogicalPlan,
-        session_state: &SessionState,
+        session: &dyn Session,
     ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+        let session_state =
+            session
+                .as_any()
+                .downcast_ref::<SessionState>()
+                .ok_or_else(|| {
+                    exec_datafusion_err!("Failed to downcast Session to SessionState")
+                })?;
         let planner = DefaultPhysicalPlanner::default();
         planner
             .create_physical_plan(logical_plan, session_state)
diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs b/datafusion/core/tests/user_defined/user_defined_plan.rs
index d53e076739608..11ddc89c3aaf7 100644
--- a/datafusion/core/tests/user_defined/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined/user_defined_plan.rs
@@ -91,7 +91,9 @@ use datafusion::{
 };
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::{ScalarValue, assert_eq_or_internal_err, assert_or_internal_err};
+use datafusion_common::{
+    ScalarValue, assert_eq_or_internal_err, assert_or_internal_err, exec_datafusion_err,
+};
 use datafusion_expr::{FetchType, InvariantLevel, Projection, SortExpr};
 use datafusion_optimizer::AnalyzerRule;
 use datafusion_optimizer::optimizer::ApplyOrder;
@@ -99,6 +101,7 @@ use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 
 use async_trait::async_trait;
 use datafusion_common::cast::as_string_view_array;
+use datafusion_session::Session;
 use futures::{Stream, StreamExt};
 
 /// Execute the specified sql and return the resulting record batches
@@ -466,8 +469,16 @@ impl QueryPlanner for TopKQueryPlanner {
     async fn create_physical_plan(
         &self,
         logical_plan: &LogicalPlan,
-        session_state: &SessionState,
+        session: &dyn Session,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        let session_state =
+            session
+                .as_any()
+                .downcast_ref::<SessionState>()
+                .ok_or_else(|| {
+                    exec_datafusion_err!("Failed to downcast Session to SessionState")
+                })?;
+
         // Teach the default physical planner how to plan TopK nodes.
         let physical_planner =
             DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
diff --git a/datafusion/execution/src/task.rs b/datafusion/execution/src/task.rs
index 38f31cf4629eb..823e8f76fd30c 100644
--- a/datafusion/execution/src/task.rs
+++ b/datafusion/execution/src/task.rs
@@ -212,7 +212,7 @@ impl FunctionRegistry for TaskContext {
 }
 
 /// Produce the [`TaskContext`].
-pub trait TaskContextProvider {
+pub trait TaskContextProvider: Sync + Send {
     fn task_ctx(&self) -> Arc<TaskContext>;
 }
 
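The new `Sync + Send` supertraits on `TaskContextProvider` mean every implementor must be safe to share across threads, which is what lets it be handed to the FFI planner below. A minimal sketch of an implementor under the trait shape shown in the hunk above; `StaticTaskContextProvider` is a made-up name and the `datafusion_execution::task` import path is assumed from the file location.

use std::sync::Arc;

use datafusion_execution::TaskContext;
use datafusion_execution::task::TaskContextProvider;

/// Hypothetical provider that always hands out the same `TaskContext`.
/// Holding only an `Arc<TaskContext>` makes the type `Sync + Send`, so it
/// satisfies the new bounds.
struct StaticTaskContextProvider {
    ctx: Arc<TaskContext>,
}

impl TaskContextProvider for StaticTaskContextProvider {
    fn task_ctx(&self) -> Arc<TaskContext> {
        Arc::clone(&self.ctx)
    }
}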
diff --git a/datafusion/ffi/src/session/mod.rs b/datafusion/ffi/src/session/mod.rs
index aa910abb9149a..0c8169345c4ce 100644
--- a/datafusion/ffi/src/session/mod.rs
+++ b/datafusion/ffi/src/session/mod.rs
@@ -38,7 +38,9 @@ use datafusion_expr::{
 };
 use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_plan::ExecutionPlan;
-use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes};
+use datafusion_proto::bytes::{
+    logical_plan_from_bytes, logical_plan_to_bytes_with_extension_codec,
+};
 use datafusion_proto::logical_plan::LogicalExtensionCodec;
 use datafusion_proto::logical_plan::from_proto::parse_expr;
 use datafusion_proto::logical_plan::to_proto::serialize_expr;
@@ -60,6 +62,7 @@ use crate::util::FFIResult;
 use crate::{df_result, rresult, rresult_return};
 
 pub mod config;
+pub mod planner;
 
 /// A stable struct for sharing [`Session`] across FFI boundaries.
 ///
@@ -467,8 +470,10 @@ impl Session for ForeignSession {
         &self,
         logical_plan: &LogicalPlan,
     ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+        let codec: Arc<dyn LogicalExtensionCodec> = (&self.session.logical_codec).into();
         unsafe {
-            let logical_plan = logical_plan_to_bytes(logical_plan)?;
+            let logical_plan =
+                logical_plan_to_bytes_with_extension_codec(logical_plan, codec.as_ref())?;
             let physical_plan = df_result!(
                 (self.session.create_physical_plan)(
                     &self.session,
diff --git a/datafusion/ffi/src/session/planner.rs b/datafusion/ffi/src/session/planner.rs
new file mode 100644
index 0000000000000..a4619126485cc
--- /dev/null
+++ b/datafusion/ffi/src/session/planner.rs
@@ -0,0 +1,288 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::ffi::c_void;
+use std::sync::Arc;
+
+use abi_stable::StableAbi;
+use abi_stable::std_types::RVec;
+use async_ffi::{FfiFuture, FutureExt};
+use async_trait::async_trait;
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use datafusion_expr::LogicalPlan;
+use datafusion_physical_plan::ExecutionPlan;
+use datafusion_proto::bytes::{
+    logical_plan_from_bytes_with_extension_codec,
+    logical_plan_to_bytes_with_extension_codec,
+};
+use datafusion_proto::logical_plan::{
+    DefaultLogicalExtensionCodec, LogicalExtensionCodec,
+};
+use datafusion_session::{QueryPlanner, Session};
+use tokio::runtime::Handle;
+
+use crate::execution::FFI_TaskContextProvider;
+use crate::execution_plan::FFI_ExecutionPlan;
+use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
+use crate::session::{FFI_SessionRef, ForeignSession};
+use crate::util::FFIResult;
+use crate::{df_result, rresult, rresult_return};
+
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+pub struct FFI_QueryPlanner {
+    // It would make sense, for ballista, to get access to this method,
+    // as the plan is going to be decoded at the scheduler side.
+    //
+    // At the moment, FFI_SessionRef is not public, so we'd need to change it.
+    /// Given a [`LogicalPlan`], create an [`ExecutionPlan`] suitable for execution
+    create_physical_plan: unsafe extern "C" fn(
+        &Self,
+        logical_plan_serialized: RVec<u8>,
+        session: FFI_SessionRef,
+    )
+        -> FfiFuture<FFIResult<FFI_ExecutionPlan>>,
+
+    /// Logical codec used to provide encoding and decoding of plans.
+    pub logical_codec: FFI_LogicalExtensionCodec,
+
+    /// Release the memory of the private data when it is no longer being used.
+    pub release: unsafe extern "C" fn(arg: &mut Self),
+
+    /// Internal data. This is only to be accessed by the planner of the plan.
+    /// The foreign library should never attempt to access this data.
+    pub private_data: *mut c_void,
+
+    /// Used to create a clone of the planner. This should
+    /// only need to be called by the receiver of the plan.
+    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
+
+    /// Utility to identify when FFI objects are accessed locally through
+    /// the foreign interface. See [`crate::get_library_marker_id`] and
+    /// the crate's `README.md` for more information.
+    pub library_marker_id: extern "C" fn() -> usize,
+}
+
+unsafe impl Send for FFI_QueryPlanner {}
+unsafe impl Sync for FFI_QueryPlanner {}
+
+struct QueryPlannerPrivateData {
+    planner: Arc<dyn QueryPlanner>,
+}
+
+impl FFI_QueryPlanner {
+    fn inner(&self) -> &Arc<dyn QueryPlanner> {
+        unsafe {
+            let private_data = self.private_data as *const QueryPlannerPrivateData;
+            &(*private_data).planner
+        }
+    }
+}
+
+unsafe extern "C" fn release_fn_wrapper(ctx: &mut FFI_QueryPlanner) {
+    unsafe {
+        let private_data =
+            Box::from_raw(ctx.private_data as *mut QueryPlannerPrivateData);
+        drop(private_data);
+    }
+}
+
+unsafe extern "C" fn create_physical_plan_fn_wrapper(
+    planner: &FFI_QueryPlanner,
+    logical_plan_serialized: RVec<u8>,
+    session: FFI_SessionRef,
+) -> FfiFuture<FFIResult<FFI_ExecutionPlan>> {
+    unsafe {
+        let planner = Arc::clone(planner.inner());
+        let codec: Arc<dyn LogicalExtensionCodec> = (&session.logical_codec).into();
+        let runtime = session.runtime().clone();
+
+        async move {
+            let mut foreign_session = None;
+            let session = rresult_return!(
+                session
+                    .as_local()
+                    .map(Ok::<&(dyn Session + Send + Sync), DataFusionError>)
+                    .unwrap_or_else(|| {
+                        foreign_session = Some(ForeignSession::try_from(&session)?);
+                        Ok(foreign_session.as_ref().unwrap())
+                    })
+            );
+
+            let task_ctx: Arc<TaskContext> = session.task_ctx();
+
+            let logical_plan =
+                rresult_return!(logical_plan_from_bytes_with_extension_codec(
+                    logical_plan_serialized.as_slice(),
+                    task_ctx.as_ref(),
+                    codec.as_ref()
+                ));
+
+            let physical_plan =
+                planner.create_physical_plan(&logical_plan, session).await;
+
+            rresult!(physical_plan.map(|plan| FFI_ExecutionPlan::new(plan, runtime)))
+        }
+        .into_ffi()
+    }
+}
+
+unsafe extern "C" fn clone_fn_wrapper(planner: &FFI_QueryPlanner) -> FFI_QueryPlanner {
+    let codec = planner.logical_codec.clone();
+    let planner = Arc::clone(planner.inner());
+    FFI_QueryPlanner::new_with_ffi_codec(planner, codec)
+}
+
+impl Drop for FFI_QueryPlanner {
+    fn drop(&mut self) {
+        unsafe { (self.release)(self) }
+    }
+}
+
+impl FFI_QueryPlanner {
+    pub fn new(
+        planner: Arc<dyn QueryPlanner>,
+        runtime: Option<&Handle>,
+        task_ctx_provider: impl Into<FFI_TaskContextProvider>,
+        logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
+    ) -> Self {
+        let logical_codec =
+            logical_codec.unwrap_or_else(|| Arc::new(DefaultLogicalExtensionCodec {}));
+        let logical_codec = FFI_LogicalExtensionCodec::new(
+            logical_codec,
+            runtime.cloned(),
+            task_ctx_provider.into(),
+        );
+
+        Self::new_with_ffi_codec(planner, logical_codec)
+    }
+
+    pub fn new_with_ffi_codec(
+        planner: Arc<dyn QueryPlanner>,
+        codec: FFI_LogicalExtensionCodec,
+    ) -> Self {
+        let private_data = Box::new(QueryPlannerPrivateData { planner });
+
+        Self {
+            create_physical_plan: create_physical_plan_fn_wrapper,
+            logical_codec: codec,
+            clone: clone_fn_wrapper,
+            release: release_fn_wrapper,
+            private_data: Box::into_raw(private_data) as *mut c_void,
+            library_marker_id: crate::get_library_marker_id,
+        }
+    }
+}
+
+impl Clone for FFI_QueryPlanner {
+    fn clone(&self) -> Self {
+        unsafe { (self.clone)(self) }
+    }
+}
+
+#[derive(Debug)]
+pub struct ForeignQueryPlanner(pub FFI_QueryPlanner);
+
+impl From<&FFI_QueryPlanner> for Arc<dyn QueryPlanner> {
+    fn from(planner: &FFI_QueryPlanner) -> Self {
+        if (planner.library_marker_id)() == crate::get_library_marker_id() {
+            Arc::clone(planner.inner())
+        } else {
+            Arc::new(ForeignQueryPlanner(planner.clone()))
+        }
+    }
+}
+
+#[async_trait]
+impl QueryPlanner for ForeignQueryPlanner {
+    /// Given a [`LogicalPlan`], create an [`ExecutionPlan`] suitable for execution
+    async fn create_physical_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+        session_state: &dyn Session,
+    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+        let codec: Arc<dyn LogicalExtensionCodec> = (&self.0.logical_codec).into();
+        let logical_plan_buf =
+            logical_plan_to_bytes_with_extension_codec(logical_plan, codec.as_ref())?;
+
+        let session_ref =
+            FFI_SessionRef::new(session_state, None, self.0.logical_codec.clone());
+
+        let plan = df_result!(unsafe {
+            (self.0.create_physical_plan)(
+                &self.0,
+                logical_plan_buf.as_ref().into(),
+                session_ref,
+            )
+            .await
+        })?;
+
+        Ok((&plan).try_into()?)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use datafusion_expr::LogicalPlan;
+    use datafusion_physical_plan::ExecutionPlan;
+    use datafusion_physical_plan::empty::EmptyExec;
+    use datafusion_session::{QueryPlanner, Session};
+
+    use crate::session::planner::FFI_QueryPlanner;
+
+    #[derive(Debug, Default)]
+    struct DummyPlanner {}
+
+    #[async_trait::async_trait]
+    impl QueryPlanner for DummyPlanner {
+        async fn create_physical_plan(
+            &self,
+            logical_plan: &LogicalPlan,
+            _session_state: &dyn Session,
+        ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+            let schema = logical_plan.schema().as_arrow().clone();
+            // will need better test
+            Ok(Arc::new(EmptyExec::new(Arc::new(schema))))
+        }
+    }
+
+    #[tokio::test]
+    async fn test_end_to_end() -> datafusion::common::Result<()> {
+        let (ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
+
+        let df = ctx.sql("select 1 as i").await?;
+        let logical_plan = df.logical_plan();
+
+        let planner: Arc<dyn QueryPlanner> = Arc::new(DummyPlanner::default());
+
+        let mut ffi_planner =
+            FFI_QueryPlanner::new(planner, None, task_ctx_provider, None);
+        ffi_planner.library_marker_id = crate::mock_foreign_marker_id;
+
+        let foreign_planner: Arc<dyn QueryPlanner> = (&ffi_planner).into();
+
+        let empty_exec = foreign_planner
+            .create_physical_plan(logical_plan, &ctx.state())
+            .await?;
+
+        assert!(empty_exec.as_any().downcast_ref::<EmptyExec>().is_some());
+        Ok(())
+    }
+}
diff --git a/datafusion/session/src/lib.rs b/datafusion/session/src/lib.rs
index 11f734e757452..7d356586837b1 100644
--- a/datafusion/session/src/lib.rs
+++ b/datafusion/session/src/lib.rs
@@ -36,6 +36,8 @@
 //! * Runtime environment configuration
 //! * Query state persistence
 
+pub mod planner;
 pub mod session;
 
+pub use crate::planner::QueryPlanner;
 pub use crate::session::{Session, SessionStore};
diff --git a/datafusion/session/src/planner.rs b/datafusion/session/src/planner.rs
new file mode 100644
index 0000000000000..06d7415ce4a4a
--- /dev/null
+++ b/datafusion/session/src/planner.rs
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::Session;
+use async_trait::async_trait;
+use datafusion_expr::LogicalPlan;
+use datafusion_physical_plan::ExecutionPlan;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// A planner used to add extensions to DataFusion logical and physical plans.
+#[async_trait]
+pub trait QueryPlanner: Debug + Send + Sync {
+    /// Given a [`LogicalPlan`], create an [`ExecutionPlan`] suitable for execution
+    async fn create_physical_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+        session_state: &dyn Session,
+    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>>;
+}
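For downstream users, the practical effect of relocating `QueryPlanner` into `datafusion-session` is that custom planners now receive `&dyn Session` and downcast only when they truly need the full `SessionState`. The sketch below is illustrative rather than part of the patch: `LoggingQueryPlanner` is a made-up name, and it assumes the `datafusion::execution::context::QueryPlanner` re-export kept above plus the existing `SessionStateBuilder::with_query_planner` hook.

use std::sync::Arc;

use async_trait::async_trait;
use datafusion::catalog::Session;
use datafusion::execution::context::QueryPlanner;
use datafusion::execution::{SessionState, SessionStateBuilder};
use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use datafusion::prelude::SessionContext;
use datafusion_common::{Result, exec_datafusion_err};

/// Hypothetical planner that logs the logical plan and then delegates to the
/// default physical planner.
#[derive(Debug)]
struct LoggingQueryPlanner;

#[async_trait]
impl QueryPlanner for LoggingQueryPlanner {
    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
        session: &dyn Session,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // The trait now hands out `&dyn Session`; recover the concrete
        // `SessionState` with the same downcast used throughout this patch.
        let session_state = session
            .as_any()
            .downcast_ref::<SessionState>()
            .ok_or_else(|| {
                exec_datafusion_err!("Failed to downcast Session to SessionState")
            })?;

        println!("planning:\n{}", logical_plan.display_indent());
        DefaultPhysicalPlanner::default()
            .create_physical_plan(logical_plan, session_state)
            .await
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    // Register the custom planner and run a query through it.
    let state = SessionStateBuilder::new()
        .with_default_features()
        .with_query_planner(Arc::new(LoggingQueryPlanner))
        .build();
    let ctx = SessionContext::new_with_state(state);

    ctx.sql("SELECT 1 AS one").await?.show().await?;
    Ok(())
}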