Optimize spans buffer insertion with eviction during insert#10
Optimize spans buffer insertion with eviction during insert#10everettbu wants to merge 1 commit into
Conversation
Enhance the Redis buffer system to support dedicated queue routing for different models, improving performance isolation and reducing buffer processing contention across high-volume models. Key improvements: - Add PendingBufferRouter for per-model queue assignment - Implement RedisBufferRouter with customizable queue functions - Extract model metadata from Redis keys for routing decisions - Add comprehensive test coverage for custom queue assignments This optimization allows critical models like Group to use dedicated processing queues while maintaining backward compatibility for models without custom queue assignments. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
There was a problem hiding this comment.
Pull Request Overview
This PR optimizes the Redis buffer system by implementing a router-based approach that allows different models to use dedicated queues for processing buffer increments. This prevents backlog issues with one model from affecting others.
- Introduces
RedisBufferRouterandPendingBufferRouterclasses to manage model-specific buffer routing - Replaces single shared
PendingBufferwith multiple model-specific buffers - Adds queue assignment functionality for models to use dedicated processing queues
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| src/sentry/buffer/redis.py | Core implementation of buffer routing system with new router classes and model key extraction |
| tests/sentry/buffer/test_redis.py | Updates imports to include new router functionality and helper functions |
| sentry-repo | Adds subproject commit reference |
| def __init__(self, incr_batch_size: int) -> None: | ||
| self.incr_batch_size = incr_batch_size | ||
| self.default_pending_buffer = PendingBuffer(self.incr_batch_size) | ||
| # map of model_key to PendingBufferValue |
There was a problem hiding this comment.
There is an extra trailing space at the end of the comment line.
| # map of model_key to PendingBufferValue | |
| # map of model_key to PendingBufferValue |
| self.incr_batch_size = incr_batch_size | ||
| self.default_pending_buffer = PendingBuffer(self.incr_batch_size) | ||
| # map of model_key to PendingBufferValue | ||
| self.pending_buffer_router: dict[str, PendingBufferValue] = dict() |
There was a problem hiding this comment.
[nitpick] Use {} instead of dict() for better performance and readability when creating empty dictionaries.
| self.pending_buffer_router: dict[str, PendingBufferValue] = dict() | |
| self.pending_buffer_router: dict[str, PendingBufferValue] = {} |
| class RedisBufferRouter: | ||
| def __init__(self) -> None: | ||
| # map of model_key (generated from _get_model_key function) to queue name | ||
| self._routers: dict[str, ChooseQueueFunction] = dict() |
There was a problem hiding this comment.
[nitpick] Use {} instead of dict() for better performance and readability when creating empty dictionaries.
| self._routers: dict[str, ChooseQueueFunction] = dict() | |
| self._routers: dict[str, ChooseQueueFunction] = {} |
| # | ||
| # Hence, we override the default queue of the process_incr task by passing in the assigned queue for the | ||
| # model associated with the model_key. | ||
| process_incr_kwargs: dict[str, Any] = dict() |
There was a problem hiding this comment.
[nitpick] Use {} instead of dict() for better performance and readability when creating empty dictionaries.
| process_incr_kwargs: dict[str, Any] = dict() | |
| process_incr_kwargs: dict[str, Any] = {} |
Test 2