Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions media_manager/common/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Any, TypeVar
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify TypeVar is not used anywhere else in the file
rg -n "TypeVar" media_manager/common/repository.py

Repository: maxdorninger/MediaManager

Length of output: 123


🏁 Script executed:

cat -n media_manager/common/repository.py | head -30

Repository: maxdorninger/MediaManager

Length of output: 1071


🏁 Script executed:

rg -n "^from media_manager.common.repository import|^import media_manager.common.repository" --type py

Repository: maxdorninger/MediaManager

Length of output: 350


Remove unused TypeVar import and dead code assignment.

The PEP 695 class-level type parameter syntax on line 18 (class BaseRepository[T, S: BaseModel]) introduces T and S as implicit type parameters, making the module-level T = TypeVar("T") on line 14 and the TypeVar import on line 2 unused dead code.

♻️ Proposed fix
-from typing import Any, TypeVar
+from typing import Any
 from uuid import UUID
-T = TypeVar("T")
 EntityId = UUID | int | str
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
from typing import Any, TypeVar
from typing import Any
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@media_manager/common/repository.py` at line 2, Remove the now-unused
module-level TypeVar import and assignment: delete the TypeVar import from the
top of the module and remove the `T = TypeVar("T")` binding, since
BaseRepository uses PEP 695 class-level type params (`class BaseRepository[T, S:
BaseModel]`) and those make the module-level `T` and `S` TypeVar declarations
dead code; ensure no other symbols reference the removed `T` binding before
committing.

from uuid import UUID

from pydantic import BaseModel
from sqlalchemy import delete, select
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from sqlalchemy.orm import Session
Expand All @@ -11,11 +12,10 @@
log = logging.getLogger(__name__)

T = TypeVar("T")
S = TypeVar("S")
EntityId = UUID | int | str


class BaseRepository[T, S]:
class BaseRepository[T, S: BaseModel]:
"""
Base repository providing common CRUD operations for media models.
"""
Expand Down
74 changes: 48 additions & 26 deletions media_manager/indexer/indexers/jackett.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,35 +132,57 @@ def __get_optimal_query_parameters(
indexer=indexer, session=session
)
if params["t"] == "tvsearch":
if not search_capabilities.supports_tv_search:
msg = f"Indexer {indexer} does not support TV search"
raise RuntimeError(msg)
if search_capabilities.supports_tv_search_season and "season" in params:
query_params["season"] = params["season"]
if search_capabilities.supports_tv_search_episode and "ep" in params:
query_params["ep"] = params["ep"]
if search_capabilities.supports_tv_search_imdb and "imdbid" in params:
query_params["imdbid"] = params["imdbid"]
elif search_capabilities.supports_tv_search_tvdb and "tvdbid" in params:
query_params["tvdbid"] = params["tvdbid"]
elif search_capabilities.supports_tv_search_tmdb and "tmdbid" in params:
query_params["tmdbid"] = params["tmdbid"]
else:
query_params["q"] = params["q"]
self.__add_tv_query_params(
indexer, params, query_params, search_capabilities
)
if params["t"] == "movie":
if not search_capabilities.supports_movie_search:
msg = f"Indexer {indexer} does not support Movie search"
raise RuntimeError(msg)
if search_capabilities.supports_movie_search_imdb and "imdbid" in params:
query_params["imdbid"] = params["imdbid"]
elif search_capabilities.supports_tv_search_tvdb and "tvdbid" in params:
query_params["tvdbid"] = params["tvdbid"]
elif search_capabilities.supports_tv_search_tmdb and "tmdbid" in params:
query_params["tmdbid"] = params["tmdbid"]
else:
query_params["q"] = params["q"]
self.__add_movie_query_params(
indexer, params, query_params, search_capabilities
)
return query_params

def __add_tv_query_params(
self,
indexer: str,
params: dict,
query_params: dict[str, str],
search_capabilities: IndexerInfo,
) -> None:
if not search_capabilities.supports_tv_search:
msg = f"Indexer {indexer} does not support TV search"
raise RuntimeError(msg)
if search_capabilities.supports_tv_search_season and "season" in params:
query_params["season"] = params["season"]
if search_capabilities.supports_tv_search_episode and "ep" in params:
query_params["ep"] = params["ep"]
if search_capabilities.supports_tv_search_imdb and "imdbid" in params:
query_params["imdbid"] = params["imdbid"]
elif search_capabilities.supports_tv_search_tvdb and "tvdbid" in params:
query_params["tvdbid"] = params["tvdbid"]
elif search_capabilities.supports_tv_search_tmdb and "tmdbid" in params:
query_params["tmdbid"] = params["tmdbid"]
else:
query_params["q"] = params["q"]

def __add_movie_query_params(
self,
indexer: str,
params: dict,
query_params: dict[str, str],
search_capabilities: IndexerInfo,
) -> None:
if not search_capabilities.supports_movie_search:
msg = f"Indexer {indexer} does not support Movie search"
raise RuntimeError(msg)
if search_capabilities.supports_movie_search_imdb and "imdbid" in params:
query_params["imdbid"] = params["imdbid"]
elif search_capabilities.supports_tv_search_tvdb and "tvdbid" in params:
query_params["tvdbid"] = params["tvdbid"]
elif search_capabilities.supports_tv_search_tmdb and "tmdbid" in params:
query_params["tmdbid"] = params["tmdbid"]
Comment on lines +179 to +182
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Movie query path checks TV capability flags by mistake.

At Line 179-182, the movie helper uses supports_tv_search_* flags. This can suppress valid movie ID parameters and reduce query quality.

Proposed fix
-        elif search_capabilities.supports_tv_search_tvdb and "tvdbid" in params:
+        elif search_capabilities.supports_movie_search_tvdb and "tvdbid" in params:
             query_params["tvdbid"] = params["tvdbid"]
-        elif search_capabilities.supports_tv_search_tmdb and "tmdbid" in params:
+        elif search_capabilities.supports_movie_search_tmdb and "tmdbid" in params:
             query_params["tmdbid"] = params["tmdbid"]
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@media_manager/indexer/indexers/jackett.py` around lines 179 - 182, The movie
query branch is incorrectly gating movie ID params on TV capability flags;
update the conditionals in the movie helper so they check movie capabilities
instead of TV ones — replace uses of search_capabilities.supports_tv_search_tvdb
with search_capabilities.supports_movie_search_tvdb and
search_capabilities.supports_tv_search_tmdb with
search_capabilities.supports_movie_search_tmdb (keeping the param checks for
"tvdbid" and "tmdbid" and assignment to query_params["tvdbid"]/["tmdbid"]
intact) to ensure movie ID params are included when movie search support exists.

else:
query_params["q"] = params["q"]

def get_torrents_by_indexer(
self, indexer: str, params: dict, session: requests.Session
) -> list[IndexerQueryResult]:
Expand Down
129 changes: 68 additions & 61 deletions media_manager/indexer/indexers/torznab_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,71 +18,78 @@ def process_search_result(self, xml: str) -> list[IndexerQueryResult]:
}
for item in xml_tree.findall("channel/item"):
try:
flags = []
seeders = 0
age = 0
indexer_name = "unknown"
result = self._process_item(item, xmlns)
if result:
result_list.append(result)
except Exception:
log.exception("1 Torznab search result failed")
return result_list

if item.find("jackettindexer") is not None:
indexer_name = item.find("jackettindexer").text
if item.find("prowlarrindexer") is not None:
indexer_name = item.find("prowlarrindexer").text
def _process_item(self, item: ET.Element, xmlns: dict) -> IndexerQueryResult | None:
indexer_name = "unknown"
if item.find("jackettindexer") is not None:
indexer_name = item.find("jackettindexer").text or "unknown"
elif item.find("prowlarrindexer") is not None:
indexer_name = item.find("prowlarrindexer").text or "unknown"

is_usenet = (
item.find("enclosure").attrib["type"] != "application/x-bittorrent"
)
is_usenet = item.find("enclosure").attrib["type"] != "application/x-bittorrent"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Guard enclosure access before using attrib fields.

At Line 35 and Line 53, item.find("enclosure") is dereferenced directly. A missing enclosure node will raise and discard the item via the outer exception path; handle this explicitly and skip cleanly with a warning.

Proposed fix
-        is_usenet = item.find("enclosure").attrib["type"] != "application/x-bittorrent"
+        enclosure = item.find("enclosure")
+        if enclosure is None:
+            log.warning("Torznab item missing enclosure, skipping.")
+            return None
+        is_usenet = enclosure.attrib.get("type") != "application/x-bittorrent"
...
-            download_url=str(item.find("enclosure").attrib["url"]),
+            download_url=str(enclosure.attrib.get("url", "")),

Also applies to: 53-53

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@media_manager/indexer/indexers/torznab_mixin.py` at line 35, The code
dereferences item.find("enclosure") directly (used when setting is_usenet and
later at the second occurrence), which will raise if the enclosure node is
missing; change both spots to first assign enclosure = item.find("enclosure"),
check if enclosure is None and if so log a warning (include identifying context
like item title or guid) and skip/continue that item, otherwise access
enclosure.attrib["type"] (and other attrib keys) safely—look for the assignments
to is_usenet and the later direct enclosure.attrib usage in torznab_mixin.py and
wrap them with this guard to avoid exceptions.


attributes = list(item.findall("torznab:attr", xmlns))
for attribute in attributes:
if is_usenet:
if attribute.attrib["name"] == "usenetdate":
posted_date = parsedate_to_datetime(
attribute.attrib["value"]
)
now = datetime.now(datetime.UTC)
age = int((now - posted_date).total_seconds())
else:
if attribute.attrib["name"] == "seeders":
seeders = int(attribute.attrib["value"])
seeders, age, flags = self._parse_torznab_attributes(item, xmlns, is_usenet)

if attribute.attrib["name"] == "downloadvolumefactor":
download_volume_factor = float(attribute.attrib["value"])
if download_volume_factor == 0:
flags.append("freeleech")
if download_volume_factor == 0.5:
flags.append("halfleech")
if download_volume_factor == 0.75:
flags.append("freeleech75")
if download_volume_factor == 0.25:
flags.append("freeleech25")
title = item.find("title").text or "unknown"
size_str = item.find("size")
if size_str is None or size_str.text is None:
log.warning(f"Torznab item {title} has no size, skipping.")
return None

if attribute.attrib["name"] == "uploadvolumefactor":
upload_volume_factor = int(attribute.attrib["value"])
if upload_volume_factor == 2:
flags.append("doubleupload")
try:
size = int(size_str.text or "0")
except ValueError:
log.warning(f"Torznab item {title} has invalid size, skipping.")
return None

title = item.find("title").text
size_str = item.find("size")
if size_str is None or size_str.text is None:
log.warning(f"Torznab item {title} has no size, skipping.")
continue
try:
size = int(size_str.text or "0")
except ValueError:
log.warning(f"Torznab item {title} has invalid size, skipping.")
continue
return IndexerQueryResult(
title=title,
download_url=str(item.find("enclosure").attrib["url"]),
seeders=seeders,
flags=flags,
size=size,
usenet=is_usenet,
age=age,
indexer=indexer_name,
)

result = IndexerQueryResult(
title=title or "unknown",
download_url=str(item.find("enclosure").attrib["url"]),
seeders=seeders,
flags=flags,
size=size,
usenet=is_usenet,
age=age,
indexer=indexer_name,
)
result_list.append(result)
except Exception:
log.exception("1 Torznab search result failed")
return result_list
def _parse_torznab_attributes(
self, item: ET.Element, xmlns: dict, is_usenet: bool
) -> tuple[int, int, list[str]]:
seeders = 0
age = 0
flags = []
attributes = list(item.findall("torznab:attr", xmlns))
for attribute in attributes:
name = attribute.attrib["name"]
value = attribute.attrib["value"]
if is_usenet:
if name == "usenetdate":
posted_date = parsedate_to_datetime(value)
now = datetime.now(datetime.UTC)
age = int((now - posted_date).total_seconds())
else:
if name == "seeders":
seeders = int(value)
elif name == "downloadvolumefactor":
self._add_leech_flags(float(value), flags)
elif name == "uploadvolumefactor":
if int(value) == 2:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

uploadvolumefactor parsing can fail for decimal values.

At Line 83, int(value) will fail for valid values like "2.0", which can cause items to be dropped. Parse as float and compare numerically.

Proposed fix
-                elif name == "uploadvolumefactor":
-                    if int(value) == 2:
+                elif name == "uploadvolumefactor":
+                    if float(value) == 2.0:
                         flags.append("doubleupload")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if int(value) == 2:
elif name == "uploadvolumefactor":
if float(value) == 2.0:
flags.append("doubleupload")
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@media_manager/indexer/indexers/torznab_mixin.py` at line 83, The check for
uploadvolumefactor uses int(value) which raises on valid decimal strings like
"2.0"; change the logic that currently does if int(value) == 2 to parse as a
float instead (e.g. val = float(value); if val == 2.0) and wrap the conversion
in a try/except ValueError to gracefully skip or handle malformed values; update
the comparison site that references uploadvolumefactor/value in torznab_mixin.py
so numeric decimal inputs are accepted.

flags.append("doubleupload")
return seeders, age, flags

def _add_leech_flags(self, factor: float, flags: list[str]) -> None:
if factor == 0:
flags.append("freeleech")
elif factor == 0.5:
flags.append("halfleech")
elif factor == 0.75:
flags.append("freeleech75")
elif factor == 0.25:
flags.append("freeleech25")
107 changes: 45 additions & 62 deletions media_manager/indexer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,69 +16,52 @@
def evaluate_indexer_query_result(
query_result: IndexerQueryResult, ruleset: ScoringRuleSet
) -> tuple[IndexerQueryResult, bool]:
title_rules = MediaManagerConfig().indexers.title_scoring_rules
indexer_flag_rules = MediaManagerConfig().indexers.indexer_flag_scoring_rules
config = MediaManagerConfig().indexers
for rule_name in ruleset.rule_names:
for rule in title_rules:
if rule.name == rule_name:
log.debug(f"Applying rule {rule.name} to {query_result.title}")
if (
any(
re.search(
rf"\b{re.escape(keyword)}\b",
query_result.title,
re.IGNORECASE,
)
for keyword in rule.keywords
)
and not rule.negate
):
log.debug(
f"Rule {rule.name} with keywords {rule.keywords} matched for {query_result.title}"
)
query_result.score += rule.score_modifier
elif (
not any(
re.search(
rf"\b{re.escape(keyword)}\b",
query_result.title,
re.IGNORECASE,
)
for keyword in rule.keywords
)
and rule.negate
):
log.debug(
f"Negated rule {rule.name} with keywords {rule.keywords} matched for {query_result.title}"
)
query_result.score += rule.score_modifier
else:
log.debug(
f"Rule {rule.name} with keywords {rule.keywords} did not match for {query_result.title}"
)
for rule in indexer_flag_rules:
if rule.name == rule_name:
log.debug(f"Applying rule {rule.name} to {query_result.title}")
if (
any(flag in query_result.flags for flag in rule.flags)
and not rule.negate
):
log.debug(
f"Rule {rule.name} with flags {rule.flags} matched for {query_result.title} with flags {query_result.flags}"
)
query_result.score += rule.score_modifier
elif (
not any(flag in query_result.flags for flag in rule.flags)
and rule.negate
):
log.debug(
f"Negated rule {rule.name} with flags {rule.flags} matched for {query_result.title} with flags {query_result.flags}"
)
query_result.score += rule.score_modifier
else:
log.debug(
f"Rule {rule.name} with flags {rule.flags} did not match for {query_result.title} with flags {query_result.flags}"
)
_apply_title_rule(query_result, rule_name, config.title_scoring_rules)
_apply_indexer_flag_rule(
query_result, rule_name, config.indexer_flag_scoring_rules
)

if query_result.score <= 0:
return query_result, False

return query_result, True
Comment on lines +26 to +29
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Non-positive pass check conflicts with downstream retention of zero-score results.

At Line 26, score 0 is marked as failed, but downstream filtering still keeps score >= 0, so zero-score entries are effectively retained despite failing. This causes inconsistent scoring behaviour and misleading logs.

Proposed fix
-    if query_result.score <= 0:
+    if query_result.score < 0:
         return query_result, False
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if query_result.score <= 0:
return query_result, False
return query_result, True
if query_result.score < 0:
return query_result, False
return query_result, True
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@media_manager/indexer/utils.py` around lines 26 - 29, The pass/fail gate in
media_manager/indexer/utils.py currently treats query_result.score <= 0 as a
failure which contradicts downstream logic that retains score >= 0; update the
condition to treat only negative scores as failures (i.e., change the check from
<= 0 to < 0 around query_result.score) so that the function returning
(query_result, True) matches downstream filtering; keep the same return shapes
(return query_result, False / True) and adjust any nearby comments/logs
referencing the threshold if present.



def _apply_title_rule(
query_result: IndexerQueryResult, rule_name: str, rules: list
) -> None:
for rule in rules:
if rule.name == rule_name:
log.debug(f"Applying rule {rule.name} to {query_result.title}")
matched = any(
re.search(rf"\b{re.escape(k)}\b", query_result.title, re.IGNORECASE)
for k in rule.keywords
)
if matched != rule.negate:
log.debug(
f"{'Negated ' if rule.negate else ''}Rule {rule.name} matched for {query_result.title}"
)
query_result.score += rule.score_modifier
else:
log.debug(f"Rule {rule.name} did not match for {query_result.title}")


def _apply_indexer_flag_rule(
query_result: IndexerQueryResult, rule_name: str, rules: list
) -> None:
for rule in rules:
if rule.name == rule_name:
log.debug(f"Applying rule {rule.name} to {query_result.title}")
matched = any(f in query_result.flags for f in rule.flags)
if matched != rule.negate:
log.debug(
f"{'Negated ' if rule.negate else ''}Rule {rule.name} matched for {query_result.title}"
)
query_result.score += rule.score_modifier
else:
log.debug(f"Rule {rule.name} did not match for {query_result.title}")
if query_result.score <= 0:
return query_result, False

Expand Down
4 changes: 2 additions & 2 deletions ruff.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ line-ending = "lf"
quote-style = "double"

[lint]
# to be enabled: BLE, C90, CPY, D, DOC, DTZ, FBT, G, PL, RSE, SLF, SIM, TC
# to be enabled: BLE, CPY, D, DOC, DTZ, FBT, G, PL, RSE, SLF, SIM, TC
extend-select = [
"A", "ARG", "ASYNC", "ANN",
"B",
"B", "C90",
"C4", "COM",
"DTZ",
"E", "EM", "EXE",
Expand Down
Loading