From 6a989bf30f6c53c5ccd0e16ca6a52b438c04d60d Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Thu, 23 May 2024 19:20:00 +0800 Subject: [PATCH] fix(query): fix missing exchange in create table as select --- .../suites/mode/cluster/create_table.test | 624 ++++++++++++++++++ 1 file changed, 624 insertions(+) create mode 100644 tests/sqllogictests/suites/mode/cluster/create_table.test diff --git a/tests/sqllogictests/suites/mode/cluster/create_table.test b/tests/sqllogictests/suites/mode/cluster/create_table.test new file mode 100644 index 000000000000..b58f352dfe8a --- /dev/null +++ b/tests/sqllogictests/suites/mode/cluster/create_table.test @@ -0,0 +1,624 @@ +query T +explain create or replace table t2 as select number % 400 d, max(number) from numbers(10000000) group by number limit 3; +---- +CreateTableAsSelect: +(empty) +EvalScalar +├── output columns: [max(number) (#6), d (#7)] +├── expressions: [numbers.number (#4) % 400] +├── estimated rows: 3.00 +└── Limit + ├── output columns: [max(number) (#6), numbers.number (#4)] + ├── limit: 3 + ├── offset: 0 + ├── estimated rows: 3.00 + └── Exchange + ├── output columns: [max(number) (#6), numbers.number (#4)] + ├── exchange type: Merge + └── Limit + ├── output columns: [max(number) (#6), numbers.number (#4)] + ├── limit: 3 + ├── offset: 0 + ├── estimated rows: 3.00 + └── AggregateFinal + ├── output columns: [max(number) (#6), numbers.number (#4)] + ├── group by: [number] + ├── aggregate functions: [max(number)] + ├── limit: 3 + ├── estimated rows: 10000000.00 + └── Exchange + ├── output columns: [max(number) (#6), numbers.number (#4)] + ├── exchange type: Hash(0) + └── AggregatePartial + ├── group by: [number] + ├── aggregate functions: [max(number)] + ├── estimated rows: 10000000.00 + └── TableScan + ├── table: default.system.numbers + ├── output columns: [number (#4)] + ├── read rows: 10000000 + ├── read size: 76.29 MiB + ├── partitions total: 153 + ├── partitions scanned: 153 + ├── push downs: [filters: [], limit: NONE] + └── estimated rows: 10000000.00 + +query T +explain select * from numbers(1) t, numbers(2) t1, numbers(3) t2 where t.number = t1.number and t.number = t2.number +---- +Exchange +├── output columns: [t2.number (#2), t1.number (#1), t.number (#0)] +├── exchange type: Merge +└── HashJoin + ├── output columns: [t2.number (#2), t1.number (#1), t.number (#0)] + ├── join type: INNER + ├── build keys: [t.number (#0)] + ├── probe keys: [t2.number (#2)] + ├── filters: [] + ├── estimated rows: 6.00 + ├── Exchange(Build) + │ ├── output columns: [t1.number (#1), t.number (#0)] + │ ├── exchange type: Broadcast + │ └── HashJoin + │ ├── output columns: [t1.number (#1), t.number (#0)] + │ ├── join type: INNER + │ ├── build keys: [t.number (#0)] + │ ├── probe keys: [t1.number (#1)] + │ ├── filters: [] + │ ├── estimated rows: 2.00 + │ ├── Exchange(Build) + │ │ ├── output columns: [t.number (#0)] + │ │ ├── exchange type: Broadcast + │ │ └── TableScan + │ │ ├── table: default.system.numbers + │ │ ├── output columns: [number (#0)] + │ │ ├── read rows: 1 + │ │ ├── read size: < 1 KiB + │ │ ├── partitions total: 1 + │ │ ├── partitions scanned: 1 + │ │ ├── push downs: [filters: [], limit: NONE] + │ │ └── estimated rows: 1.00 + │ └── TableScan(Probe) + │ ├── table: default.system.numbers + │ ├── output columns: [number (#1)] + │ ├── read rows: 2 + │ ├── read size: < 1 KiB + │ ├── partitions total: 1 + │ ├── partitions scanned: 1 + │ ├── push downs: [filters: [], limit: NONE] + │ └── estimated rows: 2.00 + └── TableScan(Probe) + ├── table: default.system.numbers + ├── output columns: [number (#2)] + ├── read rows: 3 + ├── read size: < 1 KiB + ├── partitions total: 1 + ├── partitions scanned: 1 + ├── push downs: [filters: [], limit: NONE] + └── estimated rows: 3.00 + +query T +explain select * from (select number as a, number+1 as b from numbers(1)) t, numbers(2) t1, numbers(3) t2 where a = t1.number and b = t2.number +---- +Exchange +├── output columns: [t2.number (#3), t1.number (#2), numbers.number (#0), b (#1)] +├── exchange type: Merge +└── HashJoin + ├── output columns: [t2.number (#3), t1.number (#2), numbers.number (#0), b (#1)] + ├── join type: INNER + ├── build keys: [t.b (#1)] + ├── probe keys: [t2.number (#3)] + ├── filters: [] + ├── estimated rows: 6.00 + ├── Exchange(Build) + │ ├── output columns: [t1.number (#2), b (#1), numbers.number (#0)] + │ ├── exchange type: Broadcast + │ └── HashJoin + │ ├── output columns: [t1.number (#2), b (#1), numbers.number (#0)] + │ ├── join type: INNER + │ ├── build keys: [t.a (#0)] + │ ├── probe keys: [t1.number (#2)] + │ ├── filters: [] + │ ├── estimated rows: 2.00 + │ ├── Exchange(Build) + │ │ ├── output columns: [numbers.number (#0), b (#1)] + │ │ ├── exchange type: Broadcast + │ │ └── EvalScalar + │ │ ├── output columns: [numbers.number (#0), b (#1)] + │ │ ├── expressions: [numbers.number (#0) + 1] + │ │ ├── estimated rows: 1.00 + │ │ └── TableScan + │ │ ├── table: default.system.numbers + │ │ ├── output columns: [number (#0)] + │ │ ├── read rows: 1 + │ │ ├── read size: < 1 KiB + │ │ ├── partitions total: 1 + │ │ ├── partitions scanned: 1 + │ │ ├── push downs: [filters: [], limit: NONE] + │ │ └── estimated rows: 1.00 + │ └── TableScan(Probe) + │ ├── table: default.system.numbers + │ ├── output columns: [number (#2)] + │ ├── read rows: 2 + │ ├── read size: < 1 KiB + │ ├── partitions total: 1 + │ ├── partitions scanned: 1 + │ ├── push downs: [filters: [], limit: NONE] + │ └── estimated rows: 2.00 + └── TableScan(Probe) + ├── table: default.system.numbers + ├── output columns: [number (#3)] + ├── read rows: 3 + ├── read size: < 1 KiB + ├── partitions total: 1 + ├── partitions scanned: 1 + ├── push downs: [filters: [], limit: NONE] + └── estimated rows: 3.00 + +query T +explain select * from (select sum(number) as number from numbers(1) group by number) t, numbers(2) t1 where t.number = t1.number +---- +Exchange +├── output columns: [t1.number (#3), sum(number) (#2)] +├── exchange type: Merge +└── HashJoin + ├── output columns: [t1.number (#3), sum(number) (#2)] + ├── join type: INNER + ├── build keys: [t.number (#2)] + ├── probe keys: [CAST(t1.number (#3) AS UInt64 NULL)] + ├── filters: [] + ├── estimated rows: 2.00 + ├── Exchange(Build) + │ ├── output columns: [sum(number) (#2), numbers.number (#0)] + │ ├── exchange type: Broadcast + │ └── AggregateFinal + │ ├── output columns: [sum(number) (#2), numbers.number (#0)] + │ ├── group by: [number] + │ ├── aggregate functions: [sum(number)] + │ ├── estimated rows: 1.00 + │ └── Exchange + │ ├── output columns: [sum(number) (#2), numbers.number (#0)] + │ ├── exchange type: Hash(0) + │ └── AggregatePartial + │ ├── group by: [number] + │ ├── aggregate functions: [sum(number)] + │ ├── estimated rows: 1.00 + │ └── TableScan + │ ├── table: default.system.numbers + │ ├── output columns: [number (#0)] + │ ├── read rows: 1 + │ ├── read size: < 1 KiB + │ ├── partitions total: 1 + │ ├── partitions scanned: 1 + │ ├── push downs: [filters: [], limit: NONE] + │ └── estimated rows: 1.00 + └── TableScan(Probe) + ├── table: default.system.numbers + ├── output columns: [number (#3)] + ├── read rows: 2 + ├── read size: < 1 KiB + ├── partitions total: 1 + ├── partitions scanned: 1 + ├── push downs: [filters: [], limit: NONE] + └── estimated rows: 2.00 + +query T +explain fragments select * from (select sum(number) as number from numbers(1) group by number) t, numbers(2) t1 where t.number = t1.number +---- +Fragment 0: + DataExchange: Shuffle + ExchangeSink + ├── output columns: [sum(number) (#2), numbers.number (#0)] + ├── destination fragment: [1] + └── AggregatePartial + ├── group by: [number] + ├── aggregate functions: [sum(number)] + ├── estimated rows: 1.00 + └── TableScan + ├── table: default.system.numbers + ├── output columns: [number (#0)] + ├── read rows: 1 + ├── read size: < 1 KiB + ├── partitions total: 1 + ├── partitions scanned: 1 + ├── push downs: [filters: [], limit: NONE] + └── estimated rows: 1.00 +(empty) +(empty) +Fragment 1: + DataExchange: Broadcast + ExchangeSink + ├── output columns: [sum(number) (#2), numbers.number (#0)] + ├── destination fragment: [2] + └── AggregateFinal + ├── output columns: [sum(number) (#2), numbers.number (#0)] + ├── group by: [number] + ├── aggregate functions: [sum(number)] + ├── estimated rows: 1.00 + └── ExchangeSource + ├── output columns: [sum(number) (#2), numbers.number (#0)] + └── source fragment: [0] +(empty) +(empty) +Fragment 2: + DataExchange: Merge + ExchangeSink + ├── output columns: [t1.number (#3), sum(number) (#2)] + ├── destination fragment: [3] + └── HashJoin + ├── output columns: [t1.number (#3), sum(number) (#2)] + ├── join type: INNER + ├── build keys: [t.number (#2)] + ├── probe keys: [CAST(t1.number (#3) AS UInt64 NULL)] + ├── filters: [] + ├── estimated rows: 2.00 + ├── ExchangeSource(Build) + │ ├── output columns: [sum(number) (#2), numbers.number (#0)] + │ └── source fragment: [1] + └── TableScan(Probe) + ├── table: default.system.numbers + ├── output columns: [number (#3)] + ├── read rows: 2 + ├── read size: < 1 KiB + ├── partitions total: 1 + ├── partitions scanned: 1 + ├── push downs: [filters: [], limit: NONE] + └── estimated rows: 2.00 +(empty) +(empty) +Fragment 3: + ExchangeSource + ├── output columns: [t1.number (#3), sum(number) (#2)] + └── source fragment: [2] +(empty) + +query T +explain +with +t1 as (select number as a from numbers(10)), +t2 as (select number as a from numbers(1000)), +t3 as (select number as a from numbers(10)) +select sum(a) from ( +select t1.a from t1, t2 where t1.a = t2.a +union all +select t1.a from t1, t3 where t1.a = t3.a +) t +---- +AggregateFinal +├── output columns: [sum(a) (#4)] +├── group by: [] +├── aggregate functions: [sum(number)] +├── estimated rows: 1.00 +└── Exchange + ├── output columns: [sum(a) (#4)] + ├── exchange type: Merge + └── AggregatePartial + ├── group by: [] + ├── aggregate functions: [sum(number)] + ├── estimated rows: 1.00 + └── UnionAll + ├── output columns: [numbers.number (#0)] + ├── estimated rows: 10100.00 + ├── HashJoin + │ ├── output columns: [numbers.number (#0)] + │ ├── join type: INNER + │ ├── build keys: [t1.a (#0)] + │ ├── probe keys: [t2.a (#1)] + │ ├── filters: [] + │ ├── estimated rows: 10000.00 + │ ├── Exchange(Build) + │ │ ├── output columns: [numbers.number (#0)] + │ │ ├── exchange type: Broadcast + │ │ └── TableScan + │ │ ├── table: default.system.numbers + │ │ ├── output columns: [number (#0)] + │ │ ├── read rows: 10 + │ │ ├── read size: < 1 KiB + │ │ ├── partitions total: 1 + │ │ ├── partitions scanned: 1 + │ │ ├── push downs: [filters: [], limit: NONE] + │ │ └── estimated rows: 10.00 + │ └── TableScan(Probe) + │ ├── table: default.system.numbers + │ ├── output columns: [number (#1)] + │ ├── read rows: 1000 + │ ├── read size: 7.81 KiB + │ ├── partitions total: 1 + │ ├── partitions scanned: 1 + │ ├── push downs: [filters: [], limit: NONE] + │ └── estimated rows: 1000.00 + └── HashJoin + ├── output columns: [numbers.number (#2)] + ├── join type: INNER + ├── build keys: [t3.a (#3)] + ├── probe keys: [t1.a (#2)] + ├── filters: [] + ├── estimated rows: 100.00 + ├── Exchange(Build) + │ ├── output columns: [numbers.number (#3)] + │ ├── exchange type: Broadcast + │ └── TableScan + │ ├── table: default.system.numbers + │ ├── output columns: [number (#3)] + │ ├── read rows: 10 + │ ├── read size: < 1 KiB + │ ├── partitions total: 1 + │ ├── partitions scanned: 1 + │ ├── push downs: [filters: [], limit: NONE] + │ └── estimated rows: 10.00 + └── TableScan(Probe) + ├── table: default.system.numbers + ├── output columns: [number (#2)] + ├── read rows: 10 + ├── read size: < 1 KiB + ├── partitions total: 1 + ├── partitions scanned: 1 + ├── push downs: [filters: [], limit: NONE] + └── estimated rows: 10.00 + + +query T +explain +with +t1 as (select number as a from numbers(10)), +t2 as (select number as a from numbers(1000)), +t3 as (select number as a from numbers(10)) +select sum(a) from ( +select t1.a from t1, t2 where t1.a = t2.a +union all +select sum(t1.a) from t1, t3 where t1.a = t3.a +) t +---- +AggregateFinal +├── output columns: [sum(a) (#6)] +├── group by: [] +├── aggregate functions: [sum(a)] +├── estimated rows: 1.00 +└── AggregatePartial + ├── group by: [] + ├── aggregate functions: [sum(a)] + ├── estimated rows: 1.00 + └── UnionAll + ├── output columns: [a (#5)] + ├── estimated rows: 10001.00 + ├── Exchange + │ ├── output columns: [a (#5)] + │ ├── exchange type: Merge + │ └── EvalScalar + │ ├── output columns: [a (#5)] + │ ├── expressions: [CAST(t1.a (#0) AS UInt64 NULL)] + │ ├── estimated rows: 10000.00 + │ └── HashJoin + │ ├── output columns: [numbers.number (#0)] + │ ├── join type: INNER + │ ├── build keys: [t1.a (#0)] + │ ├── probe keys: [t2.a (#1)] + │ ├── filters: [] + │ ├── estimated rows: 10000.00 + │ ├── Exchange(Build) + │ │ ├── output columns: [numbers.number (#0)] + │ │ ├── exchange type: Broadcast + │ │ └── TableScan + │ │ ├── table: default.system.numbers + │ │ ├── output columns: [number (#0)] + │ │ ├── read rows: 10 + │ │ ├── read size: < 1 KiB + │ │ ├── partitions total: 1 + │ │ ├── partitions scanned: 1 + │ │ ├── push downs: [filters: [], limit: NONE] + │ │ └── estimated rows: 10.00 + │ └── TableScan(Probe) + │ ├── table: default.system.numbers + │ ├── output columns: [number (#1)] + │ ├── read rows: 1000 + │ ├── read size: 7.81 KiB + │ ├── partitions total: 1 + │ ├── partitions scanned: 1 + │ ├── push downs: [filters: [], limit: NONE] + │ └── estimated rows: 1000.00 + └── AggregateFinal + ├── output columns: [sum(t1.a) (#4)] + ├── group by: [] + ├── aggregate functions: [sum(number)] + ├── estimated rows: 1.00 + └── Exchange + ├── output columns: [sum(t1.a) (#4)] + ├── exchange type: Merge + └── AggregatePartial + ├── group by: [] + ├── aggregate functions: [sum(number)] + ├── estimated rows: 1.00 + └── HashJoin + ├── output columns: [numbers.number (#2)] + ├── join type: INNER + ├── build keys: [t3.a (#3)] + ├── probe keys: [t1.a (#2)] + ├── filters: [] + ├── estimated rows: 100.00 + ├── Exchange(Build) + │ ├── output columns: [numbers.number (#3)] + │ ├── exchange type: Broadcast + │ └── TableScan + │ ├── table: default.system.numbers + │ ├── output columns: [number (#3)] + │ ├── read rows: 10 + │ ├── read size: < 1 KiB + │ ├── partitions total: 1 + │ ├── partitions scanned: 1 + │ ├── push downs: [filters: [], limit: NONE] + │ └── estimated rows: 10.00 + └── TableScan(Probe) + ├── table: default.system.numbers + ├── output columns: [number (#2)] + ├── read rows: 10 + ├── read size: < 1 KiB + ├── partitions total: 1 + ├── partitions scanned: 1 + ├── push downs: [filters: [], limit: NONE] + └── estimated rows: 10.00 + +query T +explain +with +t1 as (select number as a from numbers(10)), +t2 as (select number as a from numbers(1000)), +t3 as (select number as a from numbers(10)) +select sum(a) from ( +select t1.a from t1, t2 where t1.a = t2.a +union all +select t1.a from t1 +) t +---- +AggregateFinal +├── output columns: [sum(a) (#3)] +├── group by: [] +├── aggregate functions: [sum(number)] +├── estimated rows: 1.00 +└── Exchange + ├── output columns: [sum(a) (#3)] + ├── exchange type: Merge + └── AggregatePartial + ├── group by: [] + ├── aggregate functions: [sum(number)] + ├── estimated rows: 1.00 + └── UnionAll + ├── output columns: [numbers.number (#0)] + ├── estimated rows: 10010.00 + ├── HashJoin + │ ├── output columns: [numbers.number (#0)] + │ ├── join type: INNER + │ ├── build keys: [t1.a (#0)] + │ ├── probe keys: [t2.a (#1)] + │ ├── filters: [] + │ ├── estimated rows: 10000.00 + │ ├── Exchange(Build) + │ │ ├── output columns: [numbers.number (#0)] + │ │ ├── exchange type: Broadcast + │ │ └── TableScan + │ │ ├── table: default.system.numbers + │ │ ├── output columns: [number (#0)] + │ │ ├── read rows: 10 + │ │ ├── read size: < 1 KiB + │ │ ├── partitions total: 1 + │ │ ├── partitions scanned: 1 + │ │ ├── push downs: [filters: [], limit: NONE] + │ │ └── estimated rows: 10.00 + │ └── TableScan(Probe) + │ ├── table: default.system.numbers + │ ├── output columns: [number (#1)] + │ ├── read rows: 1000 + │ ├── read size: 7.81 KiB + │ ├── partitions total: 1 + │ ├── partitions scanned: 1 + │ ├── push downs: [filters: [], limit: NONE] + │ └── estimated rows: 1000.00 + └── TableScan + ├── table: default.system.numbers + ├── output columns: [number (#2)] + ├── read rows: 10 + ├── read size: < 1 KiB + ├── partitions total: 1 + ├── partitions scanned: 1 + ├── push downs: [filters: [], limit: NONE] + └── estimated rows: 10.00 + +query T +explain +with +t1 as (select number as a from numbers(10)), +t2 as (select number as a from numbers(1000)), +t3 as (select number as a from numbers(10)) +select * from t1, t2, t3 +---- +Exchange +├── output columns: [numbers.number (#1), numbers.number (#0), numbers.number (#2)] +├── exchange type: Merge +└── HashJoin + ├── output columns: [numbers.number (#1), numbers.number (#0), numbers.number (#2)] + ├── join type: CROSS + ├── build keys: [] + ├── probe keys: [] + ├── filters: [] + ├── estimated rows: 100000.00 + ├── Exchange(Build) + │ ├── output columns: [numbers.number (#2)] + │ ├── exchange type: Broadcast + │ └── TableScan + │ ├── table: default.system.numbers + │ ├── output columns: [number (#2)] + │ ├── read rows: 10 + │ ├── read size: < 1 KiB + │ ├── partitions total: 1 + │ ├── partitions scanned: 1 + │ ├── push downs: [filters: [], limit: NONE] + │ └── estimated rows: 10.00 + └── HashJoin(Probe) + ├── output columns: [numbers.number (#1), numbers.number (#0)] + ├── join type: CROSS + ├── build keys: [] + ├── probe keys: [] + ├── filters: [] + ├── estimated rows: 10000.00 + ├── Exchange(Build) + │ ├── output columns: [numbers.number (#0)] + │ ├── exchange type: Broadcast + │ └── TableScan + │ ├── table: default.system.numbers + │ ├── output columns: [number (#0)] + │ ├── read rows: 10 + │ ├── read size: < 1 KiB + │ ├── partitions total: 1 + │ ├── partitions scanned: 1 + │ ├── push downs: [filters: [], limit: NONE] + │ └── estimated rows: 10.00 + └── TableScan(Probe) + ├── table: default.system.numbers + ├── output columns: [number (#1)] + ├── read rows: 1000 + ├── read size: 7.81 KiB + ├── partitions total: 1 + ├── partitions scanned: 1 + ├── push downs: [filters: [], limit: NONE] + └── estimated rows: 1000.00 + +statement ok +set disable_join_reorder = 1; + +query T +explain select * from numbers(10) as t1 join numbers(20) as t2 on t1.number = t2.number; +---- +Exchange +├── output columns: [t1.number (#0), t2.number (#1)] +├── exchange type: Merge +└── HashJoin + ├── output columns: [t1.number (#0), t2.number (#1)] + ├── join type: INNER + ├── build keys: [t2.number (#1)] + ├── probe keys: [t1.number (#0)] + ├── filters: [] + ├── estimated rows: 200.00 + ├── Exchange(Build) + │ ├── output columns: [t2.number (#1)] + │ ├── exchange type: Broadcast + │ └── TableScan + │ ├── table: default.system.numbers + │ ├── output columns: [number (#1)] + │ ├── read rows: 20 + │ ├── read size: < 1 KiB + │ ├── partitions total: 1 + │ ├── partitions scanned: 1 + │ ├── push downs: [filters: [], limit: NONE] + │ └── estimated rows: 20.00 + └── TableScan(Probe) + ├── table: default.system.numbers + ├── output columns: [number (#0)] + ├── read rows: 10 + ├── read size: < 1 KiB + ├── partitions total: 1 + ├── partitions scanned: 1 + ├── push downs: [filters: [], limit: NONE] + └── estimated rows: 10.00 + + +statement ok +set disable_join_reorder = 0;