# Patch for app/services/implementations/web_search_service.py (git unified diff).
# NOTE: in this copy the patch's line breaks are collapsed; "+"/"-" markers still
# denote added/removed lines of the Python module.
#
# What the added ("+") lines do, as visible in the hunks below:
#   * Reorder the import block and add a module-level set BLOCKED_SOURCE_DOMAINS
#     containing eleven domain names.
#   * Add four helpers to GoogleWebSearchService:
#       - _is_blocked_domain: normalizes the name with normalize_domain_name and
#         returns True when it equals a blocked domain or ends with ".<blocked>".
#       - _has_acceptable_credibility: True only when the score is not None and > 0.
#       - _is_allowed_source: uses source.domain.domain_name when available, else
#         normalize_domain_name(source.url); requires not blocked AND acceptable
#         credibility.
#       - _filter_allowed_sources: keeps only sources passing _is_allowed_source.
#   * search_and_create_sources: the "num" request parameter becomes the constant
#     10 in all three params dicts (it was min(num_results, 10)); blocked domains
#     are skipped before get_or_create_domain is called; domains failing the
#     credibility check are skipped before _create_new_source is called; the
#     logger.debug("Created new source for URL: ...") line is removed.
#   * format_sources_for_prompt and calculate_overall_credibility pass their
#     input through _filter_allowed_sources first.
#
# NOTE(review): logger.warning("NEW SOURCE FILTER CODE IS RUNNING") looks like a
#   leftover debug marker logged on every search - confirm it should ship.
# NOTE(review): num_results is no longer referenced in the visible hunks once
#   "num" is hard-coded to 10 - confirm whether results are truncated to
#   num_results in code outside this patch.
# NOTE(review): the patch adds commented-out copies of earlier code (the old
#   per-item block, '# "num": min(num_results, 10),' and a commented duplicate of
#   the valid_scores line) - presumably these should be dropped before merge.
# NOTE(review): in calculate_overall_credibility the "is not None" filter on
#   valid_scores appears redundant after _filter_allowed_sources, which already
#   rejects None scores - verify.
diff --git a/app/services/implementations/web_search_service.py b/app/services/implementations/web_search_service.py index c2e6457..ffe1a79 100644 --- a/app/services/implementations/web_search_service.py +++ b/app/services/implementations/web_search_service.py @@ -1,20 +1,36 @@ -from typing import List, Optional -import aiohttp -from datetime import UTC, datetime import logging +from datetime import UTC, datetime +from typing import List, Optional from uuid import UUID, uuid4 -from app.core.config import settings + +import aiohttp from sqlalchemy.exc import IntegrityError +from app.core.config import settings from app.core.exceptions import ValidationError +from app.core.utils.url import normalize_domain_name from app.models.database.models import SourceModel -from app.services.interfaces.web_search_service import WebSearchServiceInterface from app.repositories.implementations.source_repository import SourceRepository from app.services.domain_service import DomainService -from app.core.utils.url import normalize_domain_name +from app.services.interfaces.web_search_service import WebSearchServiceInterface logger = logging.getLogger(__name__) +# hard filter +BLOCKED_SOURCE_DOMAINS = { + "reddit.com", + "instagram.com", + "youtube.com", + "youtu.be", + "bsky.app", + "bsky.social", + "tiktok.com", + "facebook.com", + "x.com", + "twitter.com", + "threads.net", +} + class GoogleWebSearchService(WebSearchServiceInterface): def __init__(self, domain_service: DomainService, source_repository: SourceRepository): @@ -24,16 +40,41 @@ def __init__(self, domain_service: DomainService, source_repository: SourceRepos self.domain_service = domain_service self.source_repository = source_repository + def _is_blocked_domain(self, domain_name: str) -> bool: + normalized_domain = normalize_domain_name(domain_name) + return any( + normalized_domain == blocked_domain or normalized_domain.endswith(f".{blocked_domain}") + for blocked_domain in BLOCKED_SOURCE_DOMAINS + ) + + def 
_has_acceptable_credibility(self, credibility_score: Optional[float]) -> bool: + # This removes both unknown credibility and explicit 0 credibility. + return credibility_score is not None and credibility_score > 0 + + def _is_allowed_source(self, source: SourceModel) -> bool: + domain_name = ( + source.domain.domain_name + if hasattr(source, "domain") and source.domain + else normalize_domain_name(source.url) + ) + return not self._is_blocked_domain(domain_name) and self._has_acceptable_credibility(source.credibility_score) + + def _filter_allowed_sources(self, sources: List[SourceModel]) -> List[SourceModel]: + return [source for source in sources if self._is_allowed_source(source)] + async def search_and_create_sources( self, claim_text: str, search_id: UUID, num_results: int = 5, language: str = "english" ) -> List[SourceModel]: """Search for sources and create or update records.""" try: + logger.warning("NEW SOURCE FILTER CODE IS RUNNING") params = { "key": self.api_key, "cx": self.search_engine_id, "q": claim_text, - "num": min(num_results, 10), + # "num": min(num_results, 10), + # Fetch max results because source policy filtering happens after search, not in the query. 
+ "num": 10, "fields": "items(title,link,snippet)", } if language == "english": @@ -41,7 +82,8 @@ async def search_and_create_sources( "key": self.api_key, "cx": self.search_engine_id, "q": claim_text, - "num": min(num_results, 10), + # "num": min(num_results, 10), + "num": 10, "fields": "items(title,link,snippet)", "lr": "lang_en", } @@ -50,7 +92,8 @@ async def search_and_create_sources( "key": self.api_key, "cx": self.search_engine_id, "q": claim_text, - "num": min(num_results, 10), + # "num": min(num_results, 10), + "num": 10, "fields": "items(title,link,snippet)", "lr": "lang_fr", } @@ -70,16 +113,38 @@ async def search_and_create_sources( for item in data["items"]: try: + # domain_name = normalize_domain_name(item["link"]) + # domain, is_new = await self.domain_service.get_or_create_domain(domain_name) + + # if is_new: + # logger.info(f"Created new domain record for: {domain_name}") + + # source = await self._create_new_source(item, search_id, domain.id, domain.credibility_score) + # if source: + # sources.append(source) + # logger.debug(f"Created new source for URL: {item['link']}") domain_name = normalize_domain_name(item["link"]) + if self._is_blocked_domain(domain_name): + logger.info(f"Skipping blocked source domain: {domain_name}") + continue domain, is_new = await self.domain_service.get_or_create_domain(domain_name) - if is_new: logger.info(f"Created new domain record for: {domain_name}") + if not self._has_acceptable_credibility(domain.credibility_score): + logger.info( + f"Skipping source with low/unknown credibility: " + f"{domain_name} ({domain.credibility_score})" + ) + continue - source = await self._create_new_source(item, search_id, domain.id, domain.credibility_score) + source = await self._create_new_source( + item, + search_id, + domain.id, + domain.credibility_score, + ) if source: sources.append(source) - logger.debug(f"Created new source for URL: {item['link']}") except Exception as e: logger.error(f"Error processing search result: 
{str(e)}", exc_info=True) @@ -128,6 +193,7 @@ async def _create_new_source( def format_sources_for_prompt(self, sources: List[SourceModel], language: str = "english") -> str: """Format sources into a string for the LLM prompt.""" + sources = self._filter_allowed_sources(sources) if language == "english": if not sources: return "No reliable sources found." @@ -178,10 +244,12 @@ def format_sources_for_prompt(self, sources: List[SourceModel], language: str = def calculate_overall_credibility(self, sources: List[SourceModel]) -> float: """Calculate overall credibility score for a set of sources.""" + sources = self._filter_allowed_sources(sources) if not sources: return 0.0 # Filter out sources with null credibility scores + # valid_scores = [source.credibility_score for source in sources if source.credibility_score is not None] valid_scores = [source.credibility_score for source in sources if source.credibility_score is not None] if not valid_scores: