Skip to content

Commit

Permalink
Merge branch 'main' into data-stream-reindex-wired-up
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke authored Dec 18, 2024
2 parents a5c0754 + 1004da2 commit f122bbe
Show file tree
Hide file tree
Showing 19 changed files with 606 additions and 78 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,6 @@ testfixtures_shared/
# Generated
checkstyle_ide.xml
x-pack/plugin/esql/src/main/generated-src/generated/

# JEnv
.java-version
1 change: 0 additions & 1 deletion .java-version

This file was deleted.

5 changes: 5 additions & 0 deletions docs/changelog/118890.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 118890
summary: Add action to create index from a source index
area: Data streams
type: enhancement
issues: []
14 changes: 14 additions & 0 deletions docs/reference/esql/functions/examples/bucket.asciidoc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion docs/reference/esql/functions/kibana/definition/bucket.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 0 additions & 5 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,6 @@ tests:
- class: org.elasticsearch.datastreams.DataStreamsClientYamlTestSuiteIT
method: test {p0=data_stream/120_data_streams_stats/Multiple data stream}
issue: https://github.com/elastic/elasticsearch/issues/118217
- class: org.elasticsearch.validation.DotPrefixClientYamlTestSuiteIT
issue: https://github.com/elastic/elasticsearch/issues/118224
- class: org.elasticsearch.packaging.test.ArchiveTests
method: test60StartAndStop
issue: https://github.com/elastic/elasticsearch/issues/118216
Expand Down Expand Up @@ -292,9 +290,6 @@ tests:
issue: https://github.com/elastic/elasticsearch/issues/118914
- class: org.elasticsearch.smoketest.SmokeTestMultiNodeClientYamlTestSuiteIT
issue: https://github.com/elastic/elasticsearch/issues/118955
- class: org.elasticsearch.xpack.migrate.task.ReindexDataStreamStatusTests
method: testEqualsAndHashcode
issue: https://github.com/elastic/elasticsearch/issues/118965
- class: org.elasticsearch.repositories.blobstore.testkit.analyze.SecureHdfsRepositoryAnalysisRestIT
issue: https://github.com/elastic/elasticsearch/issues/118970

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
import static org.elasticsearch.xpack.esql.core.util.SpatialCoordinateTypes.GEO;

public final class CsvTestUtils {
private static final int MAX_WIDTH = 20;
private static final int MAX_WIDTH = 80;
private static final CsvPreference CSV_SPEC_PREFERENCES = new CsvPreference.Builder('"', '|', "\r\n").build();
private static final String NULL_VALUE = "null";
private static final char ESCAPE_CHAR = '\\';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,24 @@ AVG(salary):double | bucket:date
// end::bucket_in_agg-result[]
;

bucketWithOffset#[skip:-8.13.99, reason:BUCKET renamed in 8.14]
// tag::bucketWithOffset[]
FROM employees
| STATS dates = MV_SORT(VALUES(birth_date)) BY b = BUCKET(birth_date + 1 HOUR, 1 YEAR) - 1 HOUR
| EVAL d_count = MV_COUNT(dates)
| SORT d_count, b
| LIMIT 3
// end::bucketWithOffset[]
;

// tag::bucketWithOffset-result[]
dates:date |b:date |d_count:integer
1965-01-03T00:00:00.000Z |1964-12-31T23:00:00.000Z|1
[1955-01-21T00:00:00.000Z, 1955-08-20T00:00:00.000Z, 1955-08-28T00:00:00.000Z, 1955-10-04T00:00:00.000Z]|1954-12-31T23:00:00.000Z|4
[1957-04-04T00:00:00.000Z, 1957-05-23T00:00:00.000Z, 1957-05-25T00:00:00.000Z, 1957-12-03T00:00:00.000Z]|1956-12-31T23:00:00.000Z|4
// end::bucketWithOffset-result[]
;

docsBucketMonth#[skip:-8.13.99, reason:BUCKET renamed in 8.14]
//tag::docsBucketMonth[]
FROM employees
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,17 @@ another in which the bucket size is provided directly (two parameters).
grouping part, or that it is invoked with the exact same expression:""",
file = "bucket",
tag = "reuseGroupingFunctionWithExpression"
),
@Example(
description = """
Sometimes you need to change the start value of each bucket by a given duration (similar to date histogram
aggregation's <<search-aggregations-bucket-histogram-aggregation,`offset`>> parameter). To do so, you will need to
take into account how the language handles expressions within the `STATS` command: if these contain functions or
arithmetic operators, a virtual `EVAL` is inserted before and/or after the `STATS` command. Consequently, a double
compensation is needed to adjust the bucketed date value before the aggregation and then again after. For instance,
inserting a negative offset of `1 hour` to buckets of `1 year` looks like this:""",
file = "bucket",
tag = "bucketWithOffset"
) }
)
public Bucket(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,13 +322,14 @@ private void doTest() throws Exception {
}

protected void assertResults(ExpectedResults expected, ActualResults actual, boolean ignoreOrder, Logger logger) {
CsvAssert.assertResults(expected, actual, ignoreOrder, logger);
/*
* Comment the assertion above and enable the next two lines to see the results returned by ES without any assertions being done.
* Enable the next two lines to see the results returned by ES.
* This is useful when creating a new test or trying to figure out what are the actual results.
*/
// CsvTestUtils.logMetaData(actual.columnNames(), actual.columnTypes(), LOGGER);
// CsvTestUtils.logData(actual.values(), LOGGER);

CsvAssert.assertResults(expected, actual, ignoreOrder, logger);
}

private static IndexResolution loadIndexResolution(String mappingName, String indexName, Map<String, String> typeMapping) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.migrate.action;

import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.reindex.ReindexPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.migrate.MigratePlugin;

import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.REINDEX_DATA_STREAM_FEATURE_FLAG;

public class CreateIndexFromSourceActionIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(MigratePlugin.class, ReindexPlugin.class, MockTransportService.TestPlugin.class, DataStreamsPlugin.class);
}

public void testDestIndexCreated() throws Exception {
assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled());

var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
indicesAdmin().create(new CreateIndexRequest(sourceIndex)).get();

// create from source
var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
assertAcked(
client().execute(CreateIndexFromSourceAction.INSTANCE, new CreateIndexFromSourceAction.Request(sourceIndex, destIndex))
);

try {
indicesAdmin().getIndex(new GetIndexRequest().indices(destIndex)).actionGet();
} catch (IndexNotFoundException e) {
fail();
}
}

public void testSettingsCopiedFromSource() throws Exception {
assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled());

// start with a static setting
var numShards = randomIntBetween(1, 10);
var staticSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards).build();
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
indicesAdmin().create(new CreateIndexRequest(sourceIndex, staticSettings)).get();

// update with a dynamic setting
var numReplicas = randomIntBetween(0, 10);
var dynamicSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicas).build();
indicesAdmin().updateSettings(new UpdateSettingsRequest(dynamicSettings, sourceIndex)).actionGet();

// create from source
var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
assertAcked(
client().execute(CreateIndexFromSourceAction.INSTANCE, new CreateIndexFromSourceAction.Request(sourceIndex, destIndex))
);

// assert both static and dynamic settings set on dest index
var settingsResponse = indicesAdmin().getSettings(new GetSettingsRequest().indices(destIndex)).actionGet();
assertEquals(numReplicas, Integer.parseInt(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_NUMBER_OF_REPLICAS)));
assertEquals(numShards, Integer.parseInt(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_NUMBER_OF_SHARDS)));
}

public void testMappingsCopiedFromSource() {
assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled());

var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
String mapping = """
{
"_doc":{
"dynamic":"strict",
"properties":{
"foo1":{
"type":"text"
}
}
}
}
""";
indicesAdmin().create(new CreateIndexRequest(sourceIndex).mapping(mapping)).actionGet();

// create from source
var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
assertAcked(
client().execute(CreateIndexFromSourceAction.INSTANCE, new CreateIndexFromSourceAction.Request(sourceIndex, destIndex))
);

var mappingsResponse = indicesAdmin().getMappings(new GetMappingsRequest().indices(sourceIndex, destIndex)).actionGet();
Map<String, MappingMetadata> mappings = mappingsResponse.mappings();
var destMappings = mappings.get(destIndex).sourceAsMap();
var sourceMappings = mappings.get(sourceIndex).sourceAsMap();

assertEquals(sourceMappings, destMappings);
// sanity check specific value from dest mapping
assertEquals("text", XContentMapValues.extractValue("properties.foo1.type", destMappings));
}

public void testSettingsOverridden() throws Exception {
assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled());

var numShardsSource = randomIntBetween(1, 10);
var numReplicasSource = randomIntBetween(0, 10);
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
var sourceSettings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShardsSource)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicasSource)
.build();
indicesAdmin().create(new CreateIndexRequest(sourceIndex, sourceSettings)).get();

boolean overrideNumShards = randomBoolean();
Settings settingsOverride = overrideNumShards
? Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShardsSource + 1).build()
: Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicasSource + 1).build();

// create from source
var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
assertAcked(
client().execute(
CreateIndexFromSourceAction.INSTANCE,
new CreateIndexFromSourceAction.Request(sourceIndex, destIndex, settingsOverride, Map.of())
)
);

// assert settings overridden
int expectedShards = overrideNumShards ? numShardsSource + 1 : numShardsSource;
int expectedReplicas = overrideNumShards ? numReplicasSource : numReplicasSource + 1;
var settingsResponse = indicesAdmin().getSettings(new GetSettingsRequest().indices(destIndex)).actionGet();
assertEquals(expectedShards, Integer.parseInt(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_NUMBER_OF_SHARDS)));
assertEquals(expectedReplicas, Integer.parseInt(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_NUMBER_OF_REPLICAS)));
}

public void testSettingsNullOverride() throws Exception {
assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled());

var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
var sourceSettings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true).build();
indicesAdmin().create(new CreateIndexRequest(sourceIndex, sourceSettings)).get();

Settings settingsOverride = Settings.builder().putNull(IndexMetadata.SETTING_BLOCKS_WRITE).build();

// create from source
var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
assertAcked(
client().execute(
CreateIndexFromSourceAction.INSTANCE,
new CreateIndexFromSourceAction.Request(sourceIndex, destIndex, settingsOverride, Map.of())
)
);

// assert settings overridden
var settingsResponse = indicesAdmin().getSettings(new GetSettingsRequest().indices(destIndex)).actionGet();
assertNull(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_BLOCKS_WRITE));
}

public void testMappingsOverridden() {
assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled());

var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
String sourceMapping = """
{
"_doc":{
"dynamic":"strict",
"properties":{
"foo1":{
"type":"text"
},
"foo2":{
"type":"boolean"
}
}
}
}
""";
indicesAdmin().create(new CreateIndexRequest(sourceIndex).mapping(sourceMapping)).actionGet();

String mappingOverrideStr = """
{
"_doc":{
"dynamic":"strict",
"properties":{
"foo1":{
"type":"integer"
},
"foo3": {
"type":"keyword"
}
}
}
}
""";
var mappingOverride = XContentHelper.convertToMap(JsonXContent.jsonXContent, mappingOverrideStr, false);

// create from source
var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
assertAcked(
client().execute(
CreateIndexFromSourceAction.INSTANCE,
new CreateIndexFromSourceAction.Request(sourceIndex, destIndex, Settings.EMPTY, mappingOverride)
)
);

var mappingsResponse = indicesAdmin().getMappings(new GetMappingsRequest().indices(destIndex)).actionGet();
Map<String, MappingMetadata> mappings = mappingsResponse.mappings();
var destMappings = mappings.get(destIndex).sourceAsMap();

String expectedMappingStr = """
{
"dynamic":"strict",
"properties":{
"foo1":{
"type":"integer"
},
"foo2": {
"type":"boolean"
},
"foo3": {
"type":"keyword"
}
}
}
""";
var expectedMapping = XContentHelper.convertToMap(JsonXContent.jsonXContent, expectedMappingStr, false);
assertEquals(expectedMapping, destMappings);
}
}
Loading

0 comments on commit f122bbe

Please sign in to comment.