@@ -53,6 +53,9 @@ def _validate_lock_file(lock_file_path: Path) -> LockFile:
5353 return read_file
5454
5555
56+ import socket
57+
58+
5659class DaemonConnector :
5760 """Connects to the LSP daemon via socket to execute commands."""
5861
@@ -61,37 +64,25 @@ def __init__(self, project_path: Path, lock_file: LockFile):
6164 self .lock_file = lock_file
6265 self .renderer = JanitorStateRenderer ()
6366
64- def _open_connection (self ) -> t .Any :
65- """Open connection (named pipe or Unix socket) for communication."""
66- if self .lock_file .communication is None :
67- raise ValueError ("Lock file does not contain communication information" )
68-
69- comm_mode = self .lock_file .communication .type
70-
71- if isinstance (comm_mode , DaemonCommunicationModeUnixSocket ):
72- socket_path = comm_mode .socket
73-
74- # Check if it's a Unix domain socket or named pipe
75- if socket_path .endswith (".sock" ):
76- # Unix domain socket
77- import socket
78-
79- sock = socket .socket (socket .AF_UNIX , socket .SOCK_STREAM )
80- sock .connect (socket_path )
81-
82- # Convert to file-like object
83- import io
84-
85- return io .BufferedRWPair (
86- io .BufferedReader (io .FileIO (sock .fileno (), mode = "r" , closefd = False )),
87- io .BufferedWriter (io .FileIO (sock .fileno (), mode = "w" , closefd = False )),
88- )
89- else :
90- # Named pipe
91- return open (socket_path , "r+b" )
67+ def _open_connection (self ) -> tuple [t .BinaryIO , t .BinaryIO ]:
68+ lock_file = self .lock_file
69+ communication = lock_file .communication
70+ print (f"Using communication mode: { communication } " )
71+ if communication is None :
72+ raise ValueError ("not correct" )
73+
74+ if isinstance (communication .type , DaemonCommunicationModeUnixSocket ):
75+ print ("Opening Unix socket connection..." )
76+ sock = socket .socket (socket .AF_UNIX , socket .SOCK_STREAM )
77+ sock .connect (communication .type .socket )
78+ print (f"Connected to Unix socket at { communication .type .socket } " )
79+ rfile = sock .makefile ("rb" , buffering = 0 )
80+ wfile = sock .makefile ("wb" , buffering = 0 )
81+ print ("Connected to daemon via Unix socket." )
82+ return rfile , wfile
9283 else :
93- raise ValueError (f "Only Unix socket communication is supported" )
94-
84+ raise ValueError ("Only Unix socket communication is supported" )
85+
9586 def _send_jsonrpc_request (self , connection : t .Any , method : str , params : dict ) -> str :
9687 """Send a JSON-RPC request over the connection and return the request ID."""
9788 request_id = str (uuid .uuid4 ())
@@ -152,61 +143,45 @@ def _read_jsonrpc_response(self, connection: t.Any, expected_id: str) -> t.Any:
152143 return message .get ("result" , {})
153144
154145 def call_janitor (self , ignore_ttl : bool = False ) -> bool :
155- """Call the janitor command through the LSP daemon."""
156- connection = None
146+ rfile = wfile = None
157147 try :
158- connection = self ._open_connection ()
148+ rfile , wfile = self ._open_connection ()
159149
160- # Send the janitor request via JSON-RPC
161150 request = LSPCLICallRequest (
162151 arguments = ["janitor" ] + (["--ignore-ttl" ] if ignore_ttl else [])
163152 )
153+ request_id = self ._send_jsonrpc_request (wfile , "sqlmesh/cli/call" , request .model_dump ())
164154
165- request_id = self ._send_jsonrpc_request (
166- connection , "sqlmesh/cli/call" , request .model_dump ()
167- )
168-
169- # Listen for notifications and the final response
170155 with self .renderer as renderer :
171156 while True :
172157 try :
173- # Read any JSON-RPC message (response or notification)
174- message_data = self ._read_jsonrpc_message (connection )
175-
176- # Check if it's the response to our request
158+ message_data = self ._read_jsonrpc_message (rfile )
177159 if "id" in message_data and message_data ["id" ] == request_id :
178- # This is the response to our request
179160 result = message_data .get ("result" , {})
180161 if result .get ("state" ) == "finished" :
181162 return True
182163 elif result .get ("state" ) == "error" :
183164 print (f"Error from daemon: { result .get ('message' , 'Unknown error' )} " )
184165 return False
185-
186- # Check if it's a notification with updates
187166 elif message_data .get ("method" ) == "sqlmesh/cli/update" :
188167 params = message_data .get ("params" , {})
189168 if params .get ("state" ) == "ongoing" :
190- # Parse the janitor state and render it
191169 message = params .get ("message" , {})
192170 if "state" in message :
193171 janitor_state = JanitorState .model_validate (message )
194172 renderer .render (janitor_state .state )
195-
196173 except Exception as stream_error :
197- # If we can't read more messages, assume we're done
198174 print (f"Stream ended: { stream_error } " )
199175 break
200-
201176 return True
202-
203177 except Exception as e :
204178 print (f"Failed to communicate with daemon: { e } " )
205179 return False
206180 finally :
207- if connection :
208- connection .close ()
209-
181+ try :
182+ if rfile : rfile .close ()
183+ finally :
184+ if wfile : wfile .close ()
210185
211186def get_daemon_connector (project_path : Path ) -> t .Optional [DaemonConnector ]:
212187 """Get a daemon connector if a valid lock file exists."""
0 commit comments