Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 55 additions & 33 deletions src/atv_player/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from collections.abc import Mapping
from dataclasses import replace
import gc
import httpx
import inspect
import threading
import time
Expand Down Expand Up @@ -65,21 +64,29 @@
from atv_player.metadata.providers.tmdb import TMDBProvider, infer_tmdb_media_type
from atv_player.metadata.providers.tmdb_client import TMDBClient
from atv_player.models import AppConfig, LiveEpgConfig, PlayItem, VodItem
from atv_player.network_proxy import ProxyConfig, ProxyDecider, build_httpx_kwargs_for_url
from atv_player.network_client import NetworkClient
from atv_player.network_proxy import ProxyConfig, ProxyDecider
from atv_player.paths import app_cache_dir, app_data_dir
from atv_player.live_source_repository import LiveSourceRepository
from atv_player.plugins import SpiderPluginLoader, SpiderPluginManager
from atv_player.plugins.compat.base.spider import set_proxy_decider_loader as set_spider_proxy_decider_loader
from atv_player.plugins.compat.base.spider import set_session_loader as set_spider_session_loader
from atv_player.plugins.repository import SpiderPluginRepository
from atv_player.playback_parsers import BuiltInPlaybackParserService
from atv_player.player.m3u8_ad_filter import M3U8AdFilter
from atv_player.proxy.server import LocalHlsProxyServer
from atv_player.yt_dlp_service import YtdlpPlaybackService
from atv_player.storage import SettingsRepository
from atv_player.time_utils import is_refresh_stale
from atv_player.ui.poster_loader import set_proxy_decider_loader
from atv_player.ui.poster_loader import set_http_get_loader, set_proxy_decider_loader
from atv_player.ui.login_window import LoginWindow
from atv_player.ui.main_window import MainWindow, load_direct_parse_detail
from atv_player.ui.main_window import (
MainWindow,
load_direct_parse_detail,
set_main_window_http_get_loader,
set_main_window_http_post_loader,
)
from atv_player.ui.player_window import set_player_window_http_get_loader
from atv_player.ui.icon_cache import load_icon

POSTER_CACHE_MAX_AGE_SECONDS = 7 * 24 * 60 * 60
Expand Down Expand Up @@ -141,6 +148,27 @@ def decide_start_view(config: AppConfig) -> str:
return "main" if config.token else "login"


def _proxy_signature(config: AppConfig) -> tuple[str, str, tuple[str, ...]]:
return (
config.network_proxy_mode,
config.network_proxy_url,
tuple(config.network_proxy_bypass_rules),
)


def _make_proxy_invalidation_wrapper(save_fn, config: AppConfig, network: NetworkClient):
last = [_proxy_signature(config)]

def wrapped() -> None:
save_fn()
current = _proxy_signature(config)
if current != last[0]:
last[0] = current
network.invalidate_proxy()

return wrapped


def _app_icon_path() -> Path:
return Path(__file__).resolve().parent / "icons" / "app.svg"

Expand Down Expand Up @@ -306,34 +334,40 @@ def __init__(self, repo: SettingsRepository) -> None:
self.login_window: LoginWindow | None = None
self.main_window: MainWindow | None = None
self._api_client: ApiClient | None = None
set_proxy_decider_loader(self._build_proxy_decider)
set_spider_proxy_decider_loader(self._build_proxy_decider)
self._network = NetworkClient(self._build_proxy_decider)
set_proxy_decider_loader(lambda: self._network.proxy_decider)
set_http_get_loader(lambda: self._network.get)
set_main_window_http_get_loader(lambda: self._network.get)
set_main_window_http_post_loader(lambda: self._network.post)
set_player_window_http_get_loader(lambda: self._network.get)
set_spider_proxy_decider_loader(lambda: self._network.proxy_decider)
set_spider_session_loader(lambda: self._network.requests_session())
self._m3u8_ad_filter = M3U8AdFilter(
proxy_server=LocalHlsProxyServer(
get=self._proxy_http_get(),
stream=self._proxy_http_stream(),
get=self._network.get,
stream=self._network.stream,
),
get=self._proxy_http_get(),
get=self._network.get,
)
self._playback_parser_service = BuiltInPlaybackParserService(
get=self._proxy_http_get(),
post=self._proxy_http_post(),
get=self._network.get,
post=self._network.post,
)
self._yt_dlp_service = YtdlpPlaybackService(
proxy_decider=self._build_proxy_decider(),
config_loader=self.repo.load_config,
)
self._danmaku_service = create_default_danmaku_service(
get=self._proxy_http_get(),
post=self._proxy_http_post(),
get=self._network.get,
post=self._network.post,
)
if hasattr(repo, "database_path"):
self._live_source_repository = LiveSourceRepository(repo.database_path)
self._live_epg_repository = LiveEpgRepository(repo.database_path)
self._plugin_repository = SpiderPluginRepository(repo.database_path)
self._playback_history_repository = LocalPlaybackHistoryRepository(repo.database_path)
cache_dir = app_cache_dir() / "plugins"
self._plugin_loader = SpiderPluginLoader(cache_dir, get=self._proxy_http_get())
self._plugin_loader = SpiderPluginLoader(cache_dir, get=self._network.get)
self._plugin_manager = SpiderPluginManager(
self._plugin_repository,
self._plugin_loader,
Expand Down Expand Up @@ -384,28 +418,13 @@ def _build_proxy_decider(self) -> ProxyDecider:
)

def _proxy_http_get(self):
def run(url: str, **kwargs):
request_kwargs = dict(kwargs)
request_kwargs.update(build_httpx_kwargs_for_url(self._build_proxy_decider(), url))
return httpx.get(url, **request_kwargs)

return run
return self._network.get

def _proxy_http_post(self):
def run(url: str, **kwargs):
request_kwargs = dict(kwargs)
request_kwargs.update(build_httpx_kwargs_for_url(self._build_proxy_decider(), url))
return httpx.post(url, **request_kwargs)

return run
return self._network.post

def _proxy_http_stream(self):
def run(method: str, url: str, **kwargs):
request_kwargs = dict(kwargs)
request_kwargs.update(build_httpx_kwargs_for_url(self._build_proxy_decider(), url))
return httpx.stream(method, url, **request_kwargs)

return run
return self._network.stream

def start(self) -> QWidget:
config = self.repo.load_config()
Expand Down Expand Up @@ -1211,7 +1230,9 @@ def plugin_loader_task():
history_controller=history_controller,
player_controller=player_controller,
config=config,
save_config=lambda: self.repo.save_config(config),
save_config=_make_proxy_invalidation_wrapper(
lambda: self.repo.save_config(config), config, self._network
),
douban_controller=douban_controller,
telegram_controller=telegram_controller,
bilibili_controller=bilibili_controller,
Expand Down Expand Up @@ -1344,3 +1365,4 @@ def close(self) -> None:
if callable(close_filter):
close_filter()
self._close_api_client()
self._network.close()
156 changes: 156 additions & 0 deletions src/atv_player/network_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
from __future__ import annotations

import threading
from collections.abc import Callable

import httpx
import requests

from atv_player.network_proxy import ProxyDecider


_DEFAULT_POOL_LIMIT = 10
_MANUAL_PREFIX = "manual:"


def _client_key(decider: ProxyDecider | None, url: str) -> str:
if decider is None:
return "direct"
decision = decider.decide(url)
if decision.kind == "direct":
return "direct"
if decision.kind == "system":
return "system"
if decision.kind == "manual":
return f"{_MANUAL_PREFIX}{decision.proxy_url}"
return "direct"


def _default_client_factory(key: str, pool_limit: int) -> httpx.Client:
limits = httpx.Limits(
max_connections=pool_limit,
max_keepalive_connections=pool_limit,
)
if key == "direct":
return httpx.Client(trust_env=False, limits=limits)
if key == "system":
return httpx.Client(trust_env=True, limits=limits)
if key.startswith(_MANUAL_PREFIX):
proxy_url = key[len(_MANUAL_PREFIX):]
return httpx.Client(proxy=proxy_url, trust_env=False, limits=limits)
raise ValueError(f"unknown client key: {key}")


def _default_session_factory(pool_limit: int) -> requests.Session:
session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
pool_connections=pool_limit,
pool_maxsize=pool_limit,
)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session


class NetworkClient:
def __init__(
self,
decider_factory: Callable[[], ProxyDecider | None],
*,
pool_limit: int = _DEFAULT_POOL_LIMIT,
client_factory: Callable[[str, int], httpx.Client] = _default_client_factory,
session_factory: Callable[[int], requests.Session] = _default_session_factory,
) -> None:
self._decider_factory = decider_factory
self._pool_limit = pool_limit
self._client_factory = client_factory
self._session_factory = session_factory
self._lock = threading.Lock()
self._decider: ProxyDecider | None = None
self._decider_loaded = False
self._clients: dict[str, httpx.Client] = {}
self._session: requests.Session | None = None
self._closed = False

@property
def proxy_decider(self) -> ProxyDecider | None:
with self._lock:
return self._load_decider_locked()

def _load_decider_locked(self) -> ProxyDecider | None:
if not self._decider_loaded:
self._decider = self._decider_factory()
self._decider_loaded = True
return self._decider

def invalidate_proxy(self) -> None:
with self._lock:
if self._closed:
return
self._decider = None
self._decider_loaded = False
non_direct_keys = [k for k in self._clients if k != "direct"]
clients_to_close = [self._clients.pop(k) for k in non_direct_keys]
session_to_close = self._session
self._session = None
for client in clients_to_close:
try:
client.close()
except Exception:
pass
if session_to_close is not None:
try:
session_to_close.close()
except Exception:
pass

def close(self) -> None:
with self._lock:
if self._closed:
return
self._closed = True
clients_to_close = list(self._clients.values())
self._clients.clear()
session_to_close = self._session
self._session = None
self._decider = None
self._decider_loaded = False
for client in clients_to_close:
try:
client.close()
except Exception:
pass
if session_to_close is not None:
try:
session_to_close.close()
except Exception:
pass

def _resolve(self, url: str) -> httpx.Client:
with self._lock:
if self._closed:
raise RuntimeError("NetworkClient is closed")
decider = self._load_decider_locked()
key = _client_key(decider, url)
client = self._clients.get(key)
if client is None:
client = self._client_factory(key, self._pool_limit)
self._clients[key] = client
return client

def get(self, url: str, **kwargs):
return self._resolve(url).get(url, **kwargs)

def post(self, url: str, **kwargs):
return self._resolve(url).post(url, **kwargs)

def stream(self, method: str, url: str, **kwargs):
return self._resolve(url).stream(method, url, **kwargs)

def requests_session(self) -> requests.Session:
with self._lock:
if self._closed:
raise RuntimeError("NetworkClient is closed")
if self._session is None:
self._session = self._session_factory(self._pool_limit)
return self._session
22 changes: 18 additions & 4 deletions src/atv_player/plugins/compat/base/spider.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

_CACHE_ROOT = Path.home() / ".cache" / "atv-player" / "plugins" / "spider-cache"
_proxy_decider_loader: Callable[[], ProxyDecider | None] | None = None
_session_loader: Callable[[], requests.Session | None] | None = None


def set_cache_root(path: Path | str) -> None:
Expand All @@ -28,12 +29,23 @@ def set_proxy_decider_loader(loader: Callable[[], ProxyDecider | None] | None) -
_proxy_decider_loader = loader


def set_session_loader(loader: Callable[[], requests.Session | None] | None) -> None:
global _session_loader
_session_loader = loader


def _effective_proxy_decider() -> ProxyDecider | None:
if _proxy_decider_loader is None:
return None
return _proxy_decider_loader()


def _effective_session() -> requests.Session | None:
if _session_loader is None:
return None
return _session_loader()


def _cache_path(key: str) -> Path:
_CACHE_ROOT.mkdir(parents=True, exist_ok=True)
return _CACHE_ROOT / f"{sha256(key.encode('utf-8')).hexdigest()}.cache"
Expand Down Expand Up @@ -98,8 +110,7 @@ def fetch(
stream=False,
allow_redirects=True,
):
response = requests.get(
url,
kwargs = dict(
params=params,
cookies=cookies,
headers=headers,
Expand All @@ -109,6 +120,8 @@ def fetch(
allow_redirects=allow_redirects,
proxies=build_requests_proxies_for_url(_effective_proxy_decider(), url),
)
session = _effective_session()
response = session.get(url, **kwargs) if session is not None else requests.get(url, **kwargs)
response.encoding = "utf-8"
return _buffer_and_close_response(response)

Expand All @@ -125,8 +138,7 @@ def post(
stream=False,
allow_redirects=True,
):
response = requests.post(
url,
kwargs = dict(
params=params,
data=data,
json=json,
Expand All @@ -138,6 +150,8 @@ def post(
allow_redirects=allow_redirects,
proxies=build_requests_proxies_for_url(_effective_proxy_decider(), url),
)
session = _effective_session()
response = session.post(url, **kwargs) if session is not None else requests.post(url, **kwargs)
response.encoding = "utf-8"
return _buffer_and_close_response(response)

Expand Down
Loading
Loading