Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ mutation replay
## Usage

```
mutation play [--verbose] [--exclude=<glob>]... [--only-deadcode-detection] [--include=<glob>]... [--sampling=<s>] [--randomly-seed=<n>] [--max-workers=<n>] [<file-or-directory> ...] [-- PYTEST-COMMAND ...]
mutation play [--verbose] [--exclude=<glob>]... [--only-deadcode-detection] [--without-exception-injection] [--include=<glob>]... [--sampling=<s>] [--randomly-seed=<n>] [--max-workers=<n>] [<file-or-directory> ...] [-- PYTEST-COMMAND ...]
mutation replay [--verbose] [--max-workers=<n>]
mutation list
mutation show MUTATION
Expand Down Expand Up @@ -171,6 +171,38 @@ if True:

</details>

<details><summary><code>InjectException</code> — replace expressions with the exception they raise</summary>

Replace expressions that have well-known failure modes with a `raise` of the exception they can produce. This targets error-handling paths that pass on the happy path but silently break when the environment misbehaves.

The contracts are intentionally narrow — stdlib only, no inference:

| Expression | Injected mutation |
|---|---|
| `d[key]` (string key) | `raise KeyError(key)` |
| `lst[i]` (integer index) | `raise IndexError(i)` |
| `d[k]` (ambiguous) | both `raise KeyError(k)` and `raise IndexError(k)` |
| `int(x)`, `float(x)` | `raise ValueError(x)` |
| `open(path)` | `raise FileNotFoundError(path)` |
| `next(it)` | `raise StopIteration` |
| `x / y`, `x // y`, `x % y` | `raise ZeroDivisionError` |
| `obj.attr` | `raise AttributeError('attr')` |
| `for x in iterable` | `raise StopIteration` |

Mutations are skipped when the expression is already inside a `try/except` that handles the relevant exception, and never injected inside `except` blocks.

```python
# before
value = data[key]

# after
raise KeyError(key)
```

Use `--without-exception-injection` to skip all `InjectException` mutations when error-handling paths are intentionally untested or produce too much noise.

</details>

<details><summary><code>MutateAssignment</code> — replace assignment values with None</summary>

Replace the right-hand side of a plain assignment with `None`, verifying that the assigned value is not silently ignored.
Expand Down
1 change: 1 addition & 0 deletions foobar/ex.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
def decrement_by_two(a):
"""docstring for testing false-positive mutation (function)"""
abc = 42
ijk = 42 / a
return a - 2

167 changes: 163 additions & 4 deletions mutation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""Mutation.

Usage:
mutation play [--verbose] [--exclude=<glob>]... [--only-deadcode-detection] [--include=<glob>]... [--sampling=<s>] [--randomly-seed=<n>] [--max-workers=<n>] [<file-or-directory> ...] [-- PYTEST-COMMAND ...]
mutation play [--verbose] [--exclude=<glob>]... [--only-deadcode-detection] [--without-exception-injection] [--include=<glob>]... [--sampling=<s>] [--randomly-seed=<n>] [--max-workers=<n>] [<file-or-directory> ...] [-- PYTEST-COMMAND ...]
mutation replay [--verbose] [--max-workers=<n>]
mutation list
mutation show MUTATION
Expand All @@ -24,6 +24,8 @@
(default: current Unix timestamp)
--only-deadcode-detection Only apply dead-code detection mutations (StatementDrop,
DefinitionDrop).
--without-exception-injection Skip all InjectException mutations (useful when error-handling
paths are intentionally untested or produce too much noise).
--max-workers=<n> Number of parallel workers (default: cpu_count - 1)
--verbose Show more information.
-h --help Show this screen.
Expand Down Expand Up @@ -1092,6 +1094,150 @@ def mutate(self, node, index, tree):
yield tree_copy, node_copy


class InjectException(metaclass=Mutation):
"""Replace expressions with the exception they can raise, targeting error-handling paths that are commonly forgotten."""

_DICT_HINTS = frozenset(["dict", "map", "table", "cache", "store", "config", "registry", "lookup"])
_LIST_HINTS = frozenset(["list", "array", "arr", "seq", "items", "elements"])

def predicate(self, node):
if isinstance(node, (ast.For, ast.AsyncFor)):
return True
if isinstance(node, ast.Subscript) and not isinstance(node.slice, ast.Slice):
return True
if (isinstance(node, ast.Call)
and isinstance(node.func, ast.Name)
and node.func.id in ("int", "float", "open", "next")):
return True
if isinstance(node, ast.BinOp) and isinstance(node.op, (ast.Div, ast.FloorDiv, ast.Mod)):
return True
if isinstance(node, ast.Attribute):
return True
return False

def _type_specs(self, node):
"""Return list of (exc_name, args_fn) where args_fn(node_copy) -> list of AST nodes."""
if isinstance(node, (ast.For, ast.AsyncFor)):
return [("StopIteration", lambda n: [])]
if isinstance(node, ast.Subscript):
return self._subscript_specs(node)
if isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
fn = node.func.id
if fn in ("int", "float"):
return [("ValueError", lambda n: n.args[:1] if n.args else [])]
if fn == "open":
return [("FileNotFoundError", lambda n: n.args[:1] if n.args else [])]
if fn == "next":
return [("StopIteration", lambda n: [])]
if isinstance(node, ast.BinOp):
return [("ZeroDivisionError", lambda n: [])]
if isinstance(node, ast.Attribute):
return [("AttributeError", lambda n: [ast.Constant(
value=n.attr, lineno=n.lineno, col_offset=n.col_offset,
)])]
return []

def _subscript_specs(self, node):
"""Heuristic: KeyError for dict-like, IndexError for list-like, both if ambiguous."""
slice_ = node.slice
value = node.value
# String key → definitely KeyError
if isinstance(slice_, ast.Constant) and isinstance(slice_.value, str):
return [("KeyError", lambda n: [n.slice])]
# Integer key → more likely IndexError, but could be dict
if isinstance(slice_, ast.Constant) and isinstance(slice_.value, int):
return [("IndexError", lambda n: [n.slice])]
# Check variable name for hints
name = None
if isinstance(value, ast.Name):
name = value.id.lower()
elif isinstance(value, ast.Attribute):
name = value.attr.lower()
if name:
if any(h in name for h in self._DICT_HINTS):
return [("KeyError", lambda n: [n.slice])]
if any(h in name for h in self._LIST_HINTS):
return [("IndexError", lambda n: [n.slice])]
# Ambiguous — generate both
return [
("KeyError", lambda n: [n.slice]),
("IndexError", lambda n: [n.slice]),
]

def _build_parent_map(self, tree):
parent_map = {}
for n in ast.walk(tree):
for child in ast.iter_child_nodes(n):
parent_map[id(child)] = n
return parent_map

def _find_enclosing_stmt(self, parent_map, node):
"""Return (stmt, parent, field, idx) for the innermost statement in a body list."""
current = node
while id(current) in parent_map:
parent = parent_map[id(current)]
if isinstance(current, ast.stmt):
for field, value in ast.iter_fields(parent):
if isinstance(value, list):
for i, item in enumerate(value):
if item is current:
return current, parent, field, i
current = parent
return None, None, None, None

def _is_guarded(self, parent_map, node, exc_name):
"""Return True if node is inside an except block or a try that handles exc_name."""
current = node
while id(current) in parent_map:
parent = parent_map[id(current)]
if isinstance(parent, ast.ExceptHandler):
return True # never inject inside except blocks
if isinstance(parent, ast.Try) and any(current is s for s in parent.body):
for handler in parent.handlers:
if handler.type is None:
return True # bare except catches everything
names = []
if isinstance(handler.type, ast.Name):
names = [handler.type.id]
elif isinstance(handler.type, ast.Tuple):
names = [e.id for e in handler.type.elts if isinstance(e, ast.Name)]
if exc_name in names or "Exception" in names or "BaseException" in names:
return True
current = parent
return False

def _make_raise(self, exc_name, args, lineno, col_offset):
if args:
exc = ast.Call(
func=ast.Name(id=exc_name, ctx=ast.Load(), lineno=lineno, col_offset=col_offset),
args=args,
keywords=[],
lineno=lineno,
col_offset=col_offset,
)
else:
exc = ast.Name(id=exc_name, ctx=ast.Load(), lineno=lineno, col_offset=col_offset)
return ast.Raise(exc=exc, cause=None, lineno=lineno, col_offset=col_offset)

def mutate(self, node, index, tree):
specs = self._type_specs(node)
for exc_name, args_fn in specs:
tree_copy, node_copy = copy_tree_at(tree, index)
parent_map = self._build_parent_map(tree_copy)
if self._is_guarded(parent_map, node_copy, exc_name):
continue
lineno = getattr(node_copy, "lineno", 1)
col_offset = getattr(node_copy, "col_offset", 0)
stmt, parent, field, idx = self._find_enclosing_stmt(parent_map, node_copy)
if stmt is None or idx is None:
continue
exc_args = args_fn(node_copy)
raise_node = self._make_raise(exc_name, exc_args, lineno, col_offset)
getattr(parent, field)[idx] = raise_node
ast.fix_missing_locations(tree_copy)
yield tree_copy, node_copy


def diff(source, target, filename=""):
lines = unified_diff(
source.split("\n"), target.split("\n"), filename, filename, lineterm=""
Expand Down Expand Up @@ -1197,11 +1343,18 @@ def install_module_loader(uid):

components = path[:-3].split("/")

# For package __init__ files (e.g. schema/__init__.py), the module name
# is the package name (e.g. "schema"), not "schema.__init__".
if components and components[-1] == "__init__":
components = components[:-1]

while components:
for pythonpath in sys.path:
filepath = os.path.join(pythonpath, "/".join(components))
filepath += ".py"
ok = os.path.exists(filepath)
# Check for both package __init__.py and regular module .py
init_filepath = filepath + "/__init__.py"
regular_filepath = filepath + ".py"
ok = os.path.exists(init_filepath) or os.path.exists(regular_filepath)
if ok:
module_path = ".".join(components)
break
Expand Down Expand Up @@ -1229,7 +1382,10 @@ def pytest_configure(config):


def pytest_addoption(parser, pluginmanager):
parser.addoption("--mutation", dest="mutation", type=str)
try:
parser.addoption("--mutation", dest="mutation", type=str)
except ValueError:
pass # already registered (e.g. conftest.py + -p mutation both active)


def for_each_par_map(loop, pool, inc, proc, items):
Expand Down Expand Up @@ -1506,8 +1662,11 @@ async def play_create_mutations(loop, root, db, max_workers, arguments):
# setup coverage support
coverage = coverage_read(root)
only_dead_code = arguments["--only-deadcode-detection"]
without_inject = arguments.get("--without-exception-injection", False)
if only_dead_code:
mutation_predicate = mutation_only_deadcode
elif without_inject:
mutation_predicate = lambda x: not isinstance(x, InjectException)
else:
mutation_predicate = mutation_all

Expand Down