-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_sentinel_lock.py
More file actions
104 lines (84 loc) · 2.73 KB
/
test_sentinel_lock.py
File metadata and controls
104 lines (84 loc) · 2.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import os
import tempfile
import time
import threading
from sentinel_lock import acquire_lock, release_lock, LockPriority
def test_acquire_and_release():
"""Basic acquire and release cycle."""
with tempfile.NamedTemporaryFile(delete=False) as f:
lock_path = f.name
try:
fd = acquire_lock(lock_path, LockPriority.P2_ACCUMULATOR)
assert fd is not None
release_lock(fd)
finally:
os.unlink(lock_path)
def test_p0_never_blocks():
"""P0 (judge) acquires even when lock is held."""
with tempfile.NamedTemporaryFile(delete=False) as f:
lock_path = f.name
try:
held = threading.Event()
release_evt = threading.Event()
def hold_lock():
fd = acquire_lock(lock_path, LockPriority.P2_ACCUMULATOR)
held.set()
release_evt.wait()
release_lock(fd)
t = threading.Thread(target=hold_lock)
t.start()
held.wait()
fd = acquire_lock(lock_path, LockPriority.P0_JUDGE)
assert fd is None # P0 skips lock, proceeds anyway
release_evt.set()
t.join()
finally:
os.unlink(lock_path)
def test_p1_times_out():
"""P1 (synthesizer) times out after short wait."""
with tempfile.NamedTemporaryFile(delete=False) as f:
lock_path = f.name
try:
held = threading.Event()
release_evt = threading.Event()
def hold_lock():
fd = acquire_lock(lock_path, LockPriority.P2_ACCUMULATOR)
held.set()
release_evt.wait()
release_lock(fd)
t = threading.Thread(target=hold_lock)
t.start()
held.wait()
t0 = time.monotonic()
fd = acquire_lock(lock_path, LockPriority.P1_SYNTHESIZER, timeout_s=1)
elapsed = time.monotonic() - t0
assert fd is None
assert elapsed >= 0.9
release_evt.set()
t.join()
finally:
os.unlink(lock_path)
def test_p3_scribe_times_out():
"""P3 (scribe) times out after waiting when lock is held."""
with tempfile.NamedTemporaryFile(delete=False) as f:
lock_path = f.name
try:
held = threading.Event()
release_evt = threading.Event()
def hold_lock():
fd = acquire_lock(lock_path, LockPriority.P2_ACCUMULATOR)
held.set()
release_evt.wait()
release_lock(fd)
t = threading.Thread(target=hold_lock)
t.start()
held.wait()
t0 = time.monotonic()
fd = acquire_lock(lock_path, LockPriority.P3_SCRIBE, timeout_s=1)
elapsed = time.monotonic() - t0
assert fd is None
assert elapsed >= 0.9
release_evt.set()
t.join()
finally:
os.unlink(lock_path)