-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathslurm_utils.py
More file actions
98 lines (80 loc) · 3.29 KB
/
Copy pathslurm_utils.py
File metadata and controls
98 lines (80 loc) · 3.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import os
import tempfile
import shutil
from contextlib import contextmanager
from logger import Log
# SLURM array metadata, read once when this module is imported. SlurmContext copies these
# values, which is what lets Chunk() work without an explicitly constructed context.
# When the variables are unset (not running as an array task) the defaults describe a
# single task: job "0", task 0 of 1.
slurm_array_job_id = os.environ.get("SLURM_ARRAY_JOB_ID", "0")
slurm_array_task_id = int(os.environ.get("SLURM_ARRAY_TASK_ID", "0"))
slurm_array_task_count = int(os.environ.get("SLURM_ARRAY_TASK_COUNT", "1"))
class SlurmContext:
def __init__(self, logger=None, verbosity=0):
self.log = logger if logger is not None else Log(verbosity)
self.log.verbosity = verbosity
self.job_id = slurm_array_job_id
self.task_id = slurm_array_task_id
self.task_count = slurm_array_task_count
self.cache_path = os.path.join( ".cache", str(self.job_id), str(self.task_id) )
self.output_path = tempfile.gettempdir()
self.temp_files = []
self.log.debug(f"Initialized SlurmContext with job_id={self.job_id}, task_id={self.task_id}, task_count={self.task_count}")
self.log.debug(f"Cache path: {self.cache_path}")
if( not os.path.exists(self.cache_path) ):
self.log.debug(f" - Cache directory was missing, created it!")
os.makedirs(self.cache_path)
self.log.debug(f"Output path: {self.output_path}")
@contextmanager
def mkTempFile(self, name: str|None = None, mode: str = "w"):
if name is None:
name = f"temp_{len(self.temp_files)}.tmp"
path = os.path.join(self.output_path, f"{self.job_id}_{self.task_id}_{name}")
self.log.debug(f"Creating temporary file: {path}")
try:
with open(path, mode) as f:
self.temp_files.append(path)
yield f
finally:
pass
@staticmethod
def is_array():
return "SLURM_ARRAY_TASK_ID" in os.environ
def collect(self, output = "./out" ):
self.log.debug("Collecting temporary files...")
self.log.debug(f" - Running as a { 'SLURM array' if self.is_array() else 'single process' } job, collecting temporary files.")
if( not os.path.exists(output) ):
self.log.debug(f" - Output directory was missing, created it!")
os.makedirs(output)
for path in self.temp_files:
dest = os.path.join(output, os.path.basename(path))
self.log.debug(f" - {path} -> {dest}")
shutil.move(path, dest)
class SlurmUtils:
    """Helpers for splitting work across the tasks of a SLURM job array."""

    @staticmethod
    def chunks(xs: list, n: int):
        """Return a generator of consecutive slices of *xs* with at most *n* items each.

        *n* values below 1 are treated as 1.
        """
        n = max(1, n)
        return (xs[i:i+n] for i in range(0, len(xs), n))

    @staticmethod
    @contextmanager
    def Chunk(files: list, context: "SlurmContext | None" = None):
        """Yield the share of *files* assigned to the current array task.

        Outside a job array (``context.is_array()`` is false) the whole list is
        yielded. Inside an array, *files* is divided into ``task_count``
        contiguous slices whose sizes differ by at most one, and the slice at
        position ``task_id`` is yielded. Every file belongs to exactly one
        task; tasks beyond the number of files receive an empty list.

        Args:
            files: The full list of work items shared by all tasks.
            context: Task context; a new ``SlurmContext()`` is created when omitted.

        Raises:
            IndexError: If ``task_id`` is not in ``range(task_count)``.
                NOTE(review): SLURM task ids follow ``--array`` (``--array=1-4``
                gives ids 1..4), so this assumes a 0-based, step-1 array.
        """
        if context is None:
            context = SlurmContext()
        if not context.is_array():
            yield files
            return

        count = max(1, context.task_count)
        index = context.task_id
        if not 0 <= index < count:
            raise IndexError(
                f"SLURM array task id {index} is outside 0..{count - 1}; "
                f"Chunk() expects a 0-based array such as --array=0-{count - 1}"
            )
        # Balanced contiguous split: the first `extra` tasks take one extra item.
        size, extra = divmod(len(files), count)
        start = index * size + min(index, extra)
        stop = start + size + (1 if index < extra else 0)
        yield files[start:stop]

    @staticmethod
    @contextmanager
    def Context(logger=None, verbosity=0):
        """Yield a new ``SlurmContext(logger, verbosity)``.

        Nothing is done on exit; call ``collect()`` explicitly to move the
        context's temporary files.
        """
        yield SlurmContext(logger, verbosity)
# Module-level shortcuts so callers can write `with Chunk(files) as mine:` and
# `with Context() as ctx:` without going through the SlurmUtils class.
Chunk, Context = SlurmUtils.Chunk, SlurmUtils.Context