Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions keepercommander/security_audit.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,16 @@ def needs_security_audit(params, record): # type: (KeeperParams, KeeperRecord)
saved_score_data = params.security_score_data.get(record.record_uid, {})
saved_sec_data = params.breach_watch_security_data.get(record.record_uid, {})
score_data = saved_score_data.get('data', {})
score_revision = saved_score_data.get('revision')
sec_revision = saved_sec_data.get('revision')
current_password = _get_pass(record)
if current_password != score_data.get('password') or None:
if current_password != score_data.get('password'):
return True

scores = dict(new=get_security_score(record) or 0, old=score_data.get('score', 0))
score_changed_on_passkey = any(x >= 100 for x in scores.values()) and any(x < 100 for x in scores.values())
creds_removed = bool(scores.get('old') and not scores.get('new'))
needs_alignment = bool(scores.get('new')) and not saved_sec_data
needs_alignment = current_password is not None and score_revision != sec_revision
return score_changed_on_passkey or creds_removed or needs_alignment

def update_security_audit_data(params, records): # type: (KeeperParams, List[KeeperRecord]) -> int
Expand Down
237 changes: 237 additions & 0 deletions tests/test_security_audit_refresh.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
import os
import json
from collections import Counter
from unittest import TestCase

import pytest

from data_config import read_config_file
from keepercommander import api, cli, security_audit
from keepercommander.commands.security_audit import SecurityAuditReportCommand, SecurityAuditSyncCommand
from keepercommander.error import CommandError
from keepercommander.params import KeeperParams
from keepercommander.utils import is_pw_fair, is_pw_strong, is_pw_weak
from keepercommander import vault


@pytest.mark.integration
class TestSecurityAuditRefresh(TestCase):
params = None # type: KeeperParams

@classmethod
def setUpClass(cls):
cls.params = KeeperParams()
read_config_file(cls.params, os.environ.get('KEEPER_CONFIG', '../config.json'))
api.login(cls.params)
api.query_enterprise(cls.params)
api.sync_down(cls.params, record_types=True)

@classmethod
def tearDownClass(cls):
try:
if cls.params:
cli.do_command(cls.params, 'delete-all --force')
api.sync_down(cls.params, record_types=True)
except Exception:
pass

def setUp(self):
api.sync_down(self.params, record_types=True)
cli.do_command(self.params, 'delete-all --force')
api.sync_down(self.params, record_types=True)

def add_legacy_record(self, title, password, extra_fields=''):
command = (
f'record-add --title="{title}" --record-type=legacy '
f'login=security.audit@example.com password={password} url=https://example.com'
)
if extra_fields:
command = f'{command} {extra_fields}'
record_uid = cli.do_command(self.params, command)
api.sync_down(self.params, record_types=True)
return record_uid

def add_typed_login_record(self, title, password):
command = (
f'record-add --title="{title}" --record-type=login '
f'login=security.audit@example.com password={password} url=https://example.com'
)
try:
record_uid = cli.do_command(self.params, command)
except CommandError as err:
if 'Record type "login" cannot be found.' in str(err):
self.skipTest('Typed login record type is not available in this integration environment')
raise
api.sync_down(self.params, record_types=True)
return record_uid

def update_password(self, record_uid, password):
cli.do_command(self.params, f'record-update --record={record_uid} password={password}')
api.sync_down(self.params, record_types=True)

def rotate_password(self, record_uid):
cli.do_command(self.params, f'rotate -- {record_uid}')
api.sync_down(self.params, record_types=True)

def hard_clear_current_user_security_data(self):
SecurityAuditSyncCommand().execute(
self.params,
email=[self.params.user],
hard=True,
force=True,
)
api.sync_down(self.params, record_types=True)

def current_user_report_row(self):
report = json.loads(SecurityAuditReportCommand().execute(self.params, save=True, format='json'))
return next((x for x in report if x.get('email') == self.params.user), None)

def current_user_debug_row(self):
report = json.loads(SecurityAuditReportCommand().execute(self.params, debug=True, format='json'))
return next((x for x in report if x.get('vault_owner') == self.params.user), None)

def get_score_payload(self, record_uid):
return (self.params.security_score_data.get(record_uid) or {}).get('data', {})

def assert_record_security_state(self, record_uid, password, score, has_security_data):
score_data = self.get_score_payload(record_uid)
self.assertEqual(score_data.get('password'), password)
self.assertEqual(score_data.get('score'), score)

security_data = self.params.breach_watch_security_data.get(record_uid)
if has_security_data:
self.assertIsNotNone(security_data)
else:
self.assertIsNone(security_data)

def assert_record_revisions_aligned(self, record_uid):
score_revision = (self.params.security_score_data.get(record_uid) or {}).get('revision')
security_revision = (self.params.breach_watch_security_data.get(record_uid) or {}).get('revision')
self.assertEqual(score_revision, security_revision)

def assert_record_has_no_password_score_data(self, record_uid):
self.assertEqual(self.get_score_payload(record_uid), {})

def expected_summary(self, record_uids):
summary = {
'weak': 0,
'fair': 0,
'medium': 0,
'strong': 0,
'reused': 0,
'unique': 0,
'securityScore': 25,
}
password_counts = Counter()
total = 0
for record_uid in record_uids:
score_data = self.get_score_payload(record_uid)
password = score_data.get('password')
score = score_data.get('score')
if password is None or score is None:
continue
total += 1
password_counts[password] += 1
if is_pw_strong(score):
summary['strong'] += 1
elif is_pw_fair(score):
summary['fair'] += 1
elif is_pw_weak(score):
summary['weak'] += 1
else:
summary['medium'] += 1

summary['reused'] = sum(count for count in password_counts.values() if count > 1)
summary['unique'] = total - summary['reused']
if total > 0:
strong_ratio = summary['strong'] / total
unique_ratio = summary['unique'] / total
summary['securityScore'] = int(100 * round((strong_ratio + unique_ratio + 1) / 4, 2))
return summary

def assert_debug_pending(self):
debug_row = self.current_user_debug_row()
self.assertIsNotNone(debug_row)
raw_old = debug_row.get('old_incremental_data') or []
raw_curr = debug_row.get('current_incremental_data') or []
self.assertTrue(any(item is not None for item in raw_old + raw_curr))

def assert_admin_summary_matches_records(self, record_uids, expect_debug_pending=True):
if expect_debug_pending:
self.assert_debug_pending()

row = self.current_user_report_row()
self.assertIsNotNone(row)
expected = self.expected_summary(record_uids)
for key, value in expected.items():
self.assertEqual(row.get(key), value, msg=f'{key} mismatch: {row}')
self.assertIsNone(self.current_user_debug_row())

def test_summary_alignment_for_add_update_reuse_and_password_removal(self):
record_uid_1 = self.add_legacy_record('Security audit lifecycle-1', 'aa')
self.assert_record_security_state(record_uid_1, 'aa', 0, True)
self.assert_record_revisions_aligned(record_uid_1)
self.assert_admin_summary_matches_records([record_uid_1])

self.update_password(record_uid_1, 'weak-password')
self.assert_record_security_state(record_uid_1, 'weak-password', 41, True)
self.assert_record_revisions_aligned(record_uid_1)
self.assert_admin_summary_matches_records([record_uid_1])

self.update_password(record_uid_1, 'A1!bcdefgh')
self.assert_record_security_state(record_uid_1, 'A1!bcdefgh', 61, True)
self.assert_record_revisions_aligned(record_uid_1)
self.assert_admin_summary_matches_records([record_uid_1])

self.update_password(record_uid_1, 'StrongPass123!')
self.assert_record_security_state(record_uid_1, 'StrongPass123!', 100, True)
self.assert_record_revisions_aligned(record_uid_1)
self.assert_admin_summary_matches_records([record_uid_1])

record_uid_2 = self.add_legacy_record('Security audit lifecycle-2', 'StrongPass123!')
self.assert_record_security_state(record_uid_2, 'StrongPass123!', 100, True)
self.assert_record_revisions_aligned(record_uid_2)
self.assert_admin_summary_matches_records([record_uid_1, record_uid_2])

self.update_password(record_uid_1, '')
self.assert_record_has_no_password_score_data(record_uid_1)
self.assert_admin_summary_matches_records([record_uid_1, record_uid_2])

def test_rotation_and_hard_clear_repair_align_admin_summary(self):
record_uid = self.add_legacy_record('Security audit rotate/repair', 'aa', extra_fields='cmdr:plugin=noop')
self.assert_record_security_state(record_uid, 'aa', 0, True)
self.assert_record_revisions_aligned(record_uid)
self.assert_admin_summary_matches_records([record_uid])

self.rotate_password(record_uid)
rotated_score_data = self.get_score_payload(record_uid)
self.assertIsInstance(rotated_score_data.get('password'), str)
self.assertTrue(rotated_score_data.get('password'))
self.assertIn('score', rotated_score_data)
self.assertIsNotNone(self.params.breach_watch_security_data.get(record_uid))
self.assert_record_revisions_aligned(record_uid)
self.assert_admin_summary_matches_records([record_uid])

self.hard_clear_current_user_security_data()
self.assertIsNotNone(self.get_score_payload(record_uid))
self.assertIsNone(self.params.breach_watch_security_data.get(record_uid))

record = vault.KeeperRecord.load(self.params, record_uid)
self.assertTrue(security_audit.needs_security_audit(self.params, record))

cli.do_command(self.params, f'sync-security-data {record_uid} --quiet')
api.sync_down(self.params, record_types=True)
self.assertIsNotNone(self.params.breach_watch_security_data.get(record_uid))
self.assert_record_revisions_aligned(record_uid)
self.assert_admin_summary_matches_records([record_uid])

def test_typed_login_add_and_update_align_admin_summary(self):
record_uid = self.add_typed_login_record('Security audit typed login', 'aa')
self.assert_record_security_state(record_uid, 'aa', 0, True)
self.assert_record_revisions_aligned(record_uid)
self.assert_admin_summary_matches_records([record_uid])

self.update_password(record_uid, 'StrongPass123!')
self.assert_record_security_state(record_uid, 'StrongPass123!', 100, True)
self.assert_record_revisions_aligned(record_uid)
self.assert_admin_summary_matches_records([record_uid])
85 changes: 85 additions & 0 deletions unit-tests/test_security_audit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from types import SimpleNamespace
from unittest import TestCase, mock

from keepercommander import security_audit


class TestSecurityAudit(TestCase):
def setUp(self):
self.record = SimpleNamespace(record_uid='record_uid')
self.params = SimpleNamespace(
enterprise_ec_key=b'enterprise-key',
security_score_data={},
breach_watch_security_data={},
)

def test_needs_security_audit_updates_missing_security_data_for_weak_password(self):
self.params.security_score_data = {
self.record.record_uid: {
'data': {'password': 'weak-password', 'score': 0},
'revision': 7,
}
}

with mock.patch('keepercommander.security_audit._get_pass', return_value='weak-password'), \
mock.patch('keepercommander.security_audit.get_security_score', return_value=0):
self.assertTrue(security_audit.needs_security_audit(self.params, self.record))

def test_needs_security_audit_updates_missing_security_data_for_nonzero_score(self):
self.params.security_score_data = {
self.record.record_uid: {
'data': {'password': 'StrongPass123!', 'score': 100},
'revision': 9,
}
}

with mock.patch('keepercommander.security_audit._get_pass', return_value='StrongPass123!'), \
mock.patch('keepercommander.security_audit.get_security_score', return_value=100):
self.assertTrue(security_audit.needs_security_audit(self.params, self.record))

def test_needs_security_audit_skips_when_no_password_and_no_security_data(self):
with mock.patch('keepercommander.security_audit._get_pass', return_value=None), \
mock.patch('keepercommander.security_audit.get_security_score', return_value=None):
self.assertFalse(security_audit.needs_security_audit(self.params, self.record))

def test_needs_security_audit_skips_when_security_data_already_exists_for_weak_password(self):
self.params.security_score_data = {
self.record.record_uid: {
'data': {'password': 'weak-password', 'score': 0},
'revision': 7,
}
}
self.params.breach_watch_security_data = {
self.record.record_uid: {'revision': 7}
}

with mock.patch('keepercommander.security_audit._get_pass', return_value='weak-password'), \
mock.patch('keepercommander.security_audit.get_security_score', return_value=0):
self.assertFalse(security_audit.needs_security_audit(self.params, self.record))

def test_needs_security_audit_updates_when_security_data_revision_is_stale(self):
self.params.security_score_data = {
self.record.record_uid: {
'data': {'password': 'StrongPass123!', 'score': 100},
'revision': 11,
}
}
self.params.breach_watch_security_data = {
self.record.record_uid: {'revision': 7}
}

with mock.patch('keepercommander.security_audit._get_pass', return_value='StrongPass123!'), \
mock.patch('keepercommander.security_audit.get_security_score', return_value=100):
self.assertTrue(security_audit.needs_security_audit(self.params, self.record))

def test_needs_security_audit_updates_when_password_is_removed(self):
self.params.security_score_data = {
self.record.record_uid: {
'data': {'password': 'StrongPass123!', 'score': 100},
'revision': 11,
}
}

with mock.patch('keepercommander.security_audit._get_pass', return_value=None), \
mock.patch('keepercommander.security_audit.get_security_score', return_value=None):
self.assertTrue(security_audit.needs_security_audit(self.params, self.record))
Loading