Skip to content
Merged
29 changes: 17 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ A LangGraph-based agent that autonomously discovers, scores, and tracks job oppo
## What it does

1. **Loads context** — reads your CV files (`query/resume/`), generates search queries deterministically from `config/search_config.yaml` (positions × locations cross-product), and loads target companies with their ATS hints
2. **Searches for jobs** — runs queries via LLM-powered web search (Claude web search tool); searches known company ATS boards (Greenhouse, Lever, Ashby) via unauthenticated HTTP — zero LLM tokens for ATS queries; semantic deduplication across all sources removes duplicate postings
3. **Scores matches** — batch-scores each posting against your CVs using an LLM; keeps only jobs above a configurable threshold
2. **Searches for jobs** — one directive LLM prompt returns job URLs only (no fabricated descriptions); Tavily extract validates each URL and pulls real posting content (hallucinated or unreachable URLs are dropped); company ATS boards (Greenhouse, Lever, Ashby) are queried via direct API — zero LLM tokens for ATS; all results deduplicated and checkpointed to `query/jobs_found.jsonl`
3. **Scores matches** — single LLM call scores all jobs against your CV; keeps only jobs above a configurable threshold
4. **Stores results** — deduplicates by content-hash and writes to local JSON and/or cloud storage (Google Drive, OneDrive, Dropbox)
5. **Notifies you** — sends a digest to Telegram, Slack, email, or WhatsApp

Expand All @@ -22,11 +22,11 @@ flowchart TD
C -- no --> E{job_queries.md?}
D --> E
E -- no --> F[generate_queries\npositions × locations from search_config]
E -- yes --> G[search_jobs\nanthropicweb LLM search]
E -- yes --> G[search_jobs\nLLM directive → Tavily extract]
F --> G
G --> H[search_companies\nATS direct + LLM search]
H --> I[aggregate_jobs\ndedup · cap · checkpoint]
I --> J2[analyze_jobs\nbatch LLM scoring]
G --> H[search_companies\nATS direct API]
H --> I[aggregate_jobs\ndedup · cap · jobs_found.jsonl]
I --> J2[analyze_jobs\nsingle LLM scoring call]
J2 --> J[store_results\nlocal JSON + cloud sync]
J --> K{notifications\nenabled?}
K -- yes --> L[send_notifications\nTelegram · Slack · email]
Expand Down Expand Up @@ -63,7 +63,9 @@ python3 -m venv .venv
# Install the Infisical CLI: https://infisical.com/docs/cli/overview
# Then add secrets to your Infisical project (env: dev):
# TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID — for notifications
# FRANCE_TRAVAIL_CLIENT_ID/SECRET, ADZUNA_APP_ID/KEY — for job boards (optional)
# TAVILY_API_KEY — for URL validation and extraction (required)
# FRANCE_TRAVAIL_CLIENT_ID/SECRET — optional free job board API
# ADZUNA_APP_ID/KEY — optional free job board API

# 3. Add your CV
# Drop a PDF or .md file into query/resume/
Expand Down Expand Up @@ -95,10 +97,12 @@ llm:

search:
connectors:
- name: france_travail # free API — francetravail.io (optional)
- name: adzuna # free API — developer.adzuna.com (optional)
- name: anthropic_web # LLM web search — primary connector
max_results_per_query: 4 # 4 queries × 4 results ≈ 15 total before dedup
- name: anthropic_web # primary: LLM directive search → Tavily extract
max_results_per_query: 4
- name: france_travail # optional free API — francetravail.io
enabled: false
- name: adzuna # optional free API — developer.adzuna.com
enabled: false

storage:
provider: local # local | google_drive | onedrive | dropbox
Expand Down Expand Up @@ -185,7 +189,8 @@ Per-model and per-node totals are stored on the final state as `token_usage` (sh
|---|---|
| Orchestration | LangGraph |
| LLM interface | LangChain (Anthropic Claude / OpenAI) |
| Job boards | France Travail, Adzuna (optional), Claude web search (primary) |
| Search | Claude web search (directive prompt) + Tavily extract (validation + content) |
| Job boards | France Travail, Adzuna (optional) |
| ATS boards | Greenhouse, Lever, Ashby (unauthenticated HTTP) |
| Terminal UI | Rich |
| Storage | Local JSON, with optional cloud storage (Google Drive / OneDrive / Dropbox) |
Expand Down
119 changes: 106 additions & 13 deletions agent/nodes/search_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,90 @@ def _make_job_id(job: dict) -> str:
return hashlib.sha256(key.encode()).hexdigest()[:16]


# ── Directive search (anthropic_web) ─────────────────────────────────────────

# Jobs we want after Tavily filtering; passed as ``max_results`` to
# ``validate_and_enrich`` in ``_run_directive_search``.
_DIRECTIVE_TARGET = 30
# URLs we ask the LLM for (buffer for Tavily drops); passed as ``max_results``
# to the provider's ``search_all`` call in ``_run_directive_search``.
_DIRECTIVE_LLM_MAX = 50


def _get_positions(state: AgentState) -> list[str]:
"""Collect unique non-empty position strings from the cvs config block."""
# cvs lives at config root (from search_config.yaml), not under config.search
cvs_cfg = state["config"].get("cvs", {})
seen: set[str] = set()
positions: list[str] = []
for titles in cvs_cfg.values():
for t in (titles or []):
if t and t.strip() and t.strip() not in seen:
seen.add(t.strip())
positions.append(t.strip())
return positions


def _run_directive_search(
    state: AgentState,
    llm,
    search_cfg: dict,
    run_log: list,
    errors: list,
) -> list[dict]:
    """Two-step search for anthropic_web: LLM discovers URLs, Tavily validates them.

    Step 1 — search: ``AnthropicWebSearchProvider.search_all`` is asked for up
             to ``_DIRECTIVE_LLM_MAX`` URL candidates.
    Step 2 — validate: the candidates go through ``validate_and_enrich``,
             capped at ``_DIRECTIVE_TARGET`` results. That helper is expected
             to drop hallucinated/unreachable URLs and attach real posting
             content (behaviour lives in ``providers.search.url_validator``).

    Args:
        state: Agent state; ``config`` is required, ``companies`` and
            ``company_hints`` are optional.
        llm: LLM handle forwarded to the search provider.
        search_cfg: Search configuration forwarded to both steps.
        run_log: Human-readable progress lines; appended to in place.
        errors: Error messages; appended to in place when a step fails.

    Returns:
        The validated jobs, or ``[]`` when the LLM step fails, yields no
        candidates, or the validation step fails. Never raises for step
        failures — they are logged and recorded in ``errors`` instead.
    """
    from providers.search.url_validator import validate_and_enrich
    from providers.search.web_search import AnthropicWebSearchProvider

    positions = _get_positions(state)
    # locations also lives at config root. Only a missing/null value falls
    # back to the default; an explicit empty list is passed through as-is.
    locations = state["config"].get("locations")
    if locations is None:
        locations = ["Paris"]
    # ``or`` guards against keys that exist but hold None, which would break
    # ``len(companies)`` below.
    companies: list[str] = state.get("companies") or []
    hints: dict = state.get("company_hints") or {}

    run_log.append(
        f"[anthropic_web] search: {positions} × {locations}, "
        f"{len(companies)} companies, asking LLM for {_DIRECTIVE_LLM_MAX} URLs"
    )

    # ── Step 1: search ────────────────────────────────────────────────────────
    try:
        provider = AnthropicWebSearchProvider(llm, search_cfg)
        candidates = provider.search_all(
            positions=positions,
            locations=locations,
            companies=companies,
            hints=hints,
            max_results=_DIRECTIVE_LLM_MAX,
        )
        run_log.append(f"[anthropic_web] LLM returned {len(candidates)} URL candidates")
        logger.info("[anthropic_web] LLM returned %d candidates", len(candidates))
    except Exception as e:
        # Node boundary: one broken connector must not abort the whole run.
        errors.append(f"Directive search (LLM) failed: {e}")
        # logger.exception keeps the traceback that logger.error discarded.
        logger.exception("Directive search (LLM) failed: %s", e)
        return []

    if not candidates:
        run_log.append("[anthropic_web] No URL candidates — skipping Tavily validation")
        return []

    # ── Step 2: validate ─────────────────────────────────────────────────────
    run_log.append(f"[anthropic_web] validate: running Tavily extract on {len(candidates)} URLs")
    try:
        jobs = validate_and_enrich(candidates, search_cfg, max_results=_DIRECTIVE_TARGET)
        run_log.append(
            f"[anthropic_web] validate: {len(jobs)}/{len(candidates)} URLs passed Tavily"
        )
        logger.info("[anthropic_web] %d/%d URLs passed Tavily", len(jobs), len(candidates))
        return jobs
    except Exception as e:
        errors.append(f"Directive search (Tavily validate) failed: {e}")
        logger.exception("Directive search (Tavily validate) failed: %s", e)
        return []


# ── Graph node ───────────────────────────────────────────────────────────────

def run(state: AgentState) -> AgentState:
Expand Down Expand Up @@ -401,19 +485,28 @@ def run(state: AgentState) -> AgentState:

recency_days = search_cfg.get("recency_days", 3)

# Primary pass — these are the connectors we always try.
raw_jobs.extend(_run_parallel(primary, queries, llm, search_cfg, run_log, errors, recency_days))

# Fallback pass — only run when primary returned nothing. This is the
# safety net for "all my API keys broke" type situations.
if fallbacks:
if raw_jobs:
skipped = [c["name"] for c in fallbacks]
run_log.append(f"Fallback connectors skipped (primary found results): {skipped}")
logger.info("Fallback connectors skipped: %s", skipped)
else:
run_log.append("Primary connectors returned 0 results — activating fallbacks")
raw_jobs.extend(_run_parallel(fallbacks, queries, llm, search_cfg, run_log, errors, recency_days))
# anthropic_web gets one comprehensive directive call instead of N queries.
# All other connectors (france_travail, adzuna, …) keep the parallel loop.
directive_cfgs = [c for c in primary if c["name"] == "anthropic_web"]
loop_primary = [c for c in primary if c["name"] != "anthropic_web"]
directive_fallbacks = [c for c in fallbacks if c["name"] == "anthropic_web"]
loop_fallbacks = [c for c in fallbacks if c["name"] != "anthropic_web"]

if directive_cfgs:
raw_jobs.extend(_run_directive_search(state, llm, search_cfg, run_log, errors))

raw_jobs.extend(_run_parallel(loop_primary, queries, llm, search_cfg, run_log, errors, recency_days))

# Fallback pass — only runs when primary produced nothing.
if not raw_jobs:
if directive_fallbacks:
raw_jobs.extend(_run_directive_search(state, llm, search_cfg, run_log, errors))
if loop_fallbacks:
raw_jobs.extend(_run_parallel(loop_fallbacks, queries, llm, search_cfg, run_log, errors, recency_days))
elif fallbacks:
skipped = [c["name"] for c in fallbacks]
run_log.append(f"Fallback connectors skipped (primary found results): {skipped}")
logger.info("Fallback connectors skipped: %s", skipped)

# Drop month-old postings that slipped past API recency filters
raw_jobs = _filter_recent(raw_jobs)
Expand Down
115 changes: 51 additions & 64 deletions providers/search/connectors/tavily.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
"""Tavily connector — search and extract.
"""Tavily Search and Extract connector.
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed

Provides two operations:
- ``search(query)`` — general web search returning snippets (legacy, kept
for any callers that haven't migrated to the Brave-search pipeline).
- ``extract(urls)`` — fetch and clean the full text of a list of URLs via
Tavily's /extract endpoint. Used by AdaptiveWebSearchProvider to get real
job-posting content after Brave search returns the URLs.
Two capabilities:
- ``search(query)`` — structured web search results (legacy).
- ``extract(urls)`` — fetch full page content via Tavily's /extract endpoint.
Used by ``url_validator`` to validate LLM-returned URLs
and pull real posting text.

Required env var: TAVILY_API_KEY
Required environment variable: TAVILY_API_KEY
"""
import hashlib
import logging
import os
import urllib.parse
from datetime import datetime, timezone

import requests as _requests

from providers.search.base import BaseSearchProvider

logger = logging.getLogger(__name__)

# Tavily extract processes up to 20 URLs per call.
_EXTRACT_BATCH = 20
_TAVILY_EXTRACT_URL = "https://api.tavily.com/extract"
_EXTRACT_BATCH_SIZE = 20


def _domain_hint(url: str) -> str:
Expand All @@ -32,17 +33,50 @@ def _domain_hint(url: str) -> str:


class TavilyConnector(BaseSearchProvider):
"""Tavily search + extract connector."""

# ── Search (legacy / direct use) ─────────────────────────────────────────
"""Tavily search and extract."""

def search(self, query: str, max_results: int = 10, **kwargs) -> list[dict]:
"""General web search — returns snippet-only job dicts.
def extract(self, urls: list[str]) -> dict[str, str]:
    """Fetch full page content for each URL via Tavily's /extract endpoint.

    URLs are posted in batches of ``_EXTRACT_BATCH_SIZE``. A failed batch is
    logged and skipped; the remaining batches are still attempted.

    Args:
        urls: URLs to fetch. An empty list returns ``{}`` without any request.

    Returns:
        ``{url: raw_content}`` for every result Tavily returned with non-blank
        content, keyed by the URL as echoed in the response. Absent keys mean
        the URL was unreachable or its content was empty — callers treat
        absence as a drop signal. Empty when ``TAVILY_API_KEY`` is not set.
    """
    if not urls:
        # Nothing to fetch: no request, and no misleading missing-key warning.
        return {}

    api_key = os.environ.get("TAVILY_API_KEY", "")
    if not api_key:
        logger.warning("TavilyConnector.extract: TAVILY_API_KEY not set — skipping")
        return {}

    content_by_url: dict[str, str] = {}
    for start in range(0, len(urls), _EXTRACT_BATCH_SIZE):
        batch = urls[start : start + _EXTRACT_BATCH_SIZE]
        end = start + len(batch)
        try:
            resp = _requests.post(
                _TAVILY_EXTRACT_URL,
                headers={"Authorization": f"Bearer {api_key}"},
                json={"urls": batch},
                timeout=30,
            )
            resp.raise_for_status()
            data = resp.json()

            kept = 0
            # ``or`` tolerates explicit nulls in the response payload.
            for result in data.get("results") or []:
                url = result.get("url") or ""
                content = result.get("raw_content") or ""
                # Whitespace-only pages count as empty, per the contract above.
                if url and content.strip():
                    content_by_url[url] = content
                    kept += 1
            failed = len(data.get("failed_results") or [])
            # Report what was actually kept, not the raw result count.
            logger.info(
                "Tavily extract batch %d-%d: %d ok, %d failed",
                start, end, kept, failed,
            )
        except Exception as e:
            # Best-effort by design: transport, HTTP and malformed-payload
            # errors drop this batch only.
            logger.error("Tavily extract batch %d-%d failed: %s", start, end, e)

    return content_by_url

def search(self, query: str, max_results: int = 10, **kwargs) -> list[dict]:
"""Legacy search — returns structured results as job dicts."""
api_key = os.environ.get("TAVILY_API_KEY", "")
if not api_key:
logger.warning("TavilyConnector: TAVILY_API_KEY not set — skipping")
return []
Expand All @@ -69,50 +103,3 @@ def search(self, query: str, max_results: int = 10, **kwargs) -> list[dict]:
})
logger.info("TavilyConnector.search: '%s' → %d results", query, len(jobs))
return jobs

# ── Extract ───────────────────────────────────────────────────────────────

def extract(self, urls: list[str]) -> list[dict]:
    """Pull cleaned full-page text for *urls* through the Tavily client.

    The URLs are sent to ``TavilyClient.extract`` in slices of
    ``_EXTRACT_BATCH``. Every result with non-blank ``raw_content`` is
    returned as ``{"url": str, "raw_content": str}``. URLs Tavily reports as
    failed are logged and left out; a batch that raises is logged and skipped.

    Returns an empty list when ``TAVILY_API_KEY`` is unset, when *urls* is
    empty, or when the client cannot be created.
    """
    token = os.environ.get("TAVILY_API_KEY", "")
    if not token:
        logger.warning("TavilyConnector: TAVILY_API_KEY not set — cannot extract")
        return []
    if not urls:
        return []

    # Imported lazily so a missing ``tavily`` package is reported, not fatal.
    try:
        from tavily import TavilyClient
        tavily_client = TavilyClient(api_key=token)
    except Exception as e:
        logger.error("TavilyConnector: failed to init client: %s", e)
        return []

    extracted: list[dict] = []
    for offset in range(0, len(urls), _EXTRACT_BATCH):
        chunk = urls[offset:offset + _EXTRACT_BATCH]
        try:
            response = tavily_client.extract(urls=chunk)
            for item in response.get("results", []):
                text = item.get("raw_content", "") or ""
                if not text.strip():
                    continue
                extracted.append({"url": item.get("url", ""), "raw_content": text})
            failures = response.get("failed_results", [])
            if failures:
                failed_urls = [entry.get("url") for entry in failures]
                logger.warning(
                    "TavilyConnector.extract: %d URL(s) failed: %s",
                    len(failures), failed_urls,
                )
        except Exception as e:
            logger.error("TavilyConnector.extract: batch %d failed: %s", offset, e)

    logger.info(
        "TavilyConnector.extract: %d/%d URLs extracted successfully",
        len(extracted), len(urls),
    )
    return extracted
Loading
Loading