Skip to content

Commit 4c3656e

Browse files
ionox0mr-c
authored andcommitted
add lock for multi-threaded directory creation (#1040)
1 parent b3639a4 commit 4c3656e

File tree

4 files changed

+36
-12
lines changed

4 files changed

+36
-12
lines changed

cwltool/command_line_tool.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,10 @@ def __init__(self,
9595
self.script = script
9696
self.prov_obj = None # type: Optional[ProvenanceProfile]
9797

98-
def run(self, runtimeContext): # type: (RuntimeContext) -> None
98+
def run(self,
99+
runtimeContext, # type: RuntimeContext
100+
tmpdir_lock=None # type: threading.Lock
101+
): # type: (...) -> None
99102
try:
100103
normalizeFilesDirs(self.builder.job)
101104
ev = self.builder.do_eval(self.script)

cwltool/executors.py

+6-3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import os
55
import tempfile
66
import threading
7+
from threading import Lock
78
from abc import ABCMeta, abstractmethod
89
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union
910

@@ -25,6 +26,8 @@
2526
from .utils import DEFAULT_TMP_PREFIX
2627
from .workflow import Workflow, WorkflowJob, WorkflowJobStep
2728

29+
TMPDIR_LOCK = Lock()
30+
2831

2932
class JobExecutor(with_metaclass(ABCMeta, object)):
3033
""" Abstract base job executor. """
@@ -229,10 +232,10 @@ def select_resources(self, request, runtime_context): # pylint: disable=unused-
229232

230233
return result
231234

232-
def _runner(self, job, runtime_context):
235+
def _runner(self, job, runtime_context, TMPDIR_LOCK):
233236
""" Job running thread. """
234237
try:
235-
job.run(runtime_context)
238+
job.run(runtime_context, TMPDIR_LOCK)
236239
except WorkflowException as err:
237240
_logger.exception("Got workflow error")
238241
self.exceptions.append(err)
@@ -293,7 +296,7 @@ def run_job(self,
293296
n += 1
294297
continue
295298

296-
thread = threading.Thread(target=self._runner, args=(job, runtime_context))
299+
thread = threading.Thread(target=self._runner, args=(job, runtime_context, TMPDIR_LOCK))
297300
thread.daemon = True
298301
self.threads.add(thread)
299302
if isinstance(job, JobBase):

cwltool/job.py

+25-7
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import functools
55
import itertools
66
import logging
7+
import threading
78
import os
89
import re
910
import shutil
@@ -207,7 +208,8 @@ def __repr__(self):
207208

208209
@abstractmethod
209210
def run(self,
210-
runtimeContext # type: RuntimeContext
211+
runtimeContext, # type: RuntimeContext
212+
tmpdir_lock=None # type: threading.Lock
211213
): # type: (...) -> None
212214
pass
213215

@@ -421,11 +423,18 @@ def get_tree_mem_usage(memory_usage):
421423

422424
class CommandLineJob(JobBase):
423425
def run(self,
424-
runtimeContext # type: RuntimeContext
426+
runtimeContext, # type: RuntimeContext
427+
tmpdir_lock=None # type: threading.Lock
425428
): # type: (...) -> None
426429

427-
if not os.path.exists(self.tmpdir):
428-
os.makedirs(self.tmpdir)
430+
if tmpdir_lock:
431+
with tmpdir_lock:
432+
if not os.path.exists(self.tmpdir):
433+
os.makedirs(self.tmpdir)
434+
else:
435+
if not os.path.exists(self.tmpdir):
436+
os.makedirs(self.tmpdir)
437+
429438
self._setup(runtimeContext)
430439

431440
env = self.environment
@@ -587,9 +596,18 @@ def add_volumes(self,
587596
pathmapper.update(
588597
key, new_path, vol.target, vol.type, vol.staged)
589598

590-
def run(self, runtimeContext): # type: (RuntimeContext) -> None
591-
if not os.path.exists(self.tmpdir):
592-
os.makedirs(self.tmpdir)
599+
def run(self,
600+
runtimeContext, # type: RuntimeContext
601+
tmpdir_lock=None # type: threading.Lock
602+
): # type: (...) -> None
603+
if tmpdir_lock:
604+
with tmpdir_lock:
605+
if not os.path.exists(self.tmpdir):
606+
os.makedirs(self.tmpdir)
607+
else:
608+
if not os.path.exists(self.tmpdir):
609+
os.makedirs(self.tmpdir)
610+
593611
(docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
594612
self.prov_obj = runtimeContext.prov_obj
595613
img_id = None

cwltool/workflow.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,7 @@ def valueFromFunc(k, v): # type: (Any, Any) -> Any
412412
step.completed = True
413413

414414

415-
def run(self, runtimeContext):
415+
def run(self, runtimeContext, tmpdir_lock=None):
416416
'''
417417
logs the start of each workflow
418418
'''

0 commit comments

Comments
 (0)