Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 3 additions & 16 deletions lib/Tumblr/StreamBuilder/Streams/CursorlessFilteredStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@ final class CursorlessFilteredStream extends WrapStream
*/
private float $overfetch_ratio;

/** @var bool */
private bool $skip_empty_pages;

/**
* @param Stream $inner The stream to filter.
* @param StreamFilter $filter The filter to apply to the stream.
Expand All @@ -67,17 +64,13 @@ final class CursorlessFilteredStream extends WrapStream
* is relatively cheap to over-enumerate, you can crank this up to preemptively over-fetch.
* A value of zero means to not over-fetch, whereas a value of 1.0 means to fetch double the
* number of results requested. No more than $count results will be returned.
* @param bool|null $skip_empty_pages If true (default), fully-filtered pages do not count
* against the retry budget, allowing the stream to scan past long runs of filtered content.
* Set to `false` to count every inner enumeration as a retry.
*/
public function __construct(
Stream $inner,
StreamFilter $filter,
string $identity,
?int $retry_count = null,
?float $overfetch_ratio = null,
?bool $skip_empty_pages = true
?float $overfetch_ratio = null
) {
if (is_null($retry_count)) {
$retry_count = FilteredStream::DEFAULT_RETRY_COUNT;
Expand All @@ -89,7 +82,6 @@ public function __construct(
$this->filter = $filter;
$this->retry_count = $retry_count;
$this->overfetch_ratio = $overfetch_ratio;
$this->skip_empty_pages = $skip_empty_pages ?? true;
}

/**
Expand All @@ -104,7 +96,6 @@ public function to_template(): array
'stream_filter' => $this->filter->to_template(),
'retry_count' => $this->retry_count,
'overfetch_ratio' => $this->overfetch_ratio,
'skip_empty_pages' => $this->skip_empty_pages,
];
}

Expand All @@ -122,8 +113,7 @@ public static function from_template(StreamContext $context): self
$filter,
$context->get_current_identity(),
$context->get_optional_property('retry_count'),
$context->get_optional_property('overfetch_ratio'),
$context->get_optional_property('skip_empty_pages')
$context->get_optional_property('overfetch_ratio')
);
}

Expand Down Expand Up @@ -200,13 +190,10 @@ private function _filter_rec(

$tracer && $tracer->filter_retry($this, $inner_cursor, $retry_cursor, $depth, $want_count, $inner_result->get_size(), count($retained));

// Don't count fully-filtered pages against retry budget
$new_depth = count($retained) === 0 && $this->skip_empty_pages ? $depth : $depth - 1;

$rec_result = $this->_filter_rec(
$want_count - count($retained),
$retry_cursor,
$new_depth,
$depth - 1,
$tracer,
$option,
$propagated_is_exhaustive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
use Tests\Unit\Tumblr\StreamBuilder\StreamBuilderTest;
use Tumblr\StreamBuilder\DependencyBag;
use Tumblr\StreamBuilder\Interfaces\Log;
use Tumblr\StreamBuilder\StreamCursors\SearchStreamCursor;
use Tumblr\StreamBuilder\StreamFilterResult;
use Tumblr\StreamBuilder\StreamFilters\CompositeStreamFilter;
use Tumblr\StreamBuilder\StreamFilters\StreamFilter;
Expand Down Expand Up @@ -139,114 +138,6 @@ public function provide_final_retry_exhaustion_cases(): iterable
yield 'inner_exhausted' => [true, true];
}

/**
* Tests that skip_empty_pages (default true) allows scanning past fully-filtered pages.
*
* Simulates 3 consecutive batches where all elements are filtered out,
* followed by a batch with a passing element. With retry_count=1 and
* skip_empty_pages=true, the stream should reach the 4th batch because
* empty pages don't consume retries.
*/
public function test_skip_empty_pages_continues_past_filtered_batches()
{
$keep = new MockedPostRefElement(1, 100);
$drop1 = new MockedPostRefElement(2, 200);
$drop2 = new MockedPostRefElement(3, 300);
$drop3 = new MockedPostRefElement(4, 400);

$inner_stream = $this->createMock(Stream::class);
$inner_stream->expects($this->exactly(4))
->method('_enumerate')
->willReturnOnConsecutiveCalls(
new StreamResult(false, [$drop1]),
new StreamResult(false, [$drop2]),
new StreamResult(false, [$drop3]),
new StreamResult(true, [$keep])
);

$filter = $this->createMock(StreamFilter::class);
$filter->expects($this->exactly(4))
->method('filter_inner')
->willReturnOnConsecutiveCalls(
new StreamFilterResult([], [$drop1]),
new StreamFilterResult([], [$drop2]),
new StreamFilterResult([], [$drop3]),
new StreamFilterResult([$keep], [])
);

// retry_count=1, skip_empty_pages=true (default)
$filtered_stream = new CursorlessFilteredStream($inner_stream, $filter, 'test', 1, 0.0);
$result = $filtered_stream->enumerate(1);

$this->assertSame(1, $result->get_size());
$this->assertSame($keep, $result->get_elements()[0]->get_original_element());
}

/**
* Tests that skip_empty_pages=false preserves the old behavior where
* fully-filtered pages consume retries.
*
* With retry_count=1, only 2 batches are fetched. Both are fully filtered,
* so the result is empty.
*/
public function test_skip_empty_pages_false_preserves_old_behavior()
{
$drop1 = new MockedPostRefElement(1, 100);
$drop1->set_cursor(new SearchStreamCursor(1));
$drop2 = new MockedPostRefElement(2, 200);
$drop2->set_cursor(new SearchStreamCursor(2));

$inner_stream = $this->createMock(Stream::class);
$inner_stream->expects($this->exactly(2))
->method('_enumerate')
->willReturnOnConsecutiveCalls(
new StreamResult(false, [$drop1]),
new StreamResult(true, [$drop2])
);

$filter = $this->createMock(StreamFilter::class);
$filter->expects($this->exactly(2))
->method('filter_inner')
->willReturnOnConsecutiveCalls(
new StreamFilterResult([], [$drop1]),
new StreamFilterResult([], [$drop2])
);

$filtered_stream = new CursorlessFilteredStream($inner_stream, $filter, 'test', 1, 0.0, false);
$result = $filtered_stream->enumerate(1);

$this->assertSame(0, $result->get_size());
$this->assertTrue($result->is_exhaustive());
}

/**
* Tests that skip_empty_pages terminates when the inner stream is exhausted,
* even if retry budget has not been consumed.
*/
public function test_skip_empty_pages_terminates_on_inner_exhaustion()
{
$drop = new MockedPostRefElement(1, 100);

$inner_stream = $this->createMock(Stream::class);
$inner_stream->expects($this->exactly(2))
->method('_enumerate')
->willReturnOnConsecutiveCalls(
new StreamResult(false, [$drop]),
new StreamResult(true, [])
);

$filter = $this->createMock(StreamFilter::class);
$filter->expects($this->once())
->method('filter_inner')
->willReturn(new StreamFilterResult([], [$drop]));

$filtered_stream = new CursorlessFilteredStream($inner_stream, $filter, 'test', 2, 0.0);
$result = $filtered_stream->enumerate(1);

$this->assertSame(0, $result->get_size());
$this->assertTrue($result->is_exhaustive());
}

/**
* Redo the dependency bag injection.
* @return void
Expand Down
Loading