-
Notifications
You must be signed in to change notification settings - Fork 3
feat: add circuit breaker for upstream provider overload protection #75
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Implement per-provider circuit breakers that detect upstream rate limiting (429/503/529 status codes) and temporarily stop sending requests when providers are overloaded. Key features: - Per-provider circuit breakers (Anthropic, OpenAI) - Configurable failure threshold, time window, and cooldown period - Half-open state allows gradual recovery testing - Prometheus metrics for monitoring (state gauge, trips counter, rejects counter) - Thread-safe implementation with proper state machine transitions - Disabled by default for backward compatibility Circuit breaker states: - Closed: normal operation, tracking failures within sliding window - Open: all requests rejected with 503, waiting for cooldown - Half-Open: limited requests allowed to test if upstream recovered Status codes that trigger circuit breaker: - 429 Too Many Requests - 503 Service Unavailable - 529 Anthropic Overloaded Relates to: coder/internal#1153
…solation - Replace custom circuit breaker implementation with sony/gobreaker - Change from per-provider to per-endpoint circuit breakers (e.g., OpenAI chat completions failing won't block responses API) - Simplify API: CircuitBreakers manages all breakers internally - Update metrics to include endpoint label - Simplify tests to focus on key behaviors Based on PR review feedback suggesting use of established library and per-endpoint granularity for better fault isolation.
Rename fields to match gobreaker naming convention: - Window -> Interval - Cooldown -> Timeout - HalfOpenMaxRequests -> MaxRequests - FailureThreshold type int64 -> uint32
…onfigs Address PR review feedback: 1. Middleware pattern - Circuit breaker is now HTTP middleware that wraps handlers, capturing response status codes directly instead of extracting from provider-specific error types. 2. Per-provider configs - NewCircuitBreakers takes map[string]CircuitBreakerConfig keyed by provider name. Providers not in the map have no circuit breaker. 3. Remove provider overfitting - Deleted extractStatusCodeFromError() which hardcoded AnthropicErrorResponse and OpenAIErrorResponse types. Middleware now uses statusCapturingWriter to inspect actual HTTP response codes. 4. Configurable failure detection - IsFailure func in config allows providers to define custom status codes as failures. Defaults to 429/503/529. 5. Fix gauge values - State gauge now uses 0 (closed), 0.5 (half-open), 1 (open) 6. Integration tests - Replaced unit tests with httptest-based integration tests that verify actual behavior: upstream errors trip circuit, requests get blocked, recovery after timeout, per-endpoint isolation. 7. Error message - Changed from 'upstream rate limiting' to 'circuit breaker is open'
- Add CircuitBreaker interface with Allow(), RecordSuccess(), RecordFailure() - Add NoopCircuitBreaker struct for providers without circuit breaker config - Add gobreakerCircuitBreaker wrapping sony/gobreaker implementation - CircuitBreakers.Get() returns NoopCircuitBreaker when provider not configured - Add http.Flusher support to statusCapturingWriter for SSE streaming - Add Unwrap() for ResponseWriter interface detection
- Changed CircuitBreaker interface to Execute(fn func() int) (statusCode, rejected) - Use gobreaker.Execute() to properly handle both ErrOpenState and ErrTooManyRequests - NoopCircuitBreaker.Execute simply runs the function and returns not rejected - Simplified middleware by removing separate Allow/Record pattern
…e gobreakerCircuitBraker along with the interface and noop struct
Co-authored-by: Paweł Banaszewski <pawel@coder.com>
dannykopping
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's looking better now, but still a few things which need addressing please.
| for _, provider := range providers { | ||
| // Create per-provider circuit breaker if configured | ||
| cfg := provider.CircuitBreakerConfig() | ||
| onChange := func(endpoint string, from, to gobreaker.State) {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should always log when the CB changes state.
You can move the check for cfg != nil && metrics != nil into onChange and only update metrics if needed.
| mux.HandleFunc(path, newInterceptionProcessor(provider, recorder, mcpProxy, logger, metrics, tracer)) | ||
| handler := newInterceptionProcessor(provider, recorder, mcpProxy, logger, metrics, tracer) | ||
| // Wrap with circuit breaker middleware (nil cbs passes through) | ||
| wrapped := CircuitBreakerMiddleware(cbs, metrics)(handler) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PR description is now stale, btw.
| } | ||
|
|
||
| // DefaultCircuitBreakerConfig returns sensible defaults for circuit breaker configuration. | ||
| func DefaultCircuitBreakerConfig() CircuitBreakerConfig { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not used, right?
| switch statusCode { | ||
| case http.StatusTooManyRequests, // 429 | ||
| http.StatusServiceUnavailable, // 503 | ||
| 529: // Anthropic "Overloaded" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had a comment here before about this not being provider-specific; not sure what happened to it.
Each provider may handle rate-limits differently. That needs to go into the Provider interface.
| type ProviderConfig struct { | ||
| BaseURL, Key string | ||
| BaseURL, Key string | ||
| CircuitBreaker *CircuitBreakerConfig |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do you imagine this will be configured?
| } | ||
|
|
||
| return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
| endpoint := strings.TrimPrefix(r.URL.Path, "/"+cbs.provider) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If two providers supported the same endpoint, you'd get one provider influencing another.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was changed so ProviderCircuitBreakers struct is created per provider, separate sync.Map per provider.
| "go.opentelemetry.io/otel" | ||
| ) | ||
|
|
||
| func TestCircuitBreaker_WithNewRequestBridge(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just testing Anthropic; we should support all providers.
| } | ||
|
|
||
| // First 2 requests hit upstream, get 429 | ||
| for i := 0; i < 2; i++ { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: reference the config (FailureThreshold) so the relationship is clear.
| assert.Equal(t, 1.0, state, "CircuitBreakerState should be 1 (open)") | ||
|
|
||
| rejects := promtest.ToFloat64(metrics.CircuitBreakerRejects.WithLabelValues(aibridge.ProviderAnthropic, "/v1/messages")) | ||
| assert.Equal(t, 1.0, rejects, "CircuitBreakerRejects should be 1") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just testing that a circuit-breaker will close, but not open again, so this isn't a comprehensive integration test.
| if metrics != nil { | ||
| metrics.CircuitBreakerRejects.WithLabelValues(cbs.provider, endpoint).Inc() | ||
| } | ||
| http.Error(w, "circuit breaker is open", http.StatusServiceUnavailable) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Requests expecting JSON responses will be broken by plaintext responses.
Summary
Implement per-provider circuit breakers that detect upstream rate limiting (429/503/529 status codes) and temporarily stop sending requests when providers are overloaded.
This completes the overload protection story by adding the aibridge-specific component that couldn't be implemented as generic HTTP middleware in coderd (since it requires understanding upstream provider responses).
Key Features
Circuit Breaker States
Status Codes That Trigger Circuit Breaker
Other error codes (400, 401, 500, 502, etc.) do not trigger the circuit breaker since they indicate different issues that circuit breaking wouldn't help with.
Default Configuration
EnabledfalseFailureThreshold5Window10sCooldown30sHalfOpenMaxRequests3New Prometheus Metrics
aibridge_circuit_breaker_state{provider}- Current state (0=closed, 1=open, 2=half-open)aibridge_circuit_breaker_trips_total{provider}- Total times circuit openedaibridge_circuit_breaker_rejects_total{provider}- Requests rejected due to open circuitFiles Changed
circuit_breaker.go- Core circuit breaker implementationcircuit_breaker_test.go- Comprehensive test suite (13 tests)bridge.go- Integration into RequestBridgeinterception.go- Apply circuit breaker to intercepted requestsmetrics.go- Add Prometheus metricsTesting
All tests pass:
Related
aibridgedinternal#1153