22 changes: 17 additions & 5 deletions CLAUDE.md
@@ -30,12 +30,9 @@ docker compose up --build
- **Crawler** (`cmd/crawler/main.go`): Crawls the Nostr network, stores follow relationships in SQLite, periodically calculates PageRank/TrustRank scores
- **API** (`cmd/api/main.go`): Read-only HTTP server that queries reputation data

-### Database Access Modes
+### Database Access

-The repository supports two modes via `repository.New(path, mode)`:
-
-- `ModeReadWrite`: For crawler - writes connections and scores
-- `ModeReadOnly`: For API - optimized for concurrent reads with `PRAGMA query_only = ON`
+`repository.New(path)` opens a single read-write handle. WAL mode allows multiple concurrent readers alongside a single writer; within a process `writeMu` serializes writers, and SQLite's own locks coordinate across the crawler and API processes. The connection pool is sized for 10 concurrent readers.

### Key Packages

@@ -82,3 +79,18 @@ Copy `config.example.yaml` to `config.yaml` (and `docker-compose.example.yml` to
- `crawler.request_interval_ms`: Milliseconds between requests per relay (default: 500)
- `crawler.num_contact_processors`: Number of contact event processors (default: 4)
- `crawler.num_profile_processors`: Number of profile event processors (default: 4)
- `vouch.weight`: Enables the feature (0 = disabled, default) and sets the weight of a vouch edge relative to a follow edge (1.0). Typical enabled value: `0.5`.

### Vouch & Report via Nostr Events (when `vouch.weight > 0`)

Vouches and reports are plain signed Nostr events, not a private API. They flow in two ways (both verify the event signature before storing):

1. **Crawler ingestion (pull)**: each relay subscription carries two filters (a REQ matches any event that satisfies at least one of its filters). Filter 1 requests kind:3 (plus kind:0 when search is enabled). Filter 2 requests the kind:30000 vouch set, constrained by `#d`; the constraint is scoped to filter 2, so it does not affect the kind:3/kind:0 filter. kind:1984 reports are fetched in a separate subscription capped at the newest 50, so a flood of append-only reports cannot crowd out the replaceable events.
2. **`POST /event` (push)**: accepts a single signed event (kind 3 / 1984 / 30000). It is fire-and-forget: the event is queued and the request returns 202 immediately and unconditionally. Background workers then verify the signature, apply the anti-inflation admission rule (events from pubkeys that have no positive TrustRank score and are not in `seed_pubkeys` are dropped), and persist the event. If the queue is full the event is dropped, since the crawler ingests it from relays anyway. The crawler path does not apply the admission rule, because ranking already discounts untrusted sources.

Shared parsing/storage lives in `internal/ingest` so both paths behave identically.
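
As a rough sketch of the pull path in item 1, the snippet below builds one REQ that carries both filters, using only the NIP-01 wire format. The helper `buildReq`, the subscription id, and the `authors` constraint are illustrative assumptions; the crawler's actual filter construction is not shown in this change and may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// filter mirrors the subset of a NIP-01 filter used in this sketch.
type filter struct {
	Kinds   []int    `json:"kinds"`
	Authors []string `json:"authors,omitempty"`
	D       []string `json:"#d,omitempty"` // tag filter, only set on the vouch filter
}

// buildReq is a hypothetical helper: it returns a single REQ message whose two
// filters are OR'ed by the relay. The #d constraint lives only in the second
// filter, so it does not narrow the kind:3 / kind:0 filter.
func buildReq(subID string, authors []string, searchEnabled bool) (string, error) {
	kinds := []int{3}
	if searchEnabled {
		kinds = append(kinds, 0) // profiles are only needed when search is on
	}
	follows := filter{Kinds: kinds, Authors: authors}
	vouches := filter{Kinds: []int{30000}, Authors: authors, D: []string{"vouch"}}

	msg, err := json.Marshal([]any{"REQ", subID, follows, vouches})
	if err != nil {
		return "", err
	}
	return string(msg), nil
}

func main() {
	req, err := buildReq("crawl-1", []string{"ab"}, true)
	if err != nil {
		panic(err)
	}
	fmt.Println(req)
}
```

Reports would go in a second REQ with their own `limit`, which keeps the cap of 50 from applying to the replaceable kinds.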

- **Vouch** = membership in the author's vouch set: a **NIP-51 follow set (kind:30000)** tagged `d=vouch` (regular follow sets with any other `d` are ignored; the identifier is deliberately generic so it can become a shared convention). Its `p` tags list the vouched pubkeys. Registers source→target as a vouch edge (weight `vouch.weight`, deduped against a follow from the same source). Vouches follow the **same lifecycle as follow edges** (`vouches.last_seen`, not active deletion): each set refreshes its edges' `last_seen`; a pubkey dropped from the set is not deleted but stops being refreshed and ages out via the same staleness window as follows (`StreamVouches` filters on the ranking cutoff). So revoking a vouch takes effect after the window, exactly like unfollowing.
- **Report** = a **kind:1984** (NIP-56) event targeting a **profile** (`p` tag, no `e` tag) with report type `spam` or `impersonation` (other types ignored). Applies a trust-weighted penalty to the target's final score: `final = raw * (1 - R/(R+F))` where R is the sum of reporter trust_scores and F is the sum of follower/voucher trust_scores.
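
To make the penalty formula concrete, here is a small worked sketch. The function name `penalize` and the choice to leave the score unchanged when `R + F` is zero are assumptions; the text above does not say how the ranking code handles that case.

```go
package main

import "fmt"

// penalize applies final = raw * (1 - R/(R+F)), where reporterTrust is R
// (sum of reporter trust scores) and supporterTrust is F (sum of
// follower/voucher trust scores).
// Assumption: with no trust on either side the raw score is returned as-is.
func penalize(raw, reporterTrust, supporterTrust float64) float64 {
	total := reporterTrust + supporterTrust
	if total <= 0 {
		return raw
	}
	return raw * (1 - reporterTrust/total)
}

func main() {
	fmt.Println(penalize(0.5, 0, 3)) // no reports: prints 0.5
	fmt.Println(penalize(0.5, 1, 3)) // reporters hold 1/4 of the trust: prints 0.375
	fmt.Println(penalize(0.5, 3, 0)) // only reporters: prints 0
}
```

Because the penalty depends on the ratio R/(R+F), a single report from a low-trust account barely moves a well-followed target, while a report from a highly trusted account weighs more.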

No mutual exclusion — `vouches` and `reports` rows coexist independently. A source that both vouches for and reports the same target is not specially handled: the vouch adds flow and the report subtracts it at ranking time, which roughly cancels out. Stored in the `vouches` and `reports` tables (schema from migration v4).
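
The vouch and report definitions above can be sketched as two small classifiers. This is not the code in `internal/ingest`; the `event` type and both function names are hypothetical, and reading the report type from the third element of the `p` tag is an assumption based on NIP-56.

```go
package main

import "fmt"

// event is a minimal stand-in for a Nostr event (only the fields used here).
type event struct {
	Kind int
	Tags [][]string
}

// vouchTargets returns the pubkeys in the p tags of a kind:30000 set tagged
// d=vouch. Any other kind or d value yields nil.
func vouchTargets(ev event) []string {
	if ev.Kind != 30000 {
		return nil
	}
	isVouchSet := false
	var targets []string
	for _, t := range ev.Tags {
		if len(t) < 2 {
			continue
		}
		switch t[0] {
		case "d":
			isVouchSet = isVouchSet || t[1] == "vouch"
		case "p":
			targets = append(targets, t[1])
		}
	}
	if !isVouchSet {
		return nil
	}
	return targets
}

// reportTarget returns the reported pubkey and report type for a kind:1984
// profile report. Events carrying an e tag (note reports) and report types
// other than spam / impersonation are rejected.
func reportTarget(ev event) (target, reportType string, ok bool) {
	if ev.Kind != 1984 {
		return "", "", false
	}
	for _, t := range ev.Tags {
		if len(t) > 0 && t[0] == "e" {
			return "", "", false
		}
	}
	for _, t := range ev.Tags {
		if len(t) >= 3 && t[0] == "p" && (t[2] == "spam" || t[2] == "impersonation") {
			return t[1], t[2], true
		}
	}
	return "", "", false
}

func main() {
	set := event{Kind: 30000, Tags: [][]string{{"d", "vouch"}, {"p", "alice"}, {"p", "bob"}}}
	fmt.Println(vouchTargets(set))

	report := event{Kind: 1984, Tags: [][]string{{"p", "mallory", "spam"}}}
	fmt.Println(reportTarget(report))
}
```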
20 changes: 15 additions & 5 deletions cmd/api/main.go
@@ -27,20 +27,20 @@ func main() {
log.Fatalf("Failed to load config: %v", err)
}

-// Initialize repository in read-only mode
-repo, err := repository.New(cfg.Database, repository.ModeReadOnly)
+repo, err := repository.New(cfg.Database)
if err != nil {
log.Fatalf("Failed to initialize database: %v", err)
}
defer repo.Close()

-log.Println("[API] Database initialized successfully (read-only mode)")
+log.Println("[API] Database initialized successfully")

// Initialize cache
apiCache := cache.New(10*time.Minute, 10*time.Minute)

-// Initialize handler with search config
-h := handler.New(repo, apiCache, &cfg.Search)
+// Initialize handler with search config and seed pubkeys (seeds always
+// qualify for vouch/report submissions regardless of trust_score).
+h := handler.New(repo, apiCache, &cfg.Search, cfg.SeedPubkeys)

// Setup static file system
staticFS, err := fs.Sub(staticFiles, "static")
@@ -87,6 +87,16 @@ func main() {
http.HandleFunc("/users/", middleware.CORS(h.User))
http.HandleFunc("/search", middleware.CORS(h.Search))

// Event ingestion endpoint. Accepts signed Nostr events (kind 3 / 1984 /
// 30000) as a push complement to the crawler; it queues the event and
// returns 202 immediately, processing it asynchronously. When vouch.weight
// <= 0 the feature is disabled: no route is registered and requests fall
// through to the SPA catch-all handler below.
if cfg.Vouch.Enabled() {
http.HandleFunc("/event", middleware.CORS(h.PostEvent))
log.Printf("[API] Event ingestion endpoint enabled (weight=%.2f)", cfg.Vouch.Weight)
}

// Serve static assets (js, css, images, etc.) with long cache (1 year for hashed assets)
http.HandleFunc("/assets/", func(w http.ResponseWriter, r *http.Request) {
path := strings.TrimPrefix(r.URL.Path, "/")
8 changes: 4 additions & 4 deletions cmd/crawler/main.go
@@ -31,17 +31,17 @@ func main() {
log.Fatalf("[CONFIG] Failed to load configuration: %v", err)
}

-// 2. Initialize Repository in read-write mode
+// 2. Initialize Repository
log.Println("[DATABASE] Initializing...")
-repo, err := repository.New(cfg.Database, repository.ModeReadWrite)
+repo, err := repository.New(cfg.Database)
if err != nil {
log.Fatalf("[DATABASE] Failed to initialize: %v", err)
}
defer repo.Close()
log.Println("[DATABASE] Ready (read-write mode)")

// 3. Create ranking calculator
-calculator := ranking.NewCalculator(repo, cfg.SeedPubkeys, cfg.Ranking.TrustRankWeight, cfg.Ranking.PageRankWeight)
+calculator := ranking.NewCalculator(repo, cfg.SeedPubkeys, cfg.Ranking.TrustRankWeight, cfg.Ranking.PageRankWeight, cfg.Vouch.Weight)

// 4. Perform an initial rank calculation
log.Println("[RANK] Performing initial calculation...")
Expand All @@ -58,7 +58,7 @@ func main() {
NumContactProcessors: cfg.Crawler.NumContactProcessors,
NumProfileProcessors: cfg.Crawler.NumProfileProcessors,
}
-c := crawler.NewCrawler(repo, cfg.Relays, cfg.SeedPubkeys, &cfg.Search, crawlerConfig)
+c := crawler.NewCrawler(repo, cfg.Relays, cfg.SeedPubkeys, &cfg.Search, crawlerConfig, cfg.Vouch.Enabled())
c.Start()

// 6. Periodically Calculate Ranks
13 changes: 13 additions & 0 deletions config.example.yaml
@@ -36,6 +36,19 @@ crawler:
# Number of profile event processors (default: 4)
num_profile_processors: 4

# Vouch / report feature. Fed by standard signed Nostr events — the crawler
# ingests them from relays, and the POST /event endpoint accepts pushes too.
#
# `weight` is the single control: set to 0 (default) to disable the feature
# entirely: the crawler skips kind:1984 / kind:30000, the POST /event route is
# not registered, and vouches are ignored by ranking. Set to a value in (0, 1] to enable. The
# value is the weight of a vouch edge relative to a follow edge (1.0); e.g. 0.5
# means each vouch contributes half the flow of an actual follow. Reports apply
# a trust-weighted penalty to the target's final score. POST /event submissions
# from pubkeys with no TrustRank (and not in seed_pubkeys) are silently ignored.
vouch:
weight: 0

# Default nostr relays to connect to, used for searching users' relay lists
relays:
- wss://relay.damus.io/
24 changes: 24 additions & 0 deletions config/config.go
@@ -28,6 +28,21 @@ type CrawlerConfig struct {
NumProfileProcessors int `yaml:"num_profile_processors"` // Number of profile event processors (default: 4)
}

// VouchConfig controls the vouch/report feature: whether the crawler ingests
// kind:1984 reports and kind:30000 vouch sets, whether POST /event is served,
// and the weight of vouch edges in the ranking graph.
//
// A single knob: weight == 0 disables the feature entirely (crawler skips
// those kinds, the POST /event route is not registered, vouches are not read
// by the ranking calculator); weight > 0 enables it and sets the vouch-edge
// weight relative to a follow edge (1.0). Typical value: 0.5. Must be in [0, 1].
type VouchConfig struct {
Weight float64 `yaml:"weight"`
}

// Enabled reports whether the vouch feature is active.
func (v VouchConfig) Enabled() bool { return v.Weight > 0 }

// Config represents the application configuration
type Config struct {
Relays []string `yaml:"relays"`
Expand All @@ -38,6 +53,7 @@ type Config struct {
Search SearchConfig `yaml:"search"`
Ranking RankingConfig `yaml:"ranking"`
Crawler CrawlerConfig `yaml:"crawler"`
Vouch VouchConfig `yaml:"vouch"`
}

// Load reads and parses the configuration file
Expand All @@ -62,6 +78,9 @@ func Load(path string) (*Config, error) {
NumContactProcessors: 4,
NumProfileProcessors: 4,
},
Vouch: VouchConfig{
Weight: 0, // disabled by default
},
}
if err := yaml.Unmarshal(data, &cfg); err != nil {
return nil, err
Expand All @@ -80,6 +99,11 @@ func Load(path string) (*Config, error) {
log.Printf("[CONFIG] - Ranking weights: TrustRank=%.2f, PageRank=%.2f", cfg.Ranking.TrustRankWeight, cfg.Ranking.PageRankWeight)
log.Printf("[CONFIG] - Crawler: batch_size=%d, request_interval=%dms, contact_processors=%d, profile_processors=%d",
cfg.Crawler.BatchSize, cfg.Crawler.RequestIntervalMs, cfg.Crawler.NumContactProcessors, cfg.Crawler.NumProfileProcessors)
if cfg.Vouch.Enabled() {
log.Printf("[CONFIG] - Vouch/report feature enabled (weight=%.2f)", cfg.Vouch.Weight)
} else {
log.Printf("[CONFIG] - Vouch/report feature disabled (weight=0)")
}

return &cfg, nil
}
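
The `VouchConfig` comment states that `weight` must be in [0, 1], but the hunks shown here do not include a range check. A minimal sketch of one follows, assuming it would run in `Load` after `yaml.Unmarshal`; `validateVouchWeight` is a hypothetical name and is not part of this change.

```go
package main

import (
	"fmt"
	"math"
)

// validateVouchWeight rejects weights outside [0, 1]. NaN is rejected
// explicitly because it compares false against both bounds.
func validateVouchWeight(w float64) error {
	if math.IsNaN(w) || w < 0 || w > 1 {
		return fmt.Errorf("vouch.weight must be in [0, 1], got %v", w)
	}
	return nil
}

func main() {
	for _, w := range []float64{0, 0.5, 1, 1.5, -0.1} {
		fmt.Printf("weight=%v err=%v\n", w, validateVouchWeight(w))
	}
}
```

Failing fast at load time would stop a negative weight from silently disabling the feature (since `Enabled` only checks `Weight > 0`) and stop a weight above 1 from ranking vouches above follows.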
82 changes: 82 additions & 0 deletions internal/api/handler/event.go
@@ -0,0 +1,82 @@
package handler

import (
	"encoding/json"
	"io"
	"log"
	"net/http"

	"github.com/nbd-wtf/go-nostr"
)

// Async ingest tuning for POST /event.
const (
	maxEventBytes   = 1 << 20 // 1 MiB cap on a request body; Nostr events are small
	ingestQueueSize = 1024    // buffered events awaiting background processing
	ingestWorkers   = 4       // background workers draining the queue
)

// PostEvent handles POST /event as a fire-and-forget intake: it reads the body,
// queues the event, and returns 202 immediately and unconditionally. Signature
// verification, the anti-inflation admission check, and persistence all happen
// on a background worker (processEvent) — the client learns nothing about the
// outcome. This is just a push accelerator: the same event is published to
// public relays, and the crawler ingests it from there regardless, so a full
// queue can simply drop the event.
func (h *Handler) PostEvent(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		writeError(w, http.StatusMethodNotAllowed, "Method not allowed")
		return
	}

	// Decode synchronously — the body is gone once we return — then hand off.
	var ev nostr.Event
	if err := json.NewDecoder(io.LimitReader(r.Body, maxEventBytes)).Decode(&ev); err == nil {
		select {
		case h.ingestCh <- &ev:
		default:
			// Queue full: drop. The crawler will pick the event up from relays.
			log.Printf("[API] Ingest queue full, dropped event kind=%d", ev.Kind)
		}
	}

	writeJSON(w, http.StatusAccepted, map[string]string{"status": "accepted"})
}

// ingestWorker drains the async ingest queue, processing one event at a time.
func (h *Handler) ingestWorker() {
	for ev := range h.ingestCh {
		h.processEvent(ev)
	}
}

// processEvent verifies, admits, and persists a single queued event off the
// request path. Every failure is silently dropped (logged at most) — there is
// no caller to report back to.
func (h *Handler) processEvent(ev *nostr.Event) {
	if ok, err := ev.CheckSignature(); err != nil || !ok {
		return
	}
	// Anti-inflation admission: only seeds or pubkeys with positive TrustRank
	// contribute, so an untrusted author cannot inflate the graph by pushing.
	if !h.authorQualifies(ev.PubKey) {
		return
	}
	if _, err := h.ingester.Apply(ev); err != nil {
		log.Printf("[API] Error ingesting event (kind=%d pubkey=%s): %v", ev.Kind, ev.PubKey, err)
	}
}

// authorQualifies returns true if the author is an explicit seed or has a
// positive last-computed TrustRank score.
func (h *Handler) authorQualifies(pubkey string) bool {
	if _, ok := h.seedSet[pubkey]; ok {
		return true
	}
	score, err := h.repo.GetTrustScore(pubkey)
	if err != nil {
		log.Printf("[API] Error reading trust_score for %s: %v", pubkey, err)
		return false
	}
	return score > 0
}
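
The drop-on-full behaviour of `PostEvent` comes from a non-blocking send on a buffered channel. The standalone sketch below isolates that pattern with the standard library only; the `intake` type and its methods are hypothetical, and the `shutdown` step is an addition for the demo (the handler above never closes its queue).

```go
package main

import (
	"fmt"
	"sync"
)

// intake is a hypothetical stand-in for the handler's queue plus workers.
type intake struct {
	ch chan string
	wg sync.WaitGroup
}

// newIntake creates a bounded queue and starts the given number of workers.
func newIntake(queueSize, workers int, process func(string)) *intake {
	in := &intake{ch: make(chan string, queueSize)}
	for i := 0; i < workers; i++ {
		in.wg.Add(1)
		go func() {
			defer in.wg.Done()
			for item := range in.ch {
				process(item)
			}
		}()
	}
	return in
}

// offer enqueues without blocking. It returns false when the queue is full
// and the item was dropped, mirroring the select/default in PostEvent.
func (in *intake) offer(item string) bool {
	select {
	case in.ch <- item:
		return true
	default:
		return false
	}
}

// shutdown stops intake and waits for the workers to drain what is queued.
func (in *intake) shutdown() {
	close(in.ch)
	in.wg.Wait()
}

func main() {
	// Zero workers, so nothing drains the queue and the overflow is visible.
	in := newIntake(2, 0, func(string) {})
	fmt.Println(in.offer("a"), in.offer("b"), in.offer("c")) // true true false
	in.shutdown()
}
```

The caller never waits on the queue, so request latency stays flat under load; the cost is that a dropped event is only recovered when the crawler later sees it on a relay.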
22 changes: 19 additions & 3 deletions internal/api/handler/handler.go
@@ -10,6 +10,7 @@ import (

"fayan/config"
"fayan/internal/cache"
"fayan/internal/ingest"
"fayan/internal/models"
"fayan/internal/repository"

Expand All @@ -22,15 +23,30 @@ type Handler struct {
repo *repository.Repository
cache *cache.Cache
searchConfig *config.SearchConfig
seedSet map[string]struct{}
ingester *ingest.Ingester
ingestCh chan *nostr.Event
}

-// New creates a new Handler instance
-func New(repo *repository.Repository, cache *cache.Cache, searchConfig *config.SearchConfig) *Handler {
-return &Handler{
+// New creates a new Handler instance and starts the background workers that
+// process events posted to /event asynchronously.
+func New(repo *repository.Repository, cache *cache.Cache, searchConfig *config.SearchConfig, seedPubkeys []string) *Handler {
+seedSet := make(map[string]struct{}, len(seedPubkeys))
+for _, pk := range seedPubkeys {
+seedSet[pk] = struct{}{}
+}
+h := &Handler{
repo: repo,
cache: cache,
searchConfig: searchConfig,
+seedSet: seedSet,
+ingester: ingest.New(repo),
+ingestCh: make(chan *nostr.Event, ingestQueueSize),
}
+for range ingestWorkers {
+go h.ingestWorker()
+}
+return h
}
}

// HealthResponse represents the health check response
4 changes: 2 additions & 2 deletions internal/api/middleware/cors.go
@@ -6,8 +6,8 @@ import "net/http"
func CORS(next http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
-w.Header().Set("Access-Control-Allow-Methods", "GET, OPTIONS")
-w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
+w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
+w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization")

if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusOK)