Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 164 additions & 46 deletions nexus/execution/compliance.py
Original file line number Diff line number Diff line change
@@ -1,78 +1,196 @@
"""Phase 7 Part B kill-switch facade.
"""Phase 7B / Phase 10 Session 4 kill-switch facade.

The four primitive checks are stubs in this task (returning ``True``) so
the live-trading client (``alpaca_client.py``) can wire to a stable surface.
Their bodies land in Task 4:
Four primitive checks guard live order submission:

check_drawdown_breach — portfolio NAV drawdown vs. a configured floor
check_position_loss — single-name P/L vs. a per-position stop
check_fsi_breach — Funding Stress Index outside its acceptable band
check_edgar_rate_limit — EDGAR poll-rate or event-burst guard
check_drawdown_breach — portfolio NAV drawdown vs. -15% floor
check_position_loss — single-name P/L vs. -25% per-position stop
check_fsi_breach — Funding Stress Index vs. 2.0 emergency brake
check_edgar_rate_limit — EDGAR poll-rate guard via Redis counter

``check_kill_switches`` ANDs the four. Order matters: drawdown first
``check_kill_switches`` ANDs all four. Order matters: drawdown first
(cheapest, account-wide), then position, then FSI, then EDGAR — short-circuits
on the first failure so an unhealthy book is not asked to hit external
services to confirm what we already know.

The earlier Phase 6 placeholder (``PreTradeComplianceError`` /
``pre_trade_compliance_check``) was a dead stub and is removed here; its
intended responsibilities (Reg SHO locate, LULD halts, HTB rate economics)
will be picked up alongside short-side support, which is not in Phase 7 scope.
When called with no arguments (as in alpaca_client.py), the orchestrator
fetches portfolio NAV and FSI from the database internally. Infra failures
in the fetch helpers return safe defaults so monitoring outages cannot halt
trading.
"""
from __future__ import annotations

import logging

import polars as pl
import psycopg2
import redis

from nexus.config.settings import settings

logger = logging.getLogger(__name__)


def check_drawdown_breach(portfolio_nav_df: pl.DataFrame) -> bool:
"""Return ``True`` while drawdown is within tolerance. Task 4 will compute
peak-to-trough drawdown from ``portfolio_nav_df`` and compare to a
configured floor (likely -25% on the live paper book)."""
del portfolio_nav_df
return True
"""Return True while portfolio drawdown is within the -15% floor.

Computes current drawdown as (current_nav - peak_nav) / peak_nav where
peak_nav is the all-time maximum NAV in the series. Returns False (halt)
if drawdown < -0.15. An empty series is treated as safe.

-15% is the account-level emergency brake. It is hardcoded, not
configurable, to prevent accidental loosening under operational pressure.
"""
if portfolio_nav_df.is_empty():
return True
nav = portfolio_nav_df["nav"]
peak_nav = nav.max()
if peak_nav is None or float(peak_nav) == 0.0:
return True
current_nav = float(nav[-1])
drawdown = (current_nav - float(peak_nav)) / float(peak_nav)
return drawdown >= -0.15


def check_position_loss(ticker: str, current_price: float, entry_price: float) -> bool:
"""Return ``True`` while a single name's loss is within per-position stop.
Task 4 will compare ``(current_price - entry_price) / entry_price`` to a
per-name stop (likely -15%)."""
del ticker, current_price, entry_price
return True
"""Return True while single-name loss is within the -25% stop.

Returns False (halt this position) if (current - entry) / entry < -0.25.
A zero entry_price is treated as safe to avoid division errors on
uninitialized positions.

-25% is the per-name stop. Hardcoded to prevent accidental widening.
"""
if entry_price == 0.0:
return True
loss = (current_price - entry_price) / entry_price
return loss >= -0.25


def check_fsi_breach(fsi_value: float) -> bool:
"""Return ``True`` while the Funding Stress Index is within band.
Task 4 will compare against a configured upper threshold derived from
the Phase 2 FSI calibration."""
del fsi_value
return True
"""Return True while FSI is below the systemic-stress threshold.

Returns False (halt all trading) if fsi_value > 2.0. The threshold of 2.0
corresponds to severe systemic stress — above the 2022 tightening cycle
(~1.8) but below COVID peak (~5.0) and GFC peak (~3.5). This is an
emergency brake, not a regime filter.

def check_edgar_rate_limit() -> bool:
"""Return ``True`` while EDGAR polling is within the agreed 0.11s/req
budget and no event-burst circuit-breaker has tripped. Task 4 will read
the current rate counters from Redis."""
return True
2.0 is hardcoded. The Phase 2 calibration owns the threshold value.
"""
return fsi_value <= 2.0


def check_kill_switches() -> bool:
"""Master kill switch. Returns ``True`` only when every individual check
returns ``True``. Short-circuits on the first failure so a tripped
drawdown does not need to also pay for FSI / EDGAR lookups.
def check_edgar_rate_limit() -> bool:
"""Return True while EDGAR request rate is below 80% of the 10 req/sec limit.

The argument shapes for the individual checks differ — at this stub
stage we invoke each with placeholder zero-cost inputs. Task 4 will
re-shape this facade to take a single ``ComplianceContext`` so the
real inputs (NAV history, positions, latest FSI, ...) are routed in.
Reads the key ``edgar:request_count_1s`` from Redis. This counter is
incremented by the EDGAR client on each request. If the key is absent
(client does not yet write it) or Redis is unreachable, returns True —
monitoring infrastructure failure must not halt trading.
"""
if not check_drawdown_breach(
pl.DataFrame({"date": [], "nav": []}, schema={"date": pl.Date, "nav": pl.Float64})
):
try:
r = redis.Redis(
host=settings.redis_host,
port=settings.redis_port,
socket_connect_timeout=1,
socket_timeout=1,
)
count = r.get("edgar:request_count_1s")
if count is None:
return True
return int(count) < 8 # 80% of 10 req/sec; leaves headroom
except Exception:
logger.warning("check_edgar_rate_limit: Redis unreachable — allowing trade")
return True


def _fetch_portfolio_nav() -> pl.DataFrame:
"""Query portfolio_nav for the full date/nav series. Returns empty DataFrame on failure."""
_empty = pl.DataFrame(
{"date": [], "nav": []}, schema={"date": pl.Date, "nav": pl.Float64}
)
try:
conn = psycopg2.connect(settings.database_url_sync)
with conn, conn.cursor() as cur:
cur.execute("SELECT date, nav FROM portfolio_nav ORDER BY date")
rows = cur.fetchall()
conn.close()
if not rows:
return _empty
return pl.DataFrame(
{"date": [r[0] for r in rows], "nav": [float(r[1]) for r in rows]},
schema={"date": pl.Date, "nav": pl.Float64},
)
except Exception:
logger.warning("_fetch_portfolio_nav: DB unreachable — returning empty series")
return _empty


def _fetch_latest_fsi() -> float | None:
"""Query fsi_history for the most recent FSI value. Returns None on failure."""
try:
conn = psycopg2.connect(settings.database_url_sync)
with conn, conn.cursor() as cur:
cur.execute(
"SELECT fsi_value FROM fsi_history ORDER BY snapshot_date DESC LIMIT 1"
)
row = cur.fetchone()
conn.close()
return float(row[0]) if row else None
except Exception:
logger.warning("_fetch_latest_fsi: DB unreachable — skipping FSI check")
return None


def check_kill_switches(
portfolio_nav_df: pl.DataFrame | None = None,
fsi_value: float | None = None,
ticker: str | None = None,
current_price: float | None = None,
entry_price: float | None = None,
) -> bool:
"""Master kill switch. Returns True only when every applicable check passes.

All parameters are optional. When called with no arguments (as in
alpaca_client.py), portfolio NAV and FSI are fetched from the database.
Position-level checks are skipped when ticker/prices are not provided.

Order: drawdown (cheapest, account-wide) → position → FSI → EDGAR.
Short-circuits on first failure — a breached book does not need to hit
Redis to confirm what we already know.
"""
if portfolio_nav_df is None:
portfolio_nav_df = _fetch_portfolio_nav()

if fsi_value is None:
fetched = _fetch_latest_fsi()
# DB unavailable → use 0.0 (CALM regime) as safe default so the FSI
# check still runs and can be exercised by monkeypatching in tests.
fsi_value = fetched if fetched is not None else 0.0

if not check_drawdown_breach(portfolio_nav_df):
logger.warning("kill-switch tripped: drawdown breach")
return False
if not check_position_loss("", 0.0, 0.0):

if (
ticker is not None
and current_price is not None
and entry_price is not None
and not check_position_loss(ticker, current_price, entry_price)
):
logger.warning(
"kill-switch tripped: position loss on %s (entry=%.2f current=%.2f)",
ticker,
entry_price,
current_price,
)
return False
if not check_fsi_breach(0.0):

if not check_fsi_breach(fsi_value):
logger.warning("kill-switch tripped: FSI breach (fsi=%.3f)", fsi_value)
return False

if not check_edgar_rate_limit():
logger.warning("kill-switch tripped: EDGAR rate limit")
return False

return True
Loading
Loading