Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 131 additions & 21 deletions keepercli-package/src/keepercli/commands/pedm_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,24 @@ def resolve_existing_policies(pedm: admin_plugin.PedmPlugin, policy_names: Any)
found_policies: Dict[str, admin_types.PedmPolicy] = {}
p: Optional[admin_types.PedmPolicy]
if isinstance(policy_names, list):
policy_name_lookup = PedmUtils.get_policy_name_lookup(pedm)
for policy_name in policy_names:
p = pedm.policies.get_entity(policy_name)
if p is None:
raise base.CommandError(f'Policy name "{policy_name}" is not found')
found_policies[p.policy_uid] = p
name_lower = (policy_name.strip().lower() if isinstance(policy_name, str) else '')
match = policy_name_lookup.get(name_lower)
if match is None:
raise base.CommandError(f'Policy name "{policy_name}" is not found')
if isinstance(match, list):
if len(match) > 1:
raise base.CommandError(
f'Policy name "{policy_name}" is not unique. Please use Policy UID'
)
p = match[0]
else:
p = match
if p is not None:
found_policies[p.policy_uid] = p
if len(found_policies) == 0:
raise base.CommandError('No policies were found')
return list(found_policies.values())
Expand All @@ -76,7 +89,17 @@ def resolve_single_policy(pedm: admin_plugin.PedmPlugin, policy_uid: Any) -> adm

if isinstance(policy, admin_types.PedmPolicy):
return policy
raise base.CommandError(f'Policy UID \"{policy_uid}\" does not exist')
name_lower = policy_uid.strip().lower()
match = PedmUtils.get_policy_name_lookup(pedm).get(name_lower)
if match is not None:
if isinstance(match, list):
if len(match) > 1:
raise base.CommandError(
f'Policy name "{policy_uid}" is not unique. Please use Policy UID'
)
return match[0]
return match
raise base.CommandError(f'Policy "{policy_uid}" is not found')

@staticmethod
def get_collection_name_lookup(
Expand Down Expand Up @@ -108,6 +131,27 @@ def get_orphan_resources(pedm: admin_plugin.PedmPlugin) -> List[str]:
links = {x.collection_uid for x in pedm.storage.collection_links.get_all_links() if x.link_type == pedm_pb2.CollectionLinkType.CLT_AGENT}
return list(collections.difference(links))

@staticmethod
def get_policy_name_lookup(
pedm: admin_plugin.PedmPlugin
) -> Dict[str, Union[admin_types.PedmPolicy, List[admin_types.PedmPolicy]]]:
policy_lookup: Dict[str, Union[admin_types.PedmPolicy, List[admin_types.PedmPolicy]]] = {}
for policy in pedm.policies.get_all_entities():
if not isinstance(policy.data, dict):
continue
name = policy.data.get('PolicyName')
if not isinstance(name, str) or not name.strip():
continue
name_lower = name.strip().lower()
existing = policy_lookup.get(name_lower)
if existing is None:
policy_lookup[name_lower] = policy
elif isinstance(existing, list):
existing.append(policy)
else:
policy_lookup[name_lower] = [existing, policy]
return policy_lookup

@staticmethod
def resolve_existing_collections(
pedm: admin_plugin.PedmPlugin,
Expand Down Expand Up @@ -478,8 +522,8 @@ def execute(self, context: KeeperParams, **kwargs) -> Any:

class PedmAgentDeleteCommand(base.ArgparseCommand):
def __init__(self):
parser = argparse.ArgumentParser(prog='update', description='Delete PEDM agents')
parser.add_argument('--force', dest='force', action='store_true',
parser = argparse.ArgumentParser(prog='delete', description='Delete PEDM agents')
parser.add_argument('-f', '--force', dest='force', action='store_true',
help='do not prompt for confirmation')
parser.add_argument('agent', nargs='+', help='Agent UID(s)')

Expand All @@ -493,20 +537,36 @@ def execute(self, context: KeeperParams, **kwargs) -> Any:
if isinstance(agents, str):
agents = [agents]
agent_uid_list: List[str] = []
agent_names: List[str] = []
if isinstance(agents, list):
for agent_name in agents:
agent = PedmUtils.resolve_single_agent(plugin, agent_name)
agent_uid_list.append(agent.agent_uid)
name = (agent.properties or {}).get('MachineName') or agent.agent_uid
agent_names.append(name)

if len(agent_uid_list) == 0:
return

statuses = plugin.modify_agents( remove_agents=agent_uid_list)
force = kwargs.get('force') is True
if not force:
prompt_msg = f'Do you want to delete {len(agent_uid_list)} agent(s)?'
if len(agent_names) <= 3:
prompt_msg += f' ({", ".join(agent_names)})'
answer = prompt_utils.user_choice(prompt_msg, 'yN')
if answer.lower() not in {'y', 'yes'}:
return

statuses = plugin.modify_agents(remove_agents=agent_uid_list)
if isinstance(statuses.remove, list):
for status in statuses.remove:
if isinstance(status, admin_types.EntityStatus) and not status.success:
utils.get_logger().warning(f'Failed to remove agent "{status.entity_uid}": {status.message}')

if len(agent_names) == 1:
return f'Successfully deleted agent: {agent_names[0]}'
return 'Successfully deleted agents: ' + ', '.join(agent_names)


class PedmAgentEditCommand(base.ArgparseCommand):
def __init__(self):
Expand All @@ -523,13 +583,10 @@ def execute(self, context: KeeperParams, **kwargs) -> Any:
base.require_enterprise_admin(context)
plugin = context.pedm_plugin

deployment_uid = kwargs.get('deployment')
if deployment_uid:
deployment = plugin.deployments.get_entity(deployment_uid)
if not deployment:
raise base.CommandError(f'Deployment "{deployment_uid}" does not exist')
else:
deployment_uid = None
deployment_uid = None
if kwargs.get('deployment'):
deployment = PedmUtils.resolve_single_deployment(plugin, kwargs.get('deployment'))
deployment_uid = deployment.deployment_uid

disabled: Optional[bool] = None
enable = kwargs.get('enable')
Expand All @@ -542,6 +599,7 @@ def execute(self, context: KeeperParams, **kwargs) -> Any:
raise base.CommandError(f'"enable" argument must be "on" or "off"')

update_agents: List[admin_types.UpdateAgent] = []
agent_names: List[str] = []
agents = kwargs['agent']
if isinstance(agents, str):
agents = [agents]
Expand All @@ -555,12 +613,17 @@ def execute(self, context: KeeperParams, **kwargs) -> Any:
deployment_uid=deployment_uid,
disabled=disabled,
))
name = (agent.properties or {}).get('MachineName') or agent.agent_uid
agent_names.append(name)
if len(update_agents) > 0:
statuses = plugin.modify_agents(update_agents=update_agents)
if isinstance(statuses.update, list):
for status in statuses.update:
if isinstance(status, admin_types.EntityStatus) and not status.success:
utils.get_logger().warning(f'Failed to update agent "{status.entity_uid}": {status.message}')
if len(agent_names) == 1:
return f'Successfully updated agent: {agent_names[0]}'
return 'Successfully updated agents: ' + ', '.join(agent_names)


class PedmAgentListCommand(base.ArgparseCommand):
Expand Down Expand Up @@ -1125,23 +1188,65 @@ def execute(self, context: KeeperParams, **kwargs) -> Any:
plugin = context.pedm_plugin

policy = PedmUtils.resolve_single_policy(plugin, kwargs.get('policy'))

body = json.dumps(policy.data, indent=4)
filename = kwargs.get('output')
if kwargs.get('format') == 'json' and filename:
with open(filename, 'w') as f:
f.write(body)
else:
data = policy.data or {}

if kwargs.get('format') == 'json':
body = json.dumps(data, indent=4)
filename = kwargs.get('output')
if filename:
with open(filename, 'w') as f:
f.write(body)
return body

# Table format: same columns as policy list - single row summary
def _cell_str(v: Any) -> str:
if v is None:
return ''
if isinstance(v, list):
return ', '.join(str(x) for x in v) if v else ''
return str(v)

all_agents = utils.base64_url_encode(plugin.all_agents)
actions = data.get('Actions') or {}
on_success = actions.get('OnSuccess') or {}
controls = on_success.get('Controls') or []
status = data.get('Status') or ''
if policy.disabled:
status = 'off'

collections = [
x.collection_uid for x in plugin.storage.collection_links.get_links_by_object(policy.policy_uid)
]
collections = ['*' if x == all_agents else x for x in collections]
collections.sort()

headers = ['policy_uid', 'policy_name', 'policy_type', 'status', 'controls', 'users', 'machines', 'applications', 'collections']
row = [
policy.policy_uid,
data.get('PolicyName') or '',
data.get('PolicyType') or '',
status,
_cell_str(controls),
_cell_str(data.get('UserCheck')),
_cell_str(data.get('MachineCheck')),
_cell_str(data.get('ApplicationCheck')),
_cell_str(collections),
]
fmt = kwargs.get('format', 'table')
if fmt != 'json':
headers = [report_utils.field_to_title(x) for x in headers]
return report_utils.dump_report_data(
[row], headers, fmt=fmt, filename=kwargs.get('output')
)


class PedmPolicyDeleteCommand(base.ArgparseCommand):
def __init__(self):
parser = argparse.ArgumentParser(prog='delete', description='Delete PEDM policy')
parser.add_argument('policy', type=str, nargs='+', help='Policy UID or name')
super().__init__(parser)

def execute(self, context: KeeperParams, **kwargs) -> None:
def execute(self, context: KeeperParams, **kwargs) -> Any:
plugin = context.pedm_plugin

policies = PedmUtils.resolve_existing_policies(plugin, kwargs.get('policy'))
Expand All @@ -1153,6 +1258,11 @@ def execute(self, context: KeeperParams, **kwargs) -> None:
if isinstance(status, admin_types.EntityStatus) and not status.success:
raise base.CommandError(f'Failed to delete policy "{status.entity_uid}": {status.message}')

names = [(p.data or {}).get('PolicyName') or p.policy_uid for p in policies]
if len(names) == 1:
return f'Successfully deleted policy: {names[0]}'
return '\n'.join(f'Successfully deleted policy: {name}' for name in names)


class PedmPolicyAgentsCommand(base.ArgparseCommand):
def __init__(self):
Expand Down