diff --git a/.gitignore b/.gitignore index d1af97cbaea3b..8b2da4dc0832a 100644 --- a/.gitignore +++ b/.gitignore @@ -69,3 +69,6 @@ testfixtures_shared/ # Generated checkstyle_ide.xml x-pack/plugin/esql/src/main/generated-src/generated/ + +# JEnv +.java-version diff --git a/.java-version b/.java-version deleted file mode 100644 index aabe6ec3909c9..0000000000000 --- a/.java-version +++ /dev/null @@ -1 +0,0 @@ -21 diff --git a/docs/changelog/118890.yaml b/docs/changelog/118890.yaml new file mode 100644 index 0000000000000..d3fc17157f130 --- /dev/null +++ b/docs/changelog/118890.yaml @@ -0,0 +1,5 @@ +pr: 118890 +summary: Add action to create index from a source index +area: Data streams +type: enhancement +issues: [] diff --git a/docs/reference/esql/functions/examples/bucket.asciidoc b/docs/reference/esql/functions/examples/bucket.asciidoc index 4afea30660339..264efc191748f 100644 --- a/docs/reference/esql/functions/examples/bucket.asciidoc +++ b/docs/reference/esql/functions/examples/bucket.asciidoc @@ -116,4 +116,18 @@ include::{esql-specs}/bucket.csv-spec[tag=reuseGroupingFunctionWithExpression] |=== include::{esql-specs}/bucket.csv-spec[tag=reuseGroupingFunctionWithExpression-result] |=== +Sometimes you need to change the start value of each bucket by a given duration (similar to date histogram +aggregation's <> parameter). To do so, you will need to +take into account how the language handles expressions within the `STATS` command: if these contain functions or +arithmetic operators, a virtual `EVAL` is inserted before and/or after the `STATS` command. Consequently, a double +compensation is needed to adjust the bucketed date value before the aggregation and then again after. For instance, +inserting a negative offset of `1 hour` to buckets of `1 year` looks like this: +[source.merge.styled,esql] +---- +include::{esql-specs}/bucket.csv-spec[tag=bucketWithOffset] +---- +[%header.monospaced.styled,format=dsv,separator=|] +|=== +include::{esql-specs}/bucket.csv-spec[tag=bucketWithOffset-result] +|=== diff --git a/docs/reference/esql/functions/kibana/definition/bucket.json b/docs/reference/esql/functions/kibana/definition/bucket.json index 18802f5ff8fef..3d96de05c8407 100644 --- a/docs/reference/esql/functions/kibana/definition/bucket.json +++ b/docs/reference/esql/functions/kibana/definition/bucket.json @@ -1598,7 +1598,8 @@ "FROM employees\n| WHERE hire_date >= \"1985-01-01T00:00:00Z\" AND hire_date < \"1986-01-01T00:00:00Z\"\n| STATS c = COUNT(1) BY b = BUCKET(salary, 5000.)\n| SORT b", "FROM sample_data \n| WHERE @timestamp >= NOW() - 1 day and @timestamp < NOW()\n| STATS COUNT(*) BY bucket = BUCKET(@timestamp, 25, NOW() - 1 day, NOW())", "FROM employees\n| WHERE hire_date >= \"1985-01-01T00:00:00Z\" AND hire_date < \"1986-01-01T00:00:00Z\"\n| STATS AVG(salary) BY bucket = BUCKET(hire_date, 20, \"1985-01-01T00:00:00Z\", \"1986-01-01T00:00:00Z\")\n| SORT bucket", - "FROM employees\n| STATS s1 = b1 + 1, s2 = BUCKET(salary / 1000 + 999, 50.) + 2 BY b1 = BUCKET(salary / 100 + 99, 50.), b2 = BUCKET(salary / 1000 + 999, 50.)\n| SORT b1, b2\n| KEEP s1, b1, s2, b2" + "FROM employees\n| STATS s1 = b1 + 1, s2 = BUCKET(salary / 1000 + 999, 50.) + 2 BY b1 = BUCKET(salary / 100 + 99, 50.), b2 = BUCKET(salary / 1000 + 999, 50.)\n| SORT b1, b2\n| KEEP s1, b1, s2, b2", + "FROM employees \n| STATS dates = VALUES(birth_date) BY b = BUCKET(birth_date + 1 HOUR, 1 YEAR) - 1 HOUR\n| EVAL d_count = MV_COUNT(dates)\n| SORT d_count\n| LIMIT 3" ], "preview" : false, "snapshot_only" : false diff --git a/muted-tests.yml b/muted-tests.yml index 6b433f232defd..f534f24718f52 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -238,8 +238,6 @@ tests: - class: org.elasticsearch.datastreams.DataStreamsClientYamlTestSuiteIT method: test {p0=data_stream/120_data_streams_stats/Multiple data stream} issue: https://github.com/elastic/elasticsearch/issues/118217 -- class: org.elasticsearch.validation.DotPrefixClientYamlTestSuiteIT - issue: https://github.com/elastic/elasticsearch/issues/118224 - class: org.elasticsearch.packaging.test.ArchiveTests method: test60StartAndStop issue: https://github.com/elastic/elasticsearch/issues/118216 @@ -292,9 +290,6 @@ tests: issue: https://github.com/elastic/elasticsearch/issues/118914 - class: org.elasticsearch.smoketest.SmokeTestMultiNodeClientYamlTestSuiteIT issue: https://github.com/elastic/elasticsearch/issues/118955 -- class: org.elasticsearch.xpack.migrate.task.ReindexDataStreamStatusTests - method: testEqualsAndHashcode - issue: https://github.com/elastic/elasticsearch/issues/118965 - class: org.elasticsearch.repositories.blobstore.testkit.analyze.SecureHdfsRepositoryAnalysisRestIT issue: https://github.com/elastic/elasticsearch/issues/118970 diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java index 7adafa908ce4f..f0bdf089f69d1 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java @@ -63,7 +63,7 @@ import static org.elasticsearch.xpack.esql.core.util.SpatialCoordinateTypes.GEO; public final class CsvTestUtils { - private static final int MAX_WIDTH = 20; + private static final int MAX_WIDTH = 80; private static final CsvPreference CSV_SPEC_PREFERENCES = new CsvPreference.Builder('"', '|', "\r\n").build(); private static final String NULL_VALUE = "null"; private static final char ESCAPE_CHAR = '\\'; diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/bucket.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/bucket.csv-spec index b29c489910f65..8cfde2bb9bde7 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/bucket.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/bucket.csv-spec @@ -145,6 +145,24 @@ AVG(salary):double | bucket:date // end::bucket_in_agg-result[] ; +bucketWithOffset#[skip:-8.13.99, reason:BUCKET renamed in 8.14] +// tag::bucketWithOffset[] +FROM employees +| STATS dates = MV_SORT(VALUES(birth_date)) BY b = BUCKET(birth_date + 1 HOUR, 1 YEAR) - 1 HOUR +| EVAL d_count = MV_COUNT(dates) +| SORT d_count, b +| LIMIT 3 +// end::bucketWithOffset[] +; + +// tag::bucketWithOffset-result[] +dates:date |b:date |d_count:integer +1965-01-03T00:00:00.000Z |1964-12-31T23:00:00.000Z|1 +[1955-01-21T00:00:00.000Z, 1955-08-20T00:00:00.000Z, 1955-08-28T00:00:00.000Z, 1955-10-04T00:00:00.000Z]|1954-12-31T23:00:00.000Z|4 +[1957-04-04T00:00:00.000Z, 1957-05-23T00:00:00.000Z, 1957-05-25T00:00:00.000Z, 1957-12-03T00:00:00.000Z]|1956-12-31T23:00:00.000Z|4 +// end::bucketWithOffset-result[] +; + docsBucketMonth#[skip:-8.13.99, reason:BUCKET renamed in 8.14] //tag::docsBucketMonth[] FROM employees diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/grouping/Bucket.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/grouping/Bucket.java index 347d542f5212d..12932ba8d6e11 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/grouping/Bucket.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/grouping/Bucket.java @@ -163,6 +163,17 @@ another in which the bucket size is provided directly (two parameters). grouping part, or that it is invoked with the exact same expression:""", file = "bucket", tag = "reuseGroupingFunctionWithExpression" + ), + @Example( + description = """ + Sometimes you need to change the start value of each bucket by a given duration (similar to date histogram + aggregation's <> parameter). To do so, you will need to + take into account how the language handles expressions within the `STATS` command: if these contain functions or + arithmetic operators, a virtual `EVAL` is inserted before and/or after the `STATS` command. Consequently, a double + compensation is needed to adjust the bucketed date value before the aggregation and then again after. For instance, + inserting a negative offset of `1 hour` to buckets of `1 year` looks like this:""", + file = "bucket", + tag = "bucketWithOffset" ) } ) public Bucket( diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index b81b80a9fdbb4..e627f99322f08 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -322,13 +322,14 @@ private void doTest() throws Exception { } protected void assertResults(ExpectedResults expected, ActualResults actual, boolean ignoreOrder, Logger logger) { - CsvAssert.assertResults(expected, actual, ignoreOrder, logger); /* - * Comment the assertion above and enable the next two lines to see the results returned by ES without any assertions being done. + * Enable the next two lines to see the results returned by ES. * This is useful when creating a new test or trying to figure out what are the actual results. */ // CsvTestUtils.logMetaData(actual.columnNames(), actual.columnTypes(), LOGGER); // CsvTestUtils.logData(actual.values(), LOGGER); + + CsvAssert.assertResults(expected, actual, ignoreOrder, logger); } private static IndexResolution loadIndexResolution(String mappingName, String indexName, Map typeMapping) { diff --git a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceActionIT.java b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceActionIT.java new file mode 100644 index 0000000000000..b460c6abfeee4 --- /dev/null +++ b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceActionIT.java @@ -0,0 +1,250 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate.action; + +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.get.GetIndexRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.MappingMetadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.datastreams.DataStreamsPlugin; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.reindex.ReindexPlugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.migrate.MigratePlugin; + +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.REINDEX_DATA_STREAM_FEATURE_FLAG; + +public class CreateIndexFromSourceActionIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return List.of(MigratePlugin.class, ReindexPlugin.class, MockTransportService.TestPlugin.class, DataStreamsPlugin.class); + } + + public void testDestIndexCreated() throws Exception { + assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()); + + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + indicesAdmin().create(new CreateIndexRequest(sourceIndex)).get(); + + // create from source + var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + assertAcked( + client().execute(CreateIndexFromSourceAction.INSTANCE, new CreateIndexFromSourceAction.Request(sourceIndex, destIndex)) + ); + + try { + indicesAdmin().getIndex(new GetIndexRequest().indices(destIndex)).actionGet(); + } catch (IndexNotFoundException e) { + fail(); + } + } + + public void testSettingsCopiedFromSource() throws Exception { + assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()); + + // start with a static setting + var numShards = randomIntBetween(1, 10); + var staticSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards).build(); + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + indicesAdmin().create(new CreateIndexRequest(sourceIndex, staticSettings)).get(); + + // update with a dynamic setting + var numReplicas = randomIntBetween(0, 10); + var dynamicSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicas).build(); + indicesAdmin().updateSettings(new UpdateSettingsRequest(dynamicSettings, sourceIndex)).actionGet(); + + // create from source + var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + assertAcked( + client().execute(CreateIndexFromSourceAction.INSTANCE, new CreateIndexFromSourceAction.Request(sourceIndex, destIndex)) + ); + + // assert both static and dynamic settings set on dest index + var settingsResponse = indicesAdmin().getSettings(new GetSettingsRequest().indices(destIndex)).actionGet(); + assertEquals(numReplicas, Integer.parseInt(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_NUMBER_OF_REPLICAS))); + assertEquals(numShards, Integer.parseInt(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_NUMBER_OF_SHARDS))); + } + + public void testMappingsCopiedFromSource() { + assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()); + + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + String mapping = """ + { + "_doc":{ + "dynamic":"strict", + "properties":{ + "foo1":{ + "type":"text" + } + } + } + } + """; + indicesAdmin().create(new CreateIndexRequest(sourceIndex).mapping(mapping)).actionGet(); + + // create from source + var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + assertAcked( + client().execute(CreateIndexFromSourceAction.INSTANCE, new CreateIndexFromSourceAction.Request(sourceIndex, destIndex)) + ); + + var mappingsResponse = indicesAdmin().getMappings(new GetMappingsRequest().indices(sourceIndex, destIndex)).actionGet(); + Map mappings = mappingsResponse.mappings(); + var destMappings = mappings.get(destIndex).sourceAsMap(); + var sourceMappings = mappings.get(sourceIndex).sourceAsMap(); + + assertEquals(sourceMappings, destMappings); + // sanity check specific value from dest mapping + assertEquals("text", XContentMapValues.extractValue("properties.foo1.type", destMappings)); + } + + public void testSettingsOverridden() throws Exception { + assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()); + + var numShardsSource = randomIntBetween(1, 10); + var numReplicasSource = randomIntBetween(0, 10); + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + var sourceSettings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShardsSource) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicasSource) + .build(); + indicesAdmin().create(new CreateIndexRequest(sourceIndex, sourceSettings)).get(); + + boolean overrideNumShards = randomBoolean(); + Settings settingsOverride = overrideNumShards + ? Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShardsSource + 1).build() + : Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicasSource + 1).build(); + + // create from source + var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + assertAcked( + client().execute( + CreateIndexFromSourceAction.INSTANCE, + new CreateIndexFromSourceAction.Request(sourceIndex, destIndex, settingsOverride, Map.of()) + ) + ); + + // assert settings overridden + int expectedShards = overrideNumShards ? numShardsSource + 1 : numShardsSource; + int expectedReplicas = overrideNumShards ? numReplicasSource : numReplicasSource + 1; + var settingsResponse = indicesAdmin().getSettings(new GetSettingsRequest().indices(destIndex)).actionGet(); + assertEquals(expectedShards, Integer.parseInt(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_NUMBER_OF_SHARDS))); + assertEquals(expectedReplicas, Integer.parseInt(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_NUMBER_OF_REPLICAS))); + } + + public void testSettingsNullOverride() throws Exception { + assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()); + + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + var sourceSettings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true).build(); + indicesAdmin().create(new CreateIndexRequest(sourceIndex, sourceSettings)).get(); + + Settings settingsOverride = Settings.builder().putNull(IndexMetadata.SETTING_BLOCKS_WRITE).build(); + + // create from source + var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + assertAcked( + client().execute( + CreateIndexFromSourceAction.INSTANCE, + new CreateIndexFromSourceAction.Request(sourceIndex, destIndex, settingsOverride, Map.of()) + ) + ); + + // assert settings overridden + var settingsResponse = indicesAdmin().getSettings(new GetSettingsRequest().indices(destIndex)).actionGet(); + assertNull(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_BLOCKS_WRITE)); + } + + public void testMappingsOverridden() { + assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()); + + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + String sourceMapping = """ + { + "_doc":{ + "dynamic":"strict", + "properties":{ + "foo1":{ + "type":"text" + }, + "foo2":{ + "type":"boolean" + } + } + } + } + """; + indicesAdmin().create(new CreateIndexRequest(sourceIndex).mapping(sourceMapping)).actionGet(); + + String mappingOverrideStr = """ + { + "_doc":{ + "dynamic":"strict", + "properties":{ + "foo1":{ + "type":"integer" + }, + "foo3": { + "type":"keyword" + } + } + } + } + """; + var mappingOverride = XContentHelper.convertToMap(JsonXContent.jsonXContent, mappingOverrideStr, false); + + // create from source + var destIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + assertAcked( + client().execute( + CreateIndexFromSourceAction.INSTANCE, + new CreateIndexFromSourceAction.Request(sourceIndex, destIndex, Settings.EMPTY, mappingOverride) + ) + ); + + var mappingsResponse = indicesAdmin().getMappings(new GetMappingsRequest().indices(destIndex)).actionGet(); + Map mappings = mappingsResponse.mappings(); + var destMappings = mappings.get(destIndex).sourceAsMap(); + + String expectedMappingStr = """ + { + "dynamic":"strict", + "properties":{ + "foo1":{ + "type":"integer" + }, + "foo2": { + "type":"boolean" + }, + "foo3": { + "type":"keyword" + } + } + } + """; + var expectedMapping = XContentHelper.convertToMap(JsonXContent.jsonXContent, expectedMappingStr, false); + assertEquals(expectedMapping, destMappings); + } +} diff --git a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexIT.java b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java similarity index 98% rename from x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexIT.java rename to x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java index e492f035da866..0ca58ecf0f0d5 100644 --- a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexIT.java +++ b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java @@ -53,7 +53,7 @@ import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.REINDEX_DATA_STREAM_FEATURE_FLAG; import static org.hamcrest.Matchers.equalTo; -public class ReindexDatastreamIndexIT extends ESIntegTestCase { +public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase { private static final String MAPPING = """ { @@ -126,12 +126,14 @@ public void testDestIndexContainsDocs() throws Exception { assertHitCount(prepareSearch(response.getDestIndex()).setSize(0), numDocs); } - public void testSetSourceToReadOnly() throws Exception { + public void testSetSourceToBlockWrites() throws Exception { assumeTrue("requires the migration reindex feature flag", REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()); + var settings = randomBoolean() ? Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true).build() : Settings.EMPTY; + // empty source index var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); - indicesAdmin().create(new CreateIndexRequest(sourceIndex)).get(); + indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)).get(); // call reindex client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)).actionGet(); diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java index f42d05727b9fd..d9dffdefafa2c 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java @@ -34,6 +34,8 @@ import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamAction; import org.elasticsearch.xpack.migrate.action.CancelReindexDataStreamTransportAction; +import org.elasticsearch.xpack.migrate.action.CreateIndexFromSourceAction; +import org.elasticsearch.xpack.migrate.action.CreateIndexFromSourceTransportAction; import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusAction; import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusTransportAction; import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction; @@ -87,6 +89,7 @@ public List getRestHandlers( actions.add(new ActionHandler<>(GetMigrationReindexStatusAction.INSTANCE, GetMigrationReindexStatusTransportAction.class)); actions.add(new ActionHandler<>(CancelReindexDataStreamAction.INSTANCE, CancelReindexDataStreamTransportAction.class)); actions.add(new ActionHandler<>(ReindexDataStreamIndexAction.INSTANCE, ReindexDataStreamIndexTransportAction.class)); + actions.add(new ActionHandler<>(CreateIndexFromSourceAction.INSTANCE, CreateIndexFromSourceTransportAction.class)); } return actions; } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceAction.java new file mode 100644 index 0000000000000..d67eaee3d251f --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceAction.java @@ -0,0 +1,117 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.migrate.action; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +public class CreateIndexFromSourceAction extends ActionType { + + public static final String NAME = "indices:admin/index/create_from_source"; + + public static final ActionType INSTANCE = new CreateIndexFromSourceAction(); + + private CreateIndexFromSourceAction() { + super(NAME); + } + + public static class Request extends ActionRequest implements IndicesRequest { + + private final String sourceIndex; + private final String destIndex; + private final Settings settingsOverride; + private final Map mappingsOverride; + + public Request(String sourceIndex, String destIndex) { + this(sourceIndex, destIndex, Settings.EMPTY, Map.of()); + } + + public Request(String sourceIndex, String destIndex, Settings settingsOverride, Map mappingsOverride) { + Objects.requireNonNull(mappingsOverride); + this.sourceIndex = sourceIndex; + this.destIndex = destIndex; + this.settingsOverride = settingsOverride; + this.mappingsOverride = mappingsOverride; + } + + @SuppressWarnings("unchecked") + public Request(StreamInput in) throws IOException { + super(in); + this.sourceIndex = in.readString(); + this.destIndex = in.readString(); + this.settingsOverride = Settings.readSettingsFromStream(in); + this.mappingsOverride = (Map) in.readGenericValue(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(sourceIndex); + out.writeString(destIndex); + settingsOverride.writeTo(out); + out.writeGenericValue(mappingsOverride); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public String getSourceIndex() { + return sourceIndex; + } + + public String getDestIndex() { + return destIndex; + } + + public Settings getSettingsOverride() { + return settingsOverride; + } + + public Map getMappingsOverride() { + return mappingsOverride; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Objects.equals(sourceIndex, request.sourceIndex) + && Objects.equals(destIndex, request.destIndex) + && Objects.equals(settingsOverride, request.settingsOverride) + && Objects.equals(mappingsOverride, request.mappingsOverride); + } + + @Override + public int hashCode() { + return Objects.hash(sourceIndex, destIndex, settingsOverride, mappingsOverride); + } + + @Override + public String[] indices() { + return new String[] { sourceIndex }; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.strictSingleIndexNoExpandForbidClosed(); + } + } +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceTransportAction.java new file mode 100644 index 0000000000000..968b2220628a9 --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceTransportAction.java @@ -0,0 +1,126 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.migrate.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.MappingMetadata; +import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +public class CreateIndexFromSourceTransportAction extends HandledTransportAction< + CreateIndexFromSourceAction.Request, + AcknowledgedResponse> { + private static final Logger logger = LogManager.getLogger(CreateIndexFromSourceTransportAction.class); + + private final ClusterService clusterService; + private final Client client; + private final IndexScopedSettings indexScopedSettings; + + @Inject + public CreateIndexFromSourceTransportAction( + TransportService transportService, + ClusterService clusterService, + ActionFilters actionFilters, + Client client, + IndexScopedSettings indexScopedSettings + ) { + super( + CreateIndexFromSourceAction.NAME, + false, + transportService, + actionFilters, + CreateIndexFromSourceAction.Request::new, + transportService.getThreadPool().executor(ThreadPool.Names.GENERIC) + ); + this.clusterService = clusterService; + this.client = client; + this.indexScopedSettings = indexScopedSettings; + } + + @Override + protected void doExecute(Task task, CreateIndexFromSourceAction.Request request, ActionListener listener) { + + IndexMetadata sourceIndex = clusterService.state().getMetadata().index(request.getSourceIndex()); + + if (sourceIndex == null) { + listener.onFailure(new IndexNotFoundException(request.getSourceIndex())); + return; + } + + logger.debug("Creating destination index [{}] for source index [{}]", request.getDestIndex(), request.getSourceIndex()); + + Settings settings = Settings.builder() + // add source settings + .put(filterSettings(sourceIndex)) + // add override settings from request + .put(request.getSettingsOverride()) + .build(); + + Map mergeMappings; + try { + mergeMappings = mergeMappings(sourceIndex.mapping(), request.getMappingsOverride()); + } catch (IOException e) { + listener.onFailure(e); + return; + } + + var createIndexRequest = new CreateIndexRequest(request.getDestIndex()).settings(settings); + if (mergeMappings.isEmpty() == false) { + createIndexRequest.mapping(mergeMappings); + } + + client.admin().indices().create(createIndexRequest, listener.map(response -> response)); + } + + private static Map toMap(@Nullable MappingMetadata sourceMapping) { + return Optional.ofNullable(sourceMapping) + .map(MappingMetadata::source) + .map(CompressedXContent::uncompressed) + .map(s -> XContentHelper.convertToMap(s, true, XContentType.JSON).v2()) + .orElse(Map.of()); + } + + private static Map mergeMappings(@Nullable MappingMetadata sourceMapping, Map mappingAddition) + throws IOException { + Map combinedMappingMap = new HashMap<>(toMap(sourceMapping)); + XContentHelper.update(combinedMappingMap, mappingAddition, true); + return combinedMappingMap; + } + + // Filter source index settings to subset of settings that can be included during reindex. + // Similar to the settings filtering done when reindexing for upgrade in Kibana + // https://github.com/elastic/kibana/blob/8a8363f02cc990732eb9cbb60cd388643a336bed/x-pack + // /plugins/upgrade_assistant/server/lib/reindexing/index_settings.ts#L155 + private Settings filterSettings(IndexMetadata sourceIndex) { + return MetadataCreateIndexService.copySettingsFromSource(false, sourceIndex.getSettings(), indexScopedSettings, Settings.builder()) + .build(); + } +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexAction.java index 00c81fdc9fbc6..2e3fd1b76ed32 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexAction.java @@ -41,10 +41,6 @@ public Request(StreamInput in) throws IOException { this.sourceIndex = in.readString(); } - public static Request readFrom(StreamInput in) throws IOException { - return new Request(in); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java index 8863c45691c92..165fd61ae6599 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java @@ -10,10 +10,10 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; -import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse; +import org.elasticsearch.action.admin.indices.readonly.TransportAddIndexBlockAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.IndicesOptions; @@ -22,9 +22,7 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.reindex.BulkByScrollResponse; @@ -37,28 +35,25 @@ import java.util.Locale; import java.util.Map; -import java.util.Set; + +import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ_ONLY; +import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE; public class ReindexDataStreamIndexTransportAction extends HandledTransportAction< ReindexDataStreamIndexAction.Request, ReindexDataStreamIndexAction.Response> { private static final Logger logger = LogManager.getLogger(ReindexDataStreamIndexTransportAction.class); - - private static final Set SETTINGS_TO_ADD_BACK = Set.of(IndexMetadata.SETTING_BLOCKS_WRITE, IndexMetadata.SETTING_READ_ONLY); - private static final IndicesOptions IGNORE_MISSING_OPTIONS = IndicesOptions.fromOptions(true, true, false, false); private final ClusterService clusterService; private final Client client; - private final IndexScopedSettings indexScopedSettings; @Inject public ReindexDataStreamIndexTransportAction( TransportService transportService, ClusterService clusterService, ActionFilters actionFilters, - Client client, - IndexScopedSettings indexScopedSettings + Client client ) { super( ReindexDataStreamIndexAction.NAME, @@ -70,7 +65,6 @@ public ReindexDataStreamIndexTransportAction( ); this.clusterService = clusterService; this.client = client; - this.indexScopedSettings = indexScopedSettings; } @Override @@ -96,20 +90,19 @@ protected void doExecute( SubscribableListener.newForked(l -> setBlockWrites(sourceIndexName, l)) .andThen(l -> deleteDestIfExists(destIndexName, l)) - .andThen(l -> createIndex(sourceIndex, destIndexName, l)) + .andThen(l -> createIndex(sourceIndex, destIndexName, l)) .andThen(l -> reindex(sourceIndexName, destIndexName, l)) - .andThen(l -> updateSettings(settingsBefore, destIndexName, l)) + .andThen(l -> addBlockIfFromSource(READ_ONLY, settingsBefore, destIndexName, l)) + .andThen(l -> addBlockIfFromSource(WRITE, settingsBefore, destIndexName, l)) .andThenApply(ignored -> new ReindexDataStreamIndexAction.Response(destIndexName)) .addListener(listener); } private void setBlockWrites(String sourceIndexName, ActionListener listener) { logger.debug("Setting write block on source index [{}]", sourceIndexName); - final Settings readOnlySettings = Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), true).build(); - var updateSettingsRequest = new UpdateSettingsRequest(readOnlySettings, sourceIndexName); - client.admin().indices().updateSettings(updateSettingsRequest, new ActionListener<>() { + addBlockToIndex(WRITE, sourceIndexName, new ActionListener<>() { @Override - public void onResponse(AcknowledgedResponse response) { + public void onResponse(AddIndexBlockResponse response) { if (response.isAcknowledged()) { listener.onResponse(null); } else { @@ -121,7 +114,7 @@ public void onResponse(AcknowledgedResponse response) { @Override public void onFailure(Exception e) { if (e instanceof ClusterBlockException || e.getCause() instanceof ClusterBlockException) { - // It's fine if read-only is already set + // It's fine if block-writes is already set listener.onResponse(null); } else { listener.onFailure(e); @@ -138,18 +131,23 @@ private void deleteDestIfExists(String destIndexName, ActionListener listener) { + private void createIndex(IndexMetadata sourceIndex, String destIndexName, ActionListener listener) { logger.debug("Creating destination index [{}] for source index [{}]", destIndexName, sourceIndex.getIndex().getName()); - // Create destination with subset of source index settings that can be added before reindex - var settings = getPreSettings(sourceIndex); - - var sourceMapping = sourceIndex.mapping(); - Map mapping = sourceMapping != null ? sourceMapping.rawSourceAsMap() : Map.of(); - var createIndexRequest = new CreateIndexRequest(destIndexName).settings(settings).mapping(mapping); - - var errorMessage = String.format(Locale.ROOT, "Could not create index [%s]", destIndexName); - client.admin().indices().create(createIndexRequest, failIfNotAcknowledged(listener, errorMessage)); + // override read-only settings if they exist + var removeReadOnlyOverride = Settings.builder() + .putNull(IndexMetadata.SETTING_READ_ONLY) + .putNull(IndexMetadata.SETTING_BLOCKS_WRITE) + .build(); + + var request = new CreateIndexFromSourceAction.Request( + sourceIndex.getIndex().getName(), + destIndexName, + removeReadOnlyOverride, + Map.of() + ); + var errorMessage = String.format(Locale.ROOT, "Could not create index [%s]", request.getDestIndex()); + client.execute(CreateIndexFromSourceAction.INSTANCE, request, failIfNotAcknowledged(listener, errorMessage)); } private void reindex(String sourceIndexName, String destIndexName, ActionListener listener) { @@ -162,35 +160,18 @@ private void reindex(String sourceIndexName, String destIndexName, ActionListene client.execute(ReindexAction.INSTANCE, reindexRequest, listener); } - private void updateSettings(Settings settingsBefore, String destIndexName, ActionListener listener) { - logger.debug("Adding settings from source index that could not be added before reindex"); - - Settings postSettings = getPostSettings(settingsBefore); - if (postSettings.isEmpty()) { + private void addBlockIfFromSource( + IndexMetadata.APIBlock block, + Settings settingsBefore, + String destIndexName, + ActionListener listener + ) { + if (settingsBefore.getAsBoolean(block.settingName(), false)) { + var errorMessage = String.format(Locale.ROOT, "Add [%s] block to index [%s] was not acknowledged", block.name(), destIndexName); + addBlockToIndex(block, destIndexName, failIfNotAcknowledged(listener, errorMessage)); + } else { listener.onResponse(null); - return; } - - var updateSettingsRequest = new UpdateSettingsRequest(postSettings, destIndexName); - var errorMessage = String.format(Locale.ROOT, "Could not update settings on index [%s]", destIndexName); - client.admin().indices().updateSettings(updateSettingsRequest, failIfNotAcknowledged(listener, errorMessage)); - } - - // Filter source index settings to subset of settings that can be included during reindex. - // Similar to the settings filtering done when reindexing for upgrade in Kibana - // https://github.com/elastic/kibana/blob/8a8363f02cc990732eb9cbb60cd388643a336bed/x-pack - // /plugins/upgrade_assistant/server/lib/reindexing/index_settings.ts#L155 - private Settings getPreSettings(IndexMetadata sourceIndex) { - // filter settings that will be added back later - var filtered = sourceIndex.getSettings().filter(settingName -> SETTINGS_TO_ADD_BACK.contains(settingName) == false); - - // filter private and non-copyable settings - var builder = MetadataCreateIndexService.copySettingsFromSource(false, filtered, indexScopedSettings, Settings.builder()); - return builder.build(); - } - - private Settings getPostSettings(Settings settingsBefore) { - return settingsBefore.filter(SETTINGS_TO_ADD_BACK::contains); } public static String generateDestIndexName(String sourceIndex) { @@ -201,11 +182,16 @@ private static ActionListener failIfNotAckno ActionListener listener, String errorMessage ) { - return listener.delegateFailureAndWrap((delegate, response) -> { + return listener.delegateFailure((delegate, response) -> { if (response.isAcknowledged()) { delegate.onResponse(null); + } else { + delegate.onFailure(new ElasticsearchException(errorMessage)); } - throw new ElasticsearchException(errorMessage); }); } + + private void addBlockToIndex(IndexMetadata.APIBlock block, String index, ActionListener listener) { + client.admin().indices().execute(TransportAddIndexBlockAction.TYPE, new AddIndexBlockRequest(block, index), listener); + } } diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index c07e4b2c541a2..1cb73de4646cc 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -641,6 +641,7 @@ public class Constants { new FeatureFlag("reindex_data_stream").isEnabled() ? "indices:admin/data_stream/index/reindex" : null, new FeatureFlag("reindex_data_stream").isEnabled() ? "indices:admin/data_stream/reindex" : null, new FeatureFlag("reindex_data_stream").isEnabled() ? "indices:admin/data_stream/reindex_cancel" : null, + new FeatureFlag("reindex_data_stream").isEnabled() ? "indices:admin/index/create_from_source" : null, "internal:admin/repository/verify", "internal:admin/repository/verify/coordinate" ).filter(Objects::nonNull).collect(Collectors.toUnmodifiableSet()); diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml index cbaac4e47fdd4..fdb6746bbeed8 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml @@ -1,7 +1,7 @@ --- setup: - requires: - test_runner_features: [capabilities] + test_runner_features: [capabilities, contains] capabilities: - method: POST path: /_query @@ -75,4 +75,4 @@ non-lookup index: catch: "bad_request" - match: { error.type: "verification_exception" } - - match: { error.reason: "Found 1 problem\nline 1:43: invalid [test] resolution in lookup mode to an index in [standard] mode" } + - contains: { error.reason: "Found 1 problem\nline 1:43: invalid [test] resolution in lookup mode to an index in [standard] mode" }