From 7442a2afa5465da9c61f6777494008465d369ae5 Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Sat, 7 Mar 2026 16:17:07 +0000 Subject: [PATCH 1/4] cbsd/backend: fix Redis connection leak by reusing pool Every call to Backend.redis() created a new Redis connection that was never closed. Create the connection pool once in __init__() and return the shared instance, adding a close() method for cleanup. Signed-off-by: Joao Eduardo Luis --- cbsd/cbslib/core/backend.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/cbsd/cbslib/core/backend.py b/cbsd/cbslib/core/backend.py index 9eadb13..ef6d3f5 100644 --- a/cbsd/cbslib/core/backend.py +++ b/cbsd/cbslib/core/backend.py @@ -33,15 +33,20 @@ class BackendError(CESError): class Backend: _redis_url: str - # _redis: aioredis.Redis + _redis: aioredis.Redis def __init__(self, backend_url: str) -> None: self._redis_url = backend_url - - async def redis(self) -> aioredis.Redis: try: - return aioredis.from_url(f"{self._redis_url}?decode_responses=True") # pyright: ignore[reportUnknownMemberType] + self._redis = aioredis.from_url(f"{self._redis_url}?decode_responses=True") except Exception as e: - msg = f"error opening connection to redis backend: {e}" + msg = f"error creating redis connection pool: {e}" logger.error(msg) raise BackendError(msg) from e + + async def redis(self) -> aioredis.Redis: + return self._redis + + async def close(self) -> None: + """Close the Redis connection pool.""" + await self._redis.aclose() From 4a5ba851c77d5f55877bbec911e9bf58f18fbbbf Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Sat, 7 Mar 2026 16:17:30 +0000 Subject: [PATCH 2/4] cbsd/auth: remove plaintext token logging token_decode() and Users.create() logged full PASETO tokens at DEBUG level. If CBS_DEBUG is set, every request leaks credentials to the log. Remove token values from log messages entirely. Signed-off-by: Joao Eduardo Luis --- cbsd/cbslib/auth/auth.py | 2 -- cbsd/cbslib/auth/users.py | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/cbsd/cbslib/auth/auth.py b/cbsd/cbslib/auth/auth.py index 214ecc5..dbae6e7 100644 --- a/cbsd/cbslib/auth/auth.py +++ b/cbsd/cbslib/auth/auth.py @@ -59,8 +59,6 @@ def token_create(user: str) -> Token: def token_decode(token: str) -> TokenInfo: """Decode the provided token.""" - logger.debug(f"decode token: {token}") - config = get_config() assert config.server, "unexpected server config missing" key = pyseto.Key.new( diff --git a/cbsd/cbslib/auth/users.py b/cbsd/cbslib/auth/users.py index 72aae09..83e2805 100644 --- a/cbsd/cbslib/auth/users.py +++ b/cbsd/cbslib/auth/users.py @@ -88,7 +88,7 @@ async def create(self, email: str, name: str) -> Token: return user.token token = token_create(email) - logger.debug(f"created token for user '{email}': {token}") + logger.debug(f"created token for user '{email}'") self._tokens_db[token.token.get_secret_value()] = token self._users_db[email] = User(email=email, name=name, token=token) From 29f60c4fcac624fadf8a4a14ebb866a6b0449768 Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Sat, 7 Mar 2026 16:18:46 +0000 Subject: [PATCH 3/4] cbsd/tracker: use context manager for lock in new() BuildsTracker.new() manually acquired the lock and released it inside a nested _cleanup() function. If an uncaught exception occurred between acquire and cleanup, the lock was never released, permanently blocking all build operations. Restructure to use 'async with self._lock:' and extract cleanup into _fail_entry(). Signed-off-by: Joao Eduardo Luis --- cbsd/cbslib/builds/tracker.py | 166 ++++++++++++++++------------------ 1 file changed, 78 insertions(+), 88 deletions(-) diff --git a/cbsd/cbslib/builds/tracker.py b/cbsd/cbslib/builds/tracker.py index a68af7b..7f6951d 100644 --- a/cbsd/cbslib/builds/tracker.py +++ b/cbsd/cbslib/builds/tracker.py @@ -80,105 +80,95 @@ def __init__(self, db: BuildsDB, logs: BuildLogsHandler) -> None: self._builds_by_build_id = {} self._lock = asyncio.Lock() - async def new(self, desc: BuildDescriptor) -> tuple[BuildID, str]: - """Create a new build entry, scheduling it for build.""" - _ = await self._lock.acquire() - - async def _cleanup( - *, - build_id: BuildID | None = None, - entry: BuildEntry | None = None, - failed: bool = False, - do_logs: bool = False, - ) -> None: - if failed and entry and build_id: - entry.state = EntryState.failure - entry.finished = dt.now(tz=datetime.UTC) - - try: - await self._db.update(build_id, entry) - except Exception as e: - msg = f"error cleaning up task, db update: {e}" - logger.error(msg) - raise TrackerError(msg) from e - - if do_logs: - try: - await self._logs.finish(build_id) - except Exception as e: - msg = f"error cleaning up task, finish logs: {e}" - logger.error(msg) - raise TrackerError(msg) from e - - logger.info(f"cleanup state from new: failed = {failed}") - self._lock.release() - - # NOTE: We should ensure builds that are the same are properly - # deduplicated if they are in-progress. I.e., if the same build - # request comes twice, and it's either in-progress or queued, then - # we return its build ID. - - build_entry = BuildEntry( - task_id=None, - desc=desc, - user=desc.signed_off_by.email, - submitted=dt.now(tz=datetime.UTC), - state=EntryState.new, - started=None, - finished=None, - ) + async def _fail_entry( + self, + build_id: BuildID, + entry: BuildEntry, + *, + do_logs: bool = False, + ) -> None: + """Mark a build entry as failed during cleanup.""" + entry.state = EntryState.failure + entry.finished = dt.now(tz=datetime.UTC) try: - build_id = await self._db.new(build_entry) + await self._db.update(build_id, entry) except Exception as e: - msg = f"error storing build entry state to db: {e}" + msg = f"error cleaning up task, db update: {e}" logger.error(msg) - await _cleanup(failed=True) raise TrackerError(msg) from e - try: - # start tracking and handling logs for this build - await self._logs.new(build_id) - except Exception as e: - msg = f"error starting log tracking: {e}" - logger.error(msg) - await _cleanup(failed=True, entry=build_entry, build_id=build_id) - raise TrackerError(msg) from e + if do_logs: + try: + await self._logs.finish(build_id) + except Exception as e: + msg = f"error cleaning up task, finish logs: {e}" + logger.error(msg) + raise TrackerError(msg) from e - try: - # schedule version for building - task = tasks.build.apply_async( - ( - build_id, - desc, - ), - serializer="pydantic", - ) - except Exception as e: - msg = f"error scheduling new build: {e}" - logger.error(msg) - await _cleanup( - failed=True, do_logs=True, entry=build_entry, build_id=build_id + async def new(self, desc: BuildDescriptor) -> tuple[BuildID, str]: + """Create a new build entry, scheduling it for build.""" + async with self._lock: + # NOTE: We should ensure builds that are the same are properly + # deduplicated if they are in-progress. I.e., if the same build + # request comes twice, and it's either in-progress or queued, then + # we return its build ID. + + build_entry = BuildEntry( + task_id=None, + desc=desc, + user=desc.signed_off_by.email, + submitted=dt.now(tz=datetime.UTC), + state=EntryState.new, + started=None, + finished=None, ) - raise TrackerError(msg) from e - build_entry.task_id = task.task_id - build_entry.state = EntryState(task.state.upper()) - try: - await self._db.update(build_id, build_entry) - except Exception as e: - msg = f"error updating entry state in db: {e}" - logger.error(msg) - await _cleanup( - failed=True, do_logs=True, entry=build_entry, build_id=build_id - ) - raise TrackerError(msg) from e + try: + build_id = await self._db.new(build_entry) + except Exception as e: + msg = f"error storing build entry state to db: {e}" + logger.error(msg) + raise TrackerError(msg) from e + + try: + # start tracking and handling logs for this build + await self._logs.new(build_id) + except Exception as e: + msg = f"error starting log tracking: {e}" + logger.error(msg) + await self._fail_entry(build_id, build_entry) + raise TrackerError(msg) from e + + try: + # schedule version for building + task = tasks.build.apply_async( + ( + build_id, + desc, + ), + serializer="pydantic", + ) + except Exception as e: + msg = f"error scheduling new build: {e}" + logger.error(msg) + await self._fail_entry(build_id, build_entry, do_logs=True) + raise TrackerError(msg) from e + + build_entry.task_id = task.task_id + build_entry.state = EntryState(task.state.upper()) + try: + await self._db.update(build_id, build_entry) + except Exception as e: + msg = f"error updating entry state in db: {e}" + logger.error(msg) + await self._fail_entry(build_id, build_entry, do_logs=True) + raise TrackerError(msg) from e - self._builds_by_task_id[build_entry.task_id] = build_id - self._builds_by_build_id[build_id] = build_entry.task_id + self._builds_by_task_id[build_entry.task_id] = build_id + self._builds_by_build_id[build_id] = build_entry.task_id - await _cleanup() - return (build_id, build_entry.state) + return (build_id, build_entry.state) async def list( self, *, owner: str | None = None From 04e8e9e341ba837504422974845a8726a787f725 Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Sat, 7 Mar 2026 16:19:05 +0000 Subject: [PATCH 4/4] cbsd/builds: avoid blocking event loop on Celery result _update_components() called res.get() synchronously inside an async task, blocking the entire asyncio event loop until the Celery worker responded. Use asyncio.to_thread() to run the blocking call in a separate thread. Signed-off-by: Joao Eduardo Luis --- cbsd/cbslib/builds/mgr.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cbsd/cbslib/builds/mgr.py b/cbsd/cbslib/builds/mgr.py index bd55d67..674a167 100644 --- a/cbsd/cbslib/builds/mgr.py +++ b/cbsd/cbslib/builds/mgr.py @@ -133,7 +133,10 @@ async def _update_components(self) -> None: async def _task() -> None: try: res = celery_app.send_task("cbslib.worker.tasks.list_components") - raw = cast(dict[str, Any], res.get()) # pyright: ignore[reportExplicitAny] + raw = cast( + dict[str, Any], # pyright: ignore[reportExplicitAny] + await asyncio.to_thread(res.get), + ) except Exception as e: logger.error(f"failed to obtain components: {e}") sys.exit(errno.ENOTRECOVERABLE)