Skip to content

Commit

Permalink
Refactor the policy to be non null in managed index config
Browse files Browse the repository at this point in the history
Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
  • Loading branch information
bowenlan-amzn committed Oct 3, 2023
1 parent 8b0cd6d commit e4a86e4
Show file tree
Hide file tree
Showing 15 changed files with 130 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ISMIndexMetadata

class DefaultIndexMetadataService(val customUUIDSetting: String? = null) : IndexMetadataService {
class DefaultIndexMetadataService(private val customUUIDSetting: String? = null) : IndexMetadataService {

/**
* Returns the default index metadata needed for ISM
Expand All @@ -39,7 +39,7 @@ class DefaultIndexMetadataService(val customUUIDSetting: String? = null) : Index

response.state.metadata.indices.forEach {
// TODO waiting to add document count until it is definitely needed
val uuid = getCustomIndexUUID(it.value)
val uuid = getIndexUUID(it.value)
val indexMetadata = ISMIndexMetadata(uuid, it.value.creationDate, -1)
indexNameToMetadata[it.key] = indexMetadata
}
Expand All @@ -48,11 +48,10 @@ class DefaultIndexMetadataService(val customUUIDSetting: String? = null) : Index
}

/*
* If an extension wants Index Management to determine cluster state indices UUID based on a custom index setting if
* present of cluster state, the extension will override this customUUID setting. This allows an index to migrate off
* cluster and back while using this persistent uuid.
* This method prioritize the custom index setting provided by extension to decide the index UUID
* Custom index UUID is needed when index moved out of cluster and re-attach back, it will get a new UUID in cluster metadata
*/
fun getCustomIndexUUID(indexMetadata: IndexMetadata): String {
fun getIndexUUID(indexMetadata: IndexMetadata): String {
return if (customUUIDSetting != null) {
indexMetadata.settings.get(customUUIDSetting, indexMetadata.indexUUID)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ class ManagedIndexCoordinator(
// If there is a custom index uuid associated with the index, we do not auto manage it
// This is because cold index uses custom uuid, and we do not auto manage cold-to-warm index
val indexMetadata = clusterState.metadata.index(indexName)
val wasOffCluster = defaultIndexMetadataService.getCustomIndexUUID(indexMetadata) != indexMetadata.indexUUID
val wasOffCluster = defaultIndexMetadataService.getIndexUUID(indexMetadata) != indexMetadata.indexUUID
val ismIndexMetadata = ismIndicesMetadata[indexName]
// We try to find lookup name instead of using index name as datastream indices need the alias to match policy
val lookupName = findIndexLookupName(indexName, clusterState)
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ data class ManagedIndexConfig(
val policyID: String,
val policySeqNo: Long?,
val policyPrimaryTerm: Long?,
val policy: Policy?,
val policy: Policy,
val changePolicy: ChangePolicy?,
val jobJitter: Double?
) : ScheduledJobParameter {
Expand Down Expand Up @@ -177,11 +177,13 @@ data class ManagedIndexConfig(
policyID = requireNotNull(policyID) { "ManagedIndexConfig policy id is null" },
policySeqNo = policySeqNo,
policyPrimaryTerm = policyPrimaryTerm,
policy = policy?.copy(
id = policyID,
seqNo = policySeqNo ?: SequenceNumbers.UNASSIGNED_SEQ_NO,
primaryTerm = policyPrimaryTerm ?: SequenceNumbers.UNASSIGNED_PRIMARY_TERM
),
policy = requireNotNull(
policy?.copy(
id = policyID,
seqNo = policySeqNo ?: SequenceNumbers.UNASSIGNED_SEQ_NO,
primaryTerm = policyPrimaryTerm ?: SequenceNumbers.UNASSIGNED_PRIMARY_TERM
)
) { "ManagedIndexConfig policy is null" },

Check warning on line 186 in src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt#L186

Added line #L186 was not covered by tests
changePolicy = changePolicy,
jobJitter = jitter
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ fun getUuidsForClosedIndices(state: ClusterState, defaultIndexMetadataService: D
indexMetadatas.forEach {
// it.key is index name
if (it.value.state == IndexMetadata.State.CLOSE) {
closeList.add(defaultIndexMetadataService.getCustomIndexUUID(it.value))
closeList.add(defaultIndexMetadataService.getIndexUUID(it.value))
}
}
return closeList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,16 @@ package org.opensearch.indexmanagement.indexstatemanagement.settings

import org.opensearch.common.settings.Setting
import org.opensearch.common.unit.TimeValue
import org.opensearch.indexmanagement.indexstatemanagement.ISMActionsParser
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.DEFAULT_ISM_ENABLED
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.DEFAULT_JOB_INTERVAL
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.ALLOW_LIST_ALL
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SNAPSHOT_DENY_LIST_NONE
import java.util.concurrent.TimeUnit
import java.util.function.Function

@Suppress("UtilityClassWithPublicConstructor")
class LegacyOpenDistroManagedIndexSettings {
companion object {
const val DEFAULT_ISM_ENABLED = true
const val DEFAULT_JOB_INTERVAL = 5
private val ALLOW_LIST_ALL = ISMActionsParser.instance.parsers.map { it.getActionType() }.toList()
val ALLOW_LIST_NONE = emptyList<String>()
val SNAPSHOT_DENY_LIST_NONE = emptyList<String>()

val INDEX_STATE_MANAGEMENT_ENABLED: Setting<Boolean> = Setting.boolSetting(
"opendistro.index_state_management.enabled",
DEFAULT_ISM_ENABLED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.settings
import org.opensearch.common.settings.Setting
import org.opensearch.common.unit.TimeValue
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import org.opensearch.indexmanagement.indexstatemanagement.ISMActionsParser
import java.util.function.Function

@Suppress("UtilityClassWithPublicConstructor")
Expand All @@ -19,6 +20,7 @@ class ManagedIndexSettings {
const val DEFAULT_JITTER = 0.6
const val DEFAULT_RESTRICTED_PATTERN = "\\.opendistro_security|\\.kibana.*|\\$INDEX_MANAGEMENT_INDEX"
val ALLOW_LIST_NONE = emptyList<String>()
val ALLOW_LIST_ALL = ISMActionsParser.instance.parsers.map { it.getActionType() }.toList()
val SNAPSHOT_DENY_LIST_NONE = emptyList<String>()

val INDEX_STATE_MANAGEMENT_ENABLED: Setting<Boolean> = Setting.boolSetting(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ class TransportChangePolicyAction @Inject constructor(
val clusterState = response.state
val defaultIndexMetadataService = indexMetadataProvider.services[DEFAULT_INDEX_TYPE] as DefaultIndexMetadataService
clusterState.metadata.indices.forEach {
val indexUUID = defaultIndexMetadataService.getCustomIndexUUID(it.value)
val indexUUID = defaultIndexMetadataService.getIndexUUID(it.value)
indexUuidToIndexMetadata[indexUUID] = it.value
}
// ISMIndexMetadata from the default index metadata service uses lenient expand, we want to use strict expand, filter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,10 @@ class TransportExplainAction @Inject constructor(
"enabled" to managedIndex.enabled.toString()
)
if (showPolicy) {
managedIndex.policy?.let { appliedPolicies[managedIndex.index] = it }
managedIndex.policy.let { appliedPolicies[managedIndex.index] = it }
}
if (validateAction) {
managedIndex.policy?.let { policiesforValidation[managedIndex.index] = it }
managedIndex.policy.let { policiesforValidation[managedIndex.index] = it }

Check warning on line 205 in src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt#L205

Added line #L205 was not covered by tests
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class TransportRetryFailedManagedIndexAction @Inject constructor(
override fun onResponse(response: ClusterStateResponse) {
val defaultIndexMetadataService = indexMetadataProvider.services[DEFAULT_INDEX_TYPE] as DefaultIndexMetadataService
response.state.metadata.indices.forEach {
val indexUUID = defaultIndexMetadataService.getCustomIndexUUID(it.value)
val indexUUID = defaultIndexMetadataService.getIndexUUID(it.value)
indexUuidToIndexMetadata[indexUUID] = it.value
}
processResponse()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ fun managedIndexConfigIndexRequest(
uuid: String,
policyID: String,
jobInterval: Int,
policy: Policy? = null,
policy: Policy,
jobJitter: Double?
): IndexRequest {
val managedIndexConfig = ManagedIndexConfig(
Expand All @@ -74,8 +74,8 @@ fun managedIndexConfigIndexRequest(
jobEnabledTime = Instant.now(),
policyID = policyID,
policy = policy,
policySeqNo = policy?.seqNo,
policyPrimaryTerm = policy?.primaryTerm,
policySeqNo = policy.seqNo,
policyPrimaryTerm = policy.primaryTerm,
changePolicy = null,
jobJitter = jobJitter
)
Expand Down Expand Up @@ -400,25 +400,24 @@ fun ManagedIndexConfig.shouldChangePolicy(managedIndexMetaData: ManagedIndexMeta
return true
}

// we need this in so that we can change policy before the first transition happens so policy doesnt get completed
// we need this in so that we can change policy before the first transition happens so policy doesn't get completed
// before we have a chance to change policy
if (actionToExecute?.type == TransitionsAction.name) {
return true
}

if (managedIndexMetaData.actionMetaData?.name != TransitionsAction.name) {
return false
}

return true
// TODO actionToExecute is correlate to the actionMetadata?
// actionToExecute is found out by checking the metadata, it can be current unfinished one or the next
// actionMetadata has already been updated, it can be current unfinished one or the next
// In change policy context, we only accept unfinished transition or the new transition
return managedIndexMetaData.actionMetaData?.name == TransitionsAction.name
}

fun ManagedIndexMetaData.hasVersionConflict(managedIndexConfig: ManagedIndexConfig): Boolean =
fun ManagedIndexMetaData.hasDifferentPolicyVersion(managedIndexConfig: ManagedIndexConfig): Boolean =
this.policySeqNo != managedIndexConfig.policySeqNo || this.policyPrimaryTerm != managedIndexConfig.policyPrimaryTerm

fun ManagedIndexConfig.hasDifferentJobInterval(jobInterval: Int): Boolean {
val schedule = this.schedule
when (schedule) {
when (val schedule = this.schedule) {
is IntervalSchedule -> {
return schedule.interval != jobInterval
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ object RollupFieldValueExpressionResolver {
private lateinit var scriptService: ScriptService
private lateinit var clusterService: ClusterService
lateinit var indexAliasUtils: IndexAliasUtils

fun resolve(rollup: Rollup, fieldValue: String): String {
val script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, fieldValue, mapOf())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,7 @@ fun randomManagedIndexConfig(
schedule: Schedule = IntervalSchedule(Instant.ofEpochMilli(Instant.now().toEpochMilli()), 5, ChronoUnit.MINUTES),
lastUpdatedTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null,
policyID: String = OpenSearchRestTestCase.randomAlphaOfLength(10),
policy: Policy? = randomPolicy(),
policy: Policy = randomPolicy(),
changePolicy: ChangePolicy? = randomChangePolicy(),
jitter: Double? = 0.0
): ManagedIndexConfig {
Expand All @@ -332,10 +331,10 @@ fun randomManagedIndexConfig(
jobSchedule = schedule,
jobLastUpdatedTime = lastUpdatedTime,
jobEnabledTime = enabledTime,
policyID = policy?.id ?: policyID,
policySeqNo = policy?.seqNo,
policyPrimaryTerm = policy?.primaryTerm,
policy = policy?.copy(seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM),
policyID = policy.id,
policySeqNo = policy.seqNo,
policyPrimaryTerm = policy.primaryTerm,
policy = policy.copy(seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM),
changePolicy = changePolicy,
jobJitter = jitter
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,10 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() {
// Will use the unique generated description to ensure they are the same policies, the cached policy does not have
// id, seqNo, primaryTerm on the policy itself so cannot directly compare
// TODO: figure out why the newPolicy.lastUpdatedTime and cached policy lastUpdatedTime is off by a few milliseconds
assertEquals("Initialized policy is not the change policy", newPolicy.description, updatedManagedIndexConfig.policy?.description)
assertEquals(
"Initialized policy is not the change policy", newPolicy.description,
updatedManagedIndexConfig.policy.description
)
}

fun `test changing policy on a valid index and log pattern`() {
Expand Down Expand Up @@ -301,7 +304,7 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() {
updateManagedIndexConfigStartTime(managedIndexConfig)

// After first execution we should expect the change policy to still be null (since we haven't called it yet)
// and the initial policy should of been cached
// and the initial policy should have been cached
val executedManagedIndexConfig: ManagedIndexConfig = waitFor {
val config = getManagedIndexConfigByDocId(managedIndexConfig.id)
assertNotNull("Executed managed index config is null", config)
Expand Down Expand Up @@ -346,7 +349,6 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() {
// speed up to second execution we will have a ChangePolicy but not be in Transitions yet
// which means we should still execute the ReadOnlyAction
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor {
val config = getManagedIndexConfigByDocId(managedIndexConfig.id)
assertNotNull("Next managed index config is null", config)
Expand Down Expand Up @@ -386,7 +388,6 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() {

// speed up to third execution so that we try to move to transitions and trigger a change policy
updateManagedIndexConfigStartTime(managedIndexConfig)

val changedManagedIndexConfig: ManagedIndexConfig = waitFor {
val config = getManagedIndexConfigByDocId(managedIndexConfig.id)
assertNotNull("Changed managed index config is null", config)
Expand Down Expand Up @@ -512,9 +513,7 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() {
RestRequest.Method.POST.toString(),
"${RestChangePolicyAction.CHANGE_POLICY_BASE_URI}/$index", emptyMap(), changePolicy.toHttpEntity()
)

assertAffectedIndicesResponseIsEqual(mapOf(FAILURES to false, FAILED_INDICES to emptyList<Any>(), UPDATED_INDICES to 1), response.asMap())

waitFor { assertNotNull(getExistingManagedIndexConfig(index).changePolicy) }

// speed up to first execution where we initialize the policy on the job
Expand All @@ -529,7 +528,10 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() {
// Will use the unique generated description to ensure they are the same policies, the cached policy does not have
// id, seqNo, primaryTerm on the policy itself so cannot directly compare
// TODO: figure out why the newPolicy.lastUpdatedTime and cached policy lastUpdatedTime is off by a few milliseconds
assertEquals("Initialized policy is not the change policy", newPolicy.description, config.policy?.description)
assertEquals(
"Initialized policy is not the change policy", newPolicy.description,
config.policy.description
)
config
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.Transition
import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.SweptManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.randomChangePolicy
import org.opensearch.indexmanagement.indexstatemanagement.randomClusterStateManagedIndexConfig
import org.opensearch.indexmanagement.indexstatemanagement.randomPolicy
import org.opensearch.indexmanagement.indexstatemanagement.randomSweptManagedIndexConfig
import org.opensearch.indexmanagement.opensearchapi.parseWithType
import org.opensearch.test.OpenSearchTestCase
Expand All @@ -34,7 +35,7 @@ class ManagedIndexUtilsTests : OpenSearchTestCase() {
val index = randomAlphaOfLength(10)
val uuid = randomAlphaOfLength(10)
val policyID = randomAlphaOfLength(10)
val createRequest = managedIndexConfigIndexRequest(index, uuid, policyID, 5, jobJitter = 0.0)
val createRequest = managedIndexConfigIndexRequest(index, uuid, policyID, 5, randomPolicy(), jobJitter = 0.0)

assertNotNull("IndexRequest not created", createRequest)
assertEquals("Incorrect ism index used in request", INDEX_MANAGEMENT_INDEX, createRequest.index())
Expand Down

0 comments on commit e4a86e4

Please sign in to comment.