From 2bd16694c1388ee45bcb9e69ceaa08b5b8c9ac1c Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Tue, 15 Apr 2025 16:48:20 +0200 Subject: [PATCH 1/2] refactor(cubesql): Refactor FilterPushDown tests --- .../engine/df/optimizers/filter_push_down.rs | 77 ++----------------- ..._filter_down_cross_join_right_one_row.snap | 13 ++++ ...h_down__tests__filter_down_projection.snap | 8 ++ ...er_push_down__tests__filter_down_sort.snap | 8 ++ ...h_down__tests__filters_down_aggregate.snap | 10 +++ ...ts__multiple_filters_down_projections.snap | 11 +++ ...down_projections_with_post_processing.snap | 12 +++ 7 files changed, 70 insertions(+), 69 deletions(-) create mode 100644 rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__filter_down_cross_join_right_one_row.snap create mode 100644 rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__filter_down_projection.snap create mode 100644 rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__filter_down_sort.snap create mode 100644 rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__filters_down_aggregate.snap create mode 100644 rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__multiple_filters_down_projections.snap create mode 100644 rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__multiple_filters_down_projections_with_post_processing.snap diff --git a/rust/cubesql/cubesql/src/compile/engine/df/optimizers/filter_push_down.rs b/rust/cubesql/cubesql/src/compile/engine/df/optimizers/filter_push_down.rs index c615fbc83f6ca..7323d45461bbb 100644 --- 
a/rust/cubesql/cubesql/src/compile/engine/df/optimizers/filter_push_down.rs +++ b/rust/cubesql/cubesql/src/compile/engine/df/optimizers/filter_push_down.rs @@ -692,15 +692,10 @@ mod tests { }; use datafusion::logical_plan::{binary_expr, col, count, lit, sum, LogicalPlanBuilder}; - fn optimize(plan: &LogicalPlan) -> Result { + fn optimize(plan: &LogicalPlan) -> LogicalPlan { let rule = FilterPushDown::new(); rule.optimize(plan, &OptimizerConfig::new()) - } - - fn assert_optimized_plan_eq(plan: LogicalPlan, expected: &str) { - let optimized_plan = optimize(&plan).expect("failed to optimize plan"); - let formatted_plan = format!("{:?}", optimized_plan); - assert_eq!(formatted_plan, expected); + .expect("failed to optimize plan") } #[test] @@ -714,14 +709,7 @@ mod tests { .filter(col("t2.n2").gt(lit(5i32)))? .build()?; - let expected = "\ - Projection: #t1.c1 AS n1, #t1.c3 AS n2, alias=t2\ - \n Filter: #t1.c3 > Int32(5)\ - \n Projection: #t1.c1, #t1.c3\ - \n TableScan: t1 projection=None\ - "; - - assert_optimized_plan_eq(plan, expected); + insta::assert_debug_snapshot!(optimize(&plan)); Ok(()) } @@ -752,17 +740,7 @@ mod tests { .project(vec![col("c7"), col("c5"), col("c9")])? .build()?; - let expected = "\ - Projection: #t3.c7, #t3.c5, #c9\ - \n Projection: #t3.c7, #t3.c5, #t3.c8 AS c9\ - \n Projection: #t2.c4 AS c7, #t2.c5, #t2.c6 AS c8, alias=t3\ - \n Projection: #t1.c1 AS c4, #t1.c2 AS c5, #t1.c3 AS c6, alias=t2\ - \n Filter: #t1.c2 > Int32(5) AND #t1.c2 <= Int32(10) AND #t1.c3 = Int32(0) AND NOT #t1.c1 < Int32(0)\ - \n Projection: #t1.c1, #t1.c2, #t1.c3\ - \n TableScan: t1 projection=None\ - "; - - assert_optimized_plan_eq(plan, expected); + insta::assert_debug_snapshot!(optimize(&plan)); Ok(()) } @@ -782,18 +760,7 @@ mod tests { .project(vec![col("c1"), col("c2"), col("c3")])? 
.build()?; - let expected = "\ - Projection: #t1.c1, #c2, #t1.c3\ - \n Filter: #t1.c1 > #t1.c3\ - \n Projection: #t1.c1, #c2, #t1.c3\ - \n Filter: #c2 = Int32(5)\ - \n Projection: #t1.c1, #t1.c2 + Int32(5) AS c2, #t1.c3\ - \n Filter: #t1.c3 < Int32(5)\ - \n Projection: #t1.c1, #t1.c2, #t1.c3\ - \n TableScan: t1 projection=None\ - "; - - assert_optimized_plan_eq(plan, expected); + insta::assert_debug_snapshot!(optimize(&plan)); Ok(()) } @@ -847,16 +814,7 @@ mod tests { .filter(col("c3").eq(lit(0i32)))? .build()?; - let expected = "\ - Projection: #t1.c1, #SUM(t1.c2) AS c2_sum, #t1.c3\ - \n Filter: #SUM(t1.c2) > Int32(10)\ - \n Aggregate: groupBy=[[#t1.c1, #t1.c3]], aggr=[[SUM(#t1.c2)]]\ - \n Filter: #t1.c3 = Int32(0)\ - \n Projection: #t1.c1, #t1.c2, #t1.c3\ - \n TableScan: t1 projection=None\ - "; - - assert_optimized_plan_eq(plan, expected); + insta::assert_debug_snapshot!(optimize(&plan)); Ok(()) } @@ -897,14 +855,7 @@ mod tests { .filter(col("c3").eq(lit(5i32)))? .build()?; - let expected = "\ - Sort: #t1.c2\ - \n Filter: #t1.c3 = Int32(5)\ - \n Projection: #t1.c1, #t1.c2, #t1.c3\ - \n TableScan: t1 projection=None\ - "; - - assert_optimized_plan_eq(plan, expected); + insta::assert_debug_snapshot!(optimize(&plan)); Ok(()) } @@ -998,19 +949,7 @@ mod tests { .filter(col("c2").eq(lit(10i32)))? 
.build()?; - let expected = "\ - Filter: #j2.c2 = Int32(10)\ - \n CrossJoin:\ - \n Filter: #j1.c1 = Int32(5)\ - \n Projection: #j1.c1\ - \n TableScan: j1 projection=None\ - \n Projection: #COUNT(UInt8(1)) AS c2, alias=j2\ - \n Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\ - \n Projection: #j2.c2\ - \n TableScan: j2 projection=None\ - "; - - assert_optimized_plan_eq(plan, expected); + insta::assert_debug_snapshot!(optimize(&plan)); Ok(()) } diff --git a/rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__filter_down_cross_join_right_one_row.snap b/rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__filter_down_cross_join_right_one_row.snap new file mode 100644 index 0000000000000..f41d557f9968e --- /dev/null +++ b/rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__filter_down_cross_join_right_one_row.snap @@ -0,0 +1,13 @@ +--- +source: cubesql/src/compile/engine/df/optimizers/filter_push_down.rs +expression: optimize(&plan) +--- +Filter: #j2.c2 = Int32(10) + CrossJoin: + Filter: #j1.c1 = Int32(5) + Projection: #j1.c1 + TableScan: j1 projection=None + Projection: #COUNT(UInt8(1)) AS c2, alias=j2 + Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]] + Projection: #j2.c2 + TableScan: j2 projection=None diff --git a/rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__filter_down_projection.snap b/rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__filter_down_projection.snap new file mode 100644 index 0000000000000..629241960b8fd --- /dev/null +++ 
b/rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__filter_down_projection.snap @@ -0,0 +1,8 @@ +--- +source: cubesql/src/compile/engine/df/optimizers/filter_push_down.rs +expression: optimize(&plan) +--- +Projection: #t1.c1 AS n1, #t1.c3 AS n2, alias=t2 + Filter: #t1.c3 > Int32(5) + Projection: #t1.c1, #t1.c3 + TableScan: t1 projection=None diff --git a/rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__filter_down_sort.snap b/rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__filter_down_sort.snap new file mode 100644 index 0000000000000..46e06a5733147 --- /dev/null +++ b/rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__filter_down_sort.snap @@ -0,0 +1,8 @@ +--- +source: cubesql/src/compile/engine/df/optimizers/filter_push_down.rs +expression: optimize(&plan) +--- +Sort: #t1.c2 + Filter: #t1.c3 = Int32(5) + Projection: #t1.c1, #t1.c2, #t1.c3 + TableScan: t1 projection=None diff --git a/rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__filters_down_aggregate.snap b/rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__filters_down_aggregate.snap new file mode 100644 index 0000000000000..34bc7e1b48959 --- /dev/null +++ b/rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__filters_down_aggregate.snap @@ -0,0 +1,10 @@ +--- +source: cubesql/src/compile/engine/df/optimizers/filter_push_down.rs +expression: optimize(&plan) +--- +Projection: #t1.c1, #SUM(t1.c2) AS c2_sum, #t1.c3 + Filter: #SUM(t1.c2) > Int32(10) + 
Aggregate: groupBy=[[#t1.c1, #t1.c3]], aggr=[[SUM(#t1.c2)]] + Filter: #t1.c3 = Int32(0) + Projection: #t1.c1, #t1.c2, #t1.c3 + TableScan: t1 projection=None diff --git a/rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__multiple_filters_down_projections.snap b/rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__multiple_filters_down_projections.snap new file mode 100644 index 0000000000000..25fe37423d0b1 --- /dev/null +++ b/rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__multiple_filters_down_projections.snap @@ -0,0 +1,11 @@ +--- +source: cubesql/src/compile/engine/df/optimizers/filter_push_down.rs +expression: optimize(&plan) +--- +Projection: #t3.c7, #t3.c5, #c9 + Projection: #t3.c7, #t3.c5, #t3.c8 AS c9 + Projection: #t2.c4 AS c7, #t2.c5, #t2.c6 AS c8, alias=t3 + Projection: #t1.c1 AS c4, #t1.c2 AS c5, #t1.c3 AS c6, alias=t2 + Filter: #t1.c2 > Int32(5) AND #t1.c2 <= Int32(10) AND #t1.c3 = Int32(0) AND NOT #t1.c1 < Int32(0) + Projection: #t1.c1, #t1.c2, #t1.c3 + TableScan: t1 projection=None diff --git a/rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__multiple_filters_down_projections_with_post_processing.snap b/rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__multiple_filters_down_projections_with_post_processing.snap new file mode 100644 index 0000000000000..c0ca13ab9f05d --- /dev/null +++ b/rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__multiple_filters_down_projections_with_post_processing.snap @@ -0,0 +1,12 @@ +--- +source: 
cubesql/src/compile/engine/df/optimizers/filter_push_down.rs +expression: optimize(&plan) +--- +Projection: #t1.c1, #c2, #t1.c3 + Filter: #t1.c1 > #t1.c3 + Projection: #t1.c1, #c2, #t1.c3 + Filter: #c2 = Int32(5) + Projection: #t1.c1, #t1.c2 + Int32(5) AS c2, #t1.c3 + Filter: #t1.c3 < Int32(5) + Projection: #t1.c1, #t1.c2, #t1.c3 + TableScan: t1 projection=None From 4d9020c619ddbe71447cebd85f2f3885a3fff793 Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Fri, 11 Apr 2025 20:44:23 +0200 Subject: [PATCH 2/2] fix(cubesql): Disable filter pushdown over Filter(CrossJoin) This should help rewrite filters on top of complex ungrouped-grouped joins as subquery joins --- .../engine/df/optimizers/filter_push_down.rs | 9 +++ ..._filter_down_cross_join_right_one_row.snap | 12 ++-- .../compile/test/test_cube_join_grouped.rs | 60 +++++++++++++++++++ 3 files changed, 75 insertions(+), 6 deletions(-) diff --git a/rust/cubesql/cubesql/src/compile/engine/df/optimizers/filter_push_down.rs b/rust/cubesql/cubesql/src/compile/engine/df/optimizers/filter_push_down.rs index 7323d45461bbb..5ee3d2753b6c3 100644 --- a/rust/cubesql/cubesql/src/compile/engine/df/optimizers/filter_push_down.rs +++ b/rust/cubesql/cubesql/src/compile/engine/df/optimizers/filter_push_down.rs @@ -111,6 +111,15 @@ fn filter_push_down( ) } LogicalPlan::Filter(Filter { predicate, input }) => { + // Current DataFusion version plans complex joins as Filter(CrossJoin) + // So for query like `SELECT ... FROM ... JOIN ... 
ON complex_condition WHERE predicate` + // Plan can look like Filter(predicate, Filter(join_condition, CrossJoin)) + // This optimizer can mess with filter predicates, and break join detection later in rewrites + // So, for now, it just completely pessimizes plans like Filter(CrossJoin) + if let LogicalPlan::CrossJoin(_) = input.as_ref() { + return issue_filter(predicates, plan.clone()); + } + // When encountering a filter, collect it to our list of predicates, // remove the filter from the plan and continue down the plan. diff --git a/rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__filter_down_cross_join_right_one_row.snap b/rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__filter_down_cross_join_right_one_row.snap index f41d557f9968e..f045df41a4fcf 100644 --- a/rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__filter_down_cross_join_right_one_row.snap +++ b/rust/cubesql/cubesql/src/compile/engine/df/optimizers/snapshots/cubesql__compile__engine__df__optimizers__filter_push_down__tests__filter_down_cross_join_right_one_row.snap @@ -3,11 +3,11 @@ source: cubesql/src/compile/engine/df/optimizers/filter_push_down.rs expression: optimize(&plan) --- Filter: #j2.c2 = Int32(10) - CrossJoin: - Filter: #j1.c1 = Int32(5) + Filter: #j1.c1 = Int32(5) + CrossJoin: Projection: #j1.c1 TableScan: j1 projection=None - Projection: #COUNT(UInt8(1)) AS c2, alias=j2 - Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]] - Projection: #j2.c2 - TableScan: j2 projection=None + Projection: #COUNT(UInt8(1)) AS c2, alias=j2 + Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]] + Projection: #j2.c2 + TableScan: j2 projection=None diff --git a/rust/cubesql/cubesql/src/compile/test/test_cube_join_grouped.rs 
b/rust/cubesql/cubesql/src/compile/test/test_cube_join_grouped.rs index 04ca392ebc79b..7e12061768455 100644 --- a/rust/cubesql/cubesql/src/compile/test/test_cube_join_grouped.rs +++ b/rust/cubesql/cubesql/src/compile/test/test_cube_join_grouped.rs @@ -899,3 +899,63 @@ LIMIT 1 .on .contains(r#"${MultiTypeCube.dim_str0} IS NOT DISTINCT FROM \"t0\".\"dim_str0\""#)); } + +/// Filter on top of ungrouped-grouped join with complex condition should be rewritten as well +#[tokio::test] +async fn test_join_ungrouped_grouped_with_filter_and_measure() { + if !Rewriter::sql_push_down_enabled() { + return; + } + init_testing_logger(); + + let query_plan = convert_select_to_query_plan( + // language=PostgreSQL + r#" +SELECT "t0"."measure" +FROM + MultiTypeCube + INNER JOIN ( + SELECT + dim_str0, + AVG(avgPrice) AS "measure" + FROM + MultiTypeCube + GROUP BY 1 + ) "t0" + ON (MultiTypeCube.dim_str0 IS NOT DISTINCT FROM "t0".dim_str0) +WHERE ("t0"."measure" IS NULL) +LIMIT 1 +; + "# + .to_string(), + DatabaseProtocol::PostgreSQL, + ) + .await; + + let physical_plan = query_plan.as_physical_plan().await.unwrap(); + println!( + "Physical plan: {}", + displayable(physical_plan.as_ref()).indent() + ); + + let request = query_plan + .as_logical_plan() + .find_cube_scan_wrapped_sql() + .request; + + assert_eq!(request.ungrouped, Some(true)); + + assert_eq!(request.subquery_joins.as_ref().unwrap().len(), 1); + + let subquery = &request.subquery_joins.unwrap()[0]; + + assert!(!subquery.sql.contains("ungrouped")); + assert_eq!(subquery.join_type, "INNER"); + assert!(subquery + .on + .contains(r#"${MultiTypeCube.dim_str0} IS NOT DISTINCT FROM \"t0\".\"dim_str0\""#)); + + // Outer filter + assert_eq!(request.segments.as_ref().unwrap().len(), 1); + assert!(request.segments.as_ref().unwrap()[0].contains(r#"\"t0\".\"measure\" IS NULL"#)); +}