Skip to content

Commit 66540ca

Browse files
authored
fix: wait for draining engine before starting again (FIR-50579) (#474)
1 parent a02f399 commit 66540ca

4 files changed

Lines changed: 293 additions & 14 deletions

File tree

src/firebolt/model/V2/database.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ def delete(self) -> None:
8383
for engine in self.get_attached_engines():
8484
if engine.current_status in {
8585
EngineStatus.STARTING,
86+
EngineStatus.DRAINING,
8687
EngineStatus.STOPPING,
8788
}:
8889
raise AttachedEngineInUseError(method_name="delete")

src/firebolt/model/V2/engine.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,11 @@ def _wait_for_start_stop(self) -> None:
109109
wait_timeout = 3600
110110
interval_seconds = 5
111111
timeout_time = time.time() + wait_timeout
112-
while self.current_status in (EngineStatus.STOPPING, EngineStatus.STARTING):
112+
while self.current_status in (
113+
EngineStatus.DRAINING,
114+
EngineStatus.STOPPING,
115+
EngineStatus.STARTING,
116+
):
113117
logger.info(
114118
f"Engine {self.name} is currently "
115119
f"{self.current_status.value.lower()}, waiting"
@@ -136,7 +140,7 @@ def start(self) -> Engine:
136140
if self.current_status == EngineStatus.RUNNING:
137141
logger.info(f"Engine {self.name} is already running.")
138142
return self
139-
if self.current_status in (EngineStatus.DROPPING, EngineStatus.REPAIRING):
143+
if self.current_status in (EngineStatus.DRAINING,):
140144
raise ValueError(
141145
f"Unable to start engine {self.name} because it's "
142146
f"in {self.current_status.value.lower()} state"
@@ -159,7 +163,7 @@ def stop(self) -> Engine:
159163
if self.current_status == EngineStatus.STOPPED:
160164
logger.info(f"Engine {self.name} is already stopped.")
161165
return self
162-
if self.current_status in (EngineStatus.DROPPING, EngineStatus.REPAIRING):
166+
if self.current_status in (EngineStatus.DRAINING,):
163167
raise ValueError(
164168
f"Unable to stop engine {self.name} because it's "
165169
f"in {self.current_status.value.lower()} state"
@@ -202,7 +206,7 @@ def update(
202206

203207
self.refresh()
204208
self._wait_for_start_stop()
205-
if self.current_status in (EngineStatus.DROPPING, EngineStatus.REPAIRING):
209+
if self.current_status in (EngineStatus.DRAINING,):
206210
raise ValueError(
207211
f"Unable to update engine {self.name} because it's "
208212
f"in {self.current_status.value.lower()} state"
@@ -239,7 +243,7 @@ def update(
239243
def delete(self) -> None:
240244
"""Delete an engine."""
241245
self.refresh()
242-
if self.current_status in [EngineStatus.DROPPING, EngineStatus.DELETING]:
246+
if self.current_status in [EngineStatus.DRAINING, EngineStatus.DELETING]:
243247
return
244248
with self._service._connection.cursor() as c:
245249
c.execute(self.DROP_SQL.format(self.name))

src/firebolt/service/V2/types.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,9 @@ class EngineStatus(Enum):
88
"""
99

1010
STARTING = "STARTING"
11-
STARTED = "STARTED"
1211
RUNNING = "RUNNING"
1312
STOPPING = "STOPPING"
1413
STOPPED = "STOPPED"
15-
DROPPING = "DROPPING"
16-
REPAIRING = "REPAIRING"
17-
FAILED = "FAILED"
1814
DELETING = "DELETING"
1915
RESIZING = "RESIZING"
2016
DRAINING = "DRAINING"

tests/unit/service/test_engine.py

Lines changed: 283 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from typing import Callable, Union
2+
from unittest.mock import MagicMock, patch
23

34
from httpx import Request
45
from pytest import mark, raises
@@ -13,6 +14,43 @@
1314
from tests.unit.service.conftest import get_objects_from_db_callback
1415

1516

17+
def create_mock_engine_with_status_transitions(mock_engine: Engine, statuses: list):
18+
"""
19+
Helper function to create a callback that simulates engine status transitions.
20+
21+
Args:
22+
mock_engine: The base engine object to use for creating responses
23+
statuses: List of EngineStatus values to cycle through on subsequent calls
24+
25+
Returns:
26+
A callback function that can be used with HTTPXMock
27+
"""
28+
call_count = [0]
29+
30+
def get_engine_callback_with_transitions(request: Request) -> Response:
31+
# Return different statuses based on call count
32+
current_status = statuses[min(call_count[0], len(statuses) - 1)]
33+
call_count[0] += 1
34+
35+
engine_data = Engine(
36+
name=mock_engine.name,
37+
region=mock_engine.region,
38+
spec=mock_engine.spec,
39+
scale=mock_engine.scale,
40+
current_status=current_status,
41+
version=mock_engine.version,
42+
endpoint=mock_engine.endpoint,
43+
warmup=mock_engine.warmup,
44+
auto_stop=mock_engine.auto_stop,
45+
type=mock_engine.type,
46+
_database_name=mock_engine._database_name,
47+
_service=None,
48+
)
49+
return get_objects_from_db_callback([engine_data])(request)
50+
51+
return get_engine_callback_with_transitions
52+
53+
1654
def test_engine_create(
1755
httpx_mock: HTTPXMock,
1856
engine_name: str,
@@ -230,17 +268,12 @@ def test_engine_new_status(
230268
("M", InstanceType.M, "STOPPED", EngineStatus.STOPPED),
231269
("L", InstanceType.L, "STARTING", EngineStatus.STARTING),
232270
("XL", InstanceType.XL, "STOPPING", EngineStatus.STOPPING),
233-
# Test InstanceType enum values directly
234-
(InstanceType.S, InstanceType.S, "FAILED", EngineStatus.FAILED),
235-
(InstanceType.M, InstanceType.M, "REPAIRING", EngineStatus.REPAIRING),
236271
# Test unknown/invalid values that should default to UNKNOWN
237272
("INVALID_TYPE", InstanceType.UNKNOWN, "INVALID_STATUS", EngineStatus.UNKNOWN),
238273
("XXL", InstanceType.UNKNOWN, "WEIRD_STATE", EngineStatus.UNKNOWN),
239274
# Test empty strings that should default to UNKNOWN
240275
("", InstanceType.UNKNOWN, "", EngineStatus.UNKNOWN),
241276
# Test all valid EngineStatus values with M instance type
242-
("M", InstanceType.M, "STARTED", EngineStatus.STARTED),
243-
("M", InstanceType.M, "DROPPING", EngineStatus.DROPPING),
244277
("M", InstanceType.M, "DELETING", EngineStatus.DELETING),
245278
("M", InstanceType.M, "RESIZING", EngineStatus.RESIZING),
246279
("M", InstanceType.M, "DRAINING", EngineStatus.DRAINING),
@@ -276,3 +309,248 @@ def test_engine_instantiation_with_different_configurations(
276309
assert engine.name == "test_engine"
277310
assert engine.region == "us-east-1"
278311
assert engine.scale == 2
312+
313+
314+
@patch("time.sleep")
315+
@patch("time.time")
316+
def test_engine_start_waits_for_draining_to_stop(
317+
mock_time: MagicMock,
318+
mock_sleep: MagicMock,
319+
httpx_mock: HTTPXMock,
320+
resource_manager: ResourceManager,
321+
mock_engine: Engine,
322+
system_engine_no_db_query_url: str,
323+
):
324+
"""
325+
Test that start() waits for an engine in DRAINING state to become STOPPED
326+
before proceeding with the start operation.
327+
"""
328+
# Set up time mock to avoid timeout - return incrementing values
329+
mock_time.return_value = 0 # Always return early time to avoid timeout
330+
331+
# Set up mock responses: DRAINING -> STOPPED -> STOPPED (after start command)
332+
callback = create_mock_engine_with_status_transitions(
333+
mock_engine,
334+
[
335+
EngineStatus.DRAINING, # Initial state
336+
EngineStatus.STOPPED, # After first refresh in _wait_for_start_stop
337+
EngineStatus.STOPPED, # After start command, final refresh
338+
],
339+
)
340+
341+
httpx_mock.add_callback(
342+
callback, url=system_engine_no_db_query_url, is_reusable=True
343+
)
344+
345+
# Set up the engine with proper service
346+
mock_engine._service = resource_manager.engines
347+
348+
# Call start method
349+
result = mock_engine.start()
350+
351+
# Verify that sleep was called (indicating it waited for DRAINING state)
352+
mock_sleep.assert_called_with(5)
353+
354+
# Verify the engine is returned
355+
assert result is mock_engine
356+
assert result.current_status == EngineStatus.STOPPED
357+
358+
359+
@patch("time.sleep")
360+
@patch("time.time")
361+
def test_engine_start_waits_for_stopping_to_stop(
362+
mock_time: MagicMock,
363+
mock_sleep: MagicMock,
364+
httpx_mock: HTTPXMock,
365+
resource_manager: ResourceManager,
366+
mock_engine: Engine,
367+
system_engine_no_db_query_url: str,
368+
):
369+
"""
370+
Test that start() waits for an engine in STOPPING state to become STOPPED
371+
before proceeding with the start operation.
372+
"""
373+
# Set up time mock to avoid timeout
374+
mock_time.return_value = 0 # Always return early time to avoid timeout
375+
376+
# Set up mock responses: STOPPING -> STOPPED -> STOPPED (after start command)
377+
callback = create_mock_engine_with_status_transitions(
378+
mock_engine,
379+
[
380+
EngineStatus.STOPPING, # Initial state
381+
EngineStatus.STOPPED, # After first refresh in _wait_for_start_stop
382+
EngineStatus.STOPPED, # After start command, final refresh
383+
],
384+
)
385+
386+
httpx_mock.add_callback(
387+
callback, url=system_engine_no_db_query_url, is_reusable=True
388+
)
389+
390+
# Set up the engine with proper service
391+
mock_engine._service = resource_manager.engines
392+
393+
# Call start method
394+
result = mock_engine.start()
395+
396+
# Verify that sleep was called (indicating it waited for STOPPING state)
397+
mock_sleep.assert_called_with(5)
398+
399+
# Verify the engine is returned
400+
assert result is mock_engine
401+
assert result.current_status == EngineStatus.STOPPED
402+
403+
404+
@patch("time.sleep")
405+
@patch("time.time")
406+
def test_engine_stop_waits_for_draining_to_stop(
407+
mock_time: MagicMock,
408+
mock_sleep: MagicMock,
409+
httpx_mock: HTTPXMock,
410+
resource_manager: ResourceManager,
411+
mock_engine: Engine,
412+
system_engine_no_db_query_url: str,
413+
):
414+
"""
415+
Test that stop() waits for an engine in DRAINING state to finish draining
416+
before proceeding with the stop operation.
417+
"""
418+
# Set up time mock to avoid timeout
419+
mock_time.return_value = 0 # Always return early time to avoid timeout
420+
421+
# Set up mock responses: DRAINING -> RUNNING -> STOPPED (after stop command)
422+
callback = create_mock_engine_with_status_transitions(
423+
mock_engine,
424+
[
425+
EngineStatus.DRAINING, # Initial state
426+
EngineStatus.RUNNING, # After first refresh in _wait_for_start_stop
427+
EngineStatus.STOPPED, # After stop command, final refresh
428+
],
429+
)
430+
431+
httpx_mock.add_callback(
432+
callback, url=system_engine_no_db_query_url, is_reusable=True
433+
)
434+
435+
# Set up the engine with proper service
436+
mock_engine._service = resource_manager.engines
437+
438+
# Call stop method
439+
result = mock_engine.stop()
440+
441+
# Verify that sleep was called (indicating it waited for DRAINING state)
442+
mock_sleep.assert_called_with(5)
443+
444+
# Verify the engine is returned
445+
assert result is mock_engine
446+
assert result.current_status == EngineStatus.STOPPED
447+
448+
449+
@patch("time.sleep")
450+
@patch("time.time")
451+
def test_engine_wait_for_start_stop_timeout(
452+
mock_time: MagicMock,
453+
mock_sleep: MagicMock,
454+
httpx_mock: HTTPXMock,
455+
resource_manager: ResourceManager,
456+
mock_engine: Engine,
457+
system_engine_no_db_query_url: str,
458+
):
459+
"""
460+
Test that _wait_for_start_stop raises TimeoutError when engine stays in
461+
transitional state too long.
462+
"""
463+
# Mock time.time to simulate timeout using a function that tracks calls
464+
call_count = [0]
465+
466+
def mock_time_function():
467+
call_count[0] += 1
468+
# Return normal time for first few calls, then timeout for _wait_for_start_stop
469+
if call_count[0] <= 5:
470+
return 0 # Early time
471+
else:
472+
return 3601 # Past timeout
473+
474+
mock_time.side_effect = mock_time_function
475+
476+
def get_engine_callback_always_starting(request: Request) -> Response:
477+
# Always return STARTING to simulate stuck state
478+
engine_data = Engine(
479+
name=mock_engine.name,
480+
region=mock_engine.region,
481+
spec=mock_engine.spec,
482+
scale=mock_engine.scale,
483+
current_status=EngineStatus.STARTING, # Always starting
484+
version=mock_engine.version,
485+
endpoint=mock_engine.endpoint,
486+
warmup=mock_engine.warmup,
487+
auto_stop=mock_engine.auto_stop,
488+
type=mock_engine.type,
489+
_database_name=mock_engine._database_name,
490+
_service=None,
491+
)
492+
return get_objects_from_db_callback([engine_data])(request)
493+
494+
httpx_mock.add_callback(
495+
get_engine_callback_always_starting,
496+
url=system_engine_no_db_query_url,
497+
is_reusable=True,
498+
)
499+
500+
# Set up the engine with proper service
501+
mock_engine._service = resource_manager.engines
502+
503+
# Call start method and expect TimeoutError
504+
with raises(TimeoutError, match="Excedeed timeout of 3600s waiting for.*starting"):
505+
mock_engine.start()
506+
507+
508+
@patch("time.sleep")
509+
@patch("time.time")
510+
def test_engine_start_already_running_no_wait(
511+
mock_time: MagicMock,
512+
mock_sleep: MagicMock,
513+
httpx_mock: HTTPXMock,
514+
resource_manager: ResourceManager,
515+
mock_engine: Engine,
516+
system_engine_no_db_query_url: str,
517+
):
518+
"""
519+
Test that start() doesn't wait when engine is already RUNNING.
520+
"""
521+
# Mock time to avoid any timeout issues
522+
mock_time.return_value = 0
523+
524+
def get_engine_callback_running(request: Request) -> Response:
525+
engine_data = Engine(
526+
name=mock_engine.name,
527+
region=mock_engine.region,
528+
spec=mock_engine.spec,
529+
scale=mock_engine.scale,
530+
current_status=EngineStatus.RUNNING,
531+
version=mock_engine.version,
532+
endpoint=mock_engine.endpoint,
533+
warmup=mock_engine.warmup,
534+
auto_stop=mock_engine.auto_stop,
535+
type=mock_engine.type,
536+
_database_name=mock_engine._database_name,
537+
_service=None,
538+
)
539+
return get_objects_from_db_callback([engine_data])(request)
540+
541+
httpx_mock.add_callback(
542+
get_engine_callback_running, url=system_engine_no_db_query_url, is_reusable=True
543+
)
544+
545+
# Set up the engine with proper service
546+
mock_engine._service = resource_manager.engines
547+
548+
# Call start method
549+
result = mock_engine.start()
550+
551+
# Verify that no sleep was called (no waiting happened)
552+
mock_sleep.assert_not_called()
553+
554+
# Verify the engine is returned
555+
assert result is mock_engine
556+
assert result.current_status == EngineStatus.RUNNING

0 commit comments

Comments
 (0)