-
Notifications
You must be signed in to change notification settings - Fork 141
fix: Stop disconnecting consumers from MultiTopicConsumer on connection errors if not using regex #379
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Stop disconnecting consumers from MultiTopicConsumer on connection errors if not using regex #379
Conversation
…on errors if not using regex - streamnative#375 A MultiTopicConsumer contains many child Consumers, one for every partitioned topic, and when you poll the MultiTopicConsumer it polls all child consumers until a new message is ready. This works great in non-error conditions, but in some rare cases, such as when one or more pulsar brokers are restart in sequence, the child consumer can return `Poll::Ready(Some(Err(e)))` The previous behavior was to assume the child consumer was dead and to remove it from the MultiTopicConsumer. This is reasonable if you are using the regex refresh feature, where topics will be discovered at runtime dynamically. However, this is completely the wrong behavior when you have a fixed topic list at startup. Silently dropping a child topic means your MultiTopicConsumer will never listen to that topic ever again. There is no test included, as an automated test that shut down the entire broker during a test would not behave well locally/CI. It was tested with the example included in the issue - https://github.com/chamons/pulsar-load-shed-repro
|
@BewareMyPower @freeznet - You've previously reviewed my PRs or have recent active development history. Would you be willing to review my PR? |
|
More a question for maintainers, but I do wonder if it's ever appropriate to remove a topic from the list when we encounter an "unexpected" error in this code path. |
|
@mdeltito - If you have as regex and that regex covers all of your topics, then I think removing it from the list is acceptable. In 30 seconds or whatever your refresh timer is, you'll pick it up again. Technically my fix would fail if you had regex but it didn't cover all of your topics, but it would be no worse that before the fix. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR fixes a critical bug where MultiTopicConsumer would silently drop topics from consumption when connection errors occurred. The issue manifested during Pulsar broker restarts, where child consumers would return errors and be permanently removed from the multi-topic consumer, even when using a fixed topic list (not regex-based discovery).
Key Changes:
- Modified error handling to only remove topics from
MultiTopicConsumeron errors whentopic_regexis present - Added explanatory comment documenting the conditional removal logic
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
@chamons Thanks! I agree it's acceptable, and I understand that Also, thanks in general for digging into some of these tricky issues and sharing your findings! |
|
I have little understanding on why the removal code exists, but my current theory is that it was added to prevent something like #378 but for Consumers. |
…er restarts by unconditionally update_topics in MultiTopicConsumer While testing streamnative#379 it was found in that cases when the entire pulsar broker was restarted, we'd hit a similar case: - Instead of hitting Some(Err(e) at https://github.com/streamnative/pulsar-rs/blob/a14e8a15144a48d7d97b20c6a7fc637cbf5d780b/src/consumer/multi.rs#L381-L391 - We hit None at https://github.com/streamnative/pulsar-rs/blob/a14e8a15144a48d7d97b20c6a7fc637cbf5d780b/src/consumer/multi.rs#L377-L379 This means we still rip out the topic out of the MultiTopicConsumer forever. I originally was going to apply a similar fix to streamnative#379, but review of the code made me realize that there was code in: https://github.com/streamnative/pulsar-rs/blob/a14e8a15144a48d7d97b20c6a7fc637cbf5d780b/src/consumer/multi.rs#L145C12-L145C25 That actually handle keeping the initial topics around by recreating them. It however, wasn't being called because the call was guarded by a topic_regex existance check. I believe it is necessary for this code to be unconditionally called, because errors in consumers can drop them from a MultiTopicConsumer and we need to recreate. Local testing confirmed that it works, with 5 full restarts of our staging pulsar cluster while under loading not hanging up once.
…er restarts by unconditionally update_topics in MultiTopicConsumer While testing streamnative#379 it was found in that cases when the entire pulsar broker was restarted, we'd hit a similar case: - Instead of hitting Some(Err(e) at https://github.com/streamnative/pulsar-rs/blob/a14e8a15144a48d7d97b20c6a7fc637cbf5d780b/src/consumer/multi.rs#L381-L391 - We hit None at https://github.com/streamnative/pulsar-rs/blob/a14e8a15144a48d7d97b20c6a7fc637cbf5d780b/src/consumer/multi.rs#L377-L379 This means we still rip out the topic out of the MultiTopicConsumer forever. I originally was going to apply a similar fix to streamnative#379, but review of the code made me realize that there was code in: https://github.com/streamnative/pulsar-rs/blob/a14e8a15144a48d7d97b20c6a7fc637cbf5d780b/src/consumer/multi.rs#L145C12-L145C25 That actually handle keeping the initial topics around by recreating them. It however, wasn't being called because the call was guarded by a topic_regex existence check. I believe it is necessary for this code to be unconditionally called, because errors in consumers can drop them from a MultiTopicConsumer and we need to recreate. Local testing confirmed that it works, with 5 full restarts of our staging pulsar cluster while under loading not hanging up once.
A MultiTopicConsumer contains many child Consumers, one for every partitioned topic, and when you poll the MultiTopicConsumer it polls all child consumers until a new message is ready.
This works great in non-error conditions, but in some rare cases, such as when one or more pulsar brokers are restart in sequence, the child consumer can return
Poll::Ready(Some(Err(e)))The previous behavior was to assume the child consumer was dead and to remove it from the MultiTopicConsumer. This is reasonable if you are using the regex refresh feature, where topics will be discovered at runtime dynamically. However, this is completely the wrong behavior when you have a fixed topic list at startup. Silently dropping a child topic means your MultiTopicConsumer will never listen to that topic ever again.
There is no test included, as an automated test that shut down the entire broker during a test would not behave well locally/CI. It was tested with the example included in the issue - https://github.com/chamons/pulsar-load-shed-repro