Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 12 additions & 18 deletions src/exploit_iq_commons/utils/document_embedding.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,7 @@ def _get_repo_lock(cls, git_repo: str, ref: str) -> threading.Lock:

def __init__(self, *, embedding: "Embeddings", vdb_directory: PathLike = VDB_DIRECTORY,
git_directory: PathLike = DEFAULT_GIT_DIRECTORY, chunk_size: int = 800, chunk_overlap: int = 160,
pickle_cache_directory: PathLike = DEFAULT_PICKLE_CACHE_DIRECTORY, ecosystem: Ecosystem | None = None,
manifest_relative_path: str | None = None):
pickle_cache_directory: PathLike = DEFAULT_PICKLE_CACHE_DIRECTORY):
"""
Create a new DocumentEmbedding instance.

Expand All @@ -311,10 +310,6 @@ def __init__(self, *, embedding: "Embeddings", vdb_directory: PathLike = VDB_DIR
chunk_overlap : int, optional
Overlap between chunks, by default 160
:param pickle_cache_directory:
ecosystem: Ecosystem
The ecosystem used within the repo
manifest_relative_path: str, optional
The path to manifest file within the Git repository
"""

self._embedding = embedding
Expand All @@ -323,8 +318,6 @@ def __init__(self, *, embedding: "Embeddings", vdb_directory: PathLike = VDB_DIR
self._chunk_size = chunk_size
self._chunk_overlap = chunk_overlap
self._pickle_cache_directory = Path(pickle_cache_directory)
self._ecosystem = ecosystem
self._manifest_relative_path = manifest_relative_path

@property
def embedding(self):
Expand Down Expand Up @@ -452,7 +445,7 @@ def clone_and_install_dependencies(self, source_info: SourceDocumentsInfo, manif

return repo_path

def collect_documents_from_cloned(self, source_info: SourceDocumentsInfo) -> list[Document]:
def collect_documents_from_cloned(self, source_info: SourceDocumentsInfo, manifest_relative_path: str | None = None) -> list[Document]:
"""
Collect and parse documents from an already-cloned repository.

Expand All @@ -473,8 +466,8 @@ def collect_documents_from_cloned(self, source_info: SourceDocumentsInfo) -> lis
"""
repo_path = self.get_repo_path(source_info)
cache_name = source_info.type if source_info.type != "code" else ""
if self._manifest_relative_path:
full_git_repo_path = f"{source_info.git_repo}/{self._manifest_relative_path}"
if manifest_relative_path:
full_git_repo_path = f"{source_info.git_repo}/{manifest_relative_path}"
else:
full_git_repo_path = source_info.git_repo

Expand Down Expand Up @@ -528,7 +521,7 @@ def collect_documents_from_cloned(self, source_info: SourceDocumentsInfo) -> lis
)
return documents

def collect_documents(self, source_info: SourceDocumentsInfo) -> list[Document]:
def collect_documents(self, source_info: SourceDocumentsInfo, manifest_relative_path: str | None = None) -> list[Document]:
"""
Collect documents from a source document info. This will clone the git repository and collect files from the
repository based on the include and exclude patterns. Each file is then parsed and segmented based on its
Expand All @@ -549,8 +542,8 @@ def collect_documents(self, source_info: SourceDocumentsInfo) -> list[Document]:
Returns a list of documents collected from the source document info.
"""
repo_path = self.get_repo_path(source_info)
if self._manifest_relative_path:
full_git_repo_path = f"{source_info.git_repo}/{self._manifest_relative_path}"
if manifest_relative_path:
full_git_repo_path = f"{source_info.git_repo}/{manifest_relative_path}"
else:
full_git_repo_path = source_info.git_repo

Expand Down Expand Up @@ -592,7 +585,7 @@ def collect_documents(self, source_info: SourceDocumentsInfo) -> list[Document]:
documents_name=cache_name)
return documents

def create_vdb(self, source_infos: list[SourceDocumentsInfo], output_path: PathLike):
def create_vdb(self, source_infos: list[SourceDocumentsInfo], output_path: PathLike, manifest_relative_path: str | None = None):
"""
Create a FAISS database from a list of input directories.

Expand Down Expand Up @@ -622,7 +615,7 @@ def create_vdb(self, source_infos: list[SourceDocumentsInfo], output_path: PathL
documents = []
for input_dir in source_infos:
try:
documents.extend(self.collect_documents(input_dir))
documents.extend(self.collect_documents(input_dir, manifest_relative_path=manifest_relative_path))
except Exception as e:
logger.warning("Error collecting documents for source info %s: %s", input_dir, e)
continue
Expand Down Expand Up @@ -683,7 +676,8 @@ def create_vdb(self, source_infos: list[SourceDocumentsInfo], output_path: PathL

def build_vdbs(self,
input_sources: list[SourceDocumentsInfo],
ignore_code_embedding: bool = False) -> tuple[Path | None, Path | None]:
ignore_code_embedding: bool = False,
manifest_relative_path : str | None = None) -> tuple[Path | None, Path | None]:
"""
Build the code and document VDB based on a list of source documents.

Expand Down Expand Up @@ -715,7 +709,7 @@ def build_vdbs(self,
vdb_output_dir = self.vdb_directory / source_type / str(self.hash_source_documents_info(source_infos))

if (not vdb_output_dir.exists() or os.environ.get("MORPHEUS_ALWAYS_REBUILD_VDB", "0") == "1"):
vdb = self.create_vdb(source_infos=source_infos, output_path=vdb_output_dir)
vdb = self.create_vdb(source_infos=source_infos, output_path=vdb_output_dir, manifest_relative_path=manifest_relative_path)
else:
logger.info("Cache hit on VDB. Loading existing FAISS database: %s", vdb_output_dir)

Expand Down
43 changes: 43 additions & 0 deletions src/exploit_iq_commons/utils/git_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from pathlib import Path
from pathlib import PurePath
from pathlib import PurePosixPath

from git import Repo
from exploit_iq_commons.logging.loggers_factory import LoggingFactory
Expand Down Expand Up @@ -111,6 +112,47 @@ def get_repo_from_path(base_dir: str, git_repo: str = ".git") -> Repo:
else:
raise ValueError(f"Path {repo_path} does not exist")


def validate_manifest_relative_path(repo_path: Path, manifest_relative_path: str) -> None:
    """Ensure ``manifest_relative_path`` names an existing directory inside ``repo_path``.

    The checks run from cheapest to most expensive: two purely lexical checks on
    the POSIX form of the path, then two filesystem checks on the fully resolved
    location.

    Parameters
    ----------
    repo_path : Path
        Root directory of the checked-out Git repository.
    manifest_relative_path : str
        Candidate directory, expressed relative to ``repo_path``.

    Raises
    ------
    ValueError
        If the path is absolute, contains a ``..`` segment, resolves to a
        location outside the repository root, or is not an existing directory.
    """
    candidate = PurePosixPath(manifest_relative_path)

    # Lexical check 1: only relative paths are acceptable.
    if candidate.is_absolute():
        raise ValueError(
            f"manifest_relative_path must be a relative path, got absolute path: {manifest_relative_path!r}"
        )

    # Lexical check 2: refuse any parent-directory segment, wherever it appears.
    if any(segment == ".." for segment in candidate.parts):
        raise ValueError(
            "manifest_relative_path must not contain parent directory references ('..'): "
            f"{manifest_relative_path!r}"
        )

    # Resolve both ends so the containment test compares fully resolved paths.
    resolved_root = repo_path.resolve()
    resolved_target = resolved_root.joinpath(manifest_relative_path).resolve()

    if not resolved_target.is_relative_to(resolved_root):
        raise ValueError(
            f"manifest_relative_path resolves outside the git repository: {manifest_relative_path!r}"
        )

    if resolved_target.is_dir():
        return

    # Missing paths and non-directories (e.g. regular files) are reported the same way.
    logger.error(
        f"manifest_relative_path {manifest_relative_path!r} does not exist as a directory "
        f"under {repo_path}"
    )
    raise ValueError(
        f"manifest_relative_path {manifest_relative_path!r} does not exist"
    )


def resolve_path_to_manifest(git_repo_path: Path, manifest_relative_path: str | None = None) -> Path:
"""Resolve the directory containing the manifest within a Git repository.
If ``manifest_relative_path`` is provided, it is treated as a path relative to
Expand All @@ -123,6 +165,7 @@ def resolve_path_to_manifest(git_repo_path: Path, manifest_relative_path: str |
"""
if manifest_relative_path:
logger.debug(f"Appending git repo manifest path {git_repo_path} with: {manifest_relative_path}")
validate_manifest_relative_path(git_repo_path, manifest_relative_path)
path_to_manifest = git_repo_path.joinpath(manifest_relative_path)
else:
path_to_manifest = git_repo_path
Expand Down
4 changes: 4 additions & 0 deletions src/exploit_iq_commons/utils/source_code_git_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
)

from exploit_iq_commons.utils.dep_tree import INSTALLED_PACKAGES_FILE, TRANSITIVE_ENV_NAME, Ecosystem
from exploit_iq_commons.utils.git_utils import validate_manifest_relative_path
from exploit_iq_commons.utils.transitive_code_searcher_tool import (
TransitiveCodeSearcher,
)
Expand Down Expand Up @@ -264,6 +265,9 @@ def load_repo(self):

repo.git.checkout(self.ref, "--force")

if self._manifest_relative_path:
validate_manifest_relative_path(self.repo_path, self._manifest_relative_path)

logger.info("Loaded Git repository at path: '%s' @ '%s'", self.repo_path, self.ref)
TransitiveCodeSearcher.download_dependencies(
self.repo_path,
Expand Down
48 changes: 48 additions & 0 deletions src/exploit_iq_commons/utils/tests/test_git_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import pytest
from pathlib import Path

from exploit_iq_commons.utils.git_utils import resolve_path_to_manifest
from exploit_iq_commons.utils.git_utils import validate_manifest_relative_path


@pytest.fixture
def repo_with_subdir(tmp_path: Path) -> Path:
    """Create ``<tmp_path>/repo/module/sub`` on disk and return the ``repo`` directory."""
    root = tmp_path / "repo"
    # parents=True creates repo/ and repo/module/ along the way.
    root.joinpath("module", "sub").mkdir(parents=True)
    return root


class TestValidateManifestRelativePath:
    """Acceptance and rejection cases for ``validate_manifest_relative_path``."""

    def test_accepts_valid_subdirectory(self, repo_with_subdir: Path):
        """An existing nested directory passes without raising."""
        validate_manifest_relative_path(
            repo_path=repo_with_subdir,
            manifest_relative_path="module/sub",
        )

    def test_rejects_absolute_path(self, repo_with_subdir: Path):
        """An absolute path is refused with the 'relative path' message."""
        with pytest.raises(ValueError, match="must be a relative path"):
            validate_manifest_relative_path(
                repo_path=repo_with_subdir,
                manifest_relative_path="/etc/passwd",
            )

    def test_rejects_parent_traversal(self, repo_with_subdir: Path):
        """A leading ``..`` segment is refused."""
        with pytest.raises(ValueError, match="parent directory references"):
            validate_manifest_relative_path(
                repo_path=repo_with_subdir,
                manifest_relative_path="../outside",
            )

    def test_rejects_embedded_parent_traversal(self, repo_with_subdir: Path):
        """``..`` segments in the middle of the path are refused as well."""
        with pytest.raises(ValueError, match="parent directory references"):
            validate_manifest_relative_path(
                repo_path=repo_with_subdir,
                manifest_relative_path="module/../../outside",
            )

    def test_rejects_nonexistent_directory(self, repo_with_subdir: Path):
        """A path that is not present on disk is refused."""
        with pytest.raises(ValueError, match="does not exist"):
            validate_manifest_relative_path(
                repo_path=repo_with_subdir,
                manifest_relative_path="missing/path",
            )

    def test_rejects_file_path(self, repo_with_subdir: Path):
        """A regular file is refused with the same message as a missing path."""
        regular_file = repo_with_subdir / "file.txt"
        regular_file.write_text("x")
        with pytest.raises(ValueError, match="does not exist"):
            validate_manifest_relative_path(
                repo_path=repo_with_subdir,
                manifest_relative_path="file.txt",
            )


class TestResolvePathToManifest:
    """Return-value checks for ``resolve_path_to_manifest``."""

    def test_returns_repo_root_when_manifest_path_is_none(self, repo_with_subdir: Path):
        """With no manifest path, the repository root is returned unchanged."""
        resolved = resolve_path_to_manifest(repo_with_subdir, None)
        assert resolved == repo_with_subdir

    def test_resolves_valid_subdirectory(self, repo_with_subdir: Path):
        """A valid relative directory is returned joined onto the repository root."""
        resolved = resolve_path_to_manifest(repo_with_subdir, "module/sub")
        assert resolved == repo_with_subdir / "module" / "sub"

31 changes: 21 additions & 10 deletions src/exploit_iq_commons/utils/transitive_code_searcher_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@

logger = LoggingFactory.get_agent_logger(f"morpheus.{__name__}")

def determine_manifest_name_by_ecosystem(the_ecosystem:Ecosystem | None = None):
def fetch_manifest_file_names_for_ecosystem(the_ecosystem: Ecosystem | None = None) -> list[str] | None:
if the_ecosystem:
for manifest_name, ecosystem in MANIFESTS_TO_ECOSYSTEMS.items():
if ecosystem == the_ecosystem:
logger.debug(f"Manifest found for ecosystem '{the_ecosystem}': '{manifest_name}'")
return manifest_name
manifests = [
manifest_name
for manifest_name, ecosystem in MANIFESTS_TO_ECOSYSTEMS.items()
if ecosystem == the_ecosystem
]
logger.debug(f"Manifests found for ecosystem '{the_ecosystem}': {manifests}")
return manifests
return None


Expand Down Expand Up @@ -68,12 +71,20 @@ def download_dependencies(git_repo_path: Path, manifest_relative_path: str | Non

Returns whether dependencies were downloaded or not.
"""
path_to_manifest: Path
path_to_manifest = resolve_path_to_manifest(git_repo_path, manifest_relative_path)
# If ecosystem is supplied in input, then override default of first found ecosystem manifest in the repo.

manifest_file_for_ecosystem = determine_manifest_name_by_ecosystem(the_ecosystem)
if manifest_file_for_ecosystem and the_ecosystem and os.path.isfile(path_to_manifest / manifest_file_for_ecosystem):
if the_ecosystem:
manifest_files_for_ecosystem = fetch_manifest_file_names_for_ecosystem(the_ecosystem)
manifest_exists = False
for manifest_file in manifest_files_for_ecosystem or []:
if os.path.isfile(path_to_manifest / manifest_file):
manifest_exists = True
break
if not manifest_exists:
logger.error(f"No manifest files for ecosystem '{the_ecosystem.value}' were found in "
f"{path_to_manifest}. Expected one of: {manifest_files_for_ecosystem}")
raise FileNotFoundError(
f"Manifest files for ecosystem '{the_ecosystem.value}' were not found"
)
Comment thread
gnetanel marked this conversation as resolved.
logger.info(f"Setting ecosystem to user-provided value: {the_ecosystem}")
ecosystem = the_ecosystem
logger.info(f"Ecosystem field supplied in request payload, ecosystem value => {ecosystem}")
Expand Down
4 changes: 3 additions & 1 deletion src/vuln_analysis/functions/cve_clone_and_deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from exploit_iq_commons.logging.loggers_factory import LoggingFactory, trace_id
from exploit_iq_commons.utils.credential_client import credential_context
from exploit_iq_commons.utils.dep_tree import detect_ecosystem
from exploit_iq_commons.utils.git_utils import resolve_path_to_manifest

logger = LoggingFactory.get_agent_logger(__name__)

Expand Down Expand Up @@ -131,7 +132,8 @@ async def _arun(message: AgentMorpheusInput) -> AgentMorpheusEngineInput:
# Detect ecosystem from cloned repo manifests if not provided
if message.image.ecosystem is None and code_sources:
repo_path = embedder.get_repo_path(code_sources[0])
detected = detect_ecosystem(repo_path)
updated_path = resolve_path_to_manifest(repo_path, message.image.manifest_path)
Comment thread
gnetanel marked this conversation as resolved.
detected = detect_ecosystem(updated_path)
if detected is not None:
message.image.ecosystem = detected
logger.info(
Expand Down
Loading
Loading