22import base64
33import getpass
44import json
5- from typing import Any , List , Optional , Set
5+ from typing import Any , List , Optional , Set , Dict
66
77from keepersdk .enterprise import breachwatch_report
88from keepersdk .proto import breachwatch_pb2 , client_pb2
1616
1717logger = api .get_logger ()
1818
19- STATUS_TO_TEXT : dict [int , str ] = {
19+ STATUS_TO_TEXT : Dict [int , str ] = {
2020 client_pb2 .BWStatus .GOOD : "GOOD" ,
2121 client_pb2 .BWStatus .WEAK : "WEAK" ,
2222 client_pb2 .BWStatus .BREACHED : "BREACHED"
@@ -176,7 +176,7 @@ def execute(self, context: KeeperParams, **kwargs) -> Any:
176176 else :
177177 logger .info ("No breach watch requests to process" )
178178
179- def _get_record_names (self , kwargs : dict ) -> list [str ]:
179+ def _get_record_names (self , kwargs : Dict ) -> List [str ]:
180180 """Extract record names from kwargs."""
181181 records = kwargs .get ('records' )
182182 if not records :
@@ -187,14 +187,14 @@ def _get_record_names(self, kwargs: dict) -> list[str]:
187187
188188 return records
189189
190- def _resolve_record_uids (self , record_names : list [str ], context : KeeperParams ) -> Set [str ]:
190+ def _resolve_record_uids (self , record_names : List [str ], context : KeeperParams ) -> Set [str ]:
191191 """Resolve record names to UIDs using the context."""
192192 record_uids : Set [str ] = set ()
193193 for record_name in record_names :
194194 record_uids .update (record_utils .resolve_records (record_name , context ))
195195 return record_uids
196196
197- def _get_breached_records (self , vault : vault_online .VaultOnline ) -> dict [str , str ]:
197+ def _get_breached_records (self , vault : vault_online .VaultOnline ) -> Dict [str , str ]:
198198 """Get breached records and their passwords."""
199199 record_passwords = {}
200200
@@ -219,7 +219,7 @@ def _extract_record_password(self, vault: vault_online.VaultOnline, record_uid:
219219
220220 return None
221221
222- def _create_breach_watch_requests (self , vault : vault_online .VaultOnline , record_passwords : dict [str , str ], record_uids : Set [str ]) -> list [breachwatch_pb2 .BreachWatchRecordRequest ]:
222+ def _create_breach_watch_requests (self , vault : vault_online .VaultOnline , record_passwords : Dict [str , str ], record_uids : Set [str ]) -> List [breachwatch_pb2 .BreachWatchRecordRequest ]:
223223 """Create breach watch record requests for the given records."""
224224 bw_requests = []
225225
@@ -296,7 +296,7 @@ def _get_existing_breach_watch_euid(self, vault: vault_online.VaultOnline, recor
296296
297297 return None
298298
299- def _process_breach_watch_requests (self , vault : vault_online .VaultOnline , bw_requests : list [breachwatch_pb2 .BreachWatchRecordRequest ]) -> None :
299+ def _process_breach_watch_requests (self , vault : vault_online .VaultOnline , bw_requests : List [breachwatch_pb2 .BreachWatchRecordRequest ]) -> None :
300300 """Process the breach watch requests."""
301301 # Queue audit event
302302 self ._queue_audit_event (vault )
@@ -309,7 +309,7 @@ def _queue_audit_event(self, vault: vault_online.VaultOnline) -> None:
309309 if audit_plugin :
310310 audit_plugin .schedule_audit_event ('bw_record_ignored' )
311311
312- def _send_breach_watch_requests (self , vault : vault_online .VaultOnline , bw_requests : list [breachwatch_pb2 .BreachWatchRecordRequest ]) -> None :
312+ def _send_breach_watch_requests (self , vault : vault_online .VaultOnline , bw_requests : List [breachwatch_pb2 .BreachWatchRecordRequest ]) -> None :
313313 """Send breach watch requests in chunks."""
314314 while bw_requests :
315315 chunk = bw_requests [0 :999 ]
@@ -321,7 +321,7 @@ def _send_breach_watch_requests(self, vault: vault_online.VaultOnline, bw_reques
321321 except Exception as e :
322322 logger .error (f'Error sending breach watch chunk: { e } ' )
323323
324- def _send_breach_watch_chunk (self , vault : vault_online .VaultOnline , chunk : list [breachwatch_pb2 .BreachWatchRecordRequest ]) -> breachwatch_pb2 .BreachWatchUpdateResponse :
324+ def _send_breach_watch_chunk (self , vault : vault_online .VaultOnline , chunk : List [breachwatch_pb2 .BreachWatchRecordRequest ]) -> breachwatch_pb2 .BreachWatchUpdateResponse :
325325 """Send a chunk of breach watch requests."""
326326 rq = breachwatch_pb2 .BreachWatchUpdateRequest ()
327327 rq .breachWatchRecordRequest .extend (chunk )
@@ -368,7 +368,7 @@ def _validate_context(self, context: KeeperParams) -> None:
368368 if not context .auth .auth_context .license .get ('breachWatchEnabled' ):
369369 raise ValueError ("Breach watch is not enabled. Please contact your administrator to enable this feature." )
370370
371- def _get_and_validate_record_uids (self , kwargs : dict ) -> list [str ]:
371+ def _get_and_validate_record_uids (self , kwargs : Dict ) -> List [str ]:
372372 """Extract and validate record UIDs from kwargs."""
373373 record_uids = kwargs .get ('records' )
374374 if not record_uids :
@@ -479,7 +479,7 @@ def _is_vault_ready(self, context: KeeperParams) -> bool:
479479 raise base .CommandError ('Breach watch is not enabled. Please contact your administrator to enable this feature.' )
480480 return True
481481
482- def _get_passwords_to_scan (self , kwargs : dict ) -> list [str ]:
482+ def _get_passwords_to_scan (self , kwargs : Dict ) -> List [str ]:
483483 """Get passwords from command line arguments or prompt user."""
484484 passwords = kwargs .get ('passwords' , [])
485485 if passwords :
@@ -493,7 +493,7 @@ def _get_passwords_to_scan(self, kwargs: dict) -> list[str]:
493493 logger .info ('' )
494494 return []
495495
496- def _scan_passwords (self , breach_watch , passwords : list [str ]) -> list :
496+ def _scan_passwords (self , breach_watch , passwords : List [str ]) -> List :
497497 """Scan passwords and return results with EUIDs for cleanup."""
498498 scan_results = []
499499 for result in breach_watch .scan_passwords (passwords ):
@@ -505,7 +505,7 @@ def _is_valid_scan_result(self, result) -> bool:
505505 """Validate scan result structure."""
506506 return result and len (result ) == 2
507507
508- def _display_results (self , scan_results : list , echo_passwords : bool ) -> None :
508+ def _display_results (self , scan_results : List , echo_passwords : bool ) -> None :
509509 """Display scan results in a formatted way."""
510510 for result in scan_results :
511511 password , scan_result = result
@@ -523,7 +523,7 @@ def _get_status_text(self, scan_result) -> str:
523523 status_code = client_pb2 .BWStatus .BREACHED if is_breached else client_pb2 .BWStatus .GOOD
524524 return STATUS_TO_TEXT .get (status_code , "Unknown" )
525525
526- def _cleanup_scan_data (self , breach_watch , scan_results : list ) -> None :
526+ def _cleanup_scan_data (self , breach_watch , scan_results : List ) -> None :
527527 """Clean up scan data by deleting EUIDs."""
528528 euids = self ._extract_euids (scan_results )
529529 if euids :
@@ -532,7 +532,7 @@ def _cleanup_scan_data(self, breach_watch, scan_results: list) -> None:
532532 except Exception as e :
533533 logger .warning (f"Failed to cleanup scan data: { e } " )
534534
535- def _extract_euids (self , scan_results : list ) -> list :
535+ def _extract_euids (self , scan_results : List ) -> List :
536536 """Extract EUIDs from scan results for cleanup."""
537537 euids = []
538538 for result in scan_results :
0 commit comments