Skip to content

Commit f3068f4

Browse files
committed
YARN-11809. Support application backoff mechanism for CapacityScheduler.
1 parent e8a64d0 commit f3068f4

File tree

5 files changed

+425
-2
lines changed

5 files changed

+425
-2
lines changed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java

+22-1
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,16 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
184184
private ConcurrentHashMultiset<SchedulerRequestKey>
185185
missedNonPartitionedReqSchedulingOpportunity =
186186
ConcurrentHashMultiset.create();
187-
187+
188+
/**
189+
* Tracks the total number of times the application has missed scheduling
190+
* opportunities, which will be incremented when the scheduler cannot allocate
191+
* resources for the application, and reset to 0 when the scheduler
192+
* successfully allocates resources for the application or transitions
193+
* the application to the backoff state.
194+
*/
195+
private final AtomicLong appMissedSchedulingOpportunities = new AtomicLong();
196+
188197
// Time of the last container scheduled at the current allowed level
189198
protected Map<SchedulerRequestKey, Long> lastScheduledContainer =
190199
new ConcurrentHashMap<>();
@@ -1106,6 +1115,18 @@ void setSchedulingOpportunities(SchedulerRequestKey schedulerKey, int count) {
11061115
schedulingOpportunities.setCount(schedulerKey, count);
11071116
}
11081117

1118+
public void addAppMissedSchedulingOpportunities() {
1119+
appMissedSchedulingOpportunities.incrementAndGet();
1120+
}
1121+
1122+
public void resetAppMissedSchedulingOpportunities() {
1123+
appMissedSchedulingOpportunities.set(0);
1124+
}
1125+
1126+
public long getAppMissedSchedulingOpportunities() {
1127+
return appMissedSchedulingOpportunities.get();
1128+
}
1129+
11091130
private AggregateAppResourceUsage getRunningAggregateAppResourceUsage() {
11101131
long currentTimeMillis = System.currentTimeMillis();
11111132
// Don't walk the whole container list if the resources were computed

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java

+3
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ public class ActivityDiagnosticConstant {
7070
public final static String APPLICATION_DO_NOT_NEED_RESOURCE =
7171
"Application does not need more resource";
7272

73+
public static final String APPLICATION_IN_BACKOFF_STATE =
74+
"Application is in backoff state due to reaching missed scheduling threshold";
75+
7376
/*
7477
* Request level diagnostics
7578
*/

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java

+77-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.TreeSet;
3232
import java.util.concurrent.ConcurrentHashMap;
3333
import java.util.concurrent.ConcurrentMap;
34+
import java.util.concurrent.TimeUnit;
3435
import java.util.stream.Stream;
3536

3637
import org.apache.commons.lang3.StringUtils;
@@ -39,6 +40,8 @@
3940
import org.apache.hadoop.security.AccessControlException;
4041
import org.apache.hadoop.security.UserGroupInformation;
4142
import org.apache.hadoop.security.authorize.AccessControlList;
43+
import org.apache.hadoop.thirdparty.com.google.common.cache.Cache;
44+
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
4245
import org.apache.hadoop.util.Sets;
4346
import org.apache.hadoop.util.Time;
4447
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -154,6 +157,13 @@ public class AbstractLeafQueue extends AbstractCSQueue {
154157
private final List<FiCaSchedulerApp> runnableApps = new ArrayList<>();
155158
private final List<FiCaSchedulerApp> nonRunnableApps = new ArrayList<>();
156159

160+
// Backoff related variables
161+
private final boolean appBackoffEnabled;
162+
private long appBackoffIntervalMs = 0L;
163+
private long appBackoffMissedThreshold = 0L;
164+
// Cache of applications that are in backoff state
165+
private Cache<ApplicationId, Boolean> appsInBackoffState = null;
166+
157167
public AbstractLeafQueue(CapacitySchedulerQueueContext queueContext,
158168
String queueName, CSQueue parent, CSQueue old) throws IOException {
159169
this(queueContext, queueName, parent, old, false);
@@ -170,6 +180,26 @@ public AbstractLeafQueue(CapacitySchedulerQueueContext queueContext,
170180

171181
// One time initialization is enough since it is static ordering policy
172182
this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps<>();
183+
184+
// Initialize the backoff configurations
185+
CapacitySchedulerConfiguration conf = queueContext.getConfiguration();
186+
appBackoffEnabled = conf.isAppBackoffEnabled(queuePath);
187+
if (appBackoffEnabled) {
188+
appBackoffIntervalMs = conf.getAppBackoffIntervalMs(queuePath);
189+
if (appBackoffIntervalMs <= 0) {
190+
throw new IOException(
191+
"Backoff interval must be greater than 0 for queue: " + queuePath);
192+
}
193+
appBackoffMissedThreshold =
194+
conf.getAppBackoffMissedThreshold(queuePath);
195+
if (appBackoffMissedThreshold <= 0) {
196+
throw new IOException(
197+
"Backoff app missed threshold must be greater than 0 for queue: "
198+
+ queuePath);
199+
}
200+
appsInBackoffState = CacheBuilder.newBuilder().expireAfterAccess(
201+
appBackoffIntervalMs, TimeUnit.MILLISECONDS).build();
202+
}
173203
}
174204

175205
@SuppressWarnings("checkstyle:nowhitespaceafter")
@@ -314,7 +344,10 @@ protected void setupQueueConfigs(Resource clusterResource) throws
314344
+ defaultAppPriorityPerQueue + "\npriority = " + priority
315345
+ "\nmaxLifetime = " + getMaximumApplicationLifetime()
316346
+ " seconds" + "\ndefaultLifetime = "
317-
+ getDefaultApplicationLifetime() + " seconds");
347+
+ getDefaultApplicationLifetime() + " seconds"
348+
+ "\nbackoffEnabled = " + appBackoffEnabled
349+
+ "\nbackoffIntervalMs = " + appBackoffIntervalMs
350+
+ "\nbackoffAppMissedThreshold = " + appBackoffMissedThreshold);
318351
} finally {
319352
writeLock.unlock();
320353
}
@@ -1212,6 +1245,33 @@ public CSAssignment assignContainers(Resource clusterResource,
12121245
assignmentIterator.hasNext();) {
12131246
FiCaSchedulerApp application = assignmentIterator.next();
12141247

1248+
// Check for backoff state
1249+
if (isAppInBackoffState(application.getApplicationId())) {
1250+
// Skip if this app is still in backoff state
1251+
ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
1252+
activitiesManager, node, application, application.getPriority(),
1253+
ActivityDiagnosticConstant.APPLICATION_IN_BACKOFF_STATE);
1254+
continue;
1255+
}
1256+
1257+
// Check for missed scheduling opportunities
1258+
if (isAppShouldEnterBackoffState(application)) {
1259+
// Don't assign containers to this app when the missed opportunities reached the threshold.
1260+
LOG.info("Skip scheduling for application {} as it has reached the "
1261+
+ "missed scheduling threshold {}, the backoff interval is {} ms.",
1262+
application.getApplicationId(), appBackoffMissedThreshold,
1263+
appBackoffIntervalMs);
1264+
ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
1265+
activitiesManager, node, application, application.getPriority(),
1266+
ActivityDiagnosticConstant.APPLICATION_IN_BACKOFF_STATE);
1267+
// Add the app to the backoff state, to prevent further scheduling
1268+
// attempts during the backoff period.
1269+
appsInBackoffState.put(application.getApplicationId(), true);
1270+
// Reset missed scheduling opportunities
1271+
application.resetAppMissedSchedulingOpportunities();
1272+
continue;
1273+
}
1274+
12151275
ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
12161276
node, SystemClock.getInstance().getTime(), application);
12171277

@@ -1302,19 +1362,26 @@ public CSAssignment assignContainers(Resource clusterResource,
13021362
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
13031363
parent.getQueuePath(), getQueuePath(),
13041364
ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
1365+
// Reset missed scheduling opportunities after successfully allocating
1366+
// resources for the application.
1367+
application.resetAppMissedSchedulingOpportunities();
13051368
return assignment;
13061369
} else if (assignment.getSkippedType()
13071370
== CSAssignment.SkippedType.OTHER) {
13081371
ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
13091372
activitiesManager, application.getApplicationId(),
13101373
ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
13111374
application.updateNodeInfoForAMDiagnostics(node);
1375+
// Add missed scheduling opportunities for the application
1376+
application.addAppMissedSchedulingOpportunities();
13121377
} else if (assignment.getSkippedType()
13131378
== CSAssignment.SkippedType.QUEUE_LIMIT) {
13141379
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
13151380
parent.getQueuePath(), getQueuePath(), ActivityState.REJECTED,
13161381
() -> ActivityDiagnosticConstant.QUEUE_DO_NOT_HAVE_ENOUGH_HEADROOM
13171382
+ " from " + application.getApplicationId());
1383+
// Add missed scheduling opportunities for the application
1384+
application.addAppMissedSchedulingOpportunities();
13181385
return assignment;
13191386
} else{
13201387
// If we don't allocate anything, and it is not skipped by application,
@@ -1335,6 +1402,15 @@ public CSAssignment assignContainers(Resource clusterResource,
13351402
return CSAssignment.NULL_ASSIGNMENT;
13361403
}
13371404

1405+
public boolean isAppInBackoffState(ApplicationId appId) {
1406+
return appBackoffEnabled && appsInBackoffState.getIfPresent(appId) != null;
1407+
}
1408+
1409+
public boolean isAppShouldEnterBackoffState(FiCaSchedulerApp application) {
1410+
return appBackoffEnabled &&
1411+
application.getAppMissedSchedulingOpportunities() >= appBackoffMissedThreshold;
1412+
}
1413+
13381414
@Override
13391415
public boolean accept(Resource cluster,
13401416
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java

+128
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import java.util.Set;
7777
import java.util.regex.Matcher;
7878
import java.util.regex.Pattern;
79+
import java.util.concurrent.TimeUnit;
7980

8081
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePrefixes.getAutoCreatedQueueObjectTemplateConfPrefix;
8182
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePrefixes.getNodeLabelPrefix;
@@ -3036,4 +3037,131 @@ private String normalizePolicyName(String policyName) {
30363037
"Could not instantiate " + "NodesSortingPolicy: " + policyName, e);
30373038
}
30383039
}
3040+
3041+
/**
3042+
* Configuration keys for enabling backoff mechanism for a queue.
3043+
* When enabled, applications in the queue will be temporarily skipped
3044+
* if they fail to schedule tasks after a certain number of opportunities.
3045+
*/
3046+
@Private
3047+
public static final String BACKOFF_ENABLED = "app-backoff.enabled";
3048+
3049+
/**
3050+
* Default value for enabling backoff mechanism.
3051+
*/
3052+
@Private
3053+
public static final boolean DEFAULT_BACKOFF_ENABLED = false;
3054+
3055+
/**
3056+
* Configuration key indicating the duration for which an application
3057+
* in backoff state will be skipped during the scheduling process.
3058+
*/
3059+
@Private
3060+
public static final String APP_BACKOFF_INTERVAL_MS = "app-backoff.interval-ms";
3061+
3062+
/**
3063+
* Default value for the backoff duration in milliseconds.
3064+
*/
3065+
@Private
3066+
public static final long DEFAULT_APP_BACKOFF_INTERVAL_MS = 3000L;
3067+
3068+
/**
3069+
* Configuration key for the threshold of missed scheduling opportunities
3070+
* before an application is put into backoff state.
3071+
*/
3072+
@Private
3073+
public static final String APP_BACKOFF_MISSED_THRESHOLD =
3074+
"app-backoff.missed-threshold";
3075+
3076+
/**
3077+
* Default value for missed opportunities' threshold.
3078+
*/
3079+
@Private
3080+
public static final long DEFAULT_APP_BACKOFF_MISSED_THRESHOLD = 3L;
3081+
3082+
/**
3083+
* Get the global default value for backoff enabled setting.
3084+
* @return true if backoff is enabled, false otherwise
3085+
*/
3086+
public boolean getGlobalAppBackoffEnabled() {
3087+
return getBoolean(PREFIX + BACKOFF_ENABLED, DEFAULT_BACKOFF_ENABLED);
3088+
}
3089+
3090+
/**
3091+
* Get the global default value for backoff interval in milliseconds.
3092+
* @return the backoff interval in milliseconds
3093+
*/
3094+
public long getGlobalAppBackoffIntervalMs() {
3095+
return getTimeDuration(PREFIX + APP_BACKOFF_INTERVAL_MS,
3096+
DEFAULT_APP_BACKOFF_INTERVAL_MS, TimeUnit.MILLISECONDS);
3097+
}
3098+
3099+
/**
3100+
* Get the global default value for missed opportunities' threshold.
3101+
* @return the missed opportunities threshold
3102+
*/
3103+
public long getGlobalAppBackoffMissedThreshold() {
3104+
return getLong(PREFIX + APP_BACKOFF_MISSED_THRESHOLD,
3105+
DEFAULT_APP_BACKOFF_MISSED_THRESHOLD);
3106+
}
3107+
3108+
/**
3109+
* Check if app-backoff is enabled for a specific queue.
3110+
* @param queue the queue path
3111+
* @return true if app-backoff is enabled for the queue, false otherwise
3112+
*/
3113+
public boolean isAppBackoffEnabled(QueuePath queue) {
3114+
return getBoolean(getQueuePrefix(queue) + BACKOFF_ENABLED,
3115+
getGlobalAppBackoffEnabled());
3116+
}
3117+
3118+
/**
3119+
* Get the app-backoff interval in milliseconds for a specific queue.
3120+
* @param queue the queue path
3121+
* @return the app-backoff interval in milliseconds
3122+
*/
3123+
public long getAppBackoffIntervalMs(QueuePath queue) {
3124+
return getTimeDuration(getQueuePrefix(queue) + APP_BACKOFF_INTERVAL_MS,
3125+
getGlobalAppBackoffIntervalMs(), TimeUnit.MILLISECONDS);
3126+
}
3127+
3128+
/**
3129+
* Get the missed opportunities threshold for a specific queue.
3130+
* @param queue the queue path
3131+
* @return the missed opportunities threshold
3132+
*/
3133+
public long getAppBackoffMissedThreshold(QueuePath queue) {
3134+
return getLong(getQueuePrefix(queue) + APP_BACKOFF_MISSED_THRESHOLD,
3135+
getGlobalAppBackoffMissedThreshold());
3136+
}
3137+
3138+
/**
3139+
* Set the app-backoff enabled flag for a specific queue (for testing).
3140+
* @param queue the queue path
3141+
* @param enabled the backoff enabled flag
3142+
*/
3143+
@VisibleForTesting
3144+
public void setAppBackoffEnabled(QueuePath queue, boolean enabled) {
3145+
setBoolean(getQueuePrefix(queue) + BACKOFF_ENABLED, enabled);
3146+
}
3147+
3148+
/**
3149+
* Set the app-backoff interval in milliseconds for a specific queue (for testing).
3150+
* @param queue the queue path
3151+
* @param intervalMs the backoff interval in milliseconds
3152+
*/
3153+
@VisibleForTesting
3154+
public void setAppBackoffIntervalMs(QueuePath queue, long intervalMs) {
3155+
setLong(getQueuePrefix(queue) + APP_BACKOFF_INTERVAL_MS, intervalMs);
3156+
}
3157+
3158+
/**
3159+
* Set the app-backoff missed opportunities threshold for a specific queue (for testing).
3160+
* @param queue the queue path
3161+
* @param threshold the missed opportunities threshold
3162+
*/
3163+
@VisibleForTesting
3164+
public void setAppBackoffMissedThreshold(QueuePath queue, long threshold) {
3165+
setLong(getQueuePrefix(queue) + APP_BACKOFF_MISSED_THRESHOLD, threshold);
3166+
}
30393167
}

0 commit comments

Comments
 (0)