Skip to content

Commit cdf7aba

Browse files
committed
draft
1 parent 066a957 commit cdf7aba

File tree

7 files changed

+325
-17
lines changed

7 files changed

+325
-17
lines changed

docs/layouts/shortcodes/generated/history_server_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@
3232
<td>Integer</td>
3333
<td>The maximum number of jobs to retain in each archive directory defined by `historyserver.archive.fs.dir`. If set to `-1`(default), there is no limit to the number of archives. If set to `0` or less than `-1` HistoryServer will throw an <code class="highlighter-rouge">IllegalConfigurationException</code>. </td>
3434
</tr>
35+
<tr>
36+
<td><h5>historyserver.archive.retained-ttl</h5></td>
37+
<td style="word-wrap: break-word;">(none)</td>
38+
<td>Duration</td>
39+
<td>The time-to-live duration to retain the jobs archived in each archive directory defined by `historyserver.archive.fs.dir`. Note, when there are multiple history server instances with different configurations, they are working independently today and may have conflict configs. This is an existing problem. For the configuration, when there are multiple history servers instances. You can enable the configuration option like following: <ul><li>Using the same `<code class="highlighter-rouge">historyserver.archive.fs.dir</code>` directory as the refresh directories, you should enable and configure this feature in only one HistoryServer instance to avoid errors caused by multiple instances simultaneously cleaning up remote files,</li><li>Or you can keep the value of this configuration consistent across them.</li></ul></td>
40+
</tr>
3541
<tr>
3642
<td><h5>historyserver.log.jobmanager.url-pattern</h5></td>
3743
<td style="word-wrap: break-word;">(none)</td>

flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import static org.apache.flink.configuration.ConfigOptions.key;
2727
import static org.apache.flink.configuration.description.TextElement.code;
28+
import static org.apache.flink.configuration.description.TextElement.text;
2829

2930
/** The set of configuration options relating to the HistoryServer. */
3031
@PublicEvolving
@@ -143,5 +144,29 @@ public class HistoryServerOptions {
143144
code("IllegalConfigurationException"))
144145
.build());
145146

147+
public static final ConfigOption<Duration> HISTORY_SERVER_RETAINED_TTL =
148+
key("historyserver.archive.retained-ttl")
149+
.durationType()
150+
.noDefaultValue()
151+
.withDescription(
152+
Description.builder()
153+
.text(
154+
String.format(
155+
"The time-to-live duration to retain the jobs archived in each archive directory defined by `%s`. ",
156+
HISTORY_SERVER_ARCHIVE_DIRS.key()))
157+
.text(
158+
"Note, when there are multiple history server instances with different configurations, "
159+
+ "they are working independently today and may have conflict configs. "
160+
+ "This is an existing problem. "
161+
+ "For the configuration, when there are multiple history servers instances. You can enable the configuration option like following: ")
162+
.list(
163+
text(
164+
"Using the same `%s` directory as the refresh directories, "
165+
+ "you should enable and configure this feature in only one HistoryServer instance to avoid errors caused by multiple instances simultaneously cleaning up remote files,",
166+
code(HISTORY_SERVER_ARCHIVE_DIRS.key())),
167+
text(
168+
"Or you can keep the value of this configuration consistent across them."))
169+
.build());
170+
146171
private HistoryServerOptions() {}
147172
}

flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.flink.configuration.Configuration;
2323
import org.apache.flink.configuration.GlobalConfiguration;
2424
import org.apache.flink.configuration.HistoryServerOptions;
25-
import org.apache.flink.configuration.IllegalConfigurationException;
2625
import org.apache.flink.core.fs.FileSystem;
2726
import org.apache.flink.core.fs.Path;
2827
import org.apache.flink.core.plugin.PluginUtils;
@@ -38,6 +37,7 @@
3837
import org.apache.flink.runtime.security.SecurityUtils;
3938
import org.apache.flink.runtime.util.EnvironmentInformation;
4039
import org.apache.flink.runtime.util.Runnables;
40+
import org.apache.flink.runtime.webmonitor.history.retaining.JobArchivedRetainedStrategyImpl;
4141
import org.apache.flink.runtime.webmonitor.utils.LogUrlUtil;
4242
import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
4343
import org.apache.flink.util.ExceptionUtils;
@@ -238,19 +238,13 @@ public HistoryServer(
238238

239239
refreshIntervalMillis =
240240
config.get(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL).toMillis();
241-
int maxHistorySize = config.get(HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS);
242-
if (maxHistorySize == 0 || maxHistorySize < -1) {
243-
throw new IllegalConfigurationException(
244-
"Cannot set %s to 0 or less than -1",
245-
HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS.key());
246-
}
247241
archiveFetcher =
248242
new HistoryServerArchiveFetcher(
249243
refreshDirs,
250244
webDir,
251245
jobArchiveEventListener,
252246
cleanupExpiredArchives,
253-
maxHistorySize);
247+
JobArchivedRetainedStrategyImpl.createFrom(config));
254248

255249
this.shutdownHook =
256250
ShutdownHookUtil.addShutdownHook(

flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
3030
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
3131
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
32+
import org.apache.flink.runtime.webmonitor.history.retaining.JobArchivesRetainedStrategy;
3233
import org.apache.flink.util.FileUtils;
3334
import org.apache.flink.util.jackson.JacksonMapperFactory;
3435

@@ -112,8 +113,7 @@ public ArchiveEventType getType() {
112113
private final List<HistoryServer.RefreshLocation> refreshDirs;
113114
private final Consumer<ArchiveEvent> jobArchiveEventListener;
114115
private final boolean processExpiredArchiveDeletion;
115-
private final boolean processBeyondLimitArchiveDeletion;
116-
private final int maxHistorySize;
116+
private final JobArchivesRetainedStrategy jobRetainedStrategy;
117117

118118
/** Cache of all available jobs identified by their id. */
119119
private final Map<Path, Set<String>> cachedArchivesPerRefreshDirectory;
@@ -127,13 +127,12 @@ public ArchiveEventType getType() {
127127
File webDir,
128128
Consumer<ArchiveEvent> jobArchiveEventListener,
129129
boolean cleanupExpiredArchives,
130-
int maxHistorySize)
130+
JobArchivesRetainedStrategy jobRetainedStrategy)
131131
throws IOException {
132132
this.refreshDirs = checkNotNull(refreshDirs);
133133
this.jobArchiveEventListener = jobArchiveEventListener;
134134
this.processExpiredArchiveDeletion = cleanupExpiredArchives;
135-
this.maxHistorySize = maxHistorySize;
136-
this.processBeyondLimitArchiveDeletion = this.maxHistorySize > 0;
135+
this.jobRetainedStrategy = checkNotNull(jobRetainedStrategy);
137136
this.cachedArchivesPerRefreshDirectory = new HashMap<>();
138137
for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
139138
cachedArchivesPerRefreshDirectory.put(refreshDir.getPath(), new HashSet<>());
@@ -176,7 +175,7 @@ void fetchArchives() {
176175
continue;
177176
}
178177

179-
int historySize = 0;
178+
int fileOrderedIndexOnModifiedTime = 0;
180179
for (FileStatus jobArchive : jobArchives) {
181180
Path jobArchivePath = jobArchive.getPath();
182181
String jobID = jobArchivePath.getName();
@@ -186,8 +185,9 @@ void fetchArchives() {
186185

187186
jobsToRemove.get(refreshDir).remove(jobID);
188187

189-
historySize++;
190-
if (historySize > maxHistorySize && processBeyondLimitArchiveDeletion) {
188+
fileOrderedIndexOnModifiedTime++;
189+
if (!jobRetainedStrategy.shouldRetain(
190+
jobArchive, fileOrderedIndexOnModifiedTime)) {
191191
archivesBeyondSizeLimit
192192
.computeIfAbsent(refreshDir, ignored -> new HashSet<>())
193193
.add(jobArchivePath);
@@ -220,7 +220,7 @@ void fetchArchives() {
220220
&& processExpiredArchiveDeletion) {
221221
events.addAll(cleanupExpiredJobs(jobsToRemove));
222222
}
223-
if (!archivesBeyondSizeLimit.isEmpty() && processBeyondLimitArchiveDeletion) {
223+
if (!archivesBeyondSizeLimit.isEmpty()) {
224224
events.addAll(cleanupJobsBeyondSizeLimit(archivesBeyondSizeLimit));
225225
}
226226
if (!events.isEmpty()) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.webmonitor.history.retaining;
20+
21+
import org.apache.flink.annotation.VisibleForTesting;
22+
import org.apache.flink.configuration.IllegalConfigurationException;
23+
import org.apache.flink.configuration.ReadableConfig;
24+
import org.apache.flink.core.fs.FileStatus;
25+
26+
import javax.annotation.Nullable;
27+
28+
import java.time.Duration;
29+
import java.time.Instant;
30+
import java.util.Arrays;
31+
import java.util.Collections;
32+
import java.util.List;
33+
import java.util.Optional;
34+
35+
import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_JOBS;
36+
import static org.apache.flink.configuration.HistoryServerOptions.HISTORY_SERVER_RETAINED_TTL;
37+
38+
/** The retained strategy. */
39+
public class JobArchivedRetainedStrategyImpl implements JobArchivesRetainedStrategy {
40+
41+
public static JobArchivesRetainedStrategy createFrom(ReadableConfig config) {
42+
int maxHistorySizeByOldKey = config.get(HISTORY_SERVER_RETAINED_JOBS);
43+
if (maxHistorySizeByOldKey == 0 || maxHistorySizeByOldKey < -1) {
44+
throw new IllegalConfigurationException(
45+
"Cannot set %s to 0 or less than -1", HISTORY_SERVER_RETAINED_JOBS.key());
46+
}
47+
Optional<Duration> retainedTtlOpt = config.getOptional(HISTORY_SERVER_RETAINED_TTL);
48+
return new JobArchivedRetainedStrategyImpl(
49+
new QuantityBasedJobRetainedStrategy(maxHistorySizeByOldKey),
50+
new TimeToLiveBasedJobRetainedStrategy(retainedTtlOpt.orElse(null)));
51+
}
52+
53+
private final List<JobArchivesRetainedStrategy> strategies;
54+
55+
JobArchivedRetainedStrategyImpl(JobArchivesRetainedStrategy... strategies) {
56+
this.strategies =
57+
strategies == null || strategies.length == 0
58+
? Collections.emptyList()
59+
: Arrays.asList(strategies);
60+
}
61+
62+
@Override
63+
public boolean shouldRetain(FileStatus file, int fileOrderedIndex) {
64+
if (strategies.isEmpty()) {
65+
return true;
66+
}
67+
return strategies.stream().allMatch(s -> s.shouldRetain(file, fileOrderedIndex));
68+
}
69+
70+
@VisibleForTesting
71+
public List<JobArchivesRetainedStrategy> getStrategies() {
72+
return Collections.unmodifiableList(strategies);
73+
}
74+
}
75+
76+
/** The time to live based retained strategy. */
77+
class TimeToLiveBasedJobRetainedStrategy implements JobArchivesRetainedStrategy {
78+
79+
@Nullable private final Duration ttlThreshold;
80+
81+
TimeToLiveBasedJobRetainedStrategy(Duration ttlThreshold) {
82+
this.ttlThreshold = ttlThreshold;
83+
}
84+
85+
@Override
86+
public boolean shouldRetain(FileStatus file, int fileOrderedIndex) {
87+
if (ttlThreshold == null || ttlThreshold.toMillis() <= 0L) {
88+
return true;
89+
}
90+
return Instant.now().toEpochMilli() - file.getModificationTime() < ttlThreshold.toMillis();
91+
}
92+
}
93+
94+
/** The job quantity based retained strategy. */
95+
class QuantityBasedJobRetainedStrategy implements JobArchivesRetainedStrategy {
96+
97+
private final int quantityThreshold;
98+
99+
QuantityBasedJobRetainedStrategy(int quantityThreshold) {
100+
this.quantityThreshold = quantityThreshold;
101+
}
102+
103+
@Override
104+
public boolean shouldRetain(FileStatus file, int fileOrderedIndex) {
105+
if (quantityThreshold <= 0) {
106+
return true;
107+
}
108+
return quantityThreshold >= fileOrderedIndex;
109+
}
110+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.webmonitor.history.retaining;
20+
21+
import org.apache.flink.core.fs.FileStatus;
22+
23+
/** To define the strategy interface to judge whether the file should be retained. */
24+
public interface JobArchivesRetainedStrategy {
25+
26+
/**
27+
* Judge whether the file should be retained.
28+
*
29+
* @param file the target file to judge.
30+
* @param fileOrderedIndex the specified order index position of the target file,
31+
* @return The result that indicates whether the file should be retained.
32+
*/
33+
boolean shouldRetain(FileStatus file, int fileOrderedIndex);
34+
}

0 commit comments

Comments
 (0)