diff --git a/CLAUDE.md b/CLAUDE.md index 1cd14eb..6fa249c 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -30,12 +30,9 @@ docker compose up --build - **Crawler** (`cmd/crawler/main.go`): Crawls the Nostr network, stores follow relationships in SQLite, periodically calculates PageRank/TrustRank scores - **API** (`cmd/api/main.go`): Read-only HTTP server that queries reputation data -### Database Access Modes +### Database Access -The repository supports two modes via `repository.New(path, mode)`: - -- `ModeReadWrite`: For crawler - writes connections and scores -- `ModeReadOnly`: For API - optimized for concurrent reads with `PRAGMA query_only = ON` +`repository.New(path)` opens a single read-write handle. WAL mode allows multiple concurrent readers alongside a single writer; within a process `writeMu` serializes writers, and SQLite's own locks coordinate across the crawler and API processes. The connection pool is sized for 10 concurrent readers. ### Key Packages @@ -82,3 +79,18 @@ Copy `config.example.yaml` to `config.yaml` (and `docker-compose.example.yml` to - `crawler.request_interval_ms`: Milliseconds between requests per relay (default: 500) - `crawler.num_contact_processors`: Number of contact event processors (default: 4) - `crawler.num_profile_processors`: Number of profile event processors (default: 4) +- `vouch.weight`: Enables the feature (0 = disabled, default) and sets the weight of a vouch edge relative to a follow edge (1.0). Typical enabled value: `0.5`. + +### Vouch & Report via Nostr Events (when `vouch.weight > 0`) + +Vouches and reports are plain signed Nostr events, not a private API. They flow in two ways (both verify the event signature before storing): + +1. **Crawler ingestion (pull)** — one subscription per relay carries two filters (a REQ OR's multiple filters): filter 1 is kind:3 (+kind:0 when search is on); filter 2 is the kind:30000 vouch set constrained by `#d` (scoped to that filter, so it doesn't affect 3/0). kind:1984 reports are fetched in a separate subscription, capped at the newest 50, so an append-only flood can't crowd out the replaceable events. +2. **`POST /event` (push)** — accepts a single signed event (kind 3 / 1984 / 30000). It is fire-and-forget: the event is queued and the request returns 202 immediately and unconditionally. Background workers then verify the signature, apply the anti-inflation admission rule (events from pubkeys with no TrustRank and not in `seed_pubkeys` are dropped), and persist. A full queue simply drops the event — the crawler ingests it from relays anyway. (The crawler path does not apply the admission rule — ranking already discounts untrusted sources.) + +Shared parsing/storage lives in `internal/ingest` so both paths behave identically. + +- **Vouch** = membership in the author's vouch set: a **NIP-51 follow set (kind:30000)** tagged `d=vouch` (regular follow sets with any other `d` are ignored; the identifier is deliberately generic so it can become a shared convention). Its `p` tags list the vouched pubkeys. Registers source→target as a vouch edge (weight `vouch.weight`, deduped against a follow from the same source). Vouches follow the **same lifecycle as follow edges** (`vouches.last_seen`, not active deletion): each set refreshes its edges' `last_seen`; a pubkey dropped from the set is not deleted but stops being refreshed and ages out via the same staleness window as follows (`StreamVouches` filters on the ranking cutoff). So revoking a vouch takes effect after the window, exactly like unfollowing. +- **Report** = a **kind:1984** (NIP-56) event targeting a **profile** (`p` tag, no `e` tag) with report type `spam` or `impersonation` (other types ignored). Applies a trust-weighted penalty to the target's final score: `final = raw * (1 - R/(R+F))` where R is the sum of reporter trust_scores and F is the sum of follower/voucher trust_scores. + +No mutual exclusion — `vouches` and `reports` rows coexist independently. A source that both vouches for and reports the same target is not specially handled: the vouch adds flow and the report subtracts it at ranking time, which roughly cancels out. Stored in the `vouches` and `reports` tables (schema from migration v4). diff --git a/cmd/api/main.go b/cmd/api/main.go index 5d3086f..4b3319b 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -27,20 +27,20 @@ func main() { log.Fatalf("Failed to load config: %v", err) } - // Initialize repository in read-only mode - repo, err := repository.New(cfg.Database, repository.ModeReadOnly) + repo, err := repository.New(cfg.Database) if err != nil { log.Fatalf("Failed to initialize database: %v", err) } defer repo.Close() - log.Println("[API] Database initialized successfully (read-only mode)") + log.Println("[API] Database initialized successfully") // Initialize cache apiCache := cache.New(10*time.Minute, 10*time.Minute) - // Initialize handler with search config - h := handler.New(repo, apiCache, &cfg.Search) + // Initialize handler with search config and seed pubkeys (seeds always + // qualify for vouch/report submissions regardless of trust_score). + h := handler.New(repo, apiCache, &cfg.Search, cfg.SeedPubkeys) // Setup static file system staticFS, err := fs.Sub(staticFiles, "static") @@ -87,6 +87,16 @@ func main() { http.HandleFunc("/users/", middleware.CORS(h.User)) http.HandleFunc("/search", middleware.CORS(h.Search)) + // Event ingestion endpoint. Accepts signed Nostr events (kind 3 / 1984 / + // 30000) as a push complement to the crawler; it queues the event and + // returns 202 immediately, processing it asynchronously. When vouch.weight + // <= 0 the feature is disabled: no route is registered and requests fall + // through to the SPA catch-all handler below. + if cfg.Vouch.Enabled() { + http.HandleFunc("/event", middleware.CORS(h.PostEvent)) + log.Printf("[API] Event ingestion endpoint enabled (weight=%.2f)", cfg.Vouch.Weight) + } + // Serve static assets (js, css, images, etc.) with long cache (1 year for hashed assets) http.HandleFunc("/assets/", func(w http.ResponseWriter, r *http.Request) { path := strings.TrimPrefix(r.URL.Path, "/") diff --git a/cmd/crawler/main.go b/cmd/crawler/main.go index bc27bea..bcf84bc 100644 --- a/cmd/crawler/main.go +++ b/cmd/crawler/main.go @@ -31,9 +31,9 @@ func main() { log.Fatalf("[CONFIG] Failed to load configuration: %v", err) } - // 2. Initialize Repository in read-write mode + // 2. Initialize Repository log.Println("[DATABASE] Initializing...") - repo, err := repository.New(cfg.Database, repository.ModeReadWrite) + repo, err := repository.New(cfg.Database) if err != nil { log.Fatalf("[DATABASE] Failed to initialize: %v", err) } @@ -41,7 +41,7 @@ func main() { log.Println("[DATABASE] Ready (read-write mode)") // 3. Create ranking calculator - calculator := ranking.NewCalculator(repo, cfg.SeedPubkeys, cfg.Ranking.TrustRankWeight, cfg.Ranking.PageRankWeight) + calculator := ranking.NewCalculator(repo, cfg.SeedPubkeys, cfg.Ranking.TrustRankWeight, cfg.Ranking.PageRankWeight, cfg.Vouch.Weight) // 4. Perform an initial rank calculation log.Println("[RANK] Performing initial calculation...") @@ -58,7 +58,7 @@ func main() { NumContactProcessors: cfg.Crawler.NumContactProcessors, NumProfileProcessors: cfg.Crawler.NumProfileProcessors, } - c := crawler.NewCrawler(repo, cfg.Relays, cfg.SeedPubkeys, &cfg.Search, crawlerConfig) + c := crawler.NewCrawler(repo, cfg.Relays, cfg.SeedPubkeys, &cfg.Search, crawlerConfig, cfg.Vouch.Enabled()) c.Start() // 6. Periodically Calculate Ranks diff --git a/config.example.yaml b/config.example.yaml index c774fa3..d549922 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -36,6 +36,19 @@ crawler: # Number of profile event processors (default: 4) num_profile_processors: 4 +# Vouch / report feature. Fed by standard signed Nostr events — the crawler +# ingests them from relays, and the POST /event endpoint accepts pushes too. +# +# `weight` is the single control: set to 0 (default) to disable the feature +# entirely — the crawler skips kind:1984 / kind:30000, POST /event returns 404, +# and vouches are ignored by ranking. Set to a value in (0, 1] to enable. The +# value is the weight of a vouch edge relative to a follow edge (1.0); e.g. 0.5 +# means each vouch contributes half the flow of an actual follow. Reports apply +# a trust-weighted penalty to the target's final score. POST /event submissions +# from pubkeys with no TrustRank (and not in seed_pubkeys) are silently ignored. +vouch: + weight: 0 + # Default nostr relays to connect to, used for searching uesrs' relay lists relays: - wss://relay.damus.io/ diff --git a/config/config.go b/config/config.go index 9af4d24..85de88e 100644 --- a/config/config.go +++ b/config/config.go @@ -28,6 +28,21 @@ type CrawlerConfig struct { NumProfileProcessors int `yaml:"num_profile_processors"` // Number of profile event processors (default: 4) } +// VouchConfig controls the vouch/report feature: whether the crawler ingests +// kind:1984 reports and kind:30000 vouch sets, whether POST /event is served, +// and the weight of vouch edges in the ranking graph. +// +// A single knob: weight == 0 disables the feature entirely (crawler skips +// those kinds, POST /event returns 404, vouches are not read by the ranking +// calculator); weight > 0 enables it and sets the vouch-edge weight relative +// to a follow edge (1.0). Typical values: 0.5. Must be in [0, 1]. +type VouchConfig struct { + Weight float64 `yaml:"weight"` +} + +// Enabled reports whether the vouch feature is active. +func (v VouchConfig) Enabled() bool { return v.Weight > 0 } + // Config represents the application configuration type Config struct { Relays []string `yaml:"relays"` @@ -38,6 +53,7 @@ type Config struct { Search SearchConfig `yaml:"search"` Ranking RankingConfig `yaml:"ranking"` Crawler CrawlerConfig `yaml:"crawler"` + Vouch VouchConfig `yaml:"vouch"` } // Load reads and parses the configuration file @@ -62,6 +78,9 @@ func Load(path string) (*Config, error) { NumContactProcessors: 4, NumProfileProcessors: 4, }, + Vouch: VouchConfig{ + Weight: 0, // disabled by default + }, } if err := yaml.Unmarshal(data, &cfg); err != nil { return nil, err @@ -80,6 +99,11 @@ func Load(path string) (*Config, error) { log.Printf("[CONFIG] - Ranking weights: TrustRank=%.2f, PageRank=%.2f", cfg.Ranking.TrustRankWeight, cfg.Ranking.PageRankWeight) log.Printf("[CONFIG] - Crawler: batch_size=%d, request_interval=%dms, contact_processors=%d, profile_processors=%d", cfg.Crawler.BatchSize, cfg.Crawler.RequestIntervalMs, cfg.Crawler.NumContactProcessors, cfg.Crawler.NumProfileProcessors) + if cfg.Vouch.Enabled() { + log.Printf("[CONFIG] - Vouch/report feature enabled (weight=%.2f)", cfg.Vouch.Weight) + } else { + log.Printf("[CONFIG] - Vouch/report feature disabled (weight=0)") + } return &cfg, nil } diff --git a/internal/api/handler/event.go b/internal/api/handler/event.go new file mode 100644 index 0000000..f03eb6f --- /dev/null +++ b/internal/api/handler/event.go @@ -0,0 +1,82 @@ +package handler + +import ( + "encoding/json" + "io" + "log" + "net/http" + + "github.com/nbd-wtf/go-nostr" +) + +// Async ingest tuning for POST /event. +const ( + maxEventBytes = 1 << 20 // 1 MiB cap on a request body; Nostr events are small + ingestQueueSize = 1024 // buffered events awaiting background processing + ingestWorkers = 4 // background workers draining the queue +) + +// PostEvent handles POST /event as a fire-and-forget intake: it reads the body, +// queues the event, and returns 202 immediately and unconditionally. Signature +// verification, the anti-inflation admission check, and persistence all happen +// on a background worker (processEvent) — the client learns nothing about the +// outcome. This is just a push accelerator: the same event is published to +// public relays, and the crawler ingests it from there regardless, so a full +// queue can simply drop the event. +func (h *Handler) PostEvent(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + writeError(w, http.StatusMethodNotAllowed, "Method not allowed") + return + } + + // Decode synchronously — the body is gone once we return — then hand off. + var ev nostr.Event + if err := json.NewDecoder(io.LimitReader(r.Body, maxEventBytes)).Decode(&ev); err == nil { + select { + case h.ingestCh <- &ev: + default: + // Queue full: drop. The crawler will pick the event up from relays. + log.Printf("[API] Ingest queue full, dropped event kind=%d", ev.Kind) + } + } + + writeJSON(w, http.StatusAccepted, map[string]string{"status": "accepted"}) +} + +// ingestWorker drains the async ingest queue, processing one event at a time. +func (h *Handler) ingestWorker() { + for ev := range h.ingestCh { + h.processEvent(ev) + } +} + +// processEvent verifies, admits, and persists a single queued event off the +// request path. Every failure is silently dropped (logged at most) — there is +// no caller to report back to. +func (h *Handler) processEvent(ev *nostr.Event) { + if ok, err := ev.CheckSignature(); err != nil || !ok { + return + } + // Anti-inflation admission: only seeds or pubkeys with positive TrustRank + // contribute, so an untrusted author cannot inflate the graph by pushing. + if !h.authorQualifies(ev.PubKey) { + return + } + if _, err := h.ingester.Apply(ev); err != nil { + log.Printf("[API] Error ingesting event (kind=%d pubkey=%s): %v", ev.Kind, ev.PubKey, err) + } +} + +// authorQualifies returns true if the author is an explicit seed or has a +// positive last-computed TrustRank score. +func (h *Handler) authorQualifies(pubkey string) bool { + if _, ok := h.seedSet[pubkey]; ok { + return true + } + score, err := h.repo.GetTrustScore(pubkey) + if err != nil { + log.Printf("[API] Error reading trust_score for %s: %v", pubkey, err) + return false + } + return score > 0 +} diff --git a/internal/api/handler/handler.go b/internal/api/handler/handler.go index 3dd58da..8c1a2fc 100644 --- a/internal/api/handler/handler.go +++ b/internal/api/handler/handler.go @@ -10,6 +10,7 @@ import ( "fayan/config" "fayan/internal/cache" + "fayan/internal/ingest" "fayan/internal/models" "fayan/internal/repository" @@ -22,15 +23,30 @@ type Handler struct { repo *repository.Repository cache *cache.Cache searchConfig *config.SearchConfig + seedSet map[string]struct{} + ingester *ingest.Ingester + ingestCh chan *nostr.Event } -// New creates a new Handler instance -func New(repo *repository.Repository, cache *cache.Cache, searchConfig *config.SearchConfig) *Handler { - return &Handler{ +// New creates a new Handler instance and starts the background workers that +// process events posted to /event asynchronously. +func New(repo *repository.Repository, cache *cache.Cache, searchConfig *config.SearchConfig, seedPubkeys []string) *Handler { + seedSet := make(map[string]struct{}, len(seedPubkeys)) + for _, pk := range seedPubkeys { + seedSet[pk] = struct{}{} + } + h := &Handler{ repo: repo, cache: cache, searchConfig: searchConfig, + seedSet: seedSet, + ingester: ingest.New(repo), + ingestCh: make(chan *nostr.Event, ingestQueueSize), + } + for range ingestWorkers { + go h.ingestWorker() } + return h } // HealthResponse represents the health check response diff --git a/internal/api/middleware/cors.go b/internal/api/middleware/cors.go index 6ac4c76..38fac4f 100644 --- a/internal/api/middleware/cors.go +++ b/internal/api/middleware/cors.go @@ -6,8 +6,8 @@ import "net/http" func CORS(next http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Access-Control-Allow-Methods", "GET, OPTIONS") - w.Header().Set("Access-Control-Allow-Headers", "Content-Type") + w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS") + w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization") if r.Method == http.MethodOptions { w.WriteHeader(http.StatusOK) diff --git a/internal/crawler/crawler.go b/internal/crawler/crawler.go index 852ec98..6df7a59 100644 --- a/internal/crawler/crawler.go +++ b/internal/crawler/crawler.go @@ -10,6 +10,7 @@ import ( "time" "fayan/config" + "fayan/internal/ingest" "fayan/internal/repository" "github.com/nbd-wtf/go-nostr" @@ -17,6 +18,13 @@ import ( "golang.org/x/time/rate" ) +// maxReportsPerQuery caps how many kind:1984 reports a single relay query pulls +// per author batch. Reports are append-only and a busy account can have many, +// so without a cap they would crowd out the replaceable events sharing a fetch. +// We deliberately do not paginate — successive crawl cycles re-fetch the batch, +// and reports are sparse, so the newest 50 per query suffice. +const maxReportsPerQuery = 50 + // CrawlerConfig holds the crawler configuration parameters type CrawlerConfig struct { BatchSize int @@ -33,8 +41,11 @@ type Crawler struct { seedPubkeys []string searchConfig *config.SearchConfig crawlerConfig *CrawlerConfig + vouchEnabled bool + ingester *ingest.Ingester contactsChan chan *nostr.Event profilesChan chan *nostr.Event + reportsChan chan *nostr.Event crawled map[string]bool crawledMu sync.Mutex relayLimiters map[string]*rate.Limiter @@ -56,8 +67,9 @@ type Crawler struct { cancel context.CancelFunc } -// NewCrawler creates a new Crawler instance. -func NewCrawler(repo *repository.Repository, relays []string, seedPubkeys []string, searchConfig *config.SearchConfig, crawlerConfig *CrawlerConfig) *Crawler { +// NewCrawler creates a new Crawler instance. When vouchEnabled is true the +// crawler also fetches kind:1984 reports and kind:30000 vouch sets. +func NewCrawler(repo *repository.Repository, relays []string, seedPubkeys []string, searchConfig *config.SearchConfig, crawlerConfig *CrawlerConfig, vouchEnabled bool) *Crawler { ctx, cancel := context.WithCancel(context.Background()) relayOptions := []nostr.RelayOption{ @@ -69,6 +81,7 @@ func NewCrawler(repo *repository.Repository, relays []string, seedPubkeys []stri // Calculate channel buffer sizes based on batch size contactsChanSize := crawlerConfig.BatchSize * 3 profilesChanSize := crawlerConfig.BatchSize * crawlerConfig.NumProfileProcessors * 2 + reportsChanSize := crawlerConfig.BatchSize * 2 c := &Crawler{ repo: repo, @@ -77,8 +90,11 @@ func NewCrawler(repo *repository.Repository, relays []string, seedPubkeys []stri seedPubkeys: seedPubkeys, searchConfig: searchConfig, crawlerConfig: crawlerConfig, + vouchEnabled: vouchEnabled, + ingester: ingest.New(repo), contactsChan: make(chan *nostr.Event, contactsChanSize), profilesChan: make(chan *nostr.Event, profilesChanSize), + reportsChan: make(chan *nostr.Event, reportsChanSize), crawled: make(map[string]bool), relayLimiters: make(map[string]*rate.Limiter), relayHealth: NewRelayHealthTracker(), @@ -110,6 +126,7 @@ func (c *Crawler) Stop() { // Now it's safe to close channels (no more senders) close(c.contactsChan) close(c.profilesChan) + close(c.reportsChan) // Stop the pool manager (this will close all relay connections) c.poolManager.Stop() @@ -218,6 +235,17 @@ func (c *Crawler) Start() { } } + // Processors for kind:1984 report events (only when the feature is enabled) + if c.vouchEnabled { + for range c.crawlerConfig.NumContactProcessors { + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.reportProcessor() + }() + } + } + // Status reporter (not tracked in wg since it's non-critical) go c.statusReporter() } @@ -264,9 +292,7 @@ func (c *Crawler) contactProcessor() { if !ok { return // Channel closed, exit } - if event != nil { - c.processKind3Event(event) - } + c.dispatchRelationEvent(event) case <-c.ctx.Done(): // Context cancelled, drain remaining events before exiting c.drainContactsChan() @@ -275,6 +301,20 @@ func (c *Crawler) contactProcessor() { } } +// dispatchRelationEvent routes a relation event from the contacts channel to +// its handler by kind (kind:3 follows or kind:30000 vouch sets). +func (c *Crawler) dispatchRelationEvent(event *nostr.Event) { + if event == nil { + return + } + switch event.Kind { + case ingest.KindContacts: + c.processKind3Event(event) + case ingest.KindVouchSet: + c.processVouchSetEvent(event) + } +} + // drainContactsChan processes any remaining events in the contacts channel func (c *Crawler) drainContactsChan() { for { @@ -283,9 +323,7 @@ func (c *Crawler) drainContactsChan() { if !ok { return } - if event != nil { - c.processKind3Event(event) - } + c.dispatchRelationEvent(event) default: return } @@ -324,10 +362,12 @@ func (c *Crawler) fetchBatch(pubkeys []string) { } } - // Step 3: Fetch contacts (and profiles if search is enabled) from each relay concurrently + // Step 3: Fetch contacts (and profiles/vouch sets) from each relay concurrently // Collect results from all relays contactEvents := make(map[string]*nostr.Event) profileEvents := make(map[string]*nostr.Event) + vouchSetEvents := make(map[string]*nostr.Event) + var reportEvents []*nostr.Event var wg sync.WaitGroup var eventsMu sync.Mutex // Protect concurrent map writes @@ -337,7 +377,12 @@ func (c *Crawler) fetchBatch(pubkeys []string) { wg.Add(1) go func(r string, u []string) { defer wg.Done() - contacts, profiles := c.fetchEventsFromRelay(r, u, fetchProfiles) + contacts, profiles, vouchSets := c.fetchEventsFromRelay(r, u, fetchProfiles) + + var reports []*nostr.Event + if c.vouchEnabled { + reports = c.fetchReportsFromRelay(r, u) + } // Use mutex to protect map access eventsMu.Lock() @@ -351,6 +396,12 @@ func (c *Crawler) fetchBatch(pubkeys []string) { profileEvents[pubkey] = event } } + for pubkey, event := range vouchSets { + if existing, exists := vouchSetEvents[pubkey]; !exists || event.CreatedAt > existing.CreatedAt { + vouchSetEvents[pubkey] = event + } + } + reportEvents = append(reportEvents, reports...) eventsMu.Unlock() }(relay, users) } @@ -358,7 +409,8 @@ func (c *Crawler) fetchBatch(pubkeys []string) { // Wait for all relays to finish wg.Wait() - // Step 5: Check against global timestamps and send to processors + // Step 5: Check against global timestamps and send to processors. + // Contacts and vouch sets share the contacts channel (dispatched by kind). for _, event := range contactEvents { select { case c.contactsChan <- event: @@ -367,6 +419,14 @@ func (c *Crawler) fetchBatch(pubkeys []string) { } } + for _, event := range vouchSetEvents { + select { + case c.contactsChan <- event: + case <-c.ctx.Done(): + return + } + } + // Send profile events to profile processor if fetchProfiles { for _, event := range profileEvents { @@ -377,52 +437,75 @@ func (c *Crawler) fetchBatch(pubkeys []string) { } } } + + // Send report events to report processor + for _, event := range reportEvents { + select { + case c.reportsChan <- event: + case <-c.ctx.Done(): + return + } + } } -// fetchEventsFromRelay fetches contacts (kind 3) and optionally profiles (kind 0) for multiple users from a single relay -// Returns maps of pubkey -> latest event for contacts and profiles -func (c *Crawler) fetchEventsFromRelay(relay string, pubkeys []string, fetchProfiles bool) (map[string]*nostr.Event, map[string]*nostr.Event) { +// fetchEventsFromRelay fetches the replaceable events for multiple users from a +// single relay in ONE subscription. A REQ carries multiple filters (OR'd), so: +// - filter 1: kind 3 (contacts) + optional kind 0 (profiles) — one-per-author, +// no `d` tag; +// - filter 2 (when vouch is enabled): the vouch set (kind:30000) constrained +// by `#d` — that constraint applies only to its own filter, not to 3/0. +// +// One connection, one round-trip. kind:1984 reports are fetched separately +// (fetchReportsFromRelay): being append-only they carry a per-query limit that +// is cleaner to reason about in its own subscription. +// Returns maps of pubkey -> latest event for contacts, profiles and vouch sets. +func (c *Crawler) fetchEventsFromRelay(relay string, pubkeys []string, fetchProfiles bool) (map[string]*nostr.Event, map[string]*nostr.Event, map[string]*nostr.Event) { if len(pubkeys) == 0 { - return nil, nil + return nil, nil, nil } // Check if context is cancelled select { case <-c.ctx.Done(): - return nil, nil + return nil, nil, nil default: } // Skip if relay is banned if c.relayHealth.IsRelayBanned(relay) { - return nil, nil + return nil, nil, nil } // Apply rate limiting for this specific relay limiter := c.getRelayLimiter(relay) if err := limiter.Wait(c.ctx); err != nil { - return nil, nil + return nil, nil, nil } ctx, cancel := context.WithTimeout(c.ctx, 15*time.Second) defer cancel() - // Build filter with kinds 3 (contacts) and optionally 0 (profiles) + // Filter 1: contacts (+ profiles). kinds := []int{3} if fetchProfiles { kinds = append(kinds, 0) } + filters := nostr.Filters{{Kinds: kinds, Authors: pubkeys}} - filter := nostr.Filter{ - Kinds: kinds, - Authors: pubkeys, + // Filter 2: the vouch set; its #d constraint is scoped to this filter only. + if c.vouchEnabled { + filters = append(filters, nostr.Filter{ + Kinds: []int{ingest.KindVouchSet}, + Authors: pubkeys, + Tags: nostr.TagMap{"d": []string{ingest.VouchSetIdentifier}}, + }) } // Get the current pool from pool manager pool := c.poolManager.GetPool() - // SubscribeMany returns a channel of RelayEvent - eventsChan := pool.FetchMany(ctx, []string{relay}, filter) + // SubManyEose is like FetchMany but takes multiple filters; ends on EOSE. + eventsChan := pool.SubManyEose(ctx, []string{relay}, filters) // Track relay usage c.poolManager.TrackRelayUsage(relay) @@ -430,6 +513,7 @@ func (c *Crawler) fetchEventsFromRelay(relay string, pubkeys []string, fetchProf // Collect events and keep only the latest for each pubkey contacts := make(map[string]*nostr.Event) profiles := make(map[string]*nostr.Event) + vouchSets := make(map[string]*nostr.Event) timer := time.NewTimer(10 * time.Second) // Slightly less than context timeout defer timer.Stop() channelClosed := false @@ -445,7 +529,7 @@ func (c *Crawler) fetchEventsFromRelay(relay string, pubkeys []string, fetchProf channelClosed = true c.relayHealth.RecordSuccess(relay) } - return contacts, profiles + return contacts, profiles, vouchSets } ev := relayEvent.Event @@ -460,19 +544,80 @@ func (c *Crawler) fetchEventsFromRelay(relay string, pubkeys []string, fetchProf if existing, exists := profiles[ev.PubKey]; !exists || ev.CreatedAt > existing.CreatedAt { profiles[ev.PubKey] = ev } + case ingest.KindVouchSet: + if existing, exists := vouchSets[ev.PubKey]; !exists || ev.CreatedAt > existing.CreatedAt { + vouchSets[ev.PubKey] = ev + } } case <-timer.C: // Timeout - this could indicate connection issues if !channelClosed { c.relayHealth.RecordFailure(relay, "timeout - no response") } - return contacts, profiles + return contacts, profiles, vouchSets case <-ctx.Done(): // Context cancelled if !channelClosed { c.relayHealth.RecordFailure(relay, "context cancelled") } - return contacts, profiles + return contacts, profiles, vouchSets + } + } +} + +// fetchReportsFromRelay fetches kind:1984 reports for multiple users from a +// single relay in their own capped query, kept separate from the replaceable +// events so a flood of reports cannot crowd them out. Returns all matching +// events (an author may have many); de-duplication happens at ingest time. +func (c *Crawler) fetchReportsFromRelay(relay string, pubkeys []string) []*nostr.Event { + if len(pubkeys) == 0 { + return nil + } + + select { + case <-c.ctx.Done(): + return nil + default: + } + + if c.relayHealth.IsRelayBanned(relay) { + return nil + } + + limiter := c.getRelayLimiter(relay) + if err := limiter.Wait(c.ctx); err != nil { + return nil + } + + ctx, cancel := context.WithTimeout(c.ctx, 15*time.Second) + defer cancel() + + limit := maxReportsPerQuery + filter := nostr.Filter{ + Kinds: []int{ingest.KindReport}, + Authors: pubkeys, + Limit: limit, + } + + pool := c.poolManager.GetPool() + eventsChan := pool.FetchMany(ctx, []string{relay}, filter) + c.poolManager.TrackRelayUsage(relay) + + var reports []*nostr.Event + timer := time.NewTimer(10 * time.Second) + defer timer.Stop() + + for { + select { + case relayEvent, ok := <-eventsChan: + if !ok { + return reports + } + reports = append(reports, relayEvent.Event) + case <-timer.C: + return reports + case <-ctx.Done(): + return reports } } } @@ -584,39 +729,13 @@ func (c *Crawler) isValidRelay(url string) bool { // processKind3Event parses a kind:3 event and updates the database and work queue. // Uses batch writes to reduce lock contention. func (c *Crawler) processKind3Event(ev *nostr.Event) { - if ev.Kind != 3 { + if ev.Kind != ingest.KindContacts { return } - // Collect all pubkeys and connections for batch write - pubkeySet := make(map[string]bool) - pubkeySet[ev.PubKey] = true - - var connections []repository.Connection - - for _, tag := range ev.Tags { - if len(tag) >= 2 && tag[0] == "p" { - targetPubkey := tag[1] - if !nostr.IsValidPublicKey(targetPubkey) { - continue - } - if targetPubkey == ev.PubKey { - continue - } - - pubkeySet[targetPubkey] = true - connections = append(connections, repository.Connection{ - Source: ev.PubKey, - Target: targetPubkey, - }) - } - } - - // Convert set to slice - pubkeys := make([]string, 0, len(pubkeySet)) - for pk := range pubkeySet { - pubkeys = append(pubkeys, pk) - } + // Parse with the shared ingest parser so the crawler and POST /event paths + // produce identical follow edges. + pubkeys, connections := ingest.ParseContacts(ev) // Batch write all pubkeys and connections in a single transaction if err := c.repo.BatchUpsertPubkeysAndConnections(pubkeys, connections); err != nil { @@ -625,12 +744,80 @@ func (c *Crawler) processKind3Event(ev *nostr.Event) { // Update crawled map c.crawledMu.Lock() - for pk := range pubkeySet { + for _, pk := range pubkeys { c.crawled[pk] = true } c.crawledMu.Unlock() } +// processVouchSetEvent verifies a kind:30000 event's signature and replaces the +// author's vouch edges with the listed pubkeys. +func (c *Crawler) processVouchSetEvent(ev *nostr.Event) { + if ev.Kind != ingest.KindVouchSet { + return + } + if ok, err := ev.CheckSignature(); err != nil || !ok { + return + } + if err := c.ingester.ApplyVouchSet(ev); err != nil { + log.Printf("[CRAWLER] Error applying vouch set for %s: %v", ev.PubKey, err) + } +} + +// processReportEvent verifies a kind:1984 event's signature and stores it as a +// report edge when it is a profile-level spam/impersonation report. +func (c *Crawler) processReportEvent(ev *nostr.Event) { + if ev.Kind != ingest.KindReport { + return + } + if ok, err := ev.CheckSignature(); err != nil || !ok { + return + } + if _, err := c.ingester.ApplyReport(ev); err != nil { + log.Printf("[CRAWLER] Error applying report from %s: %v", ev.PubKey, err) + } +} + +// reportProcessor handles processing of report events (kind 1984) +func (c *Crawler) reportProcessor() { + for { + if c.waitIfPaused() { + c.drainReportsChan() + return + } + + select { + case event, ok := <-c.reportsChan: + if !ok { + return + } + if event != nil { + c.processReportEvent(event) + } + case <-c.ctx.Done(): + c.drainReportsChan() + return + } + } +} + +// drainReportsChan processes any remaining events in the reports channel +func (c *Crawler) drainReportsChan() { + for { + select { + case event, ok := <-c.reportsChan: + if !ok { + return + } + if event != nil { + c.processReportEvent(event) + } + default: + return + } + } +} + // profileProcessor handles processing of profile events (kind 0) func (c *Crawler) profileProcessor() { for { diff --git a/internal/ingest/ingest.go b/internal/ingest/ingest.go new file mode 100644 index 0000000..faf76d2 --- /dev/null +++ b/internal/ingest/ingest.go @@ -0,0 +1,173 @@ +// Package ingest turns signed Nostr events into reputation-graph data. The same +// logic backs both the crawler (events pulled from relays) and the API's +// POST /event endpoint (events pushed by clients), so the two paths stay in sync. +package ingest + +import ( + "fayan/internal/repository" + + "github.com/nbd-wtf/go-nostr" +) + +// Supported event kinds. +const ( + KindContacts = 3 // NIP-02 contact list → follow edges + KindReport = 1984 // NIP-56 report → report edge (profile-level only) + KindVouchSet = 30000 // NIP-51 follow set; the vouch set is the one tagged d=VouchSetIdentifier +) + +// VouchSetIdentifier is the NIP-51 `d` tag value identifying the follow set +// used as a vouch set. A kind:30000 with any other `d` is a regular follow set +// and is ignored. The value is intentionally generic (no project prefix) so it +// can serve as a shared convention if one emerges. +const VouchSetIdentifier = "vouch" + +// acceptedReportTypes are the NIP-56 report types that affect reputation. Fayan +// is a spam-detection system, so only these two carry weight; other types +// (nudity, profanity, …) are ignored. +var acceptedReportTypes = map[string]bool{ + "spam": true, + "impersonation": true, +} + +// Ingester applies signature-verified Nostr events to the repository. +type Ingester struct { + repo *repository.Repository +} + +// New creates an Ingester backed by repo. +func New(repo *repository.Repository) *Ingester { + return &Ingester{repo: repo} +} + +// Apply dispatches a verified event by kind, reporting whether the kind is one +// Fayan ingests. The caller is responsible for verifying ev's signature first. +func (in *Ingester) Apply(ev *nostr.Event) (handled bool, err error) { + switch ev.Kind { + case KindContacts: + return true, in.ApplyContacts(ev) + case KindReport: + _, err := in.ApplyReport(ev) + return true, err + case KindVouchSet: + if !IsVouchSet(ev) { + return false, nil // some other follow set — not ours + } + return true, in.ApplyVouchSet(ev) + } + return false, nil +} + +// IsVouchSet reports whether ev is the NIP-51 follow set (kind:30000) Fayan +// uses as a vouch set, i.e. tagged with VouchSetIdentifier. +func IsVouchSet(ev *nostr.Event) bool { + return ev.Kind == KindVouchSet && ev.Tags.GetD() == VouchSetIdentifier +} + +// ApplyContacts parses a kind:3 event into follow connections and persists them. +func (in *Ingester) ApplyContacts(ev *nostr.Event) error { + if ev.Kind != KindContacts { + return nil + } + pubkeys, connections := ParseContacts(ev) + return in.repo.BatchUpsertPubkeysAndConnections(pubkeys, connections) +} + +// ParseContacts extracts the author plus followed pubkeys and the follow edges +// from a kind:3 event. Exported so the crawler shares one parser with the API. +func ParseContacts(ev *nostr.Event) ([]string, []repository.Connection) { + pubkeySet := make(map[string]bool) + pubkeySet[ev.PubKey] = true + + var connections []repository.Connection + for _, tag := range ev.Tags { + if len(tag) >= 2 && tag[0] == "p" { + target := tag[1] + if !nostr.IsValidPublicKey(target) || target == ev.PubKey { + continue + } + pubkeySet[target] = true + connections = append(connections, repository.Connection{Source: ev.PubKey, Target: target}) + } + } + + pubkeys := make([]string, 0, len(pubkeySet)) + for pk := range pubkeySet { + pubkeys = append(pubkeys, pk) + } + return pubkeys, connections +} + +// ApplyReport stores a profile-level spam/impersonation report as a report edge. +// It returns false (without error) when the event is not an accepted profile +// report: it targets a specific event (has an e tag), lacks a usable p target, +// or carries a report type Fayan does not weigh. +func (in *Ingester) ApplyReport(ev *nostr.Event) (bool, error) { + if ev.Kind != KindReport { + return false, nil + } + target, ok := profileReportTarget(ev) + if !ok { + return false, nil + } + return true, in.repo.UpsertReport(ev.PubKey, target, ev.CreatedAt.Time()) +} + +// profileReportTarget returns the reported pubkey when ev is a NIP-56 report +// that targets a profile (not a specific event) with an accepted report type. +// Any e tag disqualifies the event — that is a note-level report, out of scope. +func profileReportTarget(ev *nostr.Event) (string, bool) { + target := "" + for _, tag := range ev.Tags { + if len(tag) == 0 { + continue + } + switch tag[0] { + case "e": + return "", false + case "p": + if target != "" || len(tag) < 2 { + continue + } + pk := tag[1] + if !nostr.IsValidPublicKey(pk) || pk == ev.PubKey { + continue + } + // NIP-56 carries the report type in the p tag's third element. + if len(tag) < 3 || !acceptedReportTypes[tag[2]] { + continue + } + target = pk + } + } + return target, target != "" +} + +// ApplyVouchSet refreshes the author's vouch edges from the pubkeys listed in +// their vouch set (the NIP-51 kind:30000 follow set tagged with +// VouchSetIdentifier). A no-op for any other event. +func (in *Ingester) ApplyVouchSet(ev *nostr.Event) error { + if !IsVouchSet(ev) { + return nil + } + targets := ParseVouchTargets(ev) + return in.repo.UpsertVouches(ev.PubKey, targets) +} + +// ParseVouchTargets extracts the valid, de-duplicated pubkeys an author vouches +// for from a follow set's p tags (excluding the author themselves). +func ParseVouchTargets(ev *nostr.Event) []string { + seen := make(map[string]bool) + var targets []string + for _, tag := range ev.Tags { + if len(tag) >= 2 && tag[0] == "p" { + pk := tag[1] + if !nostr.IsValidPublicKey(pk) || pk == ev.PubKey || seen[pk] { + continue + } + seen[pk] = true + targets = append(targets, pk) + } + } + return targets +} diff --git a/internal/ingest/ingest_test.go b/internal/ingest/ingest_test.go new file mode 100644 index 0000000..68215ea --- /dev/null +++ b/internal/ingest/ingest_test.go @@ -0,0 +1,113 @@ +package ingest + +import ( + "testing" + + "github.com/nbd-wtf/go-nostr" +) + +// mustPubkey returns a real, curve-valid x-only pubkey (IsValidPublicKey parses +// the secp256k1 point, so arbitrary hex will not do). +func mustPubkey(t *testing.T) string { + t.Helper() + pk, err := nostr.GetPublicKey(nostr.GeneratePrivateKey()) + if err != nil { + t.Fatalf("derive pubkey: %v", err) + } + return pk +} + +func TestParseContacts(t *testing.T) { + author, bob, carol := mustPubkey(t), mustPubkey(t), mustPubkey(t) + ev := &nostr.Event{ + PubKey: author, + Kind: KindContacts, + Tags: nostr.Tags{ + {"p", bob}, + {"p", carol}, + {"p", author}, // self — excluded + {"p", "not-hex"}, // invalid — excluded + {"e", "whatever"}, // non-p — ignored + }, + } + pubkeys, conns := ParseContacts(ev) + if len(conns) != 2 { + t.Fatalf("expected 2 connections, got %d", len(conns)) + } + // author + bob + carol = 3 distinct pubkeys + if len(pubkeys) != 3 { + t.Fatalf("expected 3 pubkeys, got %d", len(pubkeys)) + } +} + +func TestParseVouchTargets(t *testing.T) { + author, bob, carol := mustPubkey(t), mustPubkey(t), mustPubkey(t) + ev := &nostr.Event{ + PubKey: author, + Kind: KindVouchSet, + Tags: nostr.Tags{ + {"d", VouchSetIdentifier}, + {"p", bob}, + {"p", bob}, // duplicate — collapsed + {"p", carol}, + {"p", author}, // self — excluded + }, + } + targets := ParseVouchTargets(ev) + if len(targets) != 2 { + t.Fatalf("expected 2 deduped targets, got %d: %v", len(targets), targets) + } +} + +func TestIsVouchSet(t *testing.T) { + author := mustPubkey(t) + cases := []struct { + name string + kind int + d string + want bool + }{ + {"vouch set", KindVouchSet, VouchSetIdentifier, true}, + {"other follow set", KindVouchSet, "friends", false}, + {"follow set without d", KindVouchSet, "", false}, + {"wrong kind", KindContacts, VouchSetIdentifier, false}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + ev := &nostr.Event{PubKey: author, Kind: tc.kind} + if tc.d != "" { + ev.Tags = nostr.Tags{{"d", tc.d}} + } + if got := IsVouchSet(ev); got != tc.want { + t.Fatalf("IsVouchSet = %v, want %v", got, tc.want) + } + }) + } +} + +func TestProfileReportTarget(t *testing.T) { + author, bob := mustPubkey(t), mustPubkey(t) + cases := []struct { + name string + tags nostr.Tags + want string + wantOK bool + }{ + {"spam profile report", nostr.Tags{{"p", bob, "spam"}}, bob, true}, + {"impersonation profile report", nostr.Tags{{"p", bob, "impersonation"}}, bob, true}, + {"unweighted type (nudity)", nostr.Tags{{"p", bob, "nudity"}}, "", false}, + {"missing report type", nostr.Tags{{"p", bob}}, "", false}, + {"event-level report (has e tag)", nostr.Tags{{"e", "evt", "spam"}, {"p", bob, "spam"}}, "", false}, + {"self report", nostr.Tags{{"p", author, "spam"}}, "", false}, + {"no p tag", nostr.Tags{{"e", "evt", "spam"}}, "", false}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + ev := &nostr.Event{PubKey: author, Kind: KindReport, Tags: tc.tags} + got, ok := profileReportTarget(ev) + if ok != tc.wantOK || got != tc.want { + t.Fatalf("got (%q, %v), want (%q, %v)", got, ok, tc.want, tc.wantOK) + } + }) + } +} diff --git a/internal/models/user.go b/internal/models/user.go index 2e6611c..9da4394 100644 --- a/internal/models/user.go +++ b/internal/models/user.go @@ -24,3 +24,16 @@ type Connection struct { Source string Target string } + +// Vouch represents a user's explicit endorsement of another user, +// authenticated via NIP-98 and submitted through the API. +type Vouch struct { + Source string + Target string +} + +// ReportAggregate summarises reports against a single target pubkey. +// Only reporters with trust_score > 0 contribute. +type ReportAggregate struct { + TotalReporterTrust float64 +} diff --git a/internal/ranking/calculator.go b/internal/ranking/calculator.go index ce1a98c..8629efd 100644 --- a/internal/ranking/calculator.go +++ b/internal/ranking/calculator.go @@ -15,21 +15,31 @@ type scoreWithID struct { score float64 } +// inLink represents a single weighted in-edge in the adjacency list. +// Weight is 1.0 for follow edges and vouchWeight for vouch-only edges. +type inLink struct { + source int32 + weight float64 +} + // Calculator handles PageRank and TrustRank calculations type Calculator struct { repo *repository.Repository seedPubkeys []string trustRankWeight float64 pageRankWeight float64 + vouchWeight float64 } -// NewCalculator creates a new Calculator instance -func NewCalculator(repo *repository.Repository, seedPubkeys []string, trustRankWeight, pageRankWeight float64) *Calculator { +// NewCalculator creates a new Calculator instance. +// vouchWeight is the relative weight of a vouch-only edge (follow edges are 1.0). +func NewCalculator(repo *repository.Repository, seedPubkeys []string, trustRankWeight, pageRankWeight, vouchWeight float64) *Calculator { return &Calculator{ repo: repo, seedPubkeys: seedPubkeys, trustRankWeight: trustRankWeight, pageRankWeight: pageRankWeight, + vouchWeight: vouchWeight, } } @@ -41,6 +51,7 @@ func (c *Calculator) Calculate() error { type edge struct { source int32 target int32 + weight float64 } edges := make([]edge, 0, 1000) @@ -59,12 +70,22 @@ func (c *Calculator) Calculate() error { cutoffTime := time.Now().UTC().AddDate(0, 0, -30) + // edgeSet dedupes (source, target) pairs across follow and vouch edges so + // a user who both follows and vouches for the same target contributes a + // single follow-weighted edge (not double flow). + edgeSet := make(map[int64]bool) + encodeEdge := func(s, t int32) int64 { return int64(s)<<32 | int64(uint32(t)) } + err := c.repo.StreamConnectionsInTx(func(conn models.Connection) error { sourceID := getID(conn.Source) targetID := getID(conn.Target) if sourceID != targetID { - edges = append(edges, edge{source: sourceID, target: targetID}) + key := encodeEdge(sourceID, targetID) + if !edgeSet[key] { + edgeSet[key] = true + edges = append(edges, edge{source: sourceID, target: targetID, weight: 1.0}) + } } connectionCount++ return nil @@ -74,13 +95,50 @@ func (c *Calculator) Calculate() error { return err } + // Vouch edges: admit only those from pubkeys with positive last-round + // TrustRank (seeds always admitted so the feature works on first run when + // no one has trust_score written yet). Skipped entirely when the feature + // is disabled (vouchWeight <= 0). + vouchAdmitted := 0 + if c.vouchWeight > 0 { + qualifying, qErr := c.repo.GetPubkeysWithPositiveTrust() + if qErr != nil { + log.Printf(" [WARN] Failed to load qualifying pubkeys for vouch admission: %v", qErr) + qualifying = make(map[string]struct{}) + } + for _, s := range c.seedPubkeys { + qualifying[s] = struct{}{} + } + if err := c.repo.StreamVouches(func(v models.Vouch) error { + if _, ok := qualifying[v.Source]; !ok { + return nil + } + sourceID := getID(v.Source) + targetID := getID(v.Target) + if sourceID == targetID { + return nil + } + key := encodeEdge(sourceID, targetID) + if edgeSet[key] { + return nil + } + edgeSet[key] = true + edges = append(edges, edge{source: sourceID, target: targetID, weight: c.vouchWeight}) + vouchAdmitted++ + return nil + }, &cutoffTime); err != nil { + return err + } + log.Printf(" [INFO] Vouch edges admitted: %d (weight=%.2f)", vouchAdmitted, c.vouchWeight) + } + numNodes := len(idToPubkey) if numNodes == 0 { log.Println(" [WARN] Graph is empty, skipping calculation") return nil } - log.Printf(" [INFO] Processing %d nodes, %d connections", numNodes, connectionCount) + log.Printf(" [INFO] Processing %d nodes, %d connections (+ %d vouches)", numNodes, connectionCount, vouchAdmitted) // Build seed node set for TrustRank seedSet := make(map[int32]bool) @@ -91,12 +149,16 @@ func (c *Calculator) Calculate() error { } log.Printf(" [INFO] Found %d seed nodes in graph (out of %d configured)", len(seedSet), len(c.seedPubkeys)) - // Build the graph using slices (Adjacency List) - inLinks := make([][]int32, numNodes) + // Build the weighted graph. + // outWeight[i] is the sum of outgoing edge weights (used by flow math). + // outDegree[i] is the discrete count (used only for the Following field). + inLinks := make([][]inLink, numNodes) + outWeight := make([]float64, numNodes) outDegree := make([]int32, numNodes) for _, e := range edges { - inLinks[e.target] = append(inLinks[e.target], e.source) + inLinks[e.target] = append(inLinks[e.target], inLink{source: e.source, weight: e.weight}) + outWeight[e.source] += e.weight outDegree[e.source]++ } @@ -108,13 +170,13 @@ func (c *Calculator) Calculate() error { // Run PageRank log.Println(" [INFO] Running PageRank...") - pageScores := c.runPageRank(numNodes, inLinks, outDegree, dampingFactor, tolerance, maxIterations) + pageScores := c.runPageRank(numNodes, inLinks, outWeight, dampingFactor, tolerance, maxIterations) // Run TrustRank var trustScores []float64 if len(seedSet) > 0 { log.Println(" [INFO] Running TrustRank...") - trustScores = c.runTrustRank(numNodes, inLinks, outDegree, seedSet, dampingFactor, tolerance, maxIterations) + trustScores = c.runTrustRank(numNodes, inLinks, outWeight, seedSet, dampingFactor, tolerance, maxIterations) } else { log.Println(" [WARN] No seed nodes found, skipping TrustRank") trustScores = make([]float64, numNodes) @@ -126,6 +188,42 @@ func (c *Calculator) Calculate() error { scores[i] = c.trustRankWeight*trustScores[i] + c.pageRankWeight*pageScores[i] } + // Apply trust-weighted report penalty: scale each target's scores by + // (1 - penalty) where penalty = R / (R + F + ε). R is the sum of reporter + // trust_scores (only reporters with trust_score > 0 count); F is the sum + // of follower/voucher trust_scores weighted by their edge weights. Rank + // is computed after penalty so penalized accounts drop in the ordering. + if reports, err := c.repo.GetTrustWeightedReports(); err != nil { + log.Printf(" [WARN] Failed to load reports for penalty: %v", err) + } else if len(reports) > 0 { + // Reported targets are sparse, so compute F (weighted follower/voucher + // trust) only for them rather than scanning the whole graph. + penalized := 0 + for target, agg := range reports { + if agg.TotalReporterTrust <= 0 { + continue + } + id, ok := pubkeyToID[target] + if !ok { + continue + } + fTrust := 0.0 + for _, link := range inLinks[id] { + fTrust += trustScores[link.source] * link.weight + } + penalty := agg.TotalReporterTrust / (agg.TotalReporterTrust + fTrust + 1e-9) + if penalty > 1 { + penalty = 1 + } + factor := 1 - penalty + scores[id] *= factor + trustScores[id] *= factor + pageScores[id] *= factor + penalized++ + } + log.Printf(" [INFO] Applied report penalty to %d pubkeys", penalized) + } + // Calculate ranks based on scores rankList := make([]scoreWithID, numNodes) for i := range numNodes { @@ -188,8 +286,10 @@ func (c *Calculator) Calculate() error { return nil } -// runPageRank executes the PageRank algorithm -func (c *Calculator) runPageRank(numNodes int, inLinks [][]int32, outDegree []int32, damping, tolerance float64, maxIterations int) []float64 { +// runPageRank executes the weighted PageRank algorithm. +// Each in-edge carries its own weight; a node's score is distributed among +// its out-neighbors in proportion to edge weight (sum equals outWeight[j]). +func (c *Calculator) runPageRank(numNodes int, inLinks [][]inLink, outWeight []float64, damping, tolerance float64, maxIterations int) []float64 { scores := make([]float64, numNodes) newScores := make([]float64, numNodes) initialScore := 1.0 / float64(numNodes) @@ -201,15 +301,15 @@ func (c *Calculator) runPageRank(numNodes int, inLinks [][]int32, outDegree []in for iter := 0; iter < maxIterations; iter++ { danglingSum := 0.0 for i := range numNodes { - if outDegree[i] == 0 { + if outWeight[i] == 0 { danglingSum += scores[i] } } for i := range numNodes { sum := 0.0 - for _, j := range inLinks[i] { - sum += scores[j] / float64(outDegree[j]) + for _, link := range inLinks[i] { + sum += scores[link.source] * link.weight / outWeight[link.source] } newScores[i] = (1-damping)/float64(numNodes) + damping*(sum+danglingSum/float64(numNodes)) } @@ -231,8 +331,8 @@ func (c *Calculator) runPageRank(numNodes int, inLinks [][]int32, outDegree []in return scores } -// runTrustRank executes the TrustRank algorithm -func (c *Calculator) runTrustRank(numNodes int, inLinks [][]int32, outDegree []int32, seedSet map[int32]bool, damping, tolerance float64, maxIterations int) []float64 { +// runTrustRank executes the weighted TrustRank algorithm. +func (c *Calculator) runTrustRank(numNodes int, inLinks [][]inLink, outWeight []float64, seedSet map[int32]bool, damping, tolerance float64, maxIterations int) []float64 { scores := make([]float64, numNodes) newScores := make([]float64, numNodes) @@ -245,15 +345,15 @@ func (c *Calculator) runTrustRank(numNodes int, inLinks [][]int32, outDegree []i for iter := range maxIterations { danglingSum := 0.0 for i := range numNodes { - if outDegree[i] == 0 { + if outWeight[i] == 0 { danglingSum += scores[i] } } for i := range numNodes { sum := 0.0 - for _, j := range inLinks[i] { - sum += scores[j] / float64(outDegree[j]) + for _, link := range inLinks[i] { + sum += scores[link.source] * link.weight / outWeight[link.source] } // In TrustRank, dangling node scores only flow back to seed nodes diff --git a/internal/ranking/calculator_test.go b/internal/ranking/calculator_test.go new file mode 100644 index 0000000..522d2de --- /dev/null +++ b/internal/ranking/calculator_test.go @@ -0,0 +1,259 @@ +package ranking + +import ( + "path/filepath" + "testing" + "time" + + "fayan/internal/repository" +) + +func newTestRepo(t *testing.T) *repository.Repository { + t.Helper() + dir := t.TempDir() + repo, err := repository.New(filepath.Join(dir, "test.db")) + if err != nil { + t.Fatalf("open repo: %v", err) + } + t.Cleanup(func() { _ = repo.Close() }) + return repo +} + +func insertFollow(t *testing.T, repo *repository.Repository, source, target string) { + t.Helper() + err := repo.BatchUpsertPubkeysAndConnections( + []string{source, target}, + []repository.Connection{{Source: source, Target: target}}, + ) + if err != nil { + t.Fatalf("insert follow %s->%s: %v", source, target, err) + } +} + +func getUser(t *testing.T, repo *repository.Repository, pubkey string) (rank *int, followers, following int, score float64) { + t.Helper() + info, err := repo.GetUserByPubkey(pubkey) + if err != nil { + t.Fatalf("get user %s: %v", pubkey, err) + } + if info == nil { + return nil, 0, 0, 0 + } + return info.Rank, info.Followers, info.Following, info.Score +} + +// TestVouchPromotesUnfollowedUser verifies the core value proposition: after +// two ranking cycles (first establishes seed trust, second admits vouch edge), +// a newbie with no followers but one vouch from a seed receives a rank. +func TestVouchPromotesUnfollowedUser(t *testing.T) { + repo := newTestRepo(t) + + seeds := []string{"seed1", "seed2", "seed3"} + // Make seeds mutually follow so they have outgoing edges; TrustRank + // requires at least some graph structure to propagate. + insertFollow(t, repo, "seed1", "seed2") + insertFollow(t, repo, "seed2", "seed3") + insertFollow(t, repo, "seed3", "seed1") + + calc := NewCalculator(repo, seeds, 0.7, 0.3, 0.5) + + // First pass: seeds acquire trust_score > 0. + if err := calc.Calculate(); err != nil { + t.Fatal(err) + } + + // Seed vouches for a newbie nobody follows. + if err := repo.UpsertVouches("seed1", []string{"newbie"}); err != nil { + t.Fatal(err) + } + + // Second pass: vouch edge admitted because seed1 has trust_score > 0. + if err := calc.Calculate(); err != nil { + t.Fatal(err) + } + + rank, followers, _, score := getUser(t, repo, "newbie") + if rank == nil { + t.Fatalf("newbie should have a rank after vouch") + } + if followers != 1 { + t.Fatalf("newbie should have 1 follower (the vouch edge), got %d", followers) + } + if score <= 0 { + t.Fatalf("newbie score should be positive, got %v", score) + } +} + +// TestVouchWeightShrinksContribution verifies that a lower vouchWeight +// reduces the score a vouch-only edge contributes, relative to a full-weight +// (1.0) follow edge. +func TestVouchWeightShrinksContribution(t *testing.T) { + repo := newTestRepo(t) + + seeds := []string{"seed1", "seed2", "seed3"} + insertFollow(t, repo, "seed1", "seed2") + insertFollow(t, repo, "seed2", "seed3") + insertFollow(t, repo, "seed3", "seed1") + + // Bootstrap seed trust. + calcHigh := NewCalculator(repo, seeds, 0.7, 0.3, 1.0) + if err := calcHigh.Calculate(); err != nil { + t.Fatal(err) + } + if err := repo.UpsertVouches("seed1", []string{"newbie"}); err != nil { + t.Fatal(err) + } + + if err := calcHigh.Calculate(); err != nil { + t.Fatal(err) + } + _, _, _, scoreAtWeight1 := getUser(t, repo, "newbie") + + // Run the same graph again with vouchWeight=0.25. + calcLow := NewCalculator(repo, seeds, 0.7, 0.3, 0.25) + if err := calcLow.Calculate(); err != nil { + t.Fatal(err) + } + _, _, _, scoreAtWeight025 := getUser(t, repo, "newbie") + + if !(scoreAtWeight025 < scoreAtWeight1) { + t.Fatalf("expected lower vouch weight to produce lower score, got %.6g (w=0.25) vs %.6g (w=1.0)", + scoreAtWeight025, scoreAtWeight1) + } + if scoreAtWeight025 <= 0 { + t.Fatalf("score at w=0.25 should still be positive, got %v", scoreAtWeight025) + } +} + +// TestVouchAndFollowDedupe verifies A following AND vouching for B only +// produces one edge (A's following count = 1, not 2). +func TestVouchAndFollowDedupe(t *testing.T) { + repo := newTestRepo(t) + + insertFollow(t, repo, "a", "b") + if err := repo.UpsertVouches("a", []string{"b"}); err != nil { + t.Fatal(err) + } + // Give A trust so vouch edge would be admitted. + repo.BatchUpdatePubkeys([]repository.PubkeyUpdate{{ + Pubkey: "a", + Score: 0.5, + Rank: 1, + TrustScore: 0.5, + PageScore: 0.5, + Followers: 0, + Following: 1, + }}) + + calc := NewCalculator(repo, []string{"a"}, 0.7, 0.3, 0.5) + if err := calc.Calculate(); err != nil { + t.Fatal(err) + } + + _, _, following, _ := getUser(t, repo, "a") + if following != 1 { + t.Fatalf("A should have following=1 after dedup, got %d", following) + } + _, followers, _, _ := getUser(t, repo, "b") + if followers != 1 { + t.Fatalf("B should have followers=1 after dedup, got %d", followers) + } +} + +// TestReportDecaysScore verifies a well-connected pubkey loses score when +// reported by multiple trusted accounts. +func TestReportDecaysScore(t *testing.T) { + repo := newTestRepo(t) + + // Build a graph where X has several followers (seeds), so X's score starts high. + seeds := []string{"seed1", "seed2", "seed3"} + insertFollow(t, repo, "seed1", "seed2") + insertFollow(t, repo, "seed2", "seed3") + insertFollow(t, repo, "seed3", "seed1") + insertFollow(t, repo, "seed1", "x") + insertFollow(t, repo, "seed2", "x") + insertFollow(t, repo, "seed3", "x") + + calc := NewCalculator(repo, seeds, 0.7, 0.3, 0.5) + if err := calc.Calculate(); err != nil { + t.Fatal(err) + } + + _, _, _, scoreBefore := getUser(t, repo, "x") + if scoreBefore <= 0 { + t.Fatalf("setup should give X a positive score, got %v", scoreBefore) + } + + // All three seeds report X. + for _, s := range seeds { + if err := repo.UpsertReport(s, "x", time.Now()); err != nil { + t.Fatal(err) + } + } + + if err := calc.Calculate(); err != nil { + t.Fatal(err) + } + + _, _, _, scoreAfter := getUser(t, repo, "x") + if scoreAfter >= scoreBefore { + t.Fatalf("score should decay after reports: before=%v after=%v", scoreBefore, scoreAfter) + } + // All followers also report → R == F → penalty = 0.5 → score halves. + // Allow a small tolerance for rounding. + if scoreAfter > scoreBefore*0.55 { + t.Fatalf("expected penalty near 0.5 (R==F), score dropped only from %v to %v", scoreBefore, scoreAfter) + } +} + +// TestReportWithNoTrustIgnored verifies reports from untrusted accounts have +// no effect on the reported user's score. +func TestReportWithNoTrustIgnored(t *testing.T) { + repo := newTestRepo(t) + + seeds := []string{"seed1", "seed2"} + insertFollow(t, repo, "seed1", "seed2") + insertFollow(t, repo, "seed2", "seed1") + insertFollow(t, repo, "seed1", "target") + insertFollow(t, repo, "seed2", "target") + + // Add an untrusted account (no inbound edges, no trust). + if _, err := repo.DB().Exec( + `INSERT INTO pubkeys (pubkey, trust_score, created_at, updated_at) VALUES (?, 0, ?, ?);`, + "troll", time.Now(), time.Now(), + ); err != nil { + t.Fatal(err) + } + if err := repo.UpsertReport("troll", "target", time.Now()); err != nil { + t.Fatal(err) + } + + calc := NewCalculator(repo, seeds, 0.7, 0.3, 0.5) + if err := calc.Calculate(); err != nil { + t.Fatal(err) + } + _, _, _, scoreWithTrollReport := getUser(t, repo, "target") + + // Remove the troll report, recompute. + if _, err := repo.DB().Exec( + `DELETE FROM reports WHERE source_pubkey = ? AND target_pubkey = ?;`, + "troll", "target", + ); err != nil { + t.Fatal(err) + } + if err := calc.Calculate(); err != nil { + t.Fatal(err) + } + _, _, _, scoreWithoutReport := getUser(t, repo, "target") + + if absDiff(scoreWithTrollReport, scoreWithoutReport) > 1e-9 { + t.Fatalf("untrusted report should not change score: with=%v without=%v", scoreWithTrollReport, scoreWithoutReport) + } +} + +func absDiff(a, b float64) float64 { + if a > b { + return a - b + } + return b - a +} diff --git a/internal/repository/migration.go b/internal/repository/migration.go index 24ab0a1..1a8eda6 100644 --- a/internal/repository/migration.go +++ b/internal/repository/migration.go @@ -95,6 +95,47 @@ var migrations = []Migration{ return nil }, }, + { + Version: 4, + Name: "add_vouches_and_reports", + Up: func(db *sql.DB) error { + // Vouches share the follow-edge lifecycle: refreshed on each + // kind:30000 set, never actively deleted, aged out by a staleness + // window — hence last_seen (cf. connections), not created_at. + vouchesTable := ` + CREATE TABLE IF NOT EXISTS vouches ( + source_pubkey TEXT NOT NULL, + target_pubkey TEXT NOT NULL, + last_seen TIMESTAMP NOT NULL, + PRIMARY KEY (source_pubkey, target_pubkey) + );` + + reportsTable := ` + CREATE TABLE IF NOT EXISTS reports ( + source_pubkey TEXT NOT NULL, + target_pubkey TEXT NOT NULL, + created_at TIMESTAMP NOT NULL, + PRIMARY KEY (source_pubkey, target_pubkey) + );` + + if _, err := db.Exec(vouchesTable); err != nil { + return fmt.Errorf("failed to create vouches table: %w", err) + } + if _, err := db.Exec(reportsTable); err != nil { + return fmt.Errorf("failed to create reports table: %w", err) + } + if _, err := db.Exec("CREATE INDEX IF NOT EXISTS idx_vouches_target ON vouches(target_pubkey);"); err != nil { + return fmt.Errorf("failed to create idx_vouches_target: %w", err) + } + if _, err := db.Exec("CREATE INDEX IF NOT EXISTS idx_vouches_last_seen ON vouches(last_seen);"); err != nil { + return fmt.Errorf("failed to create idx_vouches_last_seen: %w", err) + } + if _, err := db.Exec("CREATE INDEX IF NOT EXISTS idx_reports_target ON reports(target_pubkey);"); err != nil { + return fmt.Errorf("failed to create idx_reports_target: %w", err) + } + return nil + }, + }, } // RunMigrations executes all pending database migrations diff --git a/internal/repository/repository.go b/internal/repository/repository.go index cc0c1dd..a68d318 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -11,16 +11,6 @@ import ( _ "github.com/mattn/go-sqlite3" ) -// DBMode specifies the database access mode -type DBMode int - -const ( - // ModeReadWrite is for crawler - optimized for writes - ModeReadWrite DBMode = iota - // ModeReadOnly is for API - optimized for reads - ModeReadOnly -) - // totalUsersCache caches the count of users type totalUsersCache struct { count int @@ -36,11 +26,14 @@ type Repository struct { writeMu sync.Mutex // Serializes all write operations for SQLite } -// New creates a new Repository instance -func New(dataSourceName string, mode DBMode) (*Repository, error) { - // Build DSN with parameters that apply to ALL connections in the pool - // This is critical - PRAGMA statements only affect a single connection, - // but DSN parameters are applied when each connection is created +// New creates a new Repository instance. +// WAL mode allows concurrent readers alongside a single writer; writeMu +// serializes writers within this process, SQLite's own locks coordinate +// across processes. +func New(dataSourceName string) (*Repository, error) { + // Build DSN with parameters that apply to ALL connections in the pool. + // DSN parameters are applied when each connection is created, while + // PRAGMA statements would only affect whichever connection ran them. dsnParams := []string{ "_journal_mode=WAL", "_synchronous=NORMAL", @@ -48,11 +41,7 @@ func New(dataSourceName string, mode DBMode) (*Repository, error) { "_cache_size=-64000", "_txlock=immediate", // Acquire write lock at BEGIN, not at first write } - if mode == ModeReadOnly { - dsnParams = append(dsnParams, "_query_only=true") - } - // Append parameters to DSN separator := "?" if strings.Contains(dataSourceName, "?") { separator = "&" @@ -68,20 +57,13 @@ func New(dataSourceName string, mode DBMode) (*Repository, error) { return nil, fmt.Errorf("could not connect to database: %w", err) } - // Configure connection pool based on mode - if mode == ModeReadOnly { - db.SetMaxOpenConns(10) - db.SetMaxIdleConns(5) - } else { - // For write mode: use single connection to avoid lock contention - // SQLite only allows one writer at a time anyway - db.SetMaxOpenConns(1) - db.SetMaxIdleConns(1) - } + // Connection pool: multiple connections so reads can run concurrently + // under WAL. Writes serialize on writeMu, so extra connections do not + // cause write contention. + db.SetMaxOpenConns(10) + db.SetMaxIdleConns(5) db.SetConnMaxLifetime(time.Hour) - // These PRAGMAs don't have DSN equivalents, set on initial connection - // With MaxOpenConns=1 for write mode, this is sufficient additionalPragmas := []string{ "PRAGMA temp_store = MEMORY;", "PRAGMA mmap_size = 1073741824;", @@ -102,11 +84,8 @@ func New(dataSourceName string, mode DBMode) (*Repository, error) { }, } - // Run migrations in read-write mode - if mode == ModeReadWrite { - if err := repo.RunMigrations(); err != nil { - return nil, fmt.Errorf("could not run migrations: %w", err) - } + if err := repo.RunMigrations(); err != nil { + return nil, fmt.Errorf("could not run migrations: %w", err) } return repo, nil diff --git a/internal/repository/vouch.go b/internal/repository/vouch.go new file mode 100644 index 0000000..fef63a9 --- /dev/null +++ b/internal/repository/vouch.go @@ -0,0 +1,179 @@ +package repository + +import ( + "database/sql" + "fmt" + "time" + + "fayan/internal/models" +) + +// UpsertVouches refreshes the vouch edges from source for the pubkeys in the +// latest kind:30000 set, mirroring how kind:3 contacts are stored: each edge is +// upserted with last_seen = now and never actively deleted. A vouch dropped +// from the set simply stops being refreshed and ages out of the ranking graph +// via the same staleness window as follows (see StreamVouches). Targets are +// upserted into pubkeys so ranking can cover brand-new accounts. +func (r *Repository) UpsertVouches(source string, targets []string) error { + if len(targets) == 0 { + return nil + } + + r.writeMu.Lock() + defer r.writeMu.Unlock() + + tx, err := r.db.Begin() + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + defer tx.Rollback() + + now := time.Now().UTC() + + pkStmt, err := tx.Prepare(`INSERT INTO pubkeys (pubkey, created_at, updated_at) VALUES (?, ?, ?) ON CONFLICT(pubkey) DO NOTHING;`) + if err != nil { + return fmt.Errorf("failed to prepare pubkey statement: %w", err) + } + defer pkStmt.Close() + + vStmt, err := tx.Prepare(`REPLACE INTO vouches (source_pubkey, target_pubkey, last_seen) VALUES (?, ?, ?);`) + if err != nil { + return fmt.Errorf("failed to prepare vouch statement: %w", err) + } + defer vStmt.Close() + + for _, target := range targets { + if _, err := pkStmt.Exec(target, now, now); err != nil { + return fmt.Errorf("failed to upsert target pubkey %s: %w", target, err) + } + if _, err := vStmt.Exec(source, target, now); err != nil { + return fmt.Errorf("failed to insert vouch %s -> %s: %w", source, target, err) + } + } + + return tx.Commit() +} + +// UpsertReport records source→target as a report edge. Reports are additive +// (kind:1984 events are not replaceable); re-reporting the same target just +// refreshes the timestamp. The target is upserted into pubkeys so ranking can +// cover brand-new accounts. No vouch is touched — the vouch-beats-report +// precedence is resolved at ranking time (see GetTrustWeightedReports). +func (r *Repository) UpsertReport(source, target string, createdAt time.Time) error { + r.writeMu.Lock() + defer r.writeMu.Unlock() + + tx, err := r.db.Begin() + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + defer tx.Rollback() + + now := createdAt.UTC() + if _, err := tx.Exec( + `INSERT INTO pubkeys (pubkey, created_at, updated_at) VALUES (?, ?, ?) ON CONFLICT(pubkey) DO NOTHING;`, + target, now, now, + ); err != nil { + return fmt.Errorf("failed to upsert target pubkey: %w", err) + } + + if _, err := tx.Exec( + `INSERT OR REPLACE INTO reports (source_pubkey, target_pubkey, created_at) VALUES (?, ?, ?);`, + source, target, now, + ); err != nil { + return fmt.Errorf("failed to insert report: %w", err) + } + + return tx.Commit() +} + +// GetTrustScore returns the trust_score for a pubkey. Returns 0 if the pubkey +// is not present in the pubkeys table. +func (r *Repository) GetTrustScore(pubkey string) (float64, error) { + var score float64 + err := r.db.QueryRow("SELECT COALESCE(trust_score, 0) FROM pubkeys WHERE pubkey = ?;", pubkey).Scan(&score) + if err != nil { + // sql.ErrNoRows → pubkey not known → trust is zero. + return 0, nil + } + return score, nil +} + +// StreamVouches streams vouch edges. When afterTime is non-nil, only edges +// refreshed at or after it are returned — the same staleness window that ages +// out follow edges, so a vouch dropped from a set eventually stops counting. +func (r *Repository) StreamVouches(callback func(models.Vouch) error, afterTime *time.Time) error { + var rows *sql.Rows + var err error + if afterTime != nil { + rows, err = r.db.Query("SELECT source_pubkey, target_pubkey FROM vouches WHERE last_seen >= ?;", afterTime) + } else { + rows, err = r.db.Query("SELECT source_pubkey, target_pubkey FROM vouches;") + } + if err != nil { + return fmt.Errorf("failed to query vouches: %w", err) + } + defer rows.Close() + + for rows.Next() { + var v models.Vouch + if err := rows.Scan(&v.Source, &v.Target); err != nil { + return fmt.Errorf("failed to scan vouch: %w", err) + } + if err := callback(v); err != nil { + return fmt.Errorf("callback error: %w", err) + } + } + return rows.Err() +} + +// GetPubkeysWithPositiveTrust returns the set of pubkeys whose last-computed +// trust_score is > 0. Used as the vouch-edge admission filter in ranking. +func (r *Repository) GetPubkeysWithPositiveTrust() (map[string]struct{}, error) { + rows, err := r.db.Query("SELECT pubkey FROM pubkeys WHERE trust_score > 0;") + if err != nil { + return nil, fmt.Errorf("failed to query pubkeys with positive trust: %w", err) + } + defer rows.Close() + + result := make(map[string]struct{}) + for rows.Next() { + var pk string + if err := rows.Scan(&pk); err != nil { + return nil, fmt.Errorf("failed to scan pubkey: %w", err) + } + result[pk] = struct{}{} + } + return result, rows.Err() +} + +// GetTrustWeightedReports aggregates reports per target, weighting each report +// by the reporter's trust_score. Reporters with trust_score ≤ 0 are excluded — +// the same admission rule that gates vouch edges. A source that both vouches +// for and reports the same target is not specially handled: the vouch adds +// flow and the report subtracts it, which roughly cancels out on its own. +func (r *Repository) GetTrustWeightedReports() (map[string]models.ReportAggregate, error) { + query := ` + SELECT r.target_pubkey, COALESCE(SUM(p.trust_score), 0) + FROM reports r + JOIN pubkeys p ON p.pubkey = r.source_pubkey + WHERE p.trust_score > 0 + GROUP BY r.target_pubkey; + ` + rows, err := r.db.Query(query) + if err != nil { + return nil, fmt.Errorf("failed to query weighted reports: %w", err) + } + defer rows.Close() + + result := make(map[string]models.ReportAggregate) + for rows.Next() { + var target string + var agg models.ReportAggregate + if err := rows.Scan(&target, &agg.TotalReporterTrust); err != nil { + return nil, fmt.Errorf("failed to scan report aggregate: %w", err) + } + result[target] = agg + } + return result, rows.Err() +} diff --git a/internal/repository/vouch_test.go b/internal/repository/vouch_test.go new file mode 100644 index 0000000..bc1ee6e --- /dev/null +++ b/internal/repository/vouch_test.go @@ -0,0 +1,270 @@ +package repository + +import ( + "path/filepath" + "testing" + "time" + + "fayan/internal/models" +) + +func newTestRepo(t *testing.T) *Repository { + t.Helper() + dir := t.TempDir() + path := filepath.Join(dir, "test.db") + repo, err := New(path) + if err != nil { + t.Fatalf("failed to open repo: %v", err) + } + t.Cleanup(func() { _ = repo.Close() }) + return repo +} + +// seedPubkey inserts a pubkey with the given trust_score so tests can simulate +// an account that has earned reputation in a previous ranking round. +func seedPubkey(t *testing.T, repo *Repository, pubkey string, trustScore float64) { + t.Helper() + now := time.Now().UTC() + if _, err := repo.db.Exec( + `INSERT INTO pubkeys (pubkey, trust_score, created_at, updated_at) VALUES (?, ?, ?, ?) + ON CONFLICT(pubkey) DO UPDATE SET trust_score = excluded.trust_score;`, + pubkey, trustScore, now, now, + ); err != nil { + t.Fatalf("failed to seed pubkey: %v", err) + } +} + +func countRows(t *testing.T, repo *Repository, table, source, target string) int { + t.Helper() + var n int + q := "SELECT COUNT(*) FROM " + table + " WHERE source_pubkey = ? AND target_pubkey = ?;" + if err := repo.db.QueryRow(q, source, target).Scan(&n); err != nil { + t.Fatalf("count query failed: %v", err) + } + return n +} + +var ( + t1 = time.Unix(1_700_000_000, 0).UTC() + t2 = time.Unix(1_700_000_100, 0).UTC() +) + +func TestUpsertVouches_NewInsert(t *testing.T) { + repo := newTestRepo(t) + if err := repo.UpsertVouches("alice", []string{"bob"}); err != nil { + t.Fatalf("UpsertVouches failed: %v", err) + } + if countRows(t, repo, "vouches", "alice", "bob") != 1 { + t.Fatalf("expected one vouch row") + } +} + +// TestUpsertVouches_DoesNotDelete verifies vouches follow the follow-edge +// lifecycle: a target dropped from a later set is NOT actively removed — it +// lingers (to be aged out by the staleness window at ranking time). +func TestUpsertVouches_DoesNotDelete(t *testing.T) { + repo := newTestRepo(t) + if err := repo.UpsertVouches("alice", []string{"bob", "charlie"}); err != nil { + t.Fatal(err) + } + // A later set without bob must not delete bob's edge. + if err := repo.UpsertVouches("alice", []string{"charlie", "dave"}); err != nil { + t.Fatal(err) + } + if countRows(t, repo, "vouches", "alice", "bob") != 1 { + t.Fatalf("expected bob to linger (not actively deleted)") + } + if countRows(t, repo, "vouches", "alice", "charlie") != 1 { + t.Fatalf("expected charlie to remain") + } + if countRows(t, repo, "vouches", "alice", "dave") != 1 { + t.Fatalf("expected dave to be added") + } +} + +func TestUpsertVouches_EmptyNoop(t *testing.T) { + repo := newTestRepo(t) + if err := repo.UpsertVouches("alice", []string{"bob"}); err != nil { + t.Fatal(err) + } + // An empty set is a no-op: nothing is refreshed, nothing is deleted. + if err := repo.UpsertVouches("alice", nil); err != nil { + t.Fatal(err) + } + if countRows(t, repo, "vouches", "alice", "bob") != 1 { + t.Fatalf("expected bob to remain after empty set (no active delete)") + } +} + +func TestUpsertVouches_UpsertsTargetPubkey(t *testing.T) { + repo := newTestRepo(t) + if err := repo.UpsertVouches("alice", []string{"brand-new-target"}); err != nil { + t.Fatal(err) + } + var n int + if err := repo.db.QueryRow("SELECT COUNT(*) FROM pubkeys WHERE pubkey = ?;", "brand-new-target").Scan(&n); err != nil { + t.Fatal(err) + } + if n != 1 { + t.Fatalf("expected target pubkey to be upserted into pubkeys table") + } +} + +func TestUpsertReport_NewAndIdempotent(t *testing.T) { + repo := newTestRepo(t) + if err := repo.UpsertReport("alice", "bob", t1); err != nil { + t.Fatal(err) + } + if err := repo.UpsertReport("alice", "bob", t2); err != nil { + t.Fatal(err) + } + if countRows(t, repo, "reports", "alice", "bob") != 1 { + t.Fatalf("expected exactly one report row after re-report") + } +} + +func TestVouchAndReportCoexist(t *testing.T) { + repo := newTestRepo(t) + // No mutual exclusion at write time: both rows persist independently. The + // vouch adds flow and the report subtracts it at ranking time. + if err := repo.UpsertVouches("alice", []string{"bob"}); err != nil { + t.Fatal(err) + } + if err := repo.UpsertReport("alice", "bob", t1); err != nil { + t.Fatal(err) + } + if countRows(t, repo, "vouches", "alice", "bob") != 1 { + t.Fatalf("expected vouch to persist alongside report") + } + if countRows(t, repo, "reports", "alice", "bob") != 1 { + t.Fatalf("expected report to persist alongside vouch") + } +} + +func TestGetTrustScore_UnknownPubkey(t *testing.T) { + repo := newTestRepo(t) + score, err := repo.GetTrustScore("who-dis") + if err != nil { + t.Fatal(err) + } + if score != 0 { + t.Fatalf("unknown pubkey should return 0 trust, got %v", score) + } +} + +func TestGetTrustScore_KnownPubkey(t *testing.T) { + repo := newTestRepo(t) + seedPubkey(t, repo, "trusted", 0.42) + score, err := repo.GetTrustScore("trusted") + if err != nil { + t.Fatal(err) + } + if score != 0.42 { + t.Fatalf("expected 0.42, got %v", score) + } +} + +func TestStreamVouches(t *testing.T) { + repo := newTestRepo(t) + if err := repo.UpsertVouches("alice", []string{"bob", "charlie"}); err != nil { + t.Fatal(err) + } + if err := repo.UpsertVouches("dave", []string{"bob"}); err != nil { + t.Fatal(err) + } + + var got []models.Vouch + if err := repo.StreamVouches(func(v models.Vouch) error { + got = append(got, v) + return nil + }, nil); err != nil { + t.Fatal(err) + } + if len(got) != 3 { + t.Fatalf("expected 3 vouches, got %d", len(got)) + } +} + +// TestStreamVouches_StaleFiltered verifies the staleness window: edges last +// seen before the cutoff are excluded, the same way stale follow edges are. +func TestStreamVouches_StaleFiltered(t *testing.T) { + repo := newTestRepo(t) + if err := repo.UpsertVouches("alice", []string{"bob"}); err != nil { + t.Fatal(err) + } + + count := func(after *time.Time) int { + n := 0 + if err := repo.StreamVouches(func(models.Vouch) error { n++; return nil }, after); err != nil { + t.Fatal(err) + } + return n + } + + past := time.Now().UTC().Add(-time.Hour) + if count(&past) != 1 { + t.Fatalf("expected the fresh vouch to pass a past cutoff") + } + future := time.Now().UTC().Add(time.Hour) + if count(&future) != 0 { + t.Fatalf("expected the vouch to be filtered out by a future cutoff") + } +} + +func TestGetPubkeysWithPositiveTrust(t *testing.T) { + repo := newTestRepo(t) + seedPubkey(t, repo, "high", 0.5) + seedPubkey(t, repo, "zero", 0) + seedPubkey(t, repo, "neg", -0.1) // should be excluded + + set, err := repo.GetPubkeysWithPositiveTrust() + if err != nil { + t.Fatal(err) + } + if _, ok := set["high"]; !ok { + t.Fatalf("expected 'high' in set") + } + if _, ok := set["zero"]; ok { + t.Fatalf("did not expect 'zero' in set") + } + if _, ok := set["neg"]; ok { + t.Fatalf("did not expect 'neg' in set") + } +} + +func TestGetTrustWeightedReports(t *testing.T) { + repo := newTestRepo(t) + seedPubkey(t, repo, "r1", 0.3) + seedPubkey(t, repo, "r2", 0.7) + seedPubkey(t, repo, "r3", 0) // untrusted; should be excluded + + if err := repo.UpsertReport("r1", "target", t1); err != nil { + t.Fatal(err) + } + if err := repo.UpsertReport("r2", "target", t1); err != nil { + t.Fatal(err) + } + if err := repo.UpsertReport("r3", "target", t1); err != nil { + t.Fatal(err) + } + + reports, err := repo.GetTrustWeightedReports() + if err != nil { + t.Fatal(err) + } + agg, ok := reports["target"] + if !ok { + t.Fatalf("expected 'target' in aggregates") + } + expected := 0.3 + 0.7 + if absDiff(agg.TotalReporterTrust, expected) > 1e-9 { + t.Fatalf("expected trust sum %v, got %v", expected, agg.TotalReporterTrust) + } +} + +func absDiff(a, b float64) float64 { + if a > b { + return a - b + } + return b - a +}