Skip to content
16 changes: 0 additions & 16 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 0 additions & 30 deletions health.go

This file was deleted.

137 changes: 0 additions & 137 deletions health_test.go

This file was deleted.

54 changes: 26 additions & 28 deletions internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"os"
"sync"
Expand Down Expand Up @@ -68,7 +67,7 @@ func New() *Cache {
}

// LoadFrom reads the cache from the given path. A missing file returns nil
// (fresh start). Capped at maxCacheSize bytes via io.LimitReader. Warns if
// (fresh start). Capped at maxCacheSize bytes via atomicfile.LoadJSON. Warns if
// the file has permissive mode bits set, as tokens are stored here.
func (c *Cache) LoadFrom(path string) error {
c.mu.Lock()
Expand All @@ -77,33 +76,21 @@ func (c *Cache) LoadFrom(path string) error {
c.data.LanguageProfiles = make(map[string]map[string]string)
c.data.UserTokens = make(map[string]string)

f, err := os.Open(path)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return nil
}
return err
}
defer f.Close()

if info, statErr := f.Stat(); statErr == nil {
if info.Mode().Perm()&0o077 != 0 {
slog.Warn("cache file has permissive mode; user tokens may be "+
"readable by other host users",
"path", path,
"mode", info.Mode().Perm().String())
info, statErr := os.Stat(path)
if statErr != nil {
if errors.Is(statErr, os.ErrNotExist) {
return nil // missing file = fresh start
}
return statErr
}

data, err := io.ReadAll(io.LimitReader(f, maxCacheSize))
if err != nil {
return err
if info.Mode().Perm()&0o077 != 0 {
slog.Warn("cache file has permissive mode; user tokens may be "+
"readable by other host users",
"path", path, "mode", info.Mode().Perm().String())
}
if int64(len(data)) >= maxCacheSize {
slog.Warn("cache file at size limit, may be truncated",
"path", path, "bytes", len(data), "limit", maxCacheSize)
}
return json.Unmarshal(data, &c.data)
// LoadJSON bounds the read at maxCacheSize and unmarshals; an oversize
// file returns an error (caller starts fresh) rather than truncating.
return atomicfile.LoadJSON(context.Background(), path, maxCacheSize, &c.data)
}

// SaveTo atomically writes the cache to the given path (temp file + rename)
Expand All @@ -120,8 +107,19 @@ func (c *Cache) SaveTo(path string) error {
return fmt.Errorf("marshal: %w", err)
}

if err := atomicfile.WriteFile(context.Background(), path, data, atomicfile.WithMode(0o600)); err != nil {
return err
if err := atomicfile.SaveBytes(path, data, 0o600); err != nil {
// cache.json is reconstructible: a parent-dir fsync failure
// (PhaseDirSync) means the cache was written to disk but its
// durability across an immediate crash is not guaranteed. Don't
// fail the save for that — the data is present and would be rebuilt
// from Plex on the next run anyway. Surface it as a warning instead.
var we *atomicfile.WriteError
if errors.As(err, &we) && we.Phase == atomicfile.PhaseDirSync {
slog.Warn("cache written but parent-dir fsync unconfirmed; not guaranteed durable across an immediate crash",
"path", path, "error", err)
} else {
return err
}
}
slog.Debug("cache saved", "path", path, "bytes", len(data))
return nil
Expand Down
10 changes: 8 additions & 2 deletions internal/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,10 +518,16 @@ func TestCacheSaveToEnforces0600Permissions(t *testing.T) {

func TestCacheSaveToRejectsBadDir(t *testing.T) {
t.Parallel()
// Parent is a regular file, so MkdirAll fails with ENOTDIR even as root;
// SaveTo (via atomicfile.SaveBytes, which auto-creates the dir) must error.
f := filepath.Join(t.TempDir(), "afile")
if err := os.WriteFile(f, []byte("x"), 0o600); err != nil {
t.Fatalf("setup: %v", err)
}
c := New()
err := c.SaveTo("/nonexistent/path-never-created/cache.json")
err := c.SaveTo(filepath.Join(f, "subdir", "cache.json"))
if err == nil {
t.Fatal("SaveTo() on bad dir should return error, got nil")
t.Fatal("SaveTo() under a file should return error, got nil")
}
}

Expand Down
6 changes: 4 additions & 2 deletions internal/plex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ import (
"log/slog"
"net/http"
"net/url"
"os"
"strings"
"time"

"github.com/cplieger/atomicfile"
)

// maxResponseBody caps the number of bytes read from any single Plex JSON
Expand Down Expand Up @@ -134,7 +135,8 @@ func newHTTPClient(caCertPath string) (*http.Client, error) {
},
}
if caCertPath != "" {
pemBytes, err := os.ReadFile(caCertPath)
const maxCACertSize = 1 << 20 // 1 MB
pemBytes, err := atomicfile.ReadBounded(context.Background(), caCertPath, maxCACertSize)
if err != nil {
return nil, fmt.Errorf("reading PLEX_CA_CERT_PATH=%q: %w", caCertPath, err)
}
Expand Down
24 changes: 15 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ import (
"log/slog"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"

"github.com/cplieger/atomicfile"
"github.com/cplieger/health"
"github.com/cplieger/plex-language-sync/internal/api"
"github.com/cplieger/plex-language-sync/internal/cache"
"github.com/cplieger/plex-language-sync/internal/ignore"
Expand Down Expand Up @@ -55,7 +58,7 @@ const shutdownWaitBudget = 10 * time.Second

func main() {
if len(os.Args) > 1 && os.Args[1] == "health" {
runProbe(healthMarkerPath)
health.RunProbe(health.DefaultPath)
}
os.Exit(run())
}
Expand All @@ -67,7 +70,7 @@ func run() int {
cfg := loadConfig()
logConfig(&cfg)

marker := newHealthMarker(healthMarkerPath)
marker := health.NewMarker(health.DefaultPath)
marker.Set(false)

client, err := plex.NewClient(cfg.plexURL, cfg.plexToken, cfg.caCertPath)
Expand Down Expand Up @@ -101,6 +104,9 @@ func run() int {
if err := c.LoadFrom(cachePath); err != nil {
slog.Warn("cache load failed, starting fresh", "error", err)
}
// Reap any temp orphaned by an interrupted SaveTo so they don't accumulate
// on the persistent /config volume.
atomicfile.CleanupStaleTemps(filepath.Dir(cachePath), time.Hour)

// User manager — admin identity + cached shared-user tokens.
um := users.NewManager(c)
Expand All @@ -112,19 +118,19 @@ func run() int {
um.InitialRefreshWithRetry(ctx, client, identity.MachineIdentifier, users.DefaultRefreshConfig())

marker.Set(true)
defer marker.Cleanup()
// A failed cache save on shutdown loses the latest learned language
// profiles and user tokens — treat as Error (operator-actionable)
// rather than the Warn used for transient mid-run save failures.
// Shutdown sequence: flag unhealthy first so Docker stops routing health
// probes as passing while the (slow) cache save runs, then persist the
// cache. Set(false) removes the marker, so no separate Cleanup is needed.
// A failed save here loses the latest learned language profiles and user
// tokens, so it is logged at Error (operator-actionable), not the Warn used
// for transient mid-run save failures.
defer func() {
marker.Set(false)
if err := c.SaveTo(cachePath); err != nil {
slog.Error("cache save on shutdown failed, profiles may be lost",
"path", cachePath, "error", err)
}
}()
// Signal unhealthy immediately on shutdown so Docker stops routing
// health probes as passing while cache save / cleanup run.
defer marker.Set(false)

// Compose the sync and scheduler subsystems from the concrete
// internal/* packages, passing api.* interfaces so the subsystems
Expand Down