From c1ce72002d9e8d4f1bc867b12efadab049ddf4f6 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Wed, 6 Dec 2023 11:11:36 -0600 Subject: [PATCH] fixing some ml tests --- .../DataFrameAnalysisCustomFeatureIT.java | 93 ++-- .../ml/integration/DeleteExpiredDataIT.java | 422 +++++++++--------- .../xpack/ml/integration/ModelPlotsIT.java | 21 +- 3 files changed, 273 insertions(+), 263 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DataFrameAnalysisCustomFeatureIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DataFrameAnalysisCustomFeatureIT.java index da29fcdcd27c4..579049788c602 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DataFrameAnalysisCustomFeatureIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DataFrameAnalysisCustomFeatureIT.java @@ -227,56 +227,57 @@ private static void createIndex(String index, boolean isDatastream) { } private static void indexData(String sourceIndex, int numTrainingRows, int numNonTrainingRows, String dependentVariable) { - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int i = 0; i < numTrainingRows; i++) { - List source = List.of( - "@timestamp", - "2020-12-12", - BOOLEAN_FIELD, - BOOLEAN_FIELD_VALUES.get(i % BOOLEAN_FIELD_VALUES.size()), - NUMERICAL_FIELD, - NUMERICAL_FIELD_VALUES.get(i % NUMERICAL_FIELD_VALUES.size()), - DISCRETE_NUMERICAL_FIELD, - DISCRETE_NUMERICAL_FIELD_VALUES.get(i % DISCRETE_NUMERICAL_FIELD_VALUES.size()), - TEXT_FIELD, - KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()), - KEYWORD_FIELD, - KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()), - NESTED_FIELD, - KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()) - ); - IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()).opType(DocWriteRequest.OpType.CREATE); - bulkRequestBuilder.add(indexRequest); - } - for (int i = numTrainingRows; i < numTrainingRows + numNonTrainingRows; i++) { - List source = new ArrayList<>(); - if (BOOLEAN_FIELD.equals(dependentVariable) == false) { - source.addAll(List.of(BOOLEAN_FIELD, BOOLEAN_FIELD_VALUES.get(i % BOOLEAN_FIELD_VALUES.size()))); - } - if (NUMERICAL_FIELD.equals(dependentVariable) == false) { - source.addAll(List.of(NUMERICAL_FIELD, NUMERICAL_FIELD_VALUES.get(i % NUMERICAL_FIELD_VALUES.size()))); - } - if (DISCRETE_NUMERICAL_FIELD.equals(dependentVariable) == false) { - source.addAll( - List.of(DISCRETE_NUMERICAL_FIELD, DISCRETE_NUMERICAL_FIELD_VALUES.get(i % DISCRETE_NUMERICAL_FIELD_VALUES.size())) + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)) { + for (int i = 0; i < numTrainingRows; i++) { + List source = List.of( + "@timestamp", + "2020-12-12", + BOOLEAN_FIELD, + BOOLEAN_FIELD_VALUES.get(i % BOOLEAN_FIELD_VALUES.size()), + NUMERICAL_FIELD, + NUMERICAL_FIELD_VALUES.get(i % NUMERICAL_FIELD_VALUES.size()), + DISCRETE_NUMERICAL_FIELD, + DISCRETE_NUMERICAL_FIELD_VALUES.get(i % DISCRETE_NUMERICAL_FIELD_VALUES.size()), + TEXT_FIELD, + KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()), + KEYWORD_FIELD, + KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()), + NESTED_FIELD, + KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()) ); + IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()).opType(DocWriteRequest.OpType.CREATE); + bulkRequestBuilder.add(indexRequest); } - if (TEXT_FIELD.equals(dependentVariable) == false) { - source.addAll(List.of(TEXT_FIELD, KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()))); - } - if (KEYWORD_FIELD.equals(dependentVariable) == false) { - source.addAll(List.of(KEYWORD_FIELD, KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()))); + for (int i = numTrainingRows; i < numTrainingRows + numNonTrainingRows; i++) { + List source = new ArrayList<>(); + if (BOOLEAN_FIELD.equals(dependentVariable) == false) { + source.addAll(List.of(BOOLEAN_FIELD, BOOLEAN_FIELD_VALUES.get(i % BOOLEAN_FIELD_VALUES.size()))); + } + if (NUMERICAL_FIELD.equals(dependentVariable) == false) { + source.addAll(List.of(NUMERICAL_FIELD, NUMERICAL_FIELD_VALUES.get(i % NUMERICAL_FIELD_VALUES.size()))); + } + if (DISCRETE_NUMERICAL_FIELD.equals(dependentVariable) == false) { + source.addAll( + List.of(DISCRETE_NUMERICAL_FIELD, DISCRETE_NUMERICAL_FIELD_VALUES.get(i % DISCRETE_NUMERICAL_FIELD_VALUES.size())) + ); + } + if (TEXT_FIELD.equals(dependentVariable) == false) { + source.addAll(List.of(TEXT_FIELD, KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()))); + } + if (KEYWORD_FIELD.equals(dependentVariable) == false) { + source.addAll(List.of(KEYWORD_FIELD, KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()))); + } + if (NESTED_FIELD.equals(dependentVariable) == false) { + source.addAll(List.of(NESTED_FIELD, KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()))); + } + source.addAll(List.of("@timestamp", "2020-12-12")); + IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()).opType(DocWriteRequest.OpType.CREATE); + bulkRequestBuilder.add(indexRequest); } - if (NESTED_FIELD.equals(dependentVariable) == false) { - source.addAll(List.of(NESTED_FIELD, KEYWORD_FIELD_VALUES.get(i % KEYWORD_FIELD_VALUES.size()))); + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + fail("Failed to index data: " + bulkResponse.buildFailureMessage()); } - source.addAll(List.of("@timestamp", "2020-12-12")); - IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source.toArray()).opType(DocWriteRequest.OpType.CREATE); - bulkRequestBuilder.add(indexRequest); - } - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - fail("Failed to index data: " + bulkResponse.buildFailureMessage()); } } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java index cf73b5a4a7544..2a695f5b60b06 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java @@ -78,19 +78,20 @@ public void setUpData() { int normalRate = 10; int anomalousRate = 100; int anomalousBucket = 30; - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - for (int bucket = 0; bucket < totalBuckets; bucket++) { - long timestamp = latestBucketTime - TimeValue.timeValueHours(totalBuckets - bucket).getMillis(); - int bucketRate = bucket == anomalousBucket ? anomalousRate : normalRate; - for (int point = 0; point < bucketRate; point++) { - IndexRequest indexRequest = new IndexRequest(DATA_INDEX); - indexRequest.source("time", timestamp); - bulkRequestBuilder.add(indexRequest); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + for (int bucket = 0; bucket < totalBuckets; bucket++) { + long timestamp = latestBucketTime - TimeValue.timeValueHours(totalBuckets - bucket).getMillis(); + int bucketRate = bucket == anomalousBucket ? anomalousRate : normalRate; + for (int point = 0; point < bucketRate; point++) { + IndexRequest indexRequest = new IndexRequest(DATA_INDEX); + indexRequest.source("time", timestamp); + bulkRequestBuilder.add(indexRequest); + } } - } - BulkResponse bulkResponse = bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - assertThat(bulkResponse.hasFailures(), is(false)); + BulkResponse bulkResponse = bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + assertThat(bulkResponse.hasFailures(), is(false)); + } } @After @@ -159,212 +160,219 @@ public void testDeleteExpiredDataWithStandardThrottle() throws Exception { private void testExpiredDeletion(Float customThrottle, int numUnusedState) throws Exception { // Index some unused state documents (more than 10K to test scrolling works) String mlStateIndexName = AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001"; - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int i = 0; i < numUnusedState; i++) { - String docId = "non_existing_job_" + randomFrom("model_state_1234567#" + i, "quantiles", "categorizer_state#" + i); - IndexRequest indexRequest = new IndexRequest(mlStateIndexName).id(docId).source(Collections.emptyMap()); - bulkRequestBuilder.add(indexRequest); - } - ActionFuture indexUnusedStateDocsResponse = bulkRequestBuilder.execute(); - List jobs = new ArrayList<>(); - - // These jobs don't thin out model state; ModelSnapshotRetentionIT tests that - jobs.add( - newJobBuilder("no-retention").setResultsRetentionDays(null) - .setModelSnapshotRetentionDays(1000L) - .setDailyModelSnapshotRetentionAfterDays(1000L) - ); - jobs.add( - newJobBuilder("results-retention").setResultsRetentionDays(1L) - .setModelSnapshotRetentionDays(1000L) - .setDailyModelSnapshotRetentionAfterDays(1000L) - ); - jobs.add( - newJobBuilder("snapshots-retention").setResultsRetentionDays(null) - .setModelSnapshotRetentionDays(2L) - .setDailyModelSnapshotRetentionAfterDays(2L) - ); - jobs.add( - newJobBuilder("snapshots-retention-with-retain").setResultsRetentionDays(null) - .setModelSnapshotRetentionDays(2L) - .setDailyModelSnapshotRetentionAfterDays(2L) - ); - jobs.add( - newJobBuilder("results-and-snapshots-retention").setResultsRetentionDays(1L) - .setModelSnapshotRetentionDays(2L) - .setDailyModelSnapshotRetentionAfterDays(2L) - ); - - List shortExpiryForecastIds = new ArrayList<>(); - - long now = System.currentTimeMillis(); - long oneDayAgo = now - TimeValue.timeValueHours(48).getMillis() - 1; - - // Start all jobs - for (Job.Builder job : jobs) { - putJob(job); - - String datafeedId = job.getId() + "-feed"; - DatafeedConfig.Builder datafeedConfig = new DatafeedConfig.Builder(datafeedId, job.getId()); - datafeedConfig.setIndices(Collections.singletonList(DATA_INDEX)); - DatafeedConfig datafeed = datafeedConfig.build(); - - putDatafeed(datafeed); - - // Run up to a day ago - openJob(job.getId()); - startDatafeed(datafeedId, 0, now - TimeValue.timeValueHours(24).getMillis()); - } - - // Now let's wait for all jobs to be closed - for (Job.Builder job : jobs) { - waitUntilJobIsClosed(job.getId()); - } + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)) { + for (int i = 0; i < numUnusedState; i++) { + String docId = "non_existing_job_" + randomFrom("model_state_1234567#" + i, "quantiles", "categorizer_state#" + i); + IndexRequest indexRequest = new IndexRequest(mlStateIndexName).id(docId).source(Collections.emptyMap()); + bulkRequestBuilder.add(indexRequest); + } + ActionFuture indexUnusedStateDocsResponse = bulkRequestBuilder.execute(); + List jobs = new ArrayList<>(); + + // These jobs don't thin out model state; ModelSnapshotRetentionIT tests that + jobs.add( + newJobBuilder("no-retention").setResultsRetentionDays(null) + .setModelSnapshotRetentionDays(1000L) + .setDailyModelSnapshotRetentionAfterDays(1000L) + ); + jobs.add( + newJobBuilder("results-retention").setResultsRetentionDays(1L) + .setModelSnapshotRetentionDays(1000L) + .setDailyModelSnapshotRetentionAfterDays(1000L) + ); + jobs.add( + newJobBuilder("snapshots-retention").setResultsRetentionDays(null) + .setModelSnapshotRetentionDays(2L) + .setDailyModelSnapshotRetentionAfterDays(2L) + ); + jobs.add( + newJobBuilder("snapshots-retention-with-retain").setResultsRetentionDays(null) + .setModelSnapshotRetentionDays(2L) + .setDailyModelSnapshotRetentionAfterDays(2L) + ); + jobs.add( + newJobBuilder("results-and-snapshots-retention").setResultsRetentionDays(1L) + .setModelSnapshotRetentionDays(2L) + .setDailyModelSnapshotRetentionAfterDays(2L) + ); + + List shortExpiryForecastIds = new ArrayList<>(); + + long now = System.currentTimeMillis(); + long oneDayAgo = now - TimeValue.timeValueHours(48).getMillis() - 1; + + // Start all jobs + for (Job.Builder job : jobs) { + putJob(job); + + String datafeedId = job.getId() + "-feed"; + DatafeedConfig.Builder datafeedConfig = new DatafeedConfig.Builder(datafeedId, job.getId()); + datafeedConfig.setIndices(Collections.singletonList(DATA_INDEX)); + DatafeedConfig datafeed = datafeedConfig.build(); + + putDatafeed(datafeed); + + // Run up to a day ago + openJob(job.getId()); + startDatafeed(datafeedId, 0, now - TimeValue.timeValueHours(24).getMillis()); + } - for (Job.Builder job : jobs) { - assertThat(getBuckets(job.getId()).size(), is(greaterThanOrEqualTo(47))); - assertThat(getRecords(job.getId()).size(), equalTo(2)); - List modelSnapshots = getModelSnapshots(job.getId()); - assertThat(modelSnapshots.size(), equalTo(1)); - String snapshotDocId = ModelSnapshot.documentId(modelSnapshots.get(0)); - - // Update snapshot timestamp to force it out of snapshot retention window - String snapshotUpdate = "{ \"timestamp\": " + oneDayAgo + "}"; - UpdateRequest updateSnapshotRequest = new UpdateRequest(".ml-anomalies-" + job.getId(), snapshotDocId); - updateSnapshotRequest.doc(snapshotUpdate.getBytes(StandardCharsets.UTF_8), XContentType.JSON); - client().execute(UpdateAction.INSTANCE, updateSnapshotRequest).get(); - - // Now let's create some forecasts - openJob(job.getId()); - - // We must set a very small value for expires_in to keep this testable as the deletion cutoff point is the moment - // the DeleteExpiredDataAction is called. - String forecastShortExpiryId = forecast(job.getId(), TimeValue.timeValueHours(1), TimeValue.timeValueSeconds(1)); - shortExpiryForecastIds.add(forecastShortExpiryId); - String forecastDefaultExpiryId = forecast(job.getId(), TimeValue.timeValueHours(1), null); - String forecastNoExpiryId = forecast(job.getId(), TimeValue.timeValueHours(1), TimeValue.ZERO); - waitForecastToFinish(job.getId(), forecastShortExpiryId); - waitForecastToFinish(job.getId(), forecastDefaultExpiryId); - waitForecastToFinish(job.getId(), forecastNoExpiryId); - } + // Now let's wait for all jobs to be closed + for (Job.Builder job : jobs) { + waitUntilJobIsClosed(job.getId()); + } - // Refresh to ensure the snapshot timestamp updates are visible - refresh("*"); + for (Job.Builder job : jobs) { + assertThat(getBuckets(job.getId()).size(), is(greaterThanOrEqualTo(47))); + assertThat(getRecords(job.getId()).size(), equalTo(2)); + List modelSnapshots = getModelSnapshots(job.getId()); + assertThat(modelSnapshots.size(), equalTo(1)); + String snapshotDocId = ModelSnapshot.documentId(modelSnapshots.get(0)); + + // Update snapshot timestamp to force it out of snapshot retention window + String snapshotUpdate = "{ \"timestamp\": " + oneDayAgo + "}"; + UpdateRequest updateSnapshotRequest = new UpdateRequest(".ml-anomalies-" + job.getId(), snapshotDocId); + updateSnapshotRequest.doc(snapshotUpdate.getBytes(StandardCharsets.UTF_8), XContentType.JSON); + client().execute(UpdateAction.INSTANCE, updateSnapshotRequest).get(); + + // Now let's create some forecasts + openJob(job.getId()); + + // We must set a very small value for expires_in to keep this testable as the deletion cutoff point is the moment + // the DeleteExpiredDataAction is called. + String forecastShortExpiryId = forecast(job.getId(), TimeValue.timeValueHours(1), TimeValue.timeValueSeconds(1)); + shortExpiryForecastIds.add(forecastShortExpiryId); + String forecastDefaultExpiryId = forecast(job.getId(), TimeValue.timeValueHours(1), null); + String forecastNoExpiryId = forecast(job.getId(), TimeValue.timeValueHours(1), TimeValue.ZERO); + waitForecastToFinish(job.getId(), forecastShortExpiryId); + waitForecastToFinish(job.getId(), forecastDefaultExpiryId); + waitForecastToFinish(job.getId(), forecastNoExpiryId); + } - // We need to wait for the clock to tick to a new second to ensure the second time - // around model snapshots will have a different ID (it depends on epoch seconds) - long before = System.currentTimeMillis() / 1000; - assertBusy(() -> assertNotEquals(before, System.currentTimeMillis() / 1000), 1, TimeUnit.SECONDS); - - for (Job.Builder job : jobs) { - // Run up to now - startDatafeed(job.getId() + "-feed", 0, now); - waitUntilJobIsClosed(job.getId()); - assertThat(getBuckets(job.getId()).size(), is(greaterThanOrEqualTo(70))); - assertThat(getRecords(job.getId()).size(), equalTo(2)); - List modelSnapshots = getModelSnapshots(job.getId()); - assertThat(modelSnapshots.size(), equalTo(2)); - } + // Refresh to ensure the snapshot timestamp updates are visible + refresh("*"); + + // We need to wait for the clock to tick to a new second to ensure the second time + // around model snapshots will have a different ID (it depends on epoch seconds) + long before = System.currentTimeMillis() / 1000; + assertBusy(() -> assertNotEquals(before, System.currentTimeMillis() / 1000), 1, TimeUnit.SECONDS); + + for (Job.Builder job : jobs) { + // Run up to now + startDatafeed(job.getId() + "-feed", 0, now); + waitUntilJobIsClosed(job.getId()); + assertThat(getBuckets(job.getId()).size(), is(greaterThanOrEqualTo(70))); + assertThat(getRecords(job.getId()).size(), equalTo(2)); + List modelSnapshots = getModelSnapshots(job.getId()); + assertThat(modelSnapshots.size(), equalTo(2)); + } - retainAllSnapshots("snapshots-retention-with-retain"); - - long totalModelSizeStatsBeforeDelete = prepareSearch("*").setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) - .setQuery(QueryBuilders.termQuery("result_type", "model_size_stats")) - .get() - .getHits() - .getTotalHits().value; - long totalNotificationsCountBeforeDelete = prepareSearch(NotificationsIndex.NOTIFICATIONS_INDEX).get() - .getHits() - .getTotalHits().value; - assertThat(totalModelSizeStatsBeforeDelete, greaterThan(0L)); - assertThat(totalNotificationsCountBeforeDelete, greaterThan(0L)); - - // Verify forecasts were created - List forecastStats = getForecastStats(); - assertThat(forecastStats.size(), equalTo(jobs.size() * 3)); - for (ForecastRequestStats forecastStat : forecastStats) { - assertThat(countForecastDocs(forecastStat.getJobId(), forecastStat.getForecastId()), equalTo(forecastStat.getRecordCount())); - } + retainAllSnapshots("snapshots-retention-with-retain"); + + long totalModelSizeStatsBeforeDelete = prepareSearch("*").setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) + .setQuery(QueryBuilders.termQuery("result_type", "model_size_stats")) + .get() + .getHits() + .getTotalHits().value; + long totalNotificationsCountBeforeDelete = prepareSearch(NotificationsIndex.NOTIFICATIONS_INDEX).get() + .getHits() + .getTotalHits().value; + assertThat(totalModelSizeStatsBeforeDelete, greaterThan(0L)); + assertThat(totalNotificationsCountBeforeDelete, greaterThan(0L)); + + // Verify forecasts were created + List forecastStats = getForecastStats(); + assertThat(forecastStats.size(), equalTo(jobs.size() * 3)); + for (ForecastRequestStats forecastStat : forecastStats) { + assertThat( + countForecastDocs(forecastStat.getJobId(), forecastStat.getForecastId()), + equalTo(forecastStat.getRecordCount()) + ); + } - // Before we call the delete-expired-data action we need to make sure the unused state docs were indexed - assertFalse(indexUnusedStateDocsResponse.get().hasFailures()); - - // Now call the action under test - assertThat(deleteExpiredData(customThrottle).isDeleted(), is(true)); - - // no-retention job should have kept all data - assertThat(getBuckets("no-retention").size(), is(greaterThanOrEqualTo(70))); - assertThat(getRecords("no-retention").size(), equalTo(2)); - assertThat(getModelSnapshots("no-retention").size(), equalTo(2)); - - List buckets = getBuckets("results-retention"); - assertThat(buckets.size(), is(lessThanOrEqualTo(25))); - assertThat(buckets.size(), is(greaterThanOrEqualTo(22))); - assertThat(buckets.get(0).getTimestamp().getTime(), greaterThanOrEqualTo(oneDayAgo)); - assertThat(getRecords("results-retention").size(), equalTo(0)); - assertThat(getModelSnapshots("results-retention").size(), equalTo(2)); - - assertThat(getBuckets("snapshots-retention").size(), is(greaterThanOrEqualTo(70))); - assertThat(getRecords("snapshots-retention").size(), equalTo(2)); - assertThat(getModelSnapshots("snapshots-retention").size(), equalTo(1)); - - assertThat(getBuckets("snapshots-retention-with-retain").size(), is(greaterThanOrEqualTo(70))); - assertThat(getRecords("snapshots-retention-with-retain").size(), equalTo(2)); - assertThat(getModelSnapshots("snapshots-retention-with-retain").size(), equalTo(2)); - - buckets = getBuckets("results-and-snapshots-retention"); - assertThat(buckets.size(), is(lessThanOrEqualTo(25))); - assertThat(buckets.size(), is(greaterThanOrEqualTo(22))); - assertThat(buckets.get(0).getTimestamp().getTime(), greaterThanOrEqualTo(oneDayAgo)); - assertThat(getRecords("results-and-snapshots-retention").size(), equalTo(0)); - assertThat(getModelSnapshots("results-and-snapshots-retention").size(), equalTo(1)); - - long totalModelSizeStatsAfterDelete = prepareSearch("*").setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) - .setQuery(QueryBuilders.termQuery("result_type", "model_size_stats")) - .get() - .getHits() - .getTotalHits().value; - long totalNotificationsCountAfterDelete = prepareSearch(NotificationsIndex.NOTIFICATIONS_INDEX).get() - .getHits() - .getTotalHits().value; - assertThat(totalModelSizeStatsAfterDelete, equalTo(totalModelSizeStatsBeforeDelete)); - assertThat(totalNotificationsCountAfterDelete, greaterThanOrEqualTo(totalNotificationsCountBeforeDelete)); - - // Verify short expiry forecasts were deleted only - forecastStats = getForecastStats(); - assertThat(forecastStats.size(), equalTo(jobs.size() * 2)); - for (ForecastRequestStats forecastStat : forecastStats) { - assertThat(countForecastDocs(forecastStat.getJobId(), forecastStat.getForecastId()), equalTo(forecastStat.getRecordCount())); - } - for (Job.Builder job : jobs) { - for (String forecastId : shortExpiryForecastIds) { - assertThat(countForecastDocs(job.getId(), forecastId), equalTo(0L)); + // Before we call the delete-expired-data action we need to make sure the unused state docs were indexed + assertFalse(indexUnusedStateDocsResponse.get().hasFailures()); + + // Now call the action under test + assertThat(deleteExpiredData(customThrottle).isDeleted(), is(true)); + + // no-retention job should have kept all data + assertThat(getBuckets("no-retention").size(), is(greaterThanOrEqualTo(70))); + assertThat(getRecords("no-retention").size(), equalTo(2)); + assertThat(getModelSnapshots("no-retention").size(), equalTo(2)); + + List buckets = getBuckets("results-retention"); + assertThat(buckets.size(), is(lessThanOrEqualTo(25))); + assertThat(buckets.size(), is(greaterThanOrEqualTo(22))); + assertThat(buckets.get(0).getTimestamp().getTime(), greaterThanOrEqualTo(oneDayAgo)); + assertThat(getRecords("results-retention").size(), equalTo(0)); + assertThat(getModelSnapshots("results-retention").size(), equalTo(2)); + + assertThat(getBuckets("snapshots-retention").size(), is(greaterThanOrEqualTo(70))); + assertThat(getRecords("snapshots-retention").size(), equalTo(2)); + assertThat(getModelSnapshots("snapshots-retention").size(), equalTo(1)); + + assertThat(getBuckets("snapshots-retention-with-retain").size(), is(greaterThanOrEqualTo(70))); + assertThat(getRecords("snapshots-retention-with-retain").size(), equalTo(2)); + assertThat(getModelSnapshots("snapshots-retention-with-retain").size(), equalTo(2)); + + buckets = getBuckets("results-and-snapshots-retention"); + assertThat(buckets.size(), is(lessThanOrEqualTo(25))); + assertThat(buckets.size(), is(greaterThanOrEqualTo(22))); + assertThat(buckets.get(0).getTimestamp().getTime(), greaterThanOrEqualTo(oneDayAgo)); + assertThat(getRecords("results-and-snapshots-retention").size(), equalTo(0)); + assertThat(getModelSnapshots("results-and-snapshots-retention").size(), equalTo(1)); + + long totalModelSizeStatsAfterDelete = prepareSearch("*").setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) + .setQuery(QueryBuilders.termQuery("result_type", "model_size_stats")) + .get() + .getHits() + .getTotalHits().value; + long totalNotificationsCountAfterDelete = prepareSearch(NotificationsIndex.NOTIFICATIONS_INDEX).get() + .getHits() + .getTotalHits().value; + assertThat(totalModelSizeStatsAfterDelete, equalTo(totalModelSizeStatsBeforeDelete)); + assertThat(totalNotificationsCountAfterDelete, greaterThanOrEqualTo(totalNotificationsCountBeforeDelete)); + + // Verify short expiry forecasts were deleted only + forecastStats = getForecastStats(); + assertThat(forecastStats.size(), equalTo(jobs.size() * 2)); + for (ForecastRequestStats forecastStat : forecastStats) { + assertThat( + countForecastDocs(forecastStat.getJobId(), forecastStat.getForecastId()), + equalTo(forecastStat.getRecordCount()) + ); + } + for (Job.Builder job : jobs) { + for (String forecastId : shortExpiryForecastIds) { + assertThat(countForecastDocs(job.getId(), forecastId), equalTo(0L)); + } } - } - // Verify .ml-state doesn't contain unused state documents - assertResponse( - prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern()).setFetchSource(false).setTrackTotalHits(true).setSize(10000), - stateDocsResponse -> { - assertThat(stateDocsResponse.getHits().getTotalHits().value, greaterThanOrEqualTo(5L)); - - int nonExistingJobDocsCount = 0; - List nonExistingJobExampleIds = new ArrayList<>(); - for (SearchHit hit : stateDocsResponse.getHits().getHits()) { - if (hit.getId().startsWith("non_existing_job")) { - nonExistingJobDocsCount++; - if (nonExistingJobExampleIds.size() < 10) { - nonExistingJobExampleIds.add(hit.getId()); + // Verify .ml-state doesn't contain unused state documents + assertResponse( + prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern()).setFetchSource(false).setTrackTotalHits(true).setSize(10000), + stateDocsResponse -> { + assertThat(stateDocsResponse.getHits().getTotalHits().value, greaterThanOrEqualTo(5L)); + + int nonExistingJobDocsCount = 0; + List nonExistingJobExampleIds = new ArrayList<>(); + for (SearchHit hit : stateDocsResponse.getHits().getHits()) { + if (hit.getId().startsWith("non_existing_job")) { + nonExistingJobDocsCount++; + if (nonExistingJobExampleIds.size() < 10) { + nonExistingJobExampleIds.add(hit.getId()); + } } } + assertThat( + "Documents for non_existing_job are still around; examples: " + nonExistingJobExampleIds, + nonExistingJobDocsCount, + equalTo(0) + ); } - assertThat( - "Documents for non_existing_job are still around; examples: " + nonExistingJobExampleIds, - nonExistingJobDocsCount, - equalTo(0) - ); - } - ); + ); + } } public void testDeleteExpiresDataDeletesAnnotations() throws Exception { diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ModelPlotsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ModelPlotsIT.java index 0ce746bc0595d..db0e3de8f16c1 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ModelPlotsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ModelPlotsIT.java @@ -47,18 +47,19 @@ public void setUpData() { // We are going to create data for last day long nowMillis = System.currentTimeMillis(); int totalBuckets = 24; - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); - for (int bucket = 0; bucket < totalBuckets; bucket++) { - long timestamp = nowMillis - TimeValue.timeValueHours(totalBuckets - bucket).getMillis(); - for (String user : users) { - IndexRequest indexRequest = new IndexRequest(DATA_INDEX); - indexRequest.source("time", timestamp, "user", user); - bulkRequestBuilder.add(indexRequest); + try (BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()) { + for (int bucket = 0; bucket < totalBuckets; bucket++) { + long timestamp = nowMillis - TimeValue.timeValueHours(totalBuckets - bucket).getMillis(); + for (String user : users) { + IndexRequest indexRequest = new IndexRequest(DATA_INDEX); + indexRequest.source("time", timestamp, "user", user); + bulkRequestBuilder.add(indexRequest); + } } - } - BulkResponse bulkResponse = bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - assertThat(bulkResponse.hasFailures(), is(false)); + BulkResponse bulkResponse = bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + assertThat(bulkResponse.hasFailures(), is(false)); + } } @After