Distributed Query Planner
The distributed query planner is entered through the
distributed_planner function in
distributed_planner.c. This is the hook that Postgres calls instead of
If the input query is trivial (e.g., no joins, no subqueries/ctes, single table and single shard), we create a very simple
PlannedStmt. If the query is not trivial, call
standard_planner to build a
PlannedStmt. For queries containing a distributed table or reference table, we then proceed with distributed planning, which overwrites the
planTree in the
Distributed planning (
CreateDistributedPlan) tries several different methods to plan the query:
- Fast-path router planner, proceed if the query prunes down to a single shard of a single table
- Router planner, proceed if the query prunes down to a single set of co-located shards
- Modification planning, proceed if the query is a DML command and all joins are co-located
- Recursive planning, find CTEs and subqueries that cannot be pushed down and go back to 1
- Logical planner, constructs a multi-relational algebra tree to find a distributed execution plan
Fast-path router planner
By examining the query tree, if we can decide that the query hits only a single shard of a single table, we can skip calling
standard_planner(). Later on the execution, we simply fetch the filter on the distribution key and do the pruning.
As the name reveals, this can be considered as a sub-item of Router planner described below. The only difference is that fast-path planner doesn't rely on
standard_planner() for collecting restriction information.
During the call to
standard_planner, Postgres calls a hook named
multi_relation_restriction_hook. We use this hook to determine explicit and implicit filters on (occurrences of) distributed tables. We apply shard pruning to all tables using the filters in
PlanRouterQuery. If all tables prune down to a single shard and all those shards are on the same node, then the query is router plannable meaning it can be fully executed by one of the worker nodes.
The router planner preserves the original query tree, but the query does need to be rewritten to have shard names instead of table names. We cannot simply put the shard names into the query tree, because it actually contains relation OIDs. If the deparsing logic resolves those OIDs, it would throw an error since the shards do not exist on the coordinator. Instead, we replace the table entries with a fake SQL function call to
citus_extradata_container and encode the original table ID and the shard ID into the parameter of the function. The deparsing functions recognise the fake function call and convert it into a shard name.
CTEs and subqueries that cannot be pushed down (checked using
DeferErrorIfCannotPushdownSubquery) and do not contain references to the outer query are planned by recursively calling the
planner function with the subquery as the parse tree. Because the planner is called from the top, any type of query that is supported by Postgres or Citus can be a valid subquery. The resulting plan is added to the
subPlanList in the
DistributedPlan and the subquery is replaced by a call to
read_intermediate_result with a particular subplan name. At execution time, all of the plans in the
subPlanList are executed and their output is sent to all the workers and written into an intermediate result with the subplan name (see
subplan_execution.c). In the remainder of the planner, calls to
read_intermediate_result are treated in the same way as reference tables, which means they can be joined by any column.
The logical planner constructs a multi-relational algebra tree from the query with operators such as
MultiCollect. It first picks a strategy for handling joins in
MultiLogicalPlanCreate (pushdown planning, or join order planning) and then builds a
MultiNode tree based on the original query tree. In the initial
MultiNode tree, each
MultiTable is wrapped in
MultiCollect, which effectively means collect the entire table in one place. The
MultiNode tree is passed to the logical optimizer which transforms the tree into one that requires less network traffic by pushing down operators. Finally, the physical planner transforms the
MultiNode tree into a
DistributedPlan which contains the queries to execute on shards and can be passed to the executor.
During the call to
standard_planner, Postgres calls a hook named
multi_relation_restriction_hook. We use this hook to determine whether all (occurrences of) distributed tables are joined on their respective distribution columns. When this is the case, we can be somewhat agnostic to the structure of subqueries and other joins. In that case, we treat the whole join tree as a single
MultiTable and deparse this part of the query as is during physical planning. Pushing down a subquery is only possible when the subquery can be answered without a merge step (checked using
DeferErrorIfCannotPushdownSubquery). However, you may notice that these subqueries are already replaced by
read_intermediate_result calls during recursive planning. Only subqueries that have references to the outer query remain at this stage would pass through recursive planning and fail the check.
Join order planning
The join order planner is applied to the join tree in the original query and generates all possible pair-wise join orders. If a pair-wise join is not on the distribution column, it requires re-partitioning. The join order that requires the fewest re-partition steps is converted into a tree of
MultiJoin nodes, prior to running the logical optimizer. The optimizer leaves the
MultiJoin tree in tact, but pushes down select and collect operators below the
The logical optimizer uses commutativity rules to push project and select operators down below the
MultiCollect nodes. Everything above the
MultiCollect operator will be is executed on the coordinator and everything below on the workers. Additionally, the optimizer uses distributivity rules to push down operators below the
MultiJoin nodes, such that filters and projections are applied prior to joins. This is primarily relevant for re-partition joins which first try to reduce the data by applying selections and projections, and then re-partitioning the result.
A number of SQL clauses like aggregates, GROUP BY, ORDER BY, LIMIT can only be pushed down below the
MultiCollect under certain conditions. All these clauses are bundled together in a
MultiExtendedOpNode. After the basic transformation, the
MultiExtendedOpNodes are directly above the
MultiCollect nodes. They are then split into a master and a worker part and the worker part is pushed down below the
This section needs to be expanded.
In terms of modification planning, we distinguish between several cases:
- DML planning (
CreateModifyPlan) 1.a. UPDATE/DELETE planning 1.b. INSERT planning
- INSERT...SELECT planning (
UPDATE and DELETE commands are handled by the router planner (
PlanRouterQuery), but when tables prune to multiple shards we do not fall back to other planners, but instead proceed to generate a task for each shard, as long as all subqueries can be pushed down. We can do this because UPDATE and DELETE never have a meaningful merge step on the coordinator, other than concatening RETURNING rows.
When there are CTEs or subqueries that cannot be pushed down in the UPDATE/DELETE, we continue with recursive planning and try again. If recursive planning cannot resolve the issue (e.g. due to a correlated subquery), then the command will error out.
Distributed planning for INSERT commands is relatively complicated because of multi-row INSERT. Each row in a multi-row INSERT may go to different shard and therefore we need to construct a different query for each shard. The logic for this is primarily in
BuildRoutesForInsert, which builds the set of rows for each shard. Rather than construct a full query tree for each shard, we put each set of rows in the
rowValuesLists of the
Task and replace the VALUES section of the INSERT just before deparsing in
One additional complication for INSERTs is that it is very common to have a function call (such as
nextval(..)) in the position of the distribution column. In that case building the task list is deferred until the functions have been evaluated in the executor.
INSERT ... SELECT query planning
INSERT ... SELECT queries either by pushing down the whole query to the worker nodes or pulling the
SELECT part to the coordinator.
INSERT ... SELECT - by pushing down
INSERT ... SELECT query can be planned by pushing down it to the worker nodes, Citus selects to choose that logic first. Query is planned separately for each shard in the target table. Do so by replacing the partitioning qual parameter using the shard's actual boundary values to create modify task for each shard. Then, shard pruning is performed to decide on to which shards query will be pushed down. Finally, checks if the target shardInterval has exactly same placements with the select task's available anchor placements.
INSERT...SELECT - via the coordinator
If the query can not be pushed down to the worker nodes, two different approaches can be followed depending on whether ON CONFLICT or RETURNING clauses are used.
RETURNINGare not used, Citus uses
COPYcommand to handle such queries. After planning the
SELECTpart of the
INSERT ... SELECTquery, including subqueries and CTEs, it executes the plan and send results back to the DestReceiver which is created using the target table info.
COPYcommand supports neither
RETURNINGclauses, Citus perform
INSERT ... SELECTqueries with
RETURNINGclause in two phases. First, Citus plans the
SELECTpart of the query, executes the plan and saves results to the intermediate table which is colocated with target table of the
INSERT ... SELECTquery. Then,
INSERT ... SELECTquery is directly run on the worker node using the intermediate table as the source table.