Skip to content

Commit a94b407

Browse files
committed
Merge branch 'master' into group-trigger
2 parents b9561aa + 4ed84b1 commit a94b407

File tree

73 files changed

+1016
-314
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

73 files changed

+1016
-314
lines changed

changes.d/6574.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Broadcast: Report any settings that are not compatible with the scheduler Cylc version.

cylc/flow/broadcast_mgr.py

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
fail_if_platform_and_host_conflict,
3838
PlatformLookupError,
3939
)
40-
40+
from cylc.flow.util import uniq
4141

4242
if TYPE_CHECKING:
4343
from cylc.flow.id import Tokens
@@ -151,7 +151,7 @@ def clear_broadcast(
151151
LOG.error(get_broadcast_bad_options_report(bad_options))
152152
if modified_settings:
153153
self.data_store_mgr.delta_broadcast()
154-
return modified_settings, bad_options
154+
return uniq(modified_settings), bad_options
155155

156156
def expire_broadcast(self, cutoff=None, **kwargs):
157157
"""Clear all broadcasts targeting cycle points earlier than cutoff."""
@@ -284,18 +284,24 @@ def put_broadcast(
284284
bad_options is as described in the docstring for self.clear().
285285
"""
286286
modified_settings = []
287-
bad_point_strings = []
288-
bad_namespaces = []
287+
bad_settings = []
288+
bad_point_strings = set()
289+
bad_namespaces = set()
289290

290291
with self.lock:
291292
for setting in settings or []:
292293
# Coerce setting to cylc runtime object,
293294
# i.e. str to DurationFloat.
294295
coerced_setting = deepcopy(setting)
295-
BroadcastConfigValidator().validate(
296-
coerced_setting,
297-
SPEC['runtime']['__MANY__'],
298-
)
296+
try:
297+
BroadcastConfigValidator().validate(
298+
coerced_setting,
299+
SPEC['runtime']['__MANY__'],
300+
)
301+
except Exception as exc:
302+
LOG.error(exc)
303+
bad_settings.append(setting)
304+
continue
299305

300306
# Skip and warn if a run mode is broadcast to a workflow
301307
# running in simulation or dummy mode.
@@ -308,22 +314,24 @@ def put_broadcast(
308314
f' running in {self.workflow_run_mode.value} mode'
309315
' will have no effect, and will not be actioned.'
310316
)
317+
bad_settings.append(setting)
311318
continue
312319

313320
for point_string in point_strings or []:
314321
# Standardise the point and check its validity.
315322
bad_point = False
316323
try:
317324
point_string = standardise_point_string(point_string)
325+
318326
except PointParsingError:
319327
if point_string != '*':
320-
bad_point_strings.append(point_string)
328+
bad_point_strings.add(point_string)
321329
bad_point = True
322330
if not bad_point and point_string not in self.broadcasts:
323331
self.broadcasts[point_string] = {}
324332
for namespace in namespaces or []:
325333
if namespace not in self.linearized_ancestors:
326-
bad_namespaces.append(namespace)
334+
bad_namespaces.add(namespace)
327335
elif not bad_point:
328336
# Check broadcast against config and against
329337
# existing broadcasts:
@@ -340,6 +348,7 @@ def put_broadcast(
340348
namespace,
341349
coerced_setting,
342350
):
351+
bad_settings.append(setting)
343352
continue
344353

345354
if namespace not in self.broadcasts[point_string]:
@@ -362,13 +371,15 @@ def put_broadcast(
362371
LOG.info(get_broadcast_change_report(modified_settings))
363372

364373
bad_options = {}
374+
if bad_settings:
375+
bad_options["settings"] = uniq(bad_settings)
365376
if bad_point_strings:
366-
bad_options["point_strings"] = bad_point_strings
377+
bad_options["point_strings"] = sorted(bad_point_strings)
367378
if bad_namespaces:
368-
bad_options["namespaces"] = bad_namespaces
379+
bad_options["namespaces"] = sorted(bad_namespaces)
369380
if modified_settings:
370381
self.data_store_mgr.delta_broadcast()
371-
return modified_settings, bad_options
382+
return uniq(modified_settings), bad_options
372383

373384
@staticmethod
374385
def bc_mixes_old_and_new_platform_settings(

cylc/flow/broadcast_report.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,13 @@
2626
CHANGE_TITLE_CANCEL = "Broadcast cancelled:"
2727
CHANGE_TITLE_SET = "Broadcast set:"
2828

29+
CLI_OPT_MAP = {
30+
# broadcast field: cli option
31+
'point_strings': 'point',
32+
'namespaces': 'namespace',
33+
'settings': 'set',
34+
}
35+
2936

3037
def get_broadcast_bad_options_report(bad_options, is_set=False):
3138
"""Return a string to report bad options for broadcast cancel/clear."""
@@ -48,7 +55,11 @@ def get_broadcast_bad_options_report(bad_options, is_set=False):
4855
value_str += val
4956
else:
5057
value_str = value
51-
msg += BAD_OPTIONS_FMT % (key, value_str)
58+
if isinstance(value, dict):
59+
value_str = ', '.join(
60+
f'{key}={val}' for key, val in value.items()
61+
)
62+
msg += BAD_OPTIONS_FMT % (CLI_OPT_MAP.get(key, key), value_str)
5263
return msg
5364

5465

cylc/flow/data_messages_pb2.py

Lines changed: 34 additions & 34 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cylc/flow/data_messages_pb2.pyi

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ class PbTrigger(_message.Message):
304304
def __init__(self, id: _Optional[str] = ..., label: _Optional[str] = ..., message: _Optional[str] = ..., satisfied: bool = ..., time: _Optional[float] = ...) -> None: ...
305305

306306
class PbTaskProxy(_message.Message):
307-
__slots__ = ("stamp", "id", "task", "state", "cycle_point", "depth", "job_submits", "outputs", "namespace", "prerequisites", "jobs", "first_parent", "name", "is_held", "edges", "ancestors", "flow_nums", "external_triggers", "xtriggers", "is_queued", "is_runahead", "flow_wait", "runtime", "graph_depth")
307+
__slots__ = ("stamp", "id", "task", "state", "cycle_point", "depth", "job_submits", "outputs", "namespace", "prerequisites", "jobs", "first_parent", "name", "is_held", "edges", "ancestors", "flow_nums", "external_triggers", "xtriggers", "is_queued", "is_runahead", "flow_wait", "runtime", "graph_depth", "is_retry", "is_wallclock", "is_xtriggered")
308308
class OutputsEntry(_message.Message):
309309
__slots__ = ("key", "value")
310310
KEY_FIELD_NUMBER: _ClassVar[int]
@@ -350,6 +350,9 @@ class PbTaskProxy(_message.Message):
350350
FLOW_WAIT_FIELD_NUMBER: _ClassVar[int]
351351
RUNTIME_FIELD_NUMBER: _ClassVar[int]
352352
GRAPH_DEPTH_FIELD_NUMBER: _ClassVar[int]
353+
IS_RETRY_FIELD_NUMBER: _ClassVar[int]
354+
IS_WALLCLOCK_FIELD_NUMBER: _ClassVar[int]
355+
IS_XTRIGGERED_FIELD_NUMBER: _ClassVar[int]
353356
stamp: str
354357
id: str
355358
task: str
@@ -374,7 +377,10 @@ class PbTaskProxy(_message.Message):
374377
flow_wait: bool
375378
runtime: PbRuntime
376379
graph_depth: int
377-
def __init__(self, stamp: _Optional[str] = ..., id: _Optional[str] = ..., task: _Optional[str] = ..., state: _Optional[str] = ..., cycle_point: _Optional[str] = ..., depth: _Optional[int] = ..., job_submits: _Optional[int] = ..., outputs: _Optional[_Mapping[str, PbOutput]] = ..., namespace: _Optional[_Iterable[str]] = ..., prerequisites: _Optional[_Iterable[_Union[PbPrerequisite, _Mapping]]] = ..., jobs: _Optional[_Iterable[str]] = ..., first_parent: _Optional[str] = ..., name: _Optional[str] = ..., is_held: bool = ..., edges: _Optional[_Iterable[str]] = ..., ancestors: _Optional[_Iterable[str]] = ..., flow_nums: _Optional[str] = ..., external_triggers: _Optional[_Mapping[str, PbTrigger]] = ..., xtriggers: _Optional[_Mapping[str, PbTrigger]] = ..., is_queued: bool = ..., is_runahead: bool = ..., flow_wait: bool = ..., runtime: _Optional[_Union[PbRuntime, _Mapping]] = ..., graph_depth: _Optional[int] = ...) -> None: ...
380+
is_retry: bool
381+
is_wallclock: bool
382+
is_xtriggered: bool
383+
def __init__(self, stamp: _Optional[str] = ..., id: _Optional[str] = ..., task: _Optional[str] = ..., state: _Optional[str] = ..., cycle_point: _Optional[str] = ..., depth: _Optional[int] = ..., job_submits: _Optional[int] = ..., outputs: _Optional[_Mapping[str, PbOutput]] = ..., namespace: _Optional[_Iterable[str]] = ..., prerequisites: _Optional[_Iterable[_Union[PbPrerequisite, _Mapping]]] = ..., jobs: _Optional[_Iterable[str]] = ..., first_parent: _Optional[str] = ..., name: _Optional[str] = ..., is_held: bool = ..., edges: _Optional[_Iterable[str]] = ..., ancestors: _Optional[_Iterable[str]] = ..., flow_nums: _Optional[str] = ..., external_triggers: _Optional[_Mapping[str, PbTrigger]] = ..., xtriggers: _Optional[_Mapping[str, PbTrigger]] = ..., is_queued: bool = ..., is_runahead: bool = ..., flow_wait: bool = ..., runtime: _Optional[_Union[PbRuntime, _Mapping]] = ..., graph_depth: _Optional[int] = ..., is_retry: bool = ..., is_wallclock: bool = ..., is_xtriggered: bool = ...) -> None: ...
378384

379385
class PbFamily(_message.Message):
380386
__slots__ = ("stamp", "id", "name", "meta", "depth", "proxies", "parents", "child_tasks", "child_families", "first_parent", "runtime", "descendants")
@@ -405,7 +411,7 @@ class PbFamily(_message.Message):
405411
def __init__(self, stamp: _Optional[str] = ..., id: _Optional[str] = ..., name: _Optional[str] = ..., meta: _Optional[_Union[PbMeta, _Mapping]] = ..., depth: _Optional[int] = ..., proxies: _Optional[_Iterable[str]] = ..., parents: _Optional[_Iterable[str]] = ..., child_tasks: _Optional[_Iterable[str]] = ..., child_families: _Optional[_Iterable[str]] = ..., first_parent: _Optional[str] = ..., runtime: _Optional[_Union[PbRuntime, _Mapping]] = ..., descendants: _Optional[_Iterable[str]] = ...) -> None: ...
406412

407413
class PbFamilyProxy(_message.Message):
408-
__slots__ = ("stamp", "id", "cycle_point", "name", "family", "state", "depth", "first_parent", "child_tasks", "child_families", "is_held", "ancestors", "states", "state_totals", "is_held_total", "is_queued", "is_queued_total", "is_runahead", "is_runahead_total", "runtime", "graph_depth")
414+
__slots__ = ("stamp", "id", "cycle_point", "name", "family", "state", "depth", "first_parent", "child_tasks", "child_families", "is_held", "ancestors", "states", "state_totals", "is_held_total", "is_queued", "is_queued_total", "is_runahead", "is_runahead_total", "runtime", "graph_depth", "is_retry", "is_wallclock", "is_xtriggered")
409415
class StateTotalsEntry(_message.Message):
410416
__slots__ = ("key", "value")
411417
KEY_FIELD_NUMBER: _ClassVar[int]
@@ -434,6 +440,9 @@ class PbFamilyProxy(_message.Message):
434440
IS_RUNAHEAD_TOTAL_FIELD_NUMBER: _ClassVar[int]
435441
RUNTIME_FIELD_NUMBER: _ClassVar[int]
436442
GRAPH_DEPTH_FIELD_NUMBER: _ClassVar[int]
443+
IS_RETRY_FIELD_NUMBER: _ClassVar[int]
444+
IS_WALLCLOCK_FIELD_NUMBER: _ClassVar[int]
445+
IS_XTRIGGERED_FIELD_NUMBER: _ClassVar[int]
437446
stamp: str
438447
id: str
439448
cycle_point: str
@@ -455,7 +464,10 @@ class PbFamilyProxy(_message.Message):
455464
is_runahead_total: int
456465
runtime: PbRuntime
457466
graph_depth: int
458-
def __init__(self, stamp: _Optional[str] = ..., id: _Optional[str] = ..., cycle_point: _Optional[str] = ..., name: _Optional[str] = ..., family: _Optional[str] = ..., state: _Optional[str] = ..., depth: _Optional[int] = ..., first_parent: _Optional[str] = ..., child_tasks: _Optional[_Iterable[str]] = ..., child_families: _Optional[_Iterable[str]] = ..., is_held: bool = ..., ancestors: _Optional[_Iterable[str]] = ..., states: _Optional[_Iterable[str]] = ..., state_totals: _Optional[_Mapping[str, int]] = ..., is_held_total: _Optional[int] = ..., is_queued: bool = ..., is_queued_total: _Optional[int] = ..., is_runahead: bool = ..., is_runahead_total: _Optional[int] = ..., runtime: _Optional[_Union[PbRuntime, _Mapping]] = ..., graph_depth: _Optional[int] = ...) -> None: ...
467+
is_retry: bool
468+
is_wallclock: bool
469+
is_xtriggered: bool
470+
def __init__(self, stamp: _Optional[str] = ..., id: _Optional[str] = ..., cycle_point: _Optional[str] = ..., name: _Optional[str] = ..., family: _Optional[str] = ..., state: _Optional[str] = ..., depth: _Optional[int] = ..., first_parent: _Optional[str] = ..., child_tasks: _Optional[_Iterable[str]] = ..., child_families: _Optional[_Iterable[str]] = ..., is_held: bool = ..., ancestors: _Optional[_Iterable[str]] = ..., states: _Optional[_Iterable[str]] = ..., state_totals: _Optional[_Mapping[str, int]] = ..., is_held_total: _Optional[int] = ..., is_queued: bool = ..., is_queued_total: _Optional[int] = ..., is_runahead: bool = ..., is_runahead_total: _Optional[int] = ..., runtime: _Optional[_Union[PbRuntime, _Mapping]] = ..., graph_depth: _Optional[int] = ..., is_retry: bool = ..., is_wallclock: bool = ..., is_xtriggered: bool = ...) -> None: ...
459471

460472
class PbEdge(_message.Message):
461473
__slots__ = ("stamp", "id", "source", "target", "suicide", "cond")

cylc/flow/data_store_mgr.py

Lines changed: 91 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1526,6 +1526,8 @@ def _process_internal_task_proxy(
15261526
xtrig.satisfied = satisfied
15271527
self.xtrigger_tasks.setdefault(sig, set()).add((tproxy.id, label))
15281528

1529+
self._set_task_xtrigger_modifiers(tproxy)
1530+
15291531
if tproxy.state in self.latest_state_tasks:
15301532
tp_ref = itask.identity
15311533
tp_queue = self.latest_state_tasks[tproxy.state]
@@ -2014,6 +2016,15 @@ def update_family_proxies(self):
20142016
if self.updated_state_families:
20152017
self.state_update_follow_on = True
20162018

2019+
@staticmethod
2020+
def from_delta_or_node(tp_delta, tp_node, label):
2021+
"""Get an item from task proxy delta if available, falling back to
2022+
node otherwise."""
2023+
this = tp_delta
2024+
if this is None or not this.HasField(label):
2025+
this = tp_node
2026+
return getattr(this, label) or None
2027+
20172028
def _family_ascent_point_update(self, fp_id):
20182029
"""Updates the given family and children recursively.
20192030
@@ -2052,6 +2063,9 @@ def _family_ascent_point_update(self, fp_id):
20522063
is_held_total = 0
20532064
is_queued_total = 0
20542065
is_runahead_total = 0
2066+
is_retry = False
2067+
is_wallclock = False
2068+
is_xtriggered = False
20552069
graph_depth = self.n_edge_distance
20562070
for child_id in fam_node.child_families:
20572071
child_node = fp_updated.get(child_id, fp_data.get(child_id))
@@ -2071,31 +2085,28 @@ def _family_ascent_point_update(self, fp_id):
20712085
tp_delta = tp_updated.get(tp_id)
20722086
tp_node = tp_added.get(tp_id, tp_data.get(tp_id))
20732087

2074-
tp_state = tp_delta
2075-
if tp_state is None or not tp_state.HasField('state'):
2076-
tp_state = tp_node
2077-
if tp_state.state:
2078-
task_states.append(tp_state.state)
2088+
tp_state = self.from_delta_or_node(tp_delta, tp_node, 'state')
2089+
if tp_state:
2090+
task_states.append(tp_state)
20792091

2080-
tp_held = tp_delta
2081-
if tp_held is None or not tp_held.HasField('is_held'):
2082-
tp_held = tp_node
2083-
if tp_held.is_held:
2092+
if self.from_delta_or_node(tp_delta, tp_node, 'is_held'):
20842093
is_held_total += 1
20852094

2086-
tp_queued = tp_delta
2087-
if tp_queued is None or not tp_queued.HasField('is_queued'):
2088-
tp_queued = tp_node
2089-
if tp_queued.is_queued:
2095+
if self.from_delta_or_node(tp_delta, tp_node, 'is_queued'):
20902096
is_queued_total += 1
20912097

2092-
tp_runahead = tp_delta
2093-
if (tp_runahead is None
2094-
or not tp_runahead.HasField('is_runahead')):
2095-
tp_runahead = tp_node
2096-
if tp_runahead.is_runahead:
2098+
if self.from_delta_or_node(tp_delta, tp_node, 'is_runahead'):
20972099
is_runahead_total += 1
20982100

2101+
if self.from_delta_or_node(tp_delta, tp_node, 'is_retry'):
2102+
is_retry = True
2103+
2104+
if self.from_delta_or_node(tp_delta, tp_node, 'is_wallclock'):
2105+
is_wallclock = True
2106+
2107+
if self.from_delta_or_node(tp_delta, tp_node, 'is_xtriggered'):
2108+
is_xtriggered = True
2109+
20992110
tp_depth = tp_delta
21002111
if tp_depth is None or not tp_depth.HasField('graph_depth'):
21012112
tp_depth = tp_node
@@ -2114,7 +2125,10 @@ def _family_ascent_point_update(self, fp_id):
21142125
is_queued_total=is_queued_total,
21152126
is_runahead=(is_runahead_total > 0),
21162127
is_runahead_total=is_runahead_total,
2117-
graph_depth=graph_depth
2128+
is_retry=is_retry,
2129+
is_wallclock=is_wallclock,
2130+
is_xtriggered=is_xtriggered,
2131+
graph_depth=graph_depth,
21182132
)
21192133
fp_delta.states[:] = state_counter.keys()
21202134
# Use all states to clean up pruned counts
@@ -2522,19 +2536,72 @@ def delta_task_ext_trigger(
25222536
ext_trigger.time = update_time
25232537
self.updates_pending = True
25242538

2539+
@staticmethod
2540+
def _set_task_xtrigger_modifiers(node_or_delta):
2541+
# update the xtrigger task modifiers
2542+
node_or_delta.is_retry = False
2543+
node_or_delta.is_wallclock = False
2544+
node_or_delta.is_xtriggered = False
2545+
for xtrigger in node_or_delta.xtriggers.values():
2546+
if xtrigger.satisfied:
2547+
continue
2548+
if (
2549+
xtrigger.label.startswith('_cylc_retry')
2550+
or xtrigger.label.startswith('_cylc_submit_retry')
2551+
):
2552+
node_or_delta.is_retry = True
2553+
elif xtrigger.id.startswith('wall_clock'):
2554+
node_or_delta.is_wallclock = True
2555+
else:
2556+
node_or_delta.is_xtriggered = True
2557+
25252558
def _delta_xtrigger(
2526-
self, tp_id: str, label: str, sig: str, satisfied: bool,
2527-
t_update: float
2559+
self,
2560+
tp_id: str,
2561+
label: str,
2562+
sig: str,
2563+
satisfied: bool,
2564+
update_time: float,
25282565
) -> None:
25292566
"""Helper for the two xtrigger delta methods."""
2567+
# fetch the task from the store
2568+
tp_id, tproxy = self.store_node_fetcher(Tokens(tp_id))
2569+
if not tproxy:
2570+
return
2571+
2572+
# create or fetch the updated delta
25302573
tp_delta = self.updated[TASK_PROXIES].setdefault(
2531-
tp_id, PbTaskProxy(id=tp_id))
2532-
tp_delta.stamp = f'{tp_id}@{t_update}'
2574+
tp_id, PbTaskProxy(id=tp_id)
2575+
)
2576+
tp_delta.stamp = f'{tp_id}@{update_time}'
2577+
2578+
# populate all xtriggers on the delta if not already present
2579+
if not tp_delta.xtriggers:
2580+
# NOTE: if one xtrigger changes, we must include all in the
2581+
# delta, see https://github.com/cylc/cylc-flow/issues/6307
2582+
for _sig, xtrigger in tproxy.xtriggers.items():
2583+
if _sig == sig:
2584+
# don't copy the xtrigger we are changing
2585+
continue
2586+
_xtrigger = tp_delta.xtriggers[_sig]
2587+
_xtrigger.id = xtrigger.id
2588+
_xtrigger.label = xtrigger.label
2589+
_xtrigger.satisfied = xtrigger.satisfied
2590+
_xtrigger.time = xtrigger.time
2591+
2592+
# modify the xtrigger that has changed
25332593
xtrigger = tp_delta.xtriggers[sig]
25342594
xtrigger.id = sig
25352595
xtrigger.label = label
25362596
xtrigger.satisfied = satisfied
2537-
xtrigger.time = t_update
2597+
xtrigger.time = update_time
2598+
2599+
# update the xtrigger task modifiers
2600+
self._set_task_xtrigger_modifiers(tp_delta)
2601+
2602+
# ensure family modifier counts are updated
2603+
self.state_update_families.add(tproxy.first_parent)
2604+
25382605
self.updates_pending = True
25392606

25402607
def delta_xtrigger(self, sig: str, succeeded: bool) -> None:

0 commit comments

Comments
 (0)