Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions lib/Tumblr/StreamBuilder/Streams/CursorlessFilteredStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ final class CursorlessFilteredStream extends WrapStream
*/
private float $overfetch_ratio;

/** @var bool */
private bool $skip_empty_pages;

/**
* @param Stream $inner The stream to filter.
* @param StreamFilter $filter The filter to apply to the stream.
Expand All @@ -64,13 +67,17 @@ final class CursorlessFilteredStream extends WrapStream
* is relatively cheap to over-enumerate, you can crank this up to preemptively over-fetch.
* A value of zero means to not over-fetch, whereas a value of 1.0 means to fetch double the
* number of results requested. No more than $count results will be returned.
* @param bool|null $skip_empty_pages If true (default), fully-filtered pages do not count
* against the retry budget, allowing the stream to scan past long runs of filtered content.
* Set to `false` to count every inner enumeration as a retry.
*/
public function __construct(
Stream $inner,
StreamFilter $filter,
string $identity,
?int $retry_count = null,
?float $overfetch_ratio = null
?float $overfetch_ratio = null,
?bool $skip_empty_pages = true
) {
if (is_null($retry_count)) {
$retry_count = FilteredStream::DEFAULT_RETRY_COUNT;
Expand All @@ -82,6 +89,7 @@ public function __construct(
$this->filter = $filter;
$this->retry_count = $retry_count;
$this->overfetch_ratio = $overfetch_ratio;
$this->skip_empty_pages = $skip_empty_pages ?? true;
}

/**
Expand All @@ -96,6 +104,7 @@ public function to_template(): array
'stream_filter' => $this->filter->to_template(),
'retry_count' => $this->retry_count,
'overfetch_ratio' => $this->overfetch_ratio,
'skip_empty_pages' => $this->skip_empty_pages,
];
}

Expand All @@ -113,7 +122,8 @@ public static function from_template(StreamContext $context): self
$filter,
$context->get_current_identity(),
$context->get_optional_property('retry_count'),
$context->get_optional_property('overfetch_ratio')
$context->get_optional_property('overfetch_ratio'),
$context->get_optional_property('skip_empty_pages')
);
}

Expand Down Expand Up @@ -190,10 +200,13 @@ private function _filter_rec(

$tracer && $tracer->filter_retry($this, $inner_cursor, $retry_cursor, $depth, $want_count, $inner_result->get_size(), count($retained));

// Don't count fully-filtered pages against retry budget
$new_depth = count($retained) === 0 && $this->skip_empty_pages ? $depth : $depth - 1;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are now potentially delegating exiting the recursion to the inner stream being exhausted, is there a risk of running out of memory/running into stack overflow issues?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that could happen if we have a huge block of filtered elements.

Let's say that we paginate by 10 elements,

and we have 30 consecutive elements being filtered.

this change addresses that issue by not counting "depth" for empty pages.

current exhaustion
page 1 => 8 elements - depth 2
page 2 => 10 filtered elements - depth 1

with this fix:
page 1 => 8 elements - depth 2
page 2 => 10 filtered elements - depth 1
page 3 => 10 filtered elements - depth 1
page 4 => 10 filtered elements - depth 1
page 5 => 5 elements - depth 0

If we make little progress by finding some elements in subsequent pages (inner enumerations), we will decrement the retry count (depth). But when we have a big chunk of filtered elements, we will be able to skip the whole block and fetch a few more elements from a subsequent page.

If we notice that this goes too deep and causes memory issues, we can turn it off by setting skip_empty_pages to false.

I want skip_empty_pages to be true by default so that we fix this bug in most places, while still keeping a way to disable the behaviour.

@lengare

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, thank you for explaining. Let's call this out in release notes and carefully monitor affected templates during deploy.


$rec_result = $this->_filter_rec(
$want_count - count($retained),
$retry_cursor,
$depth - 1,
$new_depth,
$tracer,
$option,
$propagated_is_exhaustive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
use Tests\Unit\Tumblr\StreamBuilder\StreamBuilderTest;
use Tumblr\StreamBuilder\DependencyBag;
use Tumblr\StreamBuilder\Interfaces\Log;
use Tumblr\StreamBuilder\StreamCursors\SearchStreamCursor;
use Tumblr\StreamBuilder\StreamFilterResult;
use Tumblr\StreamBuilder\StreamFilters\CompositeStreamFilter;
use Tumblr\StreamBuilder\StreamFilters\StreamFilter;
Expand Down Expand Up @@ -138,6 +139,114 @@ public function provide_final_retry_exhaustion_cases(): iterable
yield 'inner_exhausted' => [true, true];
}

/**
 * Verifies the default skip_empty_pages behavior: pages whose elements are
 * all filtered out do not consume the retry budget.
 *
 * Three consecutive inner batches are fully filtered, then a fourth batch
 * yields a passing element. With retry_count=1 and the default
 * skip_empty_pages=true, enumeration must reach the fourth batch, since
 * fully-filtered pages are "free" retries.
 */
public function test_skip_empty_pages_continues_past_filtered_batches()
{
    $kept_element = new MockedPostRefElement(1, 100);
    $filtered_elements = [
        new MockedPostRefElement(2, 200),
        new MockedPostRefElement(3, 300),
        new MockedPostRefElement(4, 400),
    ];

    $upstream = $this->createMock(Stream::class);
    $upstream->expects($this->exactly(4))
        ->method('_enumerate')
        ->willReturnOnConsecutiveCalls(
            new StreamResult(false, [$filtered_elements[0]]),
            new StreamResult(false, [$filtered_elements[1]]),
            new StreamResult(false, [$filtered_elements[2]]),
            new StreamResult(true, [$kept_element])
        );

    $mock_filter = $this->createMock(StreamFilter::class);
    $mock_filter->expects($this->exactly(4))
        ->method('filter_inner')
        ->willReturnOnConsecutiveCalls(
            new StreamFilterResult([], [$filtered_elements[0]]),
            new StreamFilterResult([], [$filtered_elements[1]]),
            new StreamFilterResult([], [$filtered_elements[2]]),
            new StreamFilterResult([$kept_element], [])
        );

    // retry_count=1; skip_empty_pages is left at its default (true).
    $stream = new CursorlessFilteredStream($upstream, $mock_filter, 'test', 1, 0.0);
    $result = $stream->enumerate(1);

    $this->assertSame(1, $result->get_size());
    $this->assertSame($kept_element, $result->get_elements()[0]->get_original_element());
}

/**
 * Verifies that skip_empty_pages=false restores the legacy accounting,
 * where a fully-filtered page still burns a retry.
 *
 * With retry_count=1 only two inner fetches happen. Both batches are
 * completely filtered, so enumeration ends with an empty, exhaustive
 * result instead of scanning further.
 */
public function test_skip_empty_pages_false_preserves_old_behavior()
{
    $first_dropped = new MockedPostRefElement(1, 100);
    $first_dropped->set_cursor(new SearchStreamCursor(1));
    $second_dropped = new MockedPostRefElement(2, 200);
    $second_dropped->set_cursor(new SearchStreamCursor(2));

    $upstream = $this->createMock(Stream::class);
    $upstream->expects($this->exactly(2))
        ->method('_enumerate')
        ->willReturnOnConsecutiveCalls(
            new StreamResult(false, [$first_dropped]),
            new StreamResult(true, [$second_dropped])
        );

    $mock_filter = $this->createMock(StreamFilter::class);
    $mock_filter->expects($this->exactly(2))
        ->method('filter_inner')
        ->willReturnOnConsecutiveCalls(
            new StreamFilterResult([], [$first_dropped]),
            new StreamFilterResult([], [$second_dropped])
        );

    // Explicitly pass skip_empty_pages=false to exercise the legacy path.
    $stream = new CursorlessFilteredStream($upstream, $mock_filter, 'test', 1, 0.0, false);
    $result = $stream->enumerate(1);

    $this->assertSame(0, $result->get_size());
    $this->assertTrue($result->is_exhaustive());
}

/**
 * Verifies that scanning stops as soon as the inner stream reports
 * exhaustion, regardless of how much retry budget remains while
 * skip_empty_pages (default true) is in effect.
 */
public function test_skip_empty_pages_terminates_on_inner_exhaustion()
{
    $dropped = new MockedPostRefElement(1, 100);

    $upstream = $this->createMock(Stream::class);
    $upstream->expects($this->exactly(2))
        ->method('_enumerate')
        ->willReturnOnConsecutiveCalls(
            new StreamResult(false, [$dropped]),
            // Second batch: exhausted and empty — must end the recursion.
            new StreamResult(true, [])
        );

    $mock_filter = $this->createMock(StreamFilter::class);
    $mock_filter->expects($this->once())
        ->method('filter_inner')
        ->willReturn(new StreamFilterResult([], [$dropped]));

    $stream = new CursorlessFilteredStream($upstream, $mock_filter, 'test', 2, 0.0);
    $result = $stream->enumerate(1);

    $this->assertSame(0, $result->get_size());
    $this->assertTrue($result->is_exhaustive());
}

/**
* Redo the dependency bag injection.
* @return void
Expand Down
Loading