Conversation
There was a problem hiding this comment.
Pull request overview
This WIP PR refactors the reducer architecture to decrease memory footprint by fundamentally changing how reducers work. Instead of reducers being iterable streams themselves, they now implement a simpler next() method pattern that processes values one at a time.
Key changes:
- Reducers no longer extend
AbstractStreamor implementIteratorAggregate; they now have anext(mixed $value, mixed $key)method for stateful value accumulation - The
aggregatorsproperty has been renamed toaggregatedthroughout the codebase to better reflect its purpose - A new
Reduceoperator wraps reducers to provide streaming capability while reducing
Reviewed changes
Copilot reviewed 39 out of 39 changed files in this pull request and generated 20 comments.
Show a summary per file
| File | Description |
|---|---|
| StreamTest.php | Updates test assertions to use the renamed aggregated property |
| Reducer test files (Sum, Min, Max, Count, Callback, Average) | Refactored to test reducers through the reduce() function rather than directly instantiating them |
| Operator test files (BufferWhile, BufferCount) | Updates assertions to use aggregated property |
| Collector test files | Updates to access aggregated property from collectors |
| AggregatorTest.php | Refactored to test aggregators with the new Reduce operator wrapper |
| functions.php | Refactored reduce() and aggregate() functions to work with new reducer architecture, updated PHPDoc annotations |
| Stream.php | Marked class as final, updated method signatures to accept callables as reducers |
| Reducer implementations | Completely refactored from stream-based to stateful next() method pattern |
| Operator/Reduce.php | New operator that wraps reducers to provide streaming with reduction |
| Contract interfaces | ReducerInterface simplified to expose $value and next() method; StreamInterface and CollectorInterface updated with aggregated property |
| Collector implementations | Updated to use aggregated property name |
| Aggregator.php | Updated to work with new Reduce operator |
| AbstractStream.php | Added aggregated property that maps aggregator values |
| Documentation files | Various improvements to grammar, examples, and terminology consistency |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| $this->aggregated = \array_map( | ||
| static fn(AggregatorInterface $aggregator): mixed => $aggregator->value, | ||
| $this->collection instanceof StreamInterface ? $this->collection->aggregators : [], |
There was a problem hiding this comment.
The aggregated property in StreamInterface already provides the mapped values (see AbstractStream lines 43-50 where it maps aggregators to their values). This code is redundantly mapping the aggregators again. It should be simplified to: $this->aggregated = $this->collection instanceof StreamInterface ? $this->collection->aggregated : [];
| @@ -10,4 +10,4 @@ following command in your terminal: | |||
| composer require runopencode/dataset | |||
|
|
|||
| Nothing more is required, no additional initialization and/or configration. Just | |||
There was a problem hiding this comment.
The word is misspelled. It should be "configuration" not "configration".
| echo \sprint('Total in EUR: %d', $stream->aggregated['total_converted']); | ||
| echo "\n"; | ||
| echo \sprint('Average in EUR: %d', $stream->aggregators['average_converted']); | ||
| echo \sprint('Average in EUR: %d', $stream->aggregated['average_converted']); |
There was a problem hiding this comment.
The function name appears to be misspelled. It should be sprintf, not sprint.
| new Count([])->value; | ||
| $this->assertSame( | ||
| 1, | ||
| reduce($dataset, Count::class, filter: static fn(int $value, string $key): bool => $key !== 'b' && $value !== 1) |
There was a problem hiding this comment.
Using named parameters with the reduce function won't work as expected. The reduce() function uses variadic parameters (...$args) which are passed positionally to the reducer constructor via ReflectionClass::newInstanceArgs(). Named parameters in the function call will be converted to positional arguments, which may not map correctly. The filter parameter should be passed as a positional argument instead.
| new Max([])->value; | ||
| $this->assertEquals( | ||
| 2, | ||
| reduce($dataset, Max::class, extractor: static fn(int $value, int $key): int => $value * $key), |
There was a problem hiding this comment.
Using named parameters with the reduce function won't work as expected. The reduce() function uses variadic parameters (...$args) which are passed positionally to the reducer constructor via ReflectionClass::newInstanceArgs(). Named parameters will not be properly mapped. The Max reducer constructor expects initial as the first parameter and extractor as the second, so this would need to pass null for initial before the extractor.
|
|
||
| $this->assertEqualsWithDelta( | ||
| 10.0, | ||
| reduce($dataset, Average::class, initial: 20), |
There was a problem hiding this comment.
Using named parameters with the reduce function won't work as expected. The reduce() function uses variadic parameters (...$args) which are passed positionally to the reducer constructor via ReflectionClass::newInstanceArgs(). Named parameters will not be properly mapped. The initial parameter should be passed as the first positional argument.
| new Average([])->value; | ||
| $this->assertEqualsWithDelta( | ||
| 4.0, | ||
| reduce($dataset, Average::class, countNull: true), |
There was a problem hiding this comment.
Using named parameters with the reduce function won't work as expected. The reduce() function uses variadic parameters (...$args) which are passed positionally to the reducer constructor via ReflectionClass::newInstanceArgs(). Named parameters will not be properly mapped. The countNull parameter is the third parameter in Average's constructor, so this would need to pass null for initial and extractor before countNull.
No description provided.