From b6802aef839d37ccfcc32ed4f4d2d88d4782f191 Mon Sep 17 00:00:00 2001 From: kkewwei Date: Sun, 24 Nov 2024 12:03:27 +0800 Subject: [PATCH] Coordinator can return partial results after the timeout when allow_partial_search_results is true Signed-off-by: kkewwei Signed-off-by: kkewwei --- CHANGELOG.md | 1 + .../search/CoordinatorTimeoutIT.java | 177 ++++++++++++++++++ .../search/SearchCancellationIT.java | 139 +++----------- .../action/search/MultiSearchRequest.java | 5 + .../action/search/SearchRequest.java | 36 +++- .../action/search/SearchRequestBuilder.java | 5 + .../opensearch/action/search/SearchTask.java | 11 +- .../action/search/SearchTransportService.java | 19 +- .../action/search/RestMultiSearchAction.java | 4 + .../rest/action/search/RestSearchAction.java | 1 + .../AbstractSearchAsyncActionTests.java | 71 ++++++- .../search/MultiSearchRequestTests.java | 31 +++ .../action/search/SearchRequestTests.java | 10 + .../search/RandomSearchRequestGenerator.java | 3 + ...StaticSettingsOpenSearchIntegTestCase.java | 77 ++++++++ 15 files changed, 469 insertions(+), 121 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/search/CoordinatorTimeoutIT.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 70245afda0dd1..8b0d14d0889cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Support prefix list for remote repository attributes([#16271](https://github.com/opensearch-project/OpenSearch/pull/16271)) - Add new configuration setting `synonym_analyzer`, to the `synonym` and `synonym_graph` filters, enabling the specification of a custom analyzer for reading the synonym file ([#16488](https://github.com/opensearch-project/OpenSearch/pull/16488)). 
- Add stats for remote publication failure and move download failure stats to remote methods([#16682](https://github.com/opensearch-project/OpenSearch/pull/16682/)) +- Coordinator can return partial results after the timeout when allow_partial_search_results is true ([#16681](https://github.com/opensearch-project/OpenSearch/pull/16681)). ### Dependencies - Bump `com.google.cloud:google-cloud-core-http` from 2.23.0 to 2.47.0 ([#16504](https://github.com/opensearch-project/OpenSearch/pull/16504)) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/CoordinatorTimeoutIT.java b/server/src/internalClusterTest/java/org/opensearch/search/CoordinatorTimeoutIT.java new file mode 100644 index 0000000000000..3638f3bed92a0 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/search/CoordinatorTimeoutIT.java @@ -0,0 +1,177 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.search; + +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.opensearch.action.search.MultiSearchResponse; +import org.opensearch.action.search.SearchPhaseExecutionException; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.search.ShardSearchFailure; +import org.opensearch.common.action.ActionFuture; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.PluginsService; +import org.opensearch.script.Script; +import org.opensearch.script.ScriptType; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; +import org.opensearch.transport.ReceiveTimeoutTransportException; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.opensearch.index.query.QueryBuilders.scriptQuery; +import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; + +@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 2, numClientNodes = 0) +public class CoordinatorTimeoutIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { + + private long coordinatorTimeoutMills = 500; + + public CoordinatorTimeoutIT(Settings nodeSettings) { + super(nodeSettings); + } + + @ParametersFactory + public static Collection parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }, + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() } + ); + } + + @Override + protected Collection> nodePlugins() { + return Collections.singleton(ScriptedBlockPlugin.class); + 
} + + public void testTimeoutDuringQueryPhase() throws Exception { + int dataNumber = internalCluster().numDataNodes(); + createIndex("test", Settings.builder().put("index.number_of_shards", dataNumber).put("index.number_of_replicas", 0).build()); + + List plugins = initBlockFactory(); + indexTestData(client()); + TimeValue coordinatorTimeout = new TimeValue(coordinatorTimeoutMills, TimeUnit.MILLISECONDS); + ActionFuture searchResponseFuture = client().prepareSearch("test") + .setCoordinatorTimeout(coordinatorTimeout) + .setAllowPartialSearchResults(true) + .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))) + .execute(); + awaitForBlock(plugins); + logger.info("begin to sleep for " + coordinatorTimeout.getMillis() + " ms"); + Thread.sleep(coordinatorTimeout.getMillis() + 100); + logger.info("wake up"); + disableBlocks(plugins); + SearchResponse searchResponse = searchResponseFuture.get(); + assertEquals(1, searchResponse.getSuccessfulShards()); + verifyFailedException(searchResponse.getShardFailures()); + // wait in-flight contexts to finish + Thread.sleep(100); + } + + public void testMSearchChildRequestTimeout() throws Exception { + int dataNumber = internalCluster().numDataNodes(); + createIndex("test", Settings.builder().put("index.number_of_shards", dataNumber).put("index.number_of_replicas", 0).build()); + + List plugins = initBlockFactory(); + indexTestData(client()); + + TimeValue coordinatorTimeout = new TimeValue(coordinatorTimeoutMills, TimeUnit.MILLISECONDS); + ActionFuture mSearchResponse = client().prepareMultiSearch() + .add( + client().prepareSearch("test") + .setAllowPartialSearchResults(true) + .setRequestCache(false) + .setCoordinatorTimeout(coordinatorTimeout) + .setQuery( + scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap())) + ) + ) + .add( + client().prepareSearch("test") + .setAllowPartialSearchResults(true) 
+ .setRequestCache(false) + .setQuery( + scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap())) + ) + ) + .execute(); + awaitForBlock(plugins); + Thread.sleep(coordinatorTimeout.getMillis() + 100); + // unblock the search thread + disableBlocks(plugins); + // one child request is expected to fail + final Set expectedFailedRequests = new HashSet<>(); + expectedFailedRequests.add(0); + ensureMSearchThrowException(mSearchResponse, expectedFailedRequests); + // wait in-flight contexts to finish + Thread.sleep(100); + } + + private void verifyFailedException(ShardSearchFailure[] shardFailures) { + for (ShardSearchFailure shardFailure : shardFailures) { + final Throwable topFailureCause = shardFailure.getCause(); + assertTrue(shardFailure.toString(), topFailureCause instanceof ReceiveTimeoutTransportException); + } + } + + private void ensureMSearchThrowException(ActionFuture mSearchResponse, Set expectedFailedChildRequests) { + MultiSearchResponse response = mSearchResponse.actionGet(); + Set actualFailedChildRequests = new HashSet<>(); + for (int i = 0; i < response.getResponses().length; ++i) { + SearchResponse sResponse = response.getResponses()[i].getResponse(); + // check if response is null means all the shard failed for this search request + if (sResponse == null) { + Exception ex = response.getResponses()[i].getFailure(); + assertTrue(ex instanceof SearchPhaseExecutionException); + verifyFailedException(((SearchPhaseExecutionException) ex).shardFailures()); + actualFailedChildRequests.add(i); + + } else if (sResponse.getShardFailures().length > 0) { + verifyFailedException(sResponse.getShardFailures()); + actualFailedChildRequests.add(i); + } + } + assertEquals( + "Actual child request with timeout failure is different that expected", + expectedFailedChildRequests, + actualFailedChildRequests + ); + } + + private List initBlockFactory() { + List plugins = new ArrayList<>(); + boolean notBlockFirst = 
true; + for (PluginsService pluginsService : internalCluster().getDataNodeInstances(PluginsService.class)) { + List scriptedBlockPlugins = pluginsService.filterPlugins(ScriptedBlockPlugin.class); + for (ScriptedBlockPlugin plugin : scriptedBlockPlugins) { + plugin.reset(); + // just block the first node + if (notBlockFirst) { + notBlockFirst = false; + // default is enable block + plugin.disableBlock(); + } else { + plugin.enableBlock(); + } + } + plugins.addAll(scriptedBlockPlugins); + } + return plugins; + } + +} diff --git a/server/src/internalClusterTest/java/org/opensearch/search/SearchCancellationIT.java b/server/src/internalClusterTest/java/org/opensearch/search/SearchCancellationIT.java index 5a19e2b841c08..0094d634933aa 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/SearchCancellationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/SearchCancellationIT.java @@ -34,11 +34,9 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; -import org.apache.logging.log4j.LogManager; import org.opensearch.ExceptionsHelper; import org.opensearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse; -import org.opensearch.action.bulk.BulkRequestBuilder; import org.opensearch.action.search.MultiSearchAction; import org.opensearch.action.search.MultiSearchResponse; import org.opensearch.action.search.SearchAction; @@ -46,7 +44,6 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.SearchScrollAction; import org.opensearch.action.search.ShardSearchFailure; -import org.opensearch.action.support.WriteRequest; import org.opensearch.common.action.ActionFuture; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -55,10 +52,8 @@ import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.plugins.Plugin; import 
org.opensearch.plugins.PluginsService; -import org.opensearch.script.MockScriptPlugin; import org.opensearch.script.Script; import org.opensearch.script.ScriptType; -import org.opensearch.search.lookup.LeafFieldsLookup; import org.opensearch.tasks.TaskInfo; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; @@ -71,21 +66,15 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; import static org.opensearch.action.search.TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY; import static org.opensearch.index.query.QueryBuilders.scriptQuery; -import static org.opensearch.search.SearchCancellationIT.ScriptedBlockPlugin.SCRIPT_NAME; import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; import static org.opensearch.search.SearchService.NO_TIMEOUT; +import static org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase.ScriptedBlockPlugin.SCRIPT_NAME; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertFailures; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.notNullValue; @@ -128,18 +117,7 @@ public void cleanup() { client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder().putNull("*")).get(); } - private void indexTestData() { - for (int i = 0; i < 5; i++) { - // Make sure we have a few segments - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int j = 0; j < 20; j++) { - 
bulkRequestBuilder.add(client().prepareIndex("test").setId(Integer.toString(i * 5 + j)).setSource("field", "value")); - } - assertNoFailures(bulkRequestBuilder.get()); - } - } - - private List initBlockFactory() { + protected List initBlockFactory() { List plugins = new ArrayList<>(); for (PluginsService pluginsService : internalCluster().getDataNodeInstances(PluginsService.class)) { plugins.addAll(pluginsService.filterPlugins(ScriptedBlockPlugin.class)); @@ -151,24 +129,6 @@ private List initBlockFactory() { return plugins; } - private void awaitForBlock(List plugins) throws Exception { - int numberOfShards = getNumShards("test").numPrimaries; - assertBusy(() -> { - int numberOfBlockedPlugins = 0; - for (ScriptedBlockPlugin plugin : plugins) { - numberOfBlockedPlugins += plugin.hits.get(); - } - logger.info("The plugin blocked on {} out of {} shards", numberOfBlockedPlugins, numberOfShards); - assertThat(numberOfBlockedPlugins, greaterThan(0)); - }); - } - - private void disableBlocks(List plugins) throws Exception { - for (ScriptedBlockPlugin plugin : plugins) { - plugin.disableBlock(); - } - } - private void cancelSearch(String action) { ListTasksResponse listTasksResponse = client().admin().cluster().prepareListTasks().setActions(action).get(); assertThat(listTasksResponse.getTasks(), hasSize(1)); @@ -180,7 +140,7 @@ private void cancelSearch(String action) { assertThat(cancelTasksResponse.getTasks().get(0).getTaskId(), equalTo(searchTask.getTaskId())); } - private SearchResponse ensureSearchWasCancelled(ActionFuture searchResponse) { + SearchResponse ensureSearchWasCancelled(ActionFuture searchResponse) { try { SearchResponse response = searchResponse.actionGet(); logger.info("Search response {}", response); @@ -235,7 +195,7 @@ private void verifyCancellationException(ShardSearchFailure[] failures) { public void testCancellationDuringQueryPhase() throws Exception { List plugins = initBlockFactory(); - indexTestData(); + indexTestData(client()); 
logger.info("Executing search"); ActionFuture searchResponse = client().prepareSearch("test") @@ -251,12 +211,12 @@ public void testCancellationDuringQueryPhase() throws Exception { public void testCancellationDuringQueryPhaseUsingRequestParameter() throws Exception { List plugins = initBlockFactory(); - indexTestData(); + indexTestData(client()); ActionFuture searchResponse = client().prepareSearch("test") .setCancelAfterTimeInterval(requestCancellationTimeout) .setAllowPartialSearchResults(randomBoolean()) - .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))) + .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap()))) .execute(); awaitForBlock(plugins); sleepForAtLeast(requestCancellationTimeout.getMillis()); @@ -267,7 +227,7 @@ public void testCancellationDuringQueryPhaseUsingRequestParameter() throws Excep public void testCancellationDuringQueryPhaseUsingClusterSetting() throws Exception { List plugins = initBlockFactory(); - indexTestData(); + indexTestData(client()); client().admin() .cluster() @@ -278,7 +238,7 @@ public void testCancellationDuringQueryPhaseUsingClusterSetting() throws Excepti .get(); ActionFuture searchResponse = client().prepareSearch("test") .setAllowPartialSearchResults(randomBoolean()) - .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))) + .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap()))) .execute(); awaitForBlock(plugins); sleepForAtLeast(clusterCancellationTimeout.getMillis()); @@ -289,7 +249,7 @@ public void testCancellationDuringQueryPhaseUsingClusterSetting() throws Excepti public void testCancellationDuringFetchPhase() throws Exception { List plugins = initBlockFactory(); - indexTestData(); + indexTestData(client()); logger.info("Executing search"); ActionFuture searchResponse = 
client().prepareSearch("test") @@ -305,7 +265,7 @@ public void testCancellationDuringFetchPhase() throws Exception { public void testCancellationDuringFetchPhaseUsingRequestParameter() throws Exception { List plugins = initBlockFactory(); - indexTestData(); + indexTestData(client()); ActionFuture searchResponse = client().prepareSearch("test") .setCancelAfterTimeInterval(requestCancellationTimeout) .addScriptField("test_field", new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())) @@ -319,7 +279,7 @@ public void testCancellationDuringFetchPhaseUsingRequestParameter() throws Excep public void testCancellationOfScrollSearches() throws Exception { List plugins = initBlockFactory(); - indexTestData(); + indexTestData(client()); logger.info("Executing search"); ActionFuture searchResponse = client().prepareSearch("test") @@ -341,7 +301,7 @@ public void testCancellationOfScrollSearches() throws Exception { public void testCancellationOfFirstScrollSearchRequestUsingRequestParameter() throws Exception { List plugins = initBlockFactory(); - indexTestData(); + indexTestData(client()); ActionFuture searchResponse = client().prepareSearch("test") .setScroll(keepAlive) .setCancelAfterTimeInterval(requestCancellationTimeout) @@ -364,7 +324,7 @@ public void testCancellationOfFirstScrollSearchRequestUsingRequestParameter() th public void testCancellationOfScrollSearchesOnFollowupRequests() throws Exception { List plugins = initBlockFactory(); - indexTestData(); + indexTestData(client()); // Disable block so the first request would pass disableBlocks(plugins); @@ -405,7 +365,7 @@ public void testCancellationOfScrollSearchesOnFollowupRequests() throws Exceptio public void testNoCancellationOfScrollSearchOnFollowUpRequest() throws Exception { List plugins = initBlockFactory(); - indexTestData(); + indexTestData(client()); // Disable block so the first request would pass disableBlocks(plugins); @@ -413,7 +373,7 @@ public void 
testNoCancellationOfScrollSearchOnFollowUpRequest() throws Exception .setScroll(keepAlive) .setCancelAfterTimeInterval(requestCancellationTimeout) .setSize(2) - .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))) + .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap()))) .get(); assertNotNull(searchResponse.getScrollId()); @@ -444,7 +404,7 @@ public void testNoCancellationOfScrollSearchOnFollowUpRequest() throws Exception public void testDisableCancellationAtRequestLevel() throws Exception { List plugins = initBlockFactory(); - indexTestData(); + indexTestData(client()); client().admin() .cluster() .prepareUpdateSettings() @@ -455,7 +415,7 @@ public void testDisableCancellationAtRequestLevel() throws Exception { ActionFuture searchResponse = client().prepareSearch("test") .setAllowPartialSearchResults(randomBoolean()) .setCancelAfterTimeInterval(NO_TIMEOUT) - .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))) + .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap()))) .execute(); awaitForBlock(plugins); sleepForAtLeast(clusterCancellationTimeout.getMillis()); @@ -467,7 +427,7 @@ public void testDisableCancellationAtRequestLevel() throws Exception { public void testDisableCancellationAtClusterLevel() throws Exception { List plugins = initBlockFactory(); - indexTestData(); + indexTestData(client()); client().admin() .cluster() .prepareUpdateSettings() @@ -475,7 +435,7 @@ public void testDisableCancellationAtClusterLevel() throws Exception { .get(); ActionFuture searchResponse = client().prepareSearch("test") .setAllowPartialSearchResults(randomBoolean()) - .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))) + .setQuery(scriptQuery(new 
Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap()))) .execute(); awaitForBlock(plugins); sleepForAtLeast(clusterCancellationTimeout.getMillis()); @@ -487,7 +447,7 @@ public void testDisableCancellationAtClusterLevel() throws Exception { public void testCancelMultiSearch() throws Exception { List plugins = initBlockFactory(); - indexTestData(); + indexTestData(client()); ActionFuture msearchResponse = client().prepareMultiSearch() .add( client().prepareSearch("test") @@ -511,7 +471,7 @@ public void testCancelMultiSearch() throws Exception { public void testMSearchChildRequestCancellationWithClusterLevelTimeout() throws Exception { List plugins = initBlockFactory(); - indexTestData(); + indexTestData(client()); client().admin() .cluster() .prepareUpdateSettings() @@ -524,17 +484,13 @@ public void testMSearchChildRequestCancellationWithClusterLevelTimeout() throws .add( client().prepareSearch("test") .setAllowPartialSearchResults(randomBoolean()) - .setQuery( - scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap())) - ) + .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap()))) ) .add( client().prepareSearch("test") .setAllowPartialSearchResults(randomBoolean()) .setRequestCache(false) - .setQuery( - scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap())) - ) + .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap()))) ) .execute(); awaitForBlock(plugins); @@ -554,7 +510,7 @@ public void testMSearchChildRequestCancellationWithClusterLevelTimeout() throws */ public void testMSearchChildReqCancellationWithHybridTimeout() throws Exception { List plugins = initBlockFactory(); - indexTestData(); + indexTestData(client()); client().admin() .cluster() .prepareUpdateSettings() @@ -568,25 +524,19 @@ public void 
testMSearchChildReqCancellationWithHybridTimeout() throws Exception client().prepareSearch("test") .setAllowPartialSearchResults(randomBoolean()) .setCancelAfterTimeInterval(requestCancellationTimeout) - .setQuery( - scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap())) - ) + .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap()))) ) .add( client().prepareSearch("test") .setAllowPartialSearchResults(randomBoolean()) .setCancelAfterTimeInterval(NO_TIMEOUT) - .setQuery( - scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap())) - ) + .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap()))) ) .add( client().prepareSearch("test") .setAllowPartialSearchResults(randomBoolean()) .setRequestCache(false) - .setQuery( - scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap())) - ) + .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap()))) ) .execute(); awaitForBlock(plugins); @@ -609,39 +559,4 @@ public void testMSearchChildReqCancellationWithHybridTimeout() throws Exception private static void sleepForAtLeast(long milliseconds) throws InterruptedException { Thread.sleep(milliseconds + 100L); } - - public static class ScriptedBlockPlugin extends MockScriptPlugin { - static final String SCRIPT_NAME = "search_block"; - - private final AtomicInteger hits = new AtomicInteger(); - - private final AtomicBoolean shouldBlock = new AtomicBoolean(true); - - public void reset() { - hits.set(0); - } - - public void disableBlock() { - shouldBlock.set(false); - } - - public void enableBlock() { - shouldBlock.set(true); - } - - @Override - public Map, Object>> pluginScripts() { - return Collections.singletonMap(SCRIPT_NAME, params -> { - LeafFieldsLookup fieldsLookup = 
(LeafFieldsLookup) params.get("_fields"); - LogManager.getLogger(SearchCancellationIT.class).info("Blocking on the document {}", fieldsLookup.get("_id")); - hits.incrementAndGet(); - try { - assertBusy(() -> assertFalse(shouldBlock.get())); - } catch (Exception e) { - throw new RuntimeException(e); - } - return true; - }); - } - } } diff --git a/server/src/main/java/org/opensearch/action/search/MultiSearchRequest.java b/server/src/main/java/org/opensearch/action/search/MultiSearchRequest.java index f16d7d1e7d6a3..8a2fae9779f3b 100644 --- a/server/src/main/java/org/opensearch/action/search/MultiSearchRequest.java +++ b/server/src/main/java/org/opensearch/action/search/MultiSearchRequest.java @@ -281,6 +281,8 @@ public static void readMultiLineFormat( searchRequest.setCancelAfterTimeInterval(nodeTimeValue(value, null)); } else if ("phase_took".equals(entry.getKey())) { searchRequest.setPhaseTook(nodeBooleanValue(value)); + } else if ("coordinator_timeout".equals(entry.getKey())) { + searchRequest.setCoordinatorTimeout(nodeTimeValue(value)); } else { throw new IllegalArgumentException("key [" + entry.getKey() + "] is not supported in the metadata section"); } @@ -385,6 +387,9 @@ public static void writeSearchRequestParams(SearchRequest request, XContentBuild if (request.isPhaseTook() != null) { xContentBuilder.field("phase_took", request.isPhaseTook()); } + if (request.getCoordinatorTimeout() != null) { + xContentBuilder.field("coordinator_timeout", request.getCoordinatorTimeout().getStringRep()); + } xContentBuilder.endObject(); } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequest.java b/server/src/main/java/org/opensearch/action/search/SearchRequest.java index 4d3bb868b779a..4e717db9f1355 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequest.java @@ -59,6 +59,7 @@ import java.util.Objects; import static 
org.opensearch.action.ValidateActions.addValidationError; +import static org.opensearch.search.SearchService.NO_TIMEOUT; /** * A request to execute search against one or more indices (or all). Best created using @@ -123,6 +124,8 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla private Boolean phaseTook = null; + private TimeValue coordinatorTimeout = null; + public SearchRequest() { this.localClusterAlias = null; this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS; @@ -228,6 +231,7 @@ private SearchRequest( this.finalReduce = finalReduce; this.cancelAfterTimeInterval = searchRequest.cancelAfterTimeInterval; this.phaseTook = searchRequest.phaseTook; + this.coordinatorTimeout = searchRequest.coordinatorTimeout; } /** @@ -275,6 +279,9 @@ public SearchRequest(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(Version.V_2_12_0)) { phaseTook = in.readOptionalBoolean(); } + if (in.getVersion().onOrAfter(Version.V_2_19_0)) { + coordinatorTimeout = in.readOptionalTimeValue(); + } } @Override @@ -309,6 +316,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_12_0)) { out.writeOptionalBoolean(phaseTook); } + if (out.getVersion().onOrAfter(Version.V_2_19_0)) { + out.writeOptionalTimeValue(coordinatorTimeout); + } } @Override @@ -341,6 +351,13 @@ public ActionRequestValidationException validate() { if (source.aggregations() != null) { validationException = source.aggregations().validate(validationException); } + if (source.timeout() != null && coordinatorTimeout != null && source.timeout().compareTo(coordinatorTimeout) < 0) { + validationException = addValidationError( + "coordinatorTimeout [" + coordinatorTimeout + "] must be smaller than timeout [" + source.timeout() + "]", + validationException + ); + + } } if (pointInTimeBuilder() != null) { if (scroll) { @@ -711,9 +728,18 @@ public String pipeline() { return pipeline; } + public void 
setCoordinatorTimeout(TimeValue coordinatorTimeout) { + assert coordinatorTimeout != NO_TIMEOUT; + this.coordinatorTimeout = coordinatorTimeout; + } + + public TimeValue getCoordinatorTimeout() { + return coordinatorTimeout; + } + @Override public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers, cancelAfterTimeInterval); + return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers, cancelAfterTimeInterval, coordinatorTimeout); } public final String buildDescription() { @@ -765,7 +791,8 @@ public boolean equals(Object o) { && ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips && Objects.equals(cancelAfterTimeInterval, that.cancelAfterTimeInterval) && Objects.equals(pipeline, that.pipeline) - && Objects.equals(phaseTook, that.phaseTook); + && Objects.equals(phaseTook, that.phaseTook) + && Objects.equals(coordinatorTimeout, that.coordinatorTimeout); } @Override @@ -787,7 +814,8 @@ public int hashCode() { absoluteStartMillis, ccsMinimizeRoundtrips, cancelAfterTimeInterval, - phaseTook + phaseTook, + coordinatorTimeout ); } @@ -832,6 +860,8 @@ public String toString() { + pipeline + ", phaseTook=" + phaseTook + + ", coordinatorTimeout=" + + coordinatorTimeout + "}"; } } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java b/server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java index 4a547ee2c82bd..ee703212dcbfe 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java @@ -651,4 +651,9 @@ public SearchRequestBuilder setCancelAfterTimeInterval(TimeValue cancelAfterTime this.request.setCancelAfterTimeInterval(cancelAfterTimeInterval); return this; } + + public SearchRequestBuilder setCoordinatorTimeout(TimeValue coordinatorTimeout) { + 
this.request.setCoordinatorTimeout(coordinatorTimeout); + return this; + } } diff --git a/server/src/main/java/org/opensearch/action/search/SearchTask.java b/server/src/main/java/org/opensearch/action/search/SearchTask.java index 2a1a961e7607b..9aa64d97e18a3 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchTask.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTask.java @@ -53,6 +53,7 @@ public class SearchTask extends QueryGroupTask implements SearchBackpressureTask // generating description in a lazy way since source can be quite big private final Supplier descriptionSupplier; private SearchProgressListener progressListener = SearchProgressListener.NOOP; + private final TimeValue coordinatorTimeout; public SearchTask( long id, @@ -62,7 +63,7 @@ public SearchTask( TaskId parentTaskId, Map headers ) { - this(id, type, action, descriptionSupplier, parentTaskId, headers, NO_TIMEOUT); + this(id, type, action, descriptionSupplier, parentTaskId, headers, NO_TIMEOUT, null); } public SearchTask( @@ -72,10 +73,12 @@ public SearchTask( Supplier descriptionSupplier, TaskId parentTaskId, Map headers, - TimeValue cancelAfterTimeInterval + TimeValue cancelAfterTimeInterval, + TimeValue coordinatorTimeout ) { super(id, type, action, null, parentTaskId, headers, cancelAfterTimeInterval); this.descriptionSupplier = descriptionSupplier; + this.coordinatorTimeout = coordinatorTimeout; } @Override @@ -106,4 +109,8 @@ public final SearchProgressListener getProgressListener() { public boolean shouldCancelChildrenOnCancellation() { return true; } + + public TimeValue getCoordinatorTimeout() { + return coordinatorTimeout; + } } diff --git a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java index 64c738f633f2e..aa3cdfd99ab1d 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java +++ 
b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java @@ -39,6 +39,7 @@ import org.opensearch.action.support.IndicesOptions; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.Nullable; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; @@ -172,7 +173,7 @@ public void createPitContext( CREATE_READER_CONTEXT_ACTION_NAME, request, task, - TransportRequestOptions.EMPTY, + getTransportRequestOptions(task.getCoordinatorTimeout()), new ActionListenerResponseHandler<>(actionListener, TransportCreatePitAction.CreateReaderContextResponse::new) ); } @@ -188,7 +189,7 @@ public void sendCanMatch( QUERY_CAN_MATCH_NAME, request, task, - TransportRequestOptions.EMPTY, + getTransportRequestOptions(task.getCoordinatorTimeout()), new ActionListenerResponseHandler<>(listener, SearchService.CanMatchResponse::new) ); } @@ -228,6 +229,7 @@ public void sendExecuteDfs( DFS_ACTION_NAME, request, task, + getTransportRequestOptions(task.getCoordinatorTimeout()), new ConnectionCountingHandler<>(listener, DfsSearchResult::new, clientConnections, connection.getNode().getId()) ); } @@ -249,6 +251,7 @@ public void sendExecuteQuery( QUERY_ACTION_NAME, request, task, + getTransportRequestOptions(task.getCoordinatorTimeout()), new ConnectionCountingHandler<>(handler, reader, clientConnections, connection.getNode().getId()) ); } @@ -264,6 +267,7 @@ public void sendExecuteQuery( QUERY_ID_ACTION_NAME, request, task, + getTransportRequestOptions(task.getCoordinatorTimeout()), new ConnectionCountingHandler<>(listener, QuerySearchResult::new, clientConnections, connection.getNode().getId()) ); } @@ -279,6 +283,7 @@ public void sendExecuteScrollQuery( QUERY_SCROLL_ACTION_NAME, request, task, + getTransportRequestOptions(task.getCoordinatorTimeout()), new 
ConnectionCountingHandler<>(listener, ScrollQuerySearchResult::new, clientConnections, connection.getNode().getId()) ); } @@ -328,6 +333,7 @@ private void sendExecuteFetch( action, request, task, + getTransportRequestOptions(task.getCoordinatorTimeout()), new ConnectionCountingHandler<>(listener, FetchSearchResult::new, clientConnections, connection.getNode().getId()) ); } @@ -342,10 +348,19 @@ void sendExecuteMultiSearch(final MultiSearchRequest request, SearchTask task, f MultiSearchAction.NAME, request, task, + getTransportRequestOptions(task.getCoordinatorTimeout()), new ConnectionCountingHandler<>(listener, MultiSearchResponse::new, clientConnections, connection.getNode().getId()) ); } + static TransportRequestOptions getTransportRequestOptions(TimeValue coordinatorTimeout) { + if (coordinatorTimeout != null) { + return TransportRequestOptions.builder().withTimeout(coordinatorTimeout).build(); + } else { + return TransportRequestOptions.EMPTY; + } + } + public RemoteClusterService getRemoteClusterService() { return transportService.getRemoteClusterService(); } diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestMultiSearchAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestMultiSearchAction.java index 4b11670450727..a340a5eca1ee7 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestMultiSearchAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestMultiSearchAction.java @@ -157,6 +157,7 @@ public static MultiSearchRequest parseRequest( multiRequest.add(searchRequest); }); List requests = multiRequest.requests(); + final TimeValue coordinatorTimeout = restRequest.paramAsTime("coordinator_timeout", null); final TimeValue cancelAfterTimeInterval = restRequest.paramAsTime("cancel_after_time_interval", null); for (SearchRequest request : requests) { // preserve if it's set on the request @@ -171,6 +172,9 @@ public static MultiSearchRequest parseRequest( if 
(request.getCancelAfterTimeInterval() == null) { request.setCancelAfterTimeInterval(cancelAfterTimeInterval); } + if (request.getCoordinatorTimeout() == null) { + request.setCoordinatorTimeout(coordinatorTimeout); + } } return multiRequest; } diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java index 05465e32631fd..d5131a9c869e8 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java @@ -224,6 +224,7 @@ public static void parseSearchRequest( } searchRequest.setCancelAfterTimeInterval(request.paramAsTime("cancel_after_time_interval", null)); + searchRequest.setCoordinatorTimeout(request.paramAsTime("coordinator_timeout", null)); } /** diff --git a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java index 27336e86e52b0..aa8eced5d27e5 100644 --- a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java @@ -36,11 +36,13 @@ import org.opensearch.action.OriginalIndices; import org.opensearch.action.support.IndicesOptions; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.GroupShardsIterator; import org.opensearch.common.UUIDs; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.AtomicArray; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.set.Sets; @@ -55,6 +57,7 @@ import 
org.opensearch.index.shard.ShardNotFoundException; import org.opensearch.search.SearchPhaseResult; import org.opensearch.search.SearchShardTarget; +import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.internal.AliasFilter; import org.opensearch.search.internal.InternalSearchResponse; import org.opensearch.search.internal.ShardSearchContextId; @@ -65,6 +68,7 @@ import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.ReceiveTimeoutTransportException; import org.opensearch.transport.Transport; import org.junit.After; import org.junit.Before; @@ -89,6 +93,9 @@ import java.util.function.BiFunction; import java.util.stream.IntStream; +import org.mockito.Mockito; + +import static org.opensearch.action.search.SearchTransportService.QUERY_ACTION_NAME; import static org.opensearch.tasks.TaskResourceTrackingService.TASK_RESOURCE_USAGE; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -138,6 +145,7 @@ private AbstractSearchAsyncAction createAction( false, expected, resourceUsage, + false, new SearchShardIterator(null, null, Collections.emptyList(), null) ); } @@ -151,6 +159,7 @@ private AbstractSearchAsyncAction createAction( final boolean catchExceptionWhenExecutePhaseOnShard, final AtomicLong expected, final TaskResourceUsage resourceUsage, + final boolean blockTheFirstQueryPhase, final SearchShardIterator... 
shards ) { @@ -179,7 +188,7 @@ private AbstractSearchAsyncAction createAction( .setNodeId(randomAlphaOfLengthBetween(1, 5)) .build(); threadPool.getThreadContext().addResponseHeader(TASK_RESOURCE_USAGE, taskResourceInfo.toString()); - + AtomicBoolean firstShard = new AtomicBoolean(true); return new AbstractSearchAsyncAction( "test", logger, @@ -207,7 +216,17 @@ private AbstractSearchAsyncAction createAction( ) { @Override protected SearchPhase getNextPhase(final SearchPhaseResults results, SearchPhaseContext context) { - return null; + if (blockTheFirstQueryPhase) { + return new SearchPhase("test") { + @Override + public void run() { + listener.onResponse(new SearchResponse(null, null, 0, 0, 0, 0, null, null)); + assertingListener.onPhaseEnd(context, null); + } + }; + } else { + return null; + } } @Override @@ -218,6 +237,16 @@ protected void executePhaseOnShard( ) { if (failExecutePhaseOnShard) { listener.onFailure(new ShardNotFoundException(shardIt.shardId())); + } else if (blockTheFirstQueryPhase && firstShard.compareAndSet(true, false)) { + // Sleep and throw ReceiveTimeoutTransportException to simulate node blocked + try { + Thread.sleep(request.getCoordinatorTimeout().millis()); + } catch (InterruptedException e) {} + DiscoveryNode node = Mockito.mock(DiscoveryNode.class); + Mockito.when(node.getName()).thenReturn("test_nodes"); + listener.onFailure( + new ReceiveTimeoutTransportException(node, QUERY_ACTION_NAME, "request_id [171] timed out after [413ms]") + ); } else { if (catchExceptionWhenExecutePhaseOnShard) { try { @@ -227,6 +256,7 @@ protected void executePhaseOnShard( } } else { listener.onResponse(new QuerySearchResult()); + } } } @@ -587,6 +617,7 @@ public void onFailure(Exception e) { false, new AtomicLong(), new TaskResourceUsage(randomLong(), randomLong()), + false, shards ); action.run(); @@ -635,6 +666,7 @@ public void onFailure(Exception e) { false, new AtomicLong(), new TaskResourceUsage(randomLong(), randomLong()), + false, shards ); 
action.run(); @@ -688,6 +720,7 @@ public void onFailure(Exception e) { catchExceptionWhenExecutePhaseOnShard, new AtomicLong(), new TaskResourceUsage(randomLong(), randomLong()), + false, shards ); action.run(); @@ -791,6 +824,40 @@ public void testOnPhaseListenersWithDfsType() throws InterruptedException { assertEquals(0, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName())); } + public void testExecutePhaseOnShardBlockAndRetrunPartialResult() { + // one shard is blocked in query phase + final Index index = new Index("test", UUID.randomUUID().toString()); + + final SearchShardIterator[] shards = IntStream.range(0, 2 + randomInt(4)) + .mapToObj(i -> new SearchShardIterator(null, new ShardId(index, i), List.of("n1"), null, null, null)) + .toArray(SearchShardIterator[]::new); + + SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true); + searchRequest.source(new SearchSourceBuilder()); + long timeoutMills = 500; + searchRequest.setCoordinatorTimeout(new TimeValue(timeoutMills, TimeUnit.MILLISECONDS)); + searchRequest.setMaxConcurrentShardRequests(shards.length); + final AtomicBoolean successed = new AtomicBoolean(false); + long current = System.currentTimeMillis(); + + final ArraySearchPhaseResults queryResult = new ArraySearchPhaseResults<>(shards.length); + AbstractSearchAsyncAction action = createAction(searchRequest, queryResult, new ActionListener<>() { + @Override + public void onResponse(SearchResponse response) { + successed.set(true); + } + + @Override + public void onFailure(Exception e) { + successed.set(false); + } + }, false, false, false, new AtomicLong(), new TaskResourceUsage(randomLong(), randomLong()), true, shards); + action.run(); + long s = System.currentTimeMillis() - current; + assertTrue(s > timeoutMills); + assertTrue(successed.get()); + } + private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAction( List searchRequestOperationsListeners ) { diff --git 
a/server/src/test/java/org/opensearch/action/search/MultiSearchRequestTests.java b/server/src/test/java/org/opensearch/action/search/MultiSearchRequestTests.java index 2577dfdc20698..57d51a6d4ab30 100644 --- a/server/src/test/java/org/opensearch/action/search/MultiSearchRequestTests.java +++ b/server/src/test/java/org/opensearch/action/search/MultiSearchRequestTests.java @@ -180,6 +180,37 @@ public void testOnlyParentMSearchRequestWithCancelAfterTimeIntervalParameter() t assertEquals(new TimeValue(20, TimeUnit.SECONDS), request.requests().get(0).getCancelAfterTimeInterval()); } + public void tesCoordinatorTimeoutAtParentAndFewChildRequest() throws IOException { + final String requestContent = "{\"index\":\"test\", \"expand_wildcards\" : \"open,closed\", " + + "\"coordinator_timeout\" : \"10s\"}\r\n" + + "{\"query\" : {\"match_all\" :{}}}\r\n {\"search_type\" : \"dfs_query_then_fetch\"}\n" + + "{\"query\" : {\"match_all\" :{}}}\r\n"; + FakeRestRequest restRequest = new FakeRestRequest.Builder(xContentRegistry()).withContent( + new BytesArray(requestContent), + XContentType.JSON + ).withParams(Collections.singletonMap("coordinator_timeout", "20s")).build(); + MultiSearchRequest request = RestMultiSearchAction.parseRequest(restRequest, null, true); + assertThat(request.requests().size(), equalTo(2)); + assertThat(request.requests().get(0).indices()[0], equalTo("test")); + // verifies that child search request parameter value is used for first search request + assertEquals(new TimeValue(10, TimeUnit.SECONDS), request.requests().get(0).getCoordinatorTimeout()); + // verifies that parent msearch parameter value is used for second search request + assertEquals(new TimeValue(20, TimeUnit.SECONDS), request.requests().get(1).getCoordinatorTimeout()); + } + + public void testOnlyParentMSearchRequestWithCoordinatorTimeoutParameter() throws IOException { + final String requestContent = "{\"index\":\"test\", \"expand_wildcards\" : \"open,closed\"}}\r\n" + + "{\"query\" : 
{\"match_all\" :{}}}\r\n"; + FakeRestRequest restRequest = new FakeRestRequest.Builder(xContentRegistry()).withContent( + new BytesArray(requestContent), + XContentType.JSON + ).withParams(Collections.singletonMap("coordinator_timeout", "20s")).build(); + MultiSearchRequest request = RestMultiSearchAction.parseRequest(restRequest, null, true); + assertThat(request.requests().size(), equalTo(1)); + assertThat(request.requests().get(0).indices()[0], equalTo("test")); + assertEquals(new TimeValue(20, TimeUnit.SECONDS), request.requests().get(0).getCoordinatorTimeout()); + } + public void testDefaultIndicesOptions() throws IOException { final String requestContent = "{\"index\":\"test\", \"expand_wildcards\" : \"open,closed\"}}\r\n" + "{\"query\" : {\"match_all\" :{}}}\r\n"; diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestTests.java index acda1445bacbb..8ed64fd8efb4c 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestTests.java @@ -238,6 +238,16 @@ public void testValidate() throws IOException { assertEquals(1, validationErrors.validationErrors().size()); assertEquals("using [point in time] is not allowed in a scroll context", validationErrors.validationErrors().get(0)); } + + { + // coordinator_timeout must be smaller than timeout + SearchRequest searchRequest = createSearchRequest().source(new SearchSourceBuilder().timeout(TimeValue.timeValueMillis(10))); + searchRequest.setCoordinatorTimeout(TimeValue.timeValueMillis(100)); + ActionRequestValidationException validationErrors = searchRequest.validate(); + assertNotNull(validationErrors); + assertEquals(1, validationErrors.validationErrors().size()); + assertEquals("coordinatorTimeout [100ms] must be smaller than timeout [10ms]", validationErrors.validationErrors().get(0)); + } } public void testCopyConstructor() 
throws IOException { diff --git a/test/framework/src/main/java/org/opensearch/search/RandomSearchRequestGenerator.java b/test/framework/src/main/java/org/opensearch/search/RandomSearchRequestGenerator.java index 74de1e6d96d93..25ba9e33e32c8 100644 --- a/test/framework/src/main/java/org/opensearch/search/RandomSearchRequestGenerator.java +++ b/test/framework/src/main/java/org/opensearch/search/RandomSearchRequestGenerator.java @@ -134,6 +134,9 @@ public static SearchRequest randomSearchRequest(Supplier ra if (randomBoolean()) { searchRequest.setPhaseTook(randomBoolean()); } + if (randomBoolean()) { + searchRequest.setCoordinatorTimeout(TimeValue.parseTimeValue(randomTimeValue(), null, "coordinator_timeout")); + } return searchRequest; } diff --git a/test/framework/src/main/java/org/opensearch/test/ParameterizedStaticSettingsOpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/ParameterizedStaticSettingsOpenSearchIntegTestCase.java index 7d2c9ad686a01..2828acf18fe4c 100644 --- a/test/framework/src/main/java/org/opensearch/test/ParameterizedStaticSettingsOpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/ParameterizedStaticSettingsOpenSearchIntegTestCase.java @@ -8,16 +8,29 @@ package org.opensearch.test; +import org.apache.logging.log4j.LogManager; +import org.opensearch.action.bulk.BulkRequestBuilder; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.client.Client; import org.opensearch.common.settings.Settings; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.script.MockScriptPlugin; +import org.opensearch.search.lookup.LeafFieldsLookup; import java.nio.file.Path; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import static 
org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; +import static org.hamcrest.Matchers.greaterThan; /** * Base class for running the tests with parameterization with static settings: the cluster will be pre-created with the settings at startup, the method @@ -77,4 +90,68 @@ boolean hasSameParametersAs(ParameterizedOpenSearchIntegTestCase obj) { final ParameterizedStaticSettingsOpenSearchIntegTestCase other = (ParameterizedStaticSettingsOpenSearchIntegTestCase) obj; return Objects.equals(settings, other.settings); } + + protected static void indexTestData(Client client) { + for (int i = 0; i < 5; i++) { + // Make sure we have a few segments + BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int j = 0; j < 20; j++) { + bulkRequestBuilder.add(client.prepareIndex("test").setId(Integer.toString(i * 5 + j)).setSource("field", "value")); + } + assertNoFailures(bulkRequestBuilder.get()); + } + } + + public static class ScriptedBlockPlugin extends MockScriptPlugin { + public static final String SCRIPT_NAME = "search_block"; + + public final AtomicInteger hits = new AtomicInteger(); + + private final AtomicBoolean shouldBlock = new AtomicBoolean(true); + + public void reset() { + hits.set(0); + } + + public void disableBlock() { + shouldBlock.set(false); + } + + public void enableBlock() { + shouldBlock.set(true); + } + + @Override + public Map, Object>> pluginScripts() { + return Collections.singletonMap(SCRIPT_NAME, params -> { + LeafFieldsLookup fieldsLookup = (LeafFieldsLookup) params.get("_fields"); + LogManager.getLogger(ScriptedBlockPlugin.class).info("Blocking on the document {}", fieldsLookup.get("_id")); + hits.incrementAndGet(); + try { + assertBusy(() -> 
assertFalse(shouldBlock.get())); + } catch (Exception e) { + throw new RuntimeException(e); + } + return true; + }); + } + } + + protected void awaitForBlock(List plugins) throws Exception { + int numberOfShards = getNumShards("test").numPrimaries; + assertBusy(() -> { + int numberOfBlockedPlugins = 0; + for (ScriptedBlockPlugin plugin : plugins) { + numberOfBlockedPlugins += plugin.hits.get(); + } + logger.info("The plugin blocked on {} out of {} shards", numberOfBlockedPlugins, numberOfShards); + assertThat(numberOfBlockedPlugins, greaterThan(0)); + }); + } + + protected void disableBlocks(List plugins) throws Exception { + for (ScriptedBlockPlugin plugin : plugins) { + plugin.disableBlock(); + } + } }