From 94f85875eb6d1743307a73ae256df3cfac7178cb Mon Sep 17 00:00:00 2001 From: John Belmonte Date: Tue, 16 Aug 2022 12:45:34 +0900 Subject: [PATCH 1/4] trio task hierarchy timer (WIP) --- tests/test_trio.py | 104 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 103 insertions(+), 1 deletion(-) diff --git a/tests/test_trio.py b/tests/test_trio.py index b7840bb..6d5fd0c 100644 --- a/tests/test_trio.py +++ b/tests/test_trio.py @@ -64,7 +64,109 @@ async def test_trio_perf_counter_time_sleep(): t0 = trio_perf_counter() time.sleep(.01) dt = trio_perf_counter() - t0 - assert dt > .008 + assert dt == pytest.approx(.01, rel=.2) + + +class _TaskHierarchyTimeInstrument(trio.abc.Instrument): + def __init__(self, time_fn=time.perf_counter): + self._time_fn = time_fn + self._root_task = trio_lowlevel.current_task() + self._hierarchy_tasks = {self._root_task} + self._descheduled_start = 0. + self._descheduled_elapsed = 0. + + # populate existing child tasks of the root + nurseries = set(self._root_task.child_nurseries) + while nurseries: + nursery: trio.Nursery = nurseries.pop() + for child_task in nursery.child_tasks: + self._hierarchy_tasks.add(child_task) + nurseries.update(child_task.child_nurseries) + + def task_spawned(self, task: trio_lowlevel.Task): + if task.parent_nursery and task.parent_nursery.parent_task in self._hierarchy_tasks: + self._hierarchy_tasks.add(task) + + def task_exited(self, task: trio_lowlevel.Task): + self._hierarchy_tasks.discard(task) + + def after_task_step(self, task: trio_lowlevel.Task): + if task in self._hierarchy_tasks: + self._descheduled_start = self._time_fn() + + def before_task_step(self, task: trio_lowlevel.Task): + if task in self._hierarchy_tasks: + self._descheduled_elapsed += self._time_fn() - self._descheduled_start + + def _finalize(self): + assert self._hierarchy_tasks == {self._root_task} + + def get_elapsed_descheduled_time(self): + return self._descheduled_elapsed + + +async def _work(duration, count=1): + for _ in range(count): + time.sleep(duration) + await trio.sleep(0) + + +async def test_trio_perf_counter_child(): + instrument = _TaskHierarchyTimeInstrument() + trio_lowlevel.add_instrument(instrument) + t0 = time.perf_counter() - instrument.get_elapsed_descheduled_time() + try: + async with trio.open_nursery() as nursery: + await _work(.05, 3) + + @nursery.start_soon + async def _child(): + async with trio.open_nursery() as nursery2: + @nursery2.start_soon + async def _child_child(): + await _work(.05, 5) + + await _work(.05, 5) + + @nursery.start_soon + async def _child_2(): + await _work(.05, 10) + finally: + trio_lowlevel.remove_instrument(instrument) + instrument._finalize() + dt = (time.perf_counter() - instrument.get_elapsed_descheduled_time()) - t0 + print('total time:', round(dt, 2)) + assert dt == pytest.approx(1.15, rel=.1) + + +async def test_trio_perf_counter_child_partial(): + try: + async with trio.open_nursery() as nursery: + @nursery.start_soon + async def _child(): + async with trio.open_nursery() as nursery2: + @nursery2.start_soon + async def _child_child(): + await _work(.1, 5) + + await _work(.1, 5) + + await trio.sleep(.5) + + instrument = _TaskHierarchyTimeInstrument() + trio_lowlevel.add_instrument(instrument) + t0 = time.perf_counter() - instrument.get_elapsed_descheduled_time() + + @nursery.start_soon + async def _child_2(): + await _work(.1, 10) + + finally: + trio.lowlevel.remove_instrument(instrument) + instrument._finalize() + dt = (time.perf_counter() - instrument.get_elapsed_descheduled_time()) - t0 + print('total time:', round(dt, 2)) + assert dt == pytest.approx(1.5, rel=.1) async def test_trio_perf_counter_unregister(): From f8aace47394149f2556771ea61a365c728fc75f1 Mon Sep 17 00:00:00 2001 From: John Belmonte Date: Tue, 16 Aug 2022 16:21:08 +0900 Subject: [PATCH 2/4] working trio_hierarchy_perf_counter() --- src/perf_timer/__init__.py | 2 +- src/perf_timer/_trio.py | 72 ++++++++++++++++- tests/test_trio.py | 161 +++++++++++++++---------------------- 3 files changed, 137 insertions(+), 98 deletions(-) diff --git a/src/perf_timer/__init__.py b/src/perf_timer/__init__.py index ffbf549..7211f94 100644 --- a/src/perf_timer/__init__.py +++ b/src/perf_timer/__init__.py @@ -1,7 +1,7 @@ from ._impl import (PerfTimer, ThreadPerfTimer, AverageObserver, StdDevObserver, HistogramObserver, measure_overhead) try: - from ._trio import trio_perf_counter, TrioPerfTimer + from ._trio import trio_perf_counter, trio_hierarchy_perf_counter, TrioPerfTimer except ImportError: pass from ._version import __version__ diff --git a/src/perf_timer/_trio.py b/src/perf_timer/_trio.py index 328dbe2..dba6600 100644 --- a/src/perf_timer/_trio.py +++ b/src/perf_timer/_trio.py @@ -1,6 +1,7 @@ from collections import defaultdict -from dataclasses import dataclass +from dataclasses import dataclass, field from time import perf_counter +from typing import Set import trio try: @@ -70,6 +71,75 @@ def trio_perf_counter(): return perf_counter() - _instrument.get_elapsed_descheduled_time(task) +@dataclass +class _HierarchyTimeInfo: + hierarchy_tasks: Set[trio_lowlevel.Task] = field(default_factory=set) + deschedule_start: float = 0 + elapsed_descheduled: float = 0 + + +class _HierarchyDescheduledTimeInstrument(trio.abc.Instrument): + """Trio instrument tracking elapsed descheduled time of given task and children""" + + def __init__(self, time_fn=perf_counter): + self._time_fn = time_fn + self._info_by_root_task = defaultdict(_HierarchyTimeInfo) + + def task_spawned(self, task: trio_lowlevel.Task): + # TODO: Maintain a global tree rather than a set per root, avoiding O(N) + # on task spawn and exit. (But then task steps are O(N)...) + if task.parent_nursery: + parent_task = task.parent_nursery.parent_task + for info in self._info_by_root_task.values(): + if parent_task in info.hierarchy_tasks: + info.hierarchy_tasks.add(task) + + def task_exited(self, task: trio_lowlevel.Task): + for info in self._info_by_root_task.values(): + info.hierarchy_tasks.discard(task) + + root_info = self._info_by_root_task.pop(task, None) + if root_info: + assert not root_info.hierarchy_tasks + if not self._info_by_root_task: + trio_lowlevel.remove_instrument(self) + + def after_task_step(self, task: trio_lowlevel.Task): + for info in self._info_by_root_task.values(): + if task in info.hierarchy_tasks: + info.descheduled_start = self._time_fn() + + def before_task_step(self, task: trio_lowlevel.Task): + for info in self._info_by_root_task.values(): + if task in info.hierarchy_tasks: + info.elapsed_descheduled += self._time_fn() - info.descheduled_start + + def get_elapsed_descheduled_time(self, task: trio_lowlevel.Task): + info = self._info_by_root_task[task] + hierarchy_tasks = info.hierarchy_tasks + + if not hierarchy_tasks: # newly tracked root + hierarchy_tasks.add(task) + # populate existing child tasks of the root + nurseries = set(task.child_nurseries) + while nurseries: + nursery: trio.Nursery = nurseries.pop() + for child_task in nursery.child_tasks: + hierarchy_tasks.add(child_task) + nurseries.update(child_task.child_nurseries) + + return info.elapsed_descheduled + + +_hierarchy_instrument = _HierarchyDescheduledTimeInstrument() + + +def trio_hierarchy_perf_counter(): + trio_lowlevel.add_instrument(_hierarchy_instrument) + task = trio_lowlevel.current_task() + return perf_counter() - _hierarchy_instrument.get_elapsed_descheduled_time(task) + + class TrioPerfTimer(PerfTimer): """Variant of PerfTimer which measures Trio task time diff --git a/tests/test_trio.py b/tests/test_trio.py index 6d5fd0c..f379d62 100644 --- a/tests/test_trio.py +++ b/tests/test_trio.py @@ -8,7 +8,13 @@ except ImportError: import trio.hazmat as trio_lowlevel -from perf_timer import trio_perf_counter, _trio, TrioPerfTimer, AverageObserver +from perf_timer import trio_perf_counter, _trio, TrioPerfTimer, AverageObserver, trio_hierarchy_perf_counter + + +async def _work(duration, count=1): + for _ in range(count): + time.sleep(duration) + await trio.sleep(0) async def test_descheduled_time_instrument(): @@ -62,118 +68,65 @@ async def _untracked_child(): async def test_trio_perf_counter_time_sleep(): # NOTE: subject to false pass due to reliance on wall time t0 = trio_perf_counter() - time.sleep(.01) + await _work(.1, 5) dt = trio_perf_counter() - t0 - assert dt == pytest.approx(.01, rel=.2) - - -class _TaskHierarchyTimeInstrument(trio.abc.Instrument): - def __init__(self, time_fn=time.perf_counter): - self._time_fn = time_fn - self._root_task = trio_lowlevel.current_task() - self._hierarchy_tasks = {self._root_task} - self._descheduled_start = 0. - self._descheduled_elapsed = 0. - - # populate existing child tasks of the root - nurseries = set(self._root_task.child_nurseries) - while nurseries: - nursery: trio.Nursery = nurseries.pop() - for child_task in nursery.child_tasks: - self._hierarchy_tasks.add(child_task) - nurseries.update(child_task.child_nurseries) + assert dt == pytest.approx(.5, rel=.15) - def task_spawned(self, task: trio_lowlevel.Task): - if task.parent_nursery and task.parent_nursery.parent_task in self._hierarchy_tasks: - self._hierarchy_tasks.add(task) - - def task_exited(self, task: trio_lowlevel.Task): - self._hierarchy_tasks.discard(task) - - def after_task_step(self, task: trio_lowlevel.Task): - if task in self._hierarchy_tasks: - self._descheduled_start = self._time_fn() - - def before_task_step(self, task: trio_lowlevel.Task): - if task in self._hierarchy_tasks: - self._descheduled_elapsed += self._time_fn() - self._descheduled_start - - def _finalize(self): - assert self._hierarchy_tasks == {self._root_task} - - def get_elapsed_descheduled_time(self): - return self._descheduled_elapsed +async def test_trio_perf_counter_child(): + t0 = trio_hierarchy_perf_counter() + async with trio.open_nursery() as nursery: + await _work(.05, 3) -async def _work(duration, count=1): - for _ in range(count): - time.sleep(duration) - await trio.sleep(0) + @nursery.start_soon + async def _child(): + async with trio.open_nursery() as nursery2: + @nursery2.start_soon + async def _child_child(): + await _work(.05, 5) + await _work(.05, 5) -async def test_trio_perf_counter_child(): - instrument = _TaskHierarchyTimeInstrument() - trio_lowlevel.add_instrument(instrument) - t0 = time.perf_counter() - instrument.get_elapsed_descheduled_time() - try: - async with trio.open_nursery() as nursery: - await _work(.05, 3) - - @nursery.start_soon - async def _child(): - async with trio.open_nursery() as nursery2: - @nursery2.start_soon - async def _child_child(): - await _work(.05, 5) + @nursery.start_soon + async def _child_2(): + await _work(.05, 10) - await _work(.05, 5) + dt = trio_hierarchy_perf_counter() - t0 + assert dt == pytest.approx(1.15, rel=.15) - @nursery.start_soon - async def _child_2(): - await _work(.05, 10) - finally: - trio_lowlevel.remove_instrument(instrument) - instrument._finalize() - dt = (time.perf_counter() - instrument.get_elapsed_descheduled_time()) - t0 - print('total time:', round(dt, 2)) - assert dt == pytest.approx(1.15, rel=.1) - - -async def test_trio_perf_counter_child_partial(): - try: - async with trio.open_nursery() as nursery: - @nursery.start_soon - async def _child(): - async with trio.open_nursery() as nursery2: - @nursery2.start_soon - async def _child_child(): - await _work(.1, 5) +async def test_trio_hierarchy_perf_counter_active_children(): + # trio_hierarchy_perf_counter() should work even if the task already has children + async with trio.open_nursery() as nursery: + @nursery.start_soon + async def _child(): + async with trio.open_nursery() as nursery2: + @nursery2.start_soon + async def _child_child(): await _work(.1, 5) - await trio.sleep(.5) + await _work(.1, 5) - instrument = _TaskHierarchyTimeInstrument() - trio_lowlevel.add_instrument(instrument) - t0 = time.perf_counter() - instrument.get_elapsed_descheduled_time() + await trio.sleep(.5) + t0 = trio_hierarchy_perf_counter() - @nursery.start_soon - async def _child_2(): - await _work(.1, 10) + @nursery.start_soon + async def _child_2(): + await _work(.1, 10) - finally: - trio.lowlevel.remove_instrument(instrument) - instrument._finalize() - dt = (time.perf_counter() - instrument.get_elapsed_descheduled_time()) - t0 - print('total time:', round(dt, 2)) - assert dt == pytest.approx(1.5, rel=.1) + dt = trio_hierarchy_perf_counter() - t0 + assert dt == pytest.approx(1.5, rel=.15) -async def test_trio_perf_counter_unregister(): +@pytest.mark.parametrize('counter_fn, instrument', [ + (trio_perf_counter, _trio._instrument), + (trio_hierarchy_perf_counter, _trio._hierarchy_instrument), +]) +async def test_trio_perf_counter_unregister(counter_fn, instrument: trio.abc.Instrument): async def perf_counter_with_trio_sleep(): - trio_perf_counter() + counter_fn() await trio.sleep(0) - trio_perf_counter() + counter_fn() async with trio.open_nursery() as nursery: nursery.start_soon(perf_counter_with_trio_sleep) @@ -183,7 +136,7 @@ async def perf_counter_with_trio_sleep(): # the Trio instrumentation to no longer be active (so remove call # will fail). with pytest.raises(KeyError): - trio_lowlevel.remove_instrument(_trio._instrument) + trio_lowlevel.remove_instrument(instrument) async def test_trio_perf_timer(autojump_clock): @@ -218,3 +171,19 @@ async def foo(): assert timer._sum == 15 assert timer._max == 10 del timer + + +# TODO: test nested perf timers on the same task +# async def _foo(): +# with TrioPerfTimer('timer1'): +# async with trio.open_nursery() as nursery: +# @nursery.start_soon +# async def _child_1(): +# pass # do work +# +# # do work +# +# with TrioPerfTimer('timer2'): +# @nursery.start_soon +# async def _child_2(): +# pass # do work From 3057739d20e2734c8f7a1ae0f3364047fa7320e3 Mon Sep 17 00:00:00 2001 From: John Belmonte Date: Tue, 16 Aug 2022 21:50:56 +0900 Subject: [PATCH 3/4] change instrument scheme to scale with number of tracked tasks --- benchmarks/overhead.py | 4 +- src/perf_timer/__init__.py | 3 +- src/perf_timer/_trio.py | 82 ++++++++++++++++---------------------- tests/test_trio.py | 5 ++- 4 files changed, 41 insertions(+), 53 deletions(-) diff --git a/benchmarks/overhead.py b/benchmarks/overhead.py index 01acc00..a287754 100644 --- a/benchmarks/overhead.py +++ b/benchmarks/overhead.py @@ -21,7 +21,7 @@ from perf_timer import (PerfTimer, ThreadPerfTimer, TrioPerfTimer, AverageObserver, StdDevObserver, HistogramObserver, - measure_overhead) + measure_overhead, TrioHierarchyPerfTimer) from perf_timer._impl import _format_duration @@ -39,7 +39,7 @@ async def main(): print() print('compare types:') observer = default_observer - for timer_type in (PerfTimer, ThreadPerfTimer, TrioPerfTimer): + for timer_type in (PerfTimer, ThreadPerfTimer, TrioPerfTimer, TrioHierarchyPerfTimer): duration = measure_overhead(partial(timer_type, observer=observer)) item = f'{timer_type.__name__}(observer={observer.__name__}):' print(f' {item:45s}{_format(duration)}') diff --git a/src/perf_timer/__init__.py b/src/perf_timer/__init__.py index 7211f94..84e2ab6 100644 --- a/src/perf_timer/__init__.py +++ b/src/perf_timer/__init__.py @@ -1,7 +1,8 @@ from ._impl import (PerfTimer, ThreadPerfTimer, AverageObserver, StdDevObserver, HistogramObserver, measure_overhead) try: - from ._trio import trio_perf_counter, trio_hierarchy_perf_counter, TrioPerfTimer + from ._trio import (trio_perf_counter, trio_hierarchy_perf_counter, + TrioPerfTimer, TrioHierarchyPerfTimer) except ImportError: pass from ._version import __version__ diff --git a/src/perf_timer/_trio.py b/src/perf_timer/_trio.py index dba6600..1b5fa89 100644 --- a/src/perf_timer/_trio.py +++ b/src/perf_timer/_trio.py @@ -1,7 +1,6 @@ from collections import defaultdict -from dataclasses import dataclass, field +from dataclasses import dataclass from time import perf_counter -from typing import Set import trio try: @@ -71,64 +70,44 @@ def trio_perf_counter(): return perf_counter() - _instrument.get_elapsed_descheduled_time(task) -@dataclass -class _HierarchyTimeInfo: - hierarchy_tasks: Set[trio_lowlevel.Task] = field(default_factory=set) - deschedule_start: float = 0 - elapsed_descheduled: float = 0 - - class _HierarchyDescheduledTimeInstrument(trio.abc.Instrument): - """Trio instrument tracking elapsed descheduled time of given task and children""" + """Trio instrument tracking elapsed descheduled time of given task and children + + The implementation is similar to _DescheduledTimeInstrument, except on task + steps we walk up the task tree to find tracked tasks. This probably has + significant overhead since it must be done even for untracked hierarchies. + """ def __init__(self, time_fn=perf_counter): self._time_fn = time_fn - self._info_by_root_task = defaultdict(_HierarchyTimeInfo) - - def task_spawned(self, task: trio_lowlevel.Task): - # TODO: Maintain a global tree rather than a set per root, avoiding O(N) - # on task spawn and exit. (But then task steps are O(N)...) - if task.parent_nursery: - parent_task = task.parent_nursery.parent_task - for info in self._info_by_root_task.values(): - if parent_task in info.hierarchy_tasks: - info.hierarchy_tasks.add(task) + self._info_by_root_task = defaultdict(_TimeInfo) + + def _parents_info(self, task: trio_lowlevel.Task): + while True: + info = self._info_by_root_task.get(task) + if info: + yield info + parent_nursery = task.parent_nursery + if parent_nursery: + task = parent_nursery.parent_task + else: + break def task_exited(self, task: trio_lowlevel.Task): - for info in self._info_by_root_task.values(): - info.hierarchy_tasks.discard(task) - - root_info = self._info_by_root_task.pop(task, None) - if root_info: - assert not root_info.hierarchy_tasks - if not self._info_by_root_task: - trio_lowlevel.remove_instrument(self) + if self._info_by_root_task.pop(task, None) and not self._info_by_root_task: + trio_lowlevel.remove_instrument(self) def after_task_step(self, task: trio_lowlevel.Task): - for info in self._info_by_root_task.values(): - if task in info.hierarchy_tasks: - info.descheduled_start = self._time_fn() + # TODO: maintain global "tracked tasks" to provide a fast path? + for info in self._parents_info(task): + info.deschedule_start = self._time_fn() def before_task_step(self, task: trio_lowlevel.Task): - for info in self._info_by_root_task.values(): - if task in info.hierarchy_tasks: - info.elapsed_descheduled += self._time_fn() - info.descheduled_start + for info in self._parents_info(task): + info.elapsed_descheduled += self._time_fn() - info.deschedule_start def get_elapsed_descheduled_time(self, task: trio_lowlevel.Task): - info = self._info_by_root_task[task] - hierarchy_tasks = info.hierarchy_tasks - - if not hierarchy_tasks: # newly tracked root - hierarchy_tasks.add(task) - # populate existing child tasks of the root - nurseries = set(task.child_nurseries) - while nurseries: - nursery: trio.Nursery = nurseries.pop() - for child_task in nursery.child_tasks: - hierarchy_tasks.add(child_task) - nurseries.update(child_task.child_nurseries) - - return info.elapsed_descheduled + return self._info_by_root_task[task].elapsed_descheduled _hierarchy_instrument = _HierarchyDescheduledTimeInstrument() @@ -167,3 +146,10 @@ async def foo(): def __init__(self, name, time_fn=trio_perf_counter, **kwargs): super().__init__(name, time_fn=time_fn, **kwargs) + + +# TODO: The shorter trio_perf_counter and TrioPerfTimer names should include +# hierarchy, and non-hierarchy variants relegated to longer names. +class TrioHierarchyPerfTimer(TrioPerfTimer): + def __init__(self, name, time_fn=trio_hierarchy_perf_counter, **kwargs): + super().__init__(name, time_fn=time_fn, **kwargs) diff --git a/tests/test_trio.py b/tests/test_trio.py index f379d62..00a6c86 100644 --- a/tests/test_trio.py +++ b/tests/test_trio.py @@ -8,7 +8,8 @@ except ImportError: import trio.hazmat as trio_lowlevel -from perf_timer import trio_perf_counter, _trio, TrioPerfTimer, AverageObserver, trio_hierarchy_perf_counter +from perf_timer import (trio_perf_counter, trio_hierarchy_perf_counter, _trio, + TrioPerfTimer, AverageObserver) async def _work(duration, count=1): @@ -73,7 +74,7 @@ async def test_trio_perf_counter_time_sleep(): assert dt == pytest.approx(.5, rel=.15) -async def test_trio_perf_counter_child(): +async def test_trio_hierarchy_perf_counter(): t0 = trio_hierarchy_perf_counter() async with trio.open_nursery() as nursery: await _work(.05, 3) From 619cb99e7720b74747fa6cc457b94a9aad778288 Mon Sep 17 00:00:00 2001 From: John Belmonte Date: Tue, 16 Aug 2022 22:54:18 +0900 Subject: [PATCH 4/4] minimize time_fn calls --- src/perf_timer/_trio.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/perf_timer/_trio.py b/src/perf_timer/_trio.py index 1b5fa89..2d32f70 100644 --- a/src/perf_timer/_trio.py +++ b/src/perf_timer/_trio.py @@ -82,11 +82,13 @@ def __init__(self, time_fn=perf_counter): self._time_fn = time_fn self._info_by_root_task = defaultdict(_TimeInfo) - def _parents_info(self, task: trio_lowlevel.Task): + def _parents_info(self, task: trio_lowlevel.Task, time_fn): + t = 0 while True: info = self._info_by_root_task.get(task) if info: - yield info + t = t or time_fn() + yield info, t parent_nursery = task.parent_nursery if parent_nursery: task = parent_nursery.parent_task @@ -99,12 +101,12 @@ def task_exited(self, task: trio_lowlevel.Task): def after_task_step(self, task: trio_lowlevel.Task): # TODO: maintain global "tracked tasks" to provide a fast path? - for info in self._parents_info(task): - info.deschedule_start = self._time_fn() + for info, t in self._parents_info(task, self._time_fn): + info.deschedule_start = t def before_task_step(self, task: trio_lowlevel.Task): - for info in self._parents_info(task): - info.elapsed_descheduled += self._time_fn() - info.deschedule_start + for info, t in self._parents_info(task, self._time_fn): + info.elapsed_descheduled += t - info.deschedule_start def get_elapsed_descheduled_time(self, task: trio_lowlevel.Task): return self._info_by_root_task[task].elapsed_descheduled