Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,5 @@ domainfront_certs_*/

# Local scripts (excluded from git pushes)
scripts/
adblock_cache
http_cache/
5 changes: 5 additions & 0 deletions config.example.json
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@
],
"youtube_via_relay": false,
"hosts": {},
"disk_cache": {
"enabled": false,
"max_mb": 200,
"dir": ""
},
"exit_node": {
"enabled": false,
"provider": "cloudflare",
Expand Down
189 changes: 189 additions & 0 deletions src/core/disk_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
"""
Persistent on-disk HTTP response cache (L2 behind the in-memory ResponseCache).

Layout
------
Each cached entry is a single binary file in `cache_dir`:

{sha1(url)[:16]}.cache
├── bytes 0–7 : expiry as a little-endian IEEE 754 double (time.time() + ttl)
└── bytes 8–N : raw HTTP response (status line + headers + body)

No index file — the directory listing IS the index. This keeps the
implementation dependency-free and makes cache files portable across runs.

Eviction
--------
1. On every `put()`, remove expired files first.
2. If the directory is still over `max_bytes`, remove oldest files by mtime
until it fits.
3. A single background `evict()` call at startup prunes stale entries from
previous sessions without blocking the first request.

Thread safety
-------------
Each file write is atomic: we write to a `.tmp` file then rename it, so a
crash or concurrent writer never leaves a partial cache entry.
"""

import hashlib
import logging
import os
import struct
import time
from pathlib import Path

log = logging.getLogger("DiskCache")

_HEADER = struct.Struct("<d") # 8 bytes: little-endian double (expiry timestamp)
_HEADER_SIZE = _HEADER.size # 8


def _url_key(url: str) -> str:
return hashlib.sha1(url.encode()).hexdigest()[:24]


class DiskCache:
"""Persistent on-disk HTTP response cache."""

def __init__(self, cache_dir: str | Path, max_mb: int = 200):
self._dir = Path(cache_dir)
self._dir.mkdir(parents=True, exist_ok=True)
self._max_bytes = max_mb * 1024 * 1024
self.hits = 0
self.misses = 0
log.info(
"Disk cache initialised: dir=%s max=%d MB",
self._dir, max_mb,
)

# ── Public API ────────────────────────────────────────────────

def get(self, url: str) -> bytes | None:
path = self._dir / f"{_url_key(url)}.cache"
try:
data = path.read_bytes()
except OSError:
self.misses += 1
return None

if len(data) < _HEADER_SIZE:
path.unlink(missing_ok=True)
self.misses += 1
return None

expiry = _HEADER.unpack_from(data)[0]
if time.time() > expiry:
path.unlink(missing_ok=True)
self.misses += 1
return None

self.hits += 1
return data[_HEADER_SIZE:]

def put(self, url: str, raw_response: bytes, ttl: int) -> None:
if not raw_response or ttl <= 0:
return
# Don't cache single entries larger than 25% of the total budget.
if len(raw_response) > self._max_bytes // 4:
return

expiry = time.time() + ttl
payload = _HEADER.pack(expiry) + raw_response

path = self._dir / f"{_url_key(url)}.cache"
tmp = path.with_suffix(".tmp")
try:
tmp.write_bytes(payload)
tmp.replace(path) # atomic rename
except OSError as exc:
log.debug("Disk cache write failed (%s): %s", url[:60], exc)
try:
tmp.unlink(missing_ok=True)
except OSError:
pass
return

self._maybe_evict()

def evict(self) -> None:
"""Remove expired entries; then prune oldest until under size cap."""
self._evict_expired()
self._evict_to_fit()

def stats(self) -> dict:
total_bytes = sum(
f.stat().st_size
for f in self._dir.glob("*.cache")
if f.is_file()
)
return {
"hits": self.hits,
"misses": self.misses,
"entries": sum(1 for _ in self._dir.glob("*.cache")),
"size_mb": round(total_bytes / (1024 * 1024), 1),
}

# ── Internal helpers ──────────────────────────────────────────

def _maybe_evict(self) -> None:
"""Quick size check — only run full eviction when over budget."""
try:
total = sum(
f.stat().st_size
for f in self._dir.glob("*.cache")
if f.is_file()
)
if total > self._max_bytes:
self._evict_expired()
self._evict_to_fit()
except OSError:
pass

def _evict_expired(self) -> None:
now = time.time()
removed = 0
for path in list(self._dir.glob("*.cache")):
try:
data = path.read_bytes()
if len(data) < _HEADER_SIZE:
path.unlink(missing_ok=True)
removed += 1
continue
expiry = _HEADER.unpack_from(data)[0]
if now > expiry:
path.unlink(missing_ok=True)
removed += 1
except OSError:
pass
if removed:
log.debug("Disk cache: evicted %d expired entries", removed)

def _evict_to_fit(self) -> None:
"""Delete oldest files (by mtime) until total size is under budget."""
try:
files = [
(f.stat().st_mtime, f.stat().st_size, f)
for f in self._dir.glob("*.cache")
if f.is_file()
]
except OSError:
return

total = sum(sz for _, sz, _ in files)
if total <= self._max_bytes:
return

files.sort() # oldest first
removed = 0
for mtime, size, path in files:
if total <= self._max_bytes:
break
try:
path.unlink(missing_ok=True)
total -= size
removed += 1
except OSError:
pass
if removed:
log.debug("Disk cache: evicted %d entries to fit size cap", removed)
49 changes: 43 additions & 6 deletions src/proxy/proxy_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,25 @@ def __init__(self, config: dict):
self.fronter = DomainFronter(config)
self.mitm = None
self._cache = ResponseCache(max_mb=CACHE_MAX_MB)

# ── Disk cache (L2, optional) — persists across restarts ───────────
_dc_cfg = config.get("disk_cache") or {}
self._disk_cache = None
if _dc_cfg.get("enabled", False):
try:
import pathlib as _pathlib
from core.disk_cache import DiskCache as _DiskCache
_default_dir = str(
_pathlib.Path(__file__).resolve().parent.parent.parent / "http_cache"
)
_cache_dir = _dc_cfg.get("dir") or _default_dir
_max_mb = max(10, int(_dc_cfg.get("max_mb", 200)))
self._disk_cache = _DiskCache(_cache_dir, max_mb=_max_mb)
self._disk_cache.evict()
except Exception as _exc:
log.warning("Disk cache init failed: %s — disk caching disabled", _exc)
# ───────────────────────────────────────────────────────────────────

self._direct_fail_until: dict[str, float] = {}
self._servers: list[asyncio.base_events.Server] = []
self._client_tasks: set[asyncio.Task] = set()
Expand Down Expand Up @@ -998,12 +1017,19 @@ async def _relay_http_stream(self, host: str, port: int, reader, writer):
if await self._maybe_stream_download(method, url, headers, body, writer):
continue

# Check local cache first (GET only)
# Check local cache first (GET only) — L1 memory, L2 disk
response = None
if self._cache_allowed(method, url, headers, body):
response = self._cache.get(url)
if response:
log.debug("Cache HIT: %s", url[:60])
log.debug("Cache HIT (mem): %s", url[:60])
elif self._disk_cache is not None:
response = self._disk_cache.get(url)
if response:
log.debug("Cache HIT (disk): %s", url[:60])
mem_ttl = ResponseCache.parse_ttl(response, url)
if mem_ttl > 0:
self._cache.put(url, response, mem_ttl)

if response is None:
# Relay through Apps Script
Expand All @@ -1019,11 +1045,13 @@ async def _relay_http_stream(self, host: str, port: int, reader, writer):
b"\r\n" + err_body
)

# Cache successful GET responses
# Cache successful GET responses in both L1 and L2
if self._cache_allowed(method, url, headers, body) and response:
ttl = ResponseCache.parse_ttl(response, url)
if ttl > 0:
self._cache.put(url, response, ttl)
if self._disk_cache is not None:
self._disk_cache.put(url, response, ttl)
log.debug("Cached (%ds): %s", ttl, url[:60])

# Inject permissive CORS headers whenever the browser sent
Expand Down Expand Up @@ -1176,20 +1204,29 @@ async def _do_http(self, header_block: bytes, reader, writer):
if await self._maybe_stream_download(method, url, headers, body, writer):
return

# Cache check for GET
# Cache check for GET — L1 memory, L2 disk
response = None
if self._cache_allowed(method, url, headers, body):
response = self._cache.get(url)
if response:
log.debug("Cache HIT (HTTP): %s", url[:60])
log.debug("Cache HIT (mem, HTTP): %s", url[:60])
elif self._disk_cache is not None:
response = self._disk_cache.get(url)
if response:
log.debug("Cache HIT (disk, HTTP): %s", url[:60])
mem_ttl = ResponseCache.parse_ttl(response, url)
if mem_ttl > 0:
self._cache.put(url, response, mem_ttl)

if response is None:
response = await self._relay_smart(method, url, headers, body)
# Cache successful GET
# Cache successful GET in both L1 and L2
if self._cache_allowed(method, url, headers, body) and response:
ttl = ResponseCache.parse_ttl(response, url)
if ttl > 0:
self._cache.put(url, response, ttl)
if self._disk_cache is not None:
self._disk_cache.put(url, response, ttl)

if origin and response:
response = inject_cors_headers(response, origin)
Expand Down