Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,19 @@
History
=======

Unreleased
----------

| Bug fix for verify_config controller service scope

**Bug Fixes**

- **verify_config()**: Controller service verification is now scoped strictly to the target Process Group. Previously, controller services inherited from ancestor/parent PGs (up to root) were included in verification results because NiFi's REST API returns ancestor services by default. A broken controller on root or a sibling flow could cause CI verification to fail for an unrelated child flow.

**Canvas Module**

- **list_all_controllers()**: Added ``include_ancestors`` keyword argument (default ``True`` to preserve existing behaviour). Pass ``include_ancestors=False`` to exclude controller services inherited from parent/ancestor Process Groups.

1.5.0 (2026-01-25)
-------------------

Expand Down
17 changes: 14 additions & 3 deletions nipyapi/canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -2115,12 +2115,13 @@ def create_controller(parent_pg, controller, name=None):
)


def list_all_controllers(
def list_all_controllers( # pylint: disable=too-many-arguments,too-many-positional-arguments
pg_id="root",
descendants=True,
include_reporting_tasks=False,
greedy=True,
identifier_type="auto",
include_ancestors=True,
):
"""
Lists all controllers under a given Process Group, defaults to Root.
Expand All @@ -2134,12 +2135,18 @@ def list_all_controllers(
greedy (bool): For name lookup, True for partial match, False for exact.
identifier_type (str): How to interpret string identifier:
"auto" (default) detects UUID vs name, "id" or "name" to force.
include_ancestors (bool): Whether to include controller services
inherited from parent/ancestor Process Groups. Defaults to True
to preserve historical behaviour (NiFi's REST API default).
Pass False to restrict the result to controller services that
live in ``pg_id`` (and its descendants when ``descendants=True``).

Returns:
None, ControllerServiceEntity, or list(ControllerServiceEntity)

"""
assert isinstance(descendants, bool)
assert isinstance(include_ancestors, bool)
assert pg_id == "root" or isinstance(pg_id, (str, nipyapi.nifi.ProcessGroupEntity))
# Resolve pg_id to actual ID (supports name lookup)
if pg_id != "root":
Expand All @@ -2163,14 +2170,18 @@ def list_all_controllers(
else:
pgs = [get_process_group(pg_id, "id")]
for pg in pgs:
new_conts = handle.get_controller_services_from_group(pg.id).controller_services
new_conts = handle.get_controller_services_from_group(
pg.id, include_ancestor_groups=include_ancestors
).controller_services
# trim duplicates from inheritance
out += [x for x in new_conts if x.id not in [y.id for y in out]]
else:
# Case where NiFi > 1.2.0
# duplicate trim already handled by server
out = handle.get_controller_services_from_group(
pg_id, include_descendant_groups=descendants
pg_id,
include_descendant_groups=descendants,
include_ancestor_groups=include_ancestors,
).controller_services
if include_reporting_tasks:
mgmt_handle = nipyapi.nifi.FlowApi()
Expand Down
20 changes: 17 additions & 3 deletions nipyapi/ci/verify_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,17 @@ def _verify_single_processor(processor) -> Dict[str, Any]:


def _verify_controllers(process_group_id: str) -> List[Dict[str, Any]]:
"""Verify all controller services in a process group."""
controllers = nipyapi.canvas.list_all_controllers(process_group_id, descendants=False)
log.debug("Found %d controller services", len(controllers))
"""Verify controller services owned by ``process_group_id``.

Scoped strictly to the given Process Group: excludes controller services
inherited from ancestor/parent PGs and does not recurse into descendants.
This keeps CI verification verdicts dependent only on the flow being
deployed.
"""
controllers = nipyapi.canvas.list_all_controllers(
process_group_id, descendants=False, include_ancestors=False
)
log.debug("Found %d controller services in PG", len(controllers))
return [_verify_single_controller(c) for c in controllers]


Expand All @@ -108,6 +116,12 @@ def verify_config(
a flow. Verifies controller services and processors that are in a
stopped/disabled state.

Scope:
- Controller services: only those owned by ``process_group_id``
(ancestor-inherited services are intentionally excluded so a broken
controller on a sibling/parent flow cannot fail this flow's CI).
- Processors: ``process_group_id`` and all descendant Process Groups.

Args:
process_group_id: ID of the process group. Env: NIFI_PROCESS_GROUP_ID
verify_controllers: Verify controller services (default: True)
Expand Down
43 changes: 43 additions & 0 deletions tests/test_canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,49 @@ def test_list_all_controllers(fix_pg, fix_cont):
_ = canvas.list_all_controllers(descendants=['pie'])


def test_list_all_controllers_excludes_ancestors(fix_pg, fix_cont):
"""list_all_controllers must honour include_ancestors=False.

Default (include_ancestors=True) preserves prior behaviour; the opt-out
path is what CI verify_config relies on to avoid verifying controller
services inherited from parent/root PGs.
"""
f_pg_1 = fix_pg.generate()
f_pg_2 = fix_pg.generate(parent_pg=f_pg_1)
f_c_root = fix_cont()
f_c_mid = fix_cont(parent_pg=f_pg_1)
f_c_leaf = fix_cont(parent_pg=f_pg_2)
assert isinstance(f_c_root, nifi.ControllerServiceEntity)
assert isinstance(f_c_mid, nifi.ControllerServiceEntity)
assert isinstance(f_c_leaf, nifi.ControllerServiceEntity)

# Scoped to leaf, no descendants, no ancestors -> only leaf CS
r1 = canvas.list_all_controllers(
pg_id=f_pg_2.id, descendants=False, include_ancestors=False
)
r1 = [x for x in r1 if conftest.test_basename in x.component.name]
assert len(r1) == 1
assert r1[0].id == f_c_leaf.id

# Scoped to mid, descendants=True, ancestors=False -> mid + leaf, NOT root
r2 = canvas.list_all_controllers(
pg_id=f_pg_1.id, descendants=True, include_ancestors=False
)
r2 = [x for x in r2 if conftest.test_basename in x.component.name]
ids = {x.id for x in r2}
assert f_c_mid.id in ids
assert f_c_leaf.id in ids
assert f_c_root.id not in ids

# Backwards compat: default include_ancestors=True still returns root CS
r3 = canvas.list_all_controllers(pg_id=f_pg_2.id, descendants=False)
r3 = [x for x in r3 if conftest.test_basename in x.component.name]
ids3 = {x.id for x in r3}
assert f_c_root.id in ids3
assert f_c_mid.id in ids3
assert f_c_leaf.id in ids3


def test_create_controller(fix_cont):
root_pg = canvas.get_process_group(canvas.get_root_pg_id(), 'id')
cont_type = canvas.list_all_controller_types()[0]
Expand Down
17 changes: 17 additions & 0 deletions tests/test_ci.py
Original file line number Diff line number Diff line change
Expand Up @@ -1823,6 +1823,23 @@ def test_verify_config_skips_enabled_controllers(fix_pg, fix_cont):
nipyapi.canvas.schedule_controller(f_c1, False)


def test_verify_config_ignores_ancestor_controllers(fix_pg, fix_cont):
"""verify_config must be scoped to the provided PG — no ancestor CSes."""
parent = fix_pg.generate()
child = fix_pg.generate(parent_pg=parent)
f_c_ancestor = fix_cont(parent_pg=parent)
f_c_target = fix_cont(parent_pg=child)

result = ci.verify_config(process_group_id=child.id)

# Only the child's controller service should appear in the results
result_ids = {r["id"] for r in result["controller_results"]}
assert f_c_target.id in result_ids
assert f_c_ancestor.id not in result_ids
assert len(result["controller_results"]) == 1
assert result["process_group_name"] == child.component.name


# =============================================================================
# Export Parameters Tests
# =============================================================================
Expand Down
Loading