diff --git a/media_manager/common/repository.py b/media_manager/common/repository.py index 6e7df289..9fa91a7a 100644 --- a/media_manager/common/repository.py +++ b/media_manager/common/repository.py @@ -2,6 +2,7 @@ from typing import Any, TypeVar from uuid import UUID +from pydantic import BaseModel from sqlalchemy import delete, select from sqlalchemy.exc import IntegrityError, SQLAlchemyError from sqlalchemy.orm import Session @@ -11,11 +12,10 @@ log = logging.getLogger(__name__) T = TypeVar("T") -S = TypeVar("S") EntityId = UUID | int | str -class BaseRepository[T, S]: +class BaseRepository[T, S: BaseModel]: """ Base repository providing common CRUD operations for media models. """ diff --git a/media_manager/indexer/indexers/jackett.py b/media_manager/indexer/indexers/jackett.py index b6fc1257..96e652bc 100644 --- a/media_manager/indexer/indexers/jackett.py +++ b/media_manager/indexer/indexers/jackett.py @@ -132,35 +132,57 @@ def __get_optimal_query_parameters( indexer=indexer, session=session ) if params["t"] == "tvsearch": - if not search_capabilities.supports_tv_search: - msg = f"Indexer {indexer} does not support TV search" - raise RuntimeError(msg) - if search_capabilities.supports_tv_search_season and "season" in params: - query_params["season"] = params["season"] - if search_capabilities.supports_tv_search_episode and "ep" in params: - query_params["ep"] = params["ep"] - if search_capabilities.supports_tv_search_imdb and "imdbid" in params: - query_params["imdbid"] = params["imdbid"] - elif search_capabilities.supports_tv_search_tvdb and "tvdbid" in params: - query_params["tvdbid"] = params["tvdbid"] - elif search_capabilities.supports_tv_search_tmdb and "tmdbid" in params: - query_params["tmdbid"] = params["tmdbid"] - else: - query_params["q"] = params["q"] + self.__add_tv_query_params( + indexer, params, query_params, search_capabilities + ) if params["t"] == "movie": - if not search_capabilities.supports_movie_search: - msg = f"Indexer {indexer} does not support Movie search" - raise RuntimeError(msg) - if search_capabilities.supports_movie_search_imdb and "imdbid" in params: - query_params["imdbid"] = params["imdbid"] - elif search_capabilities.supports_tv_search_tvdb and "tvdbid" in params: - query_params["tvdbid"] = params["tvdbid"] - elif search_capabilities.supports_tv_search_tmdb and "tmdbid" in params: - query_params["tmdbid"] = params["tmdbid"] - else: - query_params["q"] = params["q"] + self.__add_movie_query_params( + indexer, params, query_params, search_capabilities + ) return query_params + def __add_tv_query_params( + self, + indexer: str, + params: dict, + query_params: dict[str, str], + search_capabilities: IndexerInfo, + ) -> None: + if not search_capabilities.supports_tv_search: + msg = f"Indexer {indexer} does not support TV search" + raise RuntimeError(msg) + if search_capabilities.supports_tv_search_season and "season" in params: + query_params["season"] = params["season"] + if search_capabilities.supports_tv_search_episode and "ep" in params: + query_params["ep"] = params["ep"] + if search_capabilities.supports_tv_search_imdb and "imdbid" in params: + query_params["imdbid"] = params["imdbid"] + elif search_capabilities.supports_tv_search_tvdb and "tvdbid" in params: + query_params["tvdbid"] = params["tvdbid"] + elif search_capabilities.supports_tv_search_tmdb and "tmdbid" in params: + query_params["tmdbid"] = params["tmdbid"] + else: + query_params["q"] = params["q"] + + def __add_movie_query_params( + self, + indexer: str, + params: dict, + query_params: dict[str, str], + search_capabilities: IndexerInfo, + ) -> None: + if not search_capabilities.supports_movie_search: + msg = f"Indexer {indexer} does not support Movie search" + raise RuntimeError(msg) + if search_capabilities.supports_movie_search_imdb and "imdbid" in params: + query_params["imdbid"] = params["imdbid"] + elif search_capabilities.supports_tv_search_tvdb and "tvdbid" in params: + query_params["tvdbid"] = params["tvdbid"] + elif search_capabilities.supports_tv_search_tmdb and "tmdbid" in params: + query_params["tmdbid"] = params["tmdbid"] + else: + query_params["q"] = params["q"] + def get_torrents_by_indexer( self, indexer: str, params: dict, session: requests.Session ) -> list[IndexerQueryResult]: diff --git a/media_manager/indexer/indexers/torznab_mixin.py b/media_manager/indexer/indexers/torznab_mixin.py index 852660bd..275390b5 100644 --- a/media_manager/indexer/indexers/torznab_mixin.py +++ b/media_manager/indexer/indexers/torznab_mixin.py @@ -18,71 +18,78 @@ def process_search_result(self, xml: str) -> list[IndexerQueryResult]: } for item in xml_tree.findall("channel/item"): try: - flags = [] - seeders = 0 - age = 0 - indexer_name = "unknown" + result = self._process_item(item, xmlns) + if result: + result_list.append(result) + except Exception: + log.exception("1 Torznab search result failed") + return result_list - if item.find("jackettindexer") is not None: - indexer_name = item.find("jackettindexer").text - if item.find("prowlarrindexer") is not None: - indexer_name = item.find("prowlarrindexer").text + def _process_item(self, item: ET.Element, xmlns: dict) -> IndexerQueryResult | None: + indexer_name = "unknown" + if item.find("jackettindexer") is not None: + indexer_name = item.find("jackettindexer").text or "unknown" + elif item.find("prowlarrindexer") is not None: + indexer_name = item.find("prowlarrindexer").text or "unknown" - is_usenet = ( - item.find("enclosure").attrib["type"] != "application/x-bittorrent" - ) + is_usenet = item.find("enclosure").attrib["type"] != "application/x-bittorrent" - attributes = list(item.findall("torznab:attr", xmlns)) - for attribute in attributes: - if is_usenet: - if attribute.attrib["name"] == "usenetdate": - posted_date = parsedate_to_datetime( - attribute.attrib["value"] - ) - now = datetime.now(datetime.UTC) - age = int((now - posted_date).total_seconds()) - else: - if attribute.attrib["name"] == "seeders": - seeders = int(attribute.attrib["value"]) + seeders, age, flags = self._parse_torznab_attributes(item, xmlns, is_usenet) - if attribute.attrib["name"] == "downloadvolumefactor": - download_volume_factor = float(attribute.attrib["value"]) - if download_volume_factor == 0: - flags.append("freeleech") - if download_volume_factor == 0.5: - flags.append("halfleech") - if download_volume_factor == 0.75: - flags.append("freeleech75") - if download_volume_factor == 0.25: - flags.append("freeleech25") + title = item.find("title").text or "unknown" + size_str = item.find("size") + if size_str is None or size_str.text is None: + log.warning(f"Torznab item {title} has no size, skipping.") + return None - if attribute.attrib["name"] == "uploadvolumefactor": - upload_volume_factor = int(attribute.attrib["value"]) - if upload_volume_factor == 2: - flags.append("doubleupload") + try: + size = int(size_str.text or "0") + except ValueError: + log.warning(f"Torznab item {title} has invalid size, skipping.") + return None - title = item.find("title").text - size_str = item.find("size") - if size_str is None or size_str.text is None: - log.warning(f"Torznab item {title} has no size, skipping.") - continue - try: - size = int(size_str.text or "0") - except ValueError: - log.warning(f"Torznab item {title} has invalid size, skipping.") - continue + return IndexerQueryResult( + title=title, + download_url=str(item.find("enclosure").attrib["url"]), + seeders=seeders, + flags=flags, + size=size, + usenet=is_usenet, + age=age, + indexer=indexer_name, + ) - result = IndexerQueryResult( - title=title or "unknown", - download_url=str(item.find("enclosure").attrib["url"]), - seeders=seeders, - flags=flags, - size=size, - usenet=is_usenet, - age=age, - indexer=indexer_name, - ) - result_list.append(result) - except Exception: - log.exception("1 Torznab search result failed") - return result_list + def _parse_torznab_attributes( + self, item: ET.Element, xmlns: dict, is_usenet: bool + ) -> tuple[int, int, list[str]]: + seeders = 0 + age = 0 + flags = [] + attributes = list(item.findall("torznab:attr", xmlns)) + for attribute in attributes: + name = attribute.attrib["name"] + value = attribute.attrib["value"] + if is_usenet: + if name == "usenetdate": + posted_date = parsedate_to_datetime(value) + now = datetime.now(datetime.UTC) + age = int((now - posted_date).total_seconds()) + else: + if name == "seeders": + seeders = int(value) + elif name == "downloadvolumefactor": + self._add_leech_flags(float(value), flags) + elif name == "uploadvolumefactor": + if int(value) == 2: + flags.append("doubleupload") + return seeders, age, flags + + def _add_leech_flags(self, factor: float, flags: list[str]) -> None: + if factor == 0: + flags.append("freeleech") + elif factor == 0.5: + flags.append("halfleech") + elif factor == 0.75: + flags.append("freeleech75") + elif factor == 0.25: + flags.append("freeleech25") diff --git a/media_manager/indexer/utils.py b/media_manager/indexer/utils.py index 05677fe4..e37b7ff7 100644 --- a/media_manager/indexer/utils.py +++ b/media_manager/indexer/utils.py @@ -16,69 +16,52 @@ def evaluate_indexer_query_result( query_result: IndexerQueryResult, ruleset: ScoringRuleSet ) -> tuple[IndexerQueryResult, bool]: - title_rules = MediaManagerConfig().indexers.title_scoring_rules - indexer_flag_rules = MediaManagerConfig().indexers.indexer_flag_scoring_rules + config = MediaManagerConfig().indexers for rule_name in ruleset.rule_names: - for rule in title_rules: - if rule.name == rule_name: - log.debug(f"Applying rule {rule.name} to {query_result.title}") - if ( - any( - re.search( - rf"\b{re.escape(keyword)}\b", - query_result.title, - re.IGNORECASE, - ) - for keyword in rule.keywords - ) - and not rule.negate - ): - log.debug( - f"Rule {rule.name} with keywords {rule.keywords} matched for {query_result.title}" - ) - query_result.score += rule.score_modifier - elif ( - not any( - re.search( - rf"\b{re.escape(keyword)}\b", - query_result.title, - re.IGNORECASE, - ) - for keyword in rule.keywords - ) - and rule.negate - ): - log.debug( - f"Negated rule {rule.name} with keywords {rule.keywords} matched for {query_result.title}" - ) - query_result.score += rule.score_modifier - else: - log.debug( - f"Rule {rule.name} with keywords {rule.keywords} did not match for {query_result.title}" - ) - for rule in indexer_flag_rules: - if rule.name == rule_name: - log.debug(f"Applying rule {rule.name} to {query_result.title}") - if ( - any(flag in query_result.flags for flag in rule.flags) - and not rule.negate - ): - log.debug( - f"Rule {rule.name} with flags {rule.flags} matched for {query_result.title} with flags {query_result.flags}" - ) - query_result.score += rule.score_modifier - elif ( - not any(flag in query_result.flags for flag in rule.flags) - and rule.negate - ): - log.debug( - f"Negated rule {rule.name} with flags {rule.flags} matched for {query_result.title} with flags {query_result.flags}" - ) - query_result.score += rule.score_modifier - else: - log.debug( - f"Rule {rule.name} with flags {rule.flags} did not match for {query_result.title} with flags {query_result.flags}" - ) + _apply_title_rule(query_result, rule_name, config.title_scoring_rules) + _apply_indexer_flag_rule( + query_result, rule_name, config.indexer_flag_scoring_rules + ) + + if query_result.score <= 0: + return query_result, False + + return query_result, True + + +def _apply_title_rule( + query_result: IndexerQueryResult, rule_name: str, rules: list +) -> None: + for rule in rules: + if rule.name == rule_name: + log.debug(f"Applying rule {rule.name} to {query_result.title}") + matched = any( + re.search(rf"\b{re.escape(k)}\b", query_result.title, re.IGNORECASE) + for k in rule.keywords + ) + if matched != rule.negate: + log.debug( + f"{'Negated ' if rule.negate else ''}Rule {rule.name} matched for {query_result.title}" + ) + query_result.score += rule.score_modifier + else: + log.debug(f"Rule {rule.name} did not match for {query_result.title}") + + +def _apply_indexer_flag_rule( + query_result: IndexerQueryResult, rule_name: str, rules: list +) -> None: + for rule in rules: + if rule.name == rule_name: + log.debug(f"Applying rule {rule.name} to {query_result.title}") + matched = any(f in query_result.flags for f in rule.flags) + if matched != rule.negate: + log.debug( + f"{'Negated ' if rule.negate else ''}Rule {rule.name} matched for {query_result.title}" + ) + query_result.score += rule.score_modifier + else: + log.debug(f"Rule {rule.name} did not match for {query_result.title}") if query_result.score <= 0: return query_result, False diff --git a/ruff.toml b/ruff.toml index 9db5ea7b..29019a08 100644 --- a/ruff.toml +++ b/ruff.toml @@ -6,10 +6,10 @@ line-ending = "lf" quote-style = "double" [lint] -# to be enabled: BLE, C90, CPY, D, DOC, DTZ, FBT, G, PL, RSE, SLF, SIM, TC +# to be enabled: BLE, CPY, D, DOC, DTZ, FBT, G, PL, RSE, SLF, SIM, TC extend-select = [ "A", "ARG", "ASYNC", "ANN", - "B", + "B", "C90", "C4", "COM", "DTZ", "E", "EM", "EXE",