-
Notifications
You must be signed in to change notification settings - Fork 4
Fix: Accumulator overflow during aggregation (#6793) #7641
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
0d49582
873e675
3c59cef
cbe835f
6876dc9
729d6bb
e7e4d9d
6f99b22
18bf9eb
4bbbbda
0eb96a3
27f6387
9b6d24d
f6ff6b2
361b2e8
6e98dd5
c627db0
a3565af
89d97c7
c68ff9d
592670b
904b6fe
6ba7ddf
0de0507
ac5d8cd
8a0c12f
083710d
0dae10f
3b59f00
6e5faa1
955bdca
66a262f
06cf22c
74182a5
f986a87
595b4ec
d849810
d7794b1
b816da3
6f6a249
b398f9b
91db4d5
b53a53f
a6f4dec
b3c4aa4
2fbbe34
ba29ce5
218d75d
528c0a6
b27bf7a
980b0c1
86763c9
43b40bf
fe0ba34
7ec35f1
325c415
b7677c8
b634592
b011c59
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -132,7 +132,9 @@ class SetAccumulator[V: Hashable](Accumulator[V, list[V]]): | |
|
|
||
| def __init__(self, | ||
| max_size: int | None = None, | ||
| key: Callable[[V], SupportsRichComparison] | None = None | ||
| key: Callable[[V], SupportsRichComparison] | None = None, | ||
| *, | ||
| allow_overflow: bool = False | ||
| ) -> None: | ||
| """ | ||
| :param max_size: the maximum number of elements to retain | ||
|
|
@@ -147,6 +149,7 @@ def __init__(self, | |
| self.value: set[V] = set() | ||
| self.max_size = max_size | ||
| self.key = none_safe_key(none_last=True) if key is None else key | ||
| self.allow_overflow = allow_overflow | ||
|
|
||
| def accumulate(self, value: V | list[V]) -> int: | ||
| """ | ||
|
|
@@ -553,9 +556,23 @@ def get(self) -> int: | |
|
|
||
| class EntityAggregator(metaclass=ABCMeta): | ||
|
|
||
| def __init__(self, outer_entity_type: EntityType, entity_type: EntityType): | ||
| def __init__(self, | ||
| outer_entity_type: EntityType, | ||
| entity_type: EntityType, | ||
| strict: bool = False): | ||
| """ | ||
| :param outer_entity_type: The entity type of the aggregate document. | ||
|
|
||
| :param entity_type: The entity type of the inner entities being | ||
| accumulated. | ||
|
|
||
| :param strict: Enforce complete accumulation of `document_id` for the | ||
| inner entity type. Required for "hot" entity types, whose | ||
| replicas don't track hub IDs. | ||
| """ | ||
| self.outer_entity_type = outer_entity_type | ||
| self.entity_type = entity_type | ||
| self.strict = strict | ||
|
|
||
| def _transform_entity(self, entity: JSON) -> JSON: | ||
| return entity | ||
|
|
@@ -600,13 +617,29 @@ def _accumulate(self, aggregate: Aggregate, entity: JSON) -> None: | |
| accumulator.accumulate(value) | ||
|
|
||
| def _aggregate(self, aggregate: Aggregate) -> JSON: | ||
| if self.strict: | ||
| accumulator = aggregate.get('document_id') | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Conflates absence and `None`. |
||
| assert accumulator is not None, R( | ||
| 'Hot entity types must always accumulate document_id', | ||
| self.entity_type, aggregate.keys() | ||
| ) | ||
| assert not (isinstance(accumulator, SetAccumulator) and accumulator.allow_overflow), R( | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We should pull the The constructor argument can remain in |
||
| 'allow_overflow is not permitted when accumulating document_id ' | ||
| 'in hot entity types', self.entity_type | ||
| ) | ||
| result = {} | ||
| for k, accumulator in aggregate.items(): | ||
| if accumulator is not None: | ||
| result[k] = accumulator.get() | ||
| if accumulator.dropped > 0: | ||
| log.warning('Values were dropped %d times while aggregating %s.%s into %s', | ||
| accumulator.dropped, self.entity_type, k, self.outer_entity_type) | ||
| message = ( | ||
| f'Values were dropped {accumulator.dropped} times while aggregating ' | ||
| f'{self.entity_type}.{k} into {self.outer_entity_type}' | ||
| ) | ||
| if isinstance(accumulator, SetAccumulator) and accumulator.allow_overflow: | ||
| log.warning(message) | ||
| else: | ||
| assert False, R(message) | ||
| return result | ||
|
|
||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm confused by this. The aim of this PR is to make overflows an error, but the default for `strict` is `False`? PL please.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Decided in PL to rename `strict` to `is_hot`.