Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions keepercli-package/src/keepercli/commands/security_audit_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,22 @@ def _resolve_node_ids(self, enterprise_data, nodes: Optional[List[str]]) -> List
return []

node_ids = []
unresolved = []
for name_or_id in nodes:
matched = False
for n in enterprise_data.nodes.get_all_entities():
if name_or_id == str(n.node_id) or name_or_id == n.name:
node_ids.append(n.node_id)
matched = True
break
if not matched:
unresolved.append(name_or_id)

if unresolved:
raise base.CommandError(
f'Invalid node(s): {", ".join(repr(x) for x in unresolved)}. '
'Provide a valid node name or node UID.'
)
return node_ids

def _format_report(
Expand Down
14 changes: 10 additions & 4 deletions keepercli-package/src/keepercli/commands/share_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,10 @@ def execute(self, context: KeeperParams, **kwargs) -> Any:
if config.folders_only:
return self._generate_folders_report(generator, output_format, output_file)
if config.show_ownership:
return self._generate_ownership_report(generator, output_format, output_file, verbose)
return self._generate_ownership_report(
generator, output_format, output_file, verbose,
show_share_date=config.show_share_date
)
if config.record_filter:
return self._generate_record_detail_report(generator, config)
if config.user_filter:
Expand Down Expand Up @@ -158,15 +161,18 @@ def _generate_ownership_report(
generator: share_report.ShareReportGenerator,
output_format: str,
output_file: Optional[str],
verbose: bool
verbose: bool,
show_share_date: bool = False
) -> Optional[str]:
"""Generate record ownership report."""
entries = generator.generate_records_report()
headers = share_report.ShareReportGenerator.get_headers(ownership=True)
headers = share_report.ShareReportGenerator.get_headers(
ownership=True, show_share_date=show_share_date
)
table = [
[e.record_owner, e.record_uid, e.record_title,
e.shared_with if verbose else e.shared_with_count,
'\n'.join(e.folder_paths)]
'\n'.join(e.folder_paths)] + ([e.share_date or ''] if show_share_date else [])
for e in entries
]

Expand Down
158 changes: 130 additions & 28 deletions keepersdk-package/src/keepersdk/vault/share_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@

import dataclasses
import datetime
import logging
from typing import Optional, List, Dict, Any, Iterable, Set, NamedTuple

from . import vault_online, vault_types, vault_utils
from . import share_management_utils
from ..authentication import keeper_auth
from ..enterprise import enterprise_data as enterprise_data_types

_SHARE_DATE_EVENT_TYPES = ['folder_add_record', 'record_add']
_AUDIT_EVENT_LIMIT = 1000
_SHARE_DATE_PAGINATION_MAX = 100
_logger = logging.getLogger(__name__)


@dataclasses.dataclass
class ShareReportEntry:
Expand Down Expand Up @@ -217,21 +223,35 @@ def generate_records_report(self) -> List[ShareReportEntry]:
share_info_map = self._fetch_share_info(list(record_uids)) or {}
entries: List[ShareReportEntry] = []
processed_uids: Set[str] = set()

user_filter_lower = {u.lower() for u in self._config.user_filter} if self._config.user_filter else None


user_filter_lower = (
{u.lower() for u in self._config.user_filter} if self._config.user_filter else None
)
share_date_map: Dict[str, str] = {}
if self._config.show_share_date and self._auth and self._enterprise:
sf_records = (
self._get_shared_folder_records_for_user(user_filter_lower)
if user_filter_lower is not None
else self._get_all_shared_folder_records()
)
all_uids = set(share_info_map.keys()) | sf_records
if all_uids:
share_date_map = self._fetch_share_dates(list(all_uids))

for uid, share_info in share_info_map.items():
if not self._should_include_record(share_info):
continue

if user_filter_lower and not self._record_matches_user_filter(share_info, user_filter_lower):
continue
entries.append(self._build_share_entry(share_info))

entries.append(self._build_share_entry(share_info, share_date_map.get(uid)))
processed_uids.add(uid)

self._add_shared_folder_records(entries, processed_uids, share_info_map, user_filter_lower)


self._add_shared_folder_records(
entries, processed_uids, share_info_map, user_filter_lower, share_date_map
)

return entries

def _should_include_record(self, share_info: RecordShareInfo) -> bool:
Expand All @@ -252,42 +272,45 @@ def _add_shared_folder_records(
entries: List[ShareReportEntry],
processed_uids: Set[str],
share_info_map: Dict[str, RecordShareInfo],
user_filter_lower: Optional[Set[str]]
user_filter_lower: Optional[Set[str]],
share_date_map: Optional[Dict[str, str]] = None
) -> None:
"""Add records from shared folders that weren't returned by the share API."""
should_include = (
self._config.user_filter or
self._config.show_ownership or
self._config.user_filter or
self._config.show_ownership or
not self._config.record_filter
)

if not should_include:
return

sf_records = (
self._get_shared_folder_records_for_user(user_filter_lower)
if user_filter_lower
else self._get_all_shared_folder_records()
)

share_dates = share_date_map or {}

for record_uid in sf_records:
if record_uid in processed_uids:
continue

record_info = self._vault.vault_data.get_record(record_uid)
if not record_info:
continue

folder_paths = self._get_folder_paths(record_uid)
owner = self._get_owner_from_share_info(share_info_map, record_uid)

entries.append(ShareReportEntry(
record_uid=record_uid,
record_title=record_info.title,
record_owner=owner,
shared_with='',
shared_with_count=0,
folder_paths=folder_paths
folder_paths=folder_paths,
share_date=share_dates.get(record_uid)
))
processed_uids.add(record_uid)

Expand Down Expand Up @@ -454,27 +477,38 @@ def generate_report_rows(self) -> Iterable[List[Any]]:
elif self._config.show_ownership:
for entry in self.generate_records_report():
shared_info = entry.shared_with if self._config.verbose else entry.shared_with_count
yield [entry.record_owner, entry.record_uid, entry.record_title,
row = [entry.record_owner, entry.record_uid, entry.record_title,
shared_info, '\n'.join(entry.folder_paths)]
if self._config.show_share_date:
row.append(entry.share_date or '')
yield row
else:
for entry in self.generate_summary_report():
yield [entry.shared_to, entry.record_count, entry.shared_folder_count]

@staticmethod
def get_headers(folders_only: bool = False, ownership: bool = False) -> List[str]:
def get_headers(
folders_only: bool = False,
ownership: bool = False,
show_share_date: bool = False
) -> List[str]:
"""Get report headers based on configuration.

Args:
folders_only: True if generating shared folders report
ownership: True if generating ownership report
show_share_date: True to include share date column (ownership report only)

Returns:
List of header column names
"""
if folders_only:
return ['folder_uid', 'folder_name', 'shared_to', 'permissions', 'folder_path']
if ownership:
return ['record_owner', 'record_uid', 'record_title', 'shared_with', 'folder_path']
headers = ['record_owner', 'record_uid', 'record_title', 'shared_with', 'folder_path']
if show_share_date:
headers.append('share_date')
return headers
return ['shared_to', 'records', 'shared_folders']

def _resolve_record_uids(self, record_refs: List[str]) -> Set[str]:
Expand Down Expand Up @@ -504,7 +538,9 @@ def _fetch_share_info(self, record_uids: List[str]) -> Dict[str, RecordShareInfo

try:
shares_data = share_management_utils.get_record_shares(
self._vault, record_uids, is_share_admin=False
self._vault,
record_uids,
is_share_admin=self._config.show_share_date,
)

if not shares_data:
Expand Down Expand Up @@ -533,7 +569,70 @@ def _fetch_share_info(self, record_uids: List[str]) -> Dict[str, RecordShareInfo
pass

return result


def _fetch_share_dates(self, record_uids: List[str]) -> Dict[str, str]:
"""Fetch earliest share-related audit event date per record (enterprise only).
Returns dict of record_uid -> formatted date string, or empty if not available.
"""
if not record_uids or not self._auth or not self._enterprise:
return {}
record_uid_set = set(record_uids)
min_ts: Dict[str, int] = {}
search_min_ts = int(
(datetime.datetime.now() - datetime.timedelta(days=365 * 5)).timestamp()
)
audit_filter: Dict[str, Any] = {
'audit_event_type': _SHARE_DATE_EVENT_TYPES,
'created': {'min': search_min_ts},
'record_uid': record_uids,
}
rq: Dict[str, Any] = {
'command': 'get_audit_event_reports',
'scope': 'enterprise',
'report_type': 'raw',
'filter': audit_filter,
'limit': _AUDIT_EVENT_LIMIT,
'order': 'ascending',
}
iterations = 0
try:
while iterations < _SHARE_DATE_PAGINATION_MAX:
iterations += 1
rs = self._auth.execute_auth_command(rq)
events = rs.get('audit_event_overview_report_rows') or []
if not events:
break
for event in events:
uid = event.get('record_uid') or ''
if uid not in record_uid_set:
continue
ts = event.get('created')
if ts is None:
continue
try:
ts_int = int(ts)
except (TypeError, ValueError):
continue
if uid not in min_ts or ts_int < min_ts[uid]:
min_ts[uid] = ts_int
if len(events) < _AUDIT_EVENT_LIMIT:
break
last_ts = max(int(e.get('created', 0)) for e in events)
audit_filter['created'] = {'min': last_ts + 1}
except Exception as e:
_logger.debug('Failed to fetch share dates from audit: %s', e)
# Format as date string (created may be Unix seconds or milliseconds)
result: Dict[str, str] = {}
for uid, ts in min_ts.items():
try:
if ts > 1e12:
ts = ts // 1000
dt = datetime.datetime.fromtimestamp(ts, tz=datetime.timezone.utc)
result[uid] = dt.strftime('%Y-%m-%d %H:%M UTC')
except (OSError, ValueError):
result[uid] = str(ts)
return result

def _parse_user_permissions(self, shares: Dict) -> List[UserPermissionInfo]:
"""Parse user permissions from share data."""
permissions = []
Expand All @@ -554,22 +653,25 @@ def _parse_user_permissions(self, shares: Dict) -> List[UserPermissionInfo]:
))
return permissions

def _build_share_entry(self, share_info: RecordShareInfo) -> ShareReportEntry:
def _build_share_entry(
self, share_info: RecordShareInfo, share_date: Optional[str] = None
) -> ShareReportEntry:
"""Build a ShareReportEntry from RecordShareInfo."""
owner = self._get_owner_from_share_info({share_info.record_uid: share_info}, share_info.record_uid)
non_owner_shares = [p for p in share_info.user_permissions if not p.is_owner]

shared_with = ''
if self._config.verbose:
shared_with = self._format_verbose_permissions(share_info)

return ShareReportEntry(
record_uid=share_info.record_uid,
record_title=share_info.record_title,
record_owner=owner,
shared_with=shared_with,
shared_with_count=len(non_owner_shares),
folder_paths=share_info.folder_paths
folder_paths=share_info.folder_paths,
share_date=share_date
)

def _format_verbose_permissions(self, share_info: RecordShareInfo) -> str:
Expand Down