Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
## lifebit-ai/cloudos-cli: changelog

## v2.82.1 (2026-03-11)

### Patch

- Fixes `--filter-queue` to retrieve jobs even if the initial page has none matching the filters

## v2.82.0 (2026-03-10)

### Feat
Expand Down
2 changes: 1 addition & 1 deletion cloudos_cli/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '2.82.0'
__version__ = '2.82.1'
67 changes: 39 additions & 28 deletions cloudos_cli/clos.py
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,24 @@ def get_job_list(self, workspace_id, last_n_jobs=None, page=None, page_size=None
user_id = self.resolve_user_id(filter_owner, workspace_id, verify)
params["user.id"] = user_id

# --- Resolve queue ID before pagination (for local filtering during pagination) ---
queue_id = None
if filter_queue:
try:
from cloudos_cli.queue.queue import Queue
queue_api = Queue(self.cloudos_url, self.apikey, self.cromwell_token, workspace_id, verify)
queues = queue_api.get_job_queues()

for queue in queues:
if queue.get("label") == filter_queue or queue.get("name") == filter_queue:
queue_id = queue.get("id") or queue.get("_id")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you try if as in the case of workflows params["workflow.id"] = workflow_id would work when adding the queue name since the response is"batch":{"enabled":true,"jobQueue":"69b2859f403797abd4686fb6"} for example, to do:

params["batch.jobQueue"] = queue_id

this might simplify the logic a lot

Copy link
Collaborator Author

@l-mansouri l-mansouri Mar 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I understand what you mean — could you explain it further?
I am confused because params["batch.jobQueue"] = queue_id is not present here.
Queue filtering can't happen on the server (it's not even in the ui), so we have to get the job details (that only contain the queue id) and compare them to the requested queue name.
In this `if` I am converting the queue name to an ID. Then I am comparing it to that of the jobs, further down in the code, like so:

        if filter_queue and queue_id:
                filtered_jobs = []
                for job in page_jobs:
                    job_queue = job.get("batch", {}).get("jobQueue")
                    # jobQueue can be a string ID or a dict with {"id": "...", "name": "..."}
                    job_queue_id = job_queue.get("id") if isinstance(job_queue, dict) else job_queue
                    if job_queue_id == queue_id:
                        filtered_jobs.append(job)
                page_jobs = filtered_jobs

            all_jobs.extend(page_jobs)

            # Check stopping conditions based on mode
            if use_pagination_mode:
                # In pagination mode (last_n_jobs), continue until we have enough jobs (after filtering)
                if target_job_count != 'all' and len(all_jobs) >= target_job_count:
                    break
            else:
                # In direct mode (page/page_size):
                # - Without filter_queue: stop after 1 page
                # - With filter_queue: continue until we have page_size filtered results
                if not filter_queue or len(all_jobs) >= current_page_size:
                    break

            # Check if we reached the last page (fewer jobs than requested page size)
            raw_page_jobs = content.get('jobs', [])
            if len(raw_page_jobs) < params["limit"]:
                break  # Last page

            current_page += 1

        # --- Apply limit after all filtering ---
        if use_pagination_mode and target_job_count != 'all' and isinstance(target_job_count, int) and target_job_count > 0:
            all_jobs = all_jobs[:target_job_count]

the only 2 lines I can simplify are

                     job_queue = job.get("batch", {}).get("jobQueue")
                    # jobQueue can be a string ID or a dict with {"id": "...", "name": "..."}
                    job_queue_id = job_queue.get("id") if isinstance(job_queue, dict) else job_queue

they could be simplified like

job_queue_id = job.get("batch", {}).get("jobQueue")

by removing the case in which the jobQueue is a dictionary (as it is in the job details), since that is currently not the case here. However, it does not simplify the logic much.

break

if not queue_id:
raise ValueError(f"Queue with name '{filter_queue}' not found in workspace '{workspace_id}'")
except Exception as e:
raise ValueError(f"Error resolving queue '{filter_queue}'. {str(e)}")

# --- Fetch jobs page by page ---
all_jobs = []
params["limit"] = current_page_size
Expand All @@ -1201,47 +1219,40 @@ def get_job_list(self, workspace_id, last_n_jobs=None, page=None, page_size=None
if not page_jobs:
break

# Apply queue filter during pagination (if specified)
# jobQueue is a dict with "id" and "name" keys, extract the id for comparison
if filter_queue and queue_id:
filtered_jobs = []
for job in page_jobs:
job_queue = job.get("batch", {}).get("jobQueue", {})
# jobQueue is a dict like {"id": "...", "name": "...", ...}
job_queue_id = job_queue.get("id") if isinstance(job_queue, dict) else job_queue
if job_queue_id == queue_id:
filtered_jobs.append(job)
page_jobs = filtered_jobs

all_jobs.extend(page_jobs)

# Check stopping conditions based on mode
if use_pagination_mode:
# In pagination mode (last_n_jobs), continue until we have enough jobs
# In pagination mode (last_n_jobs), continue until we have enough jobs (after filtering)
if target_job_count != 'all' and len(all_jobs) >= target_job_count:
break
else:
# In direct mode (page/page_size), only get one page
break
# In direct mode (page/page_size), only get one page unless filter_queue is used
# When filter_queue is used, continue fetching pages until we have enough filtered results
if not filter_queue or len(all_jobs) >= current_page_size:
break

# Check if we reached the last page (fewer jobs than requested page size)
if len(page_jobs) < params["limit"]:
# Note: For queue filter, we check the unfiltered page_jobs count from the API
# This ensures we stop when the API has exhausted results
raw_page_jobs = content.get('jobs', [])
if len(raw_page_jobs) < params["limit"]:
break # Last page

current_page += 1

# --- Local queue filtering (not supported by API) ---
if filter_queue:
try:
batch_jobs=[job for job in all_jobs if job.get("batch", {})]
if batch_jobs:
from cloudos_cli.queue.queue import Queue
queue_api = Queue(self.cloudos_url, self.apikey, self.cromwell_token, workspace_id, verify)
queues = queue_api.get_job_queues()

queue_id = None
for queue in queues:
if queue.get("label") == filter_queue or queue.get("name") == filter_queue:
queue_id = queue.get("id") or queue.get("_id")
break

if not queue_id:
raise ValueError(f"Queue with name '{filter_queue}' not found in workspace '{workspace_id}'")

all_jobs = [job for job in all_jobs if job.get("batch", {}).get("jobQueue", {}) == queue_id]
else:
raise ValueError(f"The environment is not a batch environment so queues do not exist. Please remove the --filter-queue option.")
except Exception as e:
raise ValueError(f"Error filtering by queue '{filter_queue}'. {str(e)}")

# --- Apply limit after all filtering ---
if use_pagination_mode and target_job_count != 'all' and isinstance(target_job_count, int) and target_job_count > 0:
all_jobs = all_jobs[:target_job_count]
Expand Down
4 changes: 2 additions & 2 deletions tests/test_clos/test_get_job_list_filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"user": {"id": USER_ID, "name": "Test User"},
"project": {"id": PROJECT_ID, "name": "test-project"},
"workflow": {"id": WORKFLOW_ID, "name": "test-workflow"},
"batch": {"jobQueue": QUEUE_ID}
"batch": {"jobQueue": {"id": QUEUE_ID, "name": "v41"}}
},
{
"_id": "job2",
Expand All @@ -32,7 +32,7 @@
"user": {"id": "other_user_id", "name": "Other User"},
"project": {"id": "other_project_id", "name": "other-project"},
"workflow": {"id": "other_workflow_id", "name": "other-workflow"},
"batch": {"jobQueue": "other_queue_id"}
"batch": {"jobQueue": {"id": "other_queue_id", "name": "other-queue"}}
}
]
}
Expand Down
Loading