
Commit

Merge branch 'main' into clean-6496
c2main authored Oct 23, 2023
2 parents 5690d26 + 10198b1 commit 3d64e55
Showing 3 changed files with 21 additions and 36 deletions.
33 changes: 17 additions & 16 deletions src/backend/distributed/README.md
@@ -245,6 +245,7 @@ CREATE TABLE country_codes (
country_code VARCHAR(3) PRIMARY KEY,
country_name VARCHAR(50)
);
SELECT create_reference_table('country_codes');

-- Reference Table: Order Status
CREATE TABLE order_status (
@@ -269,14 +270,17 @@ The aim of this planner is to avoid relying on PostgreSQL's standard_planner() f

### Main C Functions Involved:

- `FastPathRouterPlan()`: The primary function for creating the fast-path query plan.
- `FastPathPlanner()`: The primary function for creating the fast-path query plan.
- `FastPathRouterQuery()`: Validates if a query is eligible for fast-path routing by checking its structure and the WHERE clause.

With `SET client_min_messages TO debug4;`, you should see the following in the DEBUG messages: "DEBUG: Distributed planning for a fast-path router query"
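For example, a minimal session sketch (the DEBUG line is the one quoted above, emitted while planning the query below):

```sql
SET client_min_messages TO debug4;
-- Running the fast-path query below should then print:
--   DEBUG:  Distributed planning for a fast-path router query
```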

```sql
-- Fetches the count of users born in the same year, but only
-- for a single country
-- for a single country, with a filter on the distribution column
-- Normally we have a single user with id = 15 because it's a PRIMARY KEY
-- this is just to demonstrate that fast-path can handle complex queries
-- with EXTRACT(), COUNT(), GROUP BY, HAVING, etc.
SELECT EXTRACT(YEAR FROM date_of_birth) as birth_year, COUNT(*)
FROM users_table
WHERE country_code = 'USA' AND user_id = 15
@@ -382,11 +386,10 @@ FROM users_table u, orders_table o
WHERE u.user_id = o.user_id AND u.user_id = 42;

-- With Subqueries:

-- Fetch the username and their total order amount
-- for a specific user
SELECT u.username,
(SELECT MAX(o.product_id) FROM orders_table o
(SELECT COUNT(*) FROM orders_table o
WHERE o.user_id = 42 AND
o.user_id = u.user_id)
FROM users_table u
@@ -692,7 +695,7 @@ Assume that there are two subqueries; each subquery is individually joined on th
-- The join condition between them is: sub1.user_id != sub2.user_id, which does not preserve distribution key equality.
-- Citus qualifies sub1 as the anchor subquery and checks whether all other subqueries are joined on the distribution key.
-- In this case, sub2 is not joined on the distribution key, so Citus decides to recursively plan the whole sub2.
SELECT a.user_id, b.user_id
SELECT sub1.user_id, sub2.user_id
FROM (
SELECT u.user_id
FROM users_table u
@@ -884,7 +887,7 @@ Citus has a rules-based optimizer. The core function `MultiLogicalPlanCreate()`

For instance, one simple optimization pushes the "filter" operation below the "MultiCollect." Such rules are defined in the function `Commutative()` in `multi_logical_optimizer.c`.
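As a rough sketch of what that rule does to the logical plan (hand-drawn, not actual planner output, and assuming the usual Citus operator names `MultiSelect` and `MultiTable`), the filter moves below the collect so it is evaluated on the workers:

```sql
-- Before the rule:                      After the rule:
--
--   MultiSelect  (the filter)             MultiCollect
--     MultiCollect                          MultiSelect  (the filter)
--       MultiTable (users_table)              MultiTable (users_table)
```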

The most interesting part of the optimizer is usually in the final stage, when handling the more complex operators (GROUP BY, DISTINCT window functions, ORDER BY, aggregates). These operators are conjoined in a `MultiExtendedOpNode`. In many cases, they can only partially be pushed down into the worker nodes, which results in one `MultiExtendedOpNode` above the `MultiCollection` (which will run on the coordinator and aggregates across worker nodes), and another `MultiExtendedOpNode` below the `MultiCollect` (which will be pushed down to worker nodes). The bulk of the logic for generating the two nodes lives in `MasterExtendedOpNode()` and `WorkerExtendedOpNode()`, respectively.
The most interesting part of the optimizer is usually in the final stage, when handling the more complex operators (GROUP BY, DISTINCT window functions, ORDER BY, aggregates). These operators are conjoined in a `MultiExtendedOpNode`. In many cases, they can only partially be pushed down into the worker nodes, which results in one `MultiExtendedOpNode` above the `MultiCollect` (which will run on the coordinator and aggregates across worker nodes), and another `MultiExtendedOpNode` below the `MultiCollect` (which will be pushed down to worker nodes). The bulk of the logic for generating the two nodes lives in `MasterExtendedOpNode()` and `WorkerExtendedOpNode()`, respectively.
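As a rough illustration of that split (not the exact SQL Citus generates), take an aggregate grouped on a non-distribution column: the worker-side `MultiExtendedOpNode` produces per-shard partial counts, and the coordinator-side node combines them:

```sql
-- Original query: GROUP BY on country_code, which is not the distribution column.
SELECT country_code, count(*) FROM users_table GROUP BY country_code;

-- Worker-side fragment (below the MultiCollect), one per shard;
-- users_table_102008 is an illustrative shard name:
--   SELECT country_code, count(*) AS partial_count
--   FROM users_table_102008
--   GROUP BY country_code;

-- Coordinator-side fragment (above the MultiCollect), over the collected rows:
--   SELECT country_code, sum(partial_count)
--   FROM <collected worker results>
--   GROUP BY country_code;
```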

##### Aggregate functions

@@ -1034,8 +1037,8 @@ SELECT * FROM cte_1;
-- but as the same cte used twice
-- Citus converts the CTE to intermediate result
WITH cte_1 AS (SELECT DISTINCT user_id FROM orders_table)
SELECT * FROM cte_1 as c1 JOIN
cte_1 as c2 USING (user_id);
SELECT * FROM cte_1 as c1
JOIN cte_1 as c2 USING (user_id);
```

- **Citus Specific Materialization**:
@@ -1051,8 +1054,7 @@ As of writing this document, Citus does NOT support

```sql
WITH users_that_have_orders AS (SELECT users_table.* FROM users_table JOIN orders_table USING (user_id))
SELECT
max(date_of_birth)
SELECT max(date_of_birth)
FROM users_that_have_orders
GROUP BY GROUPING SETS (user_id, email);
...
@@ -1099,7 +1101,7 @@ INSERT INTO orders_table (order_id, user_id) VALUES
```

**Debug Info**:
Debug information shows how the query is rebuilt for different user_ids.
Debug information shows how the query is rebuilt for different user_ids. Here, the shard_count is 4.
```sql
-- for user_id: 1
DEBUG: query after rebuilding: INSERT INTO public.orders_table_102041 AS citus_table_alias (order_id, user_id) VALUES ('1'::bigint,'1'::bigint), ('3'::bigint,'1'::bigint)
@@ -1133,7 +1135,7 @@ DEBUG: query after rebuilding: INSERT INTO public.orders_table_102064 AS citus
**Examples**:
The following section will delve into examples, starting with simple ones and moving to more complex scenarios.

### INSERT.. SELECT Advanced Scenarios
### INSERT.. SELECT Query Planning

**Overview**:
The `INSERT .. SELECT` pushdown logic builds upon the pushdown planning for `SELECT` commands. The key requirements include colocated tables and matching distribution columns. Relevant C functions are `CreateDistributedInsertSelectPlan`, `DistributedInsertSelectSupported()`, and `AllDistributionKeysInQueryAreEqual`.
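For instance, a pushdown-eligible `INSERT .. SELECT` might look like this sketch, assuming `users_table` and `orders_table` are both hash-distributed on `user_id` and colocated:

```sql
-- The target and source are colocated, and the SELECT feeds the source's
-- distribution column (user_id) into the target's distribution column,
-- so each worker can insert into its own shards without data movement.
INSERT INTO orders_table (order_id, user_id)
SELECT o.order_id + 100000, o.user_id
FROM orders_table o
JOIN users_table u USING (user_id)
WHERE u.country_code = 'USA';
```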
@@ -1267,7 +1269,7 @@ WHERE user_id IN (SELECT user_id FROM high_value_users);
Used for more complex queries, like those with subqueries or joins that can't be pushed down. The queries are planned recursively.
```sql
DELETE FROM users_table WHERE user_id
IN (SELECT user_id FROM orders_table WHERE total > 100 ORDER BY total DESC LIMIT 5);
IN (SELECT user_id FROM orders_table WHERE order_date < '2023-01-01' ORDER BY order_date LIMIT 5);
```

### Correlated/Lateral Subqueries in Planning
@@ -1279,8 +1281,7 @@ Correlated or LATERAL subqueries have special behavior in Citus. They can often
**Key Code Details**:
For more information on the code, check the following functions:
`DeferErrorIfCannotPushdownSubquery()` ->
`ContainsReferencesToOuterQuery()` ->
`DeferErrorIfSubqueryRequiresMerge()`.
`ContainsReferencesToOuterQuery()`, `DeferErrorIfSubqueryRequiresMerge()`, `DeferredErrorIfUnsupportedLateralSubquery()`. LATERAL queries are different/unique: even if the subquery requires a merge step such as a `LIMIT`, if the correlation is on the distribution column, we can push it down. See [#4385](https://github.com/citusdata/citus/pull/4385).
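A sketch of the LATERAL case described above: the subquery is correlated on the distribution column and contains a `LIMIT` (a merge step), yet it can still be pushed down:

```sql
-- The correlation o.user_id = u.user_id is on the distribution column, so
-- the per-user LIMIT 1 can be evaluated next to each user's colocated
-- order shard instead of requiring a coordinator-side merge step.
SELECT u.user_id, recent.order_id, recent.order_date
FROM users_table u,
     LATERAL (
         SELECT o.order_id, o.order_date
         FROM orders_table o
         WHERE o.user_id = u.user_id
         ORDER BY o.order_date DESC
         LIMIT 1
     ) AS recent;
```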



@@ -1409,7 +1410,7 @@ WITH recent_orders AS (
)
SELECT u.*
FROM users_table u
JOIN recent_orders o ON u.user_id = o.product_id;
JOIN recent_orders o ON u.user_id = o.product_id
JOIN orders_table o2 ON o2.product_id = o.product_id;
ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
```
2 changes: 1 addition & 1 deletion src/backend/distributed/planner/fast_path_router_planner.c
@@ -154,7 +154,7 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
* being a fast path router query.
* The requirements for the fast path query can be listed below:
*
* - SELECT query without CTES, sublinks-subqueries, set operations
* - SELECT/UPDATE/DELETE query without CTES, sublinks-subqueries, set operations
* - The query should touch only a single hash distributed or reference table
* - The distribution with equality operator should be in the WHERE clause
* and it should be ANDed with any other filters. Also, the distribution
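To make the requirements in the comment above concrete, a hedged sketch of queries that do and do not qualify for fast-path routing, assuming `users_table` is hash-distributed on `user_id`:

```sql
-- Qualifies: a single distributed table, with the distribution key compared
-- by equality and ANDed with the other filter.
UPDATE users_table
SET email = 'new@example.com'
WHERE user_id = 42 AND country_code = 'USA';

-- Does not qualify: the distribution-key filter is ORed with another filter,
-- so more than one shard may be involved.
SELECT * FROM users_table
WHERE user_id = 42 OR country_code = 'USA';
```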
22 changes: 3 additions & 19 deletions src/backend/distributed/planner/multi_router_planner.c
@@ -2324,27 +2324,11 @@ PlanRouterQuery(Query *originalQuery,
TargetShardIntervalForFastPathQuery(originalQuery, &isMultiShardQuery,
distributionKeyValue,
partitionValueConst);

/*
* This could only happen when there is a parameter on the distribution key.
* We defer error here, later the planner is forced to use a generic plan
* by assigning arbitrarily high cost to the plan.
*/
if (UpdateOrDeleteOrMergeQuery(originalQuery) && isMultiShardQuery)
{
planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"Router planner cannot handle multi-shard "
"modify queries", NULL, NULL);
return planningError;
}
Assert(!isMultiShardQuery);

*prunedShardIntervalListList = shardIntervalList;

if (!isMultiShardQuery)
{
ereport(DEBUG2, (errmsg("Distributed planning for a fast-path router "
"query")));
}
ereport(DEBUG2, (errmsg("Distributed planning for a fast-path router "
"query")));
}
else
{
