-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsentinel_lock.py
More file actions
73 lines (59 loc) · 1.99 KB
/
sentinel_lock.py
File metadata and controls
73 lines (59 loc) · 1.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
"""
GPU coordination lock for Sentinel.

Four priority levels share one flock-based lock:
    P0 (Judge)       — single non-blocking try; proceeds whether or not it got the lock
    P1 (Synthesizer) — blocking with a short timeout
    P2 (Accumulator) — blocking with a long timeout
    P3 (Scribe)      — blocking with a medium timeout (lowest priority)
"""
import fcntl
import os
import time
from enum import IntEnum
from typing import Optional
class LockPriority(IntEnum):
    """Priority tier of a caller contending for the shared GPU lock.

    Being an ``IntEnum``, members compare and hash like their integer
    values. The tier selects the default wait budget in
    ``_DEFAULT_TIMEOUTS`` and, for ``P0_JUDGE``, the single-attempt path in
    ``acquire_lock``.
    """

    # Never waits: one non-blocking attempt, then carries on regardless.
    P0_JUDGE = 0
    # Waits, with the shortest default timeout.
    P1_SYNTHESIZER = 1
    # Waits, with the longest default timeout.
    P2_ACCUMULATOR = 2
    # Waits, with a medium default timeout; lowest priority tier.
    P3_SCRIBE = 3
# Default wait budget, in seconds, for each priority tier. acquire_lock()
# looks the value up here when its timeout_s argument is None.
_DEFAULT_TIMEOUTS = {
    # P0 makes a single non-blocking attempt, so it has no wait budget.
    LockPriority.P0_JUDGE: 0,
    # Short wait.
    LockPriority.P1_SYNTHESIZER: 5,
    # Long wait.
    LockPriority.P2_ACCUMULATOR: 30,
    # Medium wait.
    LockPriority.P3_SCRIBE: 10,
}
def acquire_lock(lock_path: str, priority: LockPriority,
                 timeout_s: Optional[float] = None) -> Optional[int]:
    """Acquire the GPU lock file. Returns fd on success, None on failure/skip.

    The lock file and its parent directory are created if missing, then an
    exclusive ``flock`` is attempted without blocking.

    P0: One non-blocking try; ``timeout_s`` is ignored. Returns None if the
        lock is held (caller proceeds without the lock).
    P1/P2/P3: Poll until ``timeout_s`` seconds have elapsed. Returns None on
        timeout.

    Args:
        lock_path: Path of the lock file.
        priority: Caller tier; picks the default timeout and, for P0, the
            single-attempt behaviour.
        timeout_s: Maximum seconds to wait. ``None`` means use the entry for
            ``priority`` in ``_DEFAULT_TIMEOUTS``.

    Returns:
        The open file descriptor holding the lock (hand it to
        ``release_lock`` when done), or None if the lock was not obtained.

    Raises:
        OSError: If the directory or the lock file cannot be created/opened.
        KeyError: If ``timeout_s`` is None and ``priority`` has no default.
    """
    if timeout_s is None:
        timeout_s = _DEFAULT_TIMEOUTS[priority]

    os.makedirs(os.path.dirname(lock_path) or ".", exist_ok=True)
    fd = os.open(lock_path, os.O_CREAT | os.O_RDWR)

    # P0 never waits, whatever timeout was passed in.
    if priority == LockPriority.P0_JUDGE:
        deadline = None
    else:
        deadline = time.monotonic() + timeout_s

    poll_interval_s = 1.0
    acquired = False
    try:
        while True:
            try:
                fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except OSError:
                # BlockingIOError (lock held elsewhere) is an OSError
                # subclass; any other flock failure is likewise treated as
                # "not acquired" rather than raised.
                pass
            else:
                acquired = True
                return fd

            if deadline is None:
                return None
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return None
            # Never sleep past the deadline, so short timeouts are honoured.
            time.sleep(min(poll_interval_s, remaining))
    finally:
        # Close the descriptor on every exit that does not hand it to the
        # caller, including an interrupt raised while waiting.
        if not acquired:
            os.close(fd)
def release_lock(fd: Optional[int]) -> None:
    """Release and close the lock file descriptor.

    Best-effort: ``OSError`` from either step is swallowed so that cleanup
    paths never fail because of the lock.

    Args:
        fd: Descriptor returned by ``acquire_lock``, or None when no lock was
            obtained (in which case this is a no-op).
    """
    if fd is None:
        return

    try:
        fcntl.flock(fd, fcntl.LOCK_UN)
    except OSError:
        # Fall through: the descriptor must still be closed.
        pass

    # Closed separately so a failed unlock cannot leak the descriptor.
    try:
        os.close(fd)
    except OSError:
        pass