diff --git a/README.md b/README.md index 6122c2b..a5de4e4 100644 --- a/README.md +++ b/README.md @@ -24,20 +24,42 @@ pip install git+https://github.com/acompany-develop/IMA-PCR-Utils ## What's inside -### Module +### Modules -The `imapcrutils` module consists of the following public types and functions: +The `imapcrutils` package is organized into the following modules, each with a +focused responsibility. All public symbols are also re-exported from the +top-level `imapcrutils` namespace for convenience. + +#### `imapcrutils.log` — IMA log data model and parser | Name | Description | | ---- | ----------- | | `IMALogEntry` | Represents a single IMA log entry (`pcr_idx`, `template_hash`, `template_name`, `hash_algo`, `file_hash`, `file_path`). | | `parse_ima_log_string` | Parse an ASCII IMA log string into a list of `IMALogEntry`. | + +#### `imapcrutils.template` — ima-ng template serialization and template_hash + +| Name | Description | +| ---- | ----------- | | `build_template_fields` | Build `ima-ng` template fields (digest/name) from an `IMALogEntry`. | | `calculate_expected_template_hash` | Recompute the expected template hash for an entry (default: SHA-1). | +| `validate_ima_log_entry` | Validate a single entry by comparing the template hash with the recomputed value. | + +#### `imapcrutils.pcr` — PCR10 replay and boot_aggregate + +| Name | Description | +| ---- | ----------- | | `calculate_pcr10` | Replay PCR10 by extending PCR10 with each `ima-ng` entry (default chain hash: SHA-256). | | `truncate_ima_log_by_pcr` | Truncate the IMA log at the point where the calculated PCR matches the reference value. | -| `validate_ima_log_entry` | Validate a single entry by comparing the template hash with the recomputed value. | | `calculate_boot_aggregate` | Calculate `boot_aggregate` from PCR0..PCR9 values. | + +#### `imapcrutils.appraisal` — IMA log appraisal against a YAML policy + +Subpackage split into `policy` (data model), `loader` (YAML parsing), and +`appraise` (policy evaluation). + +| Name | Description | +| ---- | ----------- | | `AppraisalResult` | Verdict (`ALLOW` / `DENY` / `NEUTRAL`) for a single IMA log entry against an appraisal policy. | | `PolicyComponent` | A single named component of an appraisal policy (`name`, `path` glob, optional `allow`/`deny` hash sets). | | `AppraisalPolicy` | An ordered collection of `PolicyComponent`s; the first matching component decides the verdict for an entry. | diff --git a/examples/appraise.py b/examples/appraise.py index 00247a1..a7a4f96 100755 --- a/examples/appraise.py +++ b/examples/appraise.py @@ -7,7 +7,8 @@ import argparse import sys -from imapcrutils import AppraisalResult, appraise_ima_log, load_policy_file, parse_ima_log_string +from imapcrutils.appraisal import AppraisalResult, appraise_ima_log, load_policy_file +from imapcrutils.log import parse_ima_log_string DEFAULT_IMA_LOG_PATH = "/sys/kernel/security/ima/ascii_runtime_measurements" diff --git a/examples/boot_aggregate.py b/examples/boot_aggregate.py index 00de9e0..84d314d 100755 --- a/examples/boot_aggregate.py +++ b/examples/boot_aggregate.py @@ -8,7 +8,7 @@ import hashlib import sys -from imapcrutils import calculate_boot_aggregate +from imapcrutils.pcr import calculate_boot_aggregate def select_hash_function(hash_algorithm: str): diff --git a/examples/pcr10.py b/examples/pcr10.py index 4593803..8d93d42 100755 --- a/examples/pcr10.py +++ b/examples/pcr10.py @@ -8,7 +8,8 @@ import hashlib import sys -from imapcrutils import calculate_pcr10, parse_ima_log_string +from imapcrutils.log import parse_ima_log_string +from imapcrutils.pcr import calculate_pcr10 DEFAULT_IMA_LOG_PATH = "/sys/kernel/security/ima/ascii_runtime_measurements" diff --git a/examples/truncate_log.py b/examples/truncate_log.py index 6db9048..65f0ea8 100755 --- a/examples/truncate_log.py +++ b/examples/truncate_log.py @@ -8,7 +8,8 @@ import hashlib import sys -from imapcrutils import parse_ima_log_string, truncate_ima_log_by_pcr +from imapcrutils.log import parse_ima_log_string +from imapcrutils.pcr import truncate_ima_log_by_pcr DEFAULT_IMA_LOG_PATH = "/sys/kernel/security/ima/ascii_runtime_measurements" diff --git a/imapcrutils/__init__.py b/imapcrutils/__init__.py index 01f6a28..ddbfcfe 100644 --- a/imapcrutils/__init__.py +++ b/imapcrutils/__init__.py @@ -2,22 +2,20 @@ """ IMA-PCR-Utils - Python Library -This package provides functions for parsing IMA log entries and calculating PCR10 values. +Parsing IMA log entries, replaying PCR10 / boot_aggregate, and appraising +IMA log entries against a YAML policy. + +Modules: + +- :mod:`imapcrutils.log` — IMA log data model and parser. +- :mod:`imapcrutils.template` — ima-ng template serialization and template_hash recomputation. +- :mod:`imapcrutils.pcr` — PCR10 replay and boot_aggregate. +- :mod:`imapcrutils.appraisal` — IMA log appraisal (policy model, YAML loader, evaluator). """ __version__ = "0.1.0" -from imapcrutils.libs import ( - IMALogEntry, - build_template_fields, - calculate_boot_aggregate, - calculate_expected_template_hash, - calculate_pcr10, - parse_ima_log_string, - truncate_ima_log_by_pcr, - validate_ima_log_entry, -) -from imapcrutils.verify import ( +from imapcrutils.appraisal import ( AppraisalPolicy, AppraisalResult, PolicyComponent, @@ -26,3 +24,28 @@ load_policy_file, verify_ima_log, ) +from imapcrutils.log import IMALogEntry, parse_ima_log_string +from imapcrutils.pcr import calculate_boot_aggregate, calculate_pcr10, truncate_ima_log_by_pcr +from imapcrutils.template import build_template_fields, calculate_expected_template_hash, validate_ima_log_entry + +__all__ = [ + # log + "IMALogEntry", + "parse_ima_log_string", + # template + "build_template_fields", + "calculate_expected_template_hash", + "validate_ima_log_entry", + # pcr + "calculate_pcr10", + "truncate_ima_log_by_pcr", + "calculate_boot_aggregate", + # appraisal + "AppraisalResult", + "PolicyComponent", + "AppraisalPolicy", + "load_policy", + "load_policy_file", + "appraise_ima_log", + "verify_ima_log", +] diff --git a/imapcrutils/appraisal/__init__.py b/imapcrutils/appraisal/__init__.py new file mode 100644 index 0000000..df5dedc --- /dev/null +++ b/imapcrutils/appraisal/__init__.py @@ -0,0 +1,24 @@ +# SPDX-License-Identifier: MIT +""" +IMA log appraisal against a YAML policy. + +The package is split into three modules: + +- :mod:`imapcrutils.appraisal.policy` — policy data model (no I/O). +- :mod:`imapcrutils.appraisal.loader` — YAML → :class:`AppraisalPolicy`. +- :mod:`imapcrutils.appraisal.appraise` — apply a policy to IMA log entries. +""" + +from imapcrutils.appraisal.appraise import appraise_ima_log, verify_ima_log +from imapcrutils.appraisal.loader import load_policy, load_policy_file +from imapcrutils.appraisal.policy import AppraisalPolicy, AppraisalResult, PolicyComponent + +__all__ = [ + "AppraisalResult", + "PolicyComponent", + "AppraisalPolicy", + "load_policy", + "load_policy_file", + "appraise_ima_log", + "verify_ima_log", +] diff --git a/imapcrutils/appraisal/appraise.py b/imapcrutils/appraisal/appraise.py new file mode 100644 index 0000000..159d83e --- /dev/null +++ b/imapcrutils/appraisal/appraise.py @@ -0,0 +1,41 @@ +# SPDX-License-Identifier: MIT +""" +Apply an appraisal policy to IMA log entries. + +For every IMA log entry, the first matching component (in declaration order) +decides the verdict: Allow, Deny, or Neutral. :func:`appraise_ima_log` +returns the per-entry verdicts; :func:`verify_ima_log` collapses them to a +single pass/fail boolean. +""" + +__all__ = [ + "appraise_ima_log", + "verify_ima_log", +] + +from imapcrutils.appraisal.policy import AppraisalPolicy, AppraisalResult +from imapcrutils.log import IMALogEntry + + +def appraise_ima_log(entries: list[IMALogEntry], policy: AppraisalPolicy) -> list[tuple[IMALogEntry, AppraisalResult]]: + """ + Classify each IMA log entry against the appraisal policy. + + Args: + entries: IMA log entries to classify. + policy: Appraisal policy. + + Returns: + A list of (entry, verdict) pairs in the same order as entries. + """ + return [(entry, policy.appraise(entry)) for entry in entries] + + +def verify_ima_log(entries: list[IMALogEntry], policy: AppraisalPolicy) -> bool: + """ + Verify that no IMA log entry is denied by the policy. + + Returns True when every entry's verdict is Allow or Neutral. Returns + False as soon as any entry hits a deny/denylist rule. + """ + return all(policy.appraise(entry) is not AppraisalResult.DENY for entry in entries) diff --git a/imapcrutils/appraisal/loader.py b/imapcrutils/appraisal/loader.py new file mode 100644 index 0000000..f992307 --- /dev/null +++ b/imapcrutils/appraisal/loader.py @@ -0,0 +1,78 @@ +# SPDX-License-Identifier: MIT +""" +YAML appraisal policy loader. + +Parses a YAML document into an :class:`AppraisalPolicy`. This is the only +module that depends on ``pyyaml``; the policy model itself +(:mod:`imapcrutils.appraisal.policy`) is format-agnostic. + +Policy format (YAML):: + + component1: + path: + allow: [...] # optional + component2: + path: + deny: [...] # optional + ... +""" + +__all__ = [ + "load_policy", + "load_policy_file", +] + +from pathlib import Path + +import yaml + +from imapcrutils.appraisal.policy import AppraisalPolicy, PolicyComponent + + +def load_policy(yaml_string: str) -> AppraisalPolicy: + """ + Parse a YAML appraisal policy string into an AppraisalPolicy. + + Args: + yaml_string: YAML document mapping component names to rule dicts. + + Returns: + AppraisalPolicy with components in declaration order. + + Raises: + ValueError: if the document is not a mapping or a component is malformed. + """ + data = yaml.safe_load(yaml_string) + if data is None: + return AppraisalPolicy(components=[]) + if not isinstance(data, dict): + raise ValueError("policy root must be a mapping of component names to rules") + + components: list[PolicyComponent] = [] + for name, rules in data.items(): + if not isinstance(rules, dict): + raise ValueError(f"component '{name}': rules must be a mapping") + path = rules.get("path") + if not isinstance(path, str): + raise ValueError(f"component '{name}': 'path' is required and must be a string") + allowlist = rules.get("allow") + if isinstance(allowlist, list) and all(map(lambda x: isinstance(x, str), allowlist)): + allow = list(map(lambda x: x.lower(), allowlist)) + elif allowlist is None: + allow = None + else: + raise ValueError(f"component '{name}': 'allow' must be a list of strings") + denylist = rules.get("deny") + if isinstance(denylist, list) and all(map(lambda x: isinstance(x, str), denylist)): + deny = list(map(lambda x: x.lower(), denylist)) + elif denylist is None: + deny = None + else: + raise ValueError(f"component '{name}': 'deny' must be a list of strings") + components.append(PolicyComponent(name=str(name), path=path, allow=allow, deny=deny)) + return AppraisalPolicy(components=components) + + +def load_policy_file(path: str | Path) -> AppraisalPolicy: + """Load an appraisal policy from a YAML file on disk.""" + return load_policy(Path(path).read_text()) diff --git a/imapcrutils/appraisal/policy.py b/imapcrutils/appraisal/policy.py new file mode 100644 index 0000000..002c60d --- /dev/null +++ b/imapcrutils/appraisal/policy.py @@ -0,0 +1,71 @@ +# SPDX-License-Identifier: MIT +""" +Appraisal policy data model. + +Defines :class:`AppraisalResult`, :class:`PolicyComponent`, and +:class:`AppraisalPolicy` — the in-memory representation of an IMA log +appraisal policy. No file-format dependencies live here; YAML parsing +is provided separately by :mod:`imapcrutils.appraisal.loader`. +""" + +__all__ = [ + "AppraisalResult", + "PolicyComponent", + "AppraisalPolicy", +] + +import fnmatch +from dataclasses import dataclass, field +from enum import Enum + +from imapcrutils.log import IMALogEntry + + +class AppraisalResult(Enum): + """Verdict for a single IMA log entry against an appraisal policy.""" + + ALLOW = "allow" + DENY = "deny" + NEUTRAL = "neutral" + + +@dataclass +class PolicyComponent: + """A single named component of an appraisal policy.""" + + name: str + path: str + allow: set[str] | None = None + deny: set[str] | None = None + + def matches_path(self, file_path: str) -> bool: + """Return True if file_path matches this component's path glob.""" + return fnmatch.fnmatchcase(file_path, self.path) + + def appraise_hash(self, file_hash_hex: str) -> AppraisalResult: + """Classify a hex-encoded file hash against this component's rules.""" + normalized = file_hash_hex.lower() + if self.deny is not None: + if normalized in self.deny: + return AppraisalResult.DENY + return AppraisalResult.ALLOW + if self.allow is not None: + if normalized in self.allow: + return AppraisalResult.ALLOW + return AppraisalResult.DENY + return AppraisalResult.NEUTRAL + + +@dataclass +class AppraisalPolicy: + """An ordered collection of policy components.""" + + components: list[PolicyComponent] = field(default_factory=list) + + def appraise(self, entry: IMALogEntry) -> AppraisalResult: + """Return the verdict for entry from the first matching component.""" + file_hash_hex = entry.file_hash.hex() + for component in self.components: + if component.matches_path(entry.file_path): + return component.appraise_hash(file_hash_hex) + return AppraisalResult.NEUTRAL diff --git a/imapcrutils/libs.py b/imapcrutils/libs.py deleted file mode 100644 index 0d89f38..0000000 --- a/imapcrutils/libs.py +++ /dev/null @@ -1,230 +0,0 @@ -# SPDX-License-Identifier: MIT -""" -IMA log parsing, PCR10 calculation and boot_aggregate calculation library. - -This module provides functions for parsing IMA log entries, calculating PCR10 values and boot_aggregate values. -""" - -__all__ = [ - "IMALogEntry", - "parse_ima_log_string", - "build_template_fields", - "calculate_expected_template_hash", - "calculate_pcr10", - "truncate_ima_log_by_pcr", - "validate_ima_log_entry", - "calculate_boot_aggregate", -] - - -import hashlib -import struct -from collections.abc import Callable -from dataclasses import dataclass - - -@dataclass -class IMALogEntry: - """Structure representing a single IMA log entry.""" - - # Fields - pcr_idx: str - template_hash: str - template_name: str - hash_algo: str - file_hash: bytes - file_path: str - - # Methods - def __str__(self) -> str: - """Return a string representation of the IMALogEntry.""" - return " ".join( - [self.pcr_idx, self.template_hash, self.template_name, self.hash_algo + ":" + self.file_hash.hex(), self.file_path] - ) - - @classmethod - def from_string(cls, line: str) -> "IMALogEntry": - """ - Parse a single line from IMA log and convert it to IMALogEntry. - - Args: - line: A line from IMA log file - (format: "pcr_idx template_hash template_name file_data file_path") - - Returns: - IMALogEntry - """ - parts = line.strip().split(" ") - if len(parts) < 5: - raise ValueError(f"Invalid IMA log entry: {line}") - pcr_idx = parts[0] - template_hash = parts[1] - template_name = parts[2] - # format: "algo:hexdigest" e.g. "sha256:0123456789abcdef..." - try: - hash_algo, file_hash_hex = parts[3].split(":") - file_hash = bytes.fromhex(file_hash_hex) - except ValueError as e: - raise ValueError(f"Invalid file_hash format: {parts[3]}") from e - file_path = " ".join(parts[4:]) - return cls(pcr_idx, template_hash, template_name, hash_algo, file_hash, file_path) - - -def parse_ima_log_string(log_string: str) -> list[IMALogEntry]: - """ - Parse a string of IMA log entries and convert it to a list of IMALogEntry. - - Args: - log_string: A string of IMA log entries - - Returns: - List of IMALogEntry - """ - entries = [] - for line in log_string.split("\n"): - if line.strip() == "": - continue - entry = IMALogEntry.from_string(line) - entries.append(entry) - return entries - - -def build_template_fields(entry: IMALogEntry) -> tuple[bytes, bytes, bytes, bytes]: - """ - Build d_ng_content, d_ng_field, n_ng_content, n_ng_field from IMA log entry. - - Args: - entry: IMALogEntry structure - - Returns: - Tuple of (d_ng_content, d_ng_field, n_ng_content, n_ng_field) - - Raises: - ValueError: If file_data format is invalid - """ - # 1. Create d-ng (Digest) field - # Format: [Algo(string)] + [:] + [\0] + [digest_bytes] - algo = entry.hash_algo - digest_bytes = entry.file_hash - # "algo:\0" + digest_bytes - d_ng_content = algo.encode("ascii") + b":" + b"\x00" + digest_bytes - d_ng_field = struct.pack(" bytes: - """ - Calculate expected template hash from IMA log entry. - - Args: - entry: IMALogEntry structure - hash_func: Hash function to use (default: hashlib.sha1) - Should be a function that takes bytes and returns bytes digest - - Returns: - Expected template hash as bytes - """ - _d_ng_content, d_ng_field, _n_ng_content, n_ng_field = build_template_fields(entry) - # Combine template data - template_data = d_ng_field + n_ng_field - # Calculate Template Hash - expected_template_hash = hash_func(template_data).digest() - - return expected_template_hash - - -def calculate_pcr10(entries: list[IMALogEntry], hash_func: Callable[[bytes], bytes] = hashlib.sha256) -> bytes: - """ - Calculate PCR10 value from a list of IMA log entries. - - Args: - entries: List of IMALogEntry structures - hash_func: Hash function for PCR extension (default: hashlib.sha256) - - Returns: - PCR10 value as bytes - """ - # PCR initial value is all zeros - pcr_value = bytes(hash_func().digest_size) - for entry in entries: - # Skip if PCR is not 10 - if entry.pcr_idx != "10": - continue - # Only support ima-ng template - if entry.template_name != "ima-ng": - continue - # Calculate template hash using the specified hash function - expected_template_hash = calculate_expected_template_hash(entry, hash_func) - # PCR extension (Extend Operation) - # PCR_new = HASH( PCR_old || Template_Hash ) - pcr_value = hash_func(pcr_value + expected_template_hash).digest() - return pcr_value - - -def truncate_ima_log_by_pcr( - entries: list[IMALogEntry], pcr: bytes, hash_func: Callable[[bytes], bytes] = hashlib.sha256 -) -> list[IMALogEntry] | None: - """ - Find the point in the IMA log where the calculated PCR10 matches the reference value. - - Filters IMA log entries for PCR index "10" and "ima-ng" template, then extends - the PCR value incrementally. Returns the sublist of valid entries from the - beginning up to and including the entry where the PCR value matches the reference. - Returns None if no match is found. - - Args: - entries: List of IMALogEntry objects to process - pcr: Reference PCR value to match against - hash_func: Hash function for PCR extension (default: hashlib.sha256) - - Returns: - List of IMALogEntry objects from the beginning up to the matching entry, - or None if the reference PCR value is not found - """ - results = [] - pcr_value = bytes(hash_func().digest_size) - for entry in entries: - if entry.pcr_idx != "10": - continue - if entry.template_name != "ima-ng": - continue - results.append(entry) - template_hash = calculate_expected_template_hash(entry, hash_func) - pcr_value = hash_func(pcr_value + template_hash).digest() - if pcr_value == pcr: - return results - return None - - -def validate_ima_log_entry(entry: IMALogEntry, hash_func: Callable[[bytes], bytes] = hashlib.sha1) -> bool: - """ - Validate IMA log entry. Template_hash must coincide with the hash of the file data. - - Args: - entry: IMALogEntry structure - hash_func: Hash function to use (default: hashlib.sha1) - Returns: - True if entry is valid, False otherwise - """ - expected_template_hash = calculate_expected_template_hash(entry, hash_func) - return entry.template_hash == expected_template_hash.hex() - - -def calculate_boot_aggregate(pcrlist: list[bytes], hash_func: Callable[[bytes], bytes] = hashlib.sha256) -> bytes: - """ - Calculate boot aggregate from PCR0..PCR9. - - Args: - pcrlist: List of PCR0..PCR9 byte strings - hash_func: Hash function for boot aggregate calculation (default: hashlib.sha256) - Returns: - Boot aggregate as bytes - """ - if len(pcrlist) != 10: - raise ValueError(f"length of PCR list is expected to be 10: got {len(pcrlist)}") - return hash_func(b"".join(pcrlist)).digest() diff --git a/imapcrutils/log.py b/imapcrutils/log.py new file mode 100644 index 0000000..7083f1b --- /dev/null +++ b/imapcrutils/log.py @@ -0,0 +1,81 @@ +# SPDX-License-Identifier: MIT +""" +IMA log data model and parser. + +Provides :class:`IMALogEntry` representing a single IMA log line and +:func:`parse_ima_log_string` for parsing a multi-line log into a list of +entries. +""" + +__all__ = [ + "IMALogEntry", + "parse_ima_log_string", +] + +from dataclasses import dataclass + + +@dataclass +class IMALogEntry: + """Structure representing a single IMA log entry.""" + + # Fields + pcr_idx: str + template_hash: str + template_name: str + hash_algo: str + file_hash: bytes + file_path: str + + # Methods + def __str__(self) -> str: + """Return a string representation of the IMALogEntry.""" + return " ".join( + [self.pcr_idx, self.template_hash, self.template_name, self.hash_algo + ":" + self.file_hash.hex(), self.file_path] + ) + + @classmethod + def from_string(cls, line: str) -> "IMALogEntry": + """ + Parse a single line from IMA log and convert it to IMALogEntry. + + Args: + line: A line from IMA log file + (format: "pcr_idx template_hash template_name file_data file_path") + + Returns: + IMALogEntry + """ + parts = line.strip().split(" ") + if len(parts) < 5: + raise ValueError(f"Invalid IMA log entry: {line}") + pcr_idx = parts[0] + template_hash = parts[1] + template_name = parts[2] + # format: "algo:hexdigest" e.g. "sha256:0123456789abcdef..." + try: + hash_algo, file_hash_hex = parts[3].split(":") + file_hash = bytes.fromhex(file_hash_hex) + except ValueError as e: + raise ValueError(f"Invalid file_hash format: {parts[3]}") from e + file_path = " ".join(parts[4:]) + return cls(pcr_idx, template_hash, template_name, hash_algo, file_hash, file_path) + + +def parse_ima_log_string(log_string: str) -> list[IMALogEntry]: + """ + Parse a string of IMA log entries and convert it to a list of IMALogEntry. + + Args: + log_string: A string of IMA log entries + + Returns: + List of IMALogEntry + """ + entries = [] + for line in log_string.split("\n"): + if line.strip() == "": + continue + entry = IMALogEntry.from_string(line) + entries.append(entry) + return entries diff --git a/imapcrutils/pcr.py b/imapcrutils/pcr.py new file mode 100644 index 0000000..b27ab75 --- /dev/null +++ b/imapcrutils/pcr.py @@ -0,0 +1,99 @@ +# SPDX-License-Identifier: MIT +""" +PCR value computations: PCR10 replay from IMA log entries and boot_aggregate +from PCR0..PCR9. + +:func:`calculate_pcr10` and :func:`truncate_ima_log_by_pcr` replay the PCR +extension chain that IMA performs at runtime. :func:`calculate_boot_aggregate` +computes the boot-time PCR digest used as the first IMA measurement. +""" + +__all__ = [ + "calculate_pcr10", + "truncate_ima_log_by_pcr", + "calculate_boot_aggregate", +] + +import hashlib +from collections.abc import Callable + +from imapcrutils.log import IMALogEntry +from imapcrutils.template import calculate_expected_template_hash + + +def calculate_pcr10(entries: list[IMALogEntry], hash_func: Callable[[bytes], bytes] = hashlib.sha256) -> bytes: + """ + Calculate PCR10 value from a list of IMA log entries. + + Args: + entries: List of IMALogEntry structures + hash_func: Hash function for PCR extension (default: hashlib.sha256) + + Returns: + PCR10 value as bytes + """ + # PCR initial value is all zeros + pcr_value = bytes(hash_func().digest_size) + for entry in entries: + # Skip if PCR is not 10 + if entry.pcr_idx != "10": + continue + # Only support ima-ng template + if entry.template_name != "ima-ng": + continue + # Calculate template hash using the specified hash function + expected_template_hash = calculate_expected_template_hash(entry, hash_func) + # PCR extension (Extend Operation) + # PCR_new = HASH( PCR_old || Template_Hash ) + pcr_value = hash_func(pcr_value + expected_template_hash).digest() + return pcr_value + + +def truncate_ima_log_by_pcr( + entries: list[IMALogEntry], pcr: bytes, hash_func: Callable[[bytes], bytes] = hashlib.sha256 +) -> list[IMALogEntry] | None: + """ + Find the point in the IMA log where the calculated PCR10 matches the reference value. + + Filters IMA log entries for PCR index "10" and "ima-ng" template, then extends + the PCR value incrementally. Returns the sublist of valid entries from the + beginning up to and including the entry where the PCR value matches the reference. + Returns None if no match is found. + + Args: + entries: List of IMALogEntry objects to process + pcr: Reference PCR value to match against + hash_func: Hash function for PCR extension (default: hashlib.sha256) + + Returns: + List of IMALogEntry objects from the beginning up to the matching entry, + or None if the reference PCR value is not found + """ + results = [] + pcr_value = bytes(hash_func().digest_size) + for entry in entries: + if entry.pcr_idx != "10": + continue + if entry.template_name != "ima-ng": + continue + results.append(entry) + template_hash = calculate_expected_template_hash(entry, hash_func) + pcr_value = hash_func(pcr_value + template_hash).digest() + if pcr_value == pcr: + return results + return None + + +def calculate_boot_aggregate(pcrlist: list[bytes], hash_func: Callable[[bytes], bytes] = hashlib.sha256) -> bytes: + """ + Calculate boot aggregate from PCR0..PCR9. + + Args: + pcrlist: List of PCR0..PCR9 byte strings + hash_func: Hash function for boot aggregate calculation (default: hashlib.sha256) + Returns: + Boot aggregate as bytes + """ + if len(pcrlist) != 10: + raise ValueError(f"length of PCR list is expected to be 10: got {len(pcrlist)}") + return hash_func(b"".join(pcrlist)).digest() diff --git a/imapcrutils/template.py b/imapcrutils/template.py new file mode 100644 index 0000000..422ffed --- /dev/null +++ b/imapcrutils/template.py @@ -0,0 +1,83 @@ +# SPDX-License-Identifier: MIT +""" +ima-ng template serialization and template_hash recomputation. + +Builds the d-ng (digest) and n-ng (name) fields that comprise an ima-ng +template, recomputes the expected template hash, and validates that the +recorded template_hash in a log entry matches the recomputed value. +""" + +__all__ = [ + "build_template_fields", + "calculate_expected_template_hash", + "validate_ima_log_entry", +] + +import hashlib +import struct +from collections.abc import Callable + +from imapcrutils.log import IMALogEntry + + +def build_template_fields(entry: IMALogEntry) -> tuple[bytes, bytes, bytes, bytes]: + """ + Build d_ng_content, d_ng_field, n_ng_content, n_ng_field from IMA log entry. + + Args: + entry: IMALogEntry structure + + Returns: + Tuple of (d_ng_content, d_ng_field, n_ng_content, n_ng_field) + + Raises: + ValueError: If file_data format is invalid + """ + # 1. Create d-ng (Digest) field + # Format: [Algo(string)] + [:] + [\0] + [digest_bytes] + algo = entry.hash_algo + digest_bytes = entry.file_hash + # "algo:\0" + digest_bytes + d_ng_content = algo.encode("ascii") + b":" + b"\x00" + digest_bytes + d_ng_field = struct.pack(" bytes: + """ + Calculate expected template hash from IMA log entry. + + Args: + entry: IMALogEntry structure + hash_func: Hash function to use (default: hashlib.sha1) + Should be a function that takes bytes and returns bytes digest + + Returns: + Expected template hash as bytes + """ + _d_ng_content, d_ng_field, _n_ng_content, n_ng_field = build_template_fields(entry) + # Combine template data + template_data = d_ng_field + n_ng_field + # Calculate Template Hash + expected_template_hash = hash_func(template_data).digest() + + return expected_template_hash + + +def validate_ima_log_entry(entry: IMALogEntry, hash_func: Callable[[bytes], bytes] = hashlib.sha1) -> bool: + """ + Validate IMA log entry. Template_hash must coincide with the hash of the file data. + + Args: + entry: IMALogEntry structure + hash_func: Hash function to use (default: hashlib.sha1) + Returns: + True if entry is valid, False otherwise + """ + expected_template_hash = calculate_expected_template_hash(entry, hash_func) + return entry.template_hash == expected_template_hash.hex() diff --git a/imapcrutils/verify.py b/imapcrutils/verify.py deleted file mode 100644 index 529cfa2..0000000 --- a/imapcrutils/verify.py +++ /dev/null @@ -1,173 +0,0 @@ -# SPDX-License-Identifier: MIT -""" -IMA log appraisal against a YAML policy. - -Each policy component declares a file-path glob and one or more allow/deny -hash rules. For every IMA log entry the first matching component (in -declaration order) decides the verdict: Allow, Deny, or Neutral. - -Policy format (YAML):: - - component1: - path: - allow: [...] # optional - component2: - path: - deny: [...] # optional - ... -""" - -__all__ = [ - "AppraisalResult", - "PolicyComponent", - "AppraisalPolicy", - "load_policy", - "load_policy_file", - "appraise_ima_log", - "verify_ima_log", -] - -import fnmatch -from dataclasses import dataclass, field -from enum import Enum -from pathlib import Path - -import yaml - -from imapcrutils.libs import IMALogEntry - - -class AppraisalResult(Enum): - """Verdict for a single IMA log entry against an appraisal policy.""" - - ALLOW = "allow" - DENY = "deny" - NEUTRAL = "neutral" - - -@dataclass -class PolicyComponent: - """A single named component of an appraisal policy.""" - - name: str - path: str - allow: set[str] | None = None - deny: set[str] | None = None - - def matches_path(self, file_path: str) -> bool: - """Return True if file_path matches this component's path glob.""" - return fnmatch.fnmatchcase(file_path, self.path) - - def appraise_hash(self, file_hash_hex: str) -> AppraisalResult: - """Classify a hex-encoded file hash against this component's rules.""" - normalized = file_hash_hex.lower() - if self.deny is not None: - if normalized in self.deny: - return AppraisalResult.DENY - return AppraisalResult.ALLOW - if self.allow is not None: - if normalized in self.allow: - return AppraisalResult.ALLOW - return AppraisalResult.DENY - return AppraisalResult.NEUTRAL - - -@dataclass -class AppraisalPolicy: - """An ordered collection of policy components.""" - - components: list[PolicyComponent] = field(default_factory=list) - - def appraise(self, entry: IMALogEntry) -> AppraisalResult: - """Return the verdict for entry from the first matching component.""" - file_hash_hex = entry.file_hash.hex() - for component in self.components: - if component.matches_path(entry.file_path): - return component.appraise_hash(file_hash_hex) - return AppraisalResult.NEUTRAL - - -def _coerce_hash_set(name: str, single: object, many: object) -> set[str]: - result: set[str] = set() - if single is not None: - if not isinstance(single, str): - raise ValueError(f"component '{name}': scalar hash must be a string") - result.add(single.lower()) - if many is not None: - if not isinstance(many, list) or not all(isinstance(h, str) for h in many): - raise ValueError(f"component '{name}': list hash field must be a list of strings") - result.update(h.lower() for h in many) - return result - - -def load_policy(yaml_string: str) -> AppraisalPolicy: - """ - Parse a YAML appraisal policy string into an AppraisalPolicy. - - Args: - yaml_string: YAML document mapping component names to rule dicts. - - Returns: - AppraisalPolicy with components in declaration order. - - Raises: - ValueError: if the document is not a mapping or a component is malformed. - """ - data = yaml.safe_load(yaml_string) - if data is None: - return AppraisalPolicy(components=[]) - if not isinstance(data, dict): - raise ValueError("policy root must be a mapping of component names to rules") - - components: list[PolicyComponent] = [] - for name, rules in data.items(): - if not isinstance(rules, dict): - raise ValueError(f"component '{name}': rules must be a mapping") - path = rules.get("path") - if not isinstance(path, str): - raise ValueError(f"component '{name}': 'path' is required and must be a string") - allowlist = rules.get("allow") - if isinstance(allowlist, list) and all(map(lambda x: isinstance(x, str), allowlist)): - allow = list(map(lambda x: x.lower(), allowlist)) - elif allowlist is None: - allow = None - else: - raise ValueError(f"component '{name}': 'allow' must be a list of strings") - denylist = rules.get("deny") - if isinstance(denylist, list) and all(map(lambda x: isinstance(x, str), denylist)): - deny = list(map(lambda x: x.lower(), denylist)) - elif denylist is None: - deny = None - else: - raise ValueError(f"component '{name}': 'deny' must be a list of strings") - components.append(PolicyComponent(name=str(name), path=path, allow=allow, deny=deny)) - return AppraisalPolicy(components=components) - - -def load_policy_file(path: str | Path) -> AppraisalPolicy: - """Load an appraisal policy from a YAML file on disk.""" - return load_policy(Path(path).read_text()) - - -def appraise_ima_log(entries: list[IMALogEntry], policy: AppraisalPolicy) -> list[tuple[IMALogEntry, AppraisalResult]]: - """ - Classify each IMA log entry against the appraisal policy. - - Args: - entries: IMA log entries to classify. - policy: Appraisal policy. - - Returns: - A list of (entry, verdict) pairs in the same order as entries. - """ - return [(entry, policy.appraise(entry)) for entry in entries] - - -def verify_ima_log(entries: list[IMALogEntry], policy: AppraisalPolicy) -> bool: - """ - Verify that no IMA log entry is denied by the policy. - - Returns True when every entry's verdict is Allow or Neutral. Returns - False as soon as any entry hits a deny/denylist rule. - """ - return all(policy.appraise(entry) is not AppraisalResult.DENY for entry in entries) diff --git a/pyproject.toml b/pyproject.toml index e3daf70..d88b603 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,7 +19,7 @@ dev = [ ] [tool.setuptools.packages.find] -include = ["imapcrutils"] +include = ["imapcrutils", "imapcrutils.*"] [tool.ruff] target-version = "py310" diff --git a/tests/test_appraisal_appraise.py b/tests/test_appraisal_appraise.py new file mode 100644 index 0000000..90f9b35 --- /dev/null +++ b/tests/test_appraisal_appraise.py @@ -0,0 +1,125 @@ +# SPDX-License-Identifier: MIT +"""Tests for imapcrutils.appraisal.appraise — appraise_ima_log and verify_ima_log.""" + +import textwrap + +from imapcrutils import ( + AppraisalPolicy, + AppraisalResult, + IMALogEntry, + PolicyComponent, + appraise_ima_log, + load_policy, + parse_ima_log_string, + verify_ima_log, +) + +SAMPLE_HASH = "088faac4777b024045bd578c5c3f8efc4ac2cafb4af90a12832a762feb58eb88" +OTHER_HASH = "00" * 32 + + +def _entry(path: str, hash_hex: str = SAMPLE_HASH) -> IMALogEntry: + return IMALogEntry( + pcr_idx="10", + template_hash="8facace9d7255a1985e976e9bb59675f211c82de", + template_name="ima-ng", + hash_algo="sha256", + file_hash=bytes.fromhex(hash_hex), + file_path=path, + ) + + +# --------------------------------------------------------------------------- +# appraise_ima_log +# --------------------------------------------------------------------------- + + +class TestAppraiseImaLog: + """Tests for appraise_ima_log — classifying a list of entries.""" + + def test_returns_pairs_in_order(self): + """Result preserves input order and pairs each entry with its verdict.""" + entries = [ + _entry("/usr/bin/a"), + _entry("/usr/bin/b", OTHER_HASH), + _entry("/etc/x"), + ] + policy = AppraisalPolicy(components=[PolicyComponent(name="bin", path="/usr/bin/*", allow=[SAMPLE_HASH])]) + result = appraise_ima_log(entries, policy) + assert [r[0].file_path for r in result] == ["/usr/bin/a", "/usr/bin/b", "/etc/x"] + assert [r[1] for r in result] == [ + AppraisalResult.ALLOW, + AppraisalResult.DENY, + AppraisalResult.NEUTRAL, + ] + + def test_empty_entries(self): + """No entries yields an empty result list.""" + assert appraise_ima_log([], AppraisalPolicy()) == [] + + +# --------------------------------------------------------------------------- +# verify_ima_log +# --------------------------------------------------------------------------- + + +class TestVerifyImaLog: + """Tests for verify_ima_log — boolean pass/fail across all entries.""" + + def test_all_allow_returns_true(self): + """All-allow log passes.""" + entries = [_entry("/usr/bin/a"), _entry("/usr/bin/b")] + policy = AppraisalPolicy(components=[PolicyComponent(name="bin", path="/usr/bin/*", allow={SAMPLE_HASH})]) + assert verify_ima_log(entries, policy) is True + + def test_neutral_entries_pass(self): + """Neutral entries (no matching component) do not fail verification.""" + entries = [_entry("/etc/x"), _entry("/usr/bin/a")] + policy = AppraisalPolicy(components=[PolicyComponent(name="bin", path="/usr/bin/*", allow={SAMPLE_HASH})]) + assert verify_ima_log(entries, policy) is True + + def test_single_deny_returns_false(self): + """A single deny in the log fails verification.""" + entries = [ + _entry("/usr/bin/good"), + _entry("/usr/bin/bad", OTHER_HASH), + ] + policy = AppraisalPolicy( + components=[PolicyComponent(name="bin", path="/usr/bin/*", allow={SAMPLE_HASH}, deny={OTHER_HASH})] + ) + assert verify_ima_log(entries, policy) is False + + def test_empty_log_passes(self): + """An empty log trivially passes.""" + assert verify_ima_log([], AppraisalPolicy()) is True + + def test_real_sample_with_wildcard_allowlist(self, sample_ima_log): + """A policy that allows every path/hash from the sample log passes.""" + entries = parse_ima_log_string(sample_ima_log) + all_hashes = {e.file_hash.hex() for e in entries} + policy = AppraisalPolicy(components=[PolicyComponent(name="all", path="*", allow=all_hashes)]) + assert verify_ima_log(entries, policy) is True + results = appraise_ima_log(entries, policy) + assert all(v is AppraisalResult.ALLOW for _, v in results) + + def test_real_sample_with_one_denied_hash(self, sample_ima_log): + """Denying any hash present in the sample log fails verification.""" + entries = parse_ima_log_string(sample_ima_log) + target_hash = entries[5].file_hash.hex() + policy = AppraisalPolicy(components=[PolicyComponent(name="all", path="*", deny={target_hash})]) + assert verify_ima_log(entries, policy) is False + + def test_load_policy_end_to_end(self, sample_ima_log): + """YAML-loaded policy integrates with parse_ima_log_string + verify.""" + entries = parse_ima_log_string(sample_ima_log) + boot_hash = entries[0].file_hash.hex() + yaml_str = textwrap.dedent( + f""" + boot: + path: boot_aggregate + allow: [{boot_hash}] + """ + ) + policy = load_policy(yaml_str) + results = dict((e.file_path, v) for e, v in appraise_ima_log(entries, policy)) + assert results["boot_aggregate"] is AppraisalResult.ALLOW diff --git a/tests/test_appraisal_loader.py b/tests/test_appraisal_loader.py new file mode 100644 index 0000000..7dac7f4 --- /dev/null +++ b/tests/test_appraisal_loader.py @@ -0,0 +1,78 @@ +# SPDX-License-Identifier: MIT +"""Tests for imapcrutils.appraisal.loader — parsing YAML into AppraisalPolicy.""" + +import textwrap + +import pytest + +from imapcrutils import load_policy, load_policy_file + + +class TestLoadPolicy: + """Tests for load_policy — parsing YAML into AppraisalPolicy.""" + + def test_full_policy(self): + """Parse a policy with all four rule fields and preserve component order.""" + yaml_str = textwrap.dedent( + """ + kernel: + path: /usr/lib/modules/* + allow: [aaaa, bbbb, cccc] + userland: + path: /usr/bin/* + allow: [dddd] + userland2: + path: /usr/local/bin/* + deny: [eeee] + """ + ) + policy = load_policy(yaml_str) + assert [c.name for c in policy.components] == ["kernel", "userland", "userland2"] + kernel = policy.components[0] + assert kernel.path == "/usr/lib/modules/*" + assert kernel.allow == ["aaaa", "bbbb", "cccc"] + assert kernel.deny is None + userland = policy.components[1] + assert userland.path == "/usr/bin/*" + assert userland.allow == ["dddd"] + assert userland.deny is None + userland2 = policy.components[2] + assert userland2.path == "/usr/local/bin/*" + assert userland2.allow is None + assert userland2.deny == ["eeee"] + + def test_lowercases_hashes(self): + """Hashes are normalized to lowercase so callers can mix cases.""" + policy = load_policy("c:\n path: /x\n allow: [ABCDEF]\n") + assert policy.components[0].allow == ["abcdef"] + + def test_empty_yaml(self): + """An empty document is a policy with zero components.""" + assert load_policy("").components == [] + + def test_root_must_be_mapping(self): + """A non-mapping root document is rejected.""" + with pytest.raises(ValueError, match="root must be a mapping"): + load_policy("- not a mapping\n") + + def test_component_rules_must_be_mapping(self): + """Component rules must be a mapping.""" + with pytest.raises(ValueError, match="rules must be a mapping"): + load_policy("c: not-a-mapping\n") + + def test_missing_path(self): + """A component without 'path' is rejected.""" + with pytest.raises(ValueError, match="'path' is required"): + load_policy("c:\n allow: [abcd]\n") + + def test_allowlist_must_be_list_of_strings(self): + """allowlist must be a list of strings.""" + with pytest.raises(ValueError, match="'allow' must be a list of strings"): + load_policy("c:\n path: /x\n allow: 'not-a-list'\n") + + def test_load_policy_file(self, tmp_path): + """load_policy_file reads from disk and parses the same way.""" + f = tmp_path / "policy.yaml" + f.write_text("c:\n path: /x\n allow: [abcd]\n") + policy = load_policy_file(f) + assert policy.components[0].allow == ["abcd"] diff --git a/tests/test_appraisal_policy.py b/tests/test_appraisal_policy.py new file mode 100644 index 0000000..12a5820 --- /dev/null +++ b/tests/test_appraisal_policy.py @@ -0,0 +1,91 @@ +# SPDX-License-Identifier: MIT +"""Tests for imapcrutils.appraisal.policy — policy data model and per-entry verdict.""" + +from imapcrutils import ( + AppraisalPolicy, + AppraisalResult, + IMALogEntry, + PolicyComponent, +) + +SAMPLE_HASH = "088faac4777b024045bd578c5c3f8efc4ac2cafb4af90a12832a762feb58eb88" +OTHER_HASH = "00" * 32 + + +def _entry(path: str, hash_hex: str = SAMPLE_HASH) -> IMALogEntry: + return IMALogEntry( + pcr_idx="10", + template_hash="8facace9d7255a1985e976e9bb59675f211c82de", + template_name="ima-ng", + hash_algo="sha256", + file_hash=bytes.fromhex(hash_hex), + file_path=path, + ) + + +# --------------------------------------------------------------------------- +# PolicyComponent.appraise_hash +# --------------------------------------------------------------------------- + + +class TestPolicyComponentAppraise: + """Tests for the per-component verdict logic.""" + + def test_allow_hit(self): + """A hash in the allow set yields ALLOW.""" + c = PolicyComponent(name="c", path="*", allow={SAMPLE_HASH}) + assert c.appraise_hash(SAMPLE_HASH) is AppraisalResult.ALLOW + + def test_allow_unhit(self): + """A hash outside the allow set yields DENY.""" + c = PolicyComponent(name="c", path="*", allow={OTHER_HASH}) + assert c.appraise_hash(SAMPLE_HASH) is AppraisalResult.DENY + + def test_deny_hit(self): + """A hash in the deny set yields DENY.""" + c = PolicyComponent(name="c", path="*", deny={SAMPLE_HASH}) + assert c.appraise_hash(SAMPLE_HASH) is AppraisalResult.DENY + + def test_deny_unhit(self): + """A hash outside the deny set yields ALLOW.""" + c = PolicyComponent(name="c", path="*", deny={OTHER_HASH}) + assert c.appraise_hash(SAMPLE_HASH) is AppraisalResult.ALLOW + + +# --------------------------------------------------------------------------- +# AppraisalPolicy.appraise — path matching and component ordering +# --------------------------------------------------------------------------- + + +class TestAppraisalPolicy: + """Tests for AppraisalPolicy.appraise — path matching and ordering.""" + + def test_first_match_wins(self): + """The first component whose path matches decides the verdict.""" + first = PolicyComponent(name="first", path="/usr/*", allow={SAMPLE_HASH}) + second = PolicyComponent(name="second", path="/usr/bin/*", deny={SAMPLE_HASH}) + policy = AppraisalPolicy(components=[first, second]) + assert policy.appraise(_entry("/usr/bin/foo")) is AppraisalResult.ALLOW + + def test_first_match_wins_reverse_order(self): + """Reversing component order flips the verdict — confirms order matters.""" + first = PolicyComponent(name="first", path="/usr/bin/*", deny={SAMPLE_HASH}) + second = PolicyComponent(name="second", path="/usr/*", allow={SAMPLE_HASH}) + policy = AppraisalPolicy(components=[first, second]) + assert policy.appraise(_entry("/usr/bin/foo")) is AppraisalResult.DENY + + def test_no_matching_component_is_neutral(self): + """An entry whose path matches no component is NEUTRAL.""" + policy = AppraisalPolicy(components=[PolicyComponent(name="c", path="/etc/*", allow={SAMPLE_HASH})]) + assert policy.appraise(_entry("/usr/bin/foo")) is AppraisalResult.NEUTRAL + + def test_glob_question_mark(self): + """fnmatch '?' wildcards match a single character.""" + c = PolicyComponent(name="c", path="/a/?.bin", allow={SAMPLE_HASH}) + policy = AppraisalPolicy(components=[c]) + assert policy.appraise(_entry("/a/x.bin")) is AppraisalResult.ALLOW + assert policy.appraise(_entry("/a/xx.bin")) is AppraisalResult.NEUTRAL + + def test_empty_policy_is_neutral(self): + """An empty policy yields NEUTRAL for every entry.""" + assert AppraisalPolicy().appraise(_entry("/anything")) is AppraisalResult.NEUTRAL diff --git a/tests/test_log.py b/tests/test_log.py new file mode 100644 index 0000000..2a19ce0 --- /dev/null +++ b/tests/test_log.py @@ -0,0 +1,106 @@ +# SPDX-License-Identifier: MIT +"""Tests for imapcrutils.log — IMA log data model and parsing.""" + +import pytest + +from imapcrutils import IMALogEntry, parse_ima_log_string + +SAMPLE_LINE = "10 8facace9d7255a1985e976e9bb59675f211c82de ima-ng sha256:088faac4777b024045bd578c5c3f8efc4ac2cafb4af90a12832a762feb58eb88 boot_aggregate" # noqa: E501 + +SAMPLE_LINE_LONG_PATH = "10 842c66cec8b78a650d98e85cbbf0b67fc1a2a605 ima-ng sha256:cf06a09ff00ee3275779e83cf9a4037dd822ba9dc16442584212f605ba71e341 /usr/lib/modules/6.14.0-1017-azure-fde/kernel/fs/autofs/autofs4.ko.zst" # noqa: E501 + + +# --------------------------------------------------------------------------- +# IMALogEntry.from_string +# --------------------------------------------------------------------------- + + +class TestIMALogEntryFromString: + """Tests for IMALogEntry.from_string — parsing a single IMA log line.""" + + def test_basic(self): + """Parse a well-formed boot_aggregate line and verify every field.""" + entry = IMALogEntry.from_string(SAMPLE_LINE) + assert entry.pcr_idx == "10" + assert entry.template_hash == "8facace9d7255a1985e976e9bb59675f211c82de" + assert entry.template_name == "ima-ng" + assert entry.hash_algo == "sha256" + assert entry.file_hash == bytes.fromhex("088faac4777b024045bd578c5c3f8efc4ac2cafb4af90a12832a762feb58eb88") + assert entry.file_path == "boot_aggregate" + + def test_long_file_path(self): + """Parse a line with a long absolute file path.""" + entry = IMALogEntry.from_string(SAMPLE_LINE_LONG_PATH) + assert entry.file_path == ("/usr/lib/modules/6.14.0-1017-azure-fde/kernel/fs/autofs/autofs4.ko.zst") + + def test_file_path_with_spaces(self): + """Spaces after the 4th field are preserved as part of the file path.""" + line = "10 aaaa ima-ng sha256:bbbb /path/with spaces/file name.txt" + entry = IMALogEntry.from_string(line) + assert entry.file_path == "/path/with spaces/file name.txt" + + def test_too_few_parts_raises(self): + """Fewer than 5 space-separated fields must raise ValueError.""" + with pytest.raises(ValueError, match="Invalid IMA log entry"): + IMALogEntry.from_string("10 abc ima-ng") + + def test_bad_hash_format_raises(self): + """file_data field without a colon separator must raise ValueError.""" + with pytest.raises(ValueError, match="Invalid file_hash format"): + IMALogEntry.from_string("10 abc ima-ng nocolon /some/path") + + def test_bad_hex_digest_raises(self): + """Non-hex characters in the digest portion must raise ValueError.""" + with pytest.raises(ValueError, match="Invalid file_hash format"): + IMALogEntry.from_string("10 abc ima-ng sha256:ZZZZ /some/path") + + +# --------------------------------------------------------------------------- +# IMALogEntry.__str__ (round-trip) +# --------------------------------------------------------------------------- + + +class TestIMALogEntryStr: + """Tests for IMALogEntry.__str__ — round-trip fidelity.""" + + def test_round_trip(self): + """str(from_string(line)) must reproduce the original line.""" + entry = IMALogEntry.from_string(SAMPLE_LINE) + assert str(entry) == SAMPLE_LINE + + def test_round_trip_long_path(self): + """Round-trip with a long absolute file path.""" + entry = IMALogEntry.from_string(SAMPLE_LINE_LONG_PATH) + assert str(entry) == SAMPLE_LINE_LONG_PATH + + +# --------------------------------------------------------------------------- +# parse_ima_log_string +# --------------------------------------------------------------------------- + + +class TestParseImaLogString: + """Tests for parse_ima_log_string — multi-line IMA log parsing.""" + + def test_sample_file(self, sample_ima_log): + """Parse the full 32-entry sample log and spot-check first/last entries.""" + entries = parse_ima_log_string(sample_ima_log) + assert len(entries) == 32 + assert entries[0].file_path == "boot_aggregate" + assert entries[-1].file_path.endswith("tls.ko.zst") + + def test_empty_string(self): + """Empty input must produce an empty list.""" + assert not parse_ima_log_string("") + + def test_blank_lines_skipped(self): + """Blank lines (leading, trailing, between entries) are ignored.""" + log = "\n" + SAMPLE_LINE + "\n\n" + SAMPLE_LINE_LONG_PATH + "\n\n" + entries = parse_ima_log_string(log) + assert len(entries) == 2 + + def test_single_line(self): + """A single line (no trailing newline) is parsed correctly.""" + entries = parse_ima_log_string(SAMPLE_LINE) + assert len(entries) == 1 + assert entries[0].pcr_idx == "10" diff --git a/tests/test_libs.py b/tests/test_pcr.py similarity index 53% rename from tests/test_libs.py rename to tests/test_pcr.py index 9ee5a29..b62e2ea 100644 --- a/tests/test_libs.py +++ b/tests/test_pcr.py @@ -1,209 +1,20 @@ # SPDX-License-Identifier: MIT -"""Tests for imapcrutils.libs — parsing, PCR10 replay, validation, and boot_aggregate.""" +"""Tests for imapcrutils.pcr — PCR10 replay, log truncation, and boot_aggregate.""" import hashlib -import struct import pytest from imapcrutils import ( - IMALogEntry, - build_template_fields, calculate_boot_aggregate, calculate_expected_template_hash, calculate_pcr10, parse_ima_log_string, truncate_ima_log_by_pcr, - validate_ima_log_entry, ) SAMPLE_LINE = "10 8facace9d7255a1985e976e9bb59675f211c82de ima-ng sha256:088faac4777b024045bd578c5c3f8efc4ac2cafb4af90a12832a762feb58eb88 boot_aggregate" # noqa: E501 -SAMPLE_LINE_LONG_PATH = "10 842c66cec8b78a650d98e85cbbf0b67fc1a2a605 ima-ng sha256:cf06a09ff00ee3275779e83cf9a4037dd822ba9dc16442584212f605ba71e341 /usr/lib/modules/6.14.0-1017-azure-fde/kernel/fs/autofs/autofs4.ko.zst" # noqa: E501 - - -# --------------------------------------------------------------------------- -# IMALogEntry.from_string -# --------------------------------------------------------------------------- - - -class TestIMALogEntryFromString: - """Tests for IMALogEntry.from_string — parsing a single IMA log line.""" - - def test_basic(self): - """Parse a well-formed boot_aggregate line and verify every field.""" - entry = IMALogEntry.from_string(SAMPLE_LINE) - assert entry.pcr_idx == "10" - assert entry.template_hash == "8facace9d7255a1985e976e9bb59675f211c82de" - assert entry.template_name == "ima-ng" - assert entry.hash_algo == "sha256" - assert entry.file_hash == bytes.fromhex("088faac4777b024045bd578c5c3f8efc4ac2cafb4af90a12832a762feb58eb88") - assert entry.file_path == "boot_aggregate" - - def test_long_file_path(self): - """Parse a line with a long absolute file path.""" - entry = IMALogEntry.from_string(SAMPLE_LINE_LONG_PATH) - assert entry.file_path == ("/usr/lib/modules/6.14.0-1017-azure-fde/kernel/fs/autofs/autofs4.ko.zst") - - def test_file_path_with_spaces(self): - """Spaces after the 4th field are preserved as part of the file path.""" - line = "10 aaaa ima-ng sha256:bbbb /path/with spaces/file name.txt" - entry = IMALogEntry.from_string(line) - assert entry.file_path == "/path/with spaces/file name.txt" - - def test_too_few_parts_raises(self): - """Fewer than 5 space-separated fields must raise ValueError.""" - with pytest.raises(ValueError, match="Invalid IMA log entry"): - IMALogEntry.from_string("10 abc ima-ng") - - def test_bad_hash_format_raises(self): - """file_data field without a colon separator must raise ValueError.""" - with pytest.raises(ValueError, match="Invalid file_hash format"): - IMALogEntry.from_string("10 abc ima-ng nocolon /some/path") - - def test_bad_hex_digest_raises(self): - """Non-hex characters in the digest portion must raise ValueError.""" - with pytest.raises(ValueError, match="Invalid file_hash format"): - IMALogEntry.from_string("10 abc ima-ng sha256:ZZZZ /some/path") - - -# --------------------------------------------------------------------------- -# IMALogEntry.__str__ (round-trip) -# --------------------------------------------------------------------------- - - -class TestIMALogEntryStr: - """Tests for IMALogEntry.__str__ — round-trip fidelity.""" - - def test_round_trip(self): - """str(from_string(line)) must reproduce the original line.""" - entry = IMALogEntry.from_string(SAMPLE_LINE) - assert str(entry) == SAMPLE_LINE - - def test_round_trip_long_path(self): - """Round-trip with a long absolute file path.""" - entry = IMALogEntry.from_string(SAMPLE_LINE_LONG_PATH) - assert str(entry) == SAMPLE_LINE_LONG_PATH - - -# --------------------------------------------------------------------------- -# parse_ima_log_string -# --------------------------------------------------------------------------- - - -class TestParseImaLogString: - """Tests for parse_ima_log_string — multi-line IMA log parsing.""" - - def test_sample_file(self, sample_ima_log): - """Parse the full 32-entry sample log and spot-check first/last entries.""" - entries = parse_ima_log_string(sample_ima_log) - assert len(entries) == 32 - assert entries[0].file_path == "boot_aggregate" - assert entries[-1].file_path.endswith("tls.ko.zst") - - def test_empty_string(self): - """Empty input must produce an empty list.""" - assert not parse_ima_log_string("") - - def test_blank_lines_skipped(self): - """Blank lines (leading, trailing, between entries) are ignored.""" - log = "\n" + SAMPLE_LINE + "\n\n" + SAMPLE_LINE_LONG_PATH + "\n\n" - entries = parse_ima_log_string(log) - assert len(entries) == 2 - - def test_single_line(self): - """A single line (no trailing newline) is parsed correctly.""" - entries = parse_ima_log_string(SAMPLE_LINE) - assert len(entries) == 1 - assert entries[0].pcr_idx == "10" - - -# --------------------------------------------------------------------------- -# build_template_fields -# --------------------------------------------------------------------------- - - -class TestBuildTemplateFields: - """Tests for build_template_fields — ima-ng d-ng/n-ng field construction.""" - - def test_first_entry(self): - """Verify d-ng and n-ng field layout for the boot_aggregate entry.""" - entry = IMALogEntry.from_string(SAMPLE_LINE) - d_ng_content, d_ng_field, n_ng_content, n_ng_field = build_template_fields(entry) - - # d-ng content: "sha256:\x00" + digest_bytes - assert d_ng_content.startswith(b"sha256:\x00") - assert d_ng_content[8:] == entry.file_hash - - # d-ng field: little-endian uint32 length prefix + content - length = struct.unpack(" IMALogEntry: - return IMALogEntry( - pcr_idx="10", - template_hash="8facace9d7255a1985e976e9bb59675f211c82de", - template_name="ima-ng", - hash_algo="sha256", - file_hash=bytes.fromhex(hash_hex), - file_path=path, - ) - - -# --------------------------------------------------------------------------- -# load_policy -# --------------------------------------------------------------------------- - - -class TestLoadPolicy: - """Tests for load_policy — parsing YAML into AppraisalPolicy.""" - - def test_full_policy(self): - """Parse a policy with all four rule fields and preserve component order.""" - yaml_str = textwrap.dedent( - """ - kernel: - path: /usr/lib/modules/* - allow: [aaaa, bbbb, cccc] - userland: - path: /usr/bin/* - allow: [dddd] - userland2: - path: /usr/local/bin/* - deny: [eeee] - """ - ) - policy = load_policy(yaml_str) - assert [c.name for c in policy.components] == ["kernel", "userland", "userland2"] - kernel = policy.components[0] - assert kernel.path == "/usr/lib/modules/*" - assert kernel.allow == ["aaaa", "bbbb", "cccc"] - assert kernel.deny is None - userland = policy.components[1] - assert userland.path == "/usr/bin/*" - assert userland.allow == ["dddd"] - assert userland.deny is None - userland2 = policy.components[2] - assert userland2.path == "/usr/local/bin/*" - assert userland2.allow is None - assert userland2.deny == ["eeee"] - - def test_lowercases_hashes(self): - """Hashes are normalized to lowercase so callers can mix cases.""" - policy = load_policy("c:\n path: /x\n allow: [ABCDEF]\n") - assert policy.components[0].allow == ["abcdef"] - - def test_empty_yaml(self): - """An empty document is a policy with zero components.""" - assert load_policy("").components == [] - - def test_root_must_be_mapping(self): - """A non-mapping root document is rejected.""" - with pytest.raises(ValueError, match="root must be a mapping"): - load_policy("- not a mapping\n") - - def test_component_rules_must_be_mapping(self): - """Component rules must be a mapping.""" - with pytest.raises(ValueError, match="rules must be a mapping"): - load_policy("c: not-a-mapping\n") - - def test_missing_path(self): - """A component without 'path' is rejected.""" - with pytest.raises(ValueError, match="'path' is required"): - load_policy("c:\n allow: [abcd]\n") - - def test_allowlist_must_be_list_of_strings(self): - """allowlist must be a list of strings.""" - with pytest.raises(ValueError, match="'allow' must be a list of strings"): - load_policy("c:\n path: /x\n allow: 'not-a-list'\n") - - def test_load_policy_file(self, tmp_path): - """load_policy_file reads from disk and parses the same way.""" - f = tmp_path / "policy.yaml" - f.write_text("c:\n path: /x\n allow: [abcd]\n") - policy = load_policy_file(f) - assert policy.components[0].allow == ["abcd"] - - -# --------------------------------------------------------------------------- -# PolicyComponent / AppraisalPolicy.appraise -# --------------------------------------------------------------------------- - - -class TestPolicyComponentAppraise: - """Tests for the per-component verdict logic.""" - - def test_allow_hit(self): - """A hash in the allow set yields ALLOW.""" - c = PolicyComponent(name="c", path="*", allow={SAMPLE_HASH}) - assert c.appraise_hash(SAMPLE_HASH) is AppraisalResult.ALLOW - - def test_allow_unhit(self): - """A hash outside the allow set yields DENY.""" - c = PolicyComponent(name="c", path="*", allow={OTHER_HASH}) - assert c.appraise_hash(SAMPLE_HASH) is AppraisalResult.DENY - - def test_deny_hit(self): - """A hash in the deny set yields DENY.""" - c = PolicyComponent(name="c", path="*", deny={SAMPLE_HASH}) - assert c.appraise_hash(SAMPLE_HASH) is AppraisalResult.DENY - - def test_deny_unhit(self): - """A hash outside the deny set yields ALLOW.""" - c = PolicyComponent(name="c", path="*", deny={OTHER_HASH}) - assert c.appraise_hash(SAMPLE_HASH) is AppraisalResult.ALLOW - - -class TestAppraisalPolicy: - """Tests for AppraisalPolicy.appraise — path matching and ordering.""" - - def test_first_match_wins(self): - """The first component whose path matches decides the verdict.""" - first = PolicyComponent(name="first", path="/usr/*", allow={SAMPLE_HASH}) - second = PolicyComponent(name="second", path="/usr/bin/*", deny={SAMPLE_HASH}) - policy = AppraisalPolicy(components=[first, second]) - assert policy.appraise(_entry("/usr/bin/foo")) is AppraisalResult.ALLOW - - def test_first_match_wins_reverse_order(self): - """Reversing component order flips the verdict — confirms order matters.""" - first = PolicyComponent(name="first", path="/usr/bin/*", deny={SAMPLE_HASH}) - second = PolicyComponent(name="second", path="/usr/*", allow={SAMPLE_HASH}) - policy = AppraisalPolicy(components=[first, second]) - assert policy.appraise(_entry("/usr/bin/foo")) is AppraisalResult.DENY - - def test_no_matching_component_is_neutral(self): - """An entry whose path matches no component is NEUTRAL.""" - policy = AppraisalPolicy(components=[PolicyComponent(name="c", path="/etc/*", allow={SAMPLE_HASH})]) - assert policy.appraise(_entry("/usr/bin/foo")) is AppraisalResult.NEUTRAL - - def test_glob_question_mark(self): - """fnmatch '?' wildcards match a single character.""" - c = PolicyComponent(name="c", path="/a/?.bin", allow={SAMPLE_HASH}) - policy = AppraisalPolicy(components=[c]) - assert policy.appraise(_entry("/a/x.bin")) is AppraisalResult.ALLOW - assert policy.appraise(_entry("/a/xx.bin")) is AppraisalResult.NEUTRAL - - def test_empty_policy_is_neutral(self): - """An empty policy yields NEUTRAL for every entry.""" - assert AppraisalPolicy().appraise(_entry("/anything")) is AppraisalResult.NEUTRAL - - -# --------------------------------------------------------------------------- -# appraise_ima_log / verify_ima_log -# --------------------------------------------------------------------------- - - -class TestAppraiseImaLog: - """Tests for appraise_ima_log — classifying a list of entries.""" - - def test_returns_pairs_in_order(self): - """Result preserves input order and pairs each entry with its verdict.""" - entries = [ - _entry("/usr/bin/a"), - _entry("/usr/bin/b", OTHER_HASH), - _entry("/etc/x"), - ] - policy = AppraisalPolicy(components=[PolicyComponent(name="bin", path="/usr/bin/*", allow=[SAMPLE_HASH])]) - result = appraise_ima_log(entries, policy) - assert [r[0].file_path for r in result] == ["/usr/bin/a", "/usr/bin/b", "/etc/x"] - assert [r[1] for r in result] == [ - AppraisalResult.ALLOW, - AppraisalResult.DENY, - AppraisalResult.NEUTRAL, - ] - - def test_empty_entries(self): - """No entries yields an empty result list.""" - assert appraise_ima_log([], AppraisalPolicy()) == [] - - -class TestVerifyImaLog: - """Tests for verify_ima_log — boolean pass/fail across all entries.""" - - def test_all_allow_returns_true(self): - """All-allow log passes.""" - entries = [_entry("/usr/bin/a"), _entry("/usr/bin/b")] - policy = AppraisalPolicy(components=[PolicyComponent(name="bin", path="/usr/bin/*", allow={SAMPLE_HASH})]) - assert verify_ima_log(entries, policy) is True - - def test_neutral_entries_pass(self): - """Neutral entries (no matching component) do not fail verification.""" - entries = [_entry("/etc/x"), _entry("/usr/bin/a")] - policy = AppraisalPolicy(components=[PolicyComponent(name="bin", path="/usr/bin/*", allow={SAMPLE_HASH})]) - assert verify_ima_log(entries, policy) is True - - def test_single_deny_returns_false(self): - """A single deny in the log fails verification.""" - entries = [ - _entry("/usr/bin/good"), - _entry("/usr/bin/bad", OTHER_HASH), - ] - policy = AppraisalPolicy( - components=[PolicyComponent(name="bin", path="/usr/bin/*", allow={SAMPLE_HASH}, deny={OTHER_HASH})] - ) - assert verify_ima_log(entries, policy) is False - - def test_empty_log_passes(self): - """An empty log trivially passes.""" - assert verify_ima_log([], AppraisalPolicy()) is True - - def test_real_sample_with_wildcard_allowlist(self, sample_ima_log): - """A policy that allows every path/hash from the sample log passes.""" - entries = parse_ima_log_string(sample_ima_log) - all_hashes = {e.file_hash.hex() for e in entries} - policy = AppraisalPolicy(components=[PolicyComponent(name="all", path="*", allow=all_hashes)]) - assert verify_ima_log(entries, policy) is True - results = appraise_ima_log(entries, policy) - assert all(v is AppraisalResult.ALLOW for _, v in results) - - def test_real_sample_with_one_denied_hash(self, sample_ima_log): - """Denying any hash present in the sample log fails verification.""" - entries = parse_ima_log_string(sample_ima_log) - target_hash = entries[5].file_hash.hex() - policy = AppraisalPolicy(components=[PolicyComponent(name="all", path="*", deny={target_hash})]) - assert verify_ima_log(entries, policy) is False - - def test_load_policy_end_to_end(self, sample_ima_log): - """YAML-loaded policy integrates with parse_ima_log_string + verify.""" - entries = parse_ima_log_string(sample_ima_log) - boot_hash = entries[0].file_hash.hex() - yaml_str = textwrap.dedent( - f""" - boot: - path: boot_aggregate - allow: [{boot_hash}] - """ - ) - policy = load_policy(yaml_str) - results = dict((e.file_path, v) for e, v in appraise_ima_log(entries, policy)) - assert results["boot_aggregate"] is AppraisalResult.ALLOW