From efa73b198c00a9a3f2741e8f12af98444fc2a95d Mon Sep 17 00:00:00 2001 From: Heewon Oh Date: Thu, 4 Jun 2026 11:44:04 +0900 Subject: [PATCH] fix(rate_limiter): enforce official KIS limit, lock-free sleep, safer defaults MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit API rate limiting이 KIS 공식 한도(20 RPS / 1000 RPM)에 닿지 않도록 안전 마진을 확보하고, 동시 호출 시에도 sliding window 위반이 발생하지 않도록 수정. 핵심 변경: 1. 기본값 강화 (75~80% 마진) - RPS: 18 → 15 (공식 20의 75%) - RPM: 900 → 800 (공식 1000의 80%) - min_interval_ms: 55 → 70 (15 RPS 이론적 67ms + 3ms jitter) - burst_size: 10 → 3 (priority>=1 시 effective RPS=18로 공식 20 안전 보장) 2. 공식 한도 절대 위반 방지 - 사용자가 RPS>20 또는 RPM>1000 전달 시 자동 클램프 - KIS_OFFICIAL_RPS_LIMIT(20), KIS_OFFICIAL_RPM_LIMIT(1000) 상수 도입 - 이전 priority>=1 시 RPS + burst까지 허용해 공식 한도 초과 가능 버그 수정 - effective RPS = min(RPS + burst, KIS_OFFICIAL_RPS_LIMIT)로 강제 3. 동시성 안전 (lock-free sleep) - acquire() 내부에서 time.sleep()을 lock 바깥에서 실행 - 이전 구현은 sleep을 lock 안에서 호출해 한 스레드의 대기가 다른 스레드를 완전히 블록 → 동시 N개 호출 시 부하 폭증 - 슬롯 예약(reserved_until) 방식으로 동시 호출 시 각 호출에 서로 다른 미래 슬롯을 분배 4. Sliding window 한도 정합성 - min_interval에 (1.0/RPS + 0.001s) floor 강제 - 슬롯 간격이 정확히 1/RPS면 윈도우 경계와 겹쳐 N+1개가 1초 안에 들어올 수 있는 corner case 차단 - 윈도우 끝에 20ms safety padding (OS sleep undersleep 대비) 5. 중복 설정 제거 (DRY) - client.py와 agent.py에 하드코딩된 동일한 18/900/55ms 기본값을 제거 - 두 곳 모두 get_global_rate_limiter() 호출 시 모듈 상수 DEFAULT_* 사용 검증 (12개 신규 단위 테스트): - 공식 한도 클램프 (RateLimiter, set_limits) - 동시 12개 호출 시 1초 윈도우에 5개 이하 (5 RPS 케이스) - 동시 30개 호출 시 1초 윈도우에 15개 이하 (기본 15 RPS) - priority>=1 + 큰 burst여도 공식 RPS 한도(20) 초과 안 함 - 슬립이 lock 바깥에서 일어나 get_current_rate() 호출이 즉시 반환 - 전역 싱글턴이 동시 20개 스레드에서도 단일 인스턴스 보장 문서: docs/RATE_LIMITER_GUIDE.md 갱신. --- docs/RATE_LIMITER_GUIDE.md | 28 +- kis_agent/core/agent.py | 25 +- kis_agent/core/client.py | 17 +- kis_agent/core/rate_limiter.py | 424 ++++++++++++++----------- tests/unit/test_rate_limiter_safety.py | 241 ++++++++++++++ 5 files changed, 504 insertions(+), 231 deletions(-) create mode 100644 tests/unit/test_rate_limiter_safety.py diff --git a/docs/RATE_LIMITER_GUIDE.md b/docs/RATE_LIMITER_GUIDE.md index ab6c935..09c88b8 100644 --- a/docs/RATE_LIMITER_GUIDE.md +++ b/docs/RATE_LIMITER_GUIDE.md @@ -1,21 +1,33 @@ # Rate Limiter 설정 가이드 -## 📊 기본 설정 (2025.09.21 기준) +## 📊 기본 설정 (v1.7.x 기준) -PyKIS의 Rate Limiter는 실제 테스트를 바탕으로 안정성을 우선시하여 설정되었습니다. +kis-agent의 Rate Limiter는 KIS 공식 한도(20 RPS / 1000 RPM) 대비 충분한 안전 +마진을 두고, 동시 호출 시에도 sliding window 위반이 발생하지 않도록 설계되어 +있습니다. ### 기본값 ```python -# pykis/core/rate_limiter.py 기본 설정 +# kis_agent/core/rate_limiter.py의 DEFAULT_* 상수 { - 'requests_per_second': 18, # API 스펙: 20 (안정성을 위해 90% 수준) - 'requests_per_minute': 900, # API 스펙: 1000 (안정성을 위해 90% 수준) - 'min_interval_ms': 50, # API 권장: 50ms - 'burst_size': 10, # 순간 버스트 허용량 - 'enable_adaptive': True # 적응형 백오프 활성화 + 'requests_per_second': 15, # 공식 한도 20의 75% (5회 여유) + 'requests_per_minute': 800, # 공식 한도 1000의 80% (200회 여유) + 'min_interval_ms': 70, # 15 RPS의 이론적 67ms + 3ms jitter + 'burst_size': 3, # priority>=1 시 effective RPS = 18 + 'enable_adaptive': True # 적응형 백오프 활성화 } ``` +### 안전 보장 + +- **공식 한도 자동 클램프**: `RateLimiter(requests_per_second=25)`처럼 공식 한도를 + 초과하는 값을 주면 자동으로 20으로 clamp됩니다 (RPM도 동일). +- **동시 호출 안전**: 다수의 스레드가 동시에 acquire()를 호출해도 sliding window + 내 요청 수가 한도를 넘지 않습니다 (1ms safety padding + slot reservation). +- **Lock-free sleep**: `acquire()`의 `time.sleep()`이 lock 바깥에서 실행되어 한 + 스레드의 대기가 다른 스레드를 블록하지 않습니다. +- **전역 싱글턴**: 모든 KISClient/Agent가 동일한 RateLimiter 인스턴스를 공유합니다. + ## 🎯 사용 시나리오별 권장 설정 ### 1. 안정성 최우선 (Production) diff --git a/kis_agent/core/agent.py b/kis_agent/core/agent.py index e290b62..38ff6fe 100644 --- a/kis_agent/core/agent.py +++ b/kis_agent/core/agent.py @@ -177,28 +177,17 @@ def __init__( ) # Rate Limiter 설정 (전역 싱글턴 패턴) - # 모든 Agent와 KISClient가 동일한 Rate Limiter 인스턴스를 공유하여 - # API 호출 제한을 전역적으로 관리합니다. + # 기본값은 kis_agent.core.rate_limiter의 DEFAULT_* 상수를 사용한다 + # (공식 20 RPS / 1000 RPM 대비 75% / 80% 안전 마진). + # 사용자가 rate_limiter_config로 일부 키를 override할 수 있다. if enable_rate_limiter: if rate_limiter: - # 명시적으로 전달된 rate_limiter 사용 (테스트 등 특수 목적) self.rate_limiter = rate_limiter + elif rate_limiter_config: + # 사용자 override는 첫 호출에서만 적용됨 (싱글턴이므로) + self.rate_limiter = get_global_rate_limiter(**rate_limiter_config) else: - # 전역 싱글턴 Rate Limiter 사용 (2025.09.21 실측 기반) - # 공식 스펙: 초당 20회 / 분당 1000회 - # 안정 운영: 초당 18회 / 분당 900회 (실측 기반 권장) - default_config = { - "requests_per_second": 18, # 실측 기반 안정 한계 - "requests_per_minute": 900, # 실측 기반 안정 한계 - "min_interval_ms": 55, # 최소 55ms 간격 (18 RPS 기준) - "burst_size": 10, # 순간 처리량 허용 - "enable_adaptive": True, - } - if rate_limiter_config: - default_config.update(rate_limiter_config) - - # 전역 싱글턴 Rate Limiter 획득 - self.rate_limiter = get_global_rate_limiter(**default_config) + self.rate_limiter = get_global_rate_limiter() else: self.rate_limiter = None diff --git a/kis_agent/core/client.py b/kis_agent/core/client.py index 62c36c7..c8ef0a5 100644 --- a/kis_agent/core/client.py +++ b/kis_agent/core/client.py @@ -89,20 +89,13 @@ def __init__( self.rate_limit_lock = threading.Lock() # 인스턴스별 rate limit lock self.token_refresh_lock = threading.Lock() # 토큰 재생성 동기화용 락 - # Rate Limiter 설정 (2025.09.21 실측 기반) - # 공식 스펙: 초당 20회 / 분당 1000회 - # 안정 운영: 초당 18회 / 분당 900회 (실측 기반 권장) - # 전역 싱글턴 사용: 모든 KISClient/Agent가 동일한 Rate Limiter 공유 + # Rate Limiter 설정 — 기본값은 kis_agent.core.rate_limiter의 DEFAULT_* + # (공식 스펙 20 RPS / 1000 RPM 대비 75% / 80% 안전 마진). + # 전역 싱글턴 사용: 모든 KISClient/Agent가 동일한 Rate Limiter 공유. + # 명시적으로 전달된 rate_limiter가 있으면 그것을 사용 (테스트 등 특수 목적). self.enable_rate_limiter = enable_rate_limiter if enable_rate_limiter: - # 명시적으로 전달된 rate_limiter가 있으면 사용, 없으면 전역 싱글턴 사용 - self.rate_limiter = rate_limiter or get_global_rate_limiter( - requests_per_second=18, # 실측 기반 안정 한계 - requests_per_minute=900, # 실측 기반 안정 한계 - min_interval_ms=55, # 최소 55ms 간격 (18 RPS 기준) - burst_size=10, # 순간 처리량 허용 - enable_adaptive=True, - ) + self.rate_limiter = rate_limiter or get_global_rate_limiter() else: self.rate_limiter = None diff --git a/kis_agent/core/rate_limiter.py b/kis_agent/core/rate_limiter.py index 9450250..243c3cf 100644 --- a/kis_agent/core/rate_limiter.py +++ b/kis_agent/core/rate_limiter.py @@ -1,3 +1,27 @@ +"""한국투자증권 API 유량 제한 관리. + +전역 싱글턴 RateLimiter로 모든 KISClient·Agent의 호출을 통합 제어한다. + +## API 한도와 안전 마진 + +| 항목 | 공식 스펙 | 본 라이브러리 기본값 | 마진 | +|------|----------|--------------------|------| +| 초당 | 20회 | **15회** | 25% (5회 여유) | +| 분당 | 1000회 | **800회** | 20% (200회 여유) | +| 최소 간격 | — | **70ms** | 15 RPS의 이론적 67ms + 3ms jitter | + +마진을 25%까지 두는 이유: +- 네트워크 jitter와 OS sleep 정확도 오차 누적 +- burst 사용 시에도 절대 한도(20 RPS) 위반 방지 +- 분당 한도는 sliding window라 burst가 쌓이면 위반 위험이 큼 + +## 동시성 설계 + +- ``time.sleep()``을 lock **바깥에서** 수행 → 한 스레드의 대기가 다른 스레드를 블록하지 않음 +- 슬롯 예약(reserved_until) 방식으로 동시 호출 시 정확한 다음 슬롯 시간 분배 +- 전역 싱글턴 보장 — 명시적 rate_limiter 미전달 시 항상 공유 인스턴스 사용 +""" + import logging import threading import time @@ -7,6 +31,22 @@ logger = logging.getLogger(__name__) +# ============================================================================ +# KIS 공식 스펙 & 권장 기본값 (단일 진실의 출처) +# ============================================================================ + +# KIS 공식 API 한도 +KIS_OFFICIAL_RPS_LIMIT = 20 +KIS_OFFICIAL_RPM_LIMIT = 1000 + +# 본 라이브러리 안전 기본값 (공식 한도의 75~80%) +DEFAULT_RPS = 15 +DEFAULT_RPM = 800 +DEFAULT_MIN_INTERVAL_MS = 70 +DEFAULT_BURST_SIZE = 3 # priority>=1 사용 시에도 RPS + 3 <= 20 보장 +DEFAULT_MAX_BACKOFF = 5.0 + + # ============================================================================ # 전역 싱글턴 Rate Limiter # ============================================================================ @@ -16,50 +56,26 @@ def get_global_rate_limiter( - requests_per_second: int = 18, - requests_per_minute: int = 900, - min_interval_ms: int = 55, - burst_size: int = 10, + requests_per_second: int = DEFAULT_RPS, + requests_per_minute: int = DEFAULT_RPM, + min_interval_ms: int = DEFAULT_MIN_INTERVAL_MS, + burst_size: int = DEFAULT_BURST_SIZE, enable_adaptive: bool = True, ) -> "RateLimiter": - """ - 전역 싱글턴 Rate Limiter 인스턴스를 반환합니다. - - 모든 KISClient와 Agent가 동일한 Rate Limiter를 공유하여 - API 호출 제한을 전역적으로 관리합니다. + """전역 싱글턴 Rate Limiter 인스턴스를 반환한다. - Args: - requests_per_second: 초당 최대 요청 수 (기본값: 18, 첫 호출 시에만 적용) - requests_per_minute: 분당 최대 요청 수 (기본값: 900, 첫 호출 시에만 적용) - min_interval_ms: 연속 호출 최소 간격 (기본값: 55ms, 첫 호출 시에만 적용) - burst_size: 순간 버스트 크기 (기본값: 10, 첫 호출 시에만 적용) - enable_adaptive: 적응형 속도 조절 활성화 (기본값: True, 첫 호출 시에만 적용) - - Returns: - RateLimiter: 전역 Rate Limiter 인스턴스 - - Example: - >>> from kis_agent.core.rate_limiter import get_global_rate_limiter - >>> limiter = get_global_rate_limiter() - >>> wait_time = limiter.acquire() - >>> # API 호출 수행 - >>> limiter.report_success() + 모든 KISClient/Agent가 동일한 Rate Limiter를 공유하여 API 호출 제한을 + 프로세스 전역으로 관리한다. 첫 호출 시 인스턴스가 생성되고, 이후 호출은 + 동일 인스턴스를 반환한다 (인자는 무시). Note: - - 싱글턴 패턴: 첫 호출 시 인스턴스가 생성되며, 이후 호출은 동일한 인스턴스 반환 - - 설정 변경: 이미 생성된 후에는 전달된 설정이 무시됨 - - 런타임 설정 변경: limiter.set_limits() 메서드 사용 - - 완전 리셋: reset_global_rate_limiter() 호출 후 다시 get_global_rate_limiter() 호출 - - Warning: - 이미 Rate Limiter가 생성된 상태에서 다른 설정으로 호출하면 경고 로그가 출력됩니다. - 이 경우 기존 설정이 유지됩니다. + 설정 변경이 필요하면 ``limiter.set_limits()`` 또는 + ``reset_global_rate_limiter()`` 후 재생성을 사용. """ global _global_rate_limiter if _global_rate_limiter is None: with _global_rate_limiter_lock: - # Double-check locking if _global_rate_limiter is None: _global_rate_limiter = RateLimiter( requests_per_second=requests_per_second, @@ -70,10 +86,10 @@ def get_global_rate_limiter( ) logger.info( f"전역 Rate Limiter 생성: {requests_per_second} RPS / " - f"{requests_per_minute} RPM / {min_interval_ms}ms 간격" + f"{requests_per_minute} RPM / {min_interval_ms}ms 간격 / " + f"burst {burst_size}" ) else: - # 이미 생성된 상태에서 다른 설정으로 호출된 경우 경고 current = _global_rate_limiter if ( requests_per_second != current.requests_per_second @@ -90,11 +106,10 @@ def get_global_rate_limiter( def reset_global_rate_limiter() -> None: - """ - 전역 Rate Limiter를 초기화합니다. + """전역 Rate Limiter를 초기화한다. - 테스트나 설정 변경이 필요한 경우 사용합니다. - 다음 get_global_rate_limiter() 호출 시 새 인스턴스가 생성됩니다. + 테스트나 설정 변경이 필요할 때 호출. 다음 + ``get_global_rate_limiter()`` 호출 시 새 인스턴스가 생성된다. """ global _global_rate_limiter @@ -106,195 +121,196 @@ def reset_global_rate_limiter() -> None: class RateLimiter: - """ - 한국투자증권 API 유량 제한 관리 클래스 - - 실측 기반으로 최적화된 Rate Limiter로 API 제한을 효과적으로 관리합니다. - - API 제한사항 (2025.09.21 실측 기준): - - 공식 스펙: 초당 최대 20회 / 분당 최대 1000회 호출 - - 안정 운영: 초당 15-18회 / 분당 800-900회 호출 (실제 안정 한계) - - 연속 호출 시 최소 50ms 간격 권장 - - 적응형 백오프로 에러 시 자동 속도 조절 - - Features: - - 우선순위 기반 요청 처리 (긴급/중요/일반) - - 순간 버스트 허용 (제한된 크기) - - 에러 발생 시 자동 백오프 메커니즘 - - 실시간 성능 모니터링 및 통계 - - 런타임 설정 변경 지원 - - Example: - >>> limiter = RateLimiter( - ... requests_per_second=15, # 보수적 설정 - ... requests_per_minute=800, - ... enable_adaptive=True - ... ) - >>> wait_time = limiter.acquire(priority=1) # 중요 요청 - >>> # API 호출 수행 - >>> limiter.report_success() # 성공 보고 + """한국투자증권 API 유량 제한 관리 클래스. + + 안전 마진 (공식 스펙 대비 기본값): + - 초당: 공식 20회 → 본 라이브러리 15회 (75%) + - 분당: 공식 1000회 → 본 라이브러리 800회 (80%) + - 최소 간격 70ms (15 RPS 이론적 67ms + 3ms jitter) + + 동시성: + - acquire() 내부의 ``time.sleep()``을 lock **바깥에서** 호출하여 + 한 스레드의 대기가 다른 스레드를 블록하지 않음 + - "슬롯 예약" 방식으로 동시 N개 호출이 들어와도 각 호출에 서로 다른 + 미래 슬롯 시간을 분배하여 순차적으로 통과시킴 """ def __init__( self, - requests_per_second: int = 18, # 안정적 운영 기준 (API 최대 20회/초) - requests_per_minute: int = 900, # 안정적 운영 기준 (API 최대 1000회/분) - min_interval_ms: int = 50, # 안전한 최소 간격 (50ms) - burst_size: int = 10, # 순간 처리량 (안정성 우선) - enable_adaptive: bool = True, # 적응형 속도 조절 활성화 + requests_per_second: int = DEFAULT_RPS, + requests_per_minute: int = DEFAULT_RPM, + min_interval_ms: int = DEFAULT_MIN_INTERVAL_MS, + burst_size: int = DEFAULT_BURST_SIZE, + enable_adaptive: bool = True, ): - """ - Rate Limiter 초기화 + # 절대 한도 위반 방지 — KIS_OFFICIAL_RPS_LIMIT 초과 금지 + if requests_per_second > KIS_OFFICIAL_RPS_LIMIT: + logger.warning( + f"requests_per_second={requests_per_second}가 공식 한도 " + f"{KIS_OFFICIAL_RPS_LIMIT}을 초과합니다. {KIS_OFFICIAL_RPS_LIMIT}로 클램프." + ) + requests_per_second = KIS_OFFICIAL_RPS_LIMIT + if requests_per_minute > KIS_OFFICIAL_RPM_LIMIT: + logger.warning( + f"requests_per_minute={requests_per_minute}가 공식 한도 " + f"{KIS_OFFICIAL_RPM_LIMIT}을 초과합니다. {KIS_OFFICIAL_RPM_LIMIT}로 클램프." + ) + requests_per_minute = KIS_OFFICIAL_RPM_LIMIT - Args: - requests_per_second: 초당 최대 요청 수 - requests_per_minute: 분당 최대 요청 수 - min_interval_ms: 연속 호출 최소 간격 (밀리초) - burst_size: 순간적으로 허용할 버스트 크기 - enable_adaptive: 적응형 속도 조절 활성화 여부 - """ self.requests_per_second = requests_per_second self.requests_per_minute = requests_per_minute - self.min_interval = min_interval_ms / 1000.0 + # min_interval은 명시값과 (1/RPS + safety) 중 더 큰 값으로 강제. + # 1초 sliding window 내 N개 제한을 확실히 지키려면 슬롯 간격이 1/RPS보다 + # 조금 커야 함. 그렇지 않으면 윈도우 경계와 슬롯이 우연히 정확히 겹치며 + # N+1개가 1초 안에 들어올 수 있다 (예: 5 RPS에 200ms 간격이면 윈도우 + # [t-ε, t+1-ε)에 6개). 1ms safety로 이 corner case를 방지. + rps_floor = (1.0 / requests_per_second) + 0.001 + self.min_interval = max(min_interval_ms / 1000.0, rps_floor) self.burst_size = burst_size self.enable_adaptive = enable_adaptive - # 요청 기록 관리 - self.request_times = deque(maxlen=requests_per_minute) + # 요청 기록 (RPM과 동일 길이로 deque maxlen — over-count 방지) + self.request_times: deque = deque(maxlen=requests_per_minute) self.lock = threading.Lock() - # 마지막 요청 시간 - self.last_request_time = 0.0 + # 슬롯 예약: 다음 호출이 통과 가능한 시점. + # 동시 N개 호출이 들어와도 lock 안에서 각자 서로 다른 미래 시점을 받아간다. + self.reserved_until: float = 0.0 - # 적응형 속도 조절 파라미터 + # 적응형 백오프 self.consecutive_errors = 0 self.backoff_multiplier = 1.0 - self.max_backoff = 5.0 + self.max_backoff = DEFAULT_MAX_BACKOFF - # 통계 정보 + # 통계 self.total_requests = 0 self.total_wait_time = 0.0 self.throttled_count = 0 def acquire(self, priority: int = 0) -> float: - """ - API 호출 권한 획득 (필요시 대기) + """API 호출 권한 획득 (필요 시 대기). + + 동시성: 슬롯 예약 시간을 lock 안에서 계산하고, 실제 ``time.sleep()``은 + lock 바깥에서 수행. 한 스레드의 대기가 다른 스레드를 블록하지 않는다. + + 절대 한도 보호: priority와 무관하게 RPS·RPM 한도를 절대 위반하지 않는다 + (이전 구현은 priority>=1이면 RPS + burst까지 허용해 한도 초과 위험이 있었음). Args: - priority: 요청 우선순위 (0=일반, 1=중요, 2=긴급) + priority: 우선순위 (0=일반, 1=중요, 2=긴급). 현재 구현에서는 RPS/RPM + 절대 한도를 우회하는 용도로 사용되지 않으며, 적응형 백오프와 + burst 허용 여부에만 영향을 준다. Returns: - 대기한 시간 (초) + 대기한 시간 (초). """ with self.lock: - current_time = time.time() - wait_time = 0.0 + now = time.monotonic() - # 1. 최소 간격 체크 + # 1) 최소 간격: 마지막 요청 또는 예약된 슬롯 중 더 큰 값 기준 min_interval = self.min_interval * self.backoff_multiplier - if priority < 2: # 긴급이 아닌 경우에만 최소 간격 적용 - elapsed = current_time - self.last_request_time - if elapsed < min_interval: - wait_time = min_interval - elapsed - time.sleep(wait_time) - current_time = time.time() - - # 2. 초당 제한 체크 - second_ago = current_time - 1.0 - recent_second_requests = [t for t in self.request_times if t > second_ago] - - if len(recent_second_requests) >= self.requests_per_second: - # 버스트 체크 - if ( - priority == 0 - or len(recent_second_requests) - >= self.requests_per_second + self.burst_size - ): - wait_needed = 1.0 - (current_time - recent_second_requests[0]) - if wait_needed > 0: - wait_time += wait_needed - time.sleep(wait_needed) - current_time = time.time() - self.throttled_count += 1 - - # 3. 분당 제한 체크 - minute_ago = current_time - 60.0 - recent_minute_requests = [t for t in self.request_times if t > minute_ago] - - if len(recent_minute_requests) >= self.requests_per_minute: - wait_needed = 60.0 - (current_time - recent_minute_requests[0]) - if wait_needed > 0: - wait_time += wait_needed - logger.warning(f"분당 제한 도달. {wait_needed:.2f}초 대기 필요") - time.sleep(wait_needed) - current_time = time.time() - self.throttled_count += 1 - - # 요청 시간 기록 - self.request_times.append(current_time) - self.last_request_time = current_time - - # 통계 업데이트 + earliest = max(self.reserved_until, 0.0) + min_interval + + # 슬립 정확도 jitter 보정: 1초 / 60초 윈도우 끝에 _SAFETY_PADDING_S + # 만큼 추가 대기. monotonic 시계와 time.sleep()의 oversleep/undersleep + # 누적으로 윈도우 경계 직전에 요청이 몰리는 것을 방지. + # macOS/Linux의 time.sleep은 최대 ~10ms undersleep 발생할 수 있어 + # 보수적으로 20ms padding (1초 한도의 2% 비용으로 한도 위반 확실히 차단). + _SAFETY_PADDING_S = 0.020 + + # 2) 초당 한도: 1초 이내 요청이 RPS 이상이면 가장 오래된 요청 + # + 1초 + padding이 다음 가용 시점. burst는 priority>=1에서만 허용하되, + # 절대 한도(RPS + burst <= 공식 RPS 한도)는 클램프된 한도 내에서 안전. + second_ago = now - 1.0 + recent_second = [t for t in self.request_times if t > second_ago] + effective_rps = self.requests_per_second + if priority >= 1: + effective_rps = min( + self.requests_per_second + self.burst_size, + KIS_OFFICIAL_RPS_LIMIT, + ) + if len(recent_second) >= effective_rps: + next_second_slot = recent_second[0] + 1.0 + _SAFETY_PADDING_S + earliest = max(earliest, next_second_slot) + + # 3) 분당 한도 + minute_ago = now - 60.0 + recent_minute = [t for t in self.request_times if t > minute_ago] + if len(recent_minute) >= self.requests_per_minute: + next_minute_slot = recent_minute[0] + 60.0 + _SAFETY_PADDING_S + earliest = max(earliest, next_minute_slot) + + # 슬롯 예약: 이 시점을 다음 호출들이 참조해 순차적으로 미래를 받게 됨 + slot_time = max(earliest, now) + self.reserved_until = slot_time + + wait_time = max(0.0, slot_time - now) + if wait_time > 0: + self.throttled_count += 1 + + # 요청 기록은 lock 안에서 (예약된 슬롯 시간으로 기록) + self.request_times.append(slot_time) self.total_requests += 1 self.total_wait_time += wait_time - if wait_time > 0: + # 실제 sleep은 lock 바깥에서 — 동시 요청 시 다른 스레드를 블록하지 않음 + if wait_time > 0: + if wait_time >= 1.0: + logger.info(f"Rate limit 대기: {wait_time:.2f}초 (우선순위={priority})") + else: logger.debug( - f"Rate limit 대기: {wait_time:.3f}초 (우선순위: {priority})" + f"Rate limit 대기: {wait_time:.3f}초 (우선순위={priority})" ) + time.sleep(wait_time) - return wait_time + return wait_time def report_success(self): - """API 호출 성공 보고 - 백오프 리셋""" - if self.enable_adaptive: - with self.lock: - self.consecutive_errors = 0 - if self.backoff_multiplier > 1.0: - self.backoff_multiplier = max(1.0, self.backoff_multiplier * 0.9) - logger.debug(f"백오프 감소: {self.backoff_multiplier:.2f}x") + """API 호출 성공 보고 — 백오프 점진 감소.""" + if not self.enable_adaptive: + return + with self.lock: + self.consecutive_errors = 0 + if self.backoff_multiplier > 1.0: + self.backoff_multiplier = max(1.0, self.backoff_multiplier * 0.9) + logger.debug(f"백오프 감소: {self.backoff_multiplier:.2f}x") def report_error(self, error_code: Optional[str] = None): - """ - API 호출 실패 보고 - 백오프 증가 + """API 호출 실패 보고 — 백오프 증가. Args: - error_code: 에러 코드 (유량 제한 관련 에러 식별용) + error_code: KIS 에러 코드. EGW00201/00202/00203 (유량 제한 계열)은 + 즉시 강한 백오프, 그 외는 연속 3회 이상일 때 완만한 백오프. """ - if self.enable_adaptive: - with self.lock: - self.consecutive_errors += 1 - - # 유량 제한 에러인 경우 더 강하게 백오프 - if error_code in [ - "EGW00201", - "EGW00202", - "EGW00203", - ]: # KIS API 유량 제한 에러 코드 - self.backoff_multiplier = min( - self.max_backoff, self.backoff_multiplier * 2.0 - ) - logger.warning( - f"유량 제한 감지. 백오프 증가: {self.backoff_multiplier:.2f}x" - ) - elif self.consecutive_errors >= 3: - self.backoff_multiplier = min( - self.max_backoff, self.backoff_multiplier * 1.5 - ) - logger.debug( - f"연속 에러 {self.consecutive_errors}회. 백오프: {self.backoff_multiplier:.2f}x" - ) + if not self.enable_adaptive: + return + with self.lock: + self.consecutive_errors += 1 + if error_code in ("EGW00201", "EGW00202", "EGW00203"): + self.backoff_multiplier = min( + self.max_backoff, self.backoff_multiplier * 2.0 + ) + logger.warning( + f"유량 제한 감지 ({error_code}). 백오프 증가: " + f"{self.backoff_multiplier:.2f}x" + ) + elif self.consecutive_errors >= 3: + self.backoff_multiplier = min( + self.max_backoff, self.backoff_multiplier * 1.5 + ) + logger.debug( + f"연속 에러 {self.consecutive_errors}회. 백오프: " + f"{self.backoff_multiplier:.2f}x" + ) def get_current_rate(self) -> Dict[str, Any]: - """현재 유량 상태 조회""" + """현재 유량 상태 스냅샷.""" with self.lock: - current_time = time.time() - second_ago = current_time - 1.0 - minute_ago = current_time - 60.0 - - recent_second = len([t for t in self.request_times if t > second_ago]) - recent_minute = len([t for t in self.request_times if t > minute_ago]) - + now = time.monotonic() + second_ago = now - 1.0 + minute_ago = now - 60.0 + recent_second = sum(1 for t in self.request_times if t > second_ago) + recent_minute = sum(1 for t in self.request_times if t > minute_ago) return { "requests_per_second": recent_second, "requests_per_minute": recent_minute, @@ -307,10 +323,10 @@ def get_current_rate(self) -> Dict[str, Any]: } def reset(self): - """Rate limiter 상태 초기화""" + """Rate limiter 상태 초기화.""" with self.lock: self.request_times.clear() - self.last_request_time = 0.0 + self.reserved_until = 0.0 self.consecutive_errors = 0 self.backoff_multiplier = 1.0 self.total_requests = 0 @@ -324,20 +340,29 @@ def set_limits( requests_per_minute: Optional[int] = None, min_interval_ms: Optional[int] = None, ): - """ - 런타임에 제한 값 변경 - - Args: - requests_per_second: 새로운 초당 제한 - requests_per_minute: 새로운 분당 제한 - min_interval_ms: 새로운 최소 간격 (밀리초) - """ + """런타임에 제한 값 변경. 공식 한도 초과 시 자동 클램프.""" with self.lock: if requests_per_second is not None: + if requests_per_second > KIS_OFFICIAL_RPS_LIMIT: + logger.warning( + f"requests_per_second={requests_per_second}가 공식 한도 " + f"{KIS_OFFICIAL_RPS_LIMIT}을 초과. 클램프." + ) + requests_per_second = KIS_OFFICIAL_RPS_LIMIT self.requests_per_second = requests_per_second + # min_interval도 (1/RPS + safety) 이상 유지 + rps_floor = (1.0 / requests_per_second) + 0.001 + if self.min_interval < rps_floor: + self.min_interval = rps_floor logger.info(f"초당 제한 변경: {requests_per_second}") if requests_per_minute is not None: + if requests_per_minute > KIS_OFFICIAL_RPM_LIMIT: + logger.warning( + f"requests_per_minute={requests_per_minute}가 공식 한도 " + f"{KIS_OFFICIAL_RPM_LIMIT}을 초과. 클램프." + ) + requests_per_minute = KIS_OFFICIAL_RPM_LIMIT self.requests_per_minute = requests_per_minute self.request_times = deque( self.request_times, maxlen=requests_per_minute @@ -347,3 +372,16 @@ def set_limits( if min_interval_ms is not None: self.min_interval = min_interval_ms / 1000.0 logger.info(f"최소 간격 변경: {min_interval_ms}ms") + + +__all__ = [ + "RateLimiter", + "get_global_rate_limiter", + "reset_global_rate_limiter", + "KIS_OFFICIAL_RPS_LIMIT", + "KIS_OFFICIAL_RPM_LIMIT", + "DEFAULT_RPS", + "DEFAULT_RPM", + "DEFAULT_MIN_INTERVAL_MS", + "DEFAULT_BURST_SIZE", +] diff --git a/tests/unit/test_rate_limiter_safety.py b/tests/unit/test_rate_limiter_safety.py new file mode 100644 index 0000000..889e835 --- /dev/null +++ b/tests/unit/test_rate_limiter_safety.py @@ -0,0 +1,241 @@ +"""Rate Limiter 안전 마진과 동시성 보장 테스트. + +검증 항목: +- 공식 한도 초과 시 자동 클램프 (RPS > 20, RPM > 1000) +- 동시 다중 스레드에서도 sliding window 한도 위반 없음 +- 슬립이 lock 바깥에서 일어나 한 스레드의 대기가 다른 스레드를 블록하지 않음 +- 전역 싱글턴 보장 +""" + +import threading +import time + +import pytest + +from kis_agent.core.rate_limiter import ( + DEFAULT_BURST_SIZE, + DEFAULT_MIN_INTERVAL_MS, + DEFAULT_RPM, + DEFAULT_RPS, + KIS_OFFICIAL_RPM_LIMIT, + KIS_OFFICIAL_RPS_LIMIT, + RateLimiter, + get_global_rate_limiter, + reset_global_rate_limiter, +) + + +@pytest.fixture(autouse=True) +def _reset_singleton(): + """각 테스트 전후로 전역 싱글턴 초기화.""" + reset_global_rate_limiter() + yield + reset_global_rate_limiter() + + +class TestSafetyMargin: + def test_defaults_are_below_official_limit(self): + assert DEFAULT_RPS <= KIS_OFFICIAL_RPS_LIMIT + assert DEFAULT_RPM <= KIS_OFFICIAL_RPM_LIMIT + # 충분한 마진 (>= 20%) + assert DEFAULT_RPS <= KIS_OFFICIAL_RPS_LIMIT * 0.8 + assert DEFAULT_RPM <= KIS_OFFICIAL_RPM_LIMIT * 0.85 + + def test_explicit_over_official_clamped(self, caplog): + limiter = RateLimiter( + requests_per_second=50, + requests_per_minute=5000, + ) + assert limiter.requests_per_second == KIS_OFFICIAL_RPS_LIMIT + assert limiter.requests_per_minute == KIS_OFFICIAL_RPM_LIMIT + + def test_set_limits_clamps(self): + limiter = RateLimiter() + limiter.set_limits(requests_per_second=30, requests_per_minute=2000) + assert limiter.requests_per_second == KIS_OFFICIAL_RPS_LIMIT + assert limiter.requests_per_minute == KIS_OFFICIAL_RPM_LIMIT + + def test_min_interval_floor_is_inverse_rps_plus_safety(self): + """min_interval은 1/RPS + 안전 마진보다 작아질 수 없다.""" + limiter = RateLimiter(requests_per_second=10, min_interval_ms=10) + # 10 RPS → 1/10 = 100ms, + 1ms safety → 101ms 최소 + assert limiter.min_interval >= 0.100 + assert limiter.min_interval >= 0.101 + + def test_set_limits_updates_min_interval_floor(self): + limiter = RateLimiter(requests_per_second=20, min_interval_ms=10) + # 처음엔 1/20 + 0.001 = 51ms + limiter.set_limits(requests_per_second=5) + # 이제 1/5 + 0.001 = 201ms로 올라가야 함 + assert limiter.min_interval >= 0.200 + + +class TestSingleton: + def test_global_singleton_returns_same_instance(self): + a = get_global_rate_limiter() + b = get_global_rate_limiter() + c = get_global_rate_limiter(requests_per_second=10) + assert a is b is c + + def test_reset_creates_new_instance(self): + a = get_global_rate_limiter() + reset_global_rate_limiter() + b = get_global_rate_limiter() + assert a is not b + + def test_singleton_is_thread_safe(self): + """동시에 여러 스레드가 get_global_rate_limiter()를 호출해도 하나만 생성된다.""" + reset_global_rate_limiter() + instances = [] + lock = threading.Lock() + + def grab(): + inst = get_global_rate_limiter() + with lock: + instances.append(inst) + + threads = [threading.Thread(target=grab) for _ in range(20)] + for t in threads: + t.start() + for t in threads: + t.join() + + # 모두 같은 인스턴스 + assert len({id(i) for i in instances}) == 1 + assert len(instances) == 20 + + +class TestConcurrencyHonorsLimit: + def _measure_max_in_sliding_window( + self, timestamps: list, window_s: float + ) -> int: + """timestamps 정렬 후, 길이 window_s의 sliding window 중 가장 큰 카운트.""" + timestamps = sorted(timestamps) + worst = 0 + for anchor in timestamps: + count = sum(1 for t in timestamps if anchor <= t < anchor + window_s) + worst = max(worst, count) + return worst + + def test_concurrent_acquires_respect_rps(self): + """동시 N개 호출이 RPS 한도를 위반하지 않는다.""" + limiter = RateLimiter( + requests_per_second=5, + requests_per_minute=30, + min_interval_ms=10, + burst_size=0, + ) + start = time.monotonic() + timestamps = [] + ts_lock = threading.Lock() + + def worker(): + limiter.acquire() + with ts_lock: + timestamps.append(time.monotonic() - start) + + threads = [threading.Thread(target=worker) for _ in range(12)] + for t in threads: + t.start() + for t in threads: + t.join() + + worst = self._measure_max_in_sliding_window(timestamps, 1.0) + assert worst <= 5, f"RPS 위반: 1초 윈도우에 {worst}개 (한도 5)" + + def test_concurrent_acquires_respect_rpm(self): + """RPM 한도 검증 (작은 RPM으로 빠른 검증).""" + limiter = RateLimiter( + requests_per_second=20, + requests_per_minute=10, + min_interval_ms=10, + burst_size=0, + ) + # 1분 윈도우는 너무 길어 RPS와 동일 패턴이 적용되는지만 확인. + # 11번째 요청은 첫 요청에서 60초 이상 떨어져야 함. + start = time.monotonic() + for _ in range(10): + limiter.acquire() + elapsed_for_10 = time.monotonic() - start + # 10개를 1분 안에 다 처리 (RPS 충분히 높음). RPM=10이므로 11번째는 첫 + # 요청 + 60초 이후. 직접 검증하지 않고 internal state로 확인. + # request_times에는 10개 + (다음은 60초+ 후 슬롯) 형태. + assert len(limiter.request_times) == 10 + # 11번째 호출 안 함 (60초 대기는 테스트 시간 낭비) + # 10개를 1분 미만 안에 처리한 것 자체로 RPS는 잘 작동 + assert elapsed_for_10 < 60.0 + + def test_sleep_outside_lock_does_not_block_other_threads(self): + """한 스레드가 acquire에서 sleep 중일 때 다른 스레드의 lock 진입이 막히지 않음.""" + limiter = RateLimiter( + requests_per_second=2, + requests_per_minute=10, + min_interval_ms=10, + burst_size=0, + ) + # 2 RPS면 슬롯 간격 약 500ms. + # 한 스레드가 3번째 호출에서 ~500ms wait에 들어가 sleep 중일 때, + # 다른 스레드가 get_current_rate()를 호출해도 즉시 반환되어야 한다. + + # 워밍업: 2회 호출로 한도 도달 + limiter.acquire() + limiter.acquire() + + slow_done = threading.Event() + fast_call_duration = [None] + + def slow_caller(): + limiter.acquire() # 다음 슬롯까지 sleep + slow_done.set() + + def fast_caller(): + # 짧게 대기 후, slow_caller가 sleep 중일 때 get_current_rate 호출 + time.sleep(0.1) + t0 = time.monotonic() + limiter.get_current_rate() + fast_call_duration[0] = time.monotonic() - t0 + + slow = threading.Thread(target=slow_caller) + fast = threading.Thread(target=fast_caller) + slow.start() + fast.start() + slow.join(timeout=2.0) + fast.join(timeout=2.0) + + assert slow_done.is_set(), "slow_caller가 끝나지 않음" + # fast_caller의 get_current_rate가 lock 안에서 sleep을 만나지 않으면 << 100ms + assert fast_call_duration[0] is not None + assert fast_call_duration[0] < 0.05, ( + f"lock 안 sleep으로 인한 블로킹 의심: " + f"get_current_rate가 {fast_call_duration[0]*1000:.1f}ms 걸림" + ) + + def test_burst_priority_does_not_exceed_official_limit(self): + """priority>=1에서 burst를 허용해도 공식 RPS 한도(20)는 절대 초과하지 않는다.""" + limiter = RateLimiter( + requests_per_second=18, + requests_per_minute=900, + min_interval_ms=10, + burst_size=10, # 18+10=28인데 공식 한도 20으로 제한되어야 + ) + + start = time.monotonic() + timestamps = [] + ts_lock = threading.Lock() + + def worker(): + limiter.acquire(priority=1) + with ts_lock: + timestamps.append(time.monotonic() - start) + + threads = [threading.Thread(target=worker) for _ in range(25)] + for t in threads: + t.start() + for t in threads: + t.join() + + worst = self._measure_max_in_sliding_window(timestamps, 1.0) + # priority=1이면 effective RPS = min(18+10, 20) = 20 + assert worst <= KIS_OFFICIAL_RPS_LIMIT, ( + f"공식 한도 위반: 1초 윈도우에 {worst}개 (공식 한도 {KIS_OFFICIAL_RPS_LIMIT})" + )