From 5f5b70cc61d1f5112832b3408a4d7de867998dbb Mon Sep 17 00:00:00 2001 From: adeshmukh-ks Date: Tue, 17 Mar 2026 13:49:13 +0530 Subject: [PATCH 1/2] one-time-share SDK support and examples --- .../sdk_examples/one_time_share/create_ots.py | 618 ++++++++++++++++++ .../one_time_share/list_shares.py | 77 ++- .../sdk_examples/one_time_share/remove_ots.py | 598 +++++++++++++++++ .../src/keepercli/commands/record_edit.py | 4 +- .../src/keepercli/commands/shares.py | 201 ++---- .../src/keepercli/helpers/record_utils.py | 47 +- .../src/keepersdk/vault/one_time_share.py | 277 ++++++++ 7 files changed, 1587 insertions(+), 235 deletions(-) create mode 100644 examples/sdk_examples/one_time_share/create_ots.py create mode 100644 examples/sdk_examples/one_time_share/remove_ots.py create mode 100644 keepersdk-package/src/keepersdk/vault/one_time_share.py diff --git a/examples/sdk_examples/one_time_share/create_ots.py b/examples/sdk_examples/one_time_share/create_ots.py new file mode 100644 index 0000000..b655fbc --- /dev/null +++ b/examples/sdk_examples/one_time_share/create_ots.py @@ -0,0 +1,618 @@ +from datetime import timedelta +import getpass +import sqlite3 +import json +import logging + +from typing import Dict, Optional + +import fido2 +import webbrowser + +from keepersdk import errors, utils +from keepersdk.authentication import ( + configuration, + endpoint, + keeper_auth, + login_auth, +) +from keepersdk.authentication.yubikey import ( + IKeeperUserInteraction, + yubikey_authenticate, +) +from keepersdk.constants import KEEPER_PUBLIC_HOSTS +from keepersdk.vault import one_time_share, sqlite_storage, vault_online +from keepersdk import utils + +try: + import pyperclip +except ImportError: + pyperclip = None + +logger = utils.get_logger() +logger.setLevel(logging.INFO) +if not logger.handlers: + _handler = logging.StreamHandler() + _handler.setLevel(logging.INFO) + _handler.setFormatter( + logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s") + ) + logger.addHandler(_handler) + + +class FidoCliInteraction(fido2.client.UserInteraction, IKeeperUserInteraction): + def output_text(self, text: str) -> None: + print(text) + + def prompt_up(self) -> None: + print( + "\nTouch the flashing Security key to authenticate or " + "press Ctrl-C to resume with the primary two factor authentication..." + ) + + def request_pin(self, permissions, rd_id): + return getpass.getpass("Enter Security Key PIN: ") + + def request_uv(self, permissions, rd_id): + print("User Verification required.") + return True + + +# Two-factor duration codes (used by LoginFlow) +_TWO_FACTOR_DURATION_CODES: Dict[login_auth.TwoFactorDuration, str] = { + login_auth.TwoFactorDuration.EveryLogin: "login", + login_auth.TwoFactorDuration.Every12Hours: "12_hours", + login_auth.TwoFactorDuration.EveryDay: "24_hours", + login_auth.TwoFactorDuration.Every30Days: "30_days", + login_auth.TwoFactorDuration.Forever: "forever", +} + + +class LoginFlow: + """ + Handles the full login process: server selection, username, password, + device approval, 2FA, SSO data key, and SSO token. + """ + + def __init__(self) -> None: + self._config = configuration.JsonConfigurationStorage() + self._logged_in_with_persistent = True + self._endpoint: Optional[endpoint.KeeperEndpoint] = None + + @property + def endpoint(self) -> Optional[endpoint.KeeperEndpoint]: + return self._endpoint + + @property + def logged_in_with_persistent(self) -> bool: + """True if login succeeded by resuming an existing persistent session (no step loop).""" + return self._logged_in_with_persistent + + def run(self) -> Optional[keeper_auth.KeeperAuth]: + """ + Run the login flow. + + Returns: + Authenticated Keeper context, or None if login fails. + """ + server = self._ensure_server() + keeper_endpoint = endpoint.KeeperEndpoint(self._config, server) + self._endpoint = keeper_endpoint + login_auth_context = login_auth.LoginAuth(keeper_endpoint) + + username = self._config.get().last_login or input("Enter username: ") + login_auth_context.resume_session = True + login_auth_context.login(username) + + while not login_auth_context.login_step.is_final(): + step = login_auth_context.login_step + if isinstance(step, login_auth.LoginStepDeviceApproval): + self._handle_device_approval(step) + elif isinstance(step, login_auth.LoginStepTwoFactor): + self._handle_two_factor(step) + elif isinstance(step, login_auth.LoginStepPassword): + self._handle_password(step) + elif isinstance(step, login_auth.LoginStepSsoToken): + self._handle_sso_token(step) + elif isinstance(step, login_auth.LoginStepSsoDataKey): + self._handle_sso_data_key(step) + elif isinstance(step, login_auth.LoginStepError): + print(f"Login error: ({step.code}) {step.message}") + return None + else: + raise NotImplementedError( + f"Unsupported login step type: {type(step).__name__}" + ) + self._logged_in_with_persistent = False + + if self._logged_in_with_persistent: + print("Successfully logged in with persistent login") + + if isinstance(login_auth_context.login_step, login_auth.LoginStepConnected): + return login_auth_context.login_step.take_keeper_auth() + + return None + + def _ensure_server(self) -> str: + if not self._config.get().last_server: + print("Available server options:") + for region, host in KEEPER_PUBLIC_HOSTS.items(): + print(f" {region}: {host}") + server = ( + input("Enter server (default: keepersecurity.com): ").strip() + or "keepersecurity.com" + ) + self._config.get().last_server = server + else: + server = self._config.get().last_server + return server + + def _handle_device_approval( + self, step: login_auth.LoginStepDeviceApproval + ) -> None: + """Device approval: same options as keepercli verify_device (email, keeper push, 2FA, resume).""" + menu = [ + ("email_send", "to send email"), + ("email_code=", "to validate verification code sent via email"), + ("keeper_push", "to send Keeper Push notification"), + ("2fa_send", "to send 2FA code"), + ("2fa_code=", "to validate a code provided by 2FA application"), + ("", "to resume"), + ] + lines = ["Approve by selecting a method below"] + lines.extend(f" {cmd} {desc}" for cmd, desc in menu) + print("\n".join(lines)) + + selection = input("Type your selection or to resume: ").strip() + if selection is None: + return + if selection in ("email_send", "es"): + step.send_push(channel=login_auth.DeviceApprovalChannel.Email) + print("An email with instructions has been sent. Press when approved.") + elif selection.startswith("email_code="): + code = selection[len("email_code=") :] + step.send_code(channel=login_auth.DeviceApprovalChannel.Email, code=code) + print("Successfully verified email code.") + elif selection in ("keeper_push", "kp"): + step.send_push(channel=login_auth.DeviceApprovalChannel.KeeperPush) + print( + "Successfully made a push notification to the approved device. " + "Press when approved." + ) + elif selection in ("2fa_send", "2fs"): + step.send_push(channel=login_auth.DeviceApprovalChannel.TwoFactor) + print("2FA code was sent.") + elif selection.startswith("2fa_code="): + code = selection[len("2fa_code=") :] + step.send_code(channel=login_auth.DeviceApprovalChannel.TwoFactor, code=code) + print("Successfully verified 2FA code.") + else: + step.resume() + + def _handle_password(self, step: login_auth.LoginStepPassword) -> None: + """Password step: prompt for password and retry on auth_failed (aligned with keepercli handle_verify_password).""" + print(f"\nEnter password for {step.username}") + while True: + password = getpass.getpass("Password: ") + if not password: + raise KeyboardInterrupt() + try: + step.verify_password(password) + break + except errors.KeeperApiError as kae: + print( + "Invalid email or password combination, please re-enter." + if kae.result_code == "auth_failed" + else kae.message + ) + + def _handle_two_factor(self, step: login_auth.LoginStepTwoFactor) -> None: + channels = [ + x + for x in step.get_channels() + if x.channel_type != login_auth.TwoFactorChannel.Other + ] + menu = [] + for i, channel in enumerate(channels): + desc = self._two_factor_channel_desc(channel.channel_type) + menu.append( + ( + str(i + 1), + f"{desc} {channel.channel_name} {channel.phone}", + ) + ) + menu.append(("q", "Quit authentication attempt and return to Commander prompt.")) + + lines = ["", "This account requires 2FA Authentication"] + lines.extend(f" {a}. {t}" for a, t in menu) + print("\n".join(lines)) + + while True: + selection = input("Selection: ") + if selection is None: + return + if selection in ("q", "Q"): + raise KeyboardInterrupt() + try: + assert selection.isnumeric() + idx = 1 if not selection else int(selection) + assert 1 <= idx <= len(channels) + channel = channels[idx - 1] + desc = self._two_factor_channel_desc(channel.channel_type) + print(f"Selected {idx}. {desc}") + except AssertionError: + print( + "Invalid entry, additional factors of authentication shown " + "may be configured if not currently enabled." + ) + continue + + if channel.channel_type in ( + login_auth.TwoFactorChannel.TextMessage, + login_auth.TwoFactorChannel.KeeperDNA, + login_auth.TwoFactorChannel.DuoSecurity, + ): + action = next( + ( + x + for x in step.get_channel_push_actions(channel.channel_uid) + if x + in ( + login_auth.TwoFactorPushAction.TextMessage, + login_auth.TwoFactorPushAction.KeeperDna, + ) + ), + None, + ) + if action: + step.send_push(channel.channel_uid, action) + + if channel.channel_type == login_auth.TwoFactorChannel.SecurityKey: + try: + challenge = json.loads(channel.challenge) + signature = yubikey_authenticate(challenge, FidoCliInteraction()) + if signature: + print("Verified Security Key.") + step.send_code(channel.channel_uid, signature) + return + except Exception as e: + logger.error(e) + continue + + # 2FA code path + step.duration = min(step.duration, channel.max_expiration) + available_dura = sorted( + x for x in _TWO_FACTOR_DURATION_CODES if x <= channel.max_expiration + ) + available_codes = [ + _TWO_FACTOR_DURATION_CODES.get(x) or "login" for x in available_dura + ] + + while True: + mfa_desc = self._two_factor_duration_desc(step.duration) + prompt_exp = ( + f"\n2FA Code Duration: {mfa_desc}.\n" + f"To change duration: 2fa_duration={'|'.join(available_codes)}" + ) + print(prompt_exp) + + selection = input("\nEnter 2FA Code or Duration: ") + if not selection: + return + if selection in available_codes: + step.duration = self._two_factor_code_to_duration(selection) + elif selection.startswith("2fa_duration="): + code = selection[len("2fa_duration=") :] + if code in available_codes: + step.duration = self._two_factor_code_to_duration(code) + else: + print(f"Invalid 2FA duration: {code}") + else: + try: + step.send_code(channel.channel_uid, selection) + print("Successfully verified 2FA Code.") + return + except errors.KeeperApiError as kae: + print(f"Invalid 2FA code: ({kae.result_code}) {kae.message}") + + def _handle_sso_data_key( + self, step: login_auth.LoginStepSsoDataKey + ) -> None: + menu = [ + ("1", "Keeper Push. Send a push notification to your device."), + ("2", "Admin Approval. Request your admin to approve this device."), + ("r", "Resume SSO authentication after device is approved."), + ("q", "Quit SSO authentication attempt and return to Commander prompt."), + ] + lines = ["Approve this device by selecting a method below:"] + lines.extend(f" {cmd:>3}. {text}" for cmd, text in menu) + print("\n".join(lines)) + + while True: + answer = input("Selection: ") + if answer is None: + return + if answer == "q": + raise KeyboardInterrupt() + if answer == "r": + step.resume() + break + if answer in ("1", "2"): + step.request_data_key( + login_auth.DataKeyShareChannel.KeeperPush + if answer == "1" + else login_auth.DataKeyShareChannel.AdminApproval + ) + else: + print(f'Action "{answer}" is not supported.') + + def _handle_sso_token(self, step: login_auth.LoginStepSsoToken) -> None: + menu = [ + ("a", "SSO User with a Master Password."), + ] + if pyperclip: + menu.append(("c", "Copy SSO Login URL to clipboard.")) + else: + menu.append(("u", "Show SSO Login URL.")) + try: + wb = webbrowser.get() + menu.append(("o", "Navigate to SSO Login URL with the default web browser.")) + except Exception: + wb = None + if pyperclip: + menu.append(("p", "Paste SSO Token from clipboard.")) + menu.append(("t", "Enter SSO Token manually.")) + menu.append(("q", "Quit SSO authentication attempt and return to Commander prompt.")) + + lines = [ + "", + "SSO Login URL:", + step.sso_login_url, + "Navigate to SSO Login URL with your browser and complete authentication.", + "Copy a returned SSO Token into clipboard." + + (" Paste that token into Commander." if pyperclip else " Then use option 't' to enter the token manually."), + 'NOTE: To copy SSO Token please click "Copy authentication token" ' + 'button on "SSO Connect" page.', + "", + ] + lines.extend(f" {a:>3}. {t}" for a, t in menu) + print("\n".join(lines)) + + while True: + token = input("Selection: ") + if token == "q": + raise KeyboardInterrupt() + if token == "a": + step.login_with_password() + return + if token == "c": + token = None + if pyperclip: + try: + pyperclip.copy(step.sso_login_url) + print("SSO Login URL is copied to clipboard.") + except Exception: + print("Failed to copy SSO Login URL to clipboard.") + else: + print("Clipboard not available (install pyperclip).") + elif token == "u": + token = None + if not pyperclip: + print("\nSSO Login URL:", step.sso_login_url, "\n") + else: + print("Unsupported menu option (use 'c' to copy URL).") + elif token == "o": + token = None + if wb: + try: + wb.open_new_tab(step.sso_login_url) + except Exception: + print("Failed to open web browser.") + elif token == "p": + if pyperclip: + try: + token = pyperclip.paste() + except Exception: + token = "" + print("Failed to paste from clipboard") + else: + token = None + print("Clipboard not available (use 't' to enter token manually).") + elif token == "t": + token = getpass.getpass("Enter SSO Token: ").strip() + else: + if len(token) < 10: + print(f"Unsupported menu option: {token}") + continue + + if token: + try: + step.set_sso_token(token) + break + except errors.KeeperApiError as kae: + print(f"SSO Login error: ({kae.result_code}) {kae.message}") + + @staticmethod + def _two_factor_channel_desc( + channel_type: login_auth.TwoFactorChannel, + ) -> str: + return { + login_auth.TwoFactorChannel.Authenticator: "TOTP (Google and Microsoft Authenticator)", + login_auth.TwoFactorChannel.TextMessage: "Send SMS Code", + login_auth.TwoFactorChannel.DuoSecurity: "DUO", + login_auth.TwoFactorChannel.RSASecurID: "RSA SecurID", + login_auth.TwoFactorChannel.SecurityKey: "WebAuthN (FIDO2 Security Key)", + login_auth.TwoFactorChannel.KeeperDNA: "Keeper DNA (Watch)", + login_auth.TwoFactorChannel.Backup: "Backup Code", + }.get(channel_type, "Not Supported") + + @staticmethod + def _two_factor_duration_desc( + duration: login_auth.TwoFactorDuration, + ) -> str: + return { + login_auth.TwoFactorDuration.EveryLogin: "Require Every Login", + login_auth.TwoFactorDuration.Forever: "Save on this Device Forever", + login_auth.TwoFactorDuration.Every12Hours: "Ask Every 12 hours", + login_auth.TwoFactorDuration.EveryDay: "Ask Every 24 hours", + login_auth.TwoFactorDuration.Every30Days: "Ask Every 30 days", + }.get(duration, "Require Every Login") + + @staticmethod + def _two_factor_code_to_duration( + text: str, + ) -> login_auth.TwoFactorDuration: + for dura, code in _TWO_FACTOR_DURATION_CODES.items(): + if code == text: + return dura + return login_auth.TwoFactorDuration.EveryLogin + + +def enable_persistent_login(keeper_auth_context: keeper_auth.KeeperAuth) -> None: + """ + Enable persistent login and register data key for device. + Sets persistent_login to on and logout_timer to 30 days. + """ + keeper_auth.set_user_setting(keeper_auth_context, 'persistent_login', '1') + keeper_auth.register_data_key_for_device(keeper_auth_context) + mins_per_day = 60 * 24 + timeout_in_minutes = mins_per_day * 30 # 30 days + keeper_auth.set_user_setting(keeper_auth_context, 'logout_timer', str(timeout_in_minutes)) + print("Persistent login turned on successfully and device registered") + + +def login(): + """ + Handle the login process including server selection, authentication, + and multi-factor authentication steps (device approval, password, 2FA + with channel selection and Security Key, SSO data key, SSO token). + + Returns: + tuple: (keeper_auth_context, keeper_endpoint) on success, or (None, None) if login fails. + """ + flow = LoginFlow() + keeper_auth_context = flow.run() + if keeper_auth_context and not flow.logged_in_with_persistent: + enable_persistent_login(keeper_auth_context) + keeper_endpoint = flow.endpoint if keeper_auth_context else None + return keeper_auth_context, keeper_endpoint + + +def _resolve_record_uid( + vault: vault_online.VaultOnline, + record_title_or_uid: str, +) -> str: + """ + Resolve record title or UID to a record UID. + + If record_title_or_uid matches a record UID (record exists), return it. + Otherwise find the first record whose title equals or contains the given string. + """ + if not record_title_or_uid or not record_title_or_uid.strip(): + raise ValueError("Record title or UID must be non-empty.") + + candidate = record_title_or_uid.strip() + + # Try as record UID first + if vault.vault_data.get_record(candidate) is not None: + return candidate + + # Search by title (first match, case-insensitive) + candidate_lower = candidate.lower() + for record_info in vault.vault_data.records(): + if record_info.version not in (2, 3): + continue + if ( + record_info.title.lower() == candidate_lower + or candidate_lower in record_info.title.lower() + ): + return record_info.record_uid + + raise ValueError( + f"No record found matching {record_title_or_uid!r}. " + "Use an existing record title or record UID." + ) + + +def create_one_time_share_example( + vault: vault_online.VaultOnline, + record_title_or_uid: str, + ots_name: str, + expiration_days: int = 7, + is_editable: bool = False, + is_self_destruct: bool = False, +) -> Optional[str]: + """ + Create a one-time share for the given record using the SDK. + + Args: + vault: Initialized VaultOnline instance. + record_title_or_uid: Record title or record UID to share. + ots_name: Label for the one-time share link. + expiration_days: Number of days the share link is valid (max 182). + is_editable: If True, the recipient can edit the shared record. + is_self_destruct: If True, the share is invalidated after first open. + + Returns: + The one-time share URL, or None on error. + """ + record_uid = _resolve_record_uid(vault, record_title_or_uid) + expiration_period = timedelta(days=expiration_days) + + try: + url = one_time_share.create_one_time_share( + vault=vault, + record_uid=record_uid, + expiration_period=expiration_period, + name=ots_name or None, + is_editable=is_editable, + is_self_destruct=is_self_destruct, + ) + print(f"One-time share created for record {record_title_or_uid!r}.") + print(f"Share name: {ots_name or '(unnamed)'}") + print(f"Expires in: {expiration_days} day(s)") + print(f"URL: {url}") + return url + except ValueError as e: + print(f"Error creating one-time share: {e}") + return None + + +def create_one_time_share_run(keeper_auth_context: keeper_auth.KeeperAuth) -> None: + """Build vault from auth context, create one-time share with configured variables, then close.""" + # Record and one-time share parameters + RECORD_TITLE_OR_UID = "My Login" # Record title or record UID to create one-time share for + OTS_NAME = "Share for contractor" # Label for the one-time share link + EXPIRATION_DAYS = 7 # Link valid for 7 days (max 182) + + conn = sqlite3.Connection("file::memory:", uri=True) + vault_storage = sqlite_storage.SqliteVaultStorage( + lambda: conn, + vault_owner=bytes(keeper_auth_context.auth_context.username, "utf-8"), + ) + vault = vault_online.VaultOnline(keeper_auth_context, vault_storage) + vault.sync_down() + try: + create_one_time_share_example( + vault, + RECORD_TITLE_OR_UID, + OTS_NAME, + expiration_days=EXPIRATION_DAYS, + ) + except Exception as e: + print(f"Error: {e}") + finally: + vault.close() + keeper_auth_context.close() + + +def main() -> None: + keeper_auth_context, _ = login() + if keeper_auth_context: + create_one_time_share_run(keeper_auth_context) + else: + print("Login failed.") + + +if __name__ == "__main__": + main() diff --git a/examples/sdk_examples/one_time_share/list_shares.py b/examples/sdk_examples/one_time_share/list_shares.py index 7beb69d..3652476 100644 --- a/examples/sdk_examples/one_time_share/list_shares.py +++ b/examples/sdk_examples/one_time_share/list_shares.py @@ -2,7 +2,7 @@ import sqlite3 import json import logging -from datetime import datetime + from typing import Dict, Optional import fido2 @@ -20,7 +20,7 @@ yubikey_authenticate, ) from keepersdk.constants import KEEPER_PUBLIC_HOSTS -from keepersdk.vault import sqlite_storage, vault_online, ksm_management +from keepersdk.vault import one_time_share, sqlite_storage, vault_online from keepersdk import utils try: @@ -453,51 +453,60 @@ def login(): return keeper_auth_context, keeper_endpoint -def list_one_time_shares(keeper_auth_context: keeper_auth.KeeperAuth): - conn = sqlite3.Connection('file::memory:', uri=True) - vault_storage = sqlite_storage.SqliteVaultStorage(lambda: conn, vault_owner=bytes(keeper_auth_context.auth_context.username, 'utf-8')) +def list_one_time_shares(keeper_auth_context: keeper_auth.KeeperAuth) -> None: + conn = sqlite3.Connection("file::memory:", uri=True) + vault_storage = sqlite_storage.SqliteVaultStorage( + lambda: conn, + vault_owner=bytes(keeper_auth_context.auth_context.username, "utf-8"), + ) vault = vault_online.VaultOnline(keeper_auth_context, vault_storage) vault.sync_down() try: - record_search = input('Enter record name/UID to check for shares (or leave empty for all records): ').strip() + record_search = input( + "Enter record name/UID to check for shares (or leave empty for all records): " + ).strip() record_uids = [] for record_info in vault.vault_data.records(): if record_info.version not in (2, 3): continue - if not record_search or record_search.lower() in record_info.title.lower() or record_search == record_info.record_uid: + if ( + not record_search + or record_search.lower() in record_info.title.lower() + or record_search == record_info.record_uid + ): record_uids.append(record_info.record_uid) if not record_uids: print("\nNo records found to check for one-time shares") else: - app_infos = ksm_management.get_app_info(vault=vault, app_uid=record_uids[:100]) - shares_found = [] - now = utils.current_milli_time() - for app_info in app_infos: - if not app_info.isExternalShare: - continue - record_uid = utils.base64_url_encode(app_info.appRecordUid) - record_info = vault.vault_data.get_record(record_uid) - record_title = record_info.title if record_info else 'Unknown' - for client in app_info.clients: - shares_found.append({ - 'record_title': record_title, - 'share_name': client.id if client.id else 'Unnamed', - 'created': datetime.fromtimestamp(client.createdOn / 1000) if client.createdOn else None, - 'expires': datetime.fromtimestamp(client.accessExpireOn / 1000) if client.accessExpireOn else None, - 'expired': now > client.accessExpireOn if client.accessExpireOn else False, - 'opened': datetime.fromtimestamp(client.firstAccess / 1000) if client.firstAccess else None - }) - if not shares_found: + shares: list[one_time_share.OneTimeShare] = one_time_share.list_one_time_shares( + vault=vault, + record_uid=record_uids[:1000], + include_expired=True, + ) + if not shares: print("\nNo one-time shares found") else: - print(f"\nOne-Time Shares ({len(shares_found)})\n{'=' * 130}") - print(f"{'Record Title':<30} {'Share Name':<20} {'Created':<20} {'Expires':<20} {'Status':<15}\n{'-' * 130}") - for share in shares_found: - status = 'Expired' if share['expired'] else ('Opened' if share['opened'] else 'Active') - created = share['created'].strftime('%Y-%m-%d %H:%M') if share['created'] else 'N/A' - expires = share['expires'].strftime('%Y-%m-%d %H:%M') if share['expires'] else 'N/A' - print(f"{share['record_title'][:29]:<30} {share['share_name'][:19]:<20} {created:<20} {expires:<20} {status:<15}") - print(f"{'-' * 130}\nTotal: {len(shares_found)}") + print(f"\nOne-Time Shares ({len(shares)})\n{'=' * 130}") + print( + f"{'Record Title':<30} {'Share Name':<20} {'Created':<20} {'Expires':<20} {'Status':<15}\n{'-' * 130}" + ) + for share in shares: + record_info = vault.vault_data.get_record(share.record_uid) + record_title = record_info.title if record_info else "Unknown" + created = ( + share.generated.strftime("%Y-%m-%d %H:%M") + if share.generated + else "N/A" + ) + expires = ( + share.expires.strftime("%Y-%m-%d %H:%M") + if share.expires + else "N/A" + ) + print( + f"{record_title[:29]:<30} {share.share_link_name[:19]:<20} {created:<20} {expires:<20} {share.status:<15}" + ) + print(f"{'-' * 130}\nTotal: {len(shares)}") print("=" * 130) except Exception as e: print(f"Error retrieving one-time shares: {e}") diff --git a/examples/sdk_examples/one_time_share/remove_ots.py b/examples/sdk_examples/one_time_share/remove_ots.py new file mode 100644 index 0000000..21ee011 --- /dev/null +++ b/examples/sdk_examples/one_time_share/remove_ots.py @@ -0,0 +1,598 @@ +import getpass +import sqlite3 +import json +import logging +from typing import Dict, Optional + +import fido2 +import webbrowser + +from keepersdk import errors, utils +from keepersdk.authentication import ( + configuration, + endpoint, + keeper_auth, + login_auth, +) +from keepersdk.authentication.yubikey import ( + IKeeperUserInteraction, + yubikey_authenticate, +) +from keepersdk.constants import KEEPER_PUBLIC_HOSTS +from keepersdk.vault import one_time_share, sqlite_storage, vault_online +from keepersdk import utils + +try: + import pyperclip +except ImportError: + pyperclip = None + +logger = utils.get_logger() +logger.setLevel(logging.INFO) +if not logger.handlers: + _handler = logging.StreamHandler() + _handler.setLevel(logging.INFO) + _handler.setFormatter( + logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s") + ) + logger.addHandler(_handler) + + +class FidoCliInteraction(fido2.client.UserInteraction, IKeeperUserInteraction): + def output_text(self, text: str) -> None: + print(text) + + def prompt_up(self) -> None: + print( + "\nTouch the flashing Security key to authenticate or " + "press Ctrl-C to resume with the primary two factor authentication..." + ) + + def request_pin(self, permissions, rd_id): + return getpass.getpass("Enter Security Key PIN: ") + + def request_uv(self, permissions, rd_id): + print("User Verification required.") + return True + + +# Two-factor duration codes (used by LoginFlow) +_TWO_FACTOR_DURATION_CODES: Dict[login_auth.TwoFactorDuration, str] = { + login_auth.TwoFactorDuration.EveryLogin: "login", + login_auth.TwoFactorDuration.Every12Hours: "12_hours", + login_auth.TwoFactorDuration.EveryDay: "24_hours", + login_auth.TwoFactorDuration.Every30Days: "30_days", + login_auth.TwoFactorDuration.Forever: "forever", +} + + +class LoginFlow: + """ + Handles the full login process: server selection, username, password, + device approval, 2FA, SSO data key, and SSO token. + """ + + def __init__(self) -> None: + self._config = configuration.JsonConfigurationStorage() + self._logged_in_with_persistent = True + self._endpoint: Optional[endpoint.KeeperEndpoint] = None + + @property + def endpoint(self) -> Optional[endpoint.KeeperEndpoint]: + return self._endpoint + + @property + def logged_in_with_persistent(self) -> bool: + """True if login succeeded by resuming an existing persistent session (no step loop).""" + return self._logged_in_with_persistent + + def run(self) -> Optional[keeper_auth.KeeperAuth]: + """ + Run the login flow. + + Returns: + Authenticated Keeper context, or None if login fails. + """ + server = self._ensure_server() + keeper_endpoint = endpoint.KeeperEndpoint(self._config, server) + self._endpoint = keeper_endpoint + login_auth_context = login_auth.LoginAuth(keeper_endpoint) + + username = self._config.get().last_login or input("Enter username: ") + login_auth_context.resume_session = True + login_auth_context.login(username) + + while not login_auth_context.login_step.is_final(): + step = login_auth_context.login_step + if isinstance(step, login_auth.LoginStepDeviceApproval): + self._handle_device_approval(step) + elif isinstance(step, login_auth.LoginStepTwoFactor): + self._handle_two_factor(step) + elif isinstance(step, login_auth.LoginStepPassword): + self._handle_password(step) + elif isinstance(step, login_auth.LoginStepSsoToken): + self._handle_sso_token(step) + elif isinstance(step, login_auth.LoginStepSsoDataKey): + self._handle_sso_data_key(step) + elif isinstance(step, login_auth.LoginStepError): + print(f"Login error: ({step.code}) {step.message}") + return None + else: + raise NotImplementedError( + f"Unsupported login step type: {type(step).__name__}" + ) + self._logged_in_with_persistent = False + + if self._logged_in_with_persistent: + print("Successfully logged in with persistent login") + + if isinstance(login_auth_context.login_step, login_auth.LoginStepConnected): + return login_auth_context.login_step.take_keeper_auth() + + return None + + def _ensure_server(self) -> str: + if not self._config.get().last_server: + print("Available server options:") + for region, host in KEEPER_PUBLIC_HOSTS.items(): + print(f" {region}: {host}") + server = ( + input("Enter server (default: keepersecurity.com): ").strip() + or "keepersecurity.com" + ) + self._config.get().last_server = server + else: + server = self._config.get().last_server + return server + + def _handle_device_approval( + self, step: login_auth.LoginStepDeviceApproval + ) -> None: + """Device approval: same options as keepercli verify_device (email, keeper push, 2FA, resume).""" + menu = [ + ("email_send", "to send email"), + ("email_code=", "to validate verification code sent via email"), + ("keeper_push", "to send Keeper Push notification"), + ("2fa_send", "to send 2FA code"), + ("2fa_code=", "to validate a code provided by 2FA application"), + ("", "to resume"), + ] + lines = ["Approve by selecting a method below"] + lines.extend(f" {cmd} {desc}" for cmd, desc in menu) + print("\n".join(lines)) + + selection = input("Type your selection or to resume: ").strip() + if selection is None: + return + if selection in ("email_send", "es"): + step.send_push(channel=login_auth.DeviceApprovalChannel.Email) + print("An email with instructions has been sent. Press when approved.") + elif selection.startswith("email_code="): + code = selection[len("email_code=") :] + step.send_code(channel=login_auth.DeviceApprovalChannel.Email, code=code) + print("Successfully verified email code.") + elif selection in ("keeper_push", "kp"): + step.send_push(channel=login_auth.DeviceApprovalChannel.KeeperPush) + print( + "Successfully made a push notification to the approved device. " + "Press when approved." + ) + elif selection in ("2fa_send", "2fs"): + step.send_push(channel=login_auth.DeviceApprovalChannel.TwoFactor) + print("2FA code was sent.") + elif selection.startswith("2fa_code="): + code = selection[len("2fa_code=") :] + step.send_code(channel=login_auth.DeviceApprovalChannel.TwoFactor, code=code) + print("Successfully verified 2FA code.") + else: + step.resume() + + def _handle_password(self, step: login_auth.LoginStepPassword) -> None: + """Password step: prompt for password and retry on auth_failed (aligned with keepercli handle_verify_password).""" + print(f"\nEnter password for {step.username}") + while True: + password = getpass.getpass("Password: ") + if not password: + raise KeyboardInterrupt() + try: + step.verify_password(password) + break + except errors.KeeperApiError as kae: + print( + "Invalid email or password combination, please re-enter." + if kae.result_code == "auth_failed" + else kae.message + ) + + def _handle_two_factor(self, step: login_auth.LoginStepTwoFactor) -> None: + channels = [ + x + for x in step.get_channels() + if x.channel_type != login_auth.TwoFactorChannel.Other + ] + menu = [] + for i, channel in enumerate(channels): + desc = self._two_factor_channel_desc(channel.channel_type) + menu.append( + ( + str(i + 1), + f"{desc} {channel.channel_name} {channel.phone}", + ) + ) + menu.append(("q", "Quit authentication attempt and return to Commander prompt.")) + + lines = ["", "This account requires 2FA Authentication"] + lines.extend(f" {a}. {t}" for a, t in menu) + print("\n".join(lines)) + + while True: + selection = input("Selection: ") + if selection is None: + return + if selection in ("q", "Q"): + raise KeyboardInterrupt() + try: + assert selection.isnumeric() + idx = 1 if not selection else int(selection) + assert 1 <= idx <= len(channels) + channel = channels[idx - 1] + desc = self._two_factor_channel_desc(channel.channel_type) + print(f"Selected {idx}. {desc}") + except AssertionError: + print( + "Invalid entry, additional factors of authentication shown " + "may be configured if not currently enabled." + ) + continue + + if channel.channel_type in ( + login_auth.TwoFactorChannel.TextMessage, + login_auth.TwoFactorChannel.KeeperDNA, + login_auth.TwoFactorChannel.DuoSecurity, + ): + action = next( + ( + x + for x in step.get_channel_push_actions(channel.channel_uid) + if x + in ( + login_auth.TwoFactorPushAction.TextMessage, + login_auth.TwoFactorPushAction.KeeperDna, + ) + ), + None, + ) + if action: + step.send_push(channel.channel_uid, action) + + if channel.channel_type == login_auth.TwoFactorChannel.SecurityKey: + try: + challenge = json.loads(channel.challenge) + signature = yubikey_authenticate(challenge, FidoCliInteraction()) + if signature: + print("Verified Security Key.") + step.send_code(channel.channel_uid, signature) + return + except Exception as e: + logger.error(e) + continue + + # 2FA code path + step.duration = min(step.duration, channel.max_expiration) + available_dura = sorted( + x for x in _TWO_FACTOR_DURATION_CODES if x <= channel.max_expiration + ) + available_codes = [ + _TWO_FACTOR_DURATION_CODES.get(x) or "login" for x in available_dura + ] + + while True: + mfa_desc = self._two_factor_duration_desc(step.duration) + prompt_exp = ( + f"\n2FA Code Duration: {mfa_desc}.\n" + f"To change duration: 2fa_duration={'|'.join(available_codes)}" + ) + print(prompt_exp) + + selection = input("\nEnter 2FA Code or Duration: ") + if not selection: + return + if selection in available_codes: + step.duration = self._two_factor_code_to_duration(selection) + elif selection.startswith("2fa_duration="): + code = selection[len("2fa_duration=") :] + if code in available_codes: + step.duration = self._two_factor_code_to_duration(code) + else: + print(f"Invalid 2FA duration: {code}") + else: + try: + step.send_code(channel.channel_uid, selection) + print("Successfully verified 2FA Code.") + return + except errors.KeeperApiError as kae: + print(f"Invalid 2FA code: ({kae.result_code}) {kae.message}") + + def _handle_sso_data_key( + self, step: login_auth.LoginStepSsoDataKey + ) -> None: + menu = [ + ("1", "Keeper Push. Send a push notification to your device."), + ("2", "Admin Approval. Request your admin to approve this device."), + ("r", "Resume SSO authentication after device is approved."), + ("q", "Quit SSO authentication attempt and return to Commander prompt."), + ] + lines = ["Approve this device by selecting a method below:"] + lines.extend(f" {cmd:>3}. {text}" for cmd, text in menu) + print("\n".join(lines)) + + while True: + answer = input("Selection: ") + if answer is None: + return + if answer == "q": + raise KeyboardInterrupt() + if answer == "r": + step.resume() + break + if answer in ("1", "2"): + step.request_data_key( + login_auth.DataKeyShareChannel.KeeperPush + if answer == "1" + else login_auth.DataKeyShareChannel.AdminApproval + ) + else: + print(f'Action "{answer}" is not supported.') + + def _handle_sso_token(self, step: login_auth.LoginStepSsoToken) -> None: + menu = [ + ("a", "SSO User with a Master Password."), + ] + if pyperclip: + menu.append(("c", "Copy SSO Login URL to clipboard.")) + else: + menu.append(("u", "Show SSO Login URL.")) + try: + wb = webbrowser.get() + menu.append(("o", "Navigate to SSO Login URL with the default web browser.")) + except Exception: + wb = None + if pyperclip: + menu.append(("p", "Paste SSO Token from clipboard.")) + menu.append(("t", "Enter SSO Token manually.")) + menu.append(("q", "Quit SSO authentication attempt and return to Commander prompt.")) + + lines = [ + "", + "SSO Login URL:", + step.sso_login_url, + "Navigate to SSO Login URL with your browser and complete authentication.", + "Copy a returned SSO Token into clipboard." + + (" Paste that token into Commander." if pyperclip else " Then use option 't' to enter the token manually."), + 'NOTE: To copy SSO Token please click "Copy authentication token" ' + 'button on "SSO Connect" page.', + "", + ] + lines.extend(f" {a:>3}. {t}" for a, t in menu) + print("\n".join(lines)) + + while True: + token = input("Selection: ") + if token == "q": + raise KeyboardInterrupt() + if token == "a": + step.login_with_password() + return + if token == "c": + token = None + if pyperclip: + try: + pyperclip.copy(step.sso_login_url) + print("SSO Login URL is copied to clipboard.") + except Exception: + print("Failed to copy SSO Login URL to clipboard.") + else: + print("Clipboard not available (install pyperclip).") + elif token == "u": + token = None + if not pyperclip: + print("\nSSO Login URL:", step.sso_login_url, "\n") + else: + print("Unsupported menu option (use 'c' to copy URL).") + elif token == "o": + token = None + if wb: + try: + wb.open_new_tab(step.sso_login_url) + except Exception: + print("Failed to open web browser.") + elif token == "p": + if pyperclip: + try: + token = pyperclip.paste() + except Exception: + token = "" + print("Failed to paste from clipboard") + else: + token = None + print("Clipboard not available (use 't' to enter token manually).") + elif token == "t": + token = getpass.getpass("Enter SSO Token: ").strip() + else: + if len(token) < 10: + print(f"Unsupported menu option: {token}") + continue + + if token: + try: + step.set_sso_token(token) + break + except errors.KeeperApiError as kae: + print(f"SSO Login error: ({kae.result_code}) {kae.message}") + + @staticmethod + def _two_factor_channel_desc( + channel_type: login_auth.TwoFactorChannel, + ) -> str: + return { + login_auth.TwoFactorChannel.Authenticator: "TOTP (Google and Microsoft Authenticator)", + login_auth.TwoFactorChannel.TextMessage: "Send SMS Code", + login_auth.TwoFactorChannel.DuoSecurity: "DUO", + login_auth.TwoFactorChannel.RSASecurID: "RSA SecurID", + login_auth.TwoFactorChannel.SecurityKey: "WebAuthN (FIDO2 Security Key)", + login_auth.TwoFactorChannel.KeeperDNA: "Keeper DNA (Watch)", + login_auth.TwoFactorChannel.Backup: "Backup Code", + }.get(channel_type, "Not Supported") + + @staticmethod + def _two_factor_duration_desc( + duration: login_auth.TwoFactorDuration, + ) -> str: + return { + login_auth.TwoFactorDuration.EveryLogin: "Require Every Login", + login_auth.TwoFactorDuration.Forever: "Save on this Device Forever", + login_auth.TwoFactorDuration.Every12Hours: "Ask Every 12 hours", + login_auth.TwoFactorDuration.EveryDay: "Ask Every 24 hours", + login_auth.TwoFactorDuration.Every30Days: "Ask Every 30 days", + }.get(duration, "Require Every Login") + + @staticmethod + def _two_factor_code_to_duration( + text: str, + ) -> login_auth.TwoFactorDuration: + for dura, code in _TWO_FACTOR_DURATION_CODES.items(): + if code == text: + return dura + return login_auth.TwoFactorDuration.EveryLogin + + +def enable_persistent_login(keeper_auth_context: keeper_auth.KeeperAuth) -> None: + """ + Enable persistent login and register data key for device. + Sets persistent_login to on and logout_timer to 30 days. + """ + keeper_auth.set_user_setting(keeper_auth_context, 'persistent_login', '1') + keeper_auth.register_data_key_for_device(keeper_auth_context) + mins_per_day = 60 * 24 + timeout_in_minutes = mins_per_day * 30 # 30 days + keeper_auth.set_user_setting(keeper_auth_context, 'logout_timer', str(timeout_in_minutes)) + print("Persistent login turned on successfully and device registered") + + +def login(): + """ + Handle the login process including server selection, authentication, + and multi-factor authentication steps (device approval, password, 2FA + with channel selection and Security Key, SSO data key, SSO token). + + Returns: + tuple: (keeper_auth_context, keeper_endpoint) on success, or (None, None) if login fails. + """ + flow = LoginFlow() + keeper_auth_context = flow.run() + if keeper_auth_context and not flow.logged_in_with_persistent: + enable_persistent_login(keeper_auth_context) + keeper_endpoint = flow.endpoint if keeper_auth_context else None + return keeper_auth_context, keeper_endpoint + + +def _resolve_record_uid( + vault: vault_online.VaultOnline, + record_title_or_uid: str, +) -> str: + """ + Resolve record title or UID to a record UID. + + If record_title_or_uid matches a record UID (record exists), return it. + Otherwise find the first record whose title equals or contains the given string. + """ + if not record_title_or_uid or not record_title_or_uid.strip(): + raise ValueError("Record title or UID must be non-empty.") + + candidate = record_title_or_uid.strip() + + if vault.vault_data.get_record(candidate) is not None: + return candidate + + candidate_lower = candidate.lower() + for record_info in vault.vault_data.records(): + if record_info.version not in (2, 3): + continue + if ( + record_info.title.lower() == candidate_lower + or candidate_lower in record_info.title.lower() + ): + return record_info.record_uid + + raise ValueError( + f"No record found matching {record_title_or_uid!r}. " + "Use an existing record title or record UID." + ) + + +def remove_one_time_share_example( + vault: vault_online.VaultOnline, + record_title_or_uid: str, + share_identifier: str, +) -> bool: + """ + Remove a one-time share for the given record using the SDK. + + Args: + vault: Initialized VaultOnline instance. + record_title_or_uid: Record title or record UID that has the one-time share. + share_identifier: Full share link ID, or unique prefix. + + Returns: + True if the share was removed, False on error. + """ + record_uid = _resolve_record_uid(vault, record_title_or_uid) + try: + one_time_share.remove_one_time_share( + vault=vault, + record_uid=record_uid, + share_identifier=share_identifier, + ) + print(f"One-time share {share_identifier!r} removed for record {record_title_or_uid!r}.") + return True + except ValueError as e: + print(f"Error removing one-time share: {e}") + return False + + +def remove_one_time_share_run(keeper_auth_context: keeper_auth.KeeperAuth) -> None: + """Build vault from auth context, remove one-time share with configured variables, then close.""" + # Record and share to remove + RECORD_TITLE_OR_UID = "My Login" # Record title or record UID that has the share + SHARE_IDENTIFIER = "Share for contractor" # Share link ID / prefix + + conn = sqlite3.Connection("file::memory:", uri=True) + vault_storage = sqlite_storage.SqliteVaultStorage( + lambda: conn, + vault_owner=bytes(keeper_auth_context.auth_context.username, "utf-8"), + ) + vault = vault_online.VaultOnline(keeper_auth_context, vault_storage) + vault.sync_down() + try: + remove_one_time_share_example( + vault, + RECORD_TITLE_OR_UID, + SHARE_IDENTIFIER, + ) + except Exception as e: + print(f"Error: {e}") + finally: + vault.close() + keeper_auth_context.close() + + +def main() -> None: + keeper_auth_context, _ = login() + if keeper_auth_context: + remove_one_time_share_run(keeper_auth_context) + else: + print("Login failed.") + + +if __name__ == "__main__": + main() diff --git a/keepercli-package/src/keepercli/commands/record_edit.py b/keepercli-package/src/keepercli/commands/record_edit.py index 99bc79c..ee75d8b 100644 --- a/keepercli-package/src/keepercli/commands/record_edit.py +++ b/keepercli-package/src/keepercli/commands/record_edit.py @@ -11,7 +11,7 @@ from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import ed25519 -from keepersdk.vault import (record_types, typed_field_utils, vault_record, attachment, record_facades, +from keepersdk.vault import (record_types, typed_field_utils, vault_record, attachment, record_facades, one_time_share, record_management, vault_online, vault_data, vault_types, vault_utils, vault_extensions, share_management_utils) from keepersdk import crypto, generator @@ -714,7 +714,7 @@ def execute(self, context: KeeperParams, **kwargs) -> None: SIX_MONTHS_IN_SECONDS = 182 * 24 * 60 * 60 if expiration_period.total_seconds() > SIX_MONTHS_IN_SECONDS: raise base.CommandError('URL expiration period cannot be greater than 6 months.') - url = record_utils.process_external_share(context=context, expiration_period=expiration_period, record=record) + url = one_time_share.create_one_time_share(context.vault, record.record_uid, expiration_period, is_self_destruct=True) expiration_date = datetime.datetime.now() + expiration_period formatted_date = expiration_date.strftime('%d/%m/%Y %H:%M:%S') message = f'Record self-destructs on {formatted_date} or after being viewed once. Once the link is opened the recipient will have 5 minutes to view the record.\n{url}' diff --git a/keepercli-package/src/keepercli/commands/shares.py b/keepercli-package/src/keepercli/commands/shares.py index 2794969..1ec67b4 100644 --- a/keepercli-package/src/keepercli/commands/shares.py +++ b/keepercli-package/src/keepercli/commands/shares.py @@ -7,8 +7,8 @@ from keepersdk import utils from keepersdk.authentication import keeper_auth -from keepersdk.proto import record_pb2, APIRequest_pb2 -from keepersdk.vault import ksm_management, vault_online, vault_utils, share_management_utils +from keepersdk.proto import record_pb2 +from keepersdk.vault import one_time_share, share_management_utils, vault_online, vault_utils from keepersdk.vault.shares_management import RecordShares, FolderShares from . import base @@ -21,7 +21,6 @@ class ApiUrl(Enum): SHARE_ADMIN = 'vault/am_i_share_admin' SHARE_UPDATE = 'vault/records_share_update' SHARE_FOLDER_UPDATE = 'vault/shared_folder_update_v3' - REMOVE_EXTERNAL_SHARE = 'vault/external_share_remove' class ShareAction(Enum): @@ -42,27 +41,14 @@ class ManagePermission(Enum): # Constants TIMESTAMP_MILLISECONDS_FACTOR = 1000 TRUNCATE_SUFFIX = '...' -URL_TRUNCATE_LENGTH = 30 -NON_SHARED_DEFAULT = 'non-shared' -CUSTOM_FIELD_TYPE_PREFIX = 'type:' -TOTP_FIELD_NAME = 'totp' -LIST_SEPARATOR = '|' -DICT_SEPARATOR = ';' -KEY_VALUE_SEPARATOR = '=' -PERMISSION_SEPARATOR = '=' -SHARE_NAMES_SEPARATOR = ', ' SUPPORTED_RECORD_VERSIONS = {2, 3} -DEFAULT_SEARCH_FIELDS = ['by_title', 'by_login', 'by_password'] CHUNK_SIZE = 500 -MAX_BATCH_SIZE = 1000 SHARE_LINK_TRUNCATE_LENGTH = 20 SIX_MONTHS_IN_SECONDS = 182 * 24 * 60 * 60 TEAMS_THRESHOLD = 500 -ALL_FOLDERS_WILDCARD = '*' ALL_USERS_WILDCARD = '@existing' ALL_USERS_WILDCARD_ALT = '@current' -DEFAULT_ACCOUNT_WILDCARD = '*' -DEFAULT_RECORD_WILDCARD = '*' +DEFAULT_WILDCARD = '*' def set_expiration_fields(obj, expiration): """Set expiration and timerNotificationType fields on proto object if expiration is provided.""" @@ -325,9 +311,9 @@ def _normalize_folder_names(self, folder_names) -> List: def _resolve_shared_folder_uids(self, vault: vault_online.VaultOnline, names: List) -> Set: """Resolve folder names to shared folder UIDs.""" - all_folders = any(x == ALL_FOLDERS_WILDCARD for x in names) + all_folders = any(x == DEFAULT_WILDCARD for x in names) if all_folders: - names = [x for x in names if x != ALL_FOLDERS_WILDCARD] + names = [x for x in names if x != DEFAULT_WILDCARD] shared_folder_cache = {x.shared_folder_uid: x for x in vault.vault_data.shared_folders()} folder_cache = {x.folder_uid: x for x in vault.vault_data.folders()} @@ -444,7 +430,7 @@ def _parse_user_arguments(self, vault, kwargs: Dict) -> Dict: } for u in (kwargs.get('user') or []): - if u == DEFAULT_ACCOUNT_WILDCARD: + if u == DEFAULT_WILDCARD: default_account = True elif u in (ALL_USERS_WILDCARD, ALL_USERS_WILDCARD_ALT): all_users = True @@ -502,7 +488,7 @@ def _parse_record_arguments(self, vault, kwargs: Dict) -> Dict: unresolved_names = [] for r in records: - if r == DEFAULT_RECORD_WILDCARD: + if r == DEFAULT_WILDCARD: default_record = True elif r in (ALL_USERS_WILDCARD, ALL_USERS_WILDCARD_ALT): all_records = True @@ -736,9 +722,10 @@ def execute(self, context: KeeperParams, **kwargs): if not record_uids: raise base.CommandError('No records found') - applications = self._get_applications(vault, record_uids) - table_data = self._build_share_table(applications, kwargs) - + shares = one_time_share.list_one_time_shares( + vault, list(record_uids), include_expired=kwargs.get('show_all', False) + ) + table_data = self._build_share_table_from_sdk(shares, kwargs) return self._format_output(table_data, kwargs) def _resolve_record_uids(self, context: KeeperParams, vault, records: List, recursive: bool) -> Set: @@ -790,69 +777,32 @@ def on_folder(f): else: on_folder(folder) - def _get_applications(self, vault, record_uids: Set): - """Get application info for the given record UIDs.""" - r_uids = list(record_uids) - if len(r_uids) >= MAX_BATCH_SIZE: - logger.info('Trimming result to %d records', MAX_BATCH_SIZE) - r_uids = r_uids[:MAX_BATCH_SIZE - 1] - return ksm_management.get_app_info(vault=vault, app_uid=r_uids) - - def _build_share_table(self, applications, kwargs): - """Build table data from applications.""" + def _build_share_table_from_sdk(self, shares: List[one_time_share.OneTimeShare], kwargs): + """Build table data from OneTimeShare list.""" show_all = kwargs.get('show_all', False) verbose = kwargs.get('verbose', False) - now = utils.current_milli_time() - + output_format = kwargs.get('format') fields = ['record_uid', 'share_link_name', 'share_link_id', 'generated', 'opened', 'expires'] if show_all: fields.append('status') - table = [] - output_format = kwargs.get('format') - - for app_info in applications: - if not app_info.isExternalShare: - continue - - for client in app_info.clients: - if not show_all and now > client.accessExpireOn: - continue - - link = self._create_share_link_data(app_info, client, verbose, output_format, now) - table.append([link.get(x, '') for x in fields]) - + for s in shares: + share_link_id = s.share_link_id + if output_format == 'table' and not verbose and len(share_link_id) > SHARE_LINK_TRUNCATE_LENGTH: + share_link_id = share_link_id[:SHARE_LINK_TRUNCATE_LENGTH] + TRUNCATE_SUFFIX + row = [ + s.record_uid, + s.share_link_name, + share_link_id, + s.generated or '', + s.opened or '', + s.expires or '', + ] + if show_all: + row.append(s.status) + table.append(row) return table, fields - def _create_share_link_data(self, app_info, client, verbose: bool, output_format: str, now: int): - """Create share link data dictionary.""" - encoded_client_id = utils.base64_url_encode(client.clientId) - link = { - 'record_uid': utils.base64_url_encode(app_info.appRecordUid), - 'name': client.id, - 'share_link_id': encoded_client_id, - 'generated': datetime.datetime.fromtimestamp(client.createdOn / TIMESTAMP_MILLISECONDS_FACTOR), - 'expires': datetime.datetime.fromtimestamp(client.accessExpireOn / TIMESTAMP_MILLISECONDS_FACTOR), - } - - if output_format == 'table' and not verbose: - link['share_link_id'] = encoded_client_id[:SHARE_LINK_TRUNCATE_LENGTH] + TRUNCATE_SUFFIX - else: - link['share_link_id'] = encoded_client_id - - if client.firstAccess > 0: - link['opened'] = datetime.datetime.fromtimestamp(client.firstAccess / TIMESTAMP_MILLISECONDS_FACTOR) - link['accessed'] = datetime.datetime.fromtimestamp(client.lastAccess / TIMESTAMP_MILLISECONDS_FACTOR) - - if now > client.accessExpireOn: - link['status'] = 'Expired' - elif client.firstAccess > 0: - link['status'] = 'Opened' - else: - link['status'] = 'Generated' - - return link - def _format_output(self, table_data, kwargs): """Format and return the output.""" table, fields = table_data @@ -914,7 +864,7 @@ def execute(self, context: KeeperParams, **kwargs): raise base.CommandError('URL expiration period parameter \"--expire\" is required.') period = self._validate_and_parse_expiration(period_str) - + urls = self._create_share_urls(context, vault, record_names, period, name, is_editable) return self._handle_output(context, urls, kwargs) @@ -931,11 +881,15 @@ def _create_share_urls(self, context: KeeperParams, vault, record_names: List, p urls = {} for record_name in record_names: record_uid = record_utils.resolve_record(context=context, name=record_name) - record = vault.vault_data.load_record(record_uid=record_uid) - url = record_utils.process_external_share( - context=context, expiration_period=period, record=record, name=name, is_editable=is_editable, is_self_destruct=False + url = one_time_share.create_one_time_share( + vault=vault, + record_uid=record_uid, + expiration_period=period, + name=name or None, + is_editable=is_editable, + is_self_destruct=False, ) - urls[record_uid] = str(url) + urls[record_uid] = url return urls def _handle_output(self, context: KeeperParams, urls: Dict, kwargs): @@ -992,83 +946,24 @@ def add_arguments_to_parser(parser: argparse.ArgumentParser): def execute(self, context: KeeperParams, **kwargs): if not context.vault: raise ValueError('Vault is not initialized.') - - vault = context.vault + vault = context.vault record_name = kwargs.get('record') if not record_name: self.get_parser().print_help() return - record_uid = record_utils.resolve_record(context=context, name=record_name) - applications = ksm_management.get_app_info(vault=vault, app_uid=record_uid) - - if len(applications) == 0: - logger.info('There are no one-time shares for record \"%s\"', record_name) - return - share_name = kwargs.get('share') if not share_name: self.get_parser().print_help() return - client_id = self._find_client_id(applications, share_name) - if not client_id: - return - - self._remove_share(vault, record_uid, client_id, share_name, record_name) - - def _find_client_id(self, applications, share_name: str) -> Optional[bytes]: - - cleaned_name = share_name[:-len(TRUNCATE_SUFFIX)] if share_name.endswith(TRUNCATE_SUFFIX) else share_name - cleaned_name_lower = cleaned_name.lower() - - partial_matches = [] - - for app_info in applications: - if not app_info.isExternalShare: - continue - - for client in app_info.clients: - if client.id.lower() == cleaned_name_lower: - return client.clientId - - encoded_client_id = utils.base64_url_encode(client.clientId) - if encoded_client_id == cleaned_name: - return client.clientId - - if encoded_client_id.startswith(cleaned_name): - partial_matches.append(client.clientId) - - return self._resolve_partial_matches(partial_matches, share_name) - - def _resolve_partial_matches(self, partial_matches: List[bytes], original_name: str) -> Optional[bytes]: - """ - Resolve partial matches to a single client ID. - - Args: - partial_matches: List of client IDs that partially match - original_name: Original share name for error reporting - - Returns: - bytes: Single client ID if exactly one match, None otherwise - """ - if not partial_matches: - logger.warning('No one-time share found matching "%s"', original_name) - return None - - if len(partial_matches) == 1: - return partial_matches[0] - - # Multiple matches found - logger.warning('Multiple one-time shares found matching "%s". Please use a more specific identifier.', original_name) - return None - - def _remove_share(self, vault, record_uid: str, client_id: bytes, share_name: str, record_name: str): - """Remove the one-time share.""" - rq = APIRequest_pb2.RemoveAppClientsRequest() - rq.appRecordUid = utils.base64_url_decode(record_uid) - rq.clients.append(client_id) - - vault.keeper_auth.execute_auth_rest(request=rq, rest_endpoint=ApiUrl.REMOVE_EXTERNAL_SHARE.value) - logger.info('One-time share \"%s\" is removed from record \"%s\"', share_name, record_name) + try: + record_uid = record_utils.resolve_record(context=context, name=record_name) + one_time_share.remove_one_time_share(vault=vault, record_uid=record_uid, share_identifier=share_name) + logger.info(f'One-time share "{share_name}" is removed from record "{record_name}"') + except ValueError as e: + if 'no one-time shares' in str(e).lower() or 'there are no' in str(e).lower(): + logger.error(f'{str(e)}') + else: + logger.warning(f'{str(e)}') diff --git a/keepercli-package/src/keepercli/helpers/record_utils.py b/keepercli-package/src/keepercli/helpers/record_utils.py index 614129c..fb5e7bd 100644 --- a/keepercli-package/src/keepercli/helpers/record_utils.py +++ b/keepercli-package/src/keepercli/helpers/record_utils.py @@ -4,13 +4,10 @@ import hashlib import hmac import re -from datetime import timedelta from typing import Iterator, List, Optional from urllib import parse -from urllib.parse import urlunparse -from keepersdk import crypto, utils -from keepersdk.proto.APIRequest_pb2 import AddExternalShareRequest, Device +from keepersdk import utils from keepersdk.proto.enterprise_pb2 import GetSharingAdminsRequest, GetSharingAdminsResponse from keepersdk.vault import vault_online, vault_record, vault_types, vault_utils @@ -22,8 +19,6 @@ logger = api.get_logger() GET_SHARE_ADMINS = 'enterprise/get_sharing_admins' -EXTERNAL_SHARE_ADD_URL = 'vault/external_share_add' -KEEPER_SECRETS_MANAGER_CLIENT_ID = 'KEEPER_SECRETS_MANAGER_CLIENT_ID' def try_resolve_single_record(record_name: Optional[str], context: KeeperParams) -> Optional[vault_record.KeeperRecordInfo]: @@ -82,46 +77,6 @@ def default_confirm(prompt: str) -> bool: return input(f"{prompt} (y/n): ").strip().lower() == 'y' -def process_external_share(context: KeeperParams, expiration_period: timedelta, - record: vault_record.PasswordRecord | vault_record.TypedRecord, - name: Optional[str] = None, is_editable: bool = False, - is_self_destruct: Optional[bool] = True) -> str: - - vault = context.vault - record_uid = record.record_uid - record_key = vault.vault_data.get_record_key(record_uid=record_uid) - client_key = utils.generate_aes_key() - client_id = crypto.hmac_sha512(client_key, KEEPER_SECRETS_MANAGER_CLIENT_ID.encode()) - - request = AddExternalShareRequest() - request.recordUid = utils.base64_url_decode(record_uid) - request.encryptedRecordKey = crypto.encrypt_aes_v2(record_key, client_key) - request.clientId = client_id - request.accessExpireOn = utils.current_milli_time() + int(expiration_period.total_seconds() * 1000) - - if name: - request.id = name - - request.isSelfDestruct = is_self_destruct - request.isEditable = is_editable - - vault.keeper_auth.execute_auth_rest( - rest_endpoint=EXTERNAL_SHARE_ADD_URL, - request=request, - response_type=Device - ) - - url = urlunparse(( - 'https', - context.auth.keeper_endpoint.server, - '/vault/share', - None, - None, - utils.base64_url_encode(client_key) - )) - return url - - def get_totp_code(url, offset=None): comp = parse.urlparse(url) if comp.scheme == 'otpauth': diff --git a/keepersdk-package/src/keepersdk/vault/one_time_share.py b/keepersdk-package/src/keepersdk/vault/one_time_share.py new file mode 100644 index 0000000..81449c4 --- /dev/null +++ b/keepersdk-package/src/keepersdk/vault/one_time_share.py @@ -0,0 +1,277 @@ +"""One-time share operations for records.""" + +import dataclasses +import datetime +from typing import List, Optional, Union +from urllib.parse import urlunparse + +from .. import crypto, utils +from ..proto.APIRequest_pb2 import AddExternalShareRequest, Device, RemoveAppClientsRequest +from . import ksm_management, vault_online + + +TIMESTAMP_MILLISECONDS_FACTOR = 1000 +MAX_BATCH_SIZE = 1000 +SIX_MONTHS_IN_SECONDS = 182 * 24 * 60 * 60 +EXTERNAL_SHARE_ADD_URL = "vault/external_share_add" +REMOVE_EXTERNAL_SHARE_URL = "vault/external_share_remove" +KEEPER_SECRETS_MANAGER_CLIENT_ID = "KEEPER_SECRETS_MANAGER_CLIENT_ID" +TRUNCATE_SUFFIX = "..." + + +@dataclasses.dataclass +class OneTimeShare: + """One-time share link for a record.""" + + record_uid: str + share_link_name: str + share_link_id: str + generated: Optional[datetime.datetime] + expires: Optional[datetime.datetime] + opened: Optional[datetime.datetime] + accessed: Optional[datetime.datetime] + status: str # 'Expired' | 'Opened' | 'Generated' + + +def list_one_time_shares( + vault: vault_online.VaultOnline, + record_uid: Union[str, List[str]], + include_expired: bool = False, +) -> List[OneTimeShare]: + """ + List one-time shares for the given record UID(s). + + Args: + vault: Initialized VaultOnline instance. + record_uid: Single record UID (str) or list of record UIDs. + include_expired: If True, include shares that have already expired. + Default False returns only active or not-yet-opened shares. + + Returns: + List of OneTimeShare instances. + """ + if vault is None: + raise ValueError("Vault is not initialized.") + + uids = [record_uid] if isinstance(record_uid, str) else list(record_uid) + if not uids: + return [] + + if len(uids) > MAX_BATCH_SIZE: + uids = uids[:MAX_BATCH_SIZE] + + app_infos = ksm_management.get_app_info(vault=vault, app_uid=uids) + now = utils.current_milli_time() + result: List[OneTimeShare] = [] + + for app_info in app_infos: + if not getattr(app_info, "isExternalShare", False): + continue + + record_uid_str = utils.base64_url_encode(app_info.appRecordUid) + + for client in getattr(app_info, "clients", []): + if not include_expired and now > getattr(client, "accessExpireOn", 0): + continue + + _append_share_link(result, app_info, client, record_uid_str, now) + + return result + + +def _append_share_link( + result: List[OneTimeShare], + app_info, + client, + record_uid_str: str, + now: int, +) -> None: + """Build OneTimeShare from app_info/client and append to result.""" + created_ts = getattr(client, "createdOn", 0) or 0 + expires_ts = getattr(client, "accessExpireOn", 0) or 0 + first_access_ts = getattr(client, "firstAccess", 0) or 0 + last_access_ts = getattr(client, "lastAccess", 0) or 0 + + if now > expires_ts: + status = "Expired" + elif first_access_ts > 0: + status = "Opened" + else: + status = "Generated" + + result.append( + OneTimeShare( + record_uid=record_uid_str, + share_link_name=getattr(client, "id", "") or "", + share_link_id=utils.base64_url_encode(client.clientId), + generated=_ms_to_datetime(created_ts), + expires=_ms_to_datetime(expires_ts), + opened=_ms_to_datetime(first_access_ts) if first_access_ts else None, + accessed=_ms_to_datetime(last_access_ts) if last_access_ts else None, + status=status, + ) + ) + + +def _ms_to_datetime(ms: int) -> Optional[datetime.datetime]: + """Convert millisecond timestamp to datetime.""" + if not ms or ms <= 0: + return None + return datetime.datetime.fromtimestamp(ms / TIMESTAMP_MILLISECONDS_FACTOR) + + +def create_one_time_share( + vault: vault_online.VaultOnline, + record_uid: str, + expiration_period: datetime.timedelta, + name: Optional[str] = None, + is_editable: bool = False, + is_self_destruct: bool = False, +) -> str: + """ + Create a one-time share URL for a record. + + Args: + vault: Initialized VaultOnline instance. + record_uid: Record UID to share. + expiration_period: How long the share link is valid (e.g. timedelta(days=7)). + Cannot exceed 6 months. + name: Optional label for the share link. + is_editable: If True, the recipient can edit the shared record. + is_self_destruct: If True, the share is invalidated after first open. + + Returns: + The one-time share URL (string). The recipient opens this URL to access the record. + + Raises: + ValueError: If vault is not initialized, record is not found, or + expiration_period exceeds 6 months. + """ + if vault is None: + raise ValueError("Vault is not initialized.") + + if expiration_period.total_seconds() > SIX_MONTHS_IN_SECONDS: + raise ValueError( + "Expiration period cannot be greater than 6 months." + ) + + record_key = vault.vault_data.get_record_key(record_uid=record_uid) + if record_key is None: + raise ValueError(f"Record not found: {record_uid}") + + client_key = utils.generate_aes_key() + client_id = crypto.hmac_sha512( + client_key, KEEPER_SECRETS_MANAGER_CLIENT_ID.encode() + ) + + request = AddExternalShareRequest() + request.recordUid = utils.base64_url_decode(record_uid) + request.encryptedRecordKey = crypto.encrypt_aes_v2(record_key, client_key) + request.clientId = client_id + request.accessExpireOn = utils.current_milli_time() + int( + expiration_period.total_seconds() * 1000 + ) + if name: + request.id = name + request.isSelfDestruct = is_self_destruct + request.isEditable = is_editable + + vault.keeper_auth.execute_auth_rest( + rest_endpoint=EXTERNAL_SHARE_ADD_URL, + request=request, + response_type=Device, + ) + + server = vault.keeper_auth.keeper_endpoint.server + url = urlunparse( + ( + "https", + server, + "/vault/share", + None, + None, + utils.base64_url_encode(client_key), + ) + ) + return url + + +def remove_one_time_share( + vault: vault_online.VaultOnline, + record_uid: str, + share_identifier: str, +) -> None: + """ + Remove a one-time share for a record. + + Args: + vault: Initialized VaultOnline instance. + record_uid: Record UID that has the one-time share. + share_identifier: One-time share name (client.id), full share link ID + (base64-encoded client ID), or a unique prefix of the share link ID. + + Raises: + ValueError: If vault is not initialized, no one-time shares exist for + the record, no share matches the identifier, or multiple shares + match a partial identifier. + """ + if vault is None: + raise ValueError("Vault is not initialized.") + + app_infos = ksm_management.get_app_info(vault=vault, app_uid=record_uid) + if not app_infos: + raise ValueError( + f"There are no one-time shares for record {record_uid!r}." + ) + + client_id = _find_client_id(app_infos, share_identifier) + if client_id is None: + raise ValueError( + f'No one-time share found matching {share_identifier!r} for record {record_uid!r}.' + ) + + request = RemoveAppClientsRequest() + request.appRecordUid = utils.base64_url_decode(record_uid) + request.clients.append(client_id) + + vault.keeper_auth.execute_auth_rest( + request=request, + rest_endpoint=REMOVE_EXTERNAL_SHARE_URL, + ) + + +def _find_client_id(app_infos, share_identifier: str) -> Optional[bytes]: + """ + Resolve share name or ID to a single client ID (bytes). + + Matches by exact share name (client.id), exact base64 clientId, or + unique prefix of base64 clientId. + """ + cleaned = ( + share_identifier[: -len(TRUNCATE_SUFFIX)] + if share_identifier.endswith(TRUNCATE_SUFFIX) + else share_identifier + ) + cleaned_lower = cleaned.lower() + partial_matches: List[bytes] = [] + + for app_info in app_infos: + if not getattr(app_info, "isExternalShare", False): + continue + for client in getattr(app_info, "clients", []): + if (getattr(client, "id", "") or "").lower() == cleaned_lower: + return client.clientId + encoded = utils.base64_url_encode(client.clientId) + if encoded == cleaned: + return client.clientId + if encoded.startswith(cleaned): + partial_matches.append(client.clientId) + + if not partial_matches: + return None + if len(partial_matches) == 1: + return partial_matches[0] + raise ValueError( + f'Multiple one-time shares match {share_identifier!r}. Use a more specific identifier.' + ) + From 7df36f7bacea70045c84e7051d13d32bd5e53e5d Mon Sep 17 00:00:00 2001 From: adeshmukh-ks Date: Tue, 17 Mar 2026 15:12:12 +0530 Subject: [PATCH 2/2] record permissions commands support added --- .../update_record_permissions.py | 611 +++++++++++++ .../commands/record_handling_commands.py | 843 +++++------------- .../keepersdk/vault/share_management_utils.py | 412 ++++++++- 3 files changed, 1238 insertions(+), 628 deletions(-) create mode 100644 examples/sdk_examples/sharing_commands/update_record_permissions.py diff --git a/examples/sdk_examples/sharing_commands/update_record_permissions.py b/examples/sdk_examples/sharing_commands/update_record_permissions.py new file mode 100644 index 0000000..f35b9a9 --- /dev/null +++ b/examples/sdk_examples/sharing_commands/update_record_permissions.py @@ -0,0 +1,611 @@ +""" +Sample script demonstrating update_record_permissions from share_management_utils. + +Updates record-level permissions (can_edit / can_share) for records in a folder, +either for direct record shares, shared folder record permissions, or both. +Uses the same login and vault setup pattern as share_folder.py. +""" +import getpass +import json +import logging +import sqlite3 +from typing import Any, Dict, Optional + +import fido2 +import webbrowser + +from keepersdk import errors, utils +from keepersdk.authentication import ( + configuration, + endpoint, + keeper_auth, + login_auth, +) +from keepersdk.authentication.yubikey import ( + IKeeperUserInteraction, + yubikey_authenticate, +) +from keepersdk.constants import KEEPER_PUBLIC_HOSTS +from keepersdk.vault import share_management_utils, sqlite_storage, vault_online + +try: + import pyperclip +except ImportError: + pyperclip = None + +logger = utils.get_logger() +logger.setLevel(logging.INFO) +if not logger.handlers: + _handler = logging.StreamHandler() + _handler.setLevel(logging.INFO) + _handler.setFormatter( + logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s") + ) + logger.addHandler(_handler) + + +class FidoCliInteraction(fido2.client.UserInteraction, IKeeperUserInteraction): + def output_text(self, text: str) -> None: + print(text) + + def prompt_up(self) -> None: + print( + "\nTouch the flashing Security key to authenticate or " + "press Ctrl-C to resume with the primary two factor authentication..." + ) + + def request_pin(self, permissions, rd_id): + return getpass.getpass("Enter Security Key PIN: ") + + def request_uv(self, permissions, rd_id): + print("User Verification required.") + return True + + +# Two-factor duration codes (used by LoginFlow) +_TWO_FACTOR_DURATION_CODES: Dict[login_auth.TwoFactorDuration, str] = { + login_auth.TwoFactorDuration.EveryLogin: "login", + login_auth.TwoFactorDuration.Every12Hours: "12_hours", + login_auth.TwoFactorDuration.EveryDay: "24_hours", + login_auth.TwoFactorDuration.Every30Days: "30_days", + login_auth.TwoFactorDuration.Forever: "forever", +} + + +class LoginFlow: + """ + Handles the full login process: server selection, username, password, + device approval, 2FA, SSO data key, and SSO token. + """ + + def __init__(self) -> None: + self._config = configuration.JsonConfigurationStorage() + self._logged_in_with_persistent = True + self._endpoint: Optional[endpoint.KeeperEndpoint] = None + + @property + def endpoint(self) -> Optional[endpoint.KeeperEndpoint]: + return self._endpoint + + @property + def logged_in_with_persistent(self) -> bool: + """True if login succeeded by resuming an existing persistent session (no step loop).""" + return self._logged_in_with_persistent + + def run(self) -> Optional[keeper_auth.KeeperAuth]: + """ + Run the login flow. + + Returns: + Authenticated Keeper context, or None if login fails. + """ + server = self._ensure_server() + keeper_endpoint = endpoint.KeeperEndpoint(self._config, server) + self._endpoint = keeper_endpoint + login_auth_context = login_auth.LoginAuth(keeper_endpoint) + + username = self._config.get().last_login or input("Enter username: ") + login_auth_context.resume_session = True + login_auth_context.login(username) + + while not login_auth_context.login_step.is_final(): + step = login_auth_context.login_step + if isinstance(step, login_auth.LoginStepDeviceApproval): + self._handle_device_approval(step) + elif isinstance(step, login_auth.LoginStepTwoFactor): + self._handle_two_factor(step) + elif isinstance(step, login_auth.LoginStepPassword): + self._handle_password(step) + elif isinstance(step, login_auth.LoginStepSsoToken): + self._handle_sso_token(step) + elif isinstance(step, login_auth.LoginStepSsoDataKey): + self._handle_sso_data_key(step) + elif isinstance(step, login_auth.LoginStepError): + print(f"Login error: ({step.code}) {step.message}") + return None + else: + raise NotImplementedError( + f"Unsupported login step type: {type(step).__name__}" + ) + self._logged_in_with_persistent = False + + if self._logged_in_with_persistent: + print("Successfully logged in with persistent login") + + if isinstance(login_auth_context.login_step, login_auth.LoginStepConnected): + return login_auth_context.login_step.take_keeper_auth() + + return None + + def _ensure_server(self) -> str: + if not self._config.get().last_server: + print("Available server options:") + for region, host in KEEPER_PUBLIC_HOSTS.items(): + print(f" {region}: {host}") + server = ( + input("Enter server (default: keepersecurity.com): ").strip() + or "keepersecurity.com" + ) + self._config.get().last_server = server + else: + server = self._config.get().last_server + return server + + def _handle_device_approval( + self, step: login_auth.LoginStepDeviceApproval + ) -> None: + menu = [ + ("email_send", "to send email"), + ("email_code=", "to validate verification code sent via email"), + ("keeper_push", "to send Keeper Push notification"), + ("2fa_send", "to send 2FA code"), + ("2fa_code=", "to validate a code provided by 2FA application"), + ("", "to resume"), + ] + lines = ["Approve by selecting a method below"] + lines.extend(f" {cmd} {desc}" for cmd, desc in menu) + print("\n".join(lines)) + selection = input("Type your selection or to resume: ").strip() + if selection is None: + return + if selection in ("email_send", "es"): + step.send_push(channel=login_auth.DeviceApprovalChannel.Email) + print("An email with instructions has been sent. Press when approved.") + elif selection.startswith("email_code="): + code = selection[len("email_code=") :] + step.send_code(channel=login_auth.DeviceApprovalChannel.Email, code=code) + print("Successfully verified email code.") + elif selection in ("keeper_push", "kp"): + step.send_push(channel=login_auth.DeviceApprovalChannel.KeeperPush) + print( + "Successfully made a push notification to the approved device. " + "Press when approved." + ) + elif selection in ("2fa_send", "2fs"): + step.send_push(channel=login_auth.DeviceApprovalChannel.TwoFactor) + print("2FA code was sent.") + elif selection.startswith("2fa_code="): + code = selection[len("2fa_code=") :] + step.send_code(channel=login_auth.DeviceApprovalChannel.TwoFactor, code=code) + print("Successfully verified 2FA code.") + else: + step.resume() + + def _handle_password(self, step: login_auth.LoginStepPassword) -> None: + print(f"\nEnter password for {step.username}") + while True: + password = getpass.getpass("Password: ") + if not password: + raise KeyboardInterrupt() + try: + step.verify_password(password) + break + except errors.KeeperApiError as kae: + print( + "Invalid email or password combination, please re-enter." + if kae.result_code == "auth_failed" + else kae.message + ) + + def _handle_two_factor(self, step: login_auth.LoginStepTwoFactor) -> None: + channels = [ + x + for x in step.get_channels() + if x.channel_type != login_auth.TwoFactorChannel.Other + ] + menu = [] + for i, channel in enumerate(channels): + desc = self._two_factor_channel_desc(channel.channel_type) + menu.append( + ( + str(i + 1), + f"{desc} {channel.channel_name} {channel.phone}", + ) + ) + menu.append(("q", "Quit authentication attempt and return to Commander prompt.")) + lines = ["", "This account requires 2FA Authentication"] + lines.extend(f" {a}. {t}" for a, t in menu) + print("\n".join(lines)) + while True: + selection = input("Selection: ") + if selection is None: + return + if selection in ("q", "Q"): + raise KeyboardInterrupt() + try: + assert selection.isnumeric() + idx = 1 if not selection else int(selection) + assert 1 <= idx <= len(channels) + channel = channels[idx - 1] + desc = self._two_factor_channel_desc(channel.channel_type) + print(f"Selected {idx}. {desc}") + except AssertionError: + print( + "Invalid entry, additional factors of authentication shown " + "may be configured if not currently enabled." + ) + continue + if channel.channel_type in ( + login_auth.TwoFactorChannel.TextMessage, + login_auth.TwoFactorChannel.KeeperDNA, + login_auth.TwoFactorChannel.DuoSecurity, + ): + action = next( + ( + x + for x in step.get_channel_push_actions(channel.channel_uid) + if x + in ( + login_auth.TwoFactorPushAction.TextMessage, + login_auth.TwoFactorPushAction.KeeperDna, + ) + ), + None, + ) + if action: + step.send_push(channel.channel_uid, action) + if channel.channel_type == login_auth.TwoFactorChannel.SecurityKey: + try: + challenge = json.loads(channel.challenge) + signature = yubikey_authenticate(challenge, FidoCliInteraction()) + if signature: + print("Verified Security Key.") + step.send_code(channel.channel_uid, signature) + return + except Exception as e: + logger.error(e) + continue + step.duration = min(step.duration, channel.max_expiration) + available_dura = sorted( + x for x in _TWO_FACTOR_DURATION_CODES if x <= channel.max_expiration + ) + available_codes = [ + _TWO_FACTOR_DURATION_CODES.get(x) or "login" for x in available_dura + ] + while True: + mfa_desc = self._two_factor_duration_desc(step.duration) + prompt_exp = ( + f"\n2FA Code Duration: {mfa_desc}.\n" + f"To change duration: 2fa_duration={'|'.join(available_codes)}" + ) + print(prompt_exp) + selection = input("\nEnter 2FA Code or Duration: ") + if not selection: + return + if selection in available_codes: + step.duration = self._two_factor_code_to_duration(selection) + elif selection.startswith("2fa_duration="): + code = selection[len("2fa_duration=") :] + if code in available_codes: + step.duration = self._two_factor_code_to_duration(code) + else: + print(f"Invalid 2FA duration: {code}") + else: + try: + step.send_code(channel.channel_uid, selection) + print("Successfully verified 2FA Code.") + return + except errors.KeeperApiError as kae: + print(f"Invalid 2FA code: ({kae.result_code}) {kae.message}") + + def _handle_sso_data_key( + self, step: login_auth.LoginStepSsoDataKey + ) -> None: + menu = [ + ("1", "Keeper Push. Send a push notification to your device."), + ("2", "Admin Approval. Request your admin to approve this device."), + ("r", "Resume SSO authentication after device is approved."), + ("q", "Quit SSO authentication attempt and return to Commander prompt."), + ] + lines = ["Approve this device by selecting a method below:"] + lines.extend(f" {cmd:>3}. {text}" for cmd, text in menu) + print("\n".join(lines)) + while True: + answer = input("Selection: ") + if answer is None: + return + if answer == "q": + raise KeyboardInterrupt() + if answer == "r": + step.resume() + break + if answer in ("1", "2"): + step.request_data_key( + login_auth.DataKeyShareChannel.KeeperPush + if answer == "1" + else login_auth.DataKeyShareChannel.AdminApproval + ) + else: + print(f'Action "{answer}" is not supported.') + + def _handle_sso_token(self, step: login_auth.LoginStepSsoToken) -> None: + menu = [ + ("a", "SSO User with a Master Password."), + ] + if pyperclip: + menu.append(("c", "Copy SSO Login URL to clipboard.")) + else: + menu.append(("u", "Show SSO Login URL.")) + try: + wb = webbrowser.get() + menu.append(("o", "Navigate to SSO Login URL with the default web browser.")) + except Exception: + wb = None + if pyperclip: + menu.append(("p", "Paste SSO Token from clipboard.")) + menu.append(("t", "Enter SSO Token manually.")) + menu.append(("q", "Quit SSO authentication attempt and return to Commander prompt.")) + lines = [ + "", + "SSO Login URL:", + step.sso_login_url, + "Navigate to SSO Login URL with your browser and complete authentication.", + "Copy a returned SSO Token into clipboard." + + (" Paste that token into Commander." if pyperclip else " Then use option 't' to enter the token manually."), + 'NOTE: To copy SSO Token please click "Copy authentication token" ' + 'button on "SSO Connect" page.', + "", + ] + lines.extend(f" {a:>3}. {t}" for a, t in menu) + print("\n".join(lines)) + while True: + token = input("Selection: ") + if token == "q": + raise KeyboardInterrupt() + if token == "a": + step.login_with_password() + return + if token == "c": + token = None + if pyperclip: + try: + pyperclip.copy(step.sso_login_url) + print("SSO Login URL is copied to clipboard.") + except Exception: + print("Failed to copy SSO Login URL to clipboard.") + else: + print("Clipboard not available (install pyperclip).") + elif token == "u": + token = None + if not pyperclip: + print("\nSSO Login URL:", step.sso_login_url, "\n") + else: + print("Unsupported menu option (use 'c' to copy URL).") + elif token == "o": + token = None + if wb: + try: + wb.open_new_tab(step.sso_login_url) + except Exception: + print("Failed to open web browser.") + elif token == "p": + if pyperclip: + try: + token = pyperclip.paste() + except Exception: + token = "" + print("Failed to paste from clipboard") + else: + token = None + print("Clipboard not available (use 't' to enter token manually).") + elif token == "t": + token = getpass.getpass("Enter SSO Token: ").strip() + else: + if len(token) < 10: + print(f"Unsupported menu option: {token}") + continue + if token: + try: + step.set_sso_token(token) + break + except errors.KeeperApiError as kae: + print(f"SSO Login error: ({kae.result_code}) {kae.message}") + + @staticmethod + def _two_factor_channel_desc( + channel_type: login_auth.TwoFactorChannel, + ) -> str: + return { + login_auth.TwoFactorChannel.Authenticator: "TOTP (Google and Microsoft Authenticator)", + login_auth.TwoFactorChannel.TextMessage: "Send SMS Code", + login_auth.TwoFactorChannel.DuoSecurity: "DUO", + login_auth.TwoFactorChannel.RSASecurID: "RSA SecurID", + login_auth.TwoFactorChannel.SecurityKey: "WebAuthN (FIDO2 Security Key)", + login_auth.TwoFactorChannel.KeeperDNA: "Keeper DNA (Watch)", + login_auth.TwoFactorChannel.Backup: "Backup Code", + }.get(channel_type, "Not Supported") + + @staticmethod + def _two_factor_duration_desc( + duration: login_auth.TwoFactorDuration, + ) -> str: + return { + login_auth.TwoFactorDuration.EveryLogin: "Require Every Login", + login_auth.TwoFactorDuration.Forever: "Save on this Device Forever", + login_auth.TwoFactorDuration.Every12Hours: "Ask Every 12 hours", + login_auth.TwoFactorDuration.EveryDay: "Ask Every 24 hours", + login_auth.TwoFactorDuration.Every30Days: "Ask Every 30 days", + }.get(duration, "Require Every Login") + + @staticmethod + def _two_factor_code_to_duration( + text: str, + ) -> login_auth.TwoFactorDuration: + for dura, code in _TWO_FACTOR_DURATION_CODES.items(): + if code == text: + return dura + return login_auth.TwoFactorDuration.EveryLogin + + +def enable_persistent_login(keeper_auth_context: keeper_auth.KeeperAuth) -> None: + """Enable persistent login and register data key for device.""" + keeper_auth.set_user_setting(keeper_auth_context, 'persistent_login', '1') + keeper_auth.register_data_key_for_device(keeper_auth_context) + mins_per_day = 60 * 24 + timeout_in_minutes = mins_per_day * 30 + keeper_auth.set_user_setting(keeper_auth_context, 'logout_timer', str(timeout_in_minutes)) + print("Persistent login turned on successfully and device registered") + + +def login(): + """ + Handle the login process. Returns (keeper_auth_context, keeper_endpoint) on success, + or (None, None) if login fails. + """ + flow = LoginFlow() + keeper_auth_context = flow.run() + if keeper_auth_context and not flow.logged_in_with_persistent: + enable_persistent_login(keeper_auth_context) + keeper_endpoint = flow.endpoint if keeper_auth_context else None + return keeper_auth_context, keeper_endpoint + + +def update_record_permissions_in_folder( + keeper_auth_context: keeper_auth.KeeperAuth, + action: str, + can_share: bool = False, + can_edit: bool = False, + folder_uid_or_path: Optional[str] = None, + recursive: bool = False, + share_record: bool = True, + share_folder: bool = True, + dry_run: bool = False, + sync_after: bool = True, +) -> Dict[str, Any]: + """ + Update record permissions (can_edit / can_share) in a folder using + share_management_utils.update_record_permissions. + + Args: + keeper_auth_context: Authenticated Keeper context. + action: 'grant' or 'revoke'. + can_share: Whether to change the "can share" permission. + can_edit: Whether to change the "can edit" permission. + folder_uid_or_path: Folder UID or path; None or empty = root. + recursive: If True, include all subfolders. + share_record: If True, update direct record shares. + share_folder: If True, update shared folder record permissions. + dry_run: If True, only compute and return planned updates; do not apply. + sync_after: If True and changes were applied, sync vault down after updates. + + Returns: + Result dict from update_record_permissions (direct_share_updates, + shared_folder_updates, direct_share_errors, shared_folder_errors, + skipped_shared_folders). + """ + conn = sqlite3.Connection("file::memory:", uri=True) + vault_storage = sqlite_storage.SqliteVaultStorage( + lambda: conn, + vault_owner=bytes(keeper_auth_context.auth_context.username, 'utf-8') + ) + vault = vault_online.VaultOnline(keeper_auth_context, vault_storage) + vault.sync_down() + + try: + result = share_management_utils.update_record_permissions( + vault=vault, + action=action, + can_share=can_share, + can_edit=can_edit, + folder_uid_or_path=folder_uid_or_path, + recursive=recursive, + share_record=share_record, + share_folder=share_folder, + dry_run=dry_run, + sync_after=sync_after, + ) + _print_result(result, action, dry_run) + return result + except share_management_utils.ShareValidationError as e: + print(f"Validation error: {e}") + raise + except share_management_utils.ShareNotFoundError as e: + print(f"Folder not found: {e}") + raise + finally: + vault.close() + keeper_auth_context.close() + + +def _print_result(result: Dict[str, Any], action: str, dry_run: bool) -> None: + """Print a summary of update_record_permissions result.""" + prefix = "[dry run] " if dry_run else "" + direct = result.get("direct_share_updates") or [] + sf_updates = result.get("shared_folder_updates") or {} + direct_errors = result.get("direct_share_errors") or [] + sf_errors = result.get("shared_folder_errors") or [] + skipped = result.get("skipped_shared_folders") or {} + + if direct: + print(f"{prefix}Direct share updates: {len(direct)}") + if sf_updates: + total_sf = sum(len(v) for v in sf_updates.values()) + print(f"{prefix}Shared folder record updates: {total_sf} (across {len(sf_updates)} shared folder(s))") + if not direct and not sf_updates and not direct_errors and not sf_errors: + print(f"{prefix}No permission changes to apply for action={action!r}.") + if direct_errors: + print(f"Direct share errors: {len(direct_errors)}") + if sf_errors: + print(f"Shared folder errors: {len(sf_errors)}") + if skipped: + print(f"Skipped shared folders (permissions): {len(skipped)}") + + +def main() -> None: + """ + Main entry point. Logs in and updates record permissions in a folder. + Edit the variables below to match your folder and desired permissions. + """ + keeper_auth_context, _ = login() + if not keeper_auth_context: + print("Login failed. Unable to update record permissions.") + return + + # Configure these for your run: + action = "grant" # or "revoke" + can_share = True + can_edit = True + folder_uid_or_path = "My Folder" # Use folder UID / path like "My Folder" + recursive = False + share_record = True # update direct record shares + share_folder = True # update shared folder record permissions + dry_run = False # set to False to apply changes + + update_record_permissions_in_folder( + keeper_auth_context=keeper_auth_context, + action=action, + can_share=can_share, + can_edit=can_edit, + folder_uid_or_path=folder_uid_or_path, + recursive=recursive, + share_record=share_record, + share_folder=share_folder, + dry_run=dry_run, + sync_after=not dry_run, + ) + if dry_run: + print("Run with dry_run=False to apply the changes.") + + +if __name__ == "__main__": + main() diff --git a/keepercli-package/src/keepercli/commands/record_handling_commands.py b/keepercli-package/src/keepercli/commands/record_handling_commands.py index a17bc98..af8642d 100644 --- a/keepercli-package/src/keepercli/commands/record_handling_commands.py +++ b/keepercli-package/src/keepercli/commands/record_handling_commands.py @@ -25,13 +25,7 @@ # Constants for FindDuplicateCommand TEAM_USER_TYPE = '(Team User)' NON_SHARED_LABEL = 'non-shared' -ENTERPRISE_COMPLIANCE_DAYS = 1 URL_DISPLAY_LENGTH = 30 -ENTERPRISE_UPDATE_FLOOR_DAYS = 1 - -# Default field mappings for duplicate detection -DEFAULT_MATCH_FIELDS = ['title', 'login', 'password'] -ENTERPRISE_FIELD_KEYS = ['title', 'url', 'record_type'] # Report field names FIELD_TITLE = 'Title' @@ -40,15 +34,12 @@ FIELD_WEBSITE_ADDRESS = 'Website Address' FIELD_CUSTOM_FIELDS = 'Custom Fields' FIELD_SHARES = 'Shares' -FIELD_RECORD_UID = 'record_uid' FIELD_GROUP = 'group' FIELD_URL = 'url' FIELD_RECORD_OWNER = 'record_owner' FIELD_SHARED_TO = 'shared_to' -FIELD_SHARED_FOLDER_UID = 'shared_folder_uid' # Report titles -ENTERPRISE_DUPLICATE_TITLE = 'Duplicate Search Results (Enterprise Scope):' VAULT_DUPLICATE_TITLE = 'Duplicates Found:' NO_DUPLICATES_FOUND = 'No duplicates found.' @@ -1111,492 +1102,137 @@ def _format_url(self, url, include_in_output): return [parsed_url] if include_in_output else [] -class _PermissionConfig: - """Configuration for permission changes. - - Attributes: - should_have: True if granting permissions, False if revoking - change_share: Whether to change share permissions - change_edit: Whether to change edit permissions - force: Skip confirmation prompts - dry_run: Only display changes without applying them - recursive: Apply to subfolders - """ - def __init__(self, action: str, can_share: bool, can_edit: bool, - force: bool, dry_run: bool, recursive: bool): - self.should_have = action == 'grant' - self.change_share = can_share - self.change_edit = can_edit - self.force = force - self.dry_run = dry_run - self.recursive = recursive - - if not self.change_share and not self.change_edit: - raise base.CommandError( - 'Please choose at least one of the following options: can-edit, can-share' - ) - - -class _PermissionProcessor: - """Handles processing of permission changes for records.""" - - def __init__(self, config: _PermissionConfig, context: KeeperParams): - self.config = config - self.context = context - self.vault = context.vault - - def process_direct_shares(self, folders): - """Process direct record shares and return commands to update.""" - updates = [] - skipped = [] - - record_uids = set() - for folder in folders: - if folder.records: - record_uids.update(folder.records) - - if not record_uids: - return updates, skipped - - shared_records = share_management_utils.get_record_shares(self.vault, list(record_uids)) - if not shared_records: - return updates, skipped - - for shared_record in shared_records: - shares = shared_record.get('shares', {}) - user_permissions = shares.get('user_permissions', []) - - for up in user_permissions: - if up.get('owner'): # Skip record owners - continue - - username = up.get('username') - if username == self.context.auth.auth_context.username: # Skip self - continue - - needs_update = self._needs_permission_update( - up, self.config.should_have, self.config.change_share, self.config.change_edit - ) - - if needs_update: - updates.append({ - 'record_uid': shared_record.get('record_uid'), - 'to_username': username, - 'editable': self.config.should_have if self.config.change_edit else up.get('editable'), - 'shareable': self.config.should_have if self.config.change_share else up.get('shareable'), - }) - - return updates, skipped - - def process_shared_folder_permissions(self, folders): - """Process shared folder record permissions and return commands to update.""" - updates = {} - skipped = {} - - share_admin_folders = self._get_share_admin_folders(folders) - - account_uid = self.context.auth.auth_context.account_uid - - for folder in folders: - if folder.folder_type not in ['shared_folder', 'shared_folder_folder']: - continue - - shared_folder_uid = self._get_shared_folder_uid(folder) - if not shared_folder_uid or shared_folder_uid not in self.vault.vault_data._shared_folders: - continue - - is_share_admin = shared_folder_uid in share_admin_folders - shared_folder = self.vault.vault_data.load_shared_folder(shared_folder_uid) - - has_manage_records = self._has_manage_records_permission( - shared_folder, shared_folder_uid, is_share_admin, account_uid - ) - - container = updates if (is_share_admin or has_manage_records) else skipped - - if shared_folder.record_permissions: - record_uids = folder.records if folder.records else set() - for rp in shared_folder.record_permissions: - record_uid = rp.record_uid - if record_uid in record_uids and record_uid not in container.get(shared_folder_uid, {}): - if self._needs_shared_folder_update(rp): - container.setdefault(shared_folder_uid, {}) - container[shared_folder_uid][record_uid] = self._build_update_command( - record_uid, shared_folder_uid - ) - - return self._clean_empty_dicts(updates), self._clean_empty_dicts(skipped) - - def _needs_permission_update(self, user_perm, should_have, change_share, change_edit): - """Check if user permission needs updating.""" - if change_edit and should_have != user_perm.get('editable'): - return True - if change_share and should_have != user_perm.get('shareable'): - return True - return False - - def _needs_shared_folder_update(self, record_permission): - """Check if shared folder record permission needs updating.""" - should_have = self.config.should_have - if self.config.change_edit and should_have != record_permission.can_edit: - return True - if self.config.change_share and should_have != record_permission.can_share: - return True - return False - - def _get_share_admin_folders(self, folders): - """Get set of shared folder UIDs where user is share admin.""" - share_admin_folders = set() - shared_folder_uids = set() - - for folder in folders: - shared_folder_uid = None - if folder.folder_type == 'shared_folder': - shared_folder_uid = folder.folder_uid - elif folder.folder_type == 'shared_folder_folder': - shared_folder_uid = folder.folder_scope_uid - - if shared_folder_uid and shared_folder_uid not in shared_folder_uids: - if shared_folder_uid in self.vault.vault_data._shared_folders: - shared_folder_uids.add(shared_folder_uid) - - if not shared_folder_uids: - return share_admin_folders - - try: - rq = record_pb2.AmIShareAdmin() - for shared_folder_uid in shared_folder_uids: - osa = record_pb2.IsObjectShareAdmin() - osa.uid = utils.base64_url_decode(shared_folder_uid) - osa.objectType = record_pb2.CHECK_SA_ON_SF - rq.isObjectShareAdmin.append(osa) - - rs = self.vault.keeper_auth.execute_auth_rest( - rest_endpoint='vault/am_i_share_admin', - request=rq, - response_type=record_pb2.AmIShareAdmin - ) - - for osa in rs.isObjectShareAdmin: - if osa.isAdmin: - share_admin_folders.add(utils.base64_url_encode(osa.uid)) - except Exception: - pass - - return share_admin_folders - - def _get_shared_folder_uid(self, folder): - """Get the shared folder UID from a folder object.""" - if folder.folder_type == 'shared_folder': - return folder.folder_uid - elif folder.folder_type == 'shared_folder_folder': - return folder.folder_scope_uid - return None - - def _has_manage_records_permission(self, shared_folder, shared_folder_uid, is_share_admin, account_uid): - """Check if user has permission to manage records in shared folder.""" - if is_share_admin: - return True - - if shared_folder.user_permissions: - if shared_folder.user_permissions[0].user_uid == account_uid: - return True - - user = next( - (x for x in shared_folder.user_permissions if x.name == self.context.auth.auth_context.username), - None - ) - if user and user.manage_records: - return True - - return False - - def _build_update_command(self, record_uid, shared_folder_uid): - """Build a protobuf command to update record permissions.""" - cmd = folder_pb2.SharedFolderUpdateRecord() - cmd.recordUid = utils.base64_url_decode(record_uid) - cmd.sharedFolderUid = utils.base64_url_decode(shared_folder_uid) - - cmd.canEdit = ( - folder_pb2.BOOLEAN_TRUE if self.config.should_have else folder_pb2.BOOLEAN_FALSE - ) if self.config.change_edit else folder_pb2.BOOLEAN_NO_CHANGE - - cmd.canShare = ( - folder_pb2.BOOLEAN_TRUE if self.config.should_have else folder_pb2.BOOLEAN_FALSE - ) if self.config.change_share else folder_pb2.BOOLEAN_NO_CHANGE - - return cmd - - @staticmethod - def _clean_empty_dicts(data): - """Remove empty dictionaries from nested structure.""" - cleaned = {} - for key, value in data.items(): - if isinstance(value, dict) and value: - cleaned[key] = value - return cleaned +def _report_skipped_shared_folders(vault, skipped_sf: dict) -> None: + """Print table of shared folder records skipped (insufficient permission).""" + table = [] + for shared_folder_uid in skipped_sf: + sf = vault.vault_data.get_shared_folder(shared_folder_uid=shared_folder_uid) + uid, name = shared_folder_uid, (sf.name[:32] if sf else '') + for record_uid in skipped_sf[shared_folder_uid]: + rec = vault.vault_data.get_record(record_uid=record_uid) + table.append([uid, name, record_uid, (rec.title[:32] if rec else '')]) + uid, name = '', '' + if table: + report_utils.dump_report_data( + table, + ['Shared Folder UID', 'Shared Folder Name', 'Record UID', 'Record Title'], + title='SKIP Shared Folder Record Share permission(s). Not permitted', + row_number=True, + ) + logger.info('\n') -class _PermissionReporter: - """Handles reporting of permission changes.""" - - def __init__(self, config: _PermissionConfig, context: KeeperParams): - self.config = config - self.context = context - self.vault = context.vault - - def report_direct_shares(self, updates, skipped): - """Report on direct share updates and skipped items.""" - if skipped and self.config.dry_run: - self._report_skipped_direct_shares(skipped) - - if updates and not self.config.force: - self._report_direct_share_updates(updates) - - def report_shared_folder_changes(self, updates, skipped): - """Report on shared folder updates and skipped items.""" - if skipped and self.config.dry_run: - self._report_skipped_shared_folder(skipped) - - if updates and not self.config.force: - self._report_shared_folder_updates(updates) - - def _report_skipped_direct_shares(self, skipped): - """Report records that couldn't be updated due to insufficient permissions.""" - table = [] - for cmd in skipped: - record_uid = utils.base64_url_encode(cmd['recordUid']) - record = self.vault.vault_data.get_record(record_uid=record_uid) - record_owner = record.flags.IsOwner - rec = self.vault.vault_data.get_record(record_uid=record_uid) - row = [record_uid, rec.title[:32], record_owner, cmd['to_username']] - table.append(row) - - headers = ['Record UID', 'Title', 'Owner', 'Email'] - title = 'SKIP Direct Record Share permission(s). Not permitted' - report_utils.dump_report_data(table, headers, title=title, row_number=True, group_by=0) - logger.info('\n') - - def _report_direct_share_updates(self, updates): - """Report direct share updates that will be made.""" - table = [] - for cmd in updates: - record_uid = cmd['record_uid'] - rec = self.vault.vault_data.get_record(record_uid=record_uid) - row = [record_uid, rec.title[:32], cmd['to_username']] - - if self.config.change_edit: - row.append('Y' if cmd['editable'] else 'N') - if self.config.change_share: - row.append('Y' if cmd['shareable'] else 'N') - +def _report_direct_share_plan( + vault, + direct_updates: list, + action_label: str, + change_edit: bool, + change_share: bool, +) -> None: + """Print table of planned direct record share updates.""" + table = [] + for cmd in direct_updates: + rec = vault.vault_data.get_record(record_uid=cmd['record_uid']) + row = [cmd['record_uid'], (rec.title[:32] if rec else ''), cmd['to_username']] + if change_edit: + row.append('Y' if cmd['editable'] else 'N') + if change_share: + row.append('Y' if cmd['shareable'] else 'N') + table.append(row) + headers = ['Record UID', 'Title', 'Email'] + if change_edit: + headers.append('Can Edit') + if change_share: + headers.append('Can Share') + report_utils.dump_report_data( + table, headers, + title=f'{action_label} Direct Record Share permission(s)', + row_number=True, + group_by=0, + ) + logger.info('\n') + + +def _report_shared_folder_plan( + vault, + sf_updates: dict, + action_label: str, + change_edit: bool, + change_share: bool, +) -> None: + """Print table of planned shared folder record updates.""" + table = [] + for shared_folder_uid in sf_updates: + sf = vault.vault_data.get_shared_folder(shared_folder_uid=shared_folder_uid) + uid, name = shared_folder_uid, (sf.name[:32] if sf else '') + for record_uid in sf_updates[shared_folder_uid]: + cmd = sf_updates[shared_folder_uid][record_uid] + rec = vault.vault_data.get_record(record_uid=record_uid) + row = [uid, name, record_uid, (rec.title[:32] if rec else '')] + if change_edit: + row.append('Y' if cmd.canEdit == folder_pb2.BOOLEAN_TRUE else 'N') + if change_share: + row.append('Y' if cmd.canShare == folder_pb2.BOOLEAN_TRUE else 'N') table.append(row) - - headers = ['Record UID', 'Title', 'Email'] - if self.config.change_edit: + uid, name = '', '' + if table: + headers = ['Shared Folder UID', 'Shared Folder Name', 'Record UID', 'Record Title'] + if change_edit: headers.append('Can Edit') - if self.config.change_share: + if change_share: headers.append('Can Share') - - action = 'GRANT' if self.config.should_have else 'REVOKE' - title = f'{action} Direct Record Share permission(s)' - report_utils.dump_report_data(table, headers, title=title, row_number=True, group_by=0) + report_utils.dump_report_data( + table, headers, + title=f'{action_label} Shared Folder Record Share permission(s)', + row_number=True, + ) logger.info('\n') - - def _report_skipped_shared_folder(self, skipped): - """Report shared folder records that couldn't be updated.""" - table = [] - for shared_folder_uid in skipped: - shared_folder = self.vault.vault_data.get_shared_folder(shared_folder_uid=shared_folder_uid) - uid = shared_folder_uid - name = shared_folder.name[:32] - - for record_uid in skipped[shared_folder_uid]: - record = self.vault.vault_data.get_record(record_uid=record_uid) - row = [uid, name, record_uid, record.title[:32]] - uid = '' - name = '' - table.append(row) - - if table: - headers = ['Shared Folder UID', 'Shared Folder Name', 'Record UID', 'Record Title'] - title = 'SKIP Shared Folder Record Share permission(s). Not permitted' - report_utils.dump_report_data(table, headers, title=title, row_number=True) - logger.info('\n') - - def _report_shared_folder_updates(self, updates): - """Report shared folder updates that will be made.""" - table = [] - for shared_folder_uid in updates: - commands = updates[shared_folder_uid] - shared_folder = self.vault.vault_data.get_shared_folder(shared_folder_uid=shared_folder_uid) - uid = shared_folder_uid - name = shared_folder.name[:32] - - for record_uid in commands: - cmd = commands[record_uid] - record = self.vault.vault_data.get_record(record_uid=record_uid) - row = [uid, name, record_uid, record.title[:32]] - - if self.config.change_edit: - edit_val = 'Y' if cmd.canEdit == folder_pb2.BOOLEAN_TRUE else 'N' - row.append(edit_val) - if self.config.change_share: - share_val = 'Y' if cmd.canShare == folder_pb2.BOOLEAN_TRUE else 'N' - row.append(share_val) - - table.append(row) - uid = '' - name = '' - - if table: - headers = ['Shared Folder UID', 'Shared Folder Name', 'Record UID', 'Record Title'] - if self.config.change_edit: - headers.append('Can Edit') - if self.config.change_share: - headers.append('Can Share') - - action = 'GRANT' if self.config.should_have else 'REVOKE' - title = f'{action} Shared Folder Record Share permission(s)' - report_utils.dump_report_data(table, headers, title=title, row_number=True) - logger.info('\n') -class _PermissionExecutor: - """Handles execution of permission changes.""" - - def __init__(self, config: _PermissionConfig, context: KeeperParams): - self.config = config - self.context = context - self.vault = context.vault - - def execute_direct_share_updates(self, updates): - """Execute direct share permission updates.""" - if not updates: - return [] - - errors = [] - batch_size = 900 - - while updates: - batch = updates[:batch_size] - updates = updates[batch_size:] - - rsu_rq = record_pb2.RecordShareUpdateRequest() - rsu_rq.updateSharedRecord.extend((self._to_share_record_proto(x) for x in batch)) - - rsu_rs = self.vault.keeper_auth.execute_auth_rest( - rest_endpoint='vault/records_share_update', - request=rsu_rq, - response_type=record_pb2.RecordShareUpdateResponse - ) - - for status in rsu_rs.updateSharedRecordStatus: - if status.status.lower() != 'success': - record_uid = utils.base64_url_encode(status.recordUid) - errors.append([record_uid, status.username, status.status.lower(), status.message]) - - return errors - - def execute_shared_folder_updates(self, updates): - """Execute shared folder permission updates.""" - if not updates: - return [] - - errors = [] - requests = self._build_shared_folder_requests(updates) - chunks = self._chunk_requests(requests) - - for chunk in chunks: - rqs = folder_pb2.SharedFolderUpdateV3RequestV2() - rqs.sharedFoldersUpdateV3.extend(chunk.values()) - - rss = self.vault.keeper_auth.execute_auth_rest( - rest_endpoint='vault/shared_folder_update_v3', - request=rqs, - response_type=folder_pb2.SharedFolderUpdateV3ResponseV2, - payload_version=1 - ) - - for rs in rss.sharedFoldersUpdateV3Response: - shared_folder_uid = utils.base64_url_encode(rs.sharedFolderUid) - for status in rs.sharedFolderUpdateRecordStatus: - if status.status != 'success': - record_uid = utils.base64_url_encode(status.recordUid) - errors.append([shared_folder_uid, record_uid, status.status]) - - return errors - - def _build_shared_folder_requests(self, updates): - """Build protobuf requests for shared folder updates.""" - requests = [] - - for shared_folder_uid in updates: - update_commands = list(updates[shared_folder_uid].values()) - batch_size = 490 - - while update_commands: - batch = update_commands[:batch_size] - update_commands = update_commands[batch_size:] - - rq = folder_pb2.SharedFolderUpdateV3Request() - rq.sharedFolderUid = utils.base64_url_decode(shared_folder_uid) - rq.forceUpdate = True - rq.sharedFolderUpdateRecord.extend(batch) - if batch: - rq.fromTeamUid = batch[0].teamUid - requests.append(rq) - - return requests - - def _chunk_requests(self, requests): - """Chunk requests to stay within size limits.""" - chunks = [] - current_chunk = {} - total_elements = 0 - - for rq in requests: - if rq.sharedFolderUid in current_chunk: - chunks.append(current_chunk) - current_chunk = {} - total_elements = 0 - - batch_size = len(rq.sharedFolderUpdateRecord) - if total_elements + batch_size > 500: - chunks.append(current_chunk) - current_chunk = {} - total_elements = 0 - - current_chunk[rq.sharedFolderUid] = rq - total_elements += batch_size - - if current_chunk: - chunks.append(current_chunk) - - return chunks - - def _to_share_record_proto(self, srd): - """Convert dictionary to SharedRecord protobuf.""" - srp = record_pb2.SharedRecord() - srp.toUsername = srd['to_username'] - srp.recordUid = utils.base64_url_decode(srd['record_uid']) - - if 'shared_folder_uid' in srd: - srp.sharedFolderUid = utils.base64_url_decode(srd['shared_folder_uid']) - if 'team_uid' in srd: - srp.teamUid = utils.base64_url_decode(srd['team_uid']) - if 'record_key' in srd: - srp.recordKey = utils.base64_url_decode(srd['record_key']) - if 'use_ecc_key' in srd: - srp.useEccKey = srd['use_ecc_key'] - if 'editable' in srd: - srp.editable = srd['editable'] - if 'shareable' in srd: - srp.shareable = srd['shareable'] - if 'transfer' in srd: - srp.transfer = srd['transfer'] - - return srp +def _report_record_permission_result( + vault, + result: dict, + *, + should_have: bool, + change_edit: bool, + change_share: bool, + force: bool, + dry_run: bool, +) -> None: + """Report planned updates and skipped items from SDK result. Shows plan when dry_run or not force.""" + direct = result.get('direct_share_updates') or [] + sf_updates = result.get('shared_folder_updates') or {} + skipped_sf = result.get('skipped_shared_folders') or {} + show_plan = dry_run or not force + action_label = 'GRANT' if should_have else 'REVOKE' + + if skipped_sf and dry_run: + _report_skipped_shared_folders(vault, skipped_sf) + if direct and show_plan: + _report_direct_share_plan(vault, direct, action_label, change_edit, change_share) + if sf_updates and show_plan: + _report_shared_folder_plan(vault, sf_updates, action_label, change_edit, change_share) + + +def _report_record_permission_errors(result: dict, action_label: str) -> None: + """Print error tables from apply result (direct share and shared folder).""" + direct_errors = result.get('direct_share_errors') or [] + sf_errors = result.get('shared_folder_errors') or [] + if direct_errors: + report_utils.dump_report_data( + direct_errors, + ['Record UID', 'Email', 'Error Code', 'Message'], + title=f'Failed to {action_label} Direct Record Share permission(s)', + row_number=True, + ) + logger.info('\n') + if sf_errors: + report_utils.dump_report_data( + sf_errors, + ['Shared Folder UID', 'Record UID', 'Error Code'], + title=f'Failed to {action_label} Shared Folder Record Share permission(s)', + ) + logger.info('\n') class RecordPermissionCommand(base.ArgparseCommand): @@ -1610,163 +1246,118 @@ def __init__(self): parser = argparse.ArgumentParser(prog='record-permission', description='Modify the permissions of a record') RecordPermissionCommand.add_arguments_to_parser(parser) super().__init__(parser) - + @staticmethod def add_arguments_to_parser(parser: argparse.ArgumentParser): parser.add_argument('--dry-run', dest='dry_run', action='store_true', - help='Display the permissions changes without committing them') + help='Display the permissions changes without committing them') parser.add_argument('--force', dest='force', action='store_true', - help='Apply permission changes without any confirmation') + help='Apply permission changes without any confirmation') parser.add_argument('-R', '--recursive', dest='recursive', action='store_true', - help='Apply permission changes to all sub-folders') + help='Apply permission changes to all sub-folders') parser.add_argument('--share-record', dest='share_record', action='store_true', - help='Change a records sharing permissions') + help='Change a records sharing permissions') parser.add_argument('--share-folder', dest='share_folder', action='store_true', - help='Change a folders sharing permissions') + help='Change a folders sharing permissions') parser.add_argument('-a', '--action', dest='action', action='store', choices=['grant', 'revoke'], - required=True, help='The action being taken') + required=True, help='The action being taken') parser.add_argument('-s', '--can-share', dest='can_share', action='store_true', - help='Set record permission: can be shared') + help='Set record permission: can be shared') parser.add_argument('-d', '--can-edit', dest='can_edit', action='store_true', - help='Set record permission: can be edited') + help='Set record permission: can be edited') parser.add_argument('folder', nargs='?', type=str, action='store', help='folder path or folder UID') parser.error = base.ArgparseCommand.raise_parse_exception parser.exit = base.ArgparseCommand.suppress_exit - - def _resolve_folder(self, context: KeeperParams, folder_name: str): - """Resolve folder from name or UID.""" - vault = context.vault - - if not folder_name: - return vault.vault_data.root_folder - - if folder_name in vault.vault_data._folders: - return vault.vault_data.get_folder(folder_name) - - folder, path = folder_utils.try_resolve_path(context, folder_name) - if len(path) == 0: - return folder - - raise base.CommandError(f'Folder {folder_name} not found') - - def _get_folders_to_process(self, start_folder, recursive): - """Get list of folders to process, optionally recursively.""" - folders = [start_folder] - - if not recursive: - return folders - - visited = {start_folder.folder_uid} - pos = 0 - - while pos < len(folders): - folder = folders[pos] - if folder.subfolders: - for subfolder_uid in folder.subfolders: - if subfolder_uid not in visited: - subfolder = self.vault.vault_data.get_folder(subfolder_uid) - if subfolder: - folders.append(subfolder) - visited.add(subfolder_uid) - pos += 1 - - logger.debug('Folder count: %s', len(folders)) - return folders - + def _determine_scope(self, kwargs): - """Determine if processing share_record, share_folder, or both.""" + """Return (share_record, share_folder); default both True if neither set.""" share_record = kwargs.get('share_record', False) share_folder = kwargs.get('share_folder', False) - - if not share_record and not share_folder: - return True, True - - return share_record, share_folder - - def _log_permission_request(self, folder, config): - """Log the permission change request.""" - if config.force: + return (True, True) if (not share_record and not share_folder) else (share_record, share_folder) + + def _resolve_folder_display_name(self, vault, folder_arg: str) -> str: + """Resolve folder by UID/path; return display name. Raises CommandError if not found.""" + if not folder_arg: + return 'Root' + folder, remaining = share_management_utils.try_resolve_path(vault, folder_arg) + if remaining: + raise base.CommandError(f'Folder "{folder_arg}" not found') + return folder.name or folder_arg + + def _log_permission_request(self, folder_name: str, action_label: str, recursive: bool, + change_edit: bool, change_share: bool, force: bool) -> None: + if force: return - - action = 'GRANT' if config.should_have else 'REVOKE' - scope = ['recursively' if config.recursive else 'only'] - - permissions = [] - if config.change_edit: - permissions.append('"Can Edit"') - if config.change_share: - permissions.append('"Can Share"') - - permission_str = ' & '.join(permissions) - logger.info( - f'\nRequest to {action} {permission_str} permission(s) in "{folder.name}" folder {scope[0]}' - ) - + scope = 'recursively' if recursive else 'only' + parts = [] + if change_edit: + parts.append('"Can Edit"') + if change_share: + parts.append('"Can Share"') + logger.info(f'\nRequest to {action_label} {" & ".join(parts)} permission(s) in "{folder_name}" folder {scope}') + def execute(self, context: KeeperParams, **kwargs): - """Execute record permission changes.""" + """Execute record permission changes using SDK update_record_permissions.""" if not context.vault: raise base.CommandError('Vault is not initialized') - - self.vault = context.vault - - config = _PermissionConfig( - action=kwargs.get('action', ''), - can_share=kwargs.get('can_share', False), - can_edit=kwargs.get('can_edit', False), - force=kwargs.get('force', False), - dry_run=kwargs.get('dry_run', False), - recursive=kwargs.get('recursive', False) - ) - - folder = self._resolve_folder(context, kwargs.get('folder', '')) - folders = self._get_folders_to_process(folder, config.recursive) - + vault = context.vault + + action = kwargs.get('action', '') + can_share = kwargs.get('can_share', False) + can_edit = kwargs.get('can_edit', False) + force = kwargs.get('force', False) + dry_run = kwargs.get('dry_run', False) + recursive = kwargs.get('recursive', False) share_record, share_folder = self._determine_scope(kwargs) - - self._log_permission_request(folder, config) - - processor = _PermissionProcessor(config, context) - reporter = _PermissionReporter(config, context) - executor = _PermissionExecutor(config, context) - - direct_share_updates = [] - direct_share_skipped = [] - shared_folder_updates = {} - shared_folder_skipped = {} - - if share_record: - direct_share_updates, direct_share_skipped = processor.process_direct_shares(folders) - - if share_folder: - shared_folder_updates, shared_folder_skipped = processor.process_shared_folder_permissions(folders) - - reporter.report_direct_shares(direct_share_updates, direct_share_skipped) - reporter.report_shared_folder_changes(shared_folder_updates, shared_folder_skipped) - - if not config.dry_run and (direct_share_updates or shared_folder_updates): - if not config.force: - answer = prompt_utils.user_choice( - "Do you want to proceed with these permission changes?", 'yn', 'n' - ) - if answer.lower() != 'y': - return - - if direct_share_updates: - direct_errors = executor.execute_direct_share_updates(direct_share_updates) - if direct_errors: - headers = ['Record UID', 'Email', 'Error Code', 'Message'] - action = 'GRANT' if config.should_have else 'REVOKE' - title = f'Failed to {action} Direct Record Share permission(s)' - report_utils.dump_report_data(direct_errors, headers, title=title, row_number=True) - logger.info('\n') - - if shared_folder_updates: - shared_folder_errors = executor.execute_shared_folder_updates(shared_folder_updates) - if shared_folder_errors: - headers = ['Shared Folder UID', 'Record UID', 'Error Code'] - action = 'GRANT' if config.should_have else 'REVOKE' - title = f'Failed to {action} Shared Folder Record Share permission(s)' - report_utils.dump_report_data(shared_folder_errors, headers, title=title) - logger.info('\n') - - self.vault.sync_down(True) + folder_arg = (kwargs.get('folder') or '').strip() + + if not can_share and not can_edit: + raise base.CommandError('Please choose at least one of the following options: can-edit, can-share') + + folder_name = self._resolve_folder_display_name(vault, folder_arg) + action_label = 'GRANT' if action == 'grant' else 'REVOKE' + self._log_permission_request(folder_name, action_label, recursive, can_edit, can_share, force) + + sdk_kw = { + 'can_share': can_share, + 'can_edit': can_edit, + 'folder_uid_or_path': folder_arg or None, + 'recursive': recursive, + 'share_record': share_record, + 'share_folder': share_folder, + } + + try: + result = share_management_utils.update_record_permissions( + vault, action, dry_run=True, sync_after=False, **sdk_kw + ) + except share_management_utils.ShareValidationError as e: + raise base.CommandError(str(e)) + except share_management_utils.ShareNotFoundError as e: + raise base.CommandError(str(e)) + + should_have = action == 'grant' + _report_record_permission_result( + vault, result, + should_have=should_have, + change_edit=can_edit, + change_share=can_share, + force=force, + dry_run=dry_run, + ) + + has_updates = result.get('direct_share_updates') or result.get('shared_folder_updates') + if dry_run or not has_updates: + return + + if not force: + answer = prompt_utils.user_choice( + 'Do you want to proceed with these permission changes?', 'yn', 'n' + ) + if answer.lower() != 'y': + return + + result = share_management_utils.update_record_permissions( + vault, action, dry_run=False, sync_after=True, **sdk_kw + ) + _report_record_permission_errors(result, action_label) diff --git a/keepersdk-package/src/keepersdk/vault/share_management_utils.py b/keepersdk-package/src/keepersdk/vault/share_management_utils.py index 5b10690..f30ccbf 100644 --- a/keepersdk-package/src/keepersdk/vault/share_management_utils.py +++ b/keepersdk-package/src/keepersdk/vault/share_management_utils.py @@ -5,7 +5,7 @@ from typing import Optional, Dict, List, Any, Generator, Iterable, Set, Tuple, Union from .. import crypto, utils -from ..proto import enterprise_pb2, record_pb2 +from ..proto import enterprise_pb2, folder_pb2, record_pb2 from ..vault import vault_online, vault_record, vault_types, vault_utils from ..enterprise import enterprise_data @@ -956,4 +956,412 @@ def parse_timeout(timeout_input: str) -> datetime.timedelta: return datetime.timedelta(**{TIMEOUT_DEFAULT_UNIT: int(timeout_input)}) tdelta_kwargs = _parse_timeout_units(timeout_input) - return datetime.timedelta(**tdelta_kwargs) \ No newline at end of file + return datetime.timedelta(**tdelta_kwargs) + + +_SHARED_FOLDER_TYPES: Tuple[str, ...] = ('shared_folder', 'shared_folder_folder') +_AM_I_SHARE_ADMIN_ENDPOINT = 'vault/am_i_share_admin' +_RECORDS_SHARE_UPDATE_ENDPOINT = 'vault/records_share_update' +_SHARED_FOLDER_UPDATE_V3_ENDPOINT = 'vault/shared_folder_update_v3' +_DIRECT_SHARE_BATCH_SIZE = 900 +_SHARED_FOLDER_RECORD_BATCH_SIZE = 490 +_SHARED_FOLDER_CHUNK_ELEMENT_LIMIT = 500 + + +def _resolve_folder_for_permission( + vault: vault_online.VaultOnline, + folder_uid_or_path: Optional[str] +) -> vault_types.Folder: + """Resolve folder from UID or path. Returns root folder if folder_uid_or_path is None or empty.""" + if not folder_uid_or_path or not folder_uid_or_path.strip(): + return vault.vault_data.root_folder + name = folder_uid_or_path.strip() + if name in vault.vault_data._folders: + folder = vault.vault_data.get_folder(name) + if folder: + return folder + folder, remaining = try_resolve_path(vault, name) + if remaining: + raise ShareNotFoundError(f'Folder "{folder_uid_or_path}" not found') + return folder + + +def _get_folders_to_process( + vault: vault_online.VaultOnline, + start_folder: vault_types.Folder, + recursive: bool +) -> List[vault_types.Folder]: + """Return list of folders to process, optionally including all subfolders.""" + folders = [start_folder] + if not recursive: + return folders + visited: Set[str] = {start_folder.folder_uid} + pos = 0 + while pos < len(folders): + folder = folders[pos] + if folder.subfolders: + for sub_uid in folder.subfolders: + if sub_uid not in visited: + sub = vault.vault_data.get_folder(sub_uid) + if sub: + folders.append(sub) + visited.add(sub_uid) + pos += 1 + return folders + + +def _get_share_admin_folders( + vault: vault_online.VaultOnline, + folders: List[vault_types.Folder] +) -> Set[str]: + """Return set of shared folder UIDs where the current user is share admin.""" + shared_folder_uids: Set[str] = set() + for folder in folders: + uid = None + if folder.folder_type == 'shared_folder': + uid = folder.folder_uid + elif folder.folder_type == 'shared_folder_folder': + uid = folder.folder_scope_uid + if uid and uid not in shared_folder_uids and uid in vault.vault_data._shared_folders: + shared_folder_uids.add(uid) + if not shared_folder_uids: + return set() + try: + rq = record_pb2.AmIShareAdmin() + for sf_uid in shared_folder_uids: + osa = record_pb2.IsObjectShareAdmin() + osa.uid = utils.base64_url_decode(sf_uid) + osa.objectType = record_pb2.CHECK_SA_ON_SF + rq.isObjectShareAdmin.append(osa) + rs = vault.keeper_auth.execute_auth_rest( + rest_endpoint=_AM_I_SHARE_ADMIN_ENDPOINT, + request=rq, + response_type=record_pb2.AmIShareAdmin + ) + return {utils.base64_url_encode(osa.uid) for osa in rs.isObjectShareAdmin if osa.isAdmin} + except Exception: + return set() + + +def _get_shared_folder_uid(folder: vault_types.Folder) -> Optional[str]: + """Get shared folder UID from a folder (for shared_folder or shared_folder_folder).""" + if folder.folder_type == 'shared_folder': + return folder.folder_uid + if folder.folder_type == 'shared_folder_folder': + return folder.folder_scope_uid + return None + + +def _has_manage_records_permission( + vault: vault_online.VaultOnline, + shared_folder: vault_types.SharedFolder, + shared_folder_uid: str, + is_share_admin: bool +) -> bool: + """Return True if current user can manage records in this shared folder.""" + if is_share_admin: + return True + account_uid = utils.base64_url_encode(vault.keeper_auth.auth_context.account_uid) + username = vault.keeper_auth.auth_context.username + if shared_folder.user_permissions: + if shared_folder.user_permissions[0].user_uid == account_uid: + return True + user = next( + (u for u in shared_folder.user_permissions if u.name == username), + None + ) + if user and user.manage_records: + return True + return False + + +def _needs_shared_folder_record_update( + rp: vault_types.SharedFolderRecord, + should_have: bool, + change_edit: bool, + change_share: bool +) -> bool: + """Return True if this shared folder record permission should be updated.""" + if change_edit and (should_have != rp.can_edit): + return True + if change_share and (should_have != rp.can_share): + return True + return False + + +def _build_shared_folder_record_update( + record_uid: str, + shared_folder_uid: str, + should_have: bool, + change_edit: bool, + change_share: bool +) -> Any: + """Build SharedFolderUpdateRecord protobuf for one record in a shared folder.""" + cmd = folder_pb2.SharedFolderUpdateRecord() + cmd.recordUid = utils.base64_url_decode(record_uid) + cmd.sharedFolderUid = utils.base64_url_decode(shared_folder_uid) + cmd.canEdit = ( + folder_pb2.BOOLEAN_TRUE if should_have else folder_pb2.BOOLEAN_FALSE + ) if change_edit else folder_pb2.BOOLEAN_NO_CHANGE + cmd.canShare = ( + folder_pb2.BOOLEAN_TRUE if should_have else folder_pb2.BOOLEAN_FALSE + ) if change_share else folder_pb2.BOOLEAN_NO_CHANGE + return cmd + + +def _process_direct_share_updates( + vault: vault_online.VaultOnline, + folders: List[vault_types.Folder], + should_have: bool, + change_edit: bool, + change_share: bool +) -> List[Dict[str, Any]]: + """Collect direct record-share permission updates (record shared to users).""" + record_uids: Set[str] = set() + for folder in folders: + if folder.records: + record_uids.update(folder.records) + if not record_uids: + return [] + shared_records = get_record_shares(vault, list(record_uids)) + if not shared_records: + return [] + current_username = vault.keeper_auth.auth_context.username + updates: List[Dict[str, Any]] = [] + for sr in shared_records: + shares = sr.get('shares', {}) + user_permissions = shares.get('user_permissions', []) + for up in user_permissions: + if up.get('owner'): + continue + username = up.get('username') + if username == current_username: + continue + needs = (change_edit and (should_have != up.get('editable'))) or ( + change_share and (should_have != up.get('shareable')) + ) + if needs: + updates.append({ + 'record_uid': sr.get('record_uid'), + 'to_username': username, + 'editable': should_have if change_edit else up.get('editable'), + 'shareable': should_have if change_share else up.get('shareable'), + }) + return updates + + +def _process_shared_folder_permission_updates( + vault: vault_online.VaultOnline, + folders: List[vault_types.Folder], + should_have: bool, + change_edit: bool, + change_share: bool +) -> Tuple[Dict[str, Dict[str, Any]], Dict[str, Dict[str, Any]]]: + """Collect shared-folder record permission updates and skipped (no permission).""" + share_admin = _get_share_admin_folders(vault, folders) + account_uid = utils.base64_url_encode(vault.keeper_auth.auth_context.account_uid) + updates: Dict[str, Dict[str, Any]] = {} + skipped: Dict[str, Dict[str, Any]] = {} + for folder in folders: + if folder.folder_type not in _SHARED_FOLDER_TYPES: + continue + shared_folder_uid = _get_shared_folder_uid(folder) + if not shared_folder_uid or shared_folder_uid not in vault.vault_data._shared_folders: + continue + is_share_admin = shared_folder_uid in share_admin + shared_folder = vault.vault_data.load_shared_folder(shared_folder_uid) + if not shared_folder: + continue + has_manage = _has_manage_records_permission( + vault, shared_folder, shared_folder_uid, is_share_admin + ) + container = updates if (is_share_admin or has_manage) else skipped + if not shared_folder.record_permissions: + continue + record_uids = folder.records if folder.records else set() + for rp in shared_folder.record_permissions: + record_uid = rp.record_uid + if record_uid not in record_uids: + continue + if record_uid in container.get(shared_folder_uid, {}): + continue + if _needs_shared_folder_record_update(rp, should_have, change_edit, change_share): + container.setdefault(shared_folder_uid, {}) + container[shared_folder_uid][record_uid] = _build_shared_folder_record_update( + record_uid, shared_folder_uid, should_have, change_edit, change_share + ) + # drop empty dicts + updates = {k: v for k, v in updates.items() if v} + skipped = {k: v for k, v in skipped.items() if v} + return updates, skipped + + +def _to_shared_record_proto(item: Dict[str, Any]) -> Any: + """Build SharedRecord protobuf for records_share_update.""" + sr = record_pb2.SharedRecord() + sr.toUsername = item['to_username'] + sr.recordUid = utils.base64_url_decode(item['record_uid']) + if 'editable' in item: + sr.editable = item['editable'] + if 'shareable' in item: + sr.shareable = item['shareable'] + return sr + + +def _execute_direct_share_updates( + vault: vault_online.VaultOnline, + updates: List[Dict[str, Any]] +) -> List[List[Any]]: + """Apply direct record share permission updates. Returns list of error rows [record_uid, username, status, message].""" + errors: List[List[Any]] = [] + while updates: + batch = updates[:_DIRECT_SHARE_BATCH_SIZE] + updates = updates[_DIRECT_SHARE_BATCH_SIZE:] + rq = record_pb2.RecordShareUpdateRequest() + rq.updateSharedRecord.extend(_to_shared_record_proto(x) for x in batch) + rs = vault.keeper_auth.execute_auth_rest( + rest_endpoint=_RECORDS_SHARE_UPDATE_ENDPOINT, + request=rq, + response_type=record_pb2.RecordShareUpdateResponse + ) + for status in rs.updateSharedRecordStatus: + if status.status.lower() != 'success': + errors.append([ + utils.base64_url_encode(status.recordUid), + status.username, + status.status.lower(), + status.message + ]) + return errors + + +def _execute_shared_folder_updates( + vault: vault_online.VaultOnline, + updates: Dict[str, Dict[str, Any]] +) -> List[List[Any]]: + """Apply shared folder record permission updates. Returns list of error rows [shared_folder_uid, record_uid, status].""" + errors: List[List[Any]] = [] + requests: List[Any] = [] + for shared_folder_uid in updates: + commands = list(updates[shared_folder_uid].values()) + while commands: + batch = commands[:_SHARED_FOLDER_RECORD_BATCH_SIZE] + commands = commands[_SHARED_FOLDER_RECORD_BATCH_SIZE:] + rq = folder_pb2.SharedFolderUpdateV3Request() + rq.sharedFolderUid = utils.base64_url_decode(shared_folder_uid) + rq.forceUpdate = True + rq.sharedFolderUpdateRecord.extend(batch) + if batch: + rq.fromTeamUid = batch[0].teamUid + requests.append(rq) + # Chunk for API size limits + chunks: List[Dict[bytes, Any]] = [] + current: Dict[bytes, Any] = {} + total = 0 + for rq in requests: + if rq.sharedFolderUid in current: + chunks.append(current) + current = {} + total = 0 + n = len(rq.sharedFolderUpdateRecord) + if total + n > _SHARED_FOLDER_CHUNK_ELEMENT_LIMIT: + chunks.append(current) + current = {} + total = 0 + current[rq.sharedFolderUid] = rq + total += n + if current: + chunks.append(current) + for chunk in chunks: + rqs = folder_pb2.SharedFolderUpdateV3RequestV2() + rqs.sharedFoldersUpdateV3.extend(chunk.values()) + rss = vault.keeper_auth.execute_auth_rest( + rest_endpoint=_SHARED_FOLDER_UPDATE_V3_ENDPOINT, + request=rqs, + response_type=folder_pb2.SharedFolderUpdateV3ResponseV2, + payload_version=1 + ) + for rs in rss.sharedFoldersUpdateV3Response: + sf_uid = utils.base64_url_encode(rs.sharedFolderUid) + for status in rs.sharedFolderUpdateRecordStatus: + if status.status != 'success': + errors.append([sf_uid, utils.base64_url_encode(status.recordUid), status.status]) + return errors + + +def update_record_permissions( + vault: vault_online.VaultOnline, + action: str, + can_share: bool = False, + can_edit: bool = False, + *, + folder_uid_or_path: Optional[str] = None, + recursive: bool = False, + share_record: bool = True, + share_folder: bool = True, + dry_run: bool = False, + sync_after: bool = True +) -> Dict[str, Any]: + """Update record permissions (can_edit / can_share) in a folder and optionally its subfolders. + + Args: + vault: Connected vault. + action: ``'grant'`` or ``'revoke'``. + can_share: Whether to change the "can share" permission. + can_edit: Whether to change the "can edit" permission. + folder_uid_or_path: Folder UID or path; if None or empty, uses root. + recursive: If True, include all subfolders. + share_record: If True, update direct record shares (record shared to users). + share_folder: If True, update shared folder record permissions. + dry_run: If True, do not apply changes; only compute and return planned updates. + sync_after: If True and changes were applied, sync vault down after updates. + + Returns: + Dict with keys: + - ``direct_share_updates``: list of direct-share updates (each a dict with + record_uid, to_username, editable, shareable). + - ``shared_folder_updates``: dict shared_folder_uid -> { record_uid -> update_cmd }. + - ``direct_share_errors``: list of error rows for direct share API (if not dry_run). + - ``shared_folder_errors``: list of error rows for shared folder API (if not dry_run). + - ``skipped_shared_folders``: shared folder UIDs skipped due to insufficient permissions. + + Raises: + ShareValidationError: If neither can_share nor can_edit is True, or action is invalid. + ShareNotFoundError: If folder_uid_or_path is not found. + """ + if action not in ('grant', 'revoke'): + raise ShareValidationError(f'Invalid action: {action!r}; use "grant" or "revoke"') + if not can_share and not can_edit: + raise ShareValidationError('Specify at least one of can_share or can_edit') + should_have = action == 'grant' + folder = _resolve_folder_for_permission(vault, folder_uid_or_path) + folders = _get_folders_to_process(vault, folder, recursive) + direct_share_updates: List[Dict[str, Any]] = [] + shared_folder_updates: Dict[str, Dict[str, Any]] = {} + skipped_shared_folders: Dict[str, Dict[str, Any]] = {} + if share_record: + direct_share_updates = _process_direct_share_updates( + vault, folders, should_have, can_edit, can_share + ) + if share_folder: + shared_folder_updates, skipped_shared_folders = _process_shared_folder_permission_updates( + vault, folders, should_have, can_edit, can_share + ) + result: Dict[str, Any] = { + 'direct_share_updates': direct_share_updates, + 'shared_folder_updates': shared_folder_updates, + 'direct_share_errors': [], + 'shared_folder_errors': [], + 'skipped_shared_folders': skipped_shared_folders, + } + if dry_run: + return result + if direct_share_updates: + result['direct_share_errors'] = _execute_direct_share_updates(vault, direct_share_updates) + if shared_folder_updates: + result['shared_folder_errors'] = _execute_shared_folder_updates(vault, shared_folder_updates) + if sync_after and (direct_share_updates or shared_folder_updates): + vault.sync_down(True) + return result + +