@@ -4,9 +4,11 @@ import sys
44import argparse
55import subprocess
66import os
7+ import signal
78import shlex
89import json
910import time
11+ import shutil
1012import requests
1113
1214from utils import find_available_port
@@ -441,6 +443,11 @@ if __name__ == "__main__":
441443 "uvm" ,
442444 "palladium" ,
443445 ]:
446+ # 0. Clean up stale BullMQ/Redis data to prevent replaying old events
447+ aof_dir = os .path .join (workflow_dir , ".motia" , "appendonlydir" )
448+ if os .path .exists (aof_dir ):
449+ shutil .rmtree (aof_dir )
450+
444451 # 1. Start service in background ================================
445452 # If port is specified, use the specified port; otherwise, automatically assign port
446453 if args .port :
@@ -451,7 +458,8 @@ if __name__ == "__main__":
451458 proc = subprocess .Popen (
452459 ["pnpm" , "dev" , "--port" , str (available_port )],
453460 cwd = workflow_dir ,
454- stderr = subprocess .DEVNULL # Suppress stderr to hide BullMQ errors
461+ stderr = subprocess .DEVNULL , # Suppress stderr to hide BullMQ errors
462+ preexec_fn = os .setsid # Create new process group
455463 )
456464
457465 # Wait for service to start
@@ -479,7 +487,11 @@ if __name__ == "__main__":
479487 check = False ,
480488 text = True ,
481489 )
482- proc .terminate ()
490+ # Kill entire process group to clean up all child processes
491+ try :
492+ os .killpg (os .getpgid (proc .pid ), signal .SIGTERM )
493+ except ProcessLookupError :
494+ pass
483495 proc .wait ()
484496 sys .exit (1 )
485497
@@ -510,7 +522,8 @@ if __name__ == "__main__":
510522 # 3. Shutdown service ================================
511523 # Give observability plugin time to finish async operations (e.g., Redis writes)
512524 time .sleep (1 )
513- proc .terminate ()
525+ # Kill entire process group (pnpm -> motia -> node) to avoid orphan processes
526+ os .killpg (os .getpgid (proc .pid ), signal .SIGTERM )
514527 proc .wait ()
515528 print (
516529 f"\n Task completed. Command running on http://localhost:{ available_port } is finished"
0 commit comments