Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 67 additions & 20 deletions vulnerabilities/pipelines/v2_importers/apache_tomcat_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from collections import defaultdict
from collections import namedtuple
from typing import Iterable
from typing import List

import requests
from bs4 import BeautifulSoup
Expand All @@ -25,15 +26,21 @@

from vulnerabilities.importer import AdvisoryDataV2
from vulnerabilities.importer import AffectedPackageV2
from vulnerabilities.importer import PackageCommitPatchData
from vulnerabilities.importer import ReferenceV2
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2

# Matches a commit URL on github.com for the apache/tomcat repository and
# captures the 5-40 character lowercase-hex hash as the ``commit_hash`` group.
# The pattern has no end anchor, so anything after the hash is ignored.
GITHUB_COMMIT_URL_RE = re.compile(
r"https?://github\.com/apache/tomcat/commit/(?P<commit_hash>[0-9a-f]{5,40})"
)
# Same capture group, for the gitbox.apache.org gitweb-style URL form
# (``...?p=tomcat.git;a=commit;h=<hash>``).
GITBOX_COMMIT_URL_RE = re.compile(
r"https?://gitbox\.apache\.org/repos/asf\?p=tomcat\.git;a=commit;h=(?P<commit_hash>[0-9a-f]{5,40})"
)
# Repository URL for Apache Tomcat on GitHub.
TOMCAT_VCS_URL = "https://github.com/apache/tomcat"

class ApacheTomcatImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
"""
Apache HTTPD Importer Pipeline

This pipeline imports security advisories from the Apache HTTPD project.
"""
class ApacheTomcatImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
"""Apache Tomcat importer pipeline."""

pipeline_id = "apache_tomcat_importer_v2"
spdx_license_expression = "Apache-2.0"
Expand Down Expand Up @@ -73,6 +80,9 @@ def collect_advisories(self) -> Iterable[AdvisoryDataV2]:
grouped[advisory.cve].append(advisory)
for cve, advisory_list in grouped.items():
affected_packages = []
references = []
all_commit_patches = []

for advisory in advisory_list:
self.log(f"Processing advisory {advisory.cve}")
apache_range = to_version_ranges_apache(
Expand All @@ -86,10 +96,19 @@ def collect_advisories(self) -> Iterable[AdvisoryDataV2]:
advisory.fixed_in,
)

commit_patches = get_commit_patches(
advisory.commit_urls,
)
all_commit_patches.extend(commit_patches)

for ref_url in advisory.reference_urls:
references.append(ReferenceV2(url=ref_url))

affected_packages.append(
AffectedPackageV2(
package=PackageURL(type="apache", name="tomcat"),
affected_version_range=apache_range,
fixed_by_commit_patches=commit_patches,
)
)

Expand All @@ -101,13 +120,15 @@ def collect_advisories(self) -> Iterable[AdvisoryDataV2]:
name="tomcat",
),
affected_version_range=maven_range,
fixed_by_commit_patches=commit_patches,
)
)
page_id = page_url.split("/")[-1].replace(".html", "")
yield AdvisoryDataV2(
advisory_id=f"{page_id}/{cve}",
summary=advisory_list[0].summary,
affected_packages=affected_packages,
references=references,
url=page_url,
)

Expand Down Expand Up @@ -258,6 +279,8 @@ class TomcatAdvisoryData:
summary: str
fixed_in: str
affected_versions: str
commit_urls: List[str] = dataclasses.field(default_factory=list)
reference_urls: List[str] = dataclasses.field(default_factory=list)


def parse_tomcat_security(html_content):
Expand All @@ -283,36 +306,60 @@ def parse_tomcat_security(html_content):

if strong and cve_link:
if current:
results.append(current)
results.append(_finalize_advisory(current))

current = {
"cve": cve_link.get_text(strip=True),
"summary": strong.get_text(" ", strip=True),
"affected_versions": None,
"fixed_in": fixed_in,
"commit_urls": [],
"reference_urls": [],
}
continue

if current:
text = p.get_text(" ", strip=True)

if "was fixed" in text.lower():
for link in p.find_all("a", href=True):
href = link["href"]
if GITHUB_COMMIT_URL_RE.match(href) or GITBOX_COMMIT_URL_RE.match(href):
current["commit_urls"].append(href)
current["reference_urls"].append(href)

if text.startswith("Affects:"):
current["affected_versions"] = text.replace("Affects:", "").strip()
current = TomcatAdvisoryData(
cve=current["cve"],
summary=current["summary"],
affected_versions=current["affected_versions"],
fixed_in=current["fixed_in"],
)
results.append(current)
results.append(_finalize_advisory(current))
current = None

if current:
current = TomcatAdvisoryData(
cve=current["cve"],
summary=current["summary"],
affected_versions=current["affected_versions"],
fixed_in=current["fixed_in"],
)
results.append(current)
results.append(_finalize_advisory(current))

return results


def _finalize_advisory(current):
    """
    Return a ``TomcatAdvisoryData`` built from the ``current`` mapping.

    ``current`` must provide the "cve", "summary", "affected_versions" and
    "fixed_in" keys. "commit_urls" and "reference_urls" are optional and
    fall back to an empty list when absent.
    """
    # Optional link lists: tolerate mappings that never collected any URLs.
    commit_urls = current.get("commit_urls", [])
    reference_urls = current.get("reference_urls", [])

    return TomcatAdvisoryData(
        cve=current["cve"],
        summary=current["summary"],
        affected_versions=current["affected_versions"],
        fixed_in=current["fixed_in"],
        commit_urls=commit_urls,
        reference_urls=reference_urls,
    )


def get_commit_patches(commit_urls):
    """
    Return a list of ``PackageCommitPatchData`` for the ``commit_urls``.

    Each URL is tried against the GitHub commit pattern first and the gitbox
    commit pattern second. URLs matching neither are skipped. Every patch
    uses ``TOMCAT_VCS_URL`` as its ``vcs_url`` and the hash captured from the
    URL as its ``commit_hash``. Input order is preserved.
    """
    # Lazily pair each URL with its regex match (or None when unrecognized).
    url_matches = (
        GITHUB_COMMIT_URL_RE.match(commit_url) or GITBOX_COMMIT_URL_RE.match(commit_url)
        for commit_url in commit_urls
    )
    return [
        PackageCommitPatchData(
            vcs_url=TOMCAT_VCS_URL,
            commit_hash=url_match.group("commit_hash"),
        )
        for url_match in url_matches
        if url_match
    ]
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@

from vulnerabilities.importer import AdvisoryDataV2
from vulnerabilities.importer import AffectedPackageV2
from vulnerabilities.importer import PackageCommitPatchData
from vulnerabilities.importer import ReferenceV2
from vulnerabilities.pipelines.v2_importers.apache_tomcat_importer import (
ApacheTomcatImporterPipeline,
TomcatAdvisoryData,
get_commit_patches,
parse_tomcat_security,
)
from vulnerabilities.pipelines.v2_importers.apache_tomcat_importer import TomcatAdvisoryData
from vulnerabilities.pipelines.v2_importers.apache_tomcat_importer import parse_tomcat_security

TOMCAT_SECURITY_HTML = """
<html>
Expand Down Expand Up @@ -52,6 +55,26 @@
</html>
"""

TOMCAT_SECURITY_HTML_WITH_COMMITS = """
<html>
<body>
<h3 id="Fixed_in_Apache_Tomcat_10.1.40">Fixed in Apache Tomcat 10.1.40</h3>
<div class="text">
<p>
<strong>Important: Denial of Service</strong>
<a href="https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2025-31650">CVE-2025-31650</a>
</p>
<p>This was fixed with commits
<a href="https://github.com/apache/tomcat/commit/cba1a0fe1289ee7f5dd46c61c38d1e1ac5437bff">cba1a0fe</a>,
<a href="https://github.com/apache/tomcat/commit/1eef1dc459c45f1e421d8bd25ef340fc1cc34edc">1eef1dc4</a> and
<a href="https://github.com/apache/tomcat/commit/8cc3b8fb3f2d8d4d6a757e014f19d1fafa948a60">8cc3b8fb</a>.
</p>
<p>Affects: 10.1.10 to 10.1.39</p>
</div>
</body>
</html>
"""


def test_parse_tomcat_security_multiple_fixed_sections_same_cve():
advisories = parse_tomcat_security(TOMCAT_SECURITY_HTML)
Expand Down Expand Up @@ -108,7 +131,6 @@ def test_affected_packages_structure():
url="https://tomcat.apache.org/security-10.html",
)

# Validate package structure expectations
for pkg in advisory.affected_packages:
assert isinstance(pkg, AffectedPackageV2)
assert isinstance(pkg.package, PackageURL)
Expand Down Expand Up @@ -143,3 +165,68 @@ def test_apache_and_maven_version_ranges_created(mock_get):

for r in maven_ranges:
assert isinstance(r, MavenVersionRange)


def test_parse_tomcat_security_extracts_commit_urls():
    """Check that the three GitHub commit links of the fixture are collected in order."""
    expected_hashes = (
        "cba1a0fe1289ee7f5dd46c61c38d1e1ac5437bff",
        "1eef1dc459c45f1e421d8bd25ef340fc1cc34edc",
        "8cc3b8fb3f2d8d4d6a757e014f19d1fafa948a60",
    )

    parsed = parse_tomcat_security(TOMCAT_SECURITY_HTML_WITH_COMMITS)
    assert len(parsed) == 1

    entry = parsed[0]
    assert entry.cve == "CVE-2025-31650"
    assert len(entry.commit_urls) == 3
    # Position matters: each hash must appear in the URL at the same index.
    for expected_hash, commit_url in zip(expected_hashes, entry.commit_urls):
        assert expected_hash in commit_url
    assert len(entry.reference_urls) == 3


def test_parse_tomcat_security_extracts_gitbox_commits():
    """
    Check that a gitbox.apache.org commit link is collected as a commit URL.

    The fix paragraph must contain the phrase "was fixed": the parser only
    scans a paragraph for commit links when that phrase is present, so a
    fixture without it leaves ``commit_urls`` empty and the index lookup
    below would raise ``IndexError``.
    """
    html = """
<html><body>
<h3 id="Fixed">Fixed 1.0</h3>
<div class="text">
<p><strong>Bug</strong><a href="CVE-2021-25329">CVE-2021-25329</a></p>
<p>This was fixed with commit <a href="https://gitbox.apache.org/repos/asf?p=tomcat.git;a=commit;h=7b5269715a77">7b52697</a></p>
<p>Affects: 1.0</p>
</div>
</body></html>
"""
    advisories = parse_tomcat_security(html)
    assert len(advisories) == 1
    # Guard the index access so a parsing regression fails with a clear assert.
    assert len(advisories[0].commit_urls) == 1
    assert "7b5269715a77" in advisories[0].commit_urls[0]


def test_get_commit_patches_creates_patch_data():
    """Check that each GitHub commit URL yields one patch with the expected hash."""
    commit_urls = [
        "https://github.com/apache/tomcat/commit/b59099e4ca501a039510334ebe1024971cd6f959",
        "https://github.com/apache/tomcat/commit/cba1a0fe1289ee7f5dd46c61c38d1e1ac5437bff",
    ]

    result = get_commit_patches(commit_urls)
    assert len(result) == 2

    first_patch = result[0]
    second_patch = result[1]
    assert first_patch.commit_hash == "b59099e4ca501a039510334ebe1024971cd6f959"
    assert first_patch.vcs_url == "https://github.com/apache/tomcat"
    assert second_patch.commit_hash == "cba1a0fe1289ee7f5dd46c61c38d1e1ac5437bff"


@patch("vulnerabilities.pipelines.v2_importers.apache_tomcat_importer.requests.get")
def test_pipeline_populates_commit_patches_and_references(mock_get):
    """
    Check that the first collected advisory carries commit patches on every
    affected package and one reference per commit link.
    """
    # The mocked ``requests.get`` response serves the commit-bearing fixture.
    mock_get.return_value.content = TOMCAT_SECURITY_HTML_WITH_COMMITS.encode("utf-8")

    importer = ApacheTomcatImporterPipeline()
    # Replace link discovery with a stub that returns a single page URL.
    importer.fetch_advisory_links = types.MethodType(
        lambda self: ["https://tomcat.apache.org/security-10.html"],
        importer,
    )

    first_advisory = list(importer.collect_advisories())[0]

    packages = first_advisory.affected_packages
    assert len(packages) == 2

    for package in packages:
        commit_patches = package.fixed_by_commit_patches
        assert len(commit_patches) == 3
        # Named ``commit_patch`` so the ``patch`` decorator name is not shadowed.
        for commit_patch in commit_patches:
            assert isinstance(commit_patch, PackageCommitPatchData)
            assert commit_patch.vcs_url == "https://github.com/apache/tomcat"

    references = first_advisory.references
    assert len(references) == 3
    for reference in references:
        assert isinstance(reference, ReferenceV2)