From 3c67aaced45d69aa65e4e93c37fc68131216ec6f Mon Sep 17 00:00:00 2001 From: Alexandr Romanenko Date: Mon, 14 Apr 2025 20:48:40 +0200 Subject: [PATCH 1/4] feat(tesseract): Logical plan --- rust/cubesqlplanner/cubesqlplanner/src/lib.rs | 2 + .../aggregate_multiplied_subquery.rs | 54 + .../cubesqlplanner/src/logical_plan/cube.rs | 16 + .../src/logical_plan/dimension_subquery.rs | 29 + .../cubesqlplanner/src/logical_plan/filter.rs | 51 + .../src/logical_plan/full_key_aggregate.rs | 56 + .../logical_plan/full_key_aggregate_query.rs | 49 + .../cubesqlplanner/src/logical_plan/join.rs | 74 ++ .../src/logical_plan/keys_subquery.rs | 45 + .../cubesqlplanner/src/logical_plan/mod.rs | 31 + .../logical_plan/multistage/calculation.rs | 78 ++ .../src/logical_plan/multistage/common.rs | 54 + .../logical_plan/multistage/get_date_range.rs | 26 + .../logical_plan/multistage/leaf_measure.rs | 37 + .../src/logical_plan/multistage/member.rs | 35 + .../src/logical_plan/multistage/mod.rs | 15 + .../logical_plan/multistage/rolling_window.rs | 85 ++ .../logical_plan/multistage/time_series.rs | 22 + .../src/logical_plan/pretty_print.rs | 128 +++ .../cubesqlplanner/src/logical_plan/query.rs | 16 + .../logical_plan/regular_measures_query.rs | 47 + .../resolve_multiplied_measures.rs | 26 + .../cubesqlplanner/src/logical_plan/schema.rs | 66 ++ .../src/logical_plan/simple_query.rs | 47 + .../src/physical_plan_builder/builder.rs | 980 ++++++++++++++++++ .../src/physical_plan_builder/mod.rs | 2 + .../cubesqlplanner/src/plan/builder/join.rs | 31 +- .../cubesqlplanner/src/plan/from.rs | 5 + .../cubesqlplanner/src/planner/base_member.rs | 61 +- .../cubesqlplanner/src/planner/base_query.rs | 10 +- .../src/planner/base_time_dimension.rs | 4 +- .../cubesqlplanner/src/planner/mod.rs | 2 +- .../planners/dimension_subquery_planner.rs | 62 ++ .../full_key_query_aggregate_planner.rs | 66 ++ .../src/planner/planners/join_planner.rs | 22 + .../planners/multi_stage/applied_state.rs | 10 +- 
.../planner/planners/multi_stage/member.rs | 8 +- .../multi_stage/member_query_planner.rs | 299 +++++- .../multi_stage/multi_stage_query_planner.rs | 82 +- .../multiplied_measures_query_planner.rs | 229 +++- .../src/planner/planners/query_planner.rs | 52 +- .../planner/planners/simple_query_planer.rs | 63 +- .../src/planner/query_properties.rs | 98 +- .../src/planner/sql_evaluator/compiler.rs | 14 + .../src/planner/sql_evaluator/dependecy.rs | 13 +- .../src/planner/sql_evaluator/sql_call.rs | 1 + .../sql_evaluator/symbols/member_symbol.rs | 37 + .../symbols/time_dimension_symbol.rs | 38 + 48 files changed, 3227 insertions(+), 51 deletions(-) create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/logical_plan/aggregate_multiplied_subquery.rs create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/logical_plan/cube.rs create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/logical_plan/dimension_subquery.rs create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/logical_plan/filter.rs create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/logical_plan/full_key_aggregate.rs create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/logical_plan/full_key_aggregate_query.rs create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/logical_plan/join.rs create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/logical_plan/keys_subquery.rs create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/logical_plan/mod.rs create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/calculation.rs create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/common.rs create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/get_date_range.rs create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/leaf_measure.rs create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/member.rs create mode 100644 
rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/mod.rs create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/rolling_window.rs create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/time_series.rs create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/logical_plan/pretty_print.rs create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/logical_plan/query.rs create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/logical_plan/regular_measures_query.rs create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/logical_plan/resolve_multiplied_measures.rs create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/logical_plan/schema.rs create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/logical_plan/simple_query.rs create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/builder.rs create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/mod.rs diff --git a/rust/cubesqlplanner/cubesqlplanner/src/lib.rs b/rust/cubesqlplanner/cubesqlplanner/src/lib.rs index 93a0defd82826..ab9b2e8b4268f 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/lib.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/lib.rs @@ -1,3 +1,5 @@ pub mod cube_bridge; +pub mod logical_plan; +pub mod physical_plan_builder; pub mod plan; pub mod planner; diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/aggregate_multiplied_subquery.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/aggregate_multiplied_subquery.rs new file mode 100644 index 0000000000000..ba2507547eddb --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/aggregate_multiplied_subquery.rs @@ -0,0 +1,54 @@ +use super::pretty_print::*; +use super::LogicalFilter; +use super::LogicalJoin; +use super::*; +use crate::plan::{Expr, Filter, FilterItem, MemberExpression}; +use crate::planner::sql_evaluator::MemberSymbol; +use crate::planner::query_properties::OrderByItem; +use std::rc::Rc; + 
+pub enum AggregateMultipliedSubquerySouce { + Cube(Rc) +} + +impl PrettyPrint for AggregateMultipliedSubquerySouce { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + match self { + AggregateMultipliedSubquerySouce::Cube(cube) => { + result.println(&format!("Cube: {}", cube.cube.name()), state); + } + } + } +} + + + +pub struct AggregateMultipliedSubquery { + pub schema: Rc, + pub dimension_subqueries: Vec>, + pub keys_subquery: Rc, + pub source: Rc, + +} + +impl PrettyPrint for AggregateMultipliedSubquery { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + result.println("AggregateMultipliedSubquery: ", state); + let state = state.new_level(); + let details_state = state.new_level(); + result.println("schema:", &state); + self.schema.pretty_print(result, &details_state); + if !self.dimension_subqueries.is_empty() { + result.println("dimension_subqueries:", &state); + for subquery in self.dimension_subqueries.iter() { + subquery.pretty_print(result, &details_state); + } + } + result.println("keys_subquery:", &state); + self.keys_subquery.pretty_print(result, &details_state); + result.println("source:", &state); + self.source.pretty_print(result, &details_state); + + } +} + diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/cube.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/cube.rs new file mode 100644 index 0000000000000..edc4dd14073d4 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/cube.rs @@ -0,0 +1,16 @@ +use crate::planner::BaseCube; +use std::rc::Rc; +#[derive(Clone)] +pub struct Cube { + pub name: String, + pub cube: Rc, +} + +impl Cube { + pub fn new(cube: Rc) -> Rc { + Rc::new(Self { + name: cube.name().clone(), + cube, + }) + } +} diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/dimension_subquery.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/dimension_subquery.rs new file mode 100644 index 
0000000000000..9e2eba2e2699e --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/dimension_subquery.rs @@ -0,0 +1,29 @@ +use super::pretty_print::*; +use super::*; +use super::LogicalSchema; +use crate::plan::{Expr, Filter, FilterItem, MemberExpression}; +use crate::planner::sql_evaluator::MemberSymbol; +use std::rc::Rc; + +pub struct DimensionSubQuery { + pub query: Rc, + pub primary_keys_dimensions: Vec>, + pub subquery_dimension: Rc, + pub measure_for_subquery_dimension: Rc, +} + +impl PrettyPrint for DimensionSubQuery { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + result.println("DimensionSubQuery: ", state); + let state = state.new_level(); + let details_state = state.new_level(); + result.println(&format!("query: "), &state); + self.query.pretty_print(result, &details_state); + result.println( + &format!("-primary_keys_dimensions: {}", print_symbols(&self.primary_keys_dimensions)), + &state, + ); + result.println(&format!("-subquery_dimension: {}", self.subquery_dimension.full_name()), &state); + result.println(&format!("-measure_for_subquery_dimension: {}", self.measure_for_subquery_dimension.full_name()), &state); + } +} \ No newline at end of file diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/filter.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/filter.rs new file mode 100644 index 0000000000000..6350d640ccf2d --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/filter.rs @@ -0,0 +1,51 @@ +use super::pretty_print::*; +use crate::plan::{Expr, Filter, FilterItem, MemberExpression}; +use crate::planner::sql_evaluator::MemberSymbol; +use std::rc::Rc; +use itertools::Itertools; + +pub struct LogicalFilter { + pub dimensions_filters: Vec, + pub time_dimensions_filters: Vec, + pub measures_filter: Vec, + pub segments: Vec, +} + +impl LogicalFilter { + pub fn all_filters(&self) -> Option { + let items = self + .time_dimensions_filters + .iter() + 
.chain(self.dimensions_filters.iter()) + .chain(self.segments.iter()) + .cloned() + .collect_vec(); + if items.is_empty() { + None + } else { + Some(Filter { items }) + } + } +} + +impl PrettyPrint for LogicalFilter { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + let details_state = state.new_level(); + result.println("dimensions_filters:", &state); + for filter in self.dimensions_filters.iter() { + pretty_print_filter_item(result, &details_state, filter); + } + result.println("time_dimensions_filters:", &state); + for filter in self.time_dimensions_filters.iter() { + pretty_print_filter_item(result, &details_state, filter); + } + result.println("measures_filter:", &state); + for filter in self.measures_filter.iter() { + pretty_print_filter_item(result, &details_state, filter); + } + result.println("segments:", &state); + for filter in self.segments.iter() { + pretty_print_filter_item(result, &details_state, filter); + } + } +} diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/full_key_aggregate.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/full_key_aggregate.rs new file mode 100644 index 0000000000000..3d837660f32e9 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/full_key_aggregate.rs @@ -0,0 +1,56 @@ +use super::*; +use crate::plan::{Expr, Filter, FilterItem, MemberExpression}; +use crate::planner::sql_evaluator::MemberSymbol; +use crate::planner::query_properties::OrderByItem; +use std::rc::Rc; + +pub struct MultiStageSubqueryRef { + pub name: String +} + +impl PrettyPrint for MultiStageSubqueryRef { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + result.println(&format!("MultiStageSubqueryRef: {}", self.name), state); + } +} + + +pub enum FullKeyAggregateSource { + ResolveMultipliedMeasures(Rc), + MultiStageSubqueryRef(Rc) +} + +impl PrettyPrint for FullKeyAggregateSource { + fn pretty_print(&self, result: &mut PrettyPrintResult, 
state: &PrettyPrintState) { + match self { + Self::ResolveMultipliedMeasures(resolve_multiplied_measures) => { + resolve_multiplied_measures.pretty_print(result, state); + } + Self::MultiStageSubqueryRef(subquery_ref) => { + subquery_ref.pretty_print(result, state); + } + } + } +} + +pub struct FullKeyAggregate { + pub join_dimensions: Vec>, + pub sources: Vec +} + +impl PrettyPrint for FullKeyAggregate { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + result.println("FullKeyAggregate: ", state); + let state = state.new_level(); + let details_state = state.new_level(); + result.println( + &format!("join_dimensions: {}", print_symbols(&self.join_dimensions)), + &state, + ); + result.println("sources:", &state); + for source in self.sources.iter() { + source.pretty_print(result, &details_state); + } + } +} + diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/full_key_aggregate_query.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/full_key_aggregate_query.rs new file mode 100644 index 0000000000000..389a46f8fc0fd --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/full_key_aggregate_query.rs @@ -0,0 +1,49 @@ +use super::*; +use std::rc::Rc; +use crate::planner::query_properties::OrderByItem; + +pub struct FullKeyAggregateQuery { + pub multistage_members: Vec>, + pub schema: Rc, + pub filter: Rc, + pub offset: Option, + pub limit: Option, + pub ungrouped: bool, + pub order_by: Vec, + pub source: Rc, +} + +impl PrettyPrint for FullKeyAggregateQuery { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + result.println("FullKeyAggregateQuery: ", state); + let state = state.new_level(); + let details_state = state.new_level(); + if !self.multistage_members.is_empty() { + result.println("multistage_members:", &state); + for member in self.multistage_members.iter() { + member.pretty_print(result, &details_state); + } + } + + result.println("schema:", &state); + 
self.schema.pretty_print(result, &details_state); + result.println("filter:", &state); + self.filter.pretty_print(result, &details_state); + if let Some(offset) = &self.offset { + result.println(&format!("offset:{}", offset), &state); + } + if let Some(limit) = &self.limit { + result.println(&format!("limit:{}", limit), &state); + } + result.println(&format!("ungrouped:{}", self.ungrouped), &state); + if !self.order_by.is_empty() { + result.println("order_by:", &state); + for order_by in self.order_by.iter() { + result.println(&format!("{} {}", order_by.name(), if order_by.desc() { "desc" } else { "asc" }), &details_state); + } + } + result.println("source:", &state); + self.source.pretty_print(result, &details_state); + } +} + \ No newline at end of file diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/join.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/join.rs new file mode 100644 index 0000000000000..baac0baaf49ef --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/join.rs @@ -0,0 +1,74 @@ +use super::pretty_print::*; +use super::Cube; +use super::SimpleQuery; +use crate::planner::sql_evaluator::{MemberSymbol, SqlCall}; +use std::rc::Rc; + +#[derive(Clone)] +pub struct CubeJoinItem { + pub cube: Rc, + pub on_sql: Rc, +} + +impl PrettyPrint for CubeJoinItem { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + result.println(&format!("CubeJoinItem for cube: {}", self.cube.name), state); + } +} + +#[derive(Clone)] +pub struct SubqueryDimensionJoinItem { + pub subquery: Rc, + pub dimension: Rc, + pub primary_keys_dimensions: Vec>, +} + +impl PrettyPrint for SubqueryDimensionJoinItem { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + result.println( + &format!( + "SubqueryDimensionJoinItem for dimension `{}`: ", + self.dimension.full_name() + ), + state, + ); + result.println("subquery:", state); + result.println("primary_keys_dimensions:", state); 
+ let state = state.new_level(); + for dim in self.primary_keys_dimensions.iter() { + result.println(&format!("- {}", dim.full_name()), &state); + } + } +} + +#[derive(Clone)] +pub enum LogicalJoinItem { + CubeJoinItem(CubeJoinItem), +} + +impl PrettyPrint for LogicalJoinItem { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + match self { + LogicalJoinItem::CubeJoinItem(item) => item.pretty_print(result, state), + } + } +} + +#[derive(Clone)] +pub struct LogicalJoin { + pub root: Rc, + pub joins: Vec, +} + +impl PrettyPrint for LogicalJoin { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + result.println(&format!("Join: "), state); + let state = state.new_level(); + result.println(&format!("root: {}", self.root.name), &state); + result.println(&format!("joins: "), &state); + let state = state.new_level(); + for join in self.joins.iter() { + join.pretty_print(result, &state); + } + } +} diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/keys_subquery.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/keys_subquery.rs new file mode 100644 index 0000000000000..6a06514aaa17f --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/keys_subquery.rs @@ -0,0 +1,45 @@ +use super::*; +use crate::planner::sql_evaluator::MemberSymbol; +use std::rc::Rc; + +#[derive(Clone)] +pub struct KeysSubQuery { + pub key_cube_name: String, + pub time_dimensions: Vec>, + pub dimensions: Vec>, + pub dimension_subqueries: Vec>, + pub primary_keys_dimensions: Vec>, + pub filter: Rc, + pub source: Rc, +} + +impl PrettyPrint for KeysSubQuery { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + result.println("KeysSubQuery: ", state); + let state = state.new_level(); + let details_state = state.new_level(); + result.println(&format!("-key_cube_name: {}", self.key_cube_name), &state); + result.println( + &format!("-time_dimensions: {}", 
print_symbols(&self.time_dimensions)), + &state, + ); + result.println( + &format!("-dimensions: {}", print_symbols(&self.dimensions)), + &state, + ); +/* if !self.dimension_subqueries.is_empty() { + result.println("dimension_subqueries:", &state); + for subquery in self.dimension_subqueries.iter() { + subquery.pretty_print(result, &details_state); + } + } */ + result.println( + &format!("-primary_keys_dimensions: {}", print_symbols(&self.primary_keys_dimensions)), + &state, + ); + result.println("filters:", &state); + self.filter.pretty_print(result, &details_state); + result.println("source:", &state); + self.source.pretty_print(result, &details_state); + } +} diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/mod.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/mod.rs new file mode 100644 index 0000000000000..d5c6575348f0b --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/mod.rs @@ -0,0 +1,31 @@ +mod cube; +mod filter; +mod join; +mod pretty_print; +mod simple_query; +mod schema; +mod regular_measures_query; +mod keys_subquery; +mod aggregate_multiplied_subquery; +mod resolve_multiplied_measures; +mod full_key_aggregate; +mod full_key_aggregate_query; +mod query; +mod dimension_subquery; +mod multistage; + +pub use cube::*; +pub use filter::*; +pub use join::*; +pub use pretty_print::*; +pub use simple_query::*; +pub use dimension_subquery::*; +pub use schema::*; +pub use aggregate_multiplied_subquery::*; +pub use regular_measures_query::*; +pub use keys_subquery::*; +pub use resolve_multiplied_measures::*; +pub use full_key_aggregate::*; +pub use full_key_aggregate_query::*; +pub use query::*; +pub use multistage::*; \ No newline at end of file diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/calculation.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/calculation.rs new file mode 100644 index 0000000000000..3a7c727bc33f3 --- /dev/null +++ 
b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/calculation.rs @@ -0,0 +1,78 @@ +use crate::logical_plan::*; +use crate::planner::sql_evaluator::MeasureTimeShift; +use std::rc::Rc; +use crate::planner::query_properties::OrderByItem; + +#[derive(PartialEq)] +pub enum MultiStageCalculationType { + Rank, + Aggregate, + Calculate, +} + + +impl ToString for MultiStageCalculationType { + fn to_string(&self) -> String { + match self { + MultiStageCalculationType::Rank => "Rank".to_string(), + MultiStageCalculationType::Aggregate => "Aggregate".to_string(), + MultiStageCalculationType::Calculate => "Calculate".to_string(), + } + } +} + +#[derive(PartialEq)] +pub enum MultiStageCalculationWindowFunction { + Rank, + Window, + None +} + +impl ToString for MultiStageCalculationWindowFunction { + fn to_string(&self) -> String { + match self { + MultiStageCalculationWindowFunction::Rank => "Rank".to_string(), + MultiStageCalculationWindowFunction::Window => "Window".to_string(), + MultiStageCalculationWindowFunction::None => "None".to_string(), + } + } +} + +pub struct MultiStageMeasureCalculation { + pub schema: Rc, + pub is_ungrouped: bool, + pub calculation_type: MultiStageCalculationType, + pub partition_by: Vec, + pub window_function_to_use: MultiStageCalculationWindowFunction, + pub order_by: Vec, + pub source: Rc +} + +impl PrettyPrint for MultiStageMeasureCalculation { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + result.println(&format!("Measure Calculation: {}", self.calculation_type.to_string()), state); + let state = state.new_level(); + let details_state = state.new_level(); + result.println("schema:", &state); + self.schema.pretty_print(result, &details_state); + if !self.partition_by.is_empty() { + result.println(&format!("partition_by: {}", self.partition_by.join(", ")), &state); + } + if self.window_function_to_use != MultiStageCalculationWindowFunction::None { + 
result.println(&format!("window_function_to_use: {}", self.window_function_to_use.to_string()), &state); + } + if self.is_ungrouped { + result.println("is_ungrouped: true", &state); + } + if !self.order_by.is_empty() { + result.println("order_by:", &state); + for order_by in self.order_by.iter() { + result.println(&format!("{} {}", order_by.name(), if order_by.desc() { "desc" } else { "asc" }), &details_state); + } + } + result.println("source:", &state); + self.source.pretty_print(result, &details_state); + } +} + + diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/common.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/common.rs new file mode 100644 index 0000000000000..de5ea11c21db1 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/common.rs @@ -0,0 +1,54 @@ +use crate::logical_plan::pretty_print::*; +use crate::planner::planners::multi_stage::MultiStageAppliedState; + +use crate::planner::sql_evaluator::MemberSymbol; +use crate::plan::{FilterGroup, FilterItem}; +use crate::planner::filter::FilterOperator; +use crate::planner::sql_evaluator::MeasureTimeShift; +use crate::planner::{BaseDimension, BaseMember, BaseTimeDimension}; +use cubenativeutils::CubeError; +use itertools::Itertools; +use std::cmp::PartialEq; +use std::collections::HashMap; +use std::fmt::Debug; +use std::rc::Rc; + + +impl PrettyPrint for MultiStageAppliedState { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + let details_state = state.new_level(); + result.println( + &format!("-time_dimensions: {}", print_symbols(&self.time_dimensions().iter().map(|d| d.member_evaluator()).collect_vec())), + state, + ); + + result.println( + &format!("-dimensions: {}", print_symbols(&self.dimensions().iter().map(|d| d.member_evaluator()).collect_vec())), + state, + ); + + result.println("dimensions_filters:", &state); + for filter in self.dimensions_filters().iter() { + 
pretty_print_filter_item(result, &details_state, filter); + } + result.println("time_dimensions_filters:", &state); + for filter in self.time_dimensions_filters().iter() { + pretty_print_filter_item(result, &details_state, filter); + } + result.println("measures_filter:", &state); + for filter in self.measures_filters().iter() { + pretty_print_filter_item(result, &details_state, filter); + } + result.println("segments:", &state); + for filter in self.segments().iter() { + pretty_print_filter_item(result, &details_state, filter); + } + + result.println("time_shifts:", &state); + for (_, time_shift) in self.time_shifts().iter() { + + result.println(&format!("- {}: {}", time_shift.dimension.full_name(), time_shift.interval.to_sql()), &details_state); + } + + } +} \ No newline at end of file diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/get_date_range.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/get_date_range.rs new file mode 100644 index 0000000000000..2912495752e52 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/get_date_range.rs @@ -0,0 +1,26 @@ +use crate::logical_plan::*; +use crate::planner::sql_evaluator::MemberSymbol; +use std::rc::Rc; +pub struct MultiStageGetDateRange { + pub time_dimension: Rc, + pub dimension_subqueries: Vec>, + pub source: Rc, + +} + +impl PrettyPrint for MultiStageGetDateRange { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + result.println("Get Date Range", state); + let state = state.new_level(); + let details_state = state.new_level(); + result.println(&format!("time_dimension: {}", self.time_dimension.full_name()), &details_state); + if !self.dimension_subqueries.is_empty() { + result.println("dimension_subqueries:", &state); + for subquery in self.dimension_subqueries.iter() { + subquery.pretty_print(result, &details_state); + } + } + result.println("source:", &state); + self.source.pretty_print(result, 
&details_state); + } +} \ No newline at end of file diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/leaf_measure.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/leaf_measure.rs new file mode 100644 index 0000000000000..9dc59f044bbed --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/leaf_measure.rs @@ -0,0 +1,37 @@ +use crate::planner::sql_evaluator::MeasureTimeShift; +use crate::logical_plan::*; +use crate::planner::sql_evaluator::MemberSymbol; +use std::collections::HashMap; +use std::rc::Rc; + +pub struct MultiStageLeafMeasure { + pub measure: Rc, + pub render_measure_as_state: bool, //Render measure as state, for example hll state for count_approx + pub render_measure_for_ungrouped: bool, + pub time_shifts: HashMap, + pub query: Rc, +} + +impl PrettyPrint for MultiStageLeafMeasure { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + result.println("Leaf Measure Query", state); + let state = state.new_level(); + result.println(&format!("measure: {}", self.measure.full_name()), &state); + if self.render_measure_as_state { + result.println("render_measure_as_state: true", &state); + } + if self.render_measure_for_ungrouped { + result.println("render_measure_for_ungrouped: true", &state); + } + if !self.time_shifts.is_empty() { + result.println("time_shifts:", &state); + let details_state = state.new_level(); + for (_, time_shift) in self.time_shifts.iter() { + result.println(&format!("- {}: {}", time_shift.dimension.full_name(), time_shift.interval.to_sql()), &details_state); + } + } + result.println(&format!("query:"), &state); + let details_state = state.new_level(); + self.query.pretty_print(result, &details_state); + } +} \ No newline at end of file diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/member.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/member.rs new file mode 100644 index 
0000000000000..fc13b3159cd44 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/member.rs @@ -0,0 +1,35 @@ +use crate::logical_plan::*; + +pub enum MultiStageMemberLogicalType { + LeafMeasure(MultiStageLeafMeasure), + MeasureCalculation(MultiStageMeasureCalculation), + GetDateRange(MultiStageGetDateRange), + TimeSeries(MultiStageTimeSeries), + RollingWindow(MultiStageRollingWindow) +} + +impl PrettyPrint for MultiStageMemberLogicalType { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + match self { + Self::LeafMeasure(measure) => measure.pretty_print(result, state), + Self::MeasureCalculation(calculation) => calculation.pretty_print(result, state), + Self::GetDateRange(get_date_range) => get_date_range.pretty_print(result, state), + Self::TimeSeries(time_series) => time_series.pretty_print(result, state), + Self::RollingWindow(rolling_window) => rolling_window.pretty_print(result, state), + } + } +} + +pub struct LogicalMultiStageMember { + pub name: String, + pub member_type: MultiStageMemberLogicalType, +} + +impl PrettyPrint for LogicalMultiStageMember { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + + result.println(&format!("MultiStageMember `{}`: ", self.name), state); + let details_state = state.new_level(); + self.member_type.pretty_print(result, &details_state); + } +} diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/mod.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/mod.rs new file mode 100644 index 0000000000000..e006e4f7ebe16 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/mod.rs @@ -0,0 +1,15 @@ +mod common; +mod member; +mod leaf_measure; +mod get_date_range; +mod time_series; +mod rolling_window; +mod calculation; + +pub use common::*; +pub use member::*; +pub use leaf_measure::*; +pub use get_date_range::*; +pub use time_series::*; +pub use rolling_window::*; 
+pub use calculation::*; \ No newline at end of file diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/rolling_window.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/rolling_window.rs new file mode 100644 index 0000000000000..45d8de783125c --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/rolling_window.rs @@ -0,0 +1,85 @@ +use crate::planner::query_properties::OrderByItem; +use crate::logical_plan::*; +use std::rc::Rc; +use crate::planner::sql_evaluator::MemberSymbol; + +pub struct MultiStageRegularRollingWindow { + pub trailing: Option, + pub leading: Option, + pub offset: String, +} + +impl PrettyPrint for MultiStageRegularRollingWindow { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + result.println("Regular Rolling Window", state); + let state = state.new_level(); + if let Some(trailing) = &self.trailing { + result.println(&format!("trailing: {}", trailing), &state); + } + if let Some(leading) = &self.leading { + result.println(&format!("leading: {}", leading), &state); + } + result.println(&format!("offset: {}", self.offset), &state); + } +} + +pub struct MultiStageToDateRollingWindow { + pub granularity: String, +} + +impl PrettyPrint for MultiStageToDateRollingWindow { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + result.println("ToDate Rolling Window", state); + let state = state.new_level(); + result.println(&format!("granularity: {}", self.granularity), &state); + } +} + +pub enum MultiStageRollingWindowType { + Regular(MultiStageRegularRollingWindow), + ToDate(MultiStageToDateRollingWindow), + RunningTotal, +} + + +impl PrettyPrint for MultiStageRollingWindowType { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + match self { + MultiStageRollingWindowType::Regular(window) => window.pretty_print(result, state), + MultiStageRollingWindowType::ToDate(window) => 
window.pretty_print(result, state), + MultiStageRollingWindowType::RunningTotal => result.println("Running Total Rolling Window", state), + } + } +} + +pub struct MultiStageRollingWindow { + pub schema: Rc, + pub is_ungrouped: bool, + pub rolling_time_dimension: Rc, + pub rolling_window: MultiStageRollingWindowType, + pub order_by: Vec, + pub time_series_input: String, + pub measure_input: String +} + +impl PrettyPrint for MultiStageRollingWindow { + fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { + self.rolling_window.pretty_print(result, &state); + let details_state = state.new_level(); + if self.is_ungrouped { + result.println("is_ungrouped: true", &state); + } + result.println("schema:", &state); + self.schema.pretty_print(result, &details_state); + result.println(&format!("rolling_time_dimension: {}", self.rolling_time_dimension.full_name()), state); + if !self.order_by.is_empty() { + result.println("order_by:", &state); + for order_by in self.order_by.iter() { + result.println(&format!("{} {}", order_by.name(), if order_by.desc() { "desc" } else { "asc" }), &details_state); + } + } + result.println(&format!("time_series_input: {}", self.time_series_input), &state); + result.println(&format!("measure_input: {}", self.measure_input), &state); + } +} + \ No newline at end of file diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/time_series.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/time_series.rs new file mode 100644 index 0000000000000..2a3781b7c4a85 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/multistage/time_series.rs @@ -0,0 +1,22 @@ +use crate::logical_plan::*; +use crate::planner::sql_evaluator::MemberSymbol; +use std::rc::Rc; +pub struct MultiStageTimeSeries { + pub time_dimension: Rc, + pub date_range: Option>, + pub get_date_range_multistage_ref: Option, +} + +impl PrettyPrint for MultiStageTimeSeries { + fn pretty_print(&self, result: &mut 
PrettyPrintResult, state: &PrettyPrintState) {
+        result.println("Time Series", state);
+        let state = state.new_level(); // everything below the header is printed one level deeper
+        result.println(&format!("time_dimension: {}", self.time_dimension.full_name()), &state);
+        if let Some(date_range) = &self.date_range {
+            result.println(&format!("date_range: [{}, {}]", date_range[0], date_range[1]), &state);
+        }
+        if let Some(get_date_range_multistage_ref) = &self.get_date_range_multistage_ref {
+            result.println(&format!("get_date_range_multistage_ref: {}", get_date_range_multistage_ref), &state);
+        }
+    }
+}
\ No newline at end of file
diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/pretty_print.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/pretty_print.rs
new file mode 100644
index 0000000000000..27ac1bc71a92a
--- /dev/null
+++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/pretty_print.rs
@@ -0,0 +1,128 @@
+use crate::plan::FilterItem;
+use crate::planner::sql_evaluator::MemberSymbol;
+use std::rc::Rc;
+
+#[derive(Clone, Debug)]
+pub struct PrettyPrintConfig { // Formatting options for the logical-plan pretty printer
+    indent_space: usize, // number of spaces emitted per nesting level
+}
+
+impl Default for PrettyPrintConfig {
+    fn default() -> Self { // two spaces per level
+        Self { indent_space: 2 }
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct PrettyPrintState { // Current nesting level together with its precomputed indent string
+    config: PrettyPrintConfig,
+    level: usize,
+    indent: String,
+}
+
+pub struct PrettyPrintResult { // Accumulates the printed lines in order
+    result: Vec,
+}
+
+impl PrettyPrintResult {
+    pub fn new() -> Self { // starts with no lines
+        Self { result: Vec::new() }
+    }
+
+    pub fn println(&mut self, text: &str, state: &PrettyPrintState) { // appends `text` as one line, prefixed with the indent of `state`
+        self.result.push(state.format(text));
+    }
+
+    pub fn into_string(self) -> String { // joins the lines with "\n"; no trailing newline is added
+        self.result.join("\n")
+    }
+}
+
+impl PrettyPrintState {
+    pub fn new(config: PrettyPrintConfig, level: usize) -> Self { // the indent is level * indent_space spaces
+        Self {
+            level,
+            indent: " ".repeat(level * config.indent_space), // computed before `config` is moved into the struct
+            config,
+        }
+    }
+
+    pub fn new_level(&self) -> Self { // same config, one level deeper; `self` is left untouched
+        Self::new(self.config.clone(), self.level + 1)
+    }
+
+    pub fn format(&self, text: &str) -> String { // prefixes `text` with this state's indent
+        format!("{}{}", self.indent, text)
+    }
+}
+
+impl Default
for PrettyPrintState {
+    fn default() -> Self { // default config at nesting level 0
+        Self::new(PrettyPrintConfig::default(), 0)
+    }
+}
+
+pub trait PrettyPrint { // Implemented by logical-plan nodes that can describe themselves line by line
+    fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState);
+}
+
+pub fn print_symbol(symbol: &MemberSymbol) -> String { // a symbol is rendered as its full name
+    format!("{}", symbol.full_name())
+}
+
+pub fn print_symbols(symbols: &[Rc]) -> String { // full names joined with ", "; empty string for an empty slice
+    symbols
+        .iter()
+        .map(|s| print_symbol(s))
+        .collect::>()
+        .join(", ")
+}
+
+pub fn pretty_print(obj: &T) -> String { // renders `obj` from nesting level 0 with the default config
+    let mut result = PrettyPrintResult::new();
+    let state = PrettyPrintState::default();
+    obj.pretty_print(&mut result, &state);
+    result.into_string()
+}
+
+pub fn pretty_print_rc(obj: &Rc) -> String { // same as `pretty_print`, for a value held in an Rc
+    let mut result = PrettyPrintResult::new();
+    let state = PrettyPrintState::default();
+    obj.pretty_print(&mut result, &state);
+    result.into_string()
+}
+
+pub fn pretty_print_filter_item( // Prints one filter item; groups recurse into their children
+    result: &mut PrettyPrintResult,
+    state: &PrettyPrintState,
+    filter_item: &FilterItem,
+) {
+    match filter_item {
+        FilterItem::Item(item) => {
+            result.println(
+                &format!(
+                    "{{name: {}, operator: {} values: [{}]}}",
+                    item.member_name(),
+                    item.filter_operator().to_string(),
+                    item.values()
+                        .iter()
+                        .map(|v| v.clone().unwrap_or("null".to_string())) // a missing value is shown as the text null
+                        .collect::>()
+                        .join(", ")
+                ),
+                state,
+            );
+        }
+        FilterItem::Group(group) => {
+            result.println(&format!("{{{}:[", group.operator.to_string()), state);
+            let items_state = state.new_level(); // children are printed one level deeper than the group brackets
+            for item in group.items.iter() {
+                pretty_print_filter_item(result, &items_state, item); // was `state`, which left `items_state` unused and flattened nested groups
+            }
+            result.println("]}", state);
+        }
+        FilterItem::Segment(base_segment) => {
+            result.println(&format!("{{segment: {}}}", base_segment.full_name()), state);
+        },
+    }
+}
diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/query.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/query.rs
new file mode 100644
index 0000000000000..eb8afcaf3d2a4
--- /dev/null
+++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/query.rs
@@ -0,0 +1,16 @@
+use
super::*;
+use std::rc::Rc;
+
+pub enum Query { // Root of a logical plan: one of the two query shapes below
+    SimpleQuery(SimpleQuery),
+    FullKeyAggregateQuery(FullKeyAggregateQuery),
+}
+
+impl PrettyPrint for Query {
+    fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { // delegates to the wrapped query; the enum adds no output of its own
+        match self {
+            Query::SimpleQuery(query) => query.pretty_print(result, state),
+            Query::FullKeyAggregateQuery(query) => query.pretty_print(result, state),
+        }
+    }
+}
\ No newline at end of file
diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/regular_measures_query.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/regular_measures_query.rs
new file mode 100644
index 0000000000000..51c16d559ae9f
--- /dev/null
+++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/regular_measures_query.rs
@@ -0,0 +1,47 @@
+use super::pretty_print::*;
+use super::LogicalFilter;
+use super::LogicalJoin;
+use super::LogicalSchema;
+use crate::plan::{Expr, Filter, FilterItem, MemberExpression};
+use crate::planner::sql_evaluator::MemberSymbol;
+use crate::planner::query_properties::OrderByItem;
+use std::rc::Rc;
+
+#[derive(Clone)]
+pub struct RegularMeasuresQuery { // Logical-plan node holding schema, filter, paging, ordering and join source of a query
+    pub schema: Rc,
+    pub filter: Rc,
+    pub offset: Option,
+    pub limit: Option,
+    pub ungrouped: bool,
+    pub order_by: Vec,
+    pub source: Rc,
+}
+
+impl PrettyPrint for RegularMeasuresQuery {
+    fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { // header line, then one section per field
+        result.println("RegularMeasuresQuery: ", state);
+        let state = state.new_level(); // section titles sit one level below the header
+        let details_state = state.new_level(); // section bodies sit one level below their title
+        result.println("schema:", &state);
+        self.schema.pretty_print(result, &details_state);
+        result.println("filters:", &state);
+        self.filter.pretty_print(result, &details_state);
+        if let Some(offset) = &self.offset { // offset and limit are printed only when set
+            result.println(&format!("offset:{}", offset), &state);
+        }
+        if let Some(limit) = &self.limit {
+            result.println(&format!("limit:{}", limit), &state);
+        }
+        result.println(&format!("ungrouped:{}", self.ungrouped), &state);
+        if
!self.order_by.is_empty() {
+            result.println("order_by:", &state);
+            for order_by in self.order_by.iter() {
+                result.println(&format!("{} {}", order_by.name(), if order_by.desc() { "desc" } else { "asc" }), &details_state);
+            }
+        }
+
+        result.println("source:", &state);
+        self.source.pretty_print(result, &details_state);
+    }
+}
diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/resolve_multiplied_measures.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/resolve_multiplied_measures.rs
new file mode 100644
index 0000000000000..7d059e13ecb53
--- /dev/null
+++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/resolve_multiplied_measures.rs
@@ -0,0 +1,26 @@
+use super::*;
+use crate::plan::{Expr, Filter, FilterItem, MemberExpression};
+use crate::planner::sql_evaluator::MemberSymbol;
+use crate::planner::query_properties::OrderByItem;
+use std::rc::Rc;
+
+pub struct ResolveMultipliedMeasures { // Logical-plan node grouping two lists of subqueries
+    pub regular_measure_subqueries: Vec>,
+    pub aggregate_multiplied_subqueries: Vec>,
+}
+
+impl PrettyPrint for ResolveMultipliedMeasures {
+    fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { // header, then both subquery lists in declaration order
+        result.println("ResolveMultipliedMeasures: ", state);
+        let state = state.new_level(); // list titles sit one level below the header
+        let details_state = state.new_level(); // each subquery is printed one level below its list title
+        result.println("regular_measure_subqueries:", &state); // the title is printed even when the list is empty
+        for subquery in self.regular_measure_subqueries.iter() {
+            subquery.pretty_print(result, &details_state);
+        }
+        result.println("aggregate_multiplied_subqueries:", &state); // the title is printed even when the list is empty
+        for subquery in self.aggregate_multiplied_subqueries.iter() {
+            subquery.pretty_print(result, &details_state);
+        }
+    }
+}
\ No newline at end of file
diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/schema.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/schema.rs
new file mode 100644
index 0000000000000..6eaa64c5e7d46
--- /dev/null
+++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/schema.rs
@@ -0,0 +1,66 @@
+use itertools::Itertools;
+
+use
super::pretty_print::*;
+use crate::plan::{Expr, Filter, FilterItem, MemberExpression};
+use crate::planner::sql_evaluator::MemberSymbol;
+use std::collections::HashSet;
+use std::rc::Rc;
+pub struct LogicalSchema { // Member lists of a logical-plan node: time dimensions, dimensions and measures
+    pub time_dimensions: Vec>,
+    pub dimensions: Vec>,
+    pub measures: Vec>,
+    pub multiplied_measures: HashSet,
+}
+
+impl LogicalSchema {
+    pub fn find_member_position(&self, name: &str) -> Option { // zero-based position of `name`, counting dimensions first, then time dimensions, then measures; None when absent
+        for (i, m) in self.dimensions.iter().enumerate() {
+            if m.full_name() == name {
+                return Some(i);
+            }
+        }
+        for (i, m) in self.time_dimensions.iter().enumerate() {
+            if m.full_name() == name {
+                return Some(i + self.dimensions.len()); // time dimensions are numbered after all dimensions
+            } else if let Ok(time_dimension) = m.as_time_dimension() { // a time dimension also matches by the full name of its base symbol
+                if time_dimension.base_symbol().full_name() == name {
+                    return Some(i + self.dimensions.len());
+                }
+            }
+        }
+        for (i, m) in self.measures.iter().enumerate() {
+            if m.full_name() == name {
+                return Some(i + self.time_dimensions.len() + self.dimensions.len()); // measures are numbered last
+            }
+        }
+        None
+    }
+
+    pub fn all_dimensions(&self) -> impl Iterator> { // dimensions followed by time dimensions, the same order find_member_position counts in
+        self.dimensions.iter().chain(self.time_dimensions.iter())
+    }
+}
+
+
+impl PrettyPrint for LogicalSchema {
+    fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { // one line per member list, all at the caller's indent
+        result.println(
+            &format!("-time_dimensions: {}", print_symbols(&self.time_dimensions)),
+            state,
+        );
+        result.println(
+            &format!("-dimensions: {}", print_symbols(&self.dimensions)),
+            state,
+        );
+        result.println(
+            &format!("-measures: {}", print_symbols(&self.measures)),
+            state,
+        );
+        if !self.multiplied_measures.is_empty() { // printed only when non-empty; iteration order of a HashSet is unspecified
+            result.println(
+                &format!("-multiplied_measures: {}", self.multiplied_measures.iter().join(", ")),
+                state,
+            );
+        }
+    }
+}
diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/simple_query.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/simple_query.rs
new file mode 100644
index 0000000000000..63ad4a23a551f
--- /dev/null
+++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/simple_query.rs
@@ -0,0 +1,47 @@
+use super::*;
+use std::rc::Rc;
+use crate::planner::query_properties::OrderByItem;
+pub struct SimpleQuery { // Logical-plan node for the `Query::SimpleQuery` variant
+    pub schema: Rc,
+    pub dimension_subqueries: Vec>,
+    pub filter: Rc,
+    pub offset: Option,
+    pub limit: Option,
+    pub ungrouped: bool,
+    pub order_by: Vec,
+    pub source: Rc,
+}
+
+impl PrettyPrint for SimpleQuery {
+    fn pretty_print(&self, result: &mut PrettyPrintResult, state: &PrettyPrintState) { // header line, then one section per field
+        result.println("SimpleQuery: ", state); // was "RegularMeasuresQuery: ", a copy-pasted label that mislabelled this node
+        let state = state.new_level(); // section titles sit one level below the header
+        let details_state = state.new_level(); // section bodies sit one level below their title
+        result.println("schema:", &state);
+        self.schema.pretty_print(result, &details_state);
+        if !self.dimension_subqueries.is_empty() { // the section is omitted when there are no dimension subqueries
+            result.println("dimension_subqueries:", &state);
+            for subquery in self.dimension_subqueries.iter() {
+                subquery.pretty_print(result, &details_state);
+            }
+        }
+        result.println("filters:", &state);
+        self.filter.pretty_print(result, &details_state);
+        if let Some(offset) = &self.offset { // offset and limit are printed only when set
+            result.println(&format!("offset:{}", offset), &state);
+        }
+        if let Some(limit) = &self.limit {
+            result.println(&format!("limit:{}", limit), &state);
+        }
+        result.println(&format!("ungrouped:{}", self.ungrouped), &state);
+        if !self.order_by.is_empty() { // the section is omitted when there is no ordering
+            result.println("order_by:", &state);
+            for order_by in self.order_by.iter() {
+                result.println(&format!("{} {}", order_by.name(), if order_by.desc() { "desc" } else { "asc" }), &details_state);
+            }
+        }
+
+        result.println("source:", &state);
+        self.source.pretty_print(result, &details_state);
+    }
+}
diff --git a/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/builder.rs b/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/builder.rs
new file mode 100644
index 0000000000000..6a5fee1d4e6ce
--- /dev/null
+++ b/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/builder.rs
@@ -0,0 +1,980 @@
+use crate::logical_plan::*;
+use crate::plan::schema::QualifiedColumnName;
+use crate::plan::*;
+use
crate::planner::query_properties::OrderByItem;
+use crate::planner::query_tools::QueryTools;
+use crate::planner::sql_evaluator::sql_nodes::SqlNodesFactory;
+use crate::planner::sql_evaluator::MeasureTimeShift;
+use crate::planner::sql_evaluator::MemberSymbol;
+use crate::planner::sql_evaluator::ReferencesBuilder;
+use crate::planner::sql_templates::PlanSqlTemplates;
+use crate::planner::SqlJoinCondition;
+use crate::planner::{BaseMember, MemberSymbolRef};
+use cubenativeutils::CubeError;
+use itertools::Itertools;
+use std::collections::HashMap;
+use std::collections::HashSet;
+use std::rc::Rc;
+
+#[derive(Clone, Debug)]
+struct PhysicalPlanBuilderContext { // Options handed down through the builder methods; cloned and adjusted per sub-plan
+    pub alias_prefix: Option,
+    pub render_measure_as_state: bool, //Render measure as state, for example hll state for count_approx
+    pub render_measure_for_ungrouped: bool,
+    pub time_shifts: HashMap,
+}
+
+impl Default for PhysicalPlanBuilderContext {
+    fn default() -> Self { // no alias prefix, both rendering flags off, no time shifts
+        Self {
+            alias_prefix: None,
+            render_measure_as_state: false,
+            render_measure_for_ungrouped: false,
+            time_shifts: HashMap::new(),
+        }
+    }
+}
+
+impl PhysicalPlanBuilderContext {
+    pub fn make_sql_nodes_factory(&self) -> SqlNodesFactory { // new factory carrying this context's time shifts and both measure-rendering flags
+        let mut factory = SqlNodesFactory::new();
+        factory.set_time_shifts(self.time_shifts.clone());
+        factory.set_count_approx_as_state(self.render_measure_as_state);
+        factory.set_ungrouped_measure(self.render_measure_for_ungrouped);
+        factory
+    }
+}
+
+pub struct PhysicalPlanBuilder { // Turns a logical `Query` into a physical select plan
+    query_tools: Rc,
+    plan_sql_templates: PlanSqlTemplates,
+}
+
+impl PhysicalPlanBuilder {
+    pub fn new(query_tools: Rc) -> Self { // the SQL templates are created once here from the query tools' template renderer
+        let plan_sql_templates = PlanSqlTemplates::new(query_tools.templates_render());
+        Self {
+            query_tools,
+            plan_sql_templates,
+        }
+    }
+
+    pub fn build(&self, logical_plan: Rc) -> Result, CubeError> { // public entry point: builds with the default context
+        self.build_impl(logical_plan, &PhysicalPlanBuilderContext::default())
+    }
+
+    fn build_impl( // same as `build` with an explicit context; also used for nested queries
+        &self,
+        logical_plan: Rc,
+        context: &PhysicalPlanBuilderContext,
+    ) -> Result, CubeError> {
+        match logical_plan.as_ref() { // one builder method per Query variant
+            Query::SimpleQuery(query) => self.build_simple_query(query, context),
+            Query::FullKeyAggregateQuery(query) => {
+                self.build_full_key_aggregate_query(query, context)
+            }
+        }
+    }
+
+    fn build_simple_query( // Builds one select over the query's join source
+        &self,
+        logical_plan: &SimpleQuery,
+        context: &PhysicalPlanBuilderContext,
+    ) -> Result, CubeError> {
+        let mut render_references = HashMap::new(); // filled by process_logical_join and handed to the node factory later in this function
+        let from = self.process_logical_join(
+            &logical_plan.source,
+            context,
+            &logical_plan.dimension_subqueries,
+            &mut render_references,
+        )?;
+        let mut select_builder = SelectBuilder::new(from);
+        let mut context_factory = context.make_sql_nodes_factory();
+        context_factory.set_ungrouped(logical_plan.ungrouped);
+
+        let mut group_by = Vec::new(); // stays empty for ungrouped queries
+        for member in logical_plan.schema.dimensions.iter() { // dimensions are projected first
+            let member_ref: Rc =
+                MemberSymbolRef::try_new(member.clone(), self.query_tools.clone())?;
+            select_builder.add_projection_member(&member_ref, None);
+            if !logical_plan.ungrouped {
+                group_by.push(Expr::Member(MemberExpression::new(member_ref.clone())));
+            }
+        }
+        for member in logical_plan.schema.time_dimensions.iter() { // then time dimensions, handled the same way
+            let member_ref: Rc =
+                MemberSymbolRef::try_new(member.clone(), self.query_tools.clone())?;
+            select_builder.add_projection_member(&member_ref, None);
+            if !logical_plan.ungrouped {
+                group_by.push(Expr::Member(MemberExpression::new(member_ref.clone())));
+            }
+        }
+        for member in logical_plan.schema.measures.iter() { // measures are projected last and never grouped by
+            select_builder.add_projection_member(
+                &MemberSymbolRef::try_new(member.clone(), self.query_tools.clone())?,
+                None,
+            );
+        }
+
+        let filter = logical_plan.filter.all_filters();
+        let having = if logical_plan.filter.measures_filter.is_empty() { // measure filters, when present, go to HAVING
+            None
+        } else {
+            Some(Filter {
+                items: logical_plan.filter.measures_filter.clone(),
+            })
+        };
+
+        select_builder.set_filter(filter);
+        select_builder.set_group_by(group_by);
+        select_builder
+            .set_order_by(self.make_order_by(&logical_plan.schema, &logical_plan.order_by)?);
+
select_builder.set_having(having);
+        select_builder.set_limit(logical_plan.limit);
+        select_builder.set_offset(logical_plan.offset);
+
+        context_factory
+            .set_rendered_as_multiplied_measures(logical_plan.schema.multiplied_measures.clone());
+        context_factory.set_render_references(render_references);
+        if logical_plan.ungrouped { // NOTE(review): set_ungrouped(logical_plan.ungrouped) is already called earlier in this function; this looks redundant, confirm
+            context_factory.set_ungrouped(true);
+        }
+
+        let res = Rc::new(select_builder.build(context_factory));
+        Ok(res)
+    }
+
+    fn build_full_key_aggregate_query( // Builds the outer select over the joined sources, with one CTE per multi-stage member
+        &self,
+        logical_plan: &FullKeyAggregateQuery,
+        context: &PhysicalPlanBuilderContext,
+    ) -> Result, CubeError> {
+        let mut multi_stage_schemas = HashMap::new(); // stage name -> schema, filled while the CTEs are built
+        let mut ctes = Vec::new();
+        for multi_stage_member in logical_plan.multistage_members.iter() { // built in list order, so a stage can look up schemas of stages before it
+            ctes.push(self.processs_multi_stage_member(multi_stage_member, &mut multi_stage_schemas, context)?);
+        }
+        let (from, joins_len) =
+            self.process_full_key_aggregate(&logical_plan.source, context, &multi_stage_schemas)?;
+
+        let references_builder = ReferencesBuilder::new(from.clone());
+        let mut render_references = HashMap::new();
+
+        let mut select_builder = SelectBuilder::new(from.clone());
+        let all_dimensions = logical_plan.schema.all_dimensions().cloned().collect_vec();
+
+        self.process_full_key_aggregate_dimensions(
+            &all_dimensions,
+            &logical_plan.source,
+            &mut select_builder,
+            &references_builder,
+            &mut render_references,
+            joins_len,
+            context,
+        )?;
+
+        for measure in logical_plan.schema.measures.iter() { // each measure is resolved to a reference in the joined sources, then projected
+            references_builder.resolve_references_for_member(
+                measure.clone(),
+                &None,
+                &mut render_references,
+            )?;
+            let alias = references_builder.resolve_alias_for_member(&measure.full_name(), &None);
+            select_builder.add_projection_member(
+                &measure.clone().as_base_member(self.query_tools.clone())?,
+                alias,
+            );
+        }
+
+        let having = if logical_plan.filter.measures_filter.is_empty() {
+            None
+        } else {
+            let filter = Filter {
+                items: logical_plan.filter.measures_filter.clone(),
+            };
+
references_builder.resolve_references_for_filter(&filter, &mut render_references)?;
+            Some(filter)
+        };
+
+        select_builder
+            .set_order_by(self.make_order_by(&logical_plan.schema, &logical_plan.order_by)?);
+        select_builder.set_filter(having); // the measure filter is applied through set_filter here, not set_having
+        select_builder.set_limit(logical_plan.limit);
+        select_builder.set_offset(logical_plan.offset);
+        select_builder.set_ctes(ctes);
+
+        let mut context_factory = context.make_sql_nodes_factory();
+        context_factory.set_render_references(render_references);
+
+        Ok(Rc::new(select_builder.build(context_factory)))
+    }
+
+    //FIXME refactor required
+    fn process_full_key_aggregate_dimensions( // Adds every dimension to the outer projection; join dimensions are coalesced across all joined sources
+        &self,
+        dimensions: &Vec>,
+        full_key_aggregate: &Rc,
+        select_builder: &mut SelectBuilder,
+        references_builder: &ReferencesBuilder,
+        render_references: &mut HashMap,
+        joins_len: usize,
+        context: &PhysicalPlanBuilderContext, // NOTE(review): not used anywhere in this function
+    ) -> Result<(), CubeError> {
+        let dimensions_for_join_names = full_key_aggregate
+            .join_dimensions
+            .iter()
+            .map(|dim| dim.full_name())
+            .collect::>();
+        let source_for_join_dimensions = Some(format!("q_0")); // q_0 is the alias process_full_key_aggregate gives the first joined source
+        for dim in dimensions.iter() {
+            let dimension_ref = dim.clone().as_base_member(self.query_tools.clone())?;
+            if dimensions_for_join_names.contains(&dim.full_name()) {
+                references_builder.resolve_references_for_member(
+                    dim.clone(),
+                    &source_for_join_dimensions,
+                    render_references,
+                )?;
+                let references = (0..joins_len) // one reference per joined source q_0 .. q_{joins_len - 1}
+                    .map(|i| {
+                        let alias = format!("q_{}", i);
+                        references_builder
+                            .find_reference_for_member(&dim.full_name(), &Some(alias.clone()))
+                            .ok_or_else(|| {
+                                CubeError::internal(format!(
+                                    "Reference for join not found for {} in {}",
+                                    dim.full_name(),
+                                    alias
+                                ))
+                            })
+                    })
+                    .collect::, _>>()?;
+                let alias = references_builder
+                    .resolve_alias_for_member(&dim.full_name(), &source_for_join_dimensions);
+                select_builder.add_projection_coalesce_member(&dimension_ref, references, alias)?;
+            } else { // a dimension that is not a join key is resolved without naming a source
+                references_builder.resolve_references_for_member(
+                    dim.clone(),
+                    &None,
+
render_references,
+                )?;
+                select_builder.add_projection_member(&dimension_ref, None);
+            }
+        }
+        Ok(())
+    }
+
+    fn process_full_key_aggregate( // Joins all sources of a full-key aggregate as q_0, q_1, ...; returns the join and the number of sources
+        &self,
+        full_key_aggregate: &Rc,
+        context: &PhysicalPlanBuilderContext,
+        multi_stage_schemas: &HashMap>,
+    ) -> Result<(Rc, usize), CubeError> {
+        let mut joins = Vec::new();
+        for source in full_key_aggregate.sources.iter() {
+            match source {
+                FullKeyAggregateSource::ResolveMultipliedMeasures(resolve_multiplied_measures) => { // may contribute several subqueries
+                    joins.append(&mut self.process_resolve_multiplied_measures(
+                        resolve_multiplied_measures,
+                        context,
+                    )?);
+                }
+                FullKeyAggregateSource::MultiStageSubqueryRef(subquery_ref) => { // refers to a stage whose schema must already be registered
+                    if let Some(schema) = multi_stage_schemas.get(&subquery_ref.name) {
+                        joins.push(SingleSource::TableReference(
+                            subquery_ref.name.clone(),
+                            schema.clone(),
+                        ));
+                    } else {
+                        return Err(CubeError::internal(format!(
+                            "MultiStageSubqueryRef not found: {}",
+                            subquery_ref.name
+                        )));
+                    }
+                }
+            }
+        }
+
+        if joins.is_empty() { // joins[0] is indexed below, so an empty source list is rejected first
+            return Err(CubeError::internal(format!(
+                "FullKeyAggregate should have at least one source: {}",
+                pretty_print_rc(full_key_aggregate)
+            )));
+        }
+
+        let dimensions_for_join = full_key_aggregate
+            .join_dimensions
+            .iter()
+            .map(|dim| -> Result, CubeError> {
+                dim.clone().as_base_member(self.query_tools.clone())
+            })
+            .collect::, _>>()?;
+
+        let mut join_builder = JoinBuilder::new_from_source(joins[0].clone(), format!("q_0"));
+
+        for (i, join) in joins.iter().enumerate().skip(1) {
+            let right_alias = format!("q_{}", i);
+            let right_schema = joins[i].schema();
+            // TODO every next join should join to all previous dimensions through OR: q_0.a = q_1.a, q_0.a = q_2.a OR q_1.a = q_2.a, ...
+            let conditions = dimensions_for_join
+                .iter()
+                .map(|dim| {
+                    (0..i) // one (left, right) pair per earlier source
+                        .map(|left_i| {
+                            let left_alias = format!("q_{}", left_i);
+                            let left_schema = joins[left_i].schema(); // was joins[i - 1] for every left_i; resolve the column in the source that q_{left_i} actually names
+                            let alias_in_left_query = left_schema.resolve_member_alias(dim);
+                            let left_ref = Expr::Reference(QualifiedColumnName::new(
+                                Some(left_alias.clone()),
+                                alias_in_left_query,
+                            ));
+                            let alias_in_right_query = right_schema.resolve_member_alias(dim);
+                            let right_ref = Expr::Reference(QualifiedColumnName::new(
+                                Some(right_alias.clone()),
+                                alias_in_right_query,
+                            ));
+                            (left_ref, right_ref)
+                        })
+                        .collect::>()
+                })
+                .collect_vec();
+            let on = JoinCondition::new_dimension_join(conditions, true);
+            let next_alias = format!("q_{}", i);
+            if self.plan_sql_templates.supports_full_join() {
+                join_builder.full_join_source(join.clone(), next_alias, on);
+            } else {
+                // TODO in case of full join is not supported there should be correct blending query that keeps NULL values
+                join_builder.inner_join_source(join.clone(), next_alias, on);
+            }
+        }
+
+        let result = From::new_from_join(join_builder.build());
+        Ok((result, joins.len()))
+    }
+
+    fn process_resolve_multiplied_measures( // One subquery source per regular-measure query, followed by one per aggregate-multiplied subquery
+        &self,
+        resolve_multiplied_measures: &Rc,
+        context: &PhysicalPlanBuilderContext,
+    ) -> Result, CubeError> {
+        let mut joins = Vec::new();
+        for (i, regular_measure_subquery) in resolve_multiplied_measures
+            .regular_measure_subqueries
+            .iter()
+            .enumerate()
+        {
+            let mut regular_measure_context = context.clone();
+            regular_measure_context.alias_prefix = if i == 0 { // prefix is "main" for the first subquery and "main_<i>" for the rest
+                Some(format!("main"))
+            } else {
+                Some(format!("main_{}", i))
+            };
+            let select =
+                self.build_simple_query(regular_measure_subquery, &regular_measure_context)?;
+            let source = SingleSource::Subquery(Rc::new(QueryPlan::Select(select)));
+            joins.push(source);
+        }
+        for multiplied_measure_subquery in resolve_multiplied_measures
+            .aggregate_multiplied_subqueries
+            .iter()
+        {
+            let select =
+                self.process_aggregate_multiplied_subquery(multiplied_measure_subquery, context)?;
+            let source =
SingleSource::Subquery(Rc::new(QueryPlan::Select(select)));
+            joins.push(source);
+        }
+        Ok(joins)
+    }
+
+    fn process_logical_join( // Builds the FROM part: the root cube, its joined cubes, and the dimension subqueries attached to each of them
+        &self,
+        logical_join: &LogicalJoin,
+        context: &PhysicalPlanBuilderContext,
+        dimension_subqueries: &Vec>,
+        render_references: &mut HashMap,
+    ) -> Result, CubeError> {
+        let root = logical_join.root.cube.clone();
+        if logical_join.joins.is_empty() && dimension_subqueries.is_empty() { // nothing to join: the root cube alone is the source
+            Ok(From::new_from_cube(root, context.alias_prefix.clone()))
+        } else {
+            let mut join_builder = JoinBuilder::new_from_cube(
+                root.clone(),
+                Some(root.default_alias_with_prefix(&context.alias_prefix)),
+            );
+            for dimension_subquery in dimension_subqueries // subqueries whose dimension belongs to the root cube are joined right after the root
+                .iter()
+                .filter(|d| &d.subquery_dimension.cube_name() == root.name())
+            {
+                self.add_subquery_join(
+                    dimension_subquery.clone(),
+                    &mut join_builder,
+                    render_references,
+                    context,
+                )?;
+            }
+            for join in logical_join.joins.iter() {
+                match join {
+                    LogicalJoinItem::CubeJoinItem(CubeJoinItem { cube, on_sql }) => {
+                        join_builder.left_join_cube(
+                            cube.cube.clone(),
+                            Some(cube.cube.default_alias_with_prefix(&context.alias_prefix)),
+                            JoinCondition::new_base_join(SqlJoinCondition::try_new(
+                                self.query_tools.clone(),
+                                on_sql.clone(),
+                            )?),
+                        );
+                        for dimension_subquery in dimension_subqueries // subqueries of this cube follow directly after the cube's own join
+                            .iter()
+                            .filter(|d| &d.subquery_dimension.cube_name() == cube.cube.name())
+                        {
+                            self.add_subquery_join(
+                                dimension_subquery.clone(),
+                                &mut join_builder,
+                                render_references,
+                                context,
+                            )?;
+                        }
+                    }
+                }
+            }
+            Ok(From::new_from_join(join_builder.build()))
+        }
+    }
+
+    fn add_subquery_join( // Left-joins one dimension subquery on the primary-key dimensions and records where its value column lives
+        &self,
+        dimension_subquery: Rc,
+        join_builder: &mut JoinBuilder,
+        render_references: &mut HashMap,
+        context: &PhysicalPlanBuilderContext,
+    ) -> Result<(), CubeError> {
+        let sub_query = self.build_impl(dimension_subquery.query.clone(), context)?; // the subquery is itself a full logical query
+        let dim_name = dimension_subquery.subquery_dimension.name();
+        let cube_name = dimension_subquery.subquery_dimension.cube_name();
+        let primary_keys_dimensions
= &dimension_subquery.primary_keys_dimensions;
+        let sub_query_alias = format!("{cube_name}_{dim_name}_subquery"); // alias under which the subselect is joined
+        let conditions = primary_keys_dimensions
+            .iter()
+            .map(|dim| -> Result<_, CubeError> {
+                let dim = dim.clone().as_base_member(self.query_tools.clone())?;
+                let alias_in_sub_query = sub_query.schema().resolve_member_alias(&dim);
+                let sub_query_ref = Expr::Reference(QualifiedColumnName::new(
+                    Some(sub_query_alias.clone()),
+                    alias_in_sub_query.clone(),
+                ));
+
+                Ok(vec![(sub_query_ref, Expr::new_member(dim))]) // one pair per primary key: subquery column against the member expression
+            })
+            .collect::, _>>()?;
+
+        if let Some(dim_ref) = sub_query.schema().resolve_member_reference(
+            &dimension_subquery
+                .measure_for_subquery_dimension
+                .full_name(),
+        ) {
+            let qualified_column_name =
+                QualifiedColumnName::new(Some(sub_query_alias.clone()), dim_ref);
+            render_references.insert( // the subquery dimension is mapped to the subquery column found for the measure
+                dimension_subquery.subquery_dimension.full_name(),
+                qualified_column_name,
+            );
+        } else {
+            return Err(CubeError::internal(format!(
+                "Can't find source for subquery dimension {}",
+                dim_name
+            )));
+        }
+        join_builder.left_join_subselect(
+            sub_query,
+            sub_query_alias,
+            JoinCondition::new_dimension_join(conditions, false),
+        );
+        Ok(())
+    }
+
+    fn process_aggregate_multiplied_subquery( // Builds a select that starts from the keys subquery and left-joins the source cube back by primary key
+        &self,
+        aggregate_multiplied_subquery: &Rc,
+        context: &PhysicalPlanBuilderContext,
+    ) -> Result, CubeError> {
+        let mut render_references = HashMap::new();
+        let keys_query =
+            self.process_keys_sub_query(&aggregate_multiplied_subquery.keys_subquery, context)?;
+
+        let keys_query_alias = format!("keys");
+
+        let mut join_builder =
+            JoinBuilder::new_from_subselect(keys_query.clone(), keys_query_alias.clone());
+
+        match aggregate_multiplied_subquery.source.as_ref() {
+            AggregateMultipliedSubquerySouce::Cube(cube) => {
+                let primary_keys_dimensions = &aggregate_multiplied_subquery
+                    .keys_subquery
+                    .primary_keys_dimensions;
+
+                let conditions = primary_keys_dimensions
+                    .iter()
+                    .map(|dim| -> Result<_, CubeError> {
+                        let member_ref =
dim.clone().as_base_member(self.query_tools.clone())?;
+                        let alias_in_keys_query =
+                            keys_query.schema().resolve_member_alias(&member_ref);
+                        let keys_query_ref = Expr::Reference(QualifiedColumnName::new(
+                            Some(keys_query_alias.clone()),
+                            alias_in_keys_query,
+                        ));
+                        let pk_cube_expr = Expr::Member(MemberExpression::new(member_ref.clone()));
+                        Ok(vec![(keys_query_ref, pk_cube_expr)]) // one pair per primary key: keys-subquery column against the member expression
+                    })
+                    .collect::, _>>()?;
+
+                let pk_cube_alias = cube
+                    .cube
+                    .default_alias_with_prefix(&Some(format!("{}_key", cube.cube.default_alias()))); // prefix is "<default alias>_key"
+
+                join_builder.left_join_cube(
+                    cube.cube.clone(),
+                    Some(pk_cube_alias.clone()),
+                    JoinCondition::new_dimension_join(conditions, false),
+                );
+                for dimension_subquery in aggregate_multiplied_subquery.dimension_subqueries.iter() // every dimension subquery is joined, with no per-cube filter here
+                {
+                    self.add_subquery_join(
+                        dimension_subquery.clone(),
+                        &mut join_builder,
+                        &mut render_references,
+                        context,
+                    )?;
+                }
+            }
+        }
+
+        let from = From::new_from_join(join_builder.build());
+        let references_builder = ReferencesBuilder::new(from.clone());
+        let mut select_builder = SelectBuilder::new(from.clone());
+        let mut group_by = Vec::new();
+        for member in aggregate_multiplied_subquery.schema.all_dimensions() { // every dimension is both projected and grouped by
+            references_builder.resolve_references_for_member(
+                member.clone(),
+                &None,
+                &mut render_references,
+            )?;
+            let alias = references_builder.resolve_alias_for_member(&member.full_name(), &None);
+            let member_ref = member.clone().as_base_member(self.query_tools.clone())?;
+            group_by.push(Expr::Member(MemberExpression::new(member_ref.clone())));
+            select_builder.add_projection_member(&member_ref, alias);
+        }
+        for member in aggregate_multiplied_subquery.schema.measures.iter() { // measures are projected only
+            references_builder.resolve_references_for_member(
+                member.clone(),
+                &None,
+                &mut render_references,
+            )?;
+            let alias = references_builder.resolve_alias_for_member(&member.full_name(), &None);
+            select_builder.add_projection_member(
+                &member.clone().as_base_member(self.query_tools.clone())?,
+                alias,
+            );
+        }
+
select_builder.set_group_by(group_by);
+        let mut context_factory = context.make_sql_nodes_factory();
+        context_factory.set_render_references(render_references);
+        Ok(Rc::new(select_builder.build(context_factory)))
+    }
+
+    fn process_keys_sub_query( // Builds a SELECT DISTINCT over dimensions, time dimensions and primary keys of the keys subquery
+        &self,
+        keys_subquery: &Rc,
+        context: &PhysicalPlanBuilderContext,
+    ) -> Result, CubeError> {
+        let mut render_references = HashMap::new();
+        let alias_prefix = Some(format!(
+            "{}_key",
+            self.query_tools
+                .alias_for_cube(&keys_subquery.key_cube_name)?
+        ));
+
+        let mut context = context.clone(); // local copy so the caller's context keeps its own prefix
+        context.alias_prefix = alias_prefix;
+        let source = self.process_logical_join(
+            &keys_subquery.source,
+            &context,
+            &keys_subquery.dimension_subqueries,
+            &mut render_references,
+        )?;
+        let mut select_builder = SelectBuilder::new(source);
+        for member in keys_subquery
+            .dimensions
+            .iter()
+            .chain(keys_subquery.time_dimensions.iter())
+            .chain(keys_subquery.primary_keys_dimensions.iter())
+        {
+            let member_ref: Rc =
+                MemberSymbolRef::try_new(member.clone(), self.query_tools.clone())?;
+            let alias = member_ref.alias_name();
+            select_builder.add_projection_member(&member_ref, Some(alias.clone())); // each column is projected under the member's own alias name
+        }
+
+        select_builder.set_distinct();
+        select_builder.set_filter(keys_subquery.filter.all_filters());
+        let mut context_factory = context.make_sql_nodes_factory();
+        context_factory.set_render_references(render_references);
+        let res = Rc::new(select_builder.build(context_factory));
+        Ok(res)
+    }
+
+    fn make_order_by( // Turns logical order items into physical ones, addressed by 1-based position in the schema
+        &self,
+        logical_schema: &LogicalSchema,
+        order_by: &Vec,
+    ) -> Result, CubeError> {
+        let mut result = Vec::new();
+        for o in order_by.iter() {
+            if let Some(position) = logical_schema.find_member_position(&o.name()) { // items whose member is not in the schema are skipped without an error
+                let member_ref: Rc =
+                    MemberSymbolRef::try_new(o.member_symbol(), self.query_tools.clone())?;
+                result.push(OrderBy::new(
+                    Expr::Member(MemberExpression::new(member_ref)),
+                    position + 1, // find_member_position is zero-based
+                    o.desc(),
+                ));
+            }
+        }
+        Ok(result)
+    }
+
+    fn processs_multi_stage_member(
+        &self,
+
logical_plan: &Rc,
+        multi_stage_schemas: &mut HashMap>,
+        context: &PhysicalPlanBuilderContext,
+    ) -> Result, CubeError> {
+        let query = match &logical_plan.member_type { // one builder method per kind of multi-stage member
+            MultiStageMemberLogicalType::LeafMeasure(measure) => {
+                self.process_multi_stage_leaf_measure(&measure, context)?
+            }
+            MultiStageMemberLogicalType::MeasureCalculation(calculation) => {
+                self.process_multi_stage_measure_calculation(&calculation, multi_stage_schemas, context)?
+            }
+            MultiStageMemberLogicalType::GetDateRange(get_date_range) => {
+                self.process_multi_stage_get_date_range(&get_date_range, context)?
+            }
+            MultiStageMemberLogicalType::TimeSeries(time_series) => self.process_multi_stage_time_series(&time_series, context)?,
+            MultiStageMemberLogicalType::RollingWindow(rolling_window) => self.process_multi_stage_rolling_window(&rolling_window, multi_stage_schemas, context)?,
+        };
+        let alias = logical_plan.name.clone();
+        multi_stage_schemas.insert(alias.clone(), query.schema().clone()); // registered under the stage name so other stages can look the schema up
+        Ok(Rc::new(Cte::new(query, alias)))
+    }
+
+    fn process_multi_stage_leaf_measure( // Builds the leaf query with the rendering flags and time shifts taken from the leaf node
+        &self,
+        leaf_measure: &MultiStageLeafMeasure,
+        context: &PhysicalPlanBuilderContext,
+    ) -> Result, CubeError> {
+        let mut context = context.clone(); // local copy; the caller's context is not modified
+        context.render_measure_as_state = leaf_measure.render_measure_as_state;
+        context.render_measure_for_ungrouped = leaf_measure.render_measure_for_ungrouped;
+        context.time_shifts = leaf_measure.time_shifts.clone();
+        let select = self.build_impl(leaf_measure.query.clone(), &context)?;
+        Ok(Rc::new(QueryPlan::Select(select)))
+    }
+
+    fn process_multi_stage_get_date_range( // Builds a select of MAX and MIN of the time dimension over the node's join source
+        &self,
+        get_date_range: &MultiStageGetDateRange,
+        context: &PhysicalPlanBuilderContext,
+    ) -> Result, CubeError> {
+
+        let mut render_references = HashMap::new();
+        let from = self.process_logical_join(
+            &get_date_range.source,
+            context,
+            &get_date_range.dimension_subqueries,
+            &mut render_references,
+        )?;
+        let mut select_builder = SelectBuilder::new(from);
+        let mut context_factory =
context.make_sql_nodes_factory(); + let args = vec![get_date_range.time_dimension.clone().as_base_member(self.query_tools.clone())?]; + select_builder.add_projection_function_expression( + "MAX", + args.clone(), + "date_to".to_string(), + ); + + select_builder.add_projection_function_expression( + "MIN", + args.clone(), + "date_from".to_string(), + ); + context_factory.set_render_references(render_references); + let select = Rc::new(select_builder.build(context_factory)); + Ok(Rc::new(QueryPlan::Select(select))) + } + + fn process_multi_stage_time_series( + &self, + time_series: &MultiStageTimeSeries, + context: &PhysicalPlanBuilderContext, + ) -> Result, CubeError> { + let time_dimension = time_series.time_dimension.clone(); + let time_dimension_symbol = time_dimension.as_time_dimension()?; + let date_range = time_series.date_range.clone(); + let granularity_obj = if let Some(granularity_obj) = time_dimension_symbol.granularity_obj() { + granularity_obj.clone() + } else { + return Err(CubeError::user( + "Time dimension granularity is required for rolling window".to_string(), + )); + }; + + let templates = PlanSqlTemplates::new(self.query_tools.templates_render()); + + let ts_date_range = if templates.supports_generated_time_series() { + if let Some(date_range) = time_dimension_symbol.get_range_for_time_series(date_range, self.query_tools.timezone())? 
{ + TimeSeriesDateRange::Filter(date_range.0.clone(), date_range.1.clone()) + } else { + if let Some(date_range_cte) = &time_series.get_date_range_multistage_ref { + TimeSeriesDateRange::Generated(date_range_cte.clone()) + } else { + return Err(CubeError::internal( + "Date range cte is required for time series without date range".to_string(), + )); + } + } + } else { + if let Some(date_range) = &time_series.date_range { + TimeSeriesDateRange::Filter(date_range[0].clone(), date_range[1].clone()) + } else { + return Err(CubeError::internal( + "Date range is required for time series without date range".to_string(), + )); + } + }; + + let time_seira = TimeSeries::new( + self.query_tools.clone(), + time_dimension.full_name(), + ts_date_range, + granularity_obj, + ); + let query_plan = Rc::new(QueryPlan::TimeSeries(Rc::new(time_seira))); + Ok(query_plan) + } + + fn process_multi_stage_rolling_window( + &self, + rolling_window: &MultiStageRollingWindow, + multi_stage_schemas: &HashMap>, + context: &PhysicalPlanBuilderContext, + ) -> Result, CubeError> { + let time_dimension = rolling_window.rolling_time_dimension.clone(); + let time_dimension_ref = time_dimension.clone().as_base_member(self.query_tools.clone())?; + let time_series_ref = rolling_window.time_series_input.clone(); + let measure_input_ref = rolling_window.measure_input.clone(); + + let time_series_schema = if let Some(schema) = multi_stage_schemas.get(&time_series_ref) { + schema.clone() + } else { + return Err(CubeError::internal( + format!("Schema not found for cte {}", time_series_ref), + )); + }; + let measure_input_schema = if let Some(schema) = multi_stage_schemas.get(&measure_input_ref) { + schema.clone() + } else { + return Err(CubeError::internal( + format!("Schema not found for cte {}", measure_input_ref), + )); + }; + + let time_dimension_alias = measure_input_schema + .resolve_member_alias(&time_dimension_ref); + + let root_alias = format!("time_series"); + let measure_input_alias = 
format!("rolling_source"); + + let mut join_builder = JoinBuilder::new_from_table_reference( + time_series_ref.clone(), + time_series_schema, + Some(root_alias.clone()), + ); + + let on = match &rolling_window.rolling_window { + MultiStageRollingWindowType::Regular(regular_rolling_window) => { + JoinCondition::new_regular_rolling_join( + root_alias.clone(), + regular_rolling_window.trailing.clone(), + regular_rolling_window.leading.clone(), + regular_rolling_window.offset.clone(), + Expr::Reference(QualifiedColumnName::new( + Some(measure_input_alias.clone()), + time_dimension_alias, + )), + ) + } + MultiStageRollingWindowType::ToDate(to_date_rolling_window) => { + JoinCondition::new_to_date_rolling_join( + root_alias.clone(), + to_date_rolling_window.granularity.clone(), + Expr::Reference(QualifiedColumnName::new( + Some(measure_input_alias.clone()), + time_dimension_alias, + )), + self.query_tools.clone(), + ) + } + MultiStageRollingWindowType::RunningTotal => JoinCondition::new_rolling_total_join( + root_alias.clone(), + Expr::Reference(QualifiedColumnName::new( + Some(measure_input_alias.clone()), + time_dimension_alias, + )), + ), + }; + + join_builder.left_join_table_reference( + measure_input_ref.clone(), + measure_input_schema.clone(), + Some(measure_input_alias.clone()), + on, + ); + + let mut context_factory = context.make_sql_nodes_factory(); + context_factory.set_rolling_window(true); + let from = From::new_from_join(join_builder.build()); + let references_builder = ReferencesBuilder::new(from.clone()); + let mut render_references = HashMap::new(); + let mut select_builder = SelectBuilder::new(from.clone()); + + //We insert render reference for main time dimension (with the some granularity as in time series to avoid unnessesary date_tranc) + render_references.insert( + time_dimension.full_name(), + QualifiedColumnName::new(Some(root_alias.clone()), format!("date_from")), + ); + + //We also insert render reference for the base dimension of time 
dimension (i.e. without `_granularity` prefix to let other time dimensions make date_tranc) + render_references.insert( + time_dimension + .as_time_dimension()? + .base_symbol() + .full_name(), + QualifiedColumnName::new(Some(root_alias.clone()), format!("date_from")), + ); + + for dim in rolling_window.schema.time_dimensions.iter() { + context_factory.add_dimensions_with_ignored_timezone(dim.full_name()); + } + + for dim in rolling_window.schema.all_dimensions() { + if dim.full_name() != time_dimension.full_name() { + references_builder.resolve_references_for_member( + dim.clone(), + &Some(measure_input_alias.clone()), + &mut render_references, + )?; + } + let alias = + references_builder.resolve_alias_for_member(&dim.full_name(), &Some(measure_input_alias.clone())); + select_builder.add_projection_member(&dim.clone().as_base_member(self.query_tools.clone())?, alias); + } + + for measure in rolling_window.schema.measures.iter() { + let measure_ref = measure.clone().as_base_member(self.query_tools.clone())?; + let name_in_base_query = measure_input_schema.resolve_member_alias(&measure_ref); + context_factory.add_ungrouped_measure_reference( + measure.full_name(), + QualifiedColumnName::new(Some(measure_input_alias.clone()), name_in_base_query), + ); + + select_builder.add_projection_member(&measure_ref, None); + } + + if !rolling_window.is_ungrouped { + let group_by = rolling_window.schema.all_dimensions().map(|dim| -> Result<_, CubeError> { + + let member_ref: Rc = + MemberSymbolRef::try_new(dim.clone(), self.query_tools.clone())?; + Ok(Expr::Member(MemberExpression::new(member_ref.clone()))) + }).collect::, _>>()?; + select_builder.set_group_by(group_by); + select_builder.set_order_by( + self.make_order_by(&rolling_window.schema, &rolling_window.order_by)?, + ); + } else { + context_factory.set_ungrouped(true); + } + + context_factory.set_render_references(render_references); + let select = Rc::new(select_builder.build(context_factory)); + 
Ok(Rc::new(QueryPlan::Select(select))) + } + + + fn process_multi_stage_measure_calculation( + &self, + measure_calculation: &MultiStageMeasureCalculation, + multi_stage_schemas: &HashMap>, + context: &PhysicalPlanBuilderContext, + ) -> Result, CubeError> { + let (from, joins_len) = self.process_full_key_aggregate( + &measure_calculation.source, + context, + multi_stage_schemas, + )?; + let references_builder = ReferencesBuilder::new(from.clone()); + let mut render_references = HashMap::new(); + + let mut select_builder = SelectBuilder::new(from.clone()); + let all_dimensions = measure_calculation + .schema + .all_dimensions() + .cloned() + .collect_vec(); + + self.process_full_key_aggregate_dimensions( + &all_dimensions, + &measure_calculation.source, + &mut select_builder, + &references_builder, + &mut render_references, + joins_len, + context, + )?; + + for measure in measure_calculation.schema.measures.iter() { + references_builder.resolve_references_for_member( + measure.clone(), + &None, + &mut render_references, + )?; + let alias = references_builder.resolve_alias_for_member(&measure.full_name(), &None); + select_builder.add_projection_member( + &measure.clone().as_base_member(self.query_tools.clone())?, + alias, + ); + } + + + if !measure_calculation.is_ungrouped { + let group_by = all_dimensions.iter().map(|dim| -> Result<_, CubeError> { + + let member_ref: Rc = + MemberSymbolRef::try_new(dim.clone(), self.query_tools.clone())?; + Ok(Expr::Member(MemberExpression::new(member_ref.clone()))) + }).collect::, _>>()?; + select_builder.set_group_by(group_by); + select_builder.set_order_by( + self.make_order_by(&measure_calculation.schema, &measure_calculation.order_by)?, + ); + } + + let mut context_factory = context.make_sql_nodes_factory(); + match &measure_calculation.window_function_to_use { + MultiStageCalculationWindowFunction::Rank => context_factory.set_multi_stage_rank(measure_calculation.partition_by.clone()), + 
MultiStageCalculationWindowFunction::Window => context_factory.set_multi_stage_window(measure_calculation.partition_by.clone()), + MultiStageCalculationWindowFunction::None => {}, + } + context_factory.set_render_references(render_references); + let select = Rc::new(select_builder.build(context_factory)); + Ok(Rc::new(QueryPlan::Select(select))) + } +} diff --git a/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/mod.rs b/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/mod.rs new file mode 100644 index 0000000000000..26bc1f8648f03 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/mod.rs @@ -0,0 +1,2 @@ +mod builder; +pub use builder::*; diff --git a/rust/cubesqlplanner/cubesqlplanner/src/plan/builder/join.rs b/rust/cubesqlplanner/cubesqlplanner/src/plan/builder/join.rs index 22ea6911dcf07..1f5c0f241175d 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/plan/builder/join.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/plan/builder/join.rs @@ -1,5 +1,5 @@ use crate::plan::join::JoinType; -use crate::plan::{Join, JoinCondition, JoinItem, QueryPlan, Schema, Select, SingleAliasedSource}; +use crate::plan::{Join, JoinCondition, JoinItem, QueryPlan, Schema, Select, SingleAliasedSource, SingleSource}; use crate::planner::BaseCube; use std::rc::Rc; @@ -34,6 +34,10 @@ impl JoinBuilder { Self::new(SingleAliasedSource::new_from_subquery(plan, alias)) } + pub fn new_from_source(source: SingleSource, alias: String) -> Self { + Self::new(SingleAliasedSource::new_from_source(source, alias)) + } + pub fn new_from_subselect(plan: Rc { - let schema = cte_schemas.get(alias).unwrap().clone(); - let select_builder = - SelectBuilder::new(From::new_from_table_reference(alias.clone(), schema, None)); - - Rc::new(select_builder.build(SqlNodesFactory::new())) - } - fn create_multi_stage_inode_member( &self, base_member: Rc, @@ -341,11 +262,4 @@ impl MultiStageQueryPlanner { descriptions.push(description.clone()); Ok(description) } - - 
fn compile_dimension(&self, name: &String) -> Result, CubeError> { - let evaluator_compiler_cell = self.query_tools.evaluator_compiler().clone(); - let mut evaluator_compiler = evaluator_compiler_cell.borrow_mut(); - let evaluator = evaluator_compiler.add_dimension_evaluator(name.clone())?; - BaseDimension::try_new_required(evaluator, self.query_tools.clone()) - } } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multiplied_measures_query_planner.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multiplied_measures_query_planner.rs index e4fc736eaafc2..12161af9d8fbf 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multiplied_measures_query_planner.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multiplied_measures_query_planner.rs @@ -1,24 +1,16 @@ use super::{CommonUtils, DimensionSubqueryPlanner, JoinPlanner}; use crate::cube_bridge::join_definition::JoinDefinition; use crate::logical_plan::*; -use crate::plan::{ - Expr, From, JoinBuilder, JoinCondition, MemberExpression, QualifiedColumnName, Select, - SelectBuilder, -}; use crate::planner::query_tools::QueryTools; use crate::planner::sql_evaluator::collectors::{ collect_cube_names, collect_join_hints, collect_join_hints_for_measures, collect_sub_query_dimensions_from_members, collect_sub_query_dimensions_from_symbols, }; -use crate::planner::sql_evaluator::sql_nodes::SqlNodesFactory; -use crate::planner::sql_evaluator::ReferencesBuilder; -use crate::planner::sql_templates::PlanSqlTemplates; use crate::planner::{ BaseMeasure, BaseMember, BaseMemberHelper, FullKeyAggregateMeasures, QueryProperties, }; use cubenativeutils::CubeError; use itertools::Itertools; -use std::collections::HashMap; use std::rc::Rc; pub struct MultipliedMeasuresQueryPlanner { @@ -26,7 +18,6 @@ pub struct MultipliedMeasuresQueryPlanner { query_properties: Rc, join_planner: JoinPlanner, common_utils: CommonUtils, - context_factory: SqlNodesFactory, 
full_key_aggregate_measures: FullKeyAggregateMeasures, } @@ -34,7 +25,6 @@ impl MultipliedMeasuresQueryPlanner { pub fn try_new( query_tools: Rc, query_properties: Rc, - context_factory: SqlNodesFactory, ) -> Result { let full_key_aggregate_measures = query_properties.full_key_aggregate_measures()?; Ok(Self { @@ -42,12 +32,11 @@ impl MultipliedMeasuresQueryPlanner { join_planner: JoinPlanner::new(query_tools.clone()), common_utils: CommonUtils::new(query_tools.clone()), query_properties, - context_factory, full_key_aggregate_measures, }) } - pub fn plan_logical_queries(&self) -> Result, CubeError> { + pub fn plan_queries(&self) -> Result, CubeError> { if self.query_properties.is_simple_query()? { return Err(CubeError::internal(format!( "MultipliedMeasuresQueryPlanner should not be used for simple query" @@ -65,9 +54,9 @@ impl MultipliedMeasuresQueryPlanner { .compute_join_multi_fact_groups_with_measures( &full_key_aggregate_measures.regular_measures, )?; - for (i, (join, measures)) in join_multi_fact_groups.iter().enumerate() { + for (join, measures) in join_multi_fact_groups.iter() { let regular_subquery_logical_plan = - self.regular_measures_subquery_logical_plan(measures, join.clone())?; + self.regular_measures_subquery(measures, join.clone())?; regular_measure_subqueries.push(regular_subquery_logical_plan); } } @@ -89,7 +78,7 @@ impl MultipliedMeasuresQueryPlanner { ) )); } - let aggregate_subquery_logical_plan = self.aggregate_subquery_logical_plan( + let aggregate_subquery_logical_plan = self.aggregate_subquery_plan( &cube_name, &measures, join_multi_fact_groups.into_iter().next().unwrap().0, @@ -102,70 +91,8 @@ impl MultipliedMeasuresQueryPlanner { }); Ok(result) } - pub fn plan_queries(&self) -> Result>, CubeError> { - if self.query_properties.is_simple_query()? 
{ - return Err(CubeError::internal(format!( - "MultipliedMeasuresQueryPlanner should not be used for simple query" - ))); - } - - let full_key_aggregate_measures = &self.full_key_aggregate_measures; - - let mut joins = Vec::new(); - - if !full_key_aggregate_measures.regular_measures.is_empty() { - let join_multi_fact_groups = self - .query_properties - .compute_join_multi_fact_groups_with_measures( - &full_key_aggregate_measures.regular_measures, - )?; - for (i, (join, measures)) in join_multi_fact_groups.iter().enumerate() { - let regular_subquery = self.regular_measures_subquery( - measures, - join.clone(), - if i == 0 { - "main".to_string() - } else { - format!("main_{}", i) - }, - )?; - joins.push(regular_subquery); - } - } - - for (cube_name, measures) in full_key_aggregate_measures - .multiplied_measures - .clone() - .into_iter() - .into_group_map_by(|m| m.cube_name().clone()) - { - let join_multi_fact_groups = self - .query_properties - .compute_join_multi_fact_groups_with_measures(&measures)?; - if join_multi_fact_groups.len() != 1 { - return Err(CubeError::internal( - format!( - "Expected just one multi-fact join group for aggregate measures but got multiple: {}", - join_multi_fact_groups.into_iter().map(|(_, measures)| format!("({})", measures.iter().map(|m| m.full_name()).join(", "))).join(", ") - ) - )); - } - let aggregate_subquery = self.aggregate_subquery( - &cube_name, - &measures, - join_multi_fact_groups.iter().next().unwrap().0.clone(), - )?; - let aggregate_subquery_logical_plan = self.aggregate_subquery_logical_plan( - &cube_name, - &measures, - join_multi_fact_groups.into_iter().next().unwrap().0, - )?; - joins.push(aggregate_subquery); - } - Ok(joins) - } - fn aggregate_subquery_logical_plan( + fn aggregate_subquery_plan( &self, key_cube_name: &String, measures: &Vec>, @@ -188,7 +115,7 @@ impl MultipliedMeasuresQueryPlanner { self.query_properties.clone(), )?; let subquery_dimension_queries = - 
dimension_subquery_planner.plan_logical_queries(&subquery_dimensions)?; + dimension_subquery_planner.plan_queries(&subquery_dimensions)?; let primary_keys_dimensions = self .common_utils @@ -197,7 +124,7 @@ impl MultipliedMeasuresQueryPlanner { .map(|d| d.as_base_member()) .collect_vec(); let keys_subquery = - self.key_query_logical_plan(&primary_keys_dimensions, key_join.clone(), key_cube_name)?; + self.key_query(&primary_keys_dimensions, key_join.clone(), key_cube_name)?; let schema = Rc::new(LogicalSchema { time_dimensions: self.query_properties.time_dimension_symbols(), dimensions: self.query_properties.dimension_symbols(), @@ -211,7 +138,7 @@ impl MultipliedMeasuresQueryPlanner { let should_build_join_for_measure_select = self.check_should_build_join_for_measure_select(measures, key_cube_name)?; let source = if should_build_join_for_measure_select { - let measure_subquery = self.logical_aggregate_subquery_measure( + let measure_subquery = self.aggregate_subquery_measure( key_join.clone(), &measures, &primary_keys_dimensions, @@ -231,153 +158,6 @@ impl MultipliedMeasuresQueryPlanner { })) } - fn aggregate_subquery( - &self, - key_cube_name: &String, - measures: &Vec>, - key_join: Rc, - ) -> Result, CubeError> { - let subquery_dimensions = collect_sub_query_dimensions_from_members( - &BaseMemberHelper::iter_as_base_member(measures).collect_vec(), - &self.join_planner, - &key_join, - self.query_tools.clone(), - )?; - - let dimension_subquery_planner = DimensionSubqueryPlanner::try_new( - &subquery_dimensions, - self.query_tools.clone(), - self.query_properties.clone(), - )?; - - let primary_keys_dimensions = self - .common_utils - .primary_keys_dimensions(key_cube_name)? 
- .into_iter() - .map(|d| d.as_base_member()) - .collect_vec(); - let keys_query = - self.key_query(&primary_keys_dimensions, key_join.clone(), key_cube_name)?; - - let keys_query_alias = format!("keys"); - let should_build_join_for_measure_select = - self.check_should_build_join_for_measure_select(measures, key_cube_name)?; - - let mut join_builder = - JoinBuilder::new_from_subselect(keys_query.clone(), keys_query_alias.clone()); - - let pk_cube = self.common_utils.cube_from_path(key_cube_name.clone())?; - let pk_cube_alias = - pk_cube.default_alias_with_prefix(&Some(format!("{}_key", pk_cube.default_alias()))); - let mut ungrouped_measure_references = HashMap::new(); - if should_build_join_for_measure_select { - let subquery = self.aggregate_subquery_measure_join( - key_cube_name, - &measures, - &primary_keys_dimensions, - &dimension_subquery_planner, - )?; - - let conditions = primary_keys_dimensions - .iter() - .map(|dim| { - let alias_in_keys_query = keys_query.schema().resolve_member_alias(dim); - let keys_query_ref = Expr::Reference(QualifiedColumnName::new( - Some(keys_query_alias.clone()), - alias_in_keys_query, - )); - let alias_in_subquery = subquery.schema().resolve_member_alias(dim); - let subquery_ref = Expr::Reference(QualifiedColumnName::new( - Some(pk_cube_alias.clone()), - alias_in_subquery, - )); - vec![(keys_query_ref, subquery_ref)] - }) - .collect_vec(); - - for meas in measures.iter() { - ungrouped_measure_references.insert( - meas.full_name(), - QualifiedColumnName::new( - Some(pk_cube_alias.clone()), - subquery - .schema() - .resolve_member_alias(&meas.clone().as_base_member()), - ), - ); - } - - join_builder.left_join_subselect( - subquery, - pk_cube_alias.clone(), - JoinCondition::new_dimension_join(conditions, false), - ); - } else { - let conditions = primary_keys_dimensions - .iter() - .map(|dim| { - let alias_in_keys_query = keys_query.schema().resolve_member_alias(dim); - let keys_query_ref = Expr::Reference(QualifiedColumnName::new( 
- Some(keys_query_alias.clone()), - alias_in_keys_query, - )); - let pk_cube_expr = Expr::Member(MemberExpression::new(dim.clone())); - vec![(keys_query_ref, pk_cube_expr)] - }) - .collect_vec(); - join_builder.left_join_cube( - pk_cube.clone(), - Some(pk_cube_alias.clone()), - JoinCondition::new_dimension_join(conditions, false), - ); - for sub_dim in subquery_dimensions.iter() { - dimension_subquery_planner.add_join(&mut join_builder, sub_dim.clone())?; - } - }; - - let from = From::new_from_join(join_builder.build()); - let references_builder = ReferencesBuilder::new(from.clone()); - let mut select_builder = SelectBuilder::new(from.clone()); - let mut render_references = dimension_subquery_planner.dimensions_refs().clone(); - for member in self - .query_properties - .all_dimensions_and_measures(&vec![])? - .iter() - { - references_builder.resolve_references_for_member( - member.member_evaluator(), - &None, - &mut render_references, - )?; - let alias = references_builder.resolve_alias_for_member(&member.full_name(), &None); - select_builder.add_projection_member(member, alias); - } - for member in BaseMemberHelper::iter_as_base_member(&measures) { - let alias = if !should_build_join_for_measure_select { - references_builder.resolve_references_for_member( - member.member_evaluator(), - &None, - &mut render_references, - )?; - references_builder.resolve_alias_for_member(&member.full_name(), &None) - } else { - None - }; - select_builder.add_projection_member(&member, alias); - } - select_builder.set_group_by(self.query_properties.group_by()); - let mut context_factory = self.context_factory.clone(); - context_factory.set_render_references(render_references); - context_factory.set_ungrouped_measure_references(ungrouped_measure_references); - context_factory.set_rendered_as_multiplied_measures( - self.full_key_aggregate_measures - .rendered_as_multiplied_measures - .clone(), - ); - let res = Rc::new(select_builder.build(context_factory)); - Ok(res) - } - fn 
check_should_build_join_for_measure_select( &self, measures: &Vec>, @@ -402,7 +182,7 @@ impl MultipliedMeasuresQueryPlanner { Ok(false) } - fn logical_aggregate_subquery_measure( + fn aggregate_subquery_measure( &self, key_join: Rc, measures: &Vec>, @@ -420,7 +200,7 @@ impl MultipliedMeasuresQueryPlanner { self.query_properties.clone(), )?; let subquery_dimension_queries = - dimension_subquery_planner.plan_logical_queries(&subquery_dimensions)?; + dimension_subquery_planner.plan_queries(&subquery_dimensions)?; let join_hints = collect_join_hints_for_measures(measures)?; let source = self .join_planner @@ -441,42 +221,7 @@ impl MultipliedMeasuresQueryPlanner { Ok(Rc::new(result)) } - fn aggregate_subquery_measure_join( - &self, - _key_cube_name: &String, - measures: &Vec>, - primary_keys_dimensions: &Vec>, - dimension_subquery_planner: &DimensionSubqueryPlanner, - ) -> Result, CubeError> { - let join_hints = collect_join_hints_for_measures(measures)?; - let from = self - .join_planner - .make_join_node_with_prefix_and_join_hints( - &None, - join_hints, - &dimension_subquery_planner, - )?; - let mut context_factory = self.context_factory.clone(); - context_factory.set_ungrouped_measure(true); - context_factory.set_render_references(dimension_subquery_planner.dimensions_refs().clone()); - - context_factory.set_rendered_as_multiplied_measures( - self.full_key_aggregate_measures - .rendered_as_multiplied_measures - .clone(), - ); - - let mut select_builder = SelectBuilder::new(from); - for dim in primary_keys_dimensions.iter() { - select_builder.add_projection_member(dim, None); - } - for meas in measures.iter() { - select_builder.add_projection_member(&meas.clone().as_base_member(), None); - } - Ok(Rc::new(select_builder.build(context_factory))) - } - - fn regular_measures_subquery_logical_plan( + fn regular_measures_subquery( &self, measures: &Vec>, join: Rc, @@ -502,7 +247,7 @@ impl MultipliedMeasuresQueryPlanner { self.query_properties.clone(), )?; let 
subquery_dimension_queries = - dimension_subquery_planner.plan_logical_queries(&subquery_dimensions)?; + dimension_subquery_planner.plan_queries(&subquery_dimensions)?; let source = self.join_planner.make_join_logical_plan(join)?; @@ -535,56 +280,8 @@ impl MultipliedMeasuresQueryPlanner { }; Ok(Rc::new(query)) } - fn regular_measures_subquery( - &self, - measures: &Vec>, - join: Rc, - alias_prefix: String, - ) -> Result, CubeError> { - let subquery_dimensions = collect_sub_query_dimensions_from_symbols( - &self.query_properties.all_member_symbols(false), - &self.join_planner, - &join, - self.query_tools.clone(), - )?; - - let dimension_subquery_planner = DimensionSubqueryPlanner::try_new( - &subquery_dimensions, - self.query_tools.clone(), - self.query_properties.clone(), - )?; - let source = self.join_planner.make_join_node_impl( - &Some(alias_prefix), - join, - &dimension_subquery_planner, - )?; - - let mut select_builder = SelectBuilder::new(source.clone()); - let mut context_factory = self.context_factory.clone(); - for member in self - .query_properties - .all_dimensions_and_measures(&measures)? 
- .iter() - { - select_builder.add_projection_member(member, None); - } - let filter = self.query_properties.all_filters(); - select_builder.set_filter(filter); - select_builder.set_group_by(self.query_properties.group_by()); - - let render_references = dimension_subquery_planner.dimensions_refs().clone(); - context_factory.set_render_references(render_references); - context_factory.set_rendered_as_multiplied_measures( - self.full_key_aggregate_measures - .rendered_as_multiplied_measures - .clone(), - ); - - Ok(Rc::new(select_builder.build(context_factory))) - } - - fn key_query_logical_plan( + fn key_query( &self, dimensions: &Vec>, key_join: Rc, @@ -614,7 +311,7 @@ impl MultipliedMeasuresQueryPlanner { self.query_properties.clone(), )?; let subquery_dimension_queries = - dimension_subquery_planner.plan_logical_queries(&subquery_dimensions)?; + dimension_subquery_planner.plan_queries(&subquery_dimensions)?; let logical_filter = Rc::new(LogicalFilter { dimensions_filters: self.query_properties.dimensions_filters().clone(), @@ -635,66 +332,4 @@ impl MultipliedMeasuresQueryPlanner { Ok(Rc::new(keys_query)) } - - fn key_query( - &self, - dimensions: &Vec>, - key_join: Rc, - key_cube_name: &String, - ) -> Result, CubeError> { - let dimensions = self - .query_properties - .dimensions_for_select_append(dimensions); - - let mut symbols_for_subquery_dimensions = - BaseMemberHelper::extract_symbols_from_members(&dimensions); - for item in self.query_properties.dimensions_filters() { - item.find_all_member_evaluators(&mut symbols_for_subquery_dimensions); - } - - for item in self.query_properties.measures_filters() { - item.find_all_member_evaluators(&mut symbols_for_subquery_dimensions); - } - - let symbols_for_subquery_dimensions = symbols_for_subquery_dimensions - .into_iter() - .unique_by(|m| m.full_name()) - .collect_vec(); - - let subquery_dimensions = collect_sub_query_dimensions_from_symbols( - &symbols_for_subquery_dimensions, - &self.join_planner, - &key_join, - 
self.query_tools.clone(), - )?; - - let dimension_subquery_planner = DimensionSubqueryPlanner::try_new( - &subquery_dimensions, - self.query_tools.clone(), - self.query_properties.clone(), - )?; - - let source = self.join_planner.make_join_node_impl( - &Some(format!( - "{}_key", - self.query_tools.alias_for_cube(key_cube_name)? - )), - key_join, - &dimension_subquery_planner, - )?; - - let mut select_builder = SelectBuilder::new(source); - let mut context_factory = self.context_factory.clone(); - - context_factory.set_render_references(dimension_subquery_planner.dimensions_refs().clone()); - - for member in dimensions.iter() { - select_builder.add_projection_member(&member, None); - } - select_builder.set_distinct(); - select_builder.set_filter(self.query_properties.all_filters()); - - let res = Rc::new(select_builder.build(context_factory)); - Ok(res) - } } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/query_planner.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/query_planner.rs index 0b1e2c9d095aa..c96372725db86 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/query_planner.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/query_planner.rs @@ -3,11 +3,7 @@ use super::{ SimpleQueryPlanner, }; use crate::logical_plan::*; -use crate::physical_plan_builder::PhysicalPlanBuilder; -use crate::plan::Select; use crate::planner::query_tools::QueryTools; -use crate::planner::sql_evaluator::sql_nodes::SqlNodesFactory; -use crate::planner::sql_templates::PlanSqlTemplates; use crate::planner::QueryProperties; use cubenativeutils::CubeError; use std::rc::Rc; @@ -15,7 +11,6 @@ use std::rc::Rc; pub struct QueryPlanner { query_tools: Rc, request: Rc, - context_factory: Option, } impl QueryPlanner { @@ -23,63 +18,23 @@ impl QueryPlanner { Self { request, query_tools, - context_factory: None, } } - pub fn new_with_context_factory( - request: Rc, - query_tools: Rc, - context_factory: SqlNodesFactory, - ) -> Self 
{ - Self { - request, - query_tools, - context_factory: Some(context_factory), - } - } - - pub fn plan(&self) -> Result, CubeError> { - let templates = PlanSqlTemplates::new(self.query_tools.templates_render()); - self.build_sql_impl(templates) - } - - pub fn plan_logical(&self) -> Result, CubeError> { - let mut nodes_factory = if let Some(context_factory) = &self.context_factory { - context_factory.clone() - } else { - SqlNodesFactory::new() - }; - - if self.request.ungrouped() { - nodes_factory.set_ungrouped(true) - } - + pub fn plan(&self) -> Result, CubeError> { if self.request.is_simple_query()? { - let planner = SimpleQueryPlanner::new( - self.query_tools.clone(), - self.request.clone(), - nodes_factory.clone(), - ); - planner.logical_plan() + let planner = SimpleQueryPlanner::new(self.query_tools.clone(), self.request.clone()); + planner.plan() } else { let request = self.request.clone(); - let multiplied_measures_query_planner = MultipliedMeasuresQueryPlanner::try_new( - self.query_tools.clone(), - request.clone(), - nodes_factory.clone(), - )?; + let multiplied_measures_query_planner = + MultipliedMeasuresQueryPlanner::try_new(self.query_tools.clone(), request.clone())?; let multi_stage_query_planner = MultiStageQueryPlanner::new(self.query_tools.clone(), request.clone()); - let templates = PlanSqlTemplates::new(self.query_tools.templates_render()); - let full_key_aggregate_planner = FullKeyAggregateQueryPlanner::new( - request.clone(), - nodes_factory.clone(), - templates, - ); - let multiplied_resolver = multiplied_measures_query_planner.plan_logical_queries()?; + let full_key_aggregate_planner = FullKeyAggregateQueryPlanner::new(request.clone()); + let multiplied_resolver = multiplied_measures_query_planner.plan_queries()?; let (multi_stage_members, multi_stage_refs) = - multi_stage_query_planner.plan_logical_queries()?; + multi_stage_query_planner.plan_queries()?; let result = full_key_aggregate_planner.plan_logical_plan( Some(multiplied_resolver), @@ 
-90,48 +45,4 @@ impl QueryPlanner { Ok(result) } } - - fn build_sql_impl(&self, templates: PlanSqlTemplates) -> Result, CubeError> { - let mut nodes_factory = if let Some(context_factory) = &self.context_factory { - context_factory.clone() - } else { - SqlNodesFactory::new() - }; - - if self.request.ungrouped() { - nodes_factory.set_ungrouped(true) - } - - if self.request.is_simple_query()? { - let planner = SimpleQueryPlanner::new( - self.query_tools.clone(), - self.request.clone(), - nodes_factory.clone(), - ); - planner.plan() - } else { - let request = self.request.clone(); - let multiplied_measures_query_planner = MultipliedMeasuresQueryPlanner::try_new( - self.query_tools.clone(), - request.clone(), - nodes_factory.clone(), - )?; - let multi_stage_query_planner = - MultiStageQueryPlanner::new(self.query_tools.clone(), request.clone()); - let full_key_aggregate_planner = FullKeyAggregateQueryPlanner::new( - request.clone(), - nodes_factory.clone(), - templates, - ); - let mut subqueries = multiplied_measures_query_planner.plan_queries()?; - - let (multi_stage_ctes, multi_stage_subqueries) = - multi_stage_query_planner.plan_queries()?; - - subqueries.extend(multi_stage_subqueries.into_iter()); - let result = full_key_aggregate_planner.plan(subqueries, multi_stage_ctes)?; - - Ok(result) - } - } } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/simple_query_planer.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/simple_query_planer.rs index c22d540bdef20..db5c6b886a53a 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/simple_query_planer.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/simple_query_planer.rs @@ -1,39 +1,27 @@ -use super::{DimensionSubqueryPlanner, JoinPlanner, OrderPlanner}; +use super::{DimensionSubqueryPlanner, JoinPlanner}; use crate::logical_plan::*; -use crate::physical_plan_builder::*; -use crate::plan::{Filter, QualifiedColumnName, Select, SelectBuilder}; use 
crate::planner::query_tools::QueryTools; use crate::planner::sql_evaluator::collectors::collect_sub_query_dimensions_from_symbols; -use crate::planner::sql_evaluator::sql_nodes::SqlNodesFactory; use crate::planner::QueryProperties; use cubenativeutils::CubeError; -use std::collections::HashMap; use std::rc::Rc; pub struct SimpleQueryPlanner { query_tools: Rc, query_properties: Rc, join_planner: JoinPlanner, - order_planner: OrderPlanner, - context_factory: SqlNodesFactory, } impl SimpleQueryPlanner { - pub fn new( - query_tools: Rc, - query_properties: Rc, - context_factory: SqlNodesFactory, - ) -> Self { + pub fn new(query_tools: Rc, query_properties: Rc) -> Self { Self { join_planner: JoinPlanner::new(query_tools.clone()), - order_planner: OrderPlanner::new(query_properties.clone()), query_properties, - context_factory, query_tools, } } - pub fn logical_plan(&self) -> Result, CubeError> { - let (source, subquery_dimension_queries) = self.logical_source_and_subquery_dimensions()?; + pub fn plan(&self) -> Result, CubeError> { + let (source, subquery_dimension_queries) = self.source_and_subquery_dimensions()?; let multiplied_measures = self .query_properties @@ -65,7 +53,7 @@ impl SimpleQueryPlanner { Ok(Rc::new(Query::SimpleQuery(result))) } - pub fn logical_source_and_subquery_dimensions( + pub fn source_and_subquery_dimensions( &self, ) -> Result<(Rc, Vec>), CubeError> { let join = self.query_properties.simple_query_join()?; @@ -81,71 +69,8 @@ impl SimpleQueryPlanner { self.query_properties.clone(), )?; let subquery_dimension_queries = - dimension_subquery_planner.plan_logical_queries(&subquery_dimensions)?; + dimension_subquery_planner.plan_queries(&subquery_dimensions)?; let source = self.join_planner.make_join_logical_plan(join.clone())?; Ok((source, subquery_dimension_queries)) } - - pub fn plan(&self) -> Result, CubeError> { - self.old_plan() - } - pub fn old_plan(&self) -> Result, CubeError> { - let (mut select_builder, render_references) = 
self.make_select_builder()?; - - let filter = self.query_properties.all_filters(); - let having = if self.query_properties.measures_filters().is_empty() { - None - } else { - Some(Filter { - items: self.query_properties.measures_filters().clone(), - }) - }; - let mut context_factory = self.context_factory.clone(); - - for member in self - .query_properties - .all_dimensions_and_measures(self.query_properties.measures())? - .iter() - { - select_builder.add_projection_member(member, None); - } - context_factory.set_render_references(render_references); - context_factory.set_rendered_as_multiplied_measures( - self.query_properties - .full_key_aggregate_measures()? - .rendered_as_multiplied_measures - .clone(), - ); - select_builder.set_filter(filter); - select_builder.set_group_by(self.query_properties.group_by()); - select_builder.set_order_by(self.order_planner.default_order()); - select_builder.set_having(having); - select_builder.set_limit(self.query_properties.row_limit()); - select_builder.set_offset(self.query_properties.offset()); - let res = Rc::new(select_builder.build(context_factory)); - Ok(res) - } - - pub fn make_select_builder( - &self, - ) -> Result<(SelectBuilder, HashMap), CubeError> { - let join = self.query_properties.simple_query_join().unwrap(); - let subquery_dimensions = collect_sub_query_dimensions_from_symbols( - &self.query_properties.all_member_symbols(false), - &self.join_planner, - &join, - self.query_tools.clone(), - )?; - let dimension_subquery_planner = DimensionSubqueryPlanner::try_new( - &subquery_dimensions, - self.query_tools.clone(), - self.query_properties.clone(), - )?; - let from = - self.join_planner - .make_join_node_impl(&None, join, &dimension_subquery_planner)?; - let render_references = dimension_subquery_planner.dimensions_refs().clone(); - let select_builder = SelectBuilder::new(from); - Ok((select_builder, render_references)) - } } diff --git 
a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/symbols/time_dimension_symbol.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/symbols/time_dimension_symbol.rs index f87eeefe1a0ea..4868a552b80a7 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/symbols/time_dimension_symbol.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/symbols/time_dimension_symbol.rs @@ -1,5 +1,4 @@ use super::MemberSymbol; -use crate::planner::query_tools::QueryTools; use crate::planner::time_dimension::Granularity; use crate::planner::QueryDateTime; use chrono_tz::Tz; From 37e247966a7b1b64bd9c0a1ac81323c3835f40a9 Mon Sep 17 00:00:00 2001 From: Alexandr Romanenko Date: Sat, 26 Apr 2025 14:55:18 +0200 Subject: [PATCH 4/4] fix --- .../cubesqlplanner/src/physical_plan_builder/builder.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/builder.rs b/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/builder.rs index e3b3441f2b8bc..8ca076b70bd75 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/builder.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/builder.rs @@ -872,13 +872,13 @@ impl PhysicalPlanBuilder { } }; - let time_seira = TimeSeries::new( + let time_series = TimeSeries::new( self.query_tools.clone(), time_dimension.full_name(), ts_date_range, granularity_obj, ); - let query_plan = Rc::new(QueryPlan::TimeSeries(Rc::new(time_seira))); + let query_plan = Rc::new(QueryPlan::TimeSeries(Rc::new(time_series))); Ok(query_plan) } @@ -973,13 +973,13 @@ impl PhysicalPlanBuilder { let mut render_references = HashMap::new(); let mut select_builder = SelectBuilder::new(from.clone()); - //We insert render reference for main time dimension (with the some granularity as in time series to avoid unnessesary date_tranc) + //We insert render reference for main time dimension (with the same
granularity as in time series to avoid unnecessary date_trunc) render_references.insert( time_dimension.full_name(), QualifiedColumnName::new(Some(root_alias.clone()), format!("date_from")), ); - //We also insert render reference for the base dimension of time dimension (i.e. without `_granularity` prefix to let other time dimensions make date_tranc) + //We also insert render reference for the base dimension of the time dimension (i.e. without `_granularity` prefix to let other time dimensions make date_trunc) render_references.insert( time_dimension .as_time_dimension()?