Skip to content

Commit

Permalink
[8.x] ESQL: Prune lookup join cols (elastic#118808) (elastic#119126)
Browse files Browse the repository at this point in the history
* ESQL: Prune lookup join cols (elastic#118808)

Use the existing optimizer rule PruneColumns to remove redundant lookup fields from LOOKUP JOIN commands.
Additionally, slightly simplify PruneColumns and the surrogate substitution for LOOKUP JOIN.

* Preserve source compatibility with 8.x

---------

Co-authored-by: Costin Leau <costin@users.noreply.github.com>
  • Loading branch information
alex-spies and costin authored Dec 19, 2024
1 parent d0ed72d commit 7b2d90b
Show file tree
Hide file tree
Showing 7 changed files with 422 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,51 @@ language_code:integer | language_name:keyword
4 | German
;

dropAllLookedUpFieldsOnTheDataNode-Ignore
// Ignored (see the -Ignore suffix on the test name) until the following issues are resolved:
// https://github.com/elastic/elasticsearch/issues/118778
// https://github.com/elastic/elasticsearch/issues/118781
required_capability: join_lookup_v8

FROM employees
| EVAL language_code = emp_no % 10
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
| WHERE emp_no == 10001
| SORT emp_no
| DROP language*
;

emp_no:integer
10001
10001
10001
10001
;

dropAllLookedUpFieldsOnTheCoordinator-Ignore
// Ignored (see the -Ignore suffix on the test name) until the following issues are resolved:
// https://github.com/elastic/elasticsearch/issues/118778
// https://github.com/elastic/elasticsearch/issues/118781
required_capability: join_lookup_v8

FROM employees
| SORT emp_no
| LIMIT 2
| EVAL language_code = emp_no % 10
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
| DROP language*
;

emp_no:integer
10001
10001
10001
10001
10002
10002
10002
;

###############################################
# Filtering tests with languages_lookup index
###############################################
Expand Down Expand Up @@ -860,6 +905,26 @@ ignoreOrder:true
2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233 | Connected to 10.1.0.3 | Success | 172.21.2.162 | null
;

lookupMessageFromIndexTwiceFullyShadowing
required_capability: join_lookup_v8

FROM sample_data
| LOOKUP JOIN message_types_lookup ON message
| LOOKUP JOIN message_types_lookup ON message
| KEEP @timestamp, client_ip, event_duration, message, type
;
ignoreOrder:true

@timestamp:date | client_ip:ip | event_duration:long | message:keyword | type:keyword
2023-10-23T13:55:01.543Z | 172.21.3.15 | 1756467 | Connected to 10.1.0.1 | Success
2023-10-23T13:53:55.832Z | 172.21.3.15 | 5033755 | Connection error | Error
2023-10-23T13:52:55.015Z | 172.21.3.15 | 8268153 | Connection error | Error
2023-10-23T13:51:54.732Z | 172.21.3.15 | 725448 | Connection error | Error
2023-10-23T13:33:34.937Z | 172.21.0.5 | 1232382 | Disconnected | Disconnected
2023-10-23T12:27:28.948Z | 172.21.2.113 | 2764889 | Connected to 10.1.0.2 | Success
2023-10-23T12:15:03.360Z | 172.21.2.162 | 3450233 | Connected to 10.1.0.3 | Success
;

###############################################
# Tests with clientips_lookup and message_types_lookup indexes
###############################################
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@

import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockUtils;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.EmptyAttribute;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.Limit;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
Expand All @@ -34,13 +34,17 @@ public final class PruneColumns extends Rule<LogicalPlan, LogicalPlan> {

@Override
public LogicalPlan apply(LogicalPlan plan) {
var used = new AttributeSet();
// don't remove Evals without any Project/Aggregate (which might not occur as the last node in the plan)
var seenProjection = new Holder<>(Boolean.FALSE);

// start top-to-bottom
// and track used references
// track used references
var used = plan.outputSet();
// while going top-to-bottom (upstream)
var pl = plan.transformDown(p -> {
// Note: It is NOT required to do anything special for binary plans like JOINs. It is perfectly fine that transformDown descends
// first into the left side, adding all kinds of attributes to the `used` set, and then descends into the right side - even
// though the `used` set will contain stuff only used in the left hand side. That's because any attribute that is used in the
// left hand side must have been created in the left side as well. Even field attributes belonging to the same index fields will
// have different name ids in the left and right hand sides - as in the extreme example
// `FROM lookup_idx | LOOKUP JOIN lookup_idx ON key_field`.

// skip nodes that simply pass the input through
if (p instanceof Limit) {
return p;
Expand All @@ -53,7 +57,7 @@ public LogicalPlan apply(LogicalPlan plan) {
do {
recheck = false;
if (p instanceof Aggregate aggregate) {
var remaining = seenProjection.get() ? removeUnused(aggregate.aggregates(), used) : null;
var remaining = removeUnused(aggregate.aggregates(), used);

if (remaining != null) {
if (remaining.isEmpty()) {
Expand Down Expand Up @@ -87,10 +91,8 @@ public LogicalPlan apply(LogicalPlan plan) {
);
}
}

seenProjection.set(Boolean.TRUE);
} else if (p instanceof Eval eval) {
var remaining = seenProjection.get() ? removeUnused(eval.fields(), used) : null;
var remaining = removeUnused(eval.fields(), used);
// no fields, no eval
if (remaining != null) {
if (remaining.isEmpty()) {
Expand All @@ -100,8 +102,16 @@ public LogicalPlan apply(LogicalPlan plan) {
p = new Eval(eval.source(), eval.child(), remaining);
}
}
} else if (p instanceof Project) {
seenProjection.set(Boolean.TRUE);
} else if (p instanceof EsRelation esRelation && esRelation.indexMode() == IndexMode.LOOKUP) {
// Normally, pruning EsRelation has no effect because InsertFieldExtraction only extracts the required fields, anyway.
// The field extraction for LOOKUP JOIN works differently, however - we extract all fields (other than the join key)
// that the EsRelation has.
var remaining = removeUnused(esRelation.output(), used);
// TODO: LookupFromIndexOperator cannot handle 0 lookup fields, yet. That is, the relation must keep more than 1 field in total
// (key field + lookup fields), so we only prune when more than one field remains.
// https://github.com/elastic/elasticsearch/issues/118778
if (remaining != null && remaining.size() > 1) {
p = new EsRelation(esRelation.source(), esRelation.index(), remaining, esRelation.indexMode(), esRelation.frozen());
}
}
} while (recheck);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ else if (plan instanceof Project project) {
FieldAttribute.class,
// Do not use the attribute name, this can deviate from the field name for union types.
// Also skip fields from lookup indices because we do not have stats for these.
// TODO: We do have stats for lookup indices in case they are being used in the FROM clause; this can be refined.
f -> stats.exists(f.fieldName()) || lookupFields.contains(f) ? f : Literal.of(f, null)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.SurrogateLogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.UsingJoinType;

Expand Down Expand Up @@ -50,9 +49,8 @@ public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, JoinConfig
*/
@Override
public LogicalPlan surrogate() {
Join normalized = new Join(source(), left(), right(), config());
// TODO: decide whether to introduce USING or just basic ON semantics - keep the ordering out for now
return new Project(source(), normalized, output());
return new Join(source(), left(), right(), config());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ public String describe() {
return Stream.concat(
Stream.concat(Stream.of(sourceOperatorFactory), intermediateOperatorFactories.stream()),
Stream.of(sinkOperatorFactory)
).map(Describable::describe).collect(joining("\n\\_", "\\_", ""));
).map(describable -> describable == null ? "null" : describable.describe()).collect(joining("\n\\_", "\\_", ""));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3011,6 +3011,24 @@ public void testPruneChainedEval() {
var source = as(limit.child(), EsRelation.class);
}

/**
 * Checks that chained {@code EVAL}s which repeatedly redefine the same field are pruned even though the query contains no
 * explicit projection or aggregation.
 * <p>
 * The asserted plan shape is {@code Eval -> Limit -> EsRelation}, where the only remaining eval field is
 * {@code garbage = 1}; the earlier definitions of {@code garbage} are expected to be gone.
 */
public void testPruneChainedEvalNoProjection() {
    var plan = plan("""
        from test
        | eval garbage = salary + 3
        | eval garbage = emp_no / garbage, garbage = garbage
        | eval garbage = 1
        """);
    var eval = as(plan, Eval.class);
    var limit = as(eval.child(), Limit.class);
    // Only the node type matters here; the relation itself is not inspected further.
    var source = as(limit.child(), EsRelation.class);

    // A single field must survive: the last definition of `garbage`.
    assertEquals(1, eval.fields().size());
    var alias = as(eval.fields().get(0), Alias.class);
    // Expected value goes first so that a failure message reports expected/actual the right way round.
    assertEquals("garbage", alias.name());
    var literal = as(alias.child(), Literal.class);
    assertEquals(1, literal.value());
}

/**
* Expects
* Limit[1000[INTEGER]]
Expand Down Expand Up @@ -4914,8 +4932,7 @@ public void testPlanSanityCheckWithBinaryPlans() throws Exception {
| LOOKUP JOIN languages_lookup ON language_code
""");

var project = as(plan, Project.class);
var join = as(project.child(), Join.class);
var join = as(plan, Join.class);

var joinWithInvalidLeftPlan = join.replaceChildren(join.right(), join.right());
IllegalStateException e = expectThrows(IllegalStateException.class, () -> logicalOptimizer.optimize(joinWithInvalidLeftPlan));
Expand Down Expand Up @@ -5921,10 +5938,9 @@ public void testLookupJoinPushDownFilterOnJoinKeyWithRename() {
""";
var plan = optimizedPlan(query);

var project = as(plan, Project.class);
var join = as(project.child(), Join.class);
var join = as(plan, Join.class);
assertThat(join.config().type(), equalTo(JoinTypes.LEFT));
project = as(join.left(), Project.class);
var project = as(join.left(), Project.class);
var limit = as(project.child(), Limit.class);
assertThat(limit.limit().fold(), equalTo(1000));
var filter = as(limit.child(), Filter.class);
Expand Down Expand Up @@ -5965,10 +5981,9 @@ public void testLookupJoinPushDownFilterOnLeftSideField() {

var plan = optimizedPlan(query);

var project = as(plan, Project.class);
var join = as(project.child(), Join.class);
var join = as(plan, Join.class);
assertThat(join.config().type(), equalTo(JoinTypes.LEFT));
project = as(join.left(), Project.class);
var project = as(join.left(), Project.class);

var limit = as(project.child(), Limit.class);
assertThat(limit.limit().fold(), equalTo(1000));
Expand Down Expand Up @@ -6009,8 +6024,7 @@ public void testLookupJoinPushDownDisabledForLookupField() {

var plan = optimizedPlan(query);

var project = as(plan, Project.class);
var limit = as(project.child(), Limit.class);
var limit = as(plan, Limit.class);
assertThat(limit.limit().fold(), equalTo(1000));

var filter = as(limit.child(), Filter.class);
Expand All @@ -6022,7 +6036,7 @@ public void testLookupJoinPushDownDisabledForLookupField() {

var join = as(filter.child(), Join.class);
assertThat(join.config().type(), equalTo(JoinTypes.LEFT));
project = as(join.left(), Project.class);
var project = as(join.left(), Project.class);

var leftRel = as(project.child(), EsRelation.class);
var rightRel = as(join.right(), EsRelation.class);
Expand Down Expand Up @@ -6054,8 +6068,7 @@ public void testLookupJoinPushDownSeparatedForConjunctionBetweenLeftAndRightFiel

var plan = optimizedPlan(query);

var project = as(plan, Project.class);
var limit = as(project.child(), Limit.class);
var limit = as(plan, Limit.class);
assertThat(limit.limit().fold(), equalTo(1000));
// filter kept in place, working on the right side
var filter = as(limit.child(), Filter.class);
Expand All @@ -6067,7 +6080,7 @@ public void testLookupJoinPushDownSeparatedForConjunctionBetweenLeftAndRightFiel

var join = as(filter.child(), Join.class);
assertThat(join.config().type(), equalTo(JoinTypes.LEFT));
project = as(join.left(), Project.class);
var project = as(join.left(), Project.class);
// filter pushed down
filter = as(project.child(), Filter.class);
op = as(filter.condition(), GreaterThan.class);
Expand All @@ -6079,7 +6092,6 @@ public void testLookupJoinPushDownSeparatedForConjunctionBetweenLeftAndRightFiel

var leftRel = as(filter.child(), EsRelation.class);
var rightRel = as(join.right(), EsRelation.class);

}

/**
Expand Down Expand Up @@ -6107,8 +6119,7 @@ public void testLookupJoinPushDownDisabledForDisjunctionBetweenLeftAndRightField

var plan = optimizedPlan(query);

var project = as(plan, Project.class);
var limit = as(project.child(), Limit.class);
var limit = as(plan, Limit.class);
assertThat(limit.limit().fold(), equalTo(1000));

var filter = as(limit.child(), Filter.class);
Expand All @@ -6128,7 +6139,7 @@ public void testLookupJoinPushDownDisabledForDisjunctionBetweenLeftAndRightField

var join = as(filter.child(), Join.class);
assertThat(join.config().type(), equalTo(JoinTypes.LEFT));
project = as(join.left(), Project.class);
var project = as(join.left(), Project.class);

var leftRel = as(project.child(), EsRelation.class);
var rightRel = as(join.right(), EsRelation.class);
Expand Down
Loading

0 comments on commit 7b2d90b

Please sign in to comment.