31
31
import java .util .TreeSet ;
32
32
import java .util .concurrent .ConcurrentHashMap ;
33
33
import java .util .concurrent .ConcurrentMap ;
34
+ import java .util .concurrent .TimeUnit ;
34
35
import java .util .stream .Stream ;
35
36
36
37
import org .apache .commons .lang3 .StringUtils ;
39
40
import org .apache .hadoop .security .AccessControlException ;
40
41
import org .apache .hadoop .security .UserGroupInformation ;
41
42
import org .apache .hadoop .security .authorize .AccessControlList ;
43
+ import org .apache .hadoop .thirdparty .com .google .common .cache .Cache ;
44
+ import org .apache .hadoop .thirdparty .com .google .common .cache .CacheBuilder ;
42
45
import org .apache .hadoop .util .Sets ;
43
46
import org .apache .hadoop .util .Time ;
44
47
import org .apache .hadoop .yarn .api .records .ApplicationAttemptId ;
@@ -154,6 +157,13 @@ public class AbstractLeafQueue extends AbstractCSQueue {
154
157
private final List <FiCaSchedulerApp > runnableApps = new ArrayList <>();
155
158
private final List <FiCaSchedulerApp > nonRunnableApps = new ArrayList <>();
156
159
160
+ // Backoff related variables
161
+ private final boolean appBackoffEnabled ;
162
+ private long appBackoffIntervalMs = 0L ;
163
+ private long appBackoffMissedThreshold = 0L ;
164
+ // Cache of applications that are in backoff state
165
+ private Cache <ApplicationId , Boolean > appsInBackoffState = null ;
166
+
157
167
public AbstractLeafQueue (CapacitySchedulerQueueContext queueContext ,
158
168
String queueName , CSQueue parent , CSQueue old ) throws IOException {
159
169
this (queueContext , queueName , parent , old , false );
@@ -170,6 +180,26 @@ public AbstractLeafQueue(CapacitySchedulerQueueContext queueContext,
170
180
171
181
// One time initialization is enough since it is static ordering policy
172
182
this .pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps <>();
183
+
184
+ // Initialize the backoff configurations
185
+ CapacitySchedulerConfiguration conf = queueContext .getConfiguration ();
186
+ appBackoffEnabled = conf .isAppBackoffEnabled (queuePath );
187
+ if (appBackoffEnabled ) {
188
+ appBackoffIntervalMs = conf .getAppBackoffIntervalMs (queuePath );
189
+ if (appBackoffIntervalMs <= 0 ) {
190
+ throw new IOException (
191
+ "Backoff interval must be greater than 0 for queue: " + queuePath );
192
+ }
193
+ appBackoffMissedThreshold =
194
+ conf .getAppBackoffMissedThreshold (queuePath );
195
+ if (appBackoffMissedThreshold <= 0 ) {
196
+ throw new IOException (
197
+ "Backoff app missed threshold must be greater than 0 for queue: "
198
+ + queuePath );
199
+ }
200
+ appsInBackoffState = CacheBuilder .newBuilder ().expireAfterAccess (
201
+ appBackoffIntervalMs , TimeUnit .MILLISECONDS ).build ();
202
+ }
173
203
}
174
204
175
205
@ SuppressWarnings ("checkstyle:nowhitespaceafter" )
@@ -314,7 +344,10 @@ protected void setupQueueConfigs(Resource clusterResource) throws
314
344
+ defaultAppPriorityPerQueue + "\n priority = " + priority
315
345
+ "\n maxLifetime = " + getMaximumApplicationLifetime ()
316
346
+ " seconds" + "\n defaultLifetime = "
317
- + getDefaultApplicationLifetime () + " seconds" );
347
+ + getDefaultApplicationLifetime () + " seconds"
348
+ + "\n backoffEnabled = " + appBackoffEnabled
349
+ + "\n backoffIntervalMs = " + appBackoffIntervalMs
350
+ + "\n backoffAppMissedThreshold = " + appBackoffMissedThreshold );
318
351
} finally {
319
352
writeLock .unlock ();
320
353
}
@@ -1212,6 +1245,33 @@ public CSAssignment assignContainers(Resource clusterResource,
1212
1245
assignmentIterator .hasNext ();) {
1213
1246
FiCaSchedulerApp application = assignmentIterator .next ();
1214
1247
1248
+ // Check for backoff state
1249
+ if (isAppInBackoffState (application .getApplicationId ())) {
1250
+ // Skip if this app is still in backoff state
1251
+ ActivitiesLogger .APP .recordRejectedAppActivityFromLeafQueue (
1252
+ activitiesManager , node , application , application .getPriority (),
1253
+ ActivityDiagnosticConstant .APPLICATION_IN_BACKOFF_STATE );
1254
+ continue ;
1255
+ }
1256
+
1257
+ // Check for missed scheduling opportunities
1258
+ if (isAppShouldEnterBackoffState (application )) {
1259
+ // Don't assign containers to this app once its missed scheduling opportunities have reached the threshold.
1260
+ LOG .info ("Skip scheduling for application {} as it has reached the "
1261
+ + "missed scheduling threshold {}, the backoff interval is {} ms." ,
1262
+ application .getApplicationId (), appBackoffMissedThreshold ,
1263
+ appBackoffIntervalMs );
1264
+ ActivitiesLogger .APP .recordRejectedAppActivityFromLeafQueue (
1265
+ activitiesManager , node , application , application .getPriority (),
1266
+ ActivityDiagnosticConstant .APPLICATION_IN_BACKOFF_STATE );
1267
+ // Add the app to the backoff state, to prevent further scheduling
1268
+ // attempts during the backoff period.
1269
+ appsInBackoffState .put (application .getApplicationId (), true );
1270
+ // Reset missed scheduling opportunities
1271
+ application .resetAppMissedSchedulingOpportunities ();
1272
+ continue ;
1273
+ }
1274
+
1215
1275
ActivitiesLogger .APP .startAppAllocationRecording (activitiesManager ,
1216
1276
node , SystemClock .getInstance ().getTime (), application );
1217
1277
@@ -1302,19 +1362,26 @@ public CSAssignment assignContainers(Resource clusterResource,
1302
1362
ActivitiesLogger .QUEUE .recordQueueActivity (activitiesManager , node ,
1303
1363
parent .getQueuePath (), getQueuePath (),
1304
1364
ActivityState .ACCEPTED , ActivityDiagnosticConstant .EMPTY );
1365
+ // Reset missed scheduling opportunities after successfully allocating
1366
+ // resources for the application.
1367
+ application .resetAppMissedSchedulingOpportunities ();
1305
1368
return assignment ;
1306
1369
} else if (assignment .getSkippedType ()
1307
1370
== CSAssignment .SkippedType .OTHER ) {
1308
1371
ActivitiesLogger .APP .finishSkippedAppAllocationRecording (
1309
1372
activitiesManager , application .getApplicationId (),
1310
1373
ActivityState .SKIPPED , ActivityDiagnosticConstant .EMPTY );
1311
1374
application .updateNodeInfoForAMDiagnostics (node );
1375
+ // Add missed scheduling opportunities for the application
1376
+ application .addAppMissedSchedulingOpportunities ();
1312
1377
} else if (assignment .getSkippedType ()
1313
1378
== CSAssignment .SkippedType .QUEUE_LIMIT ) {
1314
1379
ActivitiesLogger .QUEUE .recordQueueActivity (activitiesManager , node ,
1315
1380
parent .getQueuePath (), getQueuePath (), ActivityState .REJECTED ,
1316
1381
() -> ActivityDiagnosticConstant .QUEUE_DO_NOT_HAVE_ENOUGH_HEADROOM
1317
1382
+ " from " + application .getApplicationId ());
1383
+ // Add missed scheduling opportunities for the application
1384
+ application .addAppMissedSchedulingOpportunities ();
1318
1385
return assignment ;
1319
1386
} else {
1320
1387
// If we don't allocate anything, and it is not skipped by application,
@@ -1335,6 +1402,15 @@ public CSAssignment assignContainers(Resource clusterResource,
1335
1402
return CSAssignment .NULL_ASSIGNMENT ;
1336
1403
}
1337
1404
1405
+ public boolean isAppInBackoffState (ApplicationId appId ) {
1406
+ return appBackoffEnabled && appsInBackoffState .getIfPresent (appId ) != null ;
1407
+ }
1408
+
1409
+ public boolean isAppShouldEnterBackoffState (FiCaSchedulerApp application ) {
1410
+ return appBackoffEnabled &&
1411
+ application .getAppMissedSchedulingOpportunities () >= appBackoffMissedThreshold ;
1412
+ }
1413
+
1338
1414
@ Override
1339
1415
public boolean accept (Resource cluster ,
1340
1416
ResourceCommitRequest <FiCaSchedulerApp , FiCaSchedulerNode > request ) {
0 commit comments