diff --git a/commands/search.py b/commands/search.py
index b7dbfd6..33f522c 100644
--- a/commands/search.py
+++ b/commands/search.py
@@ -221,7 +221,7 @@ def resources(
                 if identifier:
                     print(identifier)
             else:
-                print(json.dumps(hit, indent=2))
+                print(json.dumps(hit))
             count += 1
 
             if limit and count >= limit:
@@ -233,3 +233,172 @@ def resources(
     except Exception as e:
         logger.error(f"Error fetching resources: {e}")
         raise click.Abort()
+
+
+@search.command(name="import-candidates")
+@click.pass_obj
+@click.option(
+    "--page-size",
+    type=click.IntRange(min=1),
+    default=100,
+    help="Number of results per page (default: 100)",
+)
+@click.option(
+    "--limit",
+    type=int,
+    help="Maximum number of total results to return (default: unlimited)",
+)
+@click.option(
+    "--import-status",
+    type=str,
+    help="Filter by import status (e.g., NOT_IMPORTED, IMPORTED)",
+)
+@click.option(
+    "--type",
+    "publication_type",
+    type=str,
+    help="Filter by publication type (e.g., AcademicChapter, AcademicArticle)",
+)
+@click.option(
+    "--publication-year",
+    type=str,
+    help="Filter by publication year, or year range as 'from,to' (e.g., 2025 or 2023,2025)",
+)
+@click.option(
+    "--order-by",
+    type=str,
+    help="Order results by field (e.g., createdDate, modifiedDate)",
+)
+@click.option(
+    "--sort-order",
+    type=str,
+    default="desc",
+    help="Sort order: asc or desc (default: desc)",
+)
+@click.option(
+    "--aggregation",
+    type=str,
+    default="none",
+    help="Aggregation parameter (default: none)",
+)
+@click.option(
+    "--category",
+    type=str,
+    help="Filter by category",
+)
+@click.option(
+    "--contributor",
+    type=str,
+    help="Filter by contributor ID",
+)
+@click.option(
+    "--publisher",
+    type=str,
+    help="Filter by publisher ID",
+)
+@click.option(
+    "--title",
+    type=str,
+    help="Filter by title",
+)
+@click.option(
+    "--doi",
+    type=str,
+    help="Filter by DOI",
+)
+@click.option(
+    "--top-level-organization",
+    type=str,
+    help="Filter by top-level organization ID",
+)
+@click.option(
+    "--id-only",
+    is_flag=True,
+    help="Output only identifiers (one per line)",
+)
+@click.option(
+    "--query",
+    type=str,
+    multiple=True,
+    help="Additional query parameters as key=value (can be used multiple times)",
+)
+def import_candidates(
+    ctx: AppContext,
+    page_size: int,
+    limit: int | None,
+    import_status: str | None,
+    publication_type: str | None,
+    publication_year: str | None,
+    order_by: str | None,
+    sort_order: str,
+    aggregation: str,
+    category: str | None,
+    contributor: str | None,
+    publisher: str | None,
+    title: str | None,
+    doi: str | None,
+    top_level_organization: str | None,
+    id_only: bool,
+    query: Tuple[str, ...],
+) -> None:
+    """Search import candidates for the authenticated customer.
+
+    Each hit is printed to stdout as one compact JSON document per line, or as
+    its bare identifier when --id-only is given.
+
+    Examples:
+        # Search for not-imported academic chapters from 2025
+        uv run cli.py search import-candidates --import-status NOT_IMPORTED --type AcademicChapter --publication-year 2025
+
+        # Search with all aggregations, sorted by created date
+        uv run cli.py search import-candidates --aggregation all --order-by createdDate --sort-order desc
+
+        # Output only identifiers
+        uv run cli.py search import-candidates --import-status NOT_IMPORTED --id-only
+    """
+    named_filters = {
+        "importStatus": import_status,
+        "type": publication_type,
+        "publicationYear": publication_year,
+        "orderBy": order_by,
+        "sortOrder": sort_order,
+        "aggregation": aggregation,
+        "category": category,
+        "contributor": contributor,
+        "publisher": publisher,
+        "title": title,
+        "doi": doi,
+        "topLevelOrganization": top_level_organization,
+    }
+    # Drop options the user did not supply so they are not sent to the API.
+    query_params = {key: value for key, value in named_filters.items() if value}
+
+    for q in query:
+        key, sep, value = q.partition("=")
+        if sep and key:
+            query_params[key] = value
+        else:
+            logger.warning(f"Ignoring invalid query parameter: {q}")
+
+    search_service = SearchApiService(profile=ctx.profile)
+
+    try:
+        count = 0
+        for hit in search_service.import_candidates_search(query_params, page_size):
+            if id_only:
+                identifier = hit.get("identifier")
+                if identifier:
+                    print(identifier)
+            else:
+                print(json.dumps(hit))
+            count += 1
+
+            if limit and count >= limit:
+                break
+
+        if count > 0:
+            logger.info(f"Total results: {count}")
+
+    except Exception as e:
+        logger.error(f"Error fetching import candidates: {e}")
+        raise click.Abort() from e
diff --git a/commands/services/search_api.py b/commands/services/search_api.py
index 9b8227d..cebf607 100644
--- a/commands/services/search_api.py
+++ b/commands/services/search_api.py
@@ -1,6 +1,8 @@
 import boto3
+import json
 import logging
 import requests
+from datetime import datetime, timedelta, timezone
 from requests.exceptions import JSONDecodeError
 from tenacity import (
     retry,
@@ -17,12 +19,59 @@ class SearchApiService:
     def __init__(self, profile: Optional[str]) -> None:
         self.session = boto3.Session(profile_name=profile)
         self.ssm = self.session.client("ssm")
+        self.secretsmanager = self.session.client("secretsmanager")
         self.api_domain = self._get_system_parameter("/NVA/ApiDomain")
+        # Cognito settings and the bearer token are resolved lazily by _get_token().
+        self._cognito_uri: Optional[str] = None
+        self._client_credentials: Optional[Dict[str, str]] = None
+        self._token: Optional[str] = None
+        self._token_expiry_time: datetime = datetime.now(timezone.utc)
 
     def _get_system_parameter(self, name: str) -> str:
         response = self.ssm.get_parameter(Name=name)
         return response["Parameter"]["Value"]
 
+    def _get_secret(self, name: str) -> Dict[str, str]:
+        """Return the Secrets Manager secret `name`, parsed from its JSON string."""
+        response = self.secretsmanager.get_secret_value(SecretId=name)
+        return json.loads(response["SecretString"])
+
+    def _get_cognito_token(self) -> str:
+        """Request an access token using the OAuth2 client-credentials grant.
+
+        Also records when the new token expires. Raises
+        requests.exceptions.HTTPError when the token endpoint answers with an
+        error status, instead of failing later on a missing "access_token" key.
+        """
+        url = f"{self._cognito_uri}/oauth2/token"
+        headers = {"Content-Type": "application/x-www-form-urlencoded"}
+        data = {
+            "grant_type": "client_credentials",
+            "client_id": self._client_credentials["backendClientId"],
+            "client_secret": self._client_credentials["backendClientSecret"],
+        }
+        # Bound the call so an unreachable endpoint cannot hang the CLI forever.
+        response = requests.post(url, headers=headers, data=data, timeout=30)
+        response.raise_for_status()
+        response_json = response.json()
+        self._token_expiry_time = datetime.now(timezone.utc) + timedelta(
+            seconds=response_json["expires_in"]
+        )
+        return response_json["access_token"]
+
+    def _is_token_expired(self) -> bool:
+        """Return True once the token is expired or within 30 seconds of expiring."""
+        return datetime.now(timezone.utc) > self._token_expiry_time - timedelta(seconds=30)
+
+    def _get_token(self) -> str:
+        """Return the cached bearer token, fetching a new one when needed."""
+        if not self._cognito_uri:
+            self._cognito_uri = self._get_system_parameter("/NVA/CognitoUri")
+            self._client_credentials = self._get_secret("BackendCognitoClientCredentials")
+        if not self._token or self._is_token_expired():
+            self._token = self._get_cognito_token()
+        return self._token
+
     def get_uri(self, type: str) -> str:
         return f"https://{self.api_domain}/search/{type}"
 
@@ -120,3 +169,67 @@ def resource_search(
 
             if offset >= total_hits:
                 break
+
+    def import_candidates_search(
+        self,
+        query_parameters: Dict[str, Any],
+        page_size: int = 100,
+    ) -> Generator[Dict[str, Any], None, None]:
+        """Yield import-candidate hits one by one, fetching page after page.
+
+        Pages are requested with "from"/"size" offsets until a page is empty or
+        the reported "totalHits" has been reached. Failed search requests are
+        logged and end the iteration; errors while obtaining the bearer token
+        propagate to the caller.
+        """
+        url = self.get_uri("customer/import-candidates")
+        offset = 0
+
+        while True:
+            params = {
+                **query_parameters,
+                "from": offset,
+                "size": page_size,
+            }
+            headers = {
+                "Accept": "application/json",
+                # Looked up per page so a token nearing expiry is renewed mid-run.
+                "Authorization": f"Bearer {self._get_token()}",
+            }
+
+            try:
+                response = self._make_search_request(url, headers, params)
+            except requests.exceptions.HTTPError as e:
+                # e.response is None when the error was raised without a response.
+                status = e.response.status_code if e.response is not None else "unknown"
+                logger.error(f"Failed to search after retries. Status: {status}")
+                if e.response is not None:
+                    try:
+                        logger.error(f"Error detail: {e.response.json()}")
+                    except (ValueError, JSONDecodeError):
+                        logger.error(f"Error detail: {e.response.text}")
+                break
+            except requests.exceptions.RequestException as e:
+                logger.error(f"Network error after retries: {e}")
+                break
+
+            if response.status_code != 200:
+                # Log the raw text: a non-200 body is not guaranteed to be JSON.
+                logger.error(
+                    f"Failed to search. {response.status_code}: {response.text}"
+                )
+                break
+
+            response_data = response.json()
+            hits = response_data.get("hits", [])
+
+            if not hits:
+                break
+
+            yield from hits
+
+            total_hits = response_data.get("totalHits", 0)
+            offset += len(hits)
+
+            if offset >= total_hits:
+                break