diff --git a/cbsd/cbslib/auth/auth.py b/cbsd/cbslib/auth/auth.py index 214ecc5..dbae6e7 100644 --- a/cbsd/cbslib/auth/auth.py +++ b/cbsd/cbslib/auth/auth.py @@ -59,8 +59,6 @@ def token_create(user: str) -> Token: def token_decode(token: str) -> TokenInfo: """Decode the provided token.""" - logger.debug(f"decode token: {token}") - config = get_config() assert config.server, "unexpected server config missing" key = pyseto.Key.new( diff --git a/cbsd/cbslib/auth/users.py b/cbsd/cbslib/auth/users.py index 72aae09..83e2805 100644 --- a/cbsd/cbslib/auth/users.py +++ b/cbsd/cbslib/auth/users.py @@ -88,7 +88,7 @@ async def create(self, email: str, name: str) -> Token: return user.token token = token_create(email) - logger.debug(f"created token for user '{email}': {token}") + logger.debug(f"created token for user '{email}'") self._tokens_db[token.token.get_secret_value()] = token self._users_db[email] = User(email=email, name=name, token=token) diff --git a/cbsd/cbslib/builds/mgr.py b/cbsd/cbslib/builds/mgr.py index bd55d67..674a167 100644 --- a/cbsd/cbslib/builds/mgr.py +++ b/cbsd/cbslib/builds/mgr.py @@ -133,7 +133,10 @@ async def _update_components(self) -> None: async def _task() -> None: try: res = celery_app.send_task("cbslib.worker.tasks.list_components") - raw = cast(dict[str, Any], res.get()) # pyright: ignore[reportExplicitAny] + raw = cast( + dict[str, Any], # pyright: ignore[reportExplicitAny] + await asyncio.to_thread(res.get), + ) except Exception as e: logger.error(f"failed to obtain components: {e}") sys.exit(errno.ENOTRECOVERABLE) diff --git a/cbsd/cbslib/builds/tracker.py b/cbsd/cbslib/builds/tracker.py index a68af7b..7f6951d 100644 --- a/cbsd/cbslib/builds/tracker.py +++ b/cbsd/cbslib/builds/tracker.py @@ -80,105 +80,95 @@ def __init__(self, db: BuildsDB, logs: BuildLogsHandler) -> None: self._builds_by_build_id = {} self._lock = asyncio.Lock() - async def new(self, desc: BuildDescriptor) -> tuple[BuildID, str]: - """Create a new build entry, scheduling it for build.""" - _ = await self._lock.acquire() - - async def _cleanup( - *, - build_id: BuildID | None = None, - entry: BuildEntry | None = None, - failed: bool = False, - do_logs: bool = False, - ) -> None: - if failed and entry and build_id: - entry.state = EntryState.failure - entry.finished = dt.now(tz=datetime.UTC) - - try: - await self._db.update(build_id, entry) - except Exception as e: - msg = f"error cleaning up task, db update: {e}" - logger.error(msg) - raise TrackerError(msg) from e - - if do_logs: - try: - await self._logs.finish(build_id) - except Exception as e: - msg = f"error cleaning up task, finish logs: {e}" - logger.error(msg) - raise TrackerError(msg) from e - - logger.info(f"cleanup state from new: failed = {failed}") - self._lock.release() - - # NOTE: We should ensure builds that are the same are properly - # deduplicated if they are in-progress. I.e., if the same build - # request comes twice, and it's either in-progress or queued, then - # we return its build ID. - - build_entry = BuildEntry( - task_id=None, - desc=desc, - user=desc.signed_off_by.email, - submitted=dt.now(tz=datetime.UTC), - state=EntryState.new, - started=None, - finished=None, - ) + async def _fail_entry( + self, + build_id: BuildID, + entry: BuildEntry, + *, + do_logs: bool = False, + ) -> None: + """Mark a build entry as failed during cleanup.""" + entry.state = EntryState.failure + entry.finished = dt.now(tz=datetime.UTC) try: - build_id = await self._db.new(build_entry) + await self._db.update(build_id, entry) except Exception as e: - msg = f"error storing build entry state to db: {e}" + msg = f"error cleaning up task, db update: {e}" logger.error(msg) - await _cleanup(failed=True) raise TrackerError(msg) from e - try: - # start tracking and handling logs for this build - await self._logs.new(build_id) - except Exception as e: - msg = f"error starting log tracking: {e}" - logger.error(msg) - await _cleanup(failed=True, entry=build_entry, build_id=build_id) - raise TrackerError(msg) from e + if do_logs: + try: + await self._logs.finish(build_id) + except Exception as e: + msg = f"error cleaning up task, finish logs: {e}" + logger.error(msg) + raise TrackerError(msg) from e - try: - # schedule version for building - task = tasks.build.apply_async( - ( - build_id, - desc, - ), - serializer="pydantic", - ) - except Exception as e: - msg = f"error scheduling new build: {e}" - logger.error(msg) - await _cleanup( - failed=True, do_logs=True, entry=build_entry, build_id=build_id + async def new(self, desc: BuildDescriptor) -> tuple[BuildID, str]: + """Create a new build entry, scheduling it for build.""" + async with self._lock: + # NOTE: We should ensure builds that are the same are properly + # deduplicated if they are in-progress. I.e., if the same build + # request comes twice, and it's either in-progress or queued, then + # we return its build ID. + + build_entry = BuildEntry( + task_id=None, + desc=desc, + user=desc.signed_off_by.email, + submitted=dt.now(tz=datetime.UTC), + state=EntryState.new, + started=None, + finished=None, ) - raise TrackerError(msg) from e - build_entry.task_id = task.task_id - build_entry.state = EntryState(task.state.upper()) - try: - await self._db.update(build_id, build_entry) - except Exception as e: - msg = f"error updating entry state in db: {e}" - logger.error(msg) - await _cleanup( - failed=True, do_logs=True, entry=build_entry, build_id=build_id - ) - raise TrackerError(msg) from e + try: + build_id = await self._db.new(build_entry) + except Exception as e: + msg = f"error storing build entry state to db: {e}" + logger.error(msg) + raise TrackerError(msg) from e + + try: + # start tracking and handling logs for this build + await self._logs.new(build_id) + except Exception as e: + msg = f"error starting log tracking: {e}" + logger.error(msg) + await self._fail_entry(build_id, build_entry) + raise TrackerError(msg) from e + + try: + # schedule version for building + task = tasks.build.apply_async( + ( + build_id, + desc, + ), + serializer="pydantic", + ) + except Exception as e: + msg = f"error scheduling new build: {e}" + logger.error(msg) + await self._fail_entry(build_id, build_entry, do_logs=True) + raise TrackerError(msg) from e + + build_entry.task_id = task.task_id + build_entry.state = EntryState(task.state.upper()) + try: + await self._db.update(build_id, build_entry) + except Exception as e: + msg = f"error updating entry state in db: {e}" + logger.error(msg) + await self._fail_entry(build_id, build_entry, do_logs=True) + raise TrackerError(msg) from e - self._builds_by_task_id[build_entry.task_id] = build_id - self._builds_by_build_id[build_id] = build_entry.task_id + self._builds_by_task_id[build_entry.task_id] = build_id + self._builds_by_build_id[build_id] = build_entry.task_id - await _cleanup() - return (build_id, build_entry.state) + return (build_id, build_entry.state) async def list( self, *, owner: str | None = None diff --git a/cbsd/cbslib/core/backend.py b/cbsd/cbslib/core/backend.py index 9eadb13..ef6d3f5 100644 --- a/cbsd/cbslib/core/backend.py +++ b/cbsd/cbslib/core/backend.py @@ -33,15 +33,20 @@ class BackendError(CESError): class Backend: _redis_url: str - # _redis: aioredis.Redis + _redis: aioredis.Redis def __init__(self, backend_url: str) -> None: self._redis_url = backend_url - - async def redis(self) -> aioredis.Redis: try: - return aioredis.from_url(f"{self._redis_url}?decode_responses=True") # pyright: ignore[reportUnknownMemberType] + self._redis = aioredis.from_url(f"{self._redis_url}?decode_responses=True") except Exception as e: - msg = f"error opening connection to redis backend: {e}" + msg = f"error creating redis connection pool: {e}" logger.error(msg) raise BackendError(msg) from e + + async def redis(self) -> aioredis.Redis: + return self._redis + + async def close(self) -> None: + """Close the Redis connection pool.""" + await self._redis.aclose()