Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions cbsd/cbslib/auth/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ def token_create(user: str) -> Token:

def token_decode(token: str) -> TokenInfo:
"""Decode the provided token."""
logger.debug(f"decode token: {token}")

config = get_config()
assert config.server, "unexpected server config missing"
key = pyseto.Key.new(
Expand Down
2 changes: 1 addition & 1 deletion cbsd/cbslib/auth/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async def create(self, email: str, name: str) -> Token:
return user.token

token = token_create(email)
logger.debug(f"created token for user '{email}': {token}")
logger.debug(f"created token for user '{email}'")

self._tokens_db[token.token.get_secret_value()] = token
self._users_db[email] = User(email=email, name=name, token=token)
Expand Down
5 changes: 4 additions & 1 deletion cbsd/cbslib/builds/mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,10 @@ async def _update_components(self) -> None:
async def _task() -> None:
try:
res = celery_app.send_task("cbslib.worker.tasks.list_components")
raw = cast(dict[str, Any], res.get()) # pyright: ignore[reportExplicitAny]
raw = cast(
dict[str, Any], # pyright: ignore[reportExplicitAny]
await asyncio.to_thread(res.get),
)
except Exception as e:
logger.error(f"failed to obtain components: {e}")
sys.exit(errno.ENOTRECOVERABLE)
Expand Down
166 changes: 78 additions & 88 deletions cbsd/cbslib/builds/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,105 +80,95 @@ def __init__(self, db: BuildsDB, logs: BuildLogsHandler) -> None:
self._builds_by_build_id = {}
self._lock = asyncio.Lock()

async def new(self, desc: BuildDescriptor) -> tuple[BuildID, str]:
"""Create a new build entry, scheduling it for build."""
_ = await self._lock.acquire()

async def _cleanup(
*,
build_id: BuildID | None = None,
entry: BuildEntry | None = None,
failed: bool = False,
do_logs: bool = False,
) -> None:
if failed and entry and build_id:
entry.state = EntryState.failure
entry.finished = dt.now(tz=datetime.UTC)

try:
await self._db.update(build_id, entry)
except Exception as e:
msg = f"error cleaning up task, db update: {e}"
logger.error(msg)
raise TrackerError(msg) from e

if do_logs:
try:
await self._logs.finish(build_id)
except Exception as e:
msg = f"error cleaning up task, finish logs: {e}"
logger.error(msg)
raise TrackerError(msg) from e

logger.info(f"cleanup state from new: failed = {failed}")
self._lock.release()

# NOTE: We should ensure builds that are the same are properly
# deduplicated if they are in-progress. I.e., if the same build
# request comes twice, and it's either in-progress or queued, then
# we return its build ID.

build_entry = BuildEntry(
task_id=None,
desc=desc,
user=desc.signed_off_by.email,
submitted=dt.now(tz=datetime.UTC),
state=EntryState.new,
started=None,
finished=None,
)
async def _fail_entry(
self,
build_id: BuildID,
entry: BuildEntry,
*,
do_logs: bool = False,
) -> None:
"""Mark a build entry as failed during cleanup."""
entry.state = EntryState.failure
entry.finished = dt.now(tz=datetime.UTC)

try:
build_id = await self._db.new(build_entry)
await self._db.update(build_id, entry)
except Exception as e:
msg = f"error storing build entry state to db: {e}"
msg = f"error cleaning up task, db update: {e}"
logger.error(msg)
await _cleanup(failed=True)
raise TrackerError(msg) from e

try:
# start tracking and handling logs for this build
await self._logs.new(build_id)
except Exception as e:
msg = f"error starting log tracking: {e}"
logger.error(msg)
await _cleanup(failed=True, entry=build_entry, build_id=build_id)
raise TrackerError(msg) from e
if do_logs:
try:
await self._logs.finish(build_id)
except Exception as e:
msg = f"error cleaning up task, finish logs: {e}"
logger.error(msg)
raise TrackerError(msg) from e

try:
# schedule version for building
task = tasks.build.apply_async(
(
build_id,
desc,
),
serializer="pydantic",
)
except Exception as e:
msg = f"error scheduling new build: {e}"
logger.error(msg)
await _cleanup(
failed=True, do_logs=True, entry=build_entry, build_id=build_id
async def new(self, desc: BuildDescriptor) -> tuple[BuildID, str]:
"""Create a new build entry, scheduling it for build."""
async with self._lock:
# NOTE: We should ensure builds that are the same are properly
# deduplicated if they are in-progress. I.e., if the same build
# request comes twice, and it's either in-progress or queued, then
# we return its build ID.

build_entry = BuildEntry(
task_id=None,
desc=desc,
user=desc.signed_off_by.email,
submitted=dt.now(tz=datetime.UTC),
state=EntryState.new,
started=None,
finished=None,
)
raise TrackerError(msg) from e

build_entry.task_id = task.task_id
build_entry.state = EntryState(task.state.upper())
try:
await self._db.update(build_id, build_entry)
except Exception as e:
msg = f"error updating entry state in db: {e}"
logger.error(msg)
await _cleanup(
failed=True, do_logs=True, entry=build_entry, build_id=build_id
)
raise TrackerError(msg) from e
try:
build_id = await self._db.new(build_entry)
except Exception as e:
msg = f"error storing build entry state to db: {e}"
logger.error(msg)
raise TrackerError(msg) from e

try:
# start tracking and handling logs for this build
await self._logs.new(build_id)
except Exception as e:
msg = f"error starting log tracking: {e}"
logger.error(msg)
await self._fail_entry(build_id, build_entry)
raise TrackerError(msg) from e

try:
# schedule version for building
task = tasks.build.apply_async(
(
build_id,
desc,
),
serializer="pydantic",
)
except Exception as e:
msg = f"error scheduling new build: {e}"
logger.error(msg)
await self._fail_entry(build_id, build_entry, do_logs=True)
raise TrackerError(msg) from e

build_entry.task_id = task.task_id
build_entry.state = EntryState(task.state.upper())
try:
await self._db.update(build_id, build_entry)
except Exception as e:
msg = f"error updating entry state in db: {e}"
logger.error(msg)
await self._fail_entry(build_id, build_entry, do_logs=True)
raise TrackerError(msg) from e

self._builds_by_task_id[build_entry.task_id] = build_id
self._builds_by_build_id[build_id] = build_entry.task_id
self._builds_by_task_id[build_entry.task_id] = build_id
self._builds_by_build_id[build_id] = build_entry.task_id

await _cleanup()
return (build_id, build_entry.state)
return (build_id, build_entry.state)

async def list(
self, *, owner: str | None = None
Expand Down
15 changes: 10 additions & 5 deletions cbsd/cbslib/core/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,20 @@ class BackendError(CESError):

class Backend:
_redis_url: str
# _redis: aioredis.Redis
_redis: aioredis.Redis

def __init__(self, backend_url: str) -> None:
self._redis_url = backend_url

async def redis(self) -> aioredis.Redis:
try:
return aioredis.from_url(f"{self._redis_url}?decode_responses=True") # pyright: ignore[reportUnknownMemberType]
self._redis = aioredis.from_url(f"{self._redis_url}?decode_responses=True")
except Exception as e:
msg = f"error opening connection to redis backend: {e}"
msg = f"error creating redis connection pool: {e}"
logger.error(msg)
raise BackendError(msg) from e

async def redis(self) -> aioredis.Redis:
return self._redis

async def close(self) -> None:
"""Close the Redis connection pool."""
await self._redis.aclose()