Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 61 additions & 2 deletions keepercommander/commands/security_audit.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import argparse
import base64
import datetime
import json
import logging
import os
from json import JSONDecodeError
from typing import Dict, List, Optional, Any

Expand Down Expand Up @@ -30,6 +32,8 @@ def register_command_info(aliases, command_info):

report_parser = argparse.ArgumentParser(prog='security-audit-report', description='Run a security audit report.',
parents=[report_output_parser])
report_parser.add_argument('--siem', dest='siem', action='store_true',
help='output in SIEM-ready NDJSON format (one event per line)')
report_parser.add_argument('--syntax-help', dest='syntax_help', action='store_true', help='display help')
node_filter_help = 'name(s) or UID(s) of node(s) to filter results of the report by'
report_parser.add_argument('-n', '--node', action='append', help=node_filter_help)
Expand Down Expand Up @@ -492,12 +496,17 @@ def decrypt_security_data(sec_data, key_type):
fields = ('email', 'name', 'sync_pending', 'at_risk', 'passed', 'ignored') if show_breachwatch else \
('email', 'name', 'sync_pending', 'weak', 'fair', 'medium', 'strong', 'reused', 'unique', 'securityScore',
'twoFactorChannel', 'node')
field_descriptions = fields

report_title = f'Security Audit Report{" (BreachWatch)" if show_breachwatch else ""}'

# SIEM-ready NDJSON output
if kwargs.get('siem'):
return self._export_siem(params, rows, fields, show_breachwatch, out, report_title)

field_descriptions = fields
if fmt == 'table':
field_descriptions = (field_to_title(x) for x in fields)

report_title = f'Security Audit Report{" (BreachWatch)" if show_breachwatch else ""}'
table = []
for raw in rows:
row = []
Expand All @@ -506,6 +515,56 @@ def decrypt_security_data(sec_data, key_type):
table.append(row)
return dump_report_data(table, field_descriptions, fmt=fmt, filename=out, title=report_title)

@staticmethod
def _export_siem(params, rows, fields, show_breachwatch, filename, title):
"""Export security audit as NDJSON (one JSON event per line).

Each line is a self-contained JSON object that SIEMs can ingest
independently. Compatible with Splunk HEC, Elastic Filebeat,
Datadog log pipelines, and any NDJSON consumer.
"""
now = datetime.datetime.now(tz=datetime.timezone.utc).isoformat(timespec='seconds')
server = params.server if hasattr(params, 'server') else ''
ndjson_lines = []

for raw in rows:
user_data = {f: raw.get(f) for f in fields}

# Collect risk factors from the data — no hardcoded thresholds
risk_factors = []
if not show_breachwatch:
if raw.get('weak', 0) > 0:
risk_factors.append('weak_passwords')
if raw.get('reused', 0) > 0:
risk_factors.append('reused_passwords')
two_fa = raw.get('twoFactorChannel', '')
if not two_fa or two_fa == 'Off':
risk_factors.append('no_2fa')
else:
if raw.get('at_risk', 0) > 0:
risk_factors.append('breach_exposure')

event = {
'event_type': 'keeper.security_audit',
'timestamp': now,
'source': server,
'user': user_data,
'security_score': raw.get('securityScore', 0),
'risk_factors': risk_factors,
}
ndjson_lines.append(json.dumps(event, default=str))

report = '\n'.join(ndjson_lines) + '\n'

if filename:
if not os.path.splitext(filename)[1]:
filename += '.ndjson'
logging.info('SIEM report path: %s', os.path.abspath(filename))
with open(filename, 'w') as f:
f.write(report)
else:
return report

def get_updated_security_report_row(self, sr, rsa_key, ec_key, last_saved_data):
# type: (APIRequest_pb2.SecurityReport, rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey, Dict[str, int]) -> Dict[str, int]

Expand Down