diff --git a/worker_plan/worker_plan_internal/llm_util/token_metrics_store.py b/worker_plan/worker_plan_internal/llm_util/token_metrics_store.py
index cea2842b..60166ed5 100644
--- a/worker_plan/worker_plan_internal/llm_util/token_metrics_store.py
+++ b/worker_plan/worker_plan_internal/llm_util/token_metrics_store.py
@@ -126,9 +126,18 @@ def record_token_usage(
             self.db.session.rollback()
         except Exception:
             pass
-        # Fully discard the scoped session to prevent a corrupted
-        # transaction state from poisoning subsequent DB operations
-        # (e.g. the _handle_task_completion callback that runs next).
+        # Dispose the connection pool BEFORE removing the session.
+        # Errors like PGRES_TUPLES_OK leave the connection in a
+        # corrupted protocol state. If the connection is returned
+        # to the pool, pool_pre_ping (SELECT 1) can hang on it,
+        # blocking every Luigi worker thread that tries to check
+        # out a connection afterwards. Disposing the pool ensures
+        # the corrupted connection is closed outright and all
+        # threads get fresh connections.
+        try:
+            self.db.engine.dispose()
+        except Exception:
+            pass
         try:
             self.db.session.remove()
         except Exception:
diff --git a/worker_plan_database/app.py b/worker_plan_database/app.py
index 2b972414..617f40cc 100644
--- a/worker_plan_database/app.py
+++ b/worker_plan_database/app.py
@@ -115,11 +115,11 @@ def _new_model(model_cls: Any, **kwargs: Any) -> Any:
 # Configure specific loggers to send their output to stdout via the root logger.
 loggers_to_redirect_via_root = {
-    'luigi': logging.DEBUG,
-    'luigi-interface': logging.DEBUG,
-    'luigi.worker': logging.DEBUG,
-    'luigi.scheduler': logging.DEBUG,
-    'luigi.task': logging.DEBUG,
+    'luigi': logging.INFO,
+    'luigi-interface': logging.INFO,
+    'luigi.worker': logging.INFO,
+    'luigi.scheduler': logging.INFO,
+    'luigi.task': logging.INFO,
     'transformers': logging.INFO,
     'httpx': logging.WARNING,
 }
@@ -590,6 +590,15 @@ def _handle_task_completion(self, parameters: HandleTaskCompletionParameters) ->
                 "Assuming task is still active and continuing pipeline.",
                 max_attempts, self.task_id,
             )
+            # Dispose the connection pool — repeated failures
+            # likely mean the pool is poisoned (e.g. a
+            # PGRES_TUPLES_OK-corrupted connection that hangs
+            # on pool_pre_ping). Fresh connections will be
+            # created on the next access.
+            try:
+                db.engine.dispose()
+            except Exception:
+                pass
             return
         if task is None:
             logger.error(f"Task with ID {self.task_id!r} not found in database, while running the pipeline. This is an inconsistency.")