From c74cd51ea85ff0e4fd9a28ad9ebb3c196d807d5b Mon Sep 17 00:00:00 2001 From: lucila Date: Mon, 6 Apr 2026 16:46:00 -0300 Subject: [PATCH 1/4] fix an issue when a full page is filtered out but more elements are available in the inner stream --- .../Streams/CursorlessFilteredStream.php | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/lib/Tumblr/StreamBuilder/Streams/CursorlessFilteredStream.php b/lib/Tumblr/StreamBuilder/Streams/CursorlessFilteredStream.php index 6e7bacf..2c9111d 100644 --- a/lib/Tumblr/StreamBuilder/Streams/CursorlessFilteredStream.php +++ b/lib/Tumblr/StreamBuilder/Streams/CursorlessFilteredStream.php @@ -53,6 +53,9 @@ final class CursorlessFilteredStream extends WrapStream */ private float $overfetch_ratio; + /** @var bool */ + private bool $skip_empty_pages; + /** * @param Stream $inner The stream to filter. * @param StreamFilter $filter The filter to apply to the stream. @@ -64,13 +67,17 @@ final class CursorlessFilteredStream extends WrapStream * is relatively cheap to over-enumerate, you can crank this up to preemptively over-fetch. * A value of zero means to not over-fetch, whereas a value of 1.0 means to fetch double the * number of results requested. No more than $count results will be returned. + * @param bool|null $skip_empty_pages If true (default), fully-filtered pages do not count + * against the retry budget, allowing the stream to scan past long runs of filtered content. + * Set to `false` to count every inner enumeration as a retry. */ public function __construct( Stream $inner, StreamFilter $filter, string $identity, ?int $retry_count = null, - ?float $overfetch_ratio = null + ?float $overfetch_ratio = null, + ?bool $skip_empty_pages = true ) { if (is_null($retry_count)) { $retry_count = FilteredStream::DEFAULT_RETRY_COUNT; @@ -82,6 +89,7 @@ public function __construct( $this->filter = $filter; $this->retry_count = $retry_count; $this->overfetch_ratio = $overfetch_ratio; + $this->skip_empty_pages = $skip_empty_pages ?? true; } /** @@ -96,6 +104,7 @@ public function to_template(): array 'stream_filter' => $this->filter->to_template(), 'retry_count' => $this->retry_count, 'overfetch_ratio' => $this->overfetch_ratio, + 'skip_empty_pages' => $this->skip_empty_pages, ]; } @@ -113,7 +122,8 @@ public static function from_template(StreamContext $context): self $filter, $context->get_current_identity(), $context->get_optional_property('retry_count'), - $context->get_optional_property('overfetch_ratio') + $context->get_optional_property('overfetch_ratio'), + $context->get_optional_property('skip_empty_pages', true) ); } @@ -190,10 +200,13 @@ private function _filter_rec( $tracer && $tracer->filter_retry($this, $inner_cursor, $retry_cursor, $depth, $want_count, $inner_result->get_size(), count($retained)); + // Don't count fully-filtered pages against retry budget + $new_depth = count($retained) === 0 && $this->skip_empty_pages ? $depth : $depth - 1; + $rec_result = $this->_filter_rec( $want_count - count($retained), $retry_cursor, - $depth - 1, + $new_depth, $tracer, $option, $propagated_is_exhaustive From f7f794943828c410fa3140dd369e75f48d6f1f85 Mon Sep 17 00:00:00 2001 From: lucila Date: Mon, 6 Apr 2026 16:53:49 -0300 Subject: [PATCH 2/4] unit tests for cursorless filtered stream --- .../Streams/CursorlessFilteredStreamTest.php | 106 ++++++++++++++++++ 1 file changed, 106 insertions(+) diff --git a/tests/unit/Tumblr/StreamBuilder/Streams/CursorlessFilteredStreamTest.php b/tests/unit/Tumblr/StreamBuilder/Streams/CursorlessFilteredStreamTest.php index 77cff85..c0b1098 100644 --- a/tests/unit/Tumblr/StreamBuilder/Streams/CursorlessFilteredStreamTest.php +++ b/tests/unit/Tumblr/StreamBuilder/Streams/CursorlessFilteredStreamTest.php @@ -138,6 +138,112 @@ public function provide_final_retry_exhaustion_cases(): iterable yield 'inner_exhausted' => [true, true]; } + /** + * Tests that skip_empty_pages (default true) allows scanning past fully-filtered pages. + * + * Simulates 3 consecutive batches where all elements are filtered out, + * followed by a batch with a passing element. With retry_count=1 and + * skip_empty_pages=true, the stream should reach the 4th batch because + * empty pages don't consume retries. + */ + public function test_skip_empty_pages_continues_past_filtered_batches() + { + $keep = new MockedPostRefElement(1, 100); + $drop1 = new MockedPostRefElement(2, 200); + $drop2 = new MockedPostRefElement(3, 300); + $drop3 = new MockedPostRefElement(4, 400); + + $inner_stream = $this->createMock(Stream::class); + $inner_stream->expects($this->exactly(4)) + ->method('_enumerate') + ->willReturnOnConsecutiveCalls( + new StreamResult(false, [$drop1]), + new StreamResult(false, [$drop2]), + new StreamResult(false, [$drop3]), + new StreamResult(true, [$keep]) + ); + + $filter = $this->createMock(StreamFilter::class); + $filter->expects($this->exactly(4)) + ->method('filter_inner') + ->willReturnOnConsecutiveCalls( + new StreamFilterResult([], [$drop1]), + new StreamFilterResult([], [$drop2]), + new StreamFilterResult([], [$drop3]), + new StreamFilterResult([$keep], []) + ); + + // retry_count=1, skip_empty_pages=true (default) + $filtered_stream = new CursorlessFilteredStream($inner_stream, $filter, 'test', 1, 0.0); + $result = $filtered_stream->enumerate(1); + + $this->assertSame(1, $result->get_size()); + $this->assertSame($keep, $result->get_elements()[0]->get_original_element()); + } + + /** + * Tests that skip_empty_pages=false preserves the old behavior where + * fully-filtered pages consume retries. + * + * With retry_count=1, only 2 batches are fetched. Both are fully filtered, + * so the result is empty. + */ + public function test_skip_empty_pages_false_preserves_old_behavior() + { + $drop1 = new MockedPostRefElement(1, 100); + $drop2 = new MockedPostRefElement(2, 200); + + $inner_stream = $this->createMock(Stream::class); + $inner_stream->expects($this->exactly(2)) + ->method('_enumerate') + ->willReturnOnConsecutiveCalls( + new StreamResult(false, [$drop1]), + new StreamResult(false, [$drop2]) + ); + + $filter = $this->createMock(StreamFilter::class); + $filter->expects($this->exactly(2)) + ->method('filter_inner') + ->willReturnOnConsecutiveCalls( + new StreamFilterResult([], [$drop1]), + new StreamFilterResult([], [$drop2]) + ); + + $filtered_stream = new CursorlessFilteredStream($inner_stream, $filter, 'test', 1, 0.0, false); + $result = $filtered_stream->enumerate(1); + + $this->assertSame(0, $result->get_size()); + $this->assertTrue($result->is_exhaustive()); + } + + /** + * Tests that skip_empty_pages terminates when the inner stream is exhausted, + * even if retry budget has not been consumed. + */ + public function test_skip_empty_pages_terminates_on_inner_exhaustion() + { + $drop = new MockedPostRefElement(1, 100); + + $inner_stream = $this->createMock(Stream::class); + $inner_stream->expects($this->exactly(2)) + ->method('_enumerate') + ->willReturnOnConsecutiveCalls( + new StreamResult(false, [$drop]), + new StreamResult(true, []) + ); + + $filter = $this->createMock(StreamFilter::class); + $filter->expects($this->once()) + ->method('filter_inner') + ->willReturn(new StreamFilterResult([], [$drop])); + + $filtered_stream = new CursorlessFilteredStream($inner_stream, $filter, 'test', 2, 0.0); + $result = $filtered_stream->enumerate(1); + + $this->assertSame(0, $result->get_size()); + $this->assertTrue($result->is_exhaustive()); + } + /** * Redo the dependency bag injection. * @return void From 59f2531de017754b053790fef0582411846cba27 Mon Sep 17 00:00:00 2001 From: lucila Date: Mon, 6 Apr 2026 17:14:23 -0300 Subject: [PATCH 3/4] adjust unit tests --- .../StreamBuilder/Streams/CursorlessFilteredStream.php | 2 +- .../StreamBuilder/Streams/CursorlessFilteredStreamTest.php | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/Tumblr/StreamBuilder/Streams/CursorlessFilteredStream.php b/lib/Tumblr/StreamBuilder/Streams/CursorlessFilteredStream.php index 2c9111d..79e62ab 100644 --- a/lib/Tumblr/StreamBuilder/Streams/CursorlessFilteredStream.php +++ b/lib/Tumblr/StreamBuilder/Streams/CursorlessFilteredStream.php @@ -123,7 +123,7 @@ public static function from_template(StreamContext $context): self $context->get_current_identity(), $context->get_optional_property('retry_count'), $context->get_optional_property('overfetch_ratio'), - $context->get_optional_property('skip_empty_pages', true) + $context->get_optional_property('skip_empty_pages') ); } diff --git a/tests/unit/Tumblr/StreamBuilder/Streams/CursorlessFilteredStreamTest.php b/tests/unit/Tumblr/StreamBuilder/Streams/CursorlessFilteredStreamTest.php index c0b1098..d777b0a 100644 --- a/tests/unit/Tumblr/StreamBuilder/Streams/CursorlessFilteredStreamTest.php +++ b/tests/unit/Tumblr/StreamBuilder/Streams/CursorlessFilteredStreamTest.php @@ -25,6 +25,7 @@ use Tests\Unit\Tumblr\StreamBuilder\StreamBuilderTest; use Tumblr\StreamBuilder\DependencyBag; use Tumblr\StreamBuilder\Interfaces\Log; +use Tumblr\StreamBuilder\StreamCursors\SearchStreamCursor; use Tumblr\StreamBuilder\StreamFilterResult; use Tumblr\StreamBuilder\StreamFilters\CompositeStreamFilter; use Tumblr\StreamBuilder\StreamFilters\StreamFilter; @@ -191,14 +192,16 @@ public function test_skip_empty_pages_continues_past_filtered_batches() public function test_skip_empty_pages_false_preserves_old_behavior() { $drop1 = new MockedPostRefElement(1, 100); + $drop1->set_cursor(new SearchStreamCursor(1)); $drop2 = new MockedPostRefElement(2, 200); + $drop2->set_cursor(new SearchStreamCursor(2)); $inner_stream = $this->createMock(Stream::class); $inner_stream->expects($this->exactly(2)) ->method('_enumerate') ->willReturnOnConsecutiveCalls( new StreamResult(false, [$drop1]), - new StreamResult(false, [$drop2]) + new StreamResult(true, [$drop2]) ); $filter = $this->createMock(StreamFilter::class); @@ -214,6 +217,7 @@ public function test_skip_empty_pages_false_preserves_old_behavior() $this->assertSame(0, $result->get_size()); $this->assertTrue($result->is_exhaustive()); + } /** From 7e16c4e322d27f95063e3df958c7ac2533b40f23 Mon Sep 17 00:00:00 2001 From: lucila Date: Mon, 6 Apr 2026 17:18:05 -0300 Subject: [PATCH 4/4] remove empty line --- .../StreamBuilder/Streams/CursorlessFilteredStreamTest.php | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/unit/Tumblr/StreamBuilder/Streams/CursorlessFilteredStreamTest.php b/tests/unit/Tumblr/StreamBuilder/Streams/CursorlessFilteredStreamTest.php index d777b0a..443c4a2 100644 --- a/tests/unit/Tumblr/StreamBuilder/Streams/CursorlessFilteredStreamTest.php +++ b/tests/unit/Tumblr/StreamBuilder/Streams/CursorlessFilteredStreamTest.php @@ -217,7 +217,6 @@ public function test_skip_empty_pages_false_preserves_old_behavior() $this->assertSame(0, $result->get_size()); $this->assertTrue($result->is_exhaustive()); - } /**