From 76cc55d853fcee67e4a2d0fdaf2ed1323d0ddca7 Mon Sep 17 00:00:00 2001 From: ShivianNaidoo Date: Wed, 20 May 2026 09:25:06 +0200 Subject: [PATCH] =?UTF-8?q?feat(phase10):=20Form=204=20insider=20signal=20?= =?UTF-8?q?(Session=203A)=20=E2=80=94=20systematic=20negative=20IC,=20reje?= =?UTF-8?q?cted?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Migration 0013: insider_transactions table (190,020 rows, 1,577 P txns, 140/140 tickers, 2018-01-02 to 2026-05-19, ON CONFLICT idempotent). EDGARClient.get_form4_xml + Form 4 XML parser (non-derivative transactions only, point-in-time on filed_at). Disk cache at data/cache/form4/ for idempotent re-runs. Cohen-Malloy-Pomorski opportunistic classifier in nexus/signals/factors/insider.py: ROUTINE iff same calendar-month P in each of 3 prior years; default OPPORTUNISTIC when history < 3 years. Signal normalised by adj_close x float_shares_M. IC results (primary insider_opportunistic_63d): 21d t=-1.983 / 63d t=-2.223 / 126d t=-2.658 -- uniform negative, HLZ FAIL. CALM regime 63d t=-2.773; late-third 21d t=-2.870 (worsening, not decaying). NON-CALM positive flip t=+0.67 statistically insufficient (N=8). Registered insider_opportunistic_63d as status='rejected' with full IC evidence in regime_profile. Not wired into aggregator. Paper trader unchanged. 9 TDD tests green (tests/test_insider_factors.py). 
--- PROGRESS.md | 1 + .../versions/0013_insider_transactions.py | 105 ++++++++ nexus/data/edgar/client.py | 30 +++ nexus/data/edgar/forms/form_4.py | 211 +++++++++++++++ nexus/signals/backtest.py | 66 +++++ nexus/signals/factors/insider.py | 228 +++++++++++++++++ scripts/ingest_form4.py | 241 ++++++++++++++++++ ...register_insider_opportunistic_rejected.py | 141 ++++++++++ scripts/run_insider_ic.py | 148 +++++++++++ tests/test_insider_factors.py | 227 +++++++++++++++++ 10 files changed, 1398 insertions(+) create mode 100644 migrations/versions/0013_insider_transactions.py create mode 100644 nexus/data/edgar/forms/form_4.py create mode 100644 nexus/signals/factors/insider.py create mode 100644 scripts/ingest_form4.py create mode 100644 scripts/register_insider_opportunistic_rejected.py create mode 100644 scripts/run_insider_ic.py create mode 100644 tests/test_insider_factors.py diff --git a/PROGRESS.md b/PROGRESS.md index 805ec9f..96b90ea 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -41,3 +41,4 @@ Phase docs live in `docs/progress/` (untracked, local only). | 9 | HGT retraining Session 2 | COMPLETE — retrained HGT on 140-ticker graph (no code changes; metadata extracted dynamically); val AUC **0.9807** at epoch 280 (vs 0.9803 / e=240 on the prior 30-ticker run); 7m 03s wall-clock; `MODEL_VERSION` bumped `hgt_link_pred_v1` → **`hgt_link_pred_v2`**; node_embeddings re-backfilled at 58 monthly snapshots → **8,120 rows** (140 × 58, dim=64); embedding validation passed (cos(NVDA,AMD)=0.98 > cos(NVDA,ARW)=0.63; per-component std median 0.05); **`graph_gnn_embedding_drift` IC backtest NULL at all horizons** (t=+0.382 @ 21d / +0.524 @ 63d / +0.368 @ 126d on N=52..57; HLZ fail by 10×); registered `status='rejected'` in signal_registry with full evidence record; paper trader unchanged from Phase 9 Session 1 | `docs/progress/phase_9.md` | | 10 | Regime-aware aggregator Session 1 | COMPLETE — **NEGATIVE result, hypothesis refuted, flag rolled back**. 
Built `_apply_regime_gate` in aggregator + `fsi_value` param on `load_factor_records` + FSI wiring in paper_trader (5 new unit tests, 155/156 suite pass). Tested `non_calm_action: 'zero'` on `fundamental_margin_compression` (126d NON-CALM t=−2.31, N=8). Paper trader: **CAGR +8.72% → +7.68%, Sharpe 0.488 → 0.450, Max DD −32.68% → −35.39%** — all three metrics worsened. Monthly-horizon audit: factor made money in 4 of 6 NON-CALM forward months (gated rebalances sat at start of late-2022 recovery). 126d drag is a horizon artifact; doesn't translate to monthly rebalancing. Registry flag rolled back; regime-aware *infrastructure* retained as opt-in capability for future factors | `docs/progress/phase_10.md` | | 10 | Conviction-weighted institutional flow Session 2 | COMPLETE — migration 0012 (`fund_strategy`, 22 rows: 9 T1 + 4 T2 + 6 T3 + 3 excluded banks); `compose_conviction_flow` pure helper (Δpct_portfolio, point-in-time gated on `available_as_of`); 9 TDD tests; institutional panel wired opt-in into backtest. **Primary `institutional_conviction_flow` NULL** (best raw t=+0.94 at 63d, HLZ fail by 4×) → `status='rejected'`. Ultra-T1 sub-test (5 funds: Lone Pine/Viking/Tiger/Coatue/Point72) full-window 21d t=+1.74; late-third 21d t=+3.02, 63d t=+3.33 — material but in-sample, fails HLZ M=400 (|t|≥3.78). Registered **`institutional_conviction_flow_ultra_t1` as `status='research'`** with dated review gate (2026-08-15, promote if full-window 21d t > 2.0 on extended Q2-2026 sample). NOT wired into aggregator. Paper trader unchanged. | `docs/progress/phase_10.md` | +| 10 | Opportunistic insider purchase signal Session 3A | COMPLETE — **NEGATIVE result: signal direction inverted vs hypothesis**. Migration 0013 (`insider_transactions`, 190,020 rows, 1,577 P txns, 140/140 tickers, 2018-2026). Cohen-Malloy-Pomorski opportunistic classifier (`_is_routine`: same calendar-month P in each of 3 prior years → routine; otherwise opportunistic). 
`compose_insider_signal` normalised by market cap; 9 TDD tests green. Full-universe IC: `insider_opportunistic_63d` 21d t=**−1.983**, 63d t=**−2.223**, 126d t=**−2.658** — uniform negative across all lookbacks and forward horizons. CALM regime 63d t=**−2.773**; late-third 21d t=**−2.870**. NON-CALM positive flip (+0.67) insufficient (N=8). Registered **`insider_opportunistic_63d` as `status='rejected'`** with full IC evidence in `regime_profile`. Not wired into aggregator. Paper trader unchanged. | `docs/progress/phase_10.md` | diff --git a/migrations/versions/0013_insider_transactions.py b/migrations/versions/0013_insider_transactions.py new file mode 100644 index 0000000..491cabc --- /dev/null +++ b/migrations/versions/0013_insider_transactions.py @@ -0,0 +1,105 @@ +"""insider_transactions table for Form 4 insider signal. + +Revision ID: 0013 +Revises: 0012 +Create Date: 2026-05-18 + +Phase 10 Session 3A: stores every nonDerivativeTransaction row parsed +from SEC Form 4 filings for all 140 universe tickers, 2018-01-01 to present. + +Point-in-time key is ``filed_at`` (Form 4 filing date), never +``transaction_date`` (trade execution date). The 2-business-day gap +matters at daily resolution. + +All transaction codes are stored (P, S, A, D, M, F, etc). The factor +layer filters to code='P' (open-market purchases) for signal computation. +Storing everything enables future A/B tests on sales signal or option +exercises without re-ingesting the full archive. + +UNIQUE constraint: (accession_number, insider_cik, transaction_date, +transaction_code, shares_traded, price_per_share) — stronger than the +spec's original (company_id, insider_name, transaction_date, ...) because: + 1. accession_number is stable; insider_name formatting drifts across filings + 2. Allows two same-day same-code same-size trades at different prices + 3. 
Form 4/A amendments get their own accession_number, so both versions + coexist; the factor picks the latest filed_at per trade cluster +""" +from collections.abc import Sequence +from typing import Union + +import sqlalchemy as sa +from alembic import op + +revision: str = "0013" +down_revision: Union[str, Sequence[str], None] = "0012" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "insider_transactions", + sa.Column("id", sa.BigInteger, primary_key=True, autoincrement=True), + sa.Column( + "company_id", + sa.BigInteger, + sa.ForeignKey("companies.id"), + nullable=False, + ), + sa.Column("issuer_cik", sa.String(10), nullable=False), + sa.Column("insider_cik", sa.String(20), nullable=False), + sa.Column("insider_name", sa.String(200), nullable=False), + sa.Column("insider_title", sa.String(200), nullable=True), + sa.Column( + "is_director", + sa.Boolean, + nullable=False, + server_default=sa.text("FALSE"), + ), + sa.Column( + "is_officer", + sa.Boolean, + nullable=False, + server_default=sa.text("FALSE"), + ), + sa.Column( + "is_ten_pct_owner", + sa.Boolean, + nullable=False, + server_default=sa.text("FALSE"), + ), + sa.Column("transaction_date", sa.Date, nullable=False), + sa.Column("filed_at", sa.Date, nullable=False), + sa.Column("transaction_code", sa.String(1), nullable=False), + sa.Column("acquired_disposed", sa.String(1), nullable=True), + sa.Column("shares_traded", sa.Numeric(18, 4), nullable=False), + sa.Column("price_per_share", sa.Numeric(12, 4), nullable=True), + sa.Column("notional_usd", sa.Numeric(20, 4), nullable=True), + sa.Column("shares_owned_after", sa.Numeric(18, 4), nullable=True), + sa.Column("accession_number", sa.String(25), nullable=False), + sa.UniqueConstraint( + "accession_number", + "insider_cik", + "transaction_date", + "transaction_code", + "shares_traded", + "price_per_share", + name="uq_insider_tx", + ), + ) + op.create_index( 
+ "ix_insider_tx_company_filed", + "insider_transactions", + ["company_id", "filed_at"], + ) + op.create_index( + "ix_insider_tx_code", + "insider_transactions", + ["transaction_code"], + ) + + +def downgrade() -> None: + op.drop_index("ix_insider_tx_code", table_name="insider_transactions") + op.drop_index("ix_insider_tx_company_filed", table_name="insider_transactions") + op.drop_table("insider_transactions") diff --git a/nexus/data/edgar/client.py b/nexus/data/edgar/client.py index 2b91c9f..2f0c0c2 100644 --- a/nexus/data/edgar/client.py +++ b/nexus/data/edgar/client.py @@ -183,3 +183,33 @@ def get_13f_infotable(self, cik: str, accession_number: str) -> str | None: except Exception as e: print(f" [ERROR] infotable {accession_number}: {e}") return None + + def get_form4_xml(self, cik: str, accession_number: str) -> str | None: + """Fetch the raw Form 4 ownershipDocument XML from EDGAR. + + EDGAR stores two XML variants per Form 4 filing: + 1. xslF345X06/filename.xml — XSLT-rendered HTML (not parseable as XML) + 2. filename.xml — raw ownershipDocument XML (what we want) + + The filing index lists both; we pick the one NOT in a subdirectory. 
+ """ + try: + import re as _re + cik_int = int(cik) + path_acc = accession_number.replace("-", "") + base = f"https://www.sec.gov/Archives/edgar/data/{cik_int}/{path_acc}" + + index_html = self._get(f"{base}/{accession_number}-index.htm").text + + all_xml = _re.findall(r'href="([^"]+\.xml)"', index_html, _re.IGNORECASE) + # Exclude XSLT-rendered variants (live in xslF345X06/ subdirectory) + raw_xmls = [x for x in all_xml if "xsl" not in x.lower()] + if not raw_xmls: + return None + + xml_name = raw_xmls[0].split("/")[-1] + return self._get(f"{base}/{xml_name}").text + + except Exception as e: + print(f" [ERROR] form4_xml {accession_number}: {e}") + return None diff --git a/nexus/data/edgar/forms/form_4.py b/nexus/data/edgar/forms/form_4.py new file mode 100644 index 0000000..3256256 --- /dev/null +++ b/nexus/data/edgar/forms/form_4.py @@ -0,0 +1,211 @@ +"""Form 4 (Statement of Changes in Beneficial Ownership) XML parser. + +Parses the raw ``ownershipDocument`` XML returned by +``EDGARClient.get_form4_xml``. + +Only ``nonDerivativeTransaction`` rows are extracted — derivative +transactions (options, SARs) are compensation-driven, not open-market +signals. ``nonDerivativeHolding`` rows (position snapshots without a +transaction) are also skipped. + +Point-in-time note +------------------ +``InsiderTransaction.filed_at`` is the Form 4 filing date — the date the +information became public. ``transaction_date`` is when the trade executed, +which may be up to 2 business days earlier. The factor MUST gate on +``filed_at``, never ``transaction_date``. 
+ +XML structure (verified against live EDGAR 2026-03-24 NVDA filing) +------------------------------------------------------------------ +- ``transactionCode`` has NO ``<value>`` child — read ``.text`` directly +- All numeric fields (shares, price, sharesOwned) use ``<value>`` wrappers +- ``rptOwnerCik`` has leading zeros — strip with ``.lstrip("0")`` +- ``nonDerivativeHolding`` elements appear inside ``nonDerivativeTable`` + but lack ``transactionCoding`` — detect and skip them +""" +from __future__ import annotations + +import xml.etree.ElementTree as ET +from dataclasses import dataclass +from datetime import date + + +@dataclass(frozen=True) +class InsiderTransaction: + issuer_cik: str + insider_cik: str + insider_name: str + insider_title: str # empty string if not an officer + is_director: bool + is_officer: bool + is_ten_pct_owner: bool + transaction_date: date + filed_at: date # point-in-time key — always use this, never transaction_date + transaction_code: str # P=purchase, S=sale, A=award, D=disposition to company, etc. + acquired_disposed: str # A=acquired, D=disposed; empty string if unavailable + shares_traded: float + price_per_share: float | None + notional_usd: float | None # shares_traded × price_per_share; None when price is None + shares_owned_after: float | None + accession_number: str + + +def parse_form4( + xml_text: str, + accession_number: str, + filed_at: date, +) -> list[InsiderTransaction]: + """Parse a Form 4 ownershipDocument XML. Returns one record per + nonDerivativeTransaction row. Pure function — no IO. + + Parameters + ---------- + xml_text: + Raw XML string from EDGARClient.get_form4_xml. + accession_number: + EDGAR accession number for this filing (stored for dedup). + filed_at: + Form 4 filing date from the EDGAR submissions index — the + point-in-time availability date (NOT transactionDate). 
+ """ + if not xml_text or not xml_text.strip(): + return [] + try: + root = ET.fromstring(xml_text) + except ET.ParseError: + xml_text = xml_text.encode("ascii", "ignore").decode() + try: + root = ET.fromstring(xml_text) + except ET.ParseError: + return [] + + # ── issuer ─────────────────────────────────────────────────────────────── + issuer_cik = _text(root, "issuer/issuerCik").lstrip("0") or "0" + + # ── reporting owner ────────────────────────────────────────────────────── + owner = root.find("reportingOwner") + if owner is None: + return [] + + insider_cik = _text(owner, "reportingOwnerId/rptOwnerCik").lstrip("0") or "0" + insider_name = _text(owner, "reportingOwnerId/rptOwnerName").strip() + + rel = owner.find("reportingOwnerRelationship") + is_director = _bool_flag(rel, "isDirector") + is_officer = _bool_flag(rel, "isOfficer") + is_ten_pct = _bool_flag(rel, "isTenPercentOwner") + officer_title = _text(rel, "officerTitle").strip() if rel is not None else "" + + # ── non-derivative transactions ─────────────────────────────────────────── + table = root.find("nonDerivativeTable") + if table is None: + return [] + + results: list[InsiderTransaction] = [] + for tx in table.findall("nonDerivativeTransaction"): + # transactionCode has NO wrapper — read .text directly. + # nonDerivativeHolding elements have no transactionCoding child. 
+ coding = tx.find("transactionCoding") + if coding is None: + continue + tc_el = coding.find("transactionCode") + if tc_el is None or not (tc_el.text or "").strip(): + continue + transaction_code = tc_el.text.strip() + + tx_date_str = _value(tx, "transactionDate") + if not tx_date_str: + continue + try: + transaction_date = date.fromisoformat(tx_date_str) + except ValueError: + continue + + amounts = tx.find("transactionAmounts") + if amounts is None: + continue + + shares_str = _value(amounts, "transactionShares") + try: + shares_traded = float((shares_str or "0").replace(",", "")) + except ValueError: + continue + if shares_traded == 0: + continue + + price_str = _value(amounts, "transactionPricePerShare") + try: + price_per_share: float | None = ( + float(price_str.replace(",", "")) if price_str else None + ) + except ValueError: + price_per_share = None + + notional_usd = ( + shares_traded * price_per_share if price_per_share is not None else None + ) + + acquired_disposed = _value(amounts, "transactionAcquiredDisposedCode") or "" + + post = tx.find("postTransactionAmounts") + shares_after_str = ( + _value(post, "sharesOwnedFollowingTransaction") if post is not None else None + ) + try: + shares_owned_after: float | None = ( + float(shares_after_str.replace(",", "")) + if shares_after_str + else None + ) + except ValueError: + shares_owned_after = None + + results.append(InsiderTransaction( + issuer_cik=issuer_cik, + insider_cik=insider_cik, + insider_name=insider_name, + insider_title=officer_title, + is_director=is_director, + is_officer=is_officer, + is_ten_pct_owner=is_ten_pct, + transaction_date=transaction_date, + filed_at=filed_at, + transaction_code=transaction_code, + acquired_disposed=acquired_disposed, + shares_traded=shares_traded, + price_per_share=price_per_share, + notional_usd=notional_usd, + shares_owned_after=shares_owned_after, + accession_number=accession_number, + )) + + return results + + +# ── helpers 
─────────────────────────────────────────────────────────────────── + +def _text(element: ET.Element | None, path: str, default: str = "") -> str: + """Text of a descendant located by simple slash-separated path.""" + if element is None: + return default + el = element.find(path) + return (el.text or default) if el is not None else default + + +def _value(element: ET.Element | None, tag: str, default: str = "") -> str: + """Text of the child of a named child element.""" + if element is None: + return default + parent = element.find(tag) + if parent is None: + return default + val_el = parent.find("value") + return (val_el.text or default) if val_el is not None else default + + +def _bool_flag(element: ET.Element | None, tag: str) -> bool: + """Return True if the named child element has text '1'.""" + if element is None: + return False + el = element.find(tag) + return (el.text or "").strip() == "1" if el is not None else False diff --git a/nexus/signals/backtest.py b/nexus/signals/backtest.py index 2481505..8f537ce 100644 --- a/nexus/signals/backtest.py +++ b/nexus/signals/backtest.py @@ -51,6 +51,7 @@ prior_panel as _fund_prior_panel, ) from nexus.signals.factors.graph_based import _compose_signal +from nexus.signals.factors.insider import compose_insider_signal from nexus.signals.factors.institutional import compose_conviction_flow from nexus.signals.hlz import format_table, hlz_correct, update_registry_hlz @@ -529,6 +530,59 @@ def _xs_institutional_conviction_flow( return dict(zip(composed["ticker"], composed["signal"])) +@dataclass +class _InsiderPanel: + transactions: pd.DataFrame # ticker, insider_cik, transaction_date, filed_at, transaction_code, notional_usd + + +def _load_insider_panel(engine) -> _InsiderPanel: + df = pd.read_sql( + text( + """ + SELECT c.ticker, + it.insider_cik, + it.transaction_date, + it.filed_at, + it.transaction_code, + CAST(it.notional_usd AS double precision) AS notional_usd + FROM insider_transactions it + JOIN companies c ON c.id 
= it.company_id + """ + ), + engine, + ) + if not df.empty: + df["transaction_date"] = pd.to_datetime(df["transaction_date"]).dt.date + df["filed_at"] = pd.to_datetime(df["filed_at"]).dt.date + return _InsiderPanel(transactions=df) + + +def _prices_snapshot_df(price_panel: _PricePanel, as_of: date) -> pd.DataFrame: + """Extract point-in-time price snapshot (adj_close) for all tickers.""" + rows = [] + for ticker, (dates, closes, _) in price_panel.by_ticker.items(): + idx = bisect.bisect_right(dates, as_of) - 1 + if idx >= 0: + rows.append({"ticker": ticker, "date": dates[idx], "adj_close": float(closes[idx])}) + return pd.DataFrame(rows) + + +def _xs_insider_signal( + as_of: date, panel: _InsiderPanel, price_panel: _PricePanel, lookback_days: int +) -> dict[str, float]: + prices = _prices_snapshot_df(price_panel, as_of) + if prices.empty or panel.transactions.empty: + return {} + signal_df = compose_insider_signal(panel.transactions, prices, as_of, lookback_days) + if signal_df.empty: + return {} + return { + row["ticker"]: row["signal"] + for _, row in signal_df.iterrows() + if row["signal"] != 0.0 + } + + @dataclass(frozen=True) class _SupplyEdgeRow: filing_date: date @@ -643,6 +697,7 @@ def _factor_xs_dispatch( supply_panel: _SupplyEdgePanel | None = None, fundamentals_panel: _FundamentalsPanel | None = None, institutional_panel: _InstitutionalPanel | None = None, + insider_panel: _InsiderPanel | None = None, ) -> dict[str, callable]: """Return name → cross-section closure for each Tier A factor. 
@@ -687,6 +742,15 @@ def _factor_xs_dispatch( base["institutional_conviction_flow"] = ( lambda snap, pp, cp, ep: _xs_institutional_conviction_flow(snap, ip) ) + if insider_panel is not None: + _ip = insider_panel + for _lb in (21, 63, 126): + _name = f"insider_opportunistic_{_lb}d" + _lb_captured = _lb + base[_name] = ( + lambda snap, pp, cp, ep, __p=_ip, __lb=_lb_captured: + _xs_insider_signal(snap, __p, pp, __lb) + ) return base @@ -733,6 +797,7 @@ def compute_factor_ics( supply_panel: _SupplyEdgePanel | None = None, fundamentals_panel: _FundamentalsPanel | None = None, institutional_panel: _InstitutionalPanel | None = None, + insider_panel: _InsiderPanel | None = None, ) -> tuple[list[float], list[int], list[date]]: """Compute per-period ICs for one factor across a set of snapshots. @@ -748,6 +813,7 @@ def compute_factor_ics( supply_panel=supply_panel, fundamentals_panel=fundamentals_panel, institutional_panel=institutional_panel, + insider_panel=insider_panel, ) xs_func = dispatch[name] ics: list[float] = [] diff --git a/nexus/signals/factors/insider.py b/nexus/signals/factors/insider.py new file mode 100644 index 0000000..f67397c --- /dev/null +++ b/nexus/signals/factors/insider.py @@ -0,0 +1,228 @@ +"""Cohen-Malloy-Pomorski opportunistic insider purchase signal — Phase 10 Session 3A. + +Signal construction (CMP 2012, RFS): + raw_i = Σ notional_usd(opportunistic P transactions, lookback window) / market_cap_proxy_i + signal_i = cross_sectional_zscore(raw_i) + +Opportunistic vs routine classification (per insider × ticker): + An insider is ROUTINE for a given month M if they filed a P transaction in + calendar month M in EACH of the 3 prior calendar years (year-1, year-2, year-3). + Default to OPPORTUNISTIC when < 3 years of history are visible. + +Point-in-time key: ``filed_at`` (SEC Form 4 filing date, ≤ 2 business days after +trade execution). Transactions with ``filed_at > as_of`` are excluded. + +Market-cap proxy: adj_close × float_shares_M × 1e6. 
Float shares from +FLOAT_SHARES_M (reconstitution.py); _FLOAT_FALLBACK_M = 1000.0 for the ~110 +tickers not in that dict (Phase 5 TODO: replace with XBRL point-in-time float). +""" +from __future__ import annotations + +from datetime import date, timedelta +from typing import Any + +import pandas as pd +from sqlalchemy import create_engine, text + +from nexus.config import settings +from nexus.signals.reconstitution import FLOAT_SHARES_M + +_FLOAT_FALLBACK_M = 1000.0 + + +# --------------------------------------------------------------------------- +# Routine / opportunistic classifier +# --------------------------------------------------------------------------- + +def _is_routine( + insider_cik: str, + ticker: str, + txn_month: int, + txn_year: int, + history: pd.DataFrame, +) -> bool: + """Return True iff insider traded same calendar month in each of 3 prior years.""" + prior = frozenset([txn_year - 1, txn_year - 2, txn_year - 3]) + mask = ( + (history["insider_cik"] == insider_cik) + & (history["ticker"] == ticker) + & (history["_month"] == txn_month) + & (history["_year"].isin(prior)) + ) + years_found = frozenset(history.loc[mask, "_year"].unique()) + return years_found == prior + + +# --------------------------------------------------------------------------- +# Pure composer +# --------------------------------------------------------------------------- + +def compose_insider_signal( + transactions: pd.DataFrame, + prices: pd.DataFrame, + as_of: date, + lookback_days: int = 63, +) -> pd.DataFrame: + """Compute CMP opportunistic insider purchase signal, cross-sectionally z-scored. + + Args: + transactions: DataFrame with columns ticker, insider_cik, transaction_date, + filed_at, transaction_code, notional_usd. + prices: DataFrame with columns ticker, date, adj_close. + as_of: Evaluation date; all filed_at > as_of rows are excluded. + lookback_days: Rolling window in calendar days applied to transaction_date. + + Returns: + DataFrame(ticker, signal). 
signal=0.0 for tickers with no qualifying purchases. + """ + lookback_start = as_of - timedelta(days=lookback_days) + all_tickers = prices["ticker"].unique() + + if transactions.empty or "transaction_code" not in transactions.columns: + return pd.DataFrame({"ticker": all_tickers, "signal": 0.0}) + + txns = transactions.copy() + + # Coerce date columns — pandas may hold them as datetime64 or Python date + def _coerce(col: pd.Series) -> pd.Series: + return col.map(lambda v: v.date() if hasattr(v, "date") else v) + + txns["_fd"] = _coerce(txns["filed_at"]) + txns["_td"] = _coerce(txns["transaction_date"]) + + # Point-in-time filter + open-market purchases only + purchases_pit = txns[ + (txns["_fd"] <= as_of) & (txns["transaction_code"] == "P") + ] + + if purchases_pit.empty: + return pd.DataFrame({"ticker": all_tickers, "signal": 0.0}) + + # Precompute year/month on all visible purchases for the routine classifier + hist = purchases_pit[["insider_cik", "ticker", "_td"]].copy() + hist["_year"] = hist["_td"].map(lambda d: d.year) + hist["_month"] = hist["_td"].map(lambda d: d.month) + + # Window slice — only rows the signal is computed from + window = purchases_pit[purchases_pit["_td"] >= lookback_start].copy() + + if window.empty: + return pd.DataFrame({"ticker": all_tickers, "signal": 0.0}) + + # Classify each window row as opportunistic or routine + def _is_opp(row: Any) -> bool: + return not _is_routine( + row["insider_cik"], + row["ticker"], + row["_td"].month, + row["_td"].year, + hist, + ) + + window["_opp"] = window.apply(_is_opp, axis=1) + opp = window[window["_opp"]] + + # Aggregate notional per ticker + raw_series: pd.Series = ( + opp.groupby("ticker")["notional_usd"].sum() + if not opp.empty + else pd.Series(dtype=float) + ) + + # Market-cap normalisation + zero fill + price_map = prices.set_index("ticker")["adj_close"].to_dict() + raw_rows: list[dict] = [] + for ticker in all_tickers: + notional = float(raw_series.get(ticker, 0.0)) + close = 
price_map.get(ticker) + if notional > 0 and close and close > 0: + float_m = FLOAT_SHARES_M.get(ticker, _FLOAT_FALLBACK_M) + mktcap = close * float_m * 1e6 + raw_rows.append({"ticker": ticker, "_raw": notional / mktcap}) + else: + raw_rows.append({"ticker": ticker, "_raw": 0.0}) + + df = pd.DataFrame(raw_rows) + + # Cross-sectional z-score + sigma = df["_raw"].std(ddof=1) + if sigma == 0 or pd.isna(sigma): + df["signal"] = 0.0 + else: + mu = df["_raw"].mean() + df["signal"] = (df["_raw"] - mu) / sigma + + return df[["ticker", "signal"]].reset_index(drop=True) + + +# --------------------------------------------------------------------------- +# DB-backed wrappers (used by backtest and live signal pipeline) +# --------------------------------------------------------------------------- + +def load_insider_panel( + tickers: list[str], + start_date: date, + end_date: date, +) -> pd.DataFrame: + """Load insider_transactions for a ticker list, filtered by filed_at range. + + Returns DataFrame with columns matching the compose_insider_signal + transactions contract: ticker, insider_cik, transaction_date, filed_at, + transaction_code, notional_usd. 
+ """ + engine = create_engine(settings.database_url_sync) + sql = text( + """ + SELECT + c.ticker, + it.insider_cik, + it.transaction_date, + it.filed_at, + it.transaction_code, + CAST(it.notional_usd AS double precision) AS notional_usd + FROM insider_transactions it + JOIN companies c ON c.id = it.company_id + WHERE c.ticker = ANY(:tickers) + AND it.filed_at BETWEEN :start_date AND :end_date + ORDER BY it.filed_at + """ + ) + with engine.connect() as conn: + df = pd.read_sql(sql, conn, params={ + "tickers": tickers, + "start_date": start_date, + "end_date": end_date, + }) + engine.dispose() + df["transaction_date"] = pd.to_datetime(df["transaction_date"]).dt.date + df["filed_at"] = pd.to_datetime(df["filed_at"]).dt.date + return df + + +def load_price_panel( + tickers: list[str], + as_of: date, +) -> pd.DataFrame: + """Return single-row price snapshot per ticker on or before as_of. + + Returns DataFrame(ticker, date, adj_close). + """ + engine = create_engine(settings.database_url_sync) + sql = text( + """ + SELECT DISTINCT ON (c.ticker) + c.ticker, + ph.date, + CAST(ph.adj_close AS double precision) AS adj_close + FROM price_history ph + JOIN companies c ON c.id = ph.company_id + WHERE c.ticker = ANY(:tickers) + AND ph.date <= :as_of + ORDER BY c.ticker, ph.date DESC + """ + ) + with engine.connect() as conn: + df = pd.read_sql(sql, conn, params={"tickers": tickers, "as_of": as_of}) + engine.dispose() + df["date"] = pd.to_datetime(df["date"]).dt.date + return df diff --git a/scripts/ingest_form4.py b/scripts/ingest_form4.py new file mode 100644 index 0000000..b57ab8d --- /dev/null +++ b/scripts/ingest_form4.py @@ -0,0 +1,241 @@ +#!/usr/bin/env python +"""Phase 10 Session 3A: ingest SEC Form 4 insider transactions for 140-ticker UNIVERSE. + +Default (no flags): dry-run on the first 5 tickers — fetches and parses +up to ``--limit-per-ticker`` Form 4s each, prints a summary, no DB writes. 
CACHE_DIR = Path("data/cache/form4")
START_DATE = date(2018, 1, 1)

# Root element of SEC ownership-document XML (Forms 3/4/5). A payload that
# lacks it (empty body, HTML error page, truncated download) is not a Form 4
# and must be neither cached nor handed to the parser.
_FORM4_ROOT_MARKER = "<ownershipDocument"


def _company_id_by_ticker(engine) -> dict[str, int]:
    """Return a ``ticker -> companies.id`` map for rows with node_type='equity'."""
    with engine.connect() as conn:
        rows = conn.execute(
            text("SELECT ticker, id FROM companies WHERE node_type='equity'")
        ).fetchall()
    return {ticker: company_id for ticker, company_id in rows}


def _looks_like_form4(payload: str | None) -> bool:
    """Return True when *payload* is non-empty and contains the Form 4 root element."""
    return bool(payload) and _FORM4_ROOT_MARKER in payload


def _fetch_xml(client: EDGARClient, cik: str, acc: str) -> str | None:
    """Return Form 4 XML for accession *acc* from the disk cache or EDGAR.

    The cache file is ``CACHE_DIR/{acc without dashes}.xml``. A cached file
    that is unreadable or does not look like a Form 4 is ignored and the
    document is fetched again via ``client.get_form4_xml``. Only payloads
    that pass the same check are written to the cache.

    Returns:
        The XML text, or None when no valid document could be obtained.
        Exceptions raised by ``client.get_form4_xml`` are not caught here.
    """
    cache_path = CACHE_DIR / f"{acc.replace('-', '')}.xml"
    if cache_path.exists():
        try:
            cached = cache_path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            # Corrupt/unreadable cache entry: treat it as a miss.
            cached = ""
        if _looks_like_form4(cached):
            return cached
        # Empty or invalid cache entry — fall through to re-fetch.

    xml = client.get_form4_xml(cik, acc)
    if not _looks_like_form4(xml):
        return None
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    cache_path.write_text(xml, encoding="utf-8")
    return xml


def _insert_transactions(
    engine, company_id: int, txns: list[InsiderTransaction]
) -> tuple[int, int]:
    """Insert *txns* for one company inside a single transaction.

    Rows that violate the ``uq_insider_tx`` constraint are skipped
    (ON CONFLICT DO NOTHING), which makes re-runs idempotent.

    Returns:
        ``(inserted, skipped)`` where ``skipped = len(txns) - inserted``.
    """
    if not txns:
        return 0, 0
    sql = text(
        """
        INSERT INTO insider_transactions (
            company_id, issuer_cik, insider_cik, insider_name, insider_title,
            is_director, is_officer, is_ten_pct_owner,
            transaction_date, filed_at, transaction_code, acquired_disposed,
            shares_traded, price_per_share, notional_usd,
            shares_owned_after, accession_number
        ) VALUES (
            :company_id, :issuer_cik, :insider_cik, :insider_name, :insider_title,
            :is_director, :is_officer, :is_ten_pct_owner,
            :transaction_date, :filed_at, :transaction_code, :acquired_disposed,
            :shares_traded, :price_per_share, :notional_usd,
            :shares_owned_after, :accession_number
        )
        ON CONFLICT ON CONSTRAINT uq_insider_tx DO NOTHING
        """
    )
    inserted = 0
    with engine.begin() as conn:
        for t in txns:
            result = conn.execute(sql, {
                "company_id": company_id,
                "issuer_cik": t.issuer_cik,
                "insider_cik": t.insider_cik,
                "insider_name": t.insider_name,
                "insider_title": t.insider_title or None,
                "is_director": t.is_director,
                "is_officer": t.is_officer,
                "is_ten_pct_owner": t.is_ten_pct_owner,
                "transaction_date": t.transaction_date,
                "filed_at": t.filed_at,
                "transaction_code": t.transaction_code,
                "acquired_disposed": t.acquired_disposed or None,
                "shares_traded": t.shares_traded,
                "price_per_share": t.price_per_share,
                "notional_usd": t.notional_usd,
                "shares_owned_after": t.shares_owned_after,
                "accession_number": t.accession_number,
            })
            # Some DBAPI drivers report -1 for "unknown"; never let that
            # push the inserted count (and so the skipped count) off.
            inserted += max(result.rowcount or 0, 0)
    return inserted, len(txns) - inserted


def _ticker_summary(
    ticker: str,
    txns: list[InsiderTransaction],
    filings_fetched: int,
    inserted: int | None,
) -> None:
    """Print a one-line summary of what was parsed (and inserted) for *ticker*.

    ``inserted=None`` means dry-run: the ``inserted=`` field is omitted.
    """
    if not txns:
        print(f" [{ticker:6s}] filings={filings_fetched} → 0 transactions parsed")
        return
    purchases = sum(1 for t in txns if t.transaction_code == "P")
    sales = sum(1 for t in txns if t.transaction_code == "S")
    filed_dates = [t.filed_at for t in txns]
    # Transactions without a notional cannot be ranked, so exclude them.
    priced = [t for t in txns if t.notional_usd is not None]
    largest = max(priced, key=lambda t: t.notional_usd, default=None)
    largest_str = (
        f" largest=${largest.notional_usd:,.0f}({largest.transaction_code})"
        if largest else ""
    )
    ins_str = f" inserted={inserted}" if inserted is not None else ""
    print(
        f" [{ticker:6s}] filings={filings_fetched} txns={len(txns)}"
        f" buys={purchases} sells={sales}"
        f" filed=[{min(filed_dates)}..{max(filed_dates)}]"
        f"{largest_str}{ins_str}"
    )


def _parse_filings(client: EDGARClient, cik: str, filings) -> list[InsiderTransaction]:
    """Fetch and parse each filing record; filings with no valid XML are skipped."""
    txns: list[InsiderTransaction] = []
    for rec in filings:
        xml = _fetch_xml(client, cik, rec.accession_number)
        if xml is None:
            continue
        txns.extend(parse_form4(xml, rec.accession_number, rec.filed_at.date()))
    return txns


def main() -> int:
    """CLI entry point: fetch, parse and (with ``--commit``) store Form 4 data.

    Without ``--commit`` nothing is written and no database engine is
    created; unless ``--ticker`` is given, only the first 5 UNIVERSE tickers
    are processed as a quick sanity check.

    Returns:
        Process exit code (always 0; per-ticker problems are reported in the
        printed summary).
    """
    ap = argparse.ArgumentParser(
        description="Ingest SEC Form 4 insider transactions for the 140-ticker UNIVERSE."
    )
    ap.add_argument(
        "--commit", action="store_true",
        help="Execute DB inserts. Default is dry-run (no writes).",
    )
    ap.add_argument(
        "--ticker", action="append", default=None,
        help="Limit to specific tickers (repeatable). Default: full UNIVERSE.",
    )
    ap.add_argument(
        "--limit-per-ticker", type=int, default=500,
        help="Max Form 4 filings to fetch per ticker (default 500).",
    )
    args = ap.parse_args()

    mode = "COMMIT" if args.commit else "DRY-RUN"
    cik_by_ticker = {c.ticker: c.cik for c in UNIVERSE}
    all_tickers = [c.ticker for c in UNIVERSE]

    if args.ticker:
        # De-duplicate while keeping the order given on the command line.
        requested = list(dict.fromkeys(args.ticker))
        unknown = [t for t in requested if t not in cik_by_ticker]
        if unknown:
            print(f"[!] Ignoring tickers not in UNIVERSE: {unknown}")
        tickers = [t for t in requested if t in cik_by_ticker]
    elif args.commit:
        tickers = all_tickers
    else:
        # Default dry-run: first 5 tickers for a quick sanity check
        tickers = all_tickers[:5]

    print(f"[*] Phase 10 Session 3A - Form 4 ingest mode={mode}")
    print(f"[*] Tickers: {len(tickers)} limit_per_ticker={args.limit_per_ticker}")
    print(f"[*] Date range: {START_DATE} -> present")
    print()

    engine = create_engine(settings.database_url_sync) if args.commit else None
    total_filings = total_txns = total_inserted = total_skipped = 0
    errored: list[str] = []

    try:
        cid_by_ticker = _company_id_by_ticker(engine) if engine is not None else {}
        client = EDGARClient()

        for ticker in tickers:
            cik = cik_by_ticker.get(ticker)
            if cik is None:
                errored.append(ticker)
                print(f" [{ticker:6s}] no CIK in UNIVERSE — skip")
                continue

            # Check the insert target before spending EDGAR requests on a
            # ticker whose rows could not be stored anyway.
            company_id = cid_by_ticker.get(ticker)
            if engine is not None and company_id is None:
                errored.append(ticker)
                print(f" [{ticker:6s}] not found in companies table — skip")
                continue

            try:
                filings = client.get_recent_filings(cik, "4", limit=args.limit_per_ticker)
            except Exception as e:
                errored.append(ticker)
                print(f" [{ticker:6s}] submissions fetch error: {e}")
                continue

            # Filter to START_DATE window
            filings = [f for f in filings if f.filed_at.date() >= START_DATE]
            total_filings += len(filings)

            ticker_txns = _parse_filings(client, cik, filings)
            total_txns += len(ticker_txns)

            inserted = None
            if engine is not None:
                inserted, skipped = _insert_transactions(engine, company_id, ticker_txns)
                total_inserted += inserted
                total_skipped += skipped
            _ticker_summary(ticker, ticker_txns, len(filings), inserted=inserted)
    finally:
        if engine is not None:
            engine.dispose()

    print()
    print(f"[*] {mode} summary")
    print(f" tickers processed : {len(tickers) - len(errored)}")
    print(f" tickers errored : {len(errored)}")
    print(f" form 4 filings seen : {total_filings}")
    print(f" transactions parsed : {total_txns}")
    if args.commit:
        print(f" rows inserted (new) : {total_inserted}")
        print(f" rows skipped (exist) : {total_skipped}")
    if errored:
        print(f" errored tickers : {sorted(errored)}")
    return 0


if __name__ == "__main__":
    sys.exit(main())
_FACTOR_NAME = "insider_opportunistic_63d"

# Evidence record stored as JSONB in signal_registry.regime_profile.
# The numbers are copied from the diagnostic output quoted in this
# script's module docstring; they are data, not computed here.
_REGIME_PROFILE = {
    "role": "insider_cmp_opportunistic",
    "verdict": "negative_ic_systematic",
    "phase": "10.3a",
    "universe_size": 140,
    "data_rows": 190020,
    "purchase_transactions": 1577,
    "filed_date_range": ["2018-01-02", "2026-05-19"],
    "construction": (
        "Cohen-Malloy-Pomorski routine classifier: ROUTINE if insider filed "
        "a P transaction in the same calendar month in each of years t-1, t-2, "
        "t-3. Default OPPORTUNISTIC when < 3 years history visible (conservative). "
        "Signal = Σ notional_usd(opportunistic P) / (adj_close × float_shares_M × 1e6), "
        "cross-sectional z-score. Point-in-time gate on filed_at."
    ),
    "diagnostic_a_primary_63d_lookback": {
        "21d": {"n": 58, "mean_ic": -0.0225, "t_stat": -1.983, "hit_rate": 0.466, "hlz_passes": False},
        "63d": {"n": 56, "mean_ic": -0.0242, "t_stat": -2.223, "hit_rate": 0.304, "hlz_passes": False},
        "126d": {"n": 53, "mean_ic": -0.0273, "t_stat": -2.658, "hit_rate": 0.340, "hlz_passes": False},
    },
    "diagnostic_b_lookback_sensitivity_fwd21d": {
        "21d_lookback": {"n": 57, "t_stat": -0.813, "hlz_passes": False},
        "63d_lookback": {"n": 58, "t_stat": -1.983, "hlz_passes": False},
        "126d_lookback": {"n": 58, "t_stat": -1.546, "hlz_passes": False},
    },
    "diagnostic_c_regime_split": {
        "calm_n50_21d": {"t_stat": -2.264, "hlz_passes": False},
        "calm_n50_63d": {"t_stat": -2.773, "hlz_passes": False},
        "non_calm_n8_21d": {"t_stat": +0.670, "verdict": "INSUFFICIENT_N"},
        "non_calm_n8_63d": {"t_stat": +0.673, "verdict": "INSUFFICIENT_N"},
    },
    "diagnostic_d_late_third_fwd21d": {
        "cutoff": "2024-08-31",
        "n": 20,
        "t_stat": -2.870,
        "verdict": "BORDERLINE_NEGATIVE — signal worsening, not decaying",
    },
    "evidence": (
        "ICs uniformly negative across all lookbacks (21d/63d/126d), all forward "
        "horizons (21d/63d/126d), and the dominant CALM regime (N=50, t≈-2.3 to "
        "-2.8). The NON-CALM positive flip (t≈+0.67, N=8) is statistically "
        "insufficient and mirrors the Session 1 regime-gate trap. Late-third window "
        "worsens rather than decays. CMP opportunistic purchase signal is a consistent "
        "anti-predictor on this 140-ticker semiconductor panel 2018-2026."
    ),
    "do_not_test_contrarian_without_preregistration": True,
}


def main() -> None:
    """Upsert the rejected-signal record into ``signal_registry`` and print it.

    The statement is INSERT ... ON CONFLICT (name) DO UPDATE, so re-running
    the script overwrites ``t_stat``, ``hlz_passes``, ``status`` and
    ``regime_profile`` of the existing row. ``type`` and ``tier`` are only
    written on first insert.
    """
    engine = create_engine(settings.database_url_sync)
    try:
        with engine.begin() as conn:
            result = conn.execute(
                text(
                    """
                    INSERT INTO signal_registry
                        (name, type, tier, t_stat, hlz_passes, status, regime_profile)
                    VALUES
                        (:name, :type, :tier, :t_stat, :hlz_passes, :status,
                         CAST(:regime_profile AS JSONB))
                    ON CONFLICT (name) DO UPDATE
                    SET t_stat = EXCLUDED.t_stat,
                        hlz_passes = EXCLUDED.hlz_passes,
                        status = EXCLUDED.status,
                        regime_profile = EXCLUDED.regime_profile
                    RETURNING signal_id, name, status, t_stat, hlz_passes
                    """
                ),
                {
                    "name": _FACTOR_NAME,
                    "type": "insider",
                    "tier": "A",
                    # Smallest-magnitude t across the Diagnostic-A horizons
                    # (the 21d forward return), i.e. the weakest of the three.
                    "t_stat": -1.983,
                    "hlz_passes": False,
                    "status": "rejected",
                    "regime_profile": json.dumps(_REGIME_PROFILE),
                },
            )
            row = result.fetchone()
    finally:
        # Release pooled connections even when the upsert fails.
        engine.dispose()

    print(
        f"[+] signal_registry: signal_id={row.signal_id} name={row.name} "
        f"status={row.status} t_stat={row.t_stat} hlz_passes={row.hlz_passes}"
    )


if __name__ == "__main__":
    main()
PRIMARY_SIGNAL = "insider_opportunistic_63d"
ALL_LOOKBACKS = (21, 63, 126)


def _hlz_verdict(t: float, n: int) -> str:
    """Classify a t-statistic against the script's significance thresholds.

    Returns "INSUFFICIENT_N" when *t* is not finite or fewer than 10 periods
    were observed, otherwise "PASS" (|t| >= 3.78), "BORDERLINE"
    (|t| >= 2.78) or "FAIL". The sign of *t* is ignored.
    """
    if not np.isfinite(t) or n < 10:
        return "INSUFFICIENT_N"
    magnitude = abs(t)
    if magnitude >= 3.78:
        return "PASS"
    if magnitude >= 2.78:
        return "BORDERLINE"
    return "FAIL"


def _row(label: str, ics, sizes, periods, t: float) -> None:
    """Print one result line for a series of per-period ICs.

    Args:
        label: Row label, left-aligned in a 34-character field.
        ics: Per-period information coefficients.
        sizes: Per-period cross-section sizes (averaged into ``avg_xs``).
        periods: Accepted for signature compatibility; not used here.
        t: Aggregate t-statistic, printed as-is and passed to ``_hlz_verdict``.
    """
    n = len(ics)
    mean_ic = float(np.mean(ics)) if n else float("nan")
    std_ic = float(np.std(ics, ddof=1)) if n > 1 else float("nan")
    # IC-IR is mean/std of the IC series. Scaling by sqrt(n) would turn it
    # into the t-statistic, which is already reported in the ``t=`` column.
    icir = mean_ic / std_ic if (n > 1 and std_ic > 0) else float("nan")
    hit = float(np.mean([1.0 if ic > 0 else 0.0 for ic in ics])) if n else float("nan")
    avg_xs = float(np.mean(sizes)) if len(sizes) else float("nan")
    print(
        f" {label:34s} N={n:>3d} mean_IC={mean_ic:>+.4f} "
        f"t={t:>+.3f} IC-IR={icir:>+.3f} hit={hit:>.2%} "
        f"avg_xs={avg_xs:>5.1f} "
        f"verdict={_hlz_verdict(t, n)}"
    )


def main() -> None:
    """Run the four read-only IC diagnostics and print one row per test.

    Sections, in print order: (A) primary signal at three forward horizons,
    (B) lookback sensitivity at the 21d forward return, (C) CALM vs NON-CALM
    regime split, (D) late-third sub-window. Nothing is written to the
    database by this function.
    """
    engine = create_engine(settings.database_url_sync)
    try:
        print("[*] Loading panels (price, centrality, embedding, insider)...", flush=True)
        panels = load_all_panels(engine)
        price_panel, centrality_panel, _embed = panels
        insider_panel = _load_insider_panel(engine)
        snap_dates = centrality_panel.sorted_dates
        fsi_by_date = _load_fsi_by_date(snap_dates, engine)
    finally:
        engine.dispose()

    if len(snap_dates) == 0:
        # snap_dates[0] below would raise IndexError; there is nothing to test.
        print("[!] No snapshot dates available - nothing to backtest.")
        return

    n_txns = len(insider_panel.transactions)
    n_buys = int((insider_panel.transactions["transaction_code"] == "P").sum()) if n_txns > 0 else 0
    print(f"[*] Snapshots : {len(snap_dates)} ({snap_dates[0]} -> {snap_dates[-1]})")
    print(f"[*] Transactions: {n_txns:,} total {n_buys:,} purchases (P)")

    # Forward returns depend only on the horizon, so compute each one once
    # and share it between the sections below.
    fwd_cache: dict = {}

    def forward_returns(days):
        if days not in fwd_cache:
            fwd_cache[days] = _compute_forward_returns(
                snap_dates, price_panel, forward_days=days
            )
        return fwd_cache[days]

    def report(label, factor_name, fwd, **extra):
        ics, sizes, periods = compute_factor_ics(
            factor_name, snap_dates, fwd, *panels,
            insider_panel=insider_panel, **extra,
        )
        agg = _aggregate(factor_name, ics, sizes, periods)
        _row(label, ics, sizes, periods, agg.t_stat)

    # -------------------------------------------------------------------- #
    # A. Multi-horizon sweep — primary signal (63d lookback)
    # -------------------------------------------------------------------- #
    print(f"\n=== Diagnostic-A — {PRIMARY_SIGNAL} at 3 forward horizons ===")
    for fwd_days in (FORWARD_TRADING_DAYS, 63, 126):
        report(f"fwd={fwd_days}d overall", PRIMARY_SIGNAL, forward_returns(fwd_days))

    # -------------------------------------------------------------------- #
    # B. Lookback sensitivity — 21d / 63d / 126d at 21d forward return
    # -------------------------------------------------------------------- #
    print("\n=== Diagnostic-B — lookback sensitivity at 21d forward return ===")
    fwd_21 = forward_returns(FORWARD_TRADING_DAYS)
    for lb in ALL_LOOKBACKS:
        report(f"lookback={lb}d fwd=21d", f"insider_opportunistic_{lb}d", fwd_21)

    # -------------------------------------------------------------------- #
    # C. Regime split — primary signal at 21d and 63d
    # -------------------------------------------------------------------- #
    print(
        f"\n=== Regime split "
        f"(CALM = FSI < {CALM_FSI_THRESHOLD}, NON-CALM = FSI >= {CALM_FSI_THRESHOLD}) ==="
    )
    calm = set()
    noncalm = set()
    for d in snap_dates:
        fsi = fsi_by_date.get(d)
        if fsi is None:
            # Dates without an FSI reading belong to neither regime.
            continue
        (calm if fsi < CALM_FSI_THRESHOLD else noncalm).add(d)
    print(f" CALM periods: {len(calm)}")
    print(f" NON-CALM periods: {len(noncalm)}")
    for fwd_days in (FORWARD_TRADING_DAYS, 63):
        for label, period_filter in (("CALM", calm), ("NON-CALM", noncalm)):
            report(
                f"fwd={fwd_days}d {label}", PRIMARY_SIGNAL,
                forward_returns(fwd_days), period_filter=period_filter,
            )

    # -------------------------------------------------------------------- #
    # D. Late-third sub-window (decay check)
    # -------------------------------------------------------------------- #
    print("\n=== Late-third sub-window decay check ===")
    n = len(snap_dates)
    late_cutoff = snap_dates[(2 * n) // 3]
    late_set = {d for d in snap_dates if d >= late_cutoff}
    print(f" Late-third cutoff date: {late_cutoff} (n={len(late_set)} periods)")
    report("fwd=21d late-third", PRIMARY_SIGNAL, fwd_21, period_filter=late_set)

    print(
        "\n[*] No registry writes performed. Awaiting user approval before "
        "Step 4 (registry + rolling registry + paper trader)."
    )


if __name__ == "__main__":
    main()
# ---------------------------------------------------------------------------
# Fixture helpers
# ---------------------------------------------------------------------------

_TX_FIELDS = (
    "ticker", "insider_cik", "transaction_date", "filed_at",
    "transaction_code", "shares_traded", "price_per_share", "notional_usd",
)


def _tx(
    ticker: str,
    insider_cik: str,
    transaction_date: date,
    filed_at: date,
    transaction_code: str,
    shares: float,
    price: float,
    notional: float,
) -> dict:
    """Build one transaction record keyed by the column names in ``_TX_FIELDS``."""
    values = (
        ticker, insider_cik, transaction_date, filed_at,
        transaction_code, shares, price, notional,
    )
    return dict(zip(_TX_FIELDS, values))


def _price(ticker: str, as_of: date, adj_close: float) -> dict:
    """Build one price record for *ticker* on *as_of*."""
    return {"ticker": ticker, "date": as_of, "adj_close": adj_close}


def _small_buy(cik: str, year: int, month: int) -> dict:
    """Historical $1,000 NVDA purchase traded on the 10th and filed on the 12th."""
    return _tx(
        "NVDA", cik, date(year, month, 10), date(year, month, 12),
        "P", 100, 10.0, 1_000.0,
    )


def _june_2022_buy(cik: str) -> dict:
    """The $100,000 NVDA purchase under test (traded 2022-06-10, filed 2022-06-12)."""
    return _tx("NVDA", cik, date(2022, 6, 10), date(2022, 6, 12), "P", 1000, 100.0, 100_000.0)


def _nvda_row(txns: pd.DataFrame, price_rows: list, as_of: date) -> pd.DataFrame:
    """Run the 63-day signal and return the result rows whose ticker is NVDA."""
    result = compose_insider_signal(txns, pd.DataFrame(price_rows), as_of, lookback_days=63)
    return result[result["ticker"] == "NVDA"]


def _nvda_and_amd(as_of: date) -> list:
    """Two-ticker price panel; AMD has no purchases, which allows a valid z-score."""
    return [_price("NVDA", as_of, 100.0), _price("AMD", as_of, 50.0)]


def _assert_positive(row: pd.DataFrame) -> None:
    """The ticker must be present and carry a strictly positive signal."""
    assert not row.empty
    assert row["signal"].iloc[0] > 0


def _assert_absent_or_zero(row: pd.DataFrame) -> None:
    """The ticker may be missing from the result; if present its signal is ~0."""
    if not row.empty:
        assert row["signal"].iloc[0] == pytest.approx(0.0)


# ---------------------------------------------------------------------------
# Test 1 — open-market purchase (P) by opportunistic insider → positive z-score
# ---------------------------------------------------------------------------

def test_open_market_purchase_produces_positive_signal():
    as_of = date(2022, 6, 30)
    # Opportunistic buyer: no same-month history in the prior 3 years.
    txns = pd.DataFrame([
        _tx("NVDA", "111", date(2022, 6, 15), date(2022, 6, 17), "P", 1000, 100.0, 100_000.0),
    ])
    _assert_positive(_nvda_row(txns, _nvda_and_amd(as_of), as_of))


# ---------------------------------------------------------------------------
# Test 2 — sale (S) is excluded from signal
# ---------------------------------------------------------------------------

def test_sale_excluded_from_signal():
    as_of = date(2022, 6, 30)
    txns = pd.DataFrame([
        _tx("NVDA", "111", date(2022, 6, 15), date(2022, 6, 17), "S", 1000, 100.0, 100_000.0),
    ])
    _assert_absent_or_zero(_nvda_row(txns, [_price("NVDA", as_of, 100.0)], as_of))


# ---------------------------------------------------------------------------
# Test 3 — award (A) is excluded from signal
# ---------------------------------------------------------------------------

def test_award_excluded():
    as_of = date(2022, 6, 30)
    txns = pd.DataFrame([
        _tx("NVDA", "111", date(2022, 6, 15), date(2022, 6, 17), "A", 1000, 0.0, 0.0),
    ])
    _assert_absent_or_zero(_nvda_row(txns, [_price("NVDA", as_of, 100.0)], as_of))


# ---------------------------------------------------------------------------
# Test 4 — routine insider (same calendar month in each prior 3 years) is excluded
# ---------------------------------------------------------------------------

def test_routine_insider_excluded():
    as_of = date(2022, 6, 30)
    # Insider "222" bought in June 2019, 2020, 2021 and 2022 → routine pattern.
    history = [_small_buy("222", year, 6) for year in (2019, 2020, 2021)]
    txns = pd.DataFrame(history + [_june_2022_buy("222")])
    # Routine insider → excluded → signal = 0.0
    _assert_absent_or_zero(_nvda_row(txns, _nvda_and_amd(as_of), as_of))


# ---------------------------------------------------------------------------
# Test 5 — opportunistic insider (irregular history) is included
# ---------------------------------------------------------------------------

def test_opportunistic_insider_included():
    as_of = date(2022, 6, 30)
    # Insider "333" bought in Jan 2019, Nov 2020 and Mar 2021: none of the
    # prior three Junes has a purchase, so the June 2022 buy is OPPORTUNISTIC.
    history = [
        _small_buy("333", 2019, 1),
        _small_buy("333", 2020, 11),
        _small_buy("333", 2021, 3),
    ]
    txns = pd.DataFrame(history + [_june_2022_buy("333")])
    _assert_positive(_nvda_row(txns, _nvda_and_amd(as_of), as_of))


# ---------------------------------------------------------------------------
# Test 6 — point-in-time: filed_at > as_of → excluded
# ---------------------------------------------------------------------------

def test_point_in_time_uses_filed_at():
    as_of = date(2022, 1, 4)
    # Traded Jan 3 but filed Jan 5 → not public as of Jan 4 → must be excluded.
    txns = pd.DataFrame([
        _tx("NVDA", "444", date(2022, 1, 3), date(2022, 1, 5), "P", 1000, 100.0, 100_000.0),
    ])
    _assert_absent_or_zero(_nvda_row(txns, [_price("NVDA", as_of, 100.0)], as_of))


# ---------------------------------------------------------------------------
# Test 7 — same notional: small-cap gets larger z-score than large-cap
# ---------------------------------------------------------------------------

def test_normalization_by_market_cap():
    as_of = date(2022, 6, 30)
    # Both tickers use _FLOAT_FALLBACK_M; lower adj_close → smaller market-cap
    # proxy → the same notional_usd is a larger normalised signal for SMLL.
    traded, filed = date(2022, 6, 1), date(2022, 6, 3)
    txns = pd.DataFrame([
        _tx("SMLL", "555", traded, filed, "P", 10_000, 10.0, 100_000.0),
        _tx("LRGE", "666", traded, filed, "P", 100, 1000.0, 100_000.0),
    ])
    prices = pd.DataFrame([
        _price("SMLL", as_of, 10.0),
        _price("LRGE", as_of, 1000.0),
    ])
    result = compose_insider_signal(txns, prices, as_of, lookback_days=63)
    small_sig = result[result["ticker"] == "SMLL"]["signal"].iloc[0]
    large_sig = result[result["ticker"] == "LRGE"]["signal"].iloc[0]
    assert small_sig > large_sig


# ---------------------------------------------------------------------------
# Test 8 — no qualifying purchases → 0.0 (not NaN)
# ---------------------------------------------------------------------------

def test_no_purchases_returns_zero():
    as_of = date(2022, 6, 30)
    txns = pd.DataFrame([], columns=list(_TX_FIELDS))
    row = _nvda_row(txns, [_price("NVDA", as_of, 100.0)], as_of)
    if not row.empty:
        val = row["signal"].iloc[0]
        assert not pd.isna(val)
        assert val == pytest.approx(0.0)


# ---------------------------------------------------------------------------
# Test 9 — insider with < 3 years of history defaults to opportunistic
# ---------------------------------------------------------------------------

def test_insider_with_under_3_years_history_classified_opportunistic():
    as_of = date(2022, 6, 30)
    # Insider "777" has same-month history for only 2 years (June 2020, 2021),
    # so "same month in ALL prior 3 years" cannot hold → OPPORTUNISTIC.
    history = [_small_buy("777", year, 6) for year in (2020, 2021)]
    txns = pd.DataFrame(history + [_june_2022_buy("777")])
    _assert_positive(_nvda_row(txns, _nvda_and_amd(as_of), as_of))