Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
f58acd6
Replace all instances of ThreadContext.stashContext
cwperks Jan 29, 2025
b49179a
Start fixing tests
cwperks Jan 29, 2025
78fe182
Fix all of TransformSecurityBehaviorIT
cwperks Jan 29, 2025
7e6da93
Fix more test suites
cwperks Jan 29, 2025
01aa5ea
Fix index lron
cwperks Jan 31, 2025
c68eda9
Fix more security tests
cwperks Jan 31, 2025
92187f7
Update a few more instances
cwperks Feb 2, 2025
4f7b63d
Fix failing tests
cwperks Feb 3, 2025
0cfa6c5
Use clusterService
cwperks Feb 3, 2025
a8caa19
Remove other usages of cluster state request
cwperks Feb 3, 2025
95a4ce7
Replace more instances with pluginClient
cwperks Feb 4, 2025
9eb07e3
Fix more tests
cwperks Feb 4, 2025
dda1da0
Run ktlint
cwperks Feb 4, 2025
519169b
Fix detekt errors
cwperks Feb 4, 2025
6f12863
Merge branch 'main' into remove-stash-context
cwperks Feb 5, 2025
61416cd
Add Permissive warning handler
cwperks Feb 5, 2025
1b48d1f
Remove comment
cwperks Feb 5, 2025
ce86426
Increase timeout for flaky test
cwperks Feb 5, 2025
3b60feb
Merge branch 'main' into remove-stash-context
cwperks Feb 27, 2025
35f6e4c
Adapt to breaking changes
cwperks Feb 27, 2025
8c0ec32
Merge branch 'main' into remove-stash-context
cwperks Jun 25, 2025
638f4e7
Rename to PluginClient
cwperks Jun 25, 2025
004be98
Remove additional instances of cluster state request
cwperks Jun 25, 2025
d677700
reinstate
cwperks Jun 25, 2025
f5993ed
Merge branch 'main' into remove-stash-context
cwperks Jun 25, 2025
9908719
Fix some tests
cwperks Jun 25, 2025
82a3bbb
Fix Rollover tests
cwperks Jun 25, 2025
1267a54
Add OpenForTest annotation
cwperks Jun 26, 2025
c9f7ee6
Allow 404
cwperks Jun 26, 2025
861bf96
More instances of IMMEDIATE refreshPolicy
cwperks Jul 1, 2025
f51e62b
Improve assertion
cwperks Jul 2, 2025
77e0fd7
Increase timeout
cwperks Jul 2, 2025
976e69d
Allow retry up to 5x
cwperks Jul 2, 2025
6f39b2b
Add gradle retry plugin
cwperks Jul 2, 2025
077f6c1
Merge branch 'main' into remove-stash-context
cwperks Jul 11, 2025
fb3889f
Merge branch 'main' into remove-stash-context
cwperks Jul 11, 2025
1f30694
Merge branch 'main' into remove-stash-context
cwperks Aug 13, 2025
f559f41
WIP on simplification
cwperks Aug 13, 2025
6e5b73c
More cleanup
cwperks Aug 13, 2025
801d5bc
Change to Client
cwperks Aug 13, 2025
1ff37f9
Fix compilation issue
cwperks Aug 14, 2025
20e1ac6
Add missing permissions
cwperks Aug 20, 2025
1215d04
Update AddPolicy
cwperks Aug 20, 2025
d24456b
Fix more tests
cwperks Aug 20, 2025
00d4064
get innerClient
cwperks Aug 20, 2025
7390f92
mock PluginClient
cwperks Aug 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ plugins {
id "de.undercouch.download" version "5.6.0"
id "com.netflix.nebula.ospackage" version "12.0.0"
id "com.dorongold.task-tree" version "2.1.1"
id "org.gradle.test-retry" version "1.6.2"
}

apply plugin: 'java'
Expand Down Expand Up @@ -146,6 +147,10 @@ opensearchplugin {

tasks.named("integTest").configure {
it.dependsOn(project.tasks.named("bundlePlugin"))
it.retry {
failOnPassedAfterRetry = false
maxRetries = 5
}
}

allOpen {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.opensearch.core.xcontent.XContentParser.Token
import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.env.Environment
import org.opensearch.env.NodeEnvironment
import org.opensearch.identity.PluginSubject
import org.opensearch.indexmanagement.IndexManagementIndices.Companion.HISTORY_ALL
import org.opensearch.indexmanagement.controlcenter.notification.ControlCenterIndices
import org.opensearch.indexmanagement.controlcenter.notification.action.delete.DeleteLRONConfigAction
Expand Down Expand Up @@ -175,13 +176,15 @@ import org.opensearch.indexmanagement.transform.resthandler.RestPreviewTransform
import org.opensearch.indexmanagement.transform.resthandler.RestStartTransformAction
import org.opensearch.indexmanagement.transform.resthandler.RestStopTransformAction
import org.opensearch.indexmanagement.transform.settings.TransformSettings
import org.opensearch.indexmanagement.util.PluginClient
import org.opensearch.indices.SystemIndexDescriptor
import org.opensearch.jobscheduler.spi.JobSchedulerExtension
import org.opensearch.jobscheduler.spi.ScheduledJobParser
import org.opensearch.jobscheduler.spi.ScheduledJobRunner
import org.opensearch.monitor.jvm.JvmService
import org.opensearch.plugins.ActionPlugin
import org.opensearch.plugins.ExtensiblePlugin
import org.opensearch.plugins.IdentityAwarePlugin
import org.opensearch.plugins.NetworkPlugin
import org.opensearch.plugins.Plugin
import org.opensearch.plugins.SystemIndexPlugin
Expand All @@ -208,7 +211,8 @@ class IndexManagementPlugin :
ActionPlugin,
ExtensiblePlugin,
SystemIndexPlugin,
TelemetryAwarePlugin {
TelemetryAwarePlugin,
IdentityAwarePlugin {
private val logger = LogManager.getLogger(javaClass)
lateinit var indexManagementIndices: IndexManagementIndices
lateinit var actionValidation: ActionValidation
Expand All @@ -223,6 +227,7 @@ class IndexManagementPlugin :
private val extensionCheckerMap = mutableMapOf<String, StatusChecker>()
lateinit var indexOperationActionFilter: IndexOperationActionFilter
private lateinit var metricsRegistry: MetricsRegistry
private lateinit var pluginClient: PluginClient

companion object {
const val PLUGINS_BASE_URI = "/_plugins"
Expand Down Expand Up @@ -398,6 +403,8 @@ class IndexManagementPlugin :
environment,
)

this.pluginClient = PluginClient(client)

IndexManagementActionsMetrics.instance.initialize(metricsRegistry)
rollupInterceptor = RollupInterceptor(clusterService, settings, indexNameExpressionResolver)
val jvmService = JvmService(environment.settings())
Expand Down Expand Up @@ -430,13 +437,13 @@ class IndexManagementPlugin :
.registerMetadataServices(RollupMetadataService(client, xContentRegistry))
.registerConsumers()
.registerClusterConfigurationProvider(skipFlag)
indexManagementIndices = IndexManagementIndices(settings, client.admin().indices(), clusterService)
val controlCenterIndices = ControlCenterIndices(client.admin().indices(), clusterService)
indexManagementIndices = IndexManagementIndices(settings, this.pluginClient.admin().indices(), clusterService)
val controlCenterIndices = ControlCenterIndices(this.pluginClient.admin().indices(), clusterService)
actionValidation = ActionValidation(settings, clusterService, jvmService)
val indexStateManagementHistory =
IndexStateManagementHistory(
settings,
client,
this.pluginClient,
threadPool,
clusterService,
indexManagementIndices,
Expand All @@ -454,7 +461,7 @@ class IndexManagementPlugin :
val extensionChecker = ExtensionStatusChecker(extensionCheckerMap, clusterService)
val managedIndexRunner =
ManagedIndexRunner
.registerClient(client)
.registerClient(pluginClient)
.registerClusterService(clusterService)
.registerValidationService(actionValidation)
.registerNamedXContentRegistry(xContentRegistry)
Expand All @@ -472,7 +479,7 @@ class IndexManagementPlugin :
val managedIndexCoordinator =
ManagedIndexCoordinator(
environment.settings(),
client, clusterService, threadPool, indexManagementIndices, indexMetadataProvider, xContentRegistry,
pluginClient, clusterService, threadPool, indexManagementIndices, indexMetadataProvider, xContentRegistry,
)

val smRunner = SMRunner.init(client, threadPool, settings, indexManagementIndices, clusterService)
Expand Down Expand Up @@ -500,6 +507,7 @@ class IndexManagementPlugin :
indexMetadataProvider,
smRunner,
pluginVersionSweepCoordinator,
pluginClient,
)
}

Expand Down Expand Up @@ -614,6 +622,10 @@ class IndexManagementPlugin :
ActionPlugin.ActionHandler(DeleteLRONConfigAction.INSTANCE, TransportDeleteLRONConfigAction::class.java),
)

override fun assignSubject(pluginSubject: PluginSubject?) {
pluginClient.setSubject(pluginSubject)
}

override fun getTransportInterceptors(
namedWriteableRegistry: NamedWriteableRegistry,
threadContext: ThreadContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,21 @@ data class Channel(val id: String) :
user: User?,
) {
val channel = this
client.threadPool().threadContext.stashContext().use {
// We need to set the user context information in the thread context for notification plugin to correctly resolve the user object
// TODO Understand why this is called twice when reindexing is finished in NotificationActionListenerIT.test notify for reindex with runtime policy
// We need to set the user context information in the thread context for notification plugin to correctly resolve the user object
client.threadPool().threadContext.getTransient<String>(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT)?.let {
client.threadPool().threadContext.putTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, generateUserString(user))
val res: SendNotificationResponse =
NotificationsPluginInterface.suspendUntil {
this.sendNotification(
(client as NodeClient),
eventSource,
ChannelMessage(message, null, null),
listOf(channel.id),
it,
)
}
validateResponseStatus(res.getStatus(), res.notificationEvent.eventSource.referenceId)
}
val res: SendNotificationResponse =
NotificationsPluginInterface.suspendUntil {
this.sendNotification(
(client as NodeClient),
eventSource,
ChannelMessage(message, null, null),
listOf(channel.id),
it,
)
}
validateResponseStatus(res.getStatus(), res.notificationEvent.eventSource.referenceId)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@ import org.opensearch.common.inject.Inject
import org.opensearch.commons.ConfigConstants
import org.opensearch.core.action.ActionListener
import org.opensearch.indexmanagement.IndexManagementPlugin
import org.opensearch.indexmanagement.util.PluginClient
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import org.opensearch.transport.client.node.NodeClient
import org.opensearch.transport.client.Client

class TransportDeleteLRONConfigAction
@Inject
constructor(
val client: NodeClient,
transportService: TransportService,
actionFilters: ActionFilters,
val client: PluginClient,
) : HandledTransportAction<DeleteLRONConfigRequest, DeleteResponse>(
DeleteLRONConfigAction.NAME, transportService, actionFilters, ::DeleteLRONConfigRequest,
) {
Expand All @@ -35,7 +36,7 @@ constructor(
}

inner class DeleteLRONConfigHandler(
private val client: NodeClient,
private val client: Client,
private val actionListener: ActionListener<DeleteResponse>,
private val request: DeleteLRONConfigRequest,
private val docId: String = request.docId,
Expand All @@ -49,13 +50,11 @@ constructor(
}",
)

client.threadPool().threadContext.stashContext().use {
val deleteRequest =
DeleteRequest(IndexManagementPlugin.CONTROL_CENTER_INDEX, docId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
val deleteRequest =
DeleteRequest(IndexManagementPlugin.CONTROL_CENTER_INDEX, docId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)

client.delete(deleteRequest, actionListener)
}
client.delete(deleteRequest, actionListener)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,16 @@ import org.opensearch.indexmanagement.controlcenter.notification.LRONConfigRespo
import org.opensearch.indexmanagement.controlcenter.notification.model.LRONConfig
import org.opensearch.indexmanagement.controlcenter.notification.util.getLRONConfigAndParse
import org.opensearch.indexmanagement.opensearchapi.parseWithType
import org.opensearch.indexmanagement.util.PluginClient
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import org.opensearch.transport.client.node.NodeClient
import org.opensearch.transport.client.Client

class TransportGetLRONConfigAction
@Inject
constructor(
val client: NodeClient,
val client: PluginClient,
transportService: TransportService,
actionFilters: ActionFilters,
val xContentRegistry: NamedXContentRegistry,
Expand All @@ -48,7 +49,7 @@ constructor(
}

inner class GetLRONConfigHandler(
private val client: NodeClient,
private val client: Client,
private val actionListener: ActionListener<GetLRONConfigResponse>,
private val request: GetLRONConfigRequest,
) {
Expand All @@ -58,25 +59,23 @@ constructor(
ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT,
)}",
)
client.threadPool().threadContext.stashContext().use {
if (null != request.docId) {
getLRONConfigAndParse(
client,
request.docId,
xContentRegistry,
object : ActionListener<LRONConfigResponse> {
override fun onResponse(response: LRONConfigResponse) {
actionListener.onResponse(GetLRONConfigResponse(listOf(response), 1))
}
if (null != request.docId) {
getLRONConfigAndParse(
client,
request.docId,
xContentRegistry,
object : ActionListener<LRONConfigResponse> {
override fun onResponse(response: LRONConfigResponse) {
actionListener.onResponse(GetLRONConfigResponse(listOf(response), 1))
}

override fun onFailure(e: Exception) {
actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as Exception)
}
},
)
} else {
doSearch()
}
override fun onFailure(e: Exception) {
actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as Exception)
}
},
)
} else {
doSearch()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,17 @@ import org.opensearch.indexmanagement.controlcenter.notification.ControlCenterIn
import org.opensearch.indexmanagement.controlcenter.notification.LRONConfigResponse
import org.opensearch.indexmanagement.controlcenter.notification.util.getDocID
import org.opensearch.indexmanagement.controlcenter.notification.util.getPriority
import org.opensearch.indexmanagement.util.PluginClient
import org.opensearch.indexmanagement.util.SecurityUtils
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import org.opensearch.transport.client.node.NodeClient
import org.opensearch.transport.client.Client

@Suppress("LongParameterList")
class TransportIndexLRONConfigAction
@Inject
constructor(
val client: NodeClient,
val client: PluginClient,
transportService: TransportService,
actionFilters: ActionFilters,
val clusterService: ClusterService,
Expand All @@ -53,7 +54,7 @@ constructor(
}

inner class IndexLRONConfigHandler(
private val client: NodeClient,
private val client: Client,
private val actionListener: ActionListener<LRONConfigResponse>,
private val request: IndexLRONConfigRequest,
private val user: User? = SecurityUtils.buildUser(client.threadPool().threadContext),
Expand All @@ -65,16 +66,14 @@ constructor(
ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT,
)}",
)
client.threadPool().threadContext.stashContext().use {
// we use dryRun to help check permission and do request validation
if (request.dryRun) {
validate()
return
}
controlCenterIndices.checkAndUpdateControlCenterIndex(
ActionListener.wrap(::onCreateMappingsResponse, actionListener::onFailure),
)
// we use dryRun to help check permission and do request validation
if (request.dryRun) {
validate()
return
}
controlCenterIndices.checkAndUpdateControlCenterIndex(
ActionListener.wrap(::onCreateMappingsResponse, actionListener::onFailure),
)
}

private fun onCreateMappingsResponse(response: AcknowledgedResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,31 +168,30 @@ class NotificationActionListener<Request : ActionRequest, Response : ActionRespo
DEFAULT_PAGINATION_SIZE, 0, DEFAULT_LRON_CONFIG_SORT_FIELD, SORT_ORDER_DESC, queryString,
)

client.threadPool().threadContext.stashContext().use {
client.execute(
GetLRONConfigAction.INSTANCE,
GetLRONConfigRequest(searchParams = searchParam),
object : ActionListener<GetLRONConfigResponse> {
override fun onResponse(lronConfigResponse: GetLRONConfigResponse) {
launch {
sendNotification(lronConfigResponse, taskId, action, result)
}
// TODO verify this works
client.execute(
GetLRONConfigAction.INSTANCE,
GetLRONConfigRequest(searchParams = searchParam),
object : ActionListener<GetLRONConfigResponse> {
override fun onResponse(lronConfigResponse: GetLRONConfigResponse) {
launch {
sendNotification(lronConfigResponse, taskId, action, result)
}
}

override fun onFailure(e: Exception) {
if (e is IndexNotFoundException) {
logger.debug(
"No notification policy configured for task: {} action: {}",
taskId.toString(),
action,
)
} else {
logger.error("Can't get notification policy for action: {}", action, e)
}
override fun onFailure(e: Exception) {
if (e is IndexNotFoundException) {
logger.debug(
"No notification policy configured for task: {} action: {}",
taskId.toString(),
action,
)
} else {
logger.error("Can't get notification policy for action: {}", action, e)
}
},
)
}
}
},
)
}

@Suppress("NestedBlockDepth")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.opensearch.indexmanagement.IndexManagementPlugin
import org.opensearch.indexmanagement.controlcenter.notification.LRONConfigResponse
import org.opensearch.indexmanagement.controlcenter.notification.model.LRONConfig
import org.opensearch.indexmanagement.opensearchapi.parseFromGetResponse
import org.opensearch.transport.client.node.NodeClient
import org.opensearch.transport.client.Client

const val LRON_DOC_ID_PREFIX = "LRON:"

Expand Down Expand Up @@ -68,7 +68,7 @@ fun getDocID(taskId: TaskId? = null, actionName: String? = null): String {
}

fun getLRONConfigAndParse(
client: NodeClient,
client: Client,
docId: String,
xContentRegistry: NamedXContentRegistry,
actionListener: ActionListener<LRONConfigResponse>,
Expand Down
Loading
Loading