[WIP] #3823
base: master
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED.
This pull-request has been approved by: (none yet). The full list of commands accepted by this bot can be found here.
Details: Needs approval from an approver in each of these files. Approvers can indicate their approval by writing the approval command in a comment.
Summary of Changes

Hello @lidezhu, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a significant architectural change to the log puller's event delivery mechanism, replacing the existing dynstream-based delivery with a new custom span pipeline.
Code Review
This pull request introduces a significant and well-executed refactoring by replacing the dynstream-based event delivery in the log puller with a new, custom Span Pipeline. The new implementation, detailed in the excellent design document, aims to improve performance and throughput by removing a serialization bottleneck. The code is clean, well-structured, and closely follows the design. I've identified one potential correctness issue that could lead to non-monotonic resolved timestamps and a suggestion to improve configurability. Overall, this is a high-quality contribution.
// If the previous barrier has the same ts, the new one is redundant because it
// would advance to the same resolved-ts but requires waiting for more data.
if len(s.pendingResolved) != 0 && s.pendingResolved[len(s.pendingResolved)-1].ts == ts {
	return
}
The current check ts == s.pendingResolved[len(s.pendingResolved)-1].ts is insufficient to prevent non-monotonic resolved timestamps. If a new resolved timestamp arrives that is smaller than the last pending one (but larger than the last flushed one), it will be added to the queue. This can lead to advanceResolvedTs being called with a decreasing timestamp, which might violate the assumptions of downstream components.
To ensure strict monotonicity of pending resolved timestamps, the check should be changed to ts <= .... This will correctly discard any new resolved timestamp that is not a strict improvement over the last pending one.
Suggested change (replacing the block above):

// If the new resolved ts is not greater than the last pending one,
// it's redundant. Advancing to a smaller or equal TS is not useful.
if len(s.pendingResolved) != 0 && ts <= s.pendingResolved[len(s.pendingResolved)-1].ts {
	return
}
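To make the failure mode concrete, here is a minimal, self-contained Go sketch comparing the two checks. The barrier type and the two admit functions are hypothetical simplifications for illustration, not the actual types or functions in this PR:

package main

import "fmt"

// barrier is a hypothetical stand-in for a pending resolved-ts entry
// kept by the span pipeline.
type barrier struct{ ts uint64 }

// admitEqualOnly mirrors the original check: only an exactly equal ts is dropped.
func admitEqualOnly(pending []barrier, ts uint64) bool {
	if len(pending) != 0 && pending[len(pending)-1].ts == ts {
		return false
	}
	return true
}

// admitMonotonic mirrors the suggested check: any ts not strictly greater
// than the last pending one is dropped.
func admitMonotonic(pending []barrier, ts uint64) bool {
	if len(pending) != 0 && ts <= pending[len(pending)-1].ts {
		return false
	}
	return true
}

func main() {
	pending := []barrier{{ts: 100}}
	// A late resolved ts of 90 is smaller than the last pending ts of 100.
	fmt.Println(admitEqualOnly(pending, 90)) // true: 90 would be enqueued and resolved ts could regress
	fmt.Println(admitMonotonic(pending, 90)) // false: 90 is discarded and monotonicity holds
}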
subClient.pipeline = newSpanPipelineManager(
	subClient.ctx,
	runtime.GOMAXPROCS(0),
	4096,
	1<<30, // 1GiB, same order of magnitude as previous dynstream pending size.
)
The parameters for newSpanPipelineManager are hardcoded. For better maintainability and to allow for performance tuning, these values should be configurable. I recommend adding corresponding fields to SubscriptionClientConfig and using them here, similar to how regionEventProcessor is configured.
For example, you could add these fields to SubscriptionClientConfig:
// PipelineWorkerCount is the number of workers for the span pipeline.
// If 0, defaults to runtime.GOMAXPROCS(0).
PipelineWorkerCount int
// PipelineQueueSize is the per-worker channel size for the span pipeline.
// If 0, defaults to 4096.
PipelineQueueSize int
// PipelineQuotaBytes is the total memory quota for in-flight data in the pipeline.
// If 0, defaults to 1GiB.
PipelineQuotaBytes int64

And then initialize the manager like this:
pipelineWorkerCount := config.PipelineWorkerCount
if pipelineWorkerCount == 0 {
pipelineWorkerCount = runtime.GOMAXPROCS(0)
}
pipelineQueueSize := config.PipelineQueueSize
if pipelineQueueSize == 0 {
pipelineQueueSize = 4096
}
pipelineQuotaBytes := config.PipelineQuotaBytes
if pipelineQuotaBytes == 0 {
pipelineQuotaBytes = 1 << 30 // 1GiB
}
subClient.pipeline = newSpanPipelineManager(
subClient.ctx,
pipelineWorkerCount,
pipelineQueueSize,
pipelineQuotaBytes,
)
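For completeness, a sketch of how a caller could then tune the pipeline. This assumes SubscriptionClientConfig gains the three fields proposed above; the field names and the constructor shown here are hypothetical:

cfg := &SubscriptionClientConfig{
	// All three fields come from the suggestion above, not the current code.
	PipelineWorkerCount: 8,       // pin the pipeline to 8 workers instead of GOMAXPROCS
	PipelineQueueSize:   8192,    // larger per-worker channel for bursty workloads
	PipelineQuotaBytes:  2 << 30, // raise the in-flight memory quota to 2GiB
}
subClient := newSubscriptionClient(cfg) // hypothetical constructor name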
[FORMAT CHECKER NOTIFICATION]
📖 For more info, you can check the "Contribute Code" section in the development guide.
@lidezhu: The following tests failed.
Full PR test history. Your PR dashboard.
Details: Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note