From e38054b0e1e86aefabc0cc1fb2062a3cd541a215 Mon Sep 17 00:00:00 2001 From: "Gabriele N. Tornetta" Date: Thu, 30 Apr 2026 14:53:57 +0100 Subject: [PATCH] feat: add mojo-compress tool We add a tool to compress MOJO files, similar to austin-compress for the legacy austin collapsed stack format. --- austin/format/mojo.py | 5 +- austin/format/mojo_compress.py | 122 ++++++++++++ pyproject.toml | 1 + test/format/test_mojo_compress.py | 303 ++++++++++++++++++++++++++++++ 4 files changed, 428 insertions(+), 3 deletions(-) create mode 100644 austin/format/mojo_compress.py create mode 100644 test/format/test_mojo_compress.py diff --git a/austin/format/mojo.py b/austin/format/mojo.py index 4cad82f..adc5d4f 100644 --- a/austin/format/mojo.py +++ b/austin/format/mojo.py @@ -1,6 +1,5 @@ import abc import asyncio -import io import typing as t from dataclasses import dataclass from dataclasses import field @@ -917,7 +916,7 @@ def write(self, event: AustinEvent) -> int: ... # noqa: E704 class MojoStreamWriter(BaseMojoStreamWriter): """MOJO stream writer.""" - def __init__(self, mojo: io.BytesIO) -> None: + def __init__(self, mojo: t.BinaryIO) -> None: super().__init__(mojo) mojo.write(self.HEADER) @@ -948,7 +947,7 @@ def write(self, event: AustinEvent) -> int: for frame in frames: size += self.mojo.write(MojoFrameReference(frame).to_bytes()) - if self._gc: + if event.gc: size += self.mojo.write(bytes([MojoEvents.GC])) if self._mode == "full": diff --git a/austin/format/mojo_compress.py b/austin/format/mojo_compress.py new file mode 100644 index 0000000..e275316 --- /dev/null +++ b/austin/format/mojo_compress.py @@ -0,0 +1,122 @@ +# This file is part of "austin-python" which is released under GPL. +# +# See file LICENCE or go to http://www.gnu.org/licenses/ for full license +# details. +# +# austin-python is a Python wrapper around Austin, the CPython frame stack +# sampler. +# +# Copyright (c) 2018-2020 Gabriele N. Tornetta . +# All rights reserved. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import typing as t +from io import BytesIO + +from austin.events import AustinMetadata +from austin.events import AustinMetrics +from austin.events import AustinSample +from austin.format.mojo import MojoStreamReader +from austin.format.mojo import MojoStreamWriter + + +def compress(source: t.BinaryIO, dest: t.BinaryIO) -> None: + """Compress a MOJO source stream. + + Aggregates all metrics on equal stacks, where equality is determined by + pid, interpreter id, thread, frames, gc flag, and idle flag. + """ + time_stats: t.Dict[AustinSample.Key, int] = {} + memory_stats: t.Dict[AustinSample.Key, int] = {} + metadata: t.List[AustinMetadata] = [] + samples: t.Dict[AustinSample.Key, AustinSample] = {} + + reader = MojoStreamReader(source) + + for event in reader: + if isinstance(event, AustinMetadata): + metadata.append(event) + continue + + assert isinstance(event, AustinSample) + + key = event.key() + if key not in samples: + samples[key] = event + if event.metrics.time is not None: + time_stats[key] = time_stats.get(key, 0) + event.metrics.time + if event.metrics.memory is not None: + memory_stats[key] = memory_stats.get(key, 0) + event.metrics.memory + + writer = MojoStreamWriter(dest) + + for m in metadata: + writer.write(m) + + for key in samples: + writer.write( + AustinSample.from_key_and_metrics( + key, + AustinMetrics( + time=time_stats.get(key), + memory=memory_stats.get(key), + ), + ) + ) + + +def main() -> None: + """mojo-compress entry point.""" + from argparse import ArgumentParser + + arg_parser = ArgumentParser( + prog="mojo-compress", + description=( + "Compress a MOJO sample file by aggregating the collected samples." + ), + ) + + arg_parser.add_argument( + "input", + type=str, + help="The input MOJO file.", + ) + arg_parser.add_argument( + "output", + type=str, + help="The output MOJO file; defaults to the input file for in-place compression.", + nargs="?", + default=None, + ) + + arg_parser.add_argument("-V", "--version", action="version", version="0.1.0") + + args = arg_parser.parse_args() + + try: + with open(args.input, "rb") as fin: + buffer = BytesIO() + compress(fin, buffer) + + output_path = args.output or args.input + with open(output_path, "wb") as fout: + fout.write(buffer.getvalue()) + + except FileNotFoundError: + print(f"No such input file: {args.input}") + exit(1) + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index 76feb05..6bd082b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -65,6 +65,7 @@ austin2speedscope = "austin.format.speedscope:main" mojo2austin = "austin.format.collapsed_stack:main" austinp-resolve = "austin.tools.resolve:main" mojodbg = "austin.tools.mojodbg:main" +mojo-compress = "austin.format.mojo_compress:main" [tool.hatch.envs.tests] template = "tests" diff --git a/test/format/test_mojo_compress.py b/test/format/test_mojo_compress.py new file mode 100644 index 0000000..eb9df84 --- /dev/null +++ b/test/format/test_mojo_compress.py @@ -0,0 +1,303 @@ +# This file is part of "austin-python" which is released under GPL. +# +# See file LICENCE or go to http://www.gnu.org/licenses/ for full license +# details. +# +# austin-python is a Python wrapper around Austin, the CPython frame stack +# sampler. +# +# Copyright (c) 2018-2020 Gabriele N. Tornetta . +# All rights reserved. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from io import BytesIO +from pathlib import Path +from unittest.mock import patch + +import pytest + +from austin.events import AustinFrame +from austin.events import AustinMetadata +from austin.events import AustinMetrics +from austin.events import AustinSample +from austin.format.mojo import MojoStreamReader +from austin.format.mojo import MojoStreamWriter +from austin.format.mojo_compress import compress +from austin.format.mojo_compress import main + + +def make_mojo(events: list) -> BytesIO: + buf = BytesIO() + writer = MojoStreamWriter(buf) + for event in events: + writer.write(event) + buf.seek(0) + return buf + + +FRAME_A = AustinFrame( + filename="mod.py", function="foo", line=1, line_end=0, column=0, column_end=0 +) +FRAME_B = AustinFrame( + filename="mod.py", function="bar", line=2, line_end=0, column=0, column_end=0 +) + + +def read_events(buf: BytesIO) -> list: + buf.seek(0) + return list(MojoStreamReader(buf)) + + +def test_compress_reduces_sample_count(datapath: Path): + with (datapath / "test.mojo").open("rb") as f: + out = BytesIO() + compress(f, out) + + out.seek(0) + reader = MojoStreamReader(out) + reader.unwind() + assert len(reader.samples) < 13227 + + +def test_compress_preserves_metadata(datapath: Path): + with (datapath / "test.mojo").open("rb") as f: + out = BytesIO() + compress(f, out) + + out.seek(0) + reader = MojoStreamReader(out) + reader.unwind() + assert reader.metadata["mode"] == "wall" + + +def test_compress_aggregates_time(): + src = make_mojo( + [ + AustinMetadata("mode", "wall"), + AustinSample( + pid=1, + iid=0, + thread="T1", + metrics=AustinMetrics(time=100), + frames=(FRAME_A,), + gc=None, + idle=None, + ), + AustinSample( + pid=1, + iid=0, + thread="T1", + metrics=AustinMetrics(time=200), + frames=(FRAME_A,), + gc=None, + idle=None, + ), + AustinSample( + pid=1, + iid=0, + thread="T1", + metrics=AustinMetrics(time=50), + frames=(FRAME_B,), + gc=None, + idle=None, + ), + ] + ) + + out = BytesIO() + compress(src, out) + + events = read_events(out) + samples = [e for e in events if isinstance(e, AustinSample)] + + assert len(samples) == 2 + by_frame = {s.frames[0].function: s for s in samples} + assert by_frame["foo"].metrics.time == 300 + assert by_frame["bar"].metrics.time == 50 + + +def test_compress_aggregates_memory(): + src = make_mojo( + [ + AustinMetadata("mode", "memory"), + AustinSample( + pid=1, + iid=0, + thread="T1", + metrics=AustinMetrics(memory=1024), + frames=(FRAME_A,), + gc=None, + idle=None, + ), + AustinSample( + pid=1, + iid=0, + thread="T1", + metrics=AustinMetrics(memory=512), + frames=(FRAME_A,), + gc=None, + idle=None, + ), + ] + ) + + out = BytesIO() + compress(src, out) + + samples = [e for e in read_events(out) if isinstance(e, AustinSample)] + assert len(samples) == 1 + assert samples[0].metrics.memory == 1536 + assert samples[0].metrics.time is None + + +def test_compress_aggregates_full_mode(): + src = make_mojo( + [ + AustinMetadata("mode", "full"), + AustinSample( + pid=1, + iid=0, + thread="T1", + metrics=AustinMetrics(time=100, memory=512), + frames=(FRAME_A,), + gc=False, + idle=False, + ), + AustinSample( + pid=1, + iid=0, + thread="T1", + metrics=AustinMetrics(time=200, memory=256), + frames=(FRAME_A,), + gc=False, + idle=False, + ), + ] + ) + + out = BytesIO() + compress(src, out) + + samples = [e for e in read_events(out) if isinstance(e, AustinSample)] + assert len(samples) == 1 + assert samples[0].metrics.time == 300 + assert samples[0].metrics.memory == 768 + + +def test_compress_keeps_gc_and_idle_separate(): + src = make_mojo( + [ + AustinMetadata("mode", "full"), + AustinMetadata("gc", "on"), + AustinSample( + pid=1, + iid=0, + thread="T1", + metrics=AustinMetrics(time=100, memory=0), + frames=(FRAME_A,), + gc=False, + idle=False, + ), + AustinSample( + pid=1, + iid=0, + thread="T1", + metrics=AustinMetrics(time=50, memory=0), + frames=(FRAME_A,), + gc=True, + idle=False, + ), + AustinSample( + pid=1, + iid=0, + thread="T1", + metrics=AustinMetrics(time=75, memory=0), + frames=(FRAME_A,), + gc=False, + idle=True, + ), + ] + ) + + out = BytesIO() + compress(src, out) + + samples = [e for e in read_events(out) if isinstance(e, AustinSample)] + assert len(samples) == 3 + by_flags = {(s.gc, s.idle): s for s in samples} + assert by_flags[(False, False)].metrics.time == 100 + assert by_flags[(True, False)].metrics.time == 50 + assert by_flags[(False, True)].metrics.time == 75 + + +def test_compress_keeps_threads_separate(): + src = make_mojo( + [ + AustinMetadata("mode", "wall"), + AustinSample( + pid=1, + iid=0, + thread="T1", + metrics=AustinMetrics(time=100), + frames=(FRAME_A,), + gc=None, + idle=None, + ), + AustinSample( + pid=1, + iid=0, + thread="T2", + metrics=AustinMetrics(time=200), + frames=(FRAME_A,), + gc=None, + idle=None, + ), + ] + ) + + out = BytesIO() + compress(src, out) + + samples = [e for e in read_events(out) if isinstance(e, AustinSample)] + assert len(samples) == 2 + + +def test_compress_main(datapath: Path, tmp_path: Path): + output = tmp_path / "compressed.mojo" + with patch("sys.argv", ["mojo-compress", str(datapath / "test.mojo"), str(output)]): + main() + assert output.exists() + assert output.stat().st_size > 0 + + +def test_compress_main_inplace(datapath: Path, tmp_path: Path): + import shutil + + copy = tmp_path / "test.mojo" + shutil.copy(datapath / "test.mojo", copy) + original_size = copy.stat().st_size + + with patch("sys.argv", ["mojo-compress", str(copy)]): + main() + + assert copy.stat().st_size < original_size + + +def test_compress_main_missing_file(tmp_path: Path): + missing = tmp_path / "nonexistent.mojo" + output = tmp_path / "out.mojo" + with patch("sys.argv", ["mojo-compress", str(missing), str(output)]): + with pytest.raises(SystemExit) as exc_info: + main() + assert exc_info.value.code == 1