Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 17 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ A LangGraph-based agent that autonomously discovers, scores, and tracks job oppo
## What it does

1. **Loads context** β€” reads your CV files (`query/resume/`), generates search queries deterministically from `config/search_config.yaml` (positions Γ— locations cross-product), and loads target companies with their ATS hints
2. **Searches for jobs** β€” runs queries via LLM-powered web search (Claude web search tool); searches known company ATS boards (Greenhouse, Lever, Ashby) via unauthenticated HTTP β€” zero LLM tokens for ATS queries; semantic deduplication across all sources removes duplicate postings
3. **Scores matches** β€” batch-scores each posting against your CVs using an LLM; keeps only jobs above a configurable threshold
2. **Searches for jobs** β€” one directive LLM prompt returns job URLs only (no fabricated descriptions); Tavily extract validates each URL and pulls real posting content (hallucinated or unreachable URLs are dropped); company ATS boards (Greenhouse, Lever, Ashby) are queried via direct API β€” zero LLM tokens for ATS; all results deduplicated and checkpointed to `query/jobs_found.jsonl`
3. **Scores matches** β€” single LLM call scores all jobs against your CV; keeps only jobs above a configurable threshold
4. **Stores results** β€” deduplicates by content-hash and writes to local JSON and/or cloud storage (Google Drive, OneDrive, Dropbox)
5. **Notifies you** β€” sends a digest to Telegram, Slack, email, or WhatsApp

Expand All @@ -22,11 +22,11 @@ flowchart TD
C -- no --> E{job_queries.md?}
D --> E
E -- no --> F[generate_queries\npositions Γ— locations from search_config]
E -- yes --> G[search_jobs\nanthropicweb LLM search]
E -- yes --> G[search_jobs\nLLM directive β†’ Tavily extract]
F --> G
G --> H[search_companies\nATS direct + LLM search]
H --> I[aggregate_jobs\ndedup Β· cap Β· checkpoint]
I --> J2[analyze_jobs\nbatch LLM scoring]
G --> H[search_companies\nATS direct API]
H --> I[aggregate_jobs\ndedup Β· cap Β· jobs_found.jsonl]
I --> J2[analyze_jobs\nsingle LLM scoring call]
J2 --> J[store_results\nlocal JSON + cloud sync]
J --> K{notifications\nenabled?}
K -- yes --> L[send_notifications\nTelegram Β· Slack Β· email]
Expand Down Expand Up @@ -63,7 +63,9 @@ python3 -m venv .venv
# Install the Infisical CLI: https://infisical.com/docs/cli/overview
# Then add secrets to your Infisical project (env: dev):
# TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID β€” for notifications
# FRANCE_TRAVAIL_CLIENT_ID/SECRET, ADZUNA_APP_ID/KEY β€” for job boards (optional)
# TAVILY_API_KEY β€” for URL validation and extraction (required)
# FRANCE_TRAVAIL_CLIENT_ID/SECRET β€” optional free job board API
# ADZUNA_APP_ID/KEY β€” optional free job board API

# 3. Add your CV
# Drop a PDF or .md file into query/resume/
Expand Down Expand Up @@ -95,10 +97,12 @@ llm:

search:
connectors:
- name: france_travail # free API β€” francetravail.io (optional)
- name: adzuna # free API β€” developer.adzuna.com (optional)
- name: anthropic_web # LLM web search β€” primary connector
max_results_per_query: 4 # 4 queries Γ— 4 results β‰ˆ 15 total before dedup
- name: anthropic_web # primary: LLM directive search β†’ Tavily extract
max_results_per_query: 4
- name: france_travail # optional free API β€” francetravail.io
enabled: false
- name: adzuna # optional free API β€” developer.adzuna.com
enabled: false

storage:
provider: local # local | google_drive | onedrive | dropbox
Expand Down Expand Up @@ -185,7 +189,8 @@ Per-model and per-node totals are stored on the final state as `token_usage` (sh
|---|---|
| Orchestration | LangGraph |
| LLM interface | LangChain (Anthropic Claude / OpenAI) |
| Job boards | France Travail, Adzuna (optional), Claude web search (primary) |
| Search | Claude web search (directive prompt) + Tavily extract (validation + content) |
| Job boards | France Travail, Adzuna (optional) |
| ATS boards | Greenhouse, Lever, Ashby (unauthenticated HTTP) |
| Terminal UI | Rich |
| Storage | Local JSON (Google Drive / OneDrive / Dropbox) |
Expand Down
116 changes: 36 additions & 80 deletions agent/nodes/analyze_jobs.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,56 @@
"""Score every job against every CV; keep those above ``min_score``.
"""Score every job in ``query/jobs_found.jsonl`` against every CV; keep those above ``min_score``.

Three scoring modes are supported, selected via ``config.yaml -> scoring.mode``:
Input: ``query/jobs_found.jsonl`` β€” written by aggregate_jobs, one job per line.
Output: ``query/jobs_scored.jsonl`` β€” same lines with ``score``, ``best_cv``,
``recommendation``, and ``reasoning`` appended.

- ``llm`` β€” Every job scored by the LLM. Highest quality, highest cost.
- ``hybrid`` β€” LLM bootstraps a per-CV regex profile, then static scoring
handles most jobs and only borderline ones go to the LLM.
Best price/performance for daily runs.
- ``static`` β€” Pure regex scoring against a pre-existing profile. Zero LLM
calls. Requires a profile to already exist (run hybrid once
to bootstrap one).

Two LLM handles are built per run:
- ``search_llm`` β€” cheap model used for CV compression
- ``scoring_llm`` β€” capable model used for actual scoring
Scoring is a single LLM call for all jobs (no batching, no hybrid/static modes).
The compressed CV cache is used so CV compression is paid exactly once per CV.
"""
import json
import logging
from pathlib import Path

from agent.state import AgentState
from providers.scoring.cv_cache import get_or_compress
from providers.scoring.llm_scorer import score_jobs_batch

logger = logging.getLogger(__name__)

_JOBS_FILE = Path("query/jobs_found.jsonl")
_SCORED_FILE = Path("query/jobs_scored.jsonl")


def _read_jobs_jsonl() -> list[dict]:
if not _JOBS_FILE.exists():
return []
with _JOBS_FILE.open(encoding="utf-8") as f:
return [json.loads(line) for line in f if line.strip()]


def _write_scored_jsonl(jobs: list[dict]) -> None:
lines = [json.dumps(j, ensure_ascii=False) for j in jobs]
_SCORED_FILE.write_text("\n".join(lines) + ("\n" if lines else ""), encoding="utf-8")


def run(state: AgentState) -> AgentState:
"""Compress CVs, score every raw job, and return ``scored_jobs`` (β‰₯ ``min_score``)."""
"""Read jobs from JSONL checkpoint, score in one LLM call, write scored JSONL."""
errors = list(state.get("errors", []))
run_log = list(state.get("run_log", []))

raw_jobs = state.get("raw_jobs", [])
cvs = state.get("cvs", [])
cfg = state["config"]
scoring_cfg = cfg.get("scoring", {})
min_score = scoring_cfg.get("min_score", 70)

# Early-exit short-circuits β€” these don't count as errors.
# Read from the JSONL checkpoint written by aggregate_jobs.
# Fall back to state["raw_jobs"] for test runs that skip aggregate_jobs.
raw_jobs = _read_jobs_jsonl()
if not raw_jobs:
raw_jobs = state.get("raw_jobs", [])
if raw_jobs:
run_log.append("analyze_jobs: JSONL checkpoint not found β€” using state raw_jobs")

if not raw_jobs:
run_log.append("No jobs to analyze")
return {**state, "scored_jobs": [], "errors": errors, "run_log": run_log}
Expand All @@ -43,14 +59,10 @@ def run(state: AgentState) -> AgentState:
errors.append("No CVs loaded β€” cannot score jobs")
return {**state, "scored_jobs": [], "errors": errors, "run_log": run_log}

# Build both LLM handles up front so configuration errors surface here
# rather than mid-scoring.
from providers.llm.factory import build_llm
search_llm = build_llm(cfg["llm"], task="search")
scoring_llm = build_llm(cfg["llm"], task="scoring")

# Compress every CV via the disk-backed cache β€” repeated runs against the
# same CV pay the LLM cost exactly once.
compressed_cvs: list[dict] = []
for cv in cvs:
try:
Expand All @@ -59,23 +71,13 @@ def run(state: AgentState) -> AgentState:
run_log.append(f"Compressed CV: {cv['name']}")
except Exception as e:
errors.append(f"CV compression failed for '{cv['name']}': {e}")
# Fall back to the full CV β€” scoring will be slower but correct.
compressed_cvs.append(cv)

mode = scoring_cfg.get("mode", "llm")
run_log.append(f"Scoring mode: {mode}")

scored_jobs: list[dict]
scored_jobs = score_jobs_batch(scoring_llm, raw_jobs, compressed_cvs, scoring_cfg)
scored_jobs.sort(key=lambda j: j["score"], reverse=True)

if mode == "static":
scored_jobs = _score_static(raw_jobs, cvs, scoring_cfg, errors)
elif mode == "hybrid":
scored_jobs = _score_hybrid(scoring_llm, raw_jobs, cvs, compressed_cvs, scoring_cfg)
else: # "llm" β€” the default
scored_jobs = score_jobs_batch(
scoring_llm, raw_jobs, compressed_cvs, scoring_cfg
)
scored_jobs.sort(key=lambda j: j["score"], reverse=True)
_write_scored_jsonl(scored_jobs)
run_log.append(f"analyze_jobs: wrote {len(scored_jobs)} scored jobs to {_SCORED_FILE}")

run_log.append(
f"Analysis complete: {len(scored_jobs)}/{len(raw_jobs)} "
Expand All @@ -87,49 +89,3 @@ def run(state: AgentState) -> AgentState:
)

return {**state, "scored_jobs": scored_jobs, "errors": errors, "run_log": run_log}


def _score_static(
raw_jobs: list[dict],
cvs: list[dict],
scoring_cfg: dict,
errors: list[str],
) -> list[dict]:
"""Score with the regex scorer only. Requires a profile per CV."""
from providers.scoring.profile_store import content_hash, load_profile
from providers.scoring.static_scorer import score_jobs_static

profiles_dir = scoring_cfg.get("profiles_dir", "scoring_profiles")
profiles: dict[str, dict] = {}
for cv in cvs:
cv_hash = content_hash(cv["content"])
profile = load_profile(cv["name"], cv_hash, profiles_dir)
if profile is None:
# Static mode can't bootstrap by itself β€” surface this so the
# user knows to run hybrid mode at least once.
errors.append(
f"No valid scoring profile for '{cv['name']}' β€” "
"run with mode: hybrid first to bootstrap"
)
else:
profiles[cv["name"]] = profile

if not profiles:
return []

scored = score_jobs_static(raw_jobs, profiles, scoring_cfg)
scored.sort(key=lambda j: j["score"], reverse=True)
return scored


def _score_hybrid(
scoring_llm,
raw_jobs: list[dict],
cvs: list[dict],
compressed_cvs: list[dict],
scoring_cfg: dict,
) -> list[dict]:
"""Score with the hybrid scorer (regex + LLM rescoring at the band edges)."""
from providers.scoring.hybrid_scorer import HybridScorer

return HybridScorer(scoring_llm, cvs, compressed_cvs, scoring_cfg).score(raw_jobs)
119 changes: 106 additions & 13 deletions agent/nodes/search_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,90 @@ def _make_job_id(job: dict) -> str:
return hashlib.sha256(key.encode()).hexdigest()[:16]


# ── Directive search (anthropic_web) ─────────────────────────────────────────

_DIRECTIVE_TARGET = 30 # jobs we want after Tavily filtering
_DIRECTIVE_LLM_MAX = 50 # URLs we ask the LLM for (buffer for Tavily drops)


def _get_positions(state: AgentState) -> list[str]:
"""Collect unique non-empty position strings from the cvs config block."""
# cvs lives at config root (from search_config.yaml), not under config.search
cvs_cfg = state["config"].get("cvs", {})
seen: set[str] = set()
positions: list[str] = []
for titles in cvs_cfg.values():
for t in (titles or []):
if t and t.strip() and t.strip() not in seen:
seen.add(t.strip())
positions.append(t.strip())
return positions


def _run_directive_search(
state: AgentState,
llm,
search_cfg: dict,
run_log: list,
errors: list,
) -> list[dict]:
"""Two-step search for anthropic_web: LLM discovers URLs, Tavily validates them.

Step 1 β€” search: LLM returns up to _DIRECTIVE_LLM_MAX URL candidates
as {url, source, found_in_snippet}.
Step 2 β€” validate: Tavily extract drops hallucinated/unreachable URLs and
replaces LLM snippets with real posting content.
"""
from providers.search.url_validator import validate_and_enrich
from providers.search.web_search import AnthropicWebSearchProvider

positions = _get_positions(state)
# locations also lives at config root
locations: list[str] = state["config"].get("locations", ["Paris"])
companies: list[str] = state.get("companies", [])
hints: dict = state.get("company_hints", {})

run_log.append(
f"[anthropic_web] search: {positions} Γ— {locations}, "
f"{len(companies)} companies, asking LLM for {_DIRECTIVE_LLM_MAX} URLs"
)

# ── Step 1: search ────────────────────────────────────────────────────────
try:
provider = AnthropicWebSearchProvider(llm, search_cfg)
candidates = provider.search_all(
positions=positions,
locations=locations,
companies=companies,
hints=hints,
max_results=_DIRECTIVE_LLM_MAX,
)
run_log.append(f"[anthropic_web] LLM returned {len(candidates)} URL candidates")
logger.info("[anthropic_web] LLM returned %d candidates", len(candidates))
except Exception as e:
errors.append(f"Directive search (LLM) failed: {e}")
logger.error("Directive search (LLM) failed: %s", e)
return []

if not candidates:
run_log.append("[anthropic_web] No URL candidates β€” skipping Tavily validation")
return []

# ── Step 2: validate ─────────────────────────────────────────────────────
run_log.append(f"[anthropic_web] validate: running Tavily extract on {len(candidates)} URLs")
try:
jobs = validate_and_enrich(candidates, search_cfg, max_results=_DIRECTIVE_TARGET)
run_log.append(
f"[anthropic_web] validate: {len(jobs)}/{len(candidates)} URLs passed Tavily"
)
logger.info("[anthropic_web] %d/%d URLs passed Tavily", len(jobs), len(candidates))
return jobs
except Exception as e:
errors.append(f"Directive search (Tavily validate) failed: {e}")
logger.error("Directive search (Tavily validate) failed: %s", e)
return []


# ── Graph node ───────────────────────────────────────────────────────────────

def run(state: AgentState) -> AgentState:
Expand Down Expand Up @@ -401,19 +485,28 @@ def run(state: AgentState) -> AgentState:

recency_days = search_cfg.get("recency_days", 3)

# Primary pass β€” these are the connectors we always try.
raw_jobs.extend(_run_parallel(primary, queries, llm, search_cfg, run_log, errors, recency_days))

# Fallback pass β€” only run when primary returned nothing. This is the
# safety net for "all my API keys broke" type situations.
if fallbacks:
if raw_jobs:
skipped = [c["name"] for c in fallbacks]
run_log.append(f"Fallback connectors skipped (primary found results): {skipped}")
logger.info("Fallback connectors skipped: %s", skipped)
else:
run_log.append("Primary connectors returned 0 results β€” activating fallbacks")
raw_jobs.extend(_run_parallel(fallbacks, queries, llm, search_cfg, run_log, errors, recency_days))
# anthropic_web gets one comprehensive directive call instead of N queries.
# All other connectors (france_travail, adzuna, …) keep the parallel loop.
directive_cfgs = [c for c in primary if c["name"] == "anthropic_web"]
loop_primary = [c for c in primary if c["name"] != "anthropic_web"]
directive_fallbacks = [c for c in fallbacks if c["name"] == "anthropic_web"]
loop_fallbacks = [c for c in fallbacks if c["name"] != "anthropic_web"]

if directive_cfgs:
raw_jobs.extend(_run_directive_search(state, llm, search_cfg, run_log, errors))

raw_jobs.extend(_run_parallel(loop_primary, queries, llm, search_cfg, run_log, errors, recency_days))

# Fallback pass β€” only runs when primary produced nothing.
if not raw_jobs:
if directive_fallbacks:
raw_jobs.extend(_run_directive_search(state, llm, search_cfg, run_log, errors))
if loop_fallbacks:
raw_jobs.extend(_run_parallel(loop_fallbacks, queries, llm, search_cfg, run_log, errors, recency_days))
elif fallbacks:
skipped = [c["name"] for c in fallbacks]
run_log.append(f"Fallback connectors skipped (primary found results): {skipped}")
logger.info("Fallback connectors skipped: %s", skipped)

# Drop month-old postings that slipped past API recency filters
raw_jobs = _filter_recent(raw_jobs)
Expand Down
Loading
Loading