A Very Basic Decorrelator
Today we're going to begin implementing a simple query decorrelator. We're not going to finish it in this post, and I'm not sure how many posts it will take, and I'm not sure all the posts for it will be contiguous. This post will be building on the code from Chapter 4: A Pushdown Party.
I'm going to omit a lot of details and bookkeeping on the assumption that if you need any gaps filled in, you can see all the code for this here. That said, you should probably have at least a basic grasp of how queries are processed in a relational database to follow along with this one. We're also going to be very incomplete: there are going to be lots of cases we don't manage to decorrelate. Also: I haven't subjected this code to the kind of rigorous testing that an actual production implementation would demand, so there might be bugs present! I don't typically like to write posts that have a lot of code in them, so this is an experiment. My hope is that as a toy it's an interesting artifact, even if it's incomplete and a bit buggy.
First of all, we're going to extend our little query language to even have the notion of subqueries. We'll make the following changes to Expr and RelExpr:
use std::collections::HashSet;

enum Expr {
    ColRef { id: usize },
    Int { val: i64 },
    Eq { left: Box<Expr>, right: Box<Expr> },
    Plus { left: Box<Expr>, right: Box<Expr> },
    Subquery { expr: Box<RelExpr> },
}

enum RelExpr {
    Scan {
        table_name: String,
        column_names: Vec<usize>,
    },
    Select {
        src: Box<RelExpr>,
        predicates: Vec<Expr>,
    },
    Join {
        left: Box<RelExpr>,
        right: Box<RelExpr>,
        predicates: Vec<Expr>,
    },
    Project {
        src: Box<RelExpr>,
        cols: HashSet<usize>,
    },
    Map {
        input: Box<RelExpr>,
        exprs: Vec<(usize, Expr)>,
    },
}
We've added to Expr a Subquery variant. A "subquery" is a relational expression that evaluates to a single column. If it evaluates to multiple rows, all those rows are returned. In any expression involving multiple subqueries that return multiple rows, we take the cross product of all their rows.
Map takes an input relation, along with a list of (column name, expression) pairs. For each row in the input relation, each expression is evaluated, assigned its name, and added to the row. If a given scalar expression returns multiple rows, we emit the input row multiple times, once for each row it returns (taking the cross product across all such expressions that return multiple rows).
Those familiar with SQL might recognize these semantics as a departure from SQL, where a subquery returning multiple rows is an error:
> select (select * from (values (1), (2)))+(select * from (values (3), (4)));
ERROR: more than one row returned by a subquery used as an expression
These semantics are...annoying for decorrelation. There are ways around this problem, but they muddy the story for what we're doing today and we're just going to define our semantics differently to keep things simple.
In our version, this would be:
> select (select * from (values (1), (2)))+(select * from (values (3), (4)));
4
5
5
6
Even though the query only looks like it should return one row.
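To make the cross-product rule concrete, here is a minimal sketch of how a plus over two multi-row subqueries could be evaluated under these semantics. The function name plus_subqueries is made up for illustration and is not part of the post's code.

```rust
/// Evaluate `left + right` where each side is a single-column subquery that
/// may return several rows. The result has one row per (left, right) pair.
/// (Hypothetical helper, for illustration only.)
fn plus_subqueries(left: &[i64], right: &[i64]) -> Vec<i64> {
    let mut out = Vec::new();
    for l in left {
        for r in right {
            out.push(l + r);
        }
    }
    out
}

fn main() {
    // (select * from (values (1), (2))) + (select * from (values (3), (4)))
    for row in plus_subqueries(&[1, 2], &[3, 4]) {
        println!("{}", row);
    }
}
```

One consequence of defining things this way: if either subquery returns no rows, the whole expression produces no rows.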
The point of decorrelation is this: evaluating an entire subquery per-row is expensive, and we'd like to avoid doing that. With some algebraic manipulation, we can usually turn these subqueries into joins.
We're going to add some extra state we need to shove around our computations: we're going to be generating new column names and we need a way to do so uniquely:
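use std::cell::RefCell;
use std::rc::Rc;

#[derive(Debug, Clone)]
struct State {
    // Shared counter: every clone of State draws from the same sequence.
    next_id: Rc<RefCell<usize>>,
}

impl State {
    fn new() -> Self {
        State {
            next_id: Rc::new(RefCell::new(0)),
        }
    }

    // Return a fresh column id and advance the counter.
    fn next(&self) -> usize {
        let id = *self.next_id.borrow();
        *self.next_id.borrow_mut() += 1;
        id
    }
}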
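A small usage sketch of why the counter sits behind Rc<RefCell<..>>: cloning a State copies the pointer, not the number, so ids stay unique no matter which handle hands them out. The struct is repeated so the block compiles on its own; the main function is illustrative only.

```rust
use std::cell::RefCell;
use std::rc::Rc;

#[derive(Debug, Clone)]
struct State {
    next_id: Rc<RefCell<usize>>,
}

impl State {
    fn new() -> Self {
        State {
            next_id: Rc::new(RefCell::new(0)),
        }
    }

    fn next(&self) -> usize {
        let id = *self.next_id.borrow();
        *self.next_id.borrow_mut() += 1;
        id
    }
}

fn main() {
    let state = State::new();
    let a = state.next();
    // The clone shares the same underlying counter.
    let other = state.clone();
    let b = other.next();
    let c = state.next();
    println!("{} {} {}", a, b, c); // prints "0 1 2"
}
```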
With these changes, we can construct a query that contains a subquery:
fn main() {
    let state = State::new();
    let a = state.next();
    let b = state.next();
    let x = state.next();
    let y = state.next();
    let sum_col = state.next();
    let join = RelExpr::scan("a".into(), vec![a, b]).map(
        &state,
        vec![(
            state.next(),
            Expr::int(4).plus(Expr::Subquery {
                expr: Box::new(
                    RelExpr::scan("x".into(), vec![x, y])
                        .project(&state, [x].into_iter().collect())
                        .map(&state, [(sum_col, Expr::col_ref(x).plus(Expr::col_ref(a)))])
                        .project(&state, [sum_col].into_iter().collect()),
                ),
            }),
        ),
        ],
    );
    let mut out = String::new();
    join.print(0, &mut out);
    println!("{}", out);
}
This renders like so (if this is hard to read, don't worry: learning to "see the matrix" in query plans like this takes some practice):
-> map(
    @5 <- 4+λ.(
      -> project({4})
        -> map(
            @4 <- @2+@0,
        )
          -> project({2})
            -> scan("x", [2, 3])
    ),
)
  -> scan("a", [0, 1])
Semantically, for each row in a, we scan all of x, add the first column of x to the first column of a, add 4 to that sum, and return both columns of a along with the resulting sum column.
Decorrelating is a two-step process.
Hoisting
The first step is hoisting. At a high level, hoisting is the process of going from "as part of the computation of this row, compute this value," to "for each row, compute this value." This seems like a superficial difference, but it will allow us to treat when that computation happens more algebraically.
To hoist, we need a new kind of RelExpr:
    FlatMap {
        input: Box<RelExpr>,
        func: Box<RelExpr>,
    },
Intuitively, FlatMap evaluates the right-hand expression for each row in the left-hand side (where the right-hand side is allowed to refer to columns in the left-hand side), then emits the left row concatenated with each row computed from the right side. This is sometimes called "apply," "dependent join," or "lateral join." You can think of this like the flatmap function from functional programming.
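Here is a toy sketch of those semantics over in-memory rows. It is not the post's RelExpr machinery: the Row alias and the flat_map function are stand-ins I'm assuming for illustration, with the right-hand side modeled as a closure that gets to look at the current left row.

```rust
type Row = Vec<i64>;

/// For each left row, evaluate `func` (which may read that row), then emit
/// the left row concatenated with every row `func` produced.
fn flat_map(input: &[Row], func: impl Fn(&Row) -> Vec<Row>) -> Vec<Row> {
    let mut out = Vec::new();
    for left in input {
        for right in func(left) {
            let mut row = left.clone();
            row.extend(right);
            out.push(row);
        }
    }
    out
}

fn main() {
    let a: Vec<Row> = vec![vec![1, 10], vec![2, 20]];
    let x: Vec<Row> = vec![vec![100], vec![200]];
    // The right side is correlated: it reads column 0 of the left row.
    let out = flat_map(&a, |a_row| {
        x.iter().map(|x_row| vec![x_row[0] + a_row[0]]).collect()
    });
    for row in out {
        println!("{:?}", row);
    }
}
```

Because the closure is re-run for every left row, this is exactly the per-row cost that decorrelation is trying to get rid of.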
To hoist Map, we pull its expressions into a FlatMap. Starting with a query like this:
-> map(
    @5 <- λ.(
      -> project({2})
        -> scan("x", [2, 3])
    ),
)
  -> scan("a", [0, 1])
We turn it into this:
-> flatmap
  -> scan("a", [0, 1])
  λ.{}
  -> map(
      @5 <- @2,
  )
    -> project({2})
      -> scan("x", [2, 3])
Try to convince yourself this transformation is correct; it's not particularly obvious without some meditation.
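If meditation isn't working, one way to check the claim is to evaluate both plans by hand on toy data. The sketch below does that with rows modeled as maps from column id to value; the functions before, after, project, and row are all hypothetical helpers, not the post's code. One detail worth noticing, as far as I can tell from the plans: the flatmap form also carries @2 along, so the comparison projects onto columns {0, 1, 5}.

```rust
use std::collections::BTreeMap;

/// A row maps column ids (the `@n` names in the plans) to values.
type Row = BTreeMap<usize, i64>;

const X0: usize = 2; // first column of "x"
const OUT: usize = 5; // the column the map assigns

fn row(cols: &[(usize, i64)]) -> Row {
    cols.iter().copied().collect()
}

/// Before hoisting: map(@5 <- subquery) over scan("a"), where the subquery
/// is project({2}) over scan("x").
fn before(a: &[Row], x: &[Row]) -> Vec<Row> {
    let mut out = Vec::new();
    for a_row in a {
        // The subquery is evaluated as part of computing this row.
        for x_row in x {
            let mut r = a_row.clone();
            r.insert(OUT, x_row[&X0]);
            out.push(r);
        }
    }
    out
}

/// After hoisting: flatmap(scan("a"), map(@5 <- @2) over project({2}) over scan("x")).
fn after(a: &[Row], x: &[Row]) -> Vec<Row> {
    let mut out = Vec::new();
    for a_row in a {
        // Build the right-hand relation for this left row...
        let rhs: Vec<Row> = x
            .iter()
            .map(|x_row| row(&[(X0, x_row[&X0]), (OUT, x_row[&X0])]))
            .collect();
        // ...then emit the left row concatenated with each right row.
        for r in rhs {
            let mut joined = a_row.clone();
            joined.extend(r);
            out.push(joined);
        }
    }
    out
}

/// Keep only the given columns of each row.
fn project(rows: Vec<Row>, cols: &[usize]) -> Vec<Row> {
    rows.into_iter()
        .map(|r| r.into_iter().filter(|(c, _)| cols.contains(c)).collect())
        .collect()
}

fn main() {
    let a = vec![row(&[(0, 1), (1, 10)]), row(&[(0, 2), (1, 20)])];
    let x = vec![row(&[(2, 7), (3, 70)]), row(&[(2, 8), (3, 80)])];
    let same = project(after(&a, &x), &[0, 1, OUT]) == before(&a, &x);
    println!("plans agree: {}", same);
}
```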
Let's look at the implementation of the map constructor:
fn map(self, state: &State, exprs: impl IntoIterator<Item = (usize, Expr)>) -> Self {
    let mut exprs: Vec<_> = exprs.into_iter().collect();
    if exprs.is_empty() {
        return self;
    }
    for i in 0..exprs.len() {
        // Only hoist expressions with subqueries.
        if exprs[i].1.has_subquery() {
            let (id, expr) = exprs.swap_remove(i);
            return self.map(state, exprs).hoist(state, id, expr);
        }
    }
    RelExpr::Map {
        input: Box::new(self),
        exprs,
    }
}
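The constructor leans on has_subquery, which isn't shown here. A minimal sketch of what such a check might look like follows; this is my assumption about its shape, not the actual implementation, and RelExpr is reduced to an empty placeholder so the block stands alone.

```rust
/// Placeholder for the post's `RelExpr`; only its presence matters here.
#[derive(Debug)]
struct RelExpr;

#[derive(Debug)]
enum Expr {
    ColRef { id: usize },
    Int { val: i64 },
    Eq { left: Box<Expr>, right: Box<Expr> },
    Plus { left: Box<Expr>, right: Box<Expr> },
    Subquery { expr: Box<RelExpr> },
}

impl Expr {
    /// True if a `Subquery` appears anywhere inside this scalar expression.
    /// (Assumed behavior; the real method may differ.)
    fn has_subquery(&self) -> bool {
        match self {
            Expr::ColRef { .. } | Expr::Int { .. } => false,
            Expr::Eq { left, right } | Expr::Plus { left, right } => {
                left.has_subquery() || right.has_subquery()
            }
            Expr::Subquery { .. } => true,
        }
    }
}

fn main() {
    // 4 + @0: nothing to hoist, stays in a plain Map.
    let plain = Expr::Plus {
        left: Box::new(Expr::Int { val: 4 }),
        right: Box::new(Expr::ColRef { id: 0 }),
    };
    // 4 + (subquery): must be hoisted.
    let nested = Expr::Plus {
        left: Box::new(Expr::Int { val: 4 }),
        right: Box::new(Expr::Subquery { expr: Box::new(RelExpr) }),
    };
    println!("{} {}", plain.has_subquery(), nested.has_subquery());
}
```

Note how the constructor peels off one subquery-bearing expression per call and recurses on the rest, so each such expression ends up with its own hoist.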
And the implementation of hoist is:
fn hoist(self, state: &State, id: usize, expr: Expr) -> Self {
    match expr {
        Expr::Subquery { expr } => {
            let att = expr.att();
            assert!(att.len() == 1);
            let input_col_id = att.iter().next().unwrap();
            // Give the column the name that's expected.
            let rhs = expr.map(state, vec![(id, Expr::ColRef { id: *input_col_id })]);
            self.flatmap(state, rhs)
        }
        x => unimplemented!("{:?}", x),
    }
}
Once we have more complex expressions we want to hoist, things get a bit trickier. Let's return to our first query:
-> map(
    @5 <- 4+λ.(
      -> project({4})
        -> map(
            @4 <- @2+@0,
        )
          -> project({2})
            -> scan("x", [2, 3])
    ),
)
  -> scan("a", [0, 1])
We also need to be able to handle a subquery not at the root position. To hoist a plus, we need to hoist each of its inputs, then refer to them:
Expr::Plus { left, right } => {
    // Hoist the left, hoist the right, then perform the plus.
    let lhs_id = state.next();
    let rhs_id = state.next();
    // Remember the input's columns so we can project back down to them.
    let att = self.att();
    self.hoist(state, lhs_id, *left)
        .hoist(state, rhs_id, *right)
        .map(
            state,
            [(
                id,
                Expr::Plus {
                    left: Box::new(Expr::ColRef { id: lhs_id }),
                    right: Box::new(Expr::ColRef { id: rhs_id }),
                },
            )],
        )
        // Throw away the extra columns.
        .project(state, att.into_iter().chain([id].into_iter()).collect())
}
To hoist things like constants and column references, we don't need to do anything special; we can just map them like usual:
Expr::Int { .. } | Expr::ColRef { .. } => self.map(state, vec![(id, expr)]),
Running this on the above query, we get:
-> map(
    @5 <- @6+@7,
)
  -> flatmap
    -> map(
        @6 <- 4,
    )
      -> scan("a", [0, 1])
    λ.{0}
    -> map(
        @7 <- @4,
    )
      -> project({4})
        -> map(
            @4 <- @2+@0,
        )
          -> project({2})
            -> scan("x", [2, 3])
This is still not executable as flat joins because the subquery still refers to columns from the LHS of the flatmap, so we need to move on to step two:
Decorrelation
The key observation is that if the right side of a flatmap doesn't reference any of the columns in its left side, it's equivalent to a join:
fn flatmap(self, state: &State, func: Self) -> Self {
    // Not correlated!
    if func.free().is_empty() {
        return self.join(func, vec![]);
    }
    RelExpr::FlatMap {
        input: Box::new(self),
        func: Box::new(func),
    }
}
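The check above hinges on free(), which returns the columns a relation reads but doesn't produce itself. The post's implementation isn't shown, so the following is only a sketch under my own assumptions: a cut-down Expr and RelExpr, and Map expressions that only see their input's columns. The two helper functions rebuild the subqueries from the plans earlier in the post.

```rust
use std::collections::HashSet;

enum Expr {
    ColRef { id: usize },
    Int { val: i64 },
    Plus { left: Box<Expr>, right: Box<Expr> },
}

enum RelExpr {
    Scan { column_names: Vec<usize> },
    Project { src: Box<RelExpr>, cols: HashSet<usize> },
    Map { input: Box<RelExpr>, exprs: Vec<(usize, Expr)> },
}

impl Expr {
    /// Columns this scalar expression reads.
    fn free(&self) -> HashSet<usize> {
        match self {
            Expr::ColRef { id } => HashSet::from([*id]),
            Expr::Int { .. } => HashSet::new(),
            Expr::Plus { left, right } => &left.free() | &right.free(),
        }
    }
}

impl RelExpr {
    /// Columns this relation produces.
    fn att(&self) -> HashSet<usize> {
        match self {
            RelExpr::Scan { column_names } => column_names.iter().copied().collect(),
            RelExpr::Project { cols, .. } => cols.clone(),
            RelExpr::Map { input, exprs } => {
                let mut att = input.att();
                att.extend(exprs.iter().map(|(id, _)| *id));
                att
            }
        }
    }

    /// Columns this relation reads from an enclosing query.
    fn free(&self) -> HashSet<usize> {
        match self {
            RelExpr::Scan { .. } => HashSet::new(),
            // Assumes a Project only names columns of its source.
            RelExpr::Project { src, .. } => src.free(),
            RelExpr::Map { input, exprs } => {
                let att = input.att();
                let mut free = input.free();
                for (_, e) in exprs {
                    // Anything the input doesn't supply must come from outside.
                    free.extend(e.free().difference(&att).copied());
                }
                free
            }
        }
    }
}

/// project({2}) over scan("x", [2, 3])
fn uncorrelated_subquery() -> RelExpr {
    RelExpr::Project {
        src: Box::new(RelExpr::Scan { column_names: vec![2, 3] }),
        cols: HashSet::from([2]),
    }
}

/// project({4}) over map(@4 <- @2+@0) over the subquery above.
fn correlated_subquery() -> RelExpr {
    let sum = Expr::Plus {
        left: Box::new(Expr::ColRef { id: 2 }),
        right: Box::new(Expr::ColRef { id: 0 }),
    };
    RelExpr::Project {
        src: Box::new(RelExpr::Map {
            input: Box::new(uncorrelated_subquery()),
            exprs: vec![(4, sum)],
        }),
        cols: HashSet::from([4]),
    }
}

fn main() {
    println!("{:?}", uncorrelated_subquery().free()); // {}
    println!("{:?}", correlated_subquery().free()); // {0}
}
```

These line up with the λ.{} and λ.{0} annotations in the rendered plans: an empty set means the flatmap can become a join right away.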
The next step is to push flatmaps down as far as we can (equivalently, to pull Projects and Maps up out of their right-hand sides), eliminating correlation as we go until we're left with flatmaps we can turn into joins:
fn flatmap(self, state: &State, func: Self) -> Self {
    // Not correlated!
    if func.free().is_empty() {
        return self.join(func, vec![]);
    }
    // Pull up Projects.
    if let RelExpr::Project { src, mut cols } = func {
        cols.extend(self.att());
        return self.flatmap(state, *src).project(state, cols);
    }
    // Pull up Maps.
    if let RelExpr::Map { input, exprs } = func {
        return self.flatmap(state, *input).map(state, exprs);
    }
    RelExpr::FlatMap {
        input: Box::new(self),
        func: Box::new(func),
    }
}
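To see why pulling a Map up helps, it may be useful to evaluate both shapes on toy data. In the sketch below (hypothetical helpers again, rows as column-id maps), the correlated form computes @4 inside the right-hand side, where it has to peek at the left row. The pulled-up form computes it above a plain cross join, where @0 and @2 are both ordinary columns.

```rust
use std::collections::BTreeMap;

type Row = BTreeMap<usize, i64>;

const A0: usize = 0; // first column of "a"
const X0: usize = 2; // first column of "x"
const SUM: usize = 4; // the column the map assigns

fn row(cols: &[(usize, i64)]) -> Row {
    cols.iter().copied().collect()
}

/// flatmap(a, map(@4 <- @2+@0) over project({2}) over x).
/// The right side reads @0 from the current left row.
fn correlated(a: &[Row], x: &[Row]) -> Vec<Row> {
    let mut out = Vec::new();
    for a_row in a {
        for x_row in x {
            let x0 = x_row[&X0];
            let mut r = a_row.clone();
            r.extend(row(&[(X0, x0), (SUM, x0 + a_row[&A0])]));
            out.push(r);
        }
    }
    out
}

/// map(@4 <- @2+@0) over join(a, project({2}) over x).
fn pulled_up(a: &[Row], x: &[Row]) -> Vec<Row> {
    // The right side no longer mentions `a`, so it is computed once.
    let rhs: Vec<Row> = x.iter().map(|x_row| row(&[(X0, x_row[&X0])])).collect();
    let mut out = Vec::new();
    for a_row in a {
        for r in &rhs {
            // Plain cross join: concatenate the two rows.
            let mut joined = a_row.clone();
            joined.extend(r.clone());
            // The map runs above the join, where both inputs are in scope.
            let sum = joined[&X0] + joined[&A0];
            joined.insert(SUM, sum);
            out.push(joined);
        }
    }
    out
}

fn main() {
    let a = vec![row(&[(0, 1), (1, 10)]), row(&[(0, 2), (1, 20)])];
    let x = vec![row(&[(2, 7), (3, 70)]), row(&[(2, 8), (3, 80)])];
    println!("forms agree: {}", correlated(&a, &x) == pulled_up(&a, &x));
}
```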
Running our planner now gives:
-> project({1, 0, 5})
  -> map(
      @5 <- @6+@7,
  )
    -> map(
        @7 <- @4,
    )
      -> project({6, 1, 4, 0})
        -> map(
            @4 <- @2+@0,
        )
          -> join()
            -> map(
                @6 <- 4,
            )
              -> scan("a", [0, 1])
            -> project({2})
              -> scan("x", [2, 3])
This plan is completely devoid of any nested subqueries or correlated flatmaps. It's not a super pretty query, and we probably still need to do some work to clean it up, but we've solved the problem of correlation here. In a future post we will extend this further to handle other types of queries.
Further Reading
- JOIN: The Ultimate Projection
- How Materialize and other databases optimize SQL subqueries
- Orthogonal Optimization of Subqueries and Aggregation