Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions agent/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
END
"""
import logging
import time
from typing import Any, Callable

from langgraph.graph import END, StateGraph
Expand Down Expand Up @@ -86,28 +87,47 @@ def _build_live_snapshot(
Reads the live token-usage snapshot lazily so the live page sees the same
cost numbers the TUI footer is showing.
"""
# Compute per-node elapsed times from wall-clock records kept by _safe().
# Running nodes get a live elapsed; completed nodes get their final time.
node_timings: dict[str, float] = {}
for n, end_t in _node_end_times.items():
start_t = _node_start_times.get(n)
if start_t is not None:
node_timings[n] = round(end_t - start_t, 2)

return {
"run_id": state.get("run_id", "unknown"),
"timestamp": state.get("timestamp", ""),
"run_start_time": state.get("run_start_time"), # Unix ts for JS duration counter
"status": status,
"current_node": current_node,
"node_status": dict(node_status),
"node_timings": {}, # populated by run.py — graph has no clock
"node_timings": node_timings,
"node_start_times": dict(_node_start_times), # JS uses these for live per-node timers
"kpis": {
"raw_jobs": len(state.get("raw_jobs", [])),
"scored_jobs": len(state.get("scored_jobs", [])),
"discarded_jobs": len(state.get("discarded_jobs", [])),
"stored_count": state.get("stored_count", 0),
# Per-node jobs-treated counts shown in the pipeline table
"jobs_treated": {
"search_jobs": len(state.get("raw_jobs", [])),
"search_companies": len(state.get("raw_jobs", [])),
"analyze_jobs": len(state.get("scored_jobs", [])) + len(state.get("discarded_jobs", [])),
"store_results": state.get("stored_count", 0),
},
},
"token_usage": usage_tracker.snapshot(),
"errors": list(state.get("errors", [])),
"scored_jobs": list(state.get("scored_jobs", [])),
"discarded_jobs": list(state.get("discarded_jobs", [])),
}


# Per-graph-build node-status accumulator. Reset by ``build_graph`` so each
# pipeline run starts from a clean slate; the wrapper mutates it in place as
# nodes complete.
# Per-graph-build accumulators, keyed by node name. ``build_graph`` clears all
# three at the start of each run; the wrapper created by ``_safe`` mutates them
# in place as nodes start and finish.
# Node name -> status string (values seen here: "waiting", "running", "error").
_node_status: dict[str, str] = {}
# ``time.time()`` values; ``_build_live_snapshot`` subtracts start from end to
# report per-node elapsed seconds.
_node_start_times: dict[str, float] = {} # Unix timestamp when node started
_node_end_times: dict[str, float] = {} # Unix timestamp when node finished


# ── Safety wrapper ───────────────────────────────────────────────────────────
Expand All @@ -127,6 +147,7 @@ def _safe(node_fn, name: str):
def wrapper(state: AgentState) -> AgentState:
usage_tracker.set_node(name)
_node_status[name] = "running"
_node_start_times[name] = time.time()
# Push the "running" snapshot before the node executes so the live page
# sees the transition immediately, not just at completion.
_push_live_snapshot(state, name, status="running")
Expand All @@ -140,6 +161,7 @@ def wrapper(state: AgentState) -> AgentState:
# under mypy. Cast back so the wrapper signature stays honest.
crashed: AgentState = {**state, "errors": errors} # type: ignore[typeddict-item]
_node_status[name] = "error"
_node_end_times[name] = time.time()
_push_live_snapshot(crashed, name, status="running")
return crashed
finally:
Expand All @@ -148,6 +170,7 @@ def wrapper(state: AgentState) -> AgentState:
# Successful completion: mark done unless the node itself appended
# a new error (partial failure). The completed snapshot includes the
# node's own state mutations so the live page reflects fresh KPIs.
_node_end_times[name] = time.time()
merged = {**state, **result}
prev_err = len(state.get("errors", []))
new_err = len(merged.get("errors", []))
Expand Down Expand Up @@ -201,6 +224,8 @@ def build_graph() -> CompiledStateGraph:
# from a clean slate each run; otherwise re-running ``main()`` in a test
# would inherit "complete" markers from the previous run.
_node_status.clear()
_node_start_times.clear()
_node_end_times.clear()
for _n in _NODE_ORDER:
_node_status[_n] = "waiting"

Expand Down
33 changes: 22 additions & 11 deletions agent/nodes/analyze_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

_JOBS_FILE = Path("query/jobs_found.jsonl")
_SCORED_FILE = Path("query/jobs_scored.jsonl")
_DISCARDED_FILE = Path("query/jobs_discarded.jsonl")


def _read_jobs_jsonl() -> list[dict]:
Expand All @@ -28,9 +29,9 @@ def _read_jobs_jsonl() -> list[dict]:
return [json.loads(line) for line in f if line.strip()]


def _write_scored_jsonl(jobs: list[dict]) -> None:
def _write_jsonl(path: Path, jobs: list[dict]) -> None:
lines = [json.dumps(j, ensure_ascii=False) for j in jobs]
_SCORED_FILE.write_text("\n".join(lines) + ("\n" if lines else ""), encoding="utf-8")
path.write_text("\n".join(lines) + ("\n" if lines else ""), encoding="utf-8")


def run(state: AgentState) -> AgentState:
Expand All @@ -53,11 +54,11 @@ def run(state: AgentState) -> AgentState:

if not raw_jobs:
run_log.append("No jobs to analyze")
return {**state, "scored_jobs": [], "errors": errors, "run_log": run_log}
return {**state, "scored_jobs": [], "discarded_jobs": [], "errors": errors, "run_log": run_log}

if not cvs:
errors.append("No CVs loaded — cannot score jobs")
return {**state, "scored_jobs": [], "errors": errors, "run_log": run_log}
return {**state, "scored_jobs": [], "discarded_jobs": [], "errors": errors, "run_log": run_log}

from providers.llm.factory import build_llm
search_llm = build_llm(cfg["llm"], task="search")
Expand All @@ -73,19 +74,29 @@ def run(state: AgentState) -> AgentState:
errors.append(f"CV compression failed for '{cv['name']}': {e}")
compressed_cvs.append(cv)

scored_jobs = score_jobs_batch(scoring_llm, raw_jobs, compressed_cvs, scoring_cfg)
scored_jobs, discarded_jobs = score_jobs_batch(scoring_llm, raw_jobs, compressed_cvs, scoring_cfg)
scored_jobs.sort(key=lambda j: j["score"], reverse=True)
discarded_jobs.sort(key=lambda j: j["score"], reverse=True)

_write_scored_jsonl(scored_jobs)
run_log.append(f"analyze_jobs: wrote {len(scored_jobs)} scored jobs to {_SCORED_FILE}")
_write_jsonl(_SCORED_FILE, scored_jobs)
_write_jsonl(_DISCARDED_FILE, discarded_jobs)
run_log.append(
f"analyze_jobs: wrote {len(scored_jobs)} scored + {len(discarded_jobs)} discarded"
)

run_log.append(
f"Analysis complete: {len(scored_jobs)}/{len(raw_jobs)} "
f"jobs passed threshold (≥{min_score})"
f"jobs passed threshold (≥{min_score}), {len(discarded_jobs)} discarded"
)
logger.info(
"Analysis complete: %d/%d jobs above threshold",
len(scored_jobs), len(raw_jobs),
"Analysis complete: %d/%d jobs above threshold, %d discarded",
len(scored_jobs), len(raw_jobs), len(discarded_jobs),
)

return {**state, "scored_jobs": scored_jobs, "errors": errors, "run_log": run_log}
return {
**state,
"scored_jobs": scored_jobs,
"discarded_jobs": discarded_jobs,
"errors": errors,
"run_log": run_log,
}
32 changes: 32 additions & 0 deletions agent/nodes/store_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- Capture and persist ``sheet_url`` to ``.data/meta.json`` so notifications
sent on later runs can still link to the most recent sheet.
"""
import json
import logging
from datetime import datetime, timezone
from pathlib import Path
Expand All @@ -23,6 +24,30 @@
# test_notification.py and the notification node can reference them even when
# the current run produced none.
_META_CACHE = JsonCache(Path(".data/meta.json"))
_DISCARDED_STORE = Path(".data/discarded_jobs.jsonl")


def _store_discarded(jobs: list[dict], run_timestamp: str) -> None:
    """Append new discarded jobs to .data/discarded_jobs.jsonl, deduped by URL.

    A job is skipped when its non-empty ``url`` is already present in the
    store or appeared earlier in ``jobs``. Jobs without a URL have no dedup
    key, so they are always stored rather than being collapsed together.

    Args:
        jobs: Discarded job records. Every record that gets stored is updated
            in place: ``date_found`` is filled with ``run_timestamp`` when
            absent and ``status`` is set to ``"discarded"``.
        run_timestamp: Value used as the ``date_found`` default.
    """
    _DISCARDED_STORE.parent.mkdir(parents=True, exist_ok=True)

    seen_urls: set[str] = set()
    if _DISCARDED_STORE.exists():
        with _DISCARDED_STORE.open(encoding="utf-8") as f:
            for line_no, line in enumerate(f, start=1):
                if not line.strip():
                    continue  # blank lines are not corruption
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    # Best-effort read: one corrupt line must not block the
                    # append, but it should be visible in the logs.
                    logger.warning(
                        "Skipping malformed line %d in %s",
                        line_no,
                        _DISCARDED_STORE,
                    )
                    continue
                # Valid JSON that is not an object carries no URL to dedup on.
                url = record.get("url") if isinstance(record, dict) else None
                if url:
                    seen_urls.add(url)

    new_lines: list[str] = []
    for job in jobs:
        url = job.get("url", "")
        if url:
            if url in seen_urls:
                continue
            # Remember it so a repeat later in this same batch is skipped too.
            seen_urls.add(url)
        job.setdefault("date_found", run_timestamp)
        job["status"] = "discarded"
        new_lines.append(json.dumps(job, ensure_ascii=False))

    if new_lines:
        with _DISCARDED_STORE.open("a", encoding="utf-8") as f:
            f.write("\n".join(new_lines) + "\n")
        logger.info("Stored %d new discarded jobs", len(new_lines))


def _update_meta(updates: dict) -> None:
Expand All @@ -39,6 +64,13 @@
run_log = list(state.get("run_log", []))

scored_jobs = state.get("scored_jobs", [])
discarded_jobs = state.get("discarded_jobs", [])

# Persist discarded jobs to a flat JSONL so they survive across runs and
# can be reviewed in the dashboard. Append-only with URL-based dedup.
if discarded_jobs:
_store_discarded(discarded_jobs, state.get("timestamp", ""))

if not scored_jobs:
run_log.append("No scored jobs to store")
return {**state, "stored_count": 0, "errors": errors, "run_log": run_log}
Expand Down
3 changes: 2 additions & 1 deletion agent/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ class AgentState(TypedDict):
raw_jobs: list[dict] # All jobs found before scoring

# ── Analysis (populated by analyze_jobs) ────────────────────────────────
scored_jobs: list[dict] # Jobs that passed the scoring threshold
scored_jobs: list[dict] # Jobs that passed the scoring threshold
discarded_jobs: list[dict] # Jobs scored below threshold — real score + reason kept

# ── Output (populated by store_results and send_notifications) ──────────
stored_count: int
Expand Down
3 changes: 3 additions & 0 deletions monitoring/web_monitoring/live_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,17 @@
# Default snapshot payload. Its top-level keys match the dict assembled by
# ``_build_live_snapshot`` in agent/graph.py.
# NOTE(review): presumably what the live page receives before the first real
# snapshot is pushed — confirm against the request handler.
_EMPTY_STATE: dict = {
"run_id": "—",
"timestamp": "",
"run_start_time": None, # Unix timestamp — JS uses this for the live duration counter
"status": "running",
"current_node": None,
"node_status": {},
"node_timings": {},
"node_start_times": {}, # Unix timestamps — JS uses these for per-node live timers
"kpis": {},
"token_usage": {},
"errors": [],
"scored_jobs": [],
"discarded_jobs": [],
}


Expand Down
Loading
Loading