-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathingest.py
More file actions
74 lines (59 loc) · 1.87 KB
/
ingest.py
File metadata and controls
74 lines (59 loc) · 1.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
"""Master ingestion script for the ThreatGraph core pipeline."""
from __future__ import annotations
import os
import sys
import time
sys.path.insert(0, os.path.dirname(__file__))
from src.config import ATTACK_STIX_PATH
from src.database import get_db, get_stats, init_schema
from src.ingestion.asset_seeder import seed_assets
from src.ingestion.attack_loader import ingest_attack
from src.ingestion.cve_correlator import correlate_cves
from src.ingestion.software_linker import link_software_versions
def run_full_ingest(db) -> dict:
    """Run every ingestion stage against ``db`` and collect their results.

    The stages are invoked in a fixed sequence: ``init_schema``,
    ``ingest_attack`` (reading ``ATTACK_STIX_PATH``), ``seed_assets`` with
    ``reset=True``, ``link_software_versions``, and finally
    ``correlate_cves`` with ``use_cache=True`` and
    ``max_results_per_cpe=50``.

    Returns:
        A dict with the keys ``"schema"``, ``"attack"``, ``"assets"``,
        ``"software_links"`` and ``"cves"``. Each value is whatever the
        corresponding stage returned, except ``"attack"``, which is
        ``{"objects": <length of the ingest_attack result>}``.
    """
    schema_result = init_schema(db)
    attack_objects = ingest_attack(db, ATTACK_STIX_PATH)
    asset_result = seed_assets(db, reset=True)
    link_result = link_software_versions(db)
    cve_result = correlate_cves(db, use_cache=True, max_results_per_cpe=50)

    # Assemble the report only after every stage has finished, preserving
    # the stage order as the key order.
    summary = {"schema": schema_result}
    summary["attack"] = {"objects": len(attack_objects)}
    summary["assets"] = asset_result
    summary["software_links"] = link_result
    summary["cves"] = cve_result
    return summary
def main():
    """Run the full ingest pipeline and print a report to stdout.

    Obtains a database handle from ``get_db()``, runs ``run_full_ingest``
    on it, prints each stage's result, then prints the per-table counts
    returned by ``get_stats`` (a table missing from the stats is shown as
    0) followed by the total elapsed time in seconds.
    """
    # perf_counter() is monotonic, so the reported duration cannot be skewed
    # by wall-clock adjustments that happen while the ingest is running.
    started = time.perf_counter()
    rule = "=" * 60

    print(rule)
    print(" ThreatGraph — Full Data Ingestion Pipeline")
    print(rule)

    db = get_db()
    print("\n✓ Connected to SurrealDB")

    results = run_full_ingest(db)
    print("\n── Ingest Summary ──")
    for stage, payload in results.items():
        print(f"{stage:>16}: {payload}")

    stats = get_stats(db)
    print("\n── Graph Stats ──")
    # Tables reported in the summary, in display order.
    reported_tables = (
        "technique",
        "tactic",
        "threat_group",
        "software",
        "asset",
        "software_version",
        "cve",
        "employs",
        "uses",
        "runs",
        "linked_to_software",
        "has_cve",
        "affects",
    )
    for table in reported_tables:
        print(f"{table:>16}: {stats.get(table, 0)}")

    elapsed = time.perf_counter() - started
    print(f"\nTotal time: {elapsed:.1f}s")
    print(rule)
# Run the pipeline only when this file is executed as a script, so that
# importing the module does not trigger an ingest.
if __name__ == "__main__":
    main()