diff --git a/lib/Tumblr/StreamBuilder/Streams/CursorlessFilteredStream.php b/lib/Tumblr/StreamBuilder/Streams/CursorlessFilteredStream.php
index 6e7bacf..79e62ab 100644
--- a/lib/Tumblr/StreamBuilder/Streams/CursorlessFilteredStream.php
+++ b/lib/Tumblr/StreamBuilder/Streams/CursorlessFilteredStream.php
@@ -53,6 +53,9 @@ final class CursorlessFilteredStream extends WrapStream
      */
     private float $overfetch_ratio;
 
+    /** @var bool Whether fully-filtered pages are exempt from the retry budget. */
+    private bool $skip_empty_pages;
+
     /**
      * @param Stream $inner The stream to filter.
      * @param StreamFilter $filter The filter to apply to the stream.
@@ -64,13 +67,17 @@ final class CursorlessFilteredStream extends WrapStream
      * is relatively cheap to over-enumerate, you can crank this up to preemptively over-fetch.
      * A value of zero means to not over-fetch, whereas a value of 1.0 means to fetch double the
      * number of results requested. No more than $count results will be returned.
+     * @param bool|null $skip_empty_pages If true (default), fully-filtered pages do not count
+     * against the retry budget, allowing the stream to scan past long runs of filtered content.
+     * Set to `false` to count every inner enumeration as a retry; `null` is treated as `true`.
      */
     public function __construct(
         Stream $inner,
         StreamFilter $filter,
         string $identity,
         ?int $retry_count = null,
-        ?float $overfetch_ratio = null
+        ?float $overfetch_ratio = null,
+        ?bool $skip_empty_pages = true
     ) {
         if (is_null($retry_count)) {
             $retry_count = FilteredStream::DEFAULT_RETRY_COUNT;
@@ -82,6 +89,7 @@ public function __construct(
         $this->filter = $filter;
         $this->retry_count = $retry_count;
         $this->overfetch_ratio = $overfetch_ratio;
+        $this->skip_empty_pages = $skip_empty_pages ?? true;
     }
 
     /**
@@ -96,6 +104,7 @@ public function to_template(): array
             'stream_filter' => $this->filter->to_template(),
             'retry_count' => $this->retry_count,
             'overfetch_ratio' => $this->overfetch_ratio,
+            'skip_empty_pages' => $this->skip_empty_pages,
         ];
     }
 
@@ -113,7 +122,8 @@ public static function from_template(StreamContext $context): self
             $filter,
             $context->get_current_identity(),
             $context->get_optional_property('retry_count'),
-            $context->get_optional_property('overfetch_ratio')
+            $context->get_optional_property('overfetch_ratio'),
+            $context->get_optional_property('skip_empty_pages')
         );
     }
 
@@ -190,10 +200,13 @@ private function _filter_rec(
 
         $tracer && $tracer->filter_retry($this, $inner_cursor, $retry_cursor, $depth, $want_count, $inner_result->get_size(), count($retained));
 
+        // Empty page + skip_empty_pages: no retry spent. NOTE(review): confirm this recursion stays bounded.
+        $new_depth = count($retained) === 0 && $this->skip_empty_pages ? $depth : $depth - 1;
+
         $rec_result = $this->_filter_rec(
             $want_count - count($retained),
             $retry_cursor,
-            $depth - 1,
+            $new_depth,
             $tracer,
             $option,
             $propagated_is_exhaustive
diff --git a/tests/unit/Tumblr/StreamBuilder/Streams/CursorlessFilteredStreamTest.php b/tests/unit/Tumblr/StreamBuilder/Streams/CursorlessFilteredStreamTest.php
index 77cff85..443c4a2 100644
--- a/tests/unit/Tumblr/StreamBuilder/Streams/CursorlessFilteredStreamTest.php
+++ b/tests/unit/Tumblr/StreamBuilder/Streams/CursorlessFilteredStreamTest.php
@@ -25,6 +25,7 @@ use Tests\Unit\Tumblr\StreamBuilder\StreamBuilderTest;
 
 use Tumblr\StreamBuilder\DependencyBag;
 use Tumblr\StreamBuilder\Interfaces\Log;
+use Tumblr\StreamBuilder\StreamCursors\SearchStreamCursor;
 use Tumblr\StreamBuilder\StreamFilterResult;
 use Tumblr\StreamBuilder\StreamFilters\CompositeStreamFilter;
 use Tumblr\StreamBuilder\StreamFilters\StreamFilter;
@@ -138,6 +139,114 @@ public function provide_final_retry_exhaustion_cases(): iterable
         yield 'inner_exhausted' => [true, true];
     }
 
+    /**
+     * Tests that skip_empty_pages (default true) allows scanning past fully-filtered pages.
+     *
+     * Simulates 3 consecutive batches where all elements are filtered out,
+     * followed by a batch with a passing element. With retry_count=1 and
+     * skip_empty_pages=true, the stream should reach the 4th batch because
+     * empty pages don't consume retries.
+     */
+    public function test_skip_empty_pages_continues_past_filtered_batches()
+    {
+        $keep = new MockedPostRefElement(1, 100);
+        $drop1 = new MockedPostRefElement(2, 200);
+        $drop2 = new MockedPostRefElement(3, 300);
+        $drop3 = new MockedPostRefElement(4, 400);
+
+        $inner_stream = $this->createMock(Stream::class);
+        $inner_stream->expects($this->exactly(4))
+            ->method('_enumerate')
+            ->willReturnOnConsecutiveCalls(
+                new StreamResult(false, [$drop1]),
+                new StreamResult(false, [$drop2]),
+                new StreamResult(false, [$drop3]),
+                new StreamResult(true, [$keep])
+            );
+
+        $filter = $this->createMock(StreamFilter::class);
+        $filter->expects($this->exactly(4))
+            ->method('filter_inner')
+            ->willReturnOnConsecutiveCalls(
+                new StreamFilterResult([], [$drop1]),
+                new StreamFilterResult([], [$drop2]),
+                new StreamFilterResult([], [$drop3]),
+                new StreamFilterResult([$keep], [])
+            );
+
+        // retry_count=1, overfetch_ratio=0.0, skip_empty_pages omitted so it defaults to true
+        $filtered_stream = new CursorlessFilteredStream($inner_stream, $filter, 'test', 1, 0.0);
+        $result = $filtered_stream->enumerate(1);
+
+        $this->assertSame(1, $result->get_size());
+        $this->assertSame($keep, $result->get_elements()[0]->get_original_element());
+    }
+
+    /**
+     * Tests that skip_empty_pages=false preserves the old behavior where
+     * fully-filtered pages consume retries.
+     *
+     * With retry_count=1, only 2 batches are fetched. Both are fully filtered,
+     * so the result is empty.
+     */
+    public function test_skip_empty_pages_false_preserves_old_behavior()
+    {
+        $drop1 = new MockedPostRefElement(1, 100);
+        $drop1->set_cursor(new SearchStreamCursor(1));
+        $drop2 = new MockedPostRefElement(2, 200);
+        $drop2->set_cursor(new SearchStreamCursor(2));
+
+        $inner_stream = $this->createMock(Stream::class);
+        $inner_stream->expects($this->exactly(2))
+            ->method('_enumerate')
+            ->willReturnOnConsecutiveCalls(
+                new StreamResult(false, [$drop1]),
+                new StreamResult(true, [$drop2])
+            );
+
+        $filter = $this->createMock(StreamFilter::class);
+        $filter->expects($this->exactly(2))
+            ->method('filter_inner')
+            ->willReturnOnConsecutiveCalls(
+                new StreamFilterResult([], [$drop1]),
+                new StreamFilterResult([], [$drop2])
+            );
+
+        $filtered_stream = new CursorlessFilteredStream($inner_stream, $filter, 'test', 1, 0.0, false);
+        $result = $filtered_stream->enumerate(1);
+
+        $this->assertSame(0, $result->get_size());
+        $this->assertTrue($result->is_exhaustive());
+    }
+
+    /**
+     * Tests that enumeration terminates once the inner stream is exhausted,
+     * even if the retry budget (retry_count=2 here) has not been consumed.
+     */
+    public function test_skip_empty_pages_terminates_on_inner_exhaustion()
+    {
+        $drop = new MockedPostRefElement(1, 100);
+
+        $inner_stream = $this->createMock(Stream::class);
+        $inner_stream->expects($this->exactly(2))
+            ->method('_enumerate')
+            ->willReturnOnConsecutiveCalls(
+                new StreamResult(false, [$drop]),
+                new StreamResult(true, [])
+            );
+
+        $filter = $this->createMock(StreamFilter::class);
+        $filter->expects($this->once())
+            ->method('filter_inner')
+            ->willReturn(new StreamFilterResult([], [$drop]));
+
+        $filtered_stream = new CursorlessFilteredStream($inner_stream, $filter, 'test', 2, 0.0);
+        $result = $filtered_stream->enumerate(1);
+
+        $this->assertSame(0, $result->get_size());
+        $this->assertTrue($result->is_exhaustive());
+    }
+
     /**
      * Redo the dependency bag injection.
      * @return void