diff --git a/.claude/skills/mutation-test/SKILL.md b/.claude/skills/mutation-test/SKILL.md new file mode 100644 index 0000000..c523424 --- /dev/null +++ b/.claude/skills/mutation-test/SKILL.md @@ -0,0 +1,57 @@ +--- +name: mutation-test +description: Apply mutation testing to an existing project's pytest test suite using mutation.py +argument-hint: [project-path] +disable-model-invocation: true +allowed-tools: Bash(python3:*), Bash(pip:*), Bash(pip3:*) +--- + +Apply mutation testing to the project at: $ARGUMENTS + +## Steps + +### 1. Install mutation.py's dependencies into the target project's venv + +From within the target project directory (with its venv active): + +```sh +pip install aiostream docopt humanize loguru pygments \ + pytest-cov pytest-randomly pytest-xdist python-ulid \ + termcolor tqdm zstandard coverage +``` + +### 2. Verify the baseline test suite is green + +```sh +pytest +``` + +mutation.py will also check this automatically, but it's good to confirm first. + +### 3. Run mutation testing + +```sh +python3 /src/python-mutation/mutation.py play \ + --include= \ + --max-workers= +``` + +- `--include` — glob of files to mutate (e.g. `src/mylib.py` or `src/**/*.py`); omit to mutate all non-test `.py` files +- `--exclude` — defaults to `*test*`, so test files are already excluded +- `--max-workers` — parallelism (e.g. `--max-workers=8`); default is cpu_count-1 +- Results are stored in `.mutation.db` in the current directory + +### 4. Inspect results + +```sh +python3 /src/python-mutation/mutation.py list # surviving mutations +python3 /src/python-mutation/mutation.py show # diff of a specific survivor +python3 /src/python-mutation/mutation.py replay # interactively re-test survivors +``` + +## Gotchas + +- The target project must use **pytest** (mutation.py is pytest-only) +- Run mutation.py from **inside** the target project directory so `.mutation.db` and coverage data land there +- If `play` errors with "Tests are not green", check that `pytest-xdist` can run the suite in parallel — some tests have ordering dependencies +- `mutation.py` acts as a pytest plugin; it patches source files in-memory, never on disk diff --git a/.gitignore b/.gitignore index 90beca4..8db0da7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ +.mutation.db + # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..ebc0c4a --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,108 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +`mutation` is a Python mutation testing tool. It introduces controlled mutations into source code and verifies that a test suite detects them, revealing gaps in test coverage quality. + +## Environment Setup + +Dependencies are managed with pip-tools. The `./venv` script creates a cached virtual environment: + +```sh +./venv pip install -r requirements.txt +./venv pip install -r requirements.dev.txt +``` + +Or directly with pip (if already in a venv): + +```sh +pip install -r requirements.txt +``` + +To regenerate the locked `requirements.txt` from `requirements.source.txt`: + +```sh +make lock +``` + +## Common Commands + +```sh +make check # Run mutation tests on foobar/ example + bandit security scan +make check-only # Run mutation tests only (no bandit) +make check-fast # Run pytest with fail-fast (-x) +make check-coverage # Generate HTML coverage report +make lint # Run pylama linter +make clean # Remove untracked/ignored files (git clean -fX) +make wip # Format with black+isort and commit as "wip" +``` + +**Running mutation tests directly:** + +```sh +python3 mutation.py play tests.py --include="foobar/ex.py,foobar/__init__.py" --exclude="tests.py" +mutation replay # Re-test previously failed mutations interactively +mutation list # List stored mutation failures +mutation show MUTATION # Display a specific mutation diff (syntax highlighted) +mutation apply MUTATION # Apply a mutation to source files +``` + +**Running the test suite for mutation.py itself:** + +```sh +pytest -x -vvv --capture=no mutation.py +``` + +## Architecture + +Everything lives in a single file: **`mutation.py`** (1052 lines). It functions as both a standalone CLI tool and a pytest plugin. + +### Mutation Classes + +Mutations are implemented via a `Mutation` metaclass that auto-registers all subclasses. Each mutation class implements two key methods: + +- **`predicate(node)`** — returns `True` if the AST node matches this mutation type (e.g., `isinstance(node, ast.Constant)` for numeric mutations) +- **`mutate(node, index, tree)`** — generator that yields `(mutated_tree_copy, new_node)` tuples, one per valid mutation of the node. **Must call `copy_tree_at(tree, index)` first** to get a deep copy of the AST; mutating `tree` directly would side-effect other mutations sharing the same tree object. + +The metaclass (`Mutation.__init__`) instantiates each subclass and stores it in `Mutation.ALL` (a set of all mutation instances). Optional `deadcode_detection = True` flags a mutation as part of dead-code detection (e.g., `StatementDrop`, `DefinitionDrop`), limiting it to the `--only-deadcode-detection` workflow. + +For each covered AST node in `iter_deltas`, the pipeline calls `predicate()` on every registered mutation instance; those matching call `mutate()` to generate candidate diffs. The resulting mutations are syntax-checked (via `ast.parse`) and stored as compressed diffs in the SQLite database. + +### Core Pipeline (`play` command) + +1. **`check_tests`** — runs the baseline test suite to confirm it passes; detects xdist parallel support +2. **`coverage_read`** — parses `.coverage` data to determine which lines are actually executed +3. **`iter_deltas`** — walks the AST via `ast.parse`, applies `mutate()` per node, filters to covered lines via `interesting()`, yields unified diffs +4. **`mutation_create`** — parallelizes delta generation using a process pool; stores mutations in the SQLite database compressed with zstandard +5. **`mutation_pass`** — runs each mutation through the test suite via a thread pool; records survivors (undetected mutations) + +### Storage + +Mutations are persisted in `.mutation.db` (a SQLite database via `sqlite3` from the stdlib) with three tables: + +- **`config`** — key/value pairs for run metadata (e.g. sampling config) +- **`mutations`** — each row holds a ULID (`uid`), source file path, and `zstandard`-compressed unified diff +- **`results`** — maps `uid` to a status integer (pass/fail/skip) after test execution + +### Pytest-Only + +`mutation` is fundamentally pytest-specific. Although the CLI accepts a custom `-- PYTEST-COMMAND`, it always appends `--mutation=` to whatever command is used. That flag is a pytest option registered by `mutation.py` itself acting as a pytest plugin (`pytest_configure` / `pytest_addoption` hooks). The plugin calls `install_module_loader` to patch the target module in-memory for that test run, without modifying files on disk. Any custom `PYTEST-COMMAND` must therefore still be a pytest invocation — swapping in a different test runner is not supported. + +### Async Execution + +`pool_for_each_par_map` drives the parallel mutation workflow using `asyncio` + `concurrent.futures` (process pool for mutation creation, thread pool for test execution) with `aiostream` for streaming results. + +## Key Files + +| File | Purpose | +|------|---------| +| `mutation.py` | Entire application: CLI, mutation engine, pytest plugin | +| `tests.py` | Example test suite (tests `foobar/ex.py`) used for self-testing | +| `foobar/ex.py` | Example module (`decrement_by_two`) mutated during self-tests | +| `requirements.source.txt` | Hand-maintained dependency list (input to pip-compile) | +| `requirements.txt` | pip-compiled locked dependencies (auto-generated, do not edit) | +| `requirements.dev.txt` | Dev-only tools: black, isort, bandit, tbvaccine | + + diff --git a/README.md b/README.md index 073d7f6..6b697dd 100644 --- a/README.md +++ b/README.md @@ -1,28 +1,34 @@ -# 🐛 mutation 🐛 +# 🐛 mutation -**early draft** requires more testing, please report any findings in -[my public inbox](https://lists.sr.ht/~amirouche/public-inbox). +Mutation testing tells you something coverage numbers can't: whether your tests would actually catch a bug. It works by introducing small deliberate changes into your code — flipping a `+` to a `-`, removing a condition — and checking whether your tests fail. If they don't, the mutation *survived*, and that's a gap worth knowing about. -The goal of `mutation` is to give an idea on how robust, and help -improve test suites. +`mutation` is built around three ideas: + +**Fast.** Mutations run in parallel. Most tools write mutated code to disk and run one test at a time — `mutation` doesn't, so you get results in minutes rather than hours. + +**Interactive.** `mutation replay` is a guided workflow, not a report. It walks you through each surviving mutation one by one: you inspect it, fix your tests, verify they're green, commit, and move on to the next. Less like a dashboard, more like an interactive rebase. + +**Light.** A single Python file. No Rust compiler, no configuration ceremony. Results stored in a local `.mutation.db` SQLite file. Source code you can actually read and understand — which matters when you're trusting a tool to tell you the truth about your tests. ## Getting started -```sh +`mutation` runs your tests with pytest. The `-- PYTEST-COMMAND` option lets you pass any pytest arguments — specific paths, flags, plugins — giving you full control over how the test suite runs. + +``` pip install mutation -mutation play tests.py --include="foobar/ex.py,foobar/__init__.py" --exclude="tests.py" +mutation play tests.py --include=foobar/ex.py --include=foobar/__init__.py --exclude=tests.py ``` -Then call: +Then work through the results: -```sh +``` mutation replay ``` ## Usage ``` -mutation play [--verbose] [--exclude=] [--only-deadcode-detection] [--include=] [--sampling=] [--randomly-seed=] [--max-workers=] [ ...] [-- TEST-COMMAND ...] +mutation play [--verbose] [--exclude=]... [--only-deadcode-detection] [--include=]... [--sampling=] [--randomly-seed=] [--max-workers=] [ ...] [-- PYTEST-COMMAND ...] mutation replay [--verbose] [--max-workers=] mutation list mutation show MUTATION @@ -31,30 +37,547 @@ mutation (-h | --help) mutation --version ``` -Both `--include` and `--exclude` support glob patterns. They are -optional but highly recommended to avoid the production of useless -mutations. +`mutation` only mutates code with test coverage, so it works best when coverage is high. + +`mutation` detects whether tests can run in parallel — making your test suite parallel-safe will significantly speed things up. + +## Options + +**`--include=` and `--exclude=`** + +Glob patterns matched against relative file paths. Repeat the flag to supply multiple patterns. + +``` +# Mutate only specific modules, exclude both test files and migrations +mutation play tests.py --include=src/*.py --include=lib/*.py --exclude=tests.py --exclude=migrations/*.py +``` + +Default `--include` is `*.py` (all Python files). Default `--exclude` is `*test*` (any path whose relative path contains "test"). The patterns are applied before the coverage filter, so files with no coverage are always skipped regardless. + +**`--sampling=`** + +Limit how many mutations are actually tested — useful for a quick sanity check before a full run. + +- `--sampling=100` — test only the first 100 mutations (deterministic order) +- `--sampling=10%` — test a random 10% of all mutations (probability-based; set `--randomly-seed` for reproducibility) + +Default: all mutations are tested. + +**`--randomly-seed=`** + +Integer seed that controls three things at once: the order pytest-randomly uses to shuffle your tests, the random values injected by numeric mutations (`MutateNumber`), and which mutations are selected when using `--sampling=N%`. Setting a fixed seed makes any of these behaviors reproducible across runs. + +Default: current Unix timestamp (a different seed each run). + +``` +mutation play tests.py --randomly-seed=12345 --sampling=20% +``` + +**`-- PYTEST-COMMAND`** + +A full pytest invocation to run instead of the built-in default (`pytest --exitfirst --no-header --tb=no --quiet --assert=plain`). Useful when you need specific pytest flags, plugins, or a subset of tests. + +`mutation` always appends `--mutation=` to whatever command you supply — this flag is how it injects each mutation in-process without touching files on disk. Because of this, the command **must** be a `pytest` invocation; other test runners are not supported. Coverage flags (`--cov`, etc.) are added automatically during the baseline run. + +`-- PYTEST-COMMAND` and `` are mutually exclusive. + +``` +# Run only the unit tests, with verbose output +mutation play --include="src/*.py" -- pytest -x -v tests/unit/ +``` + +## Mutations + +
AugAssignToAssign — convert augmented assignment to plain assignment + +Convert an augmented assignment (`x += v`) to a plain assignment (`x = v`), dropping the accumulation, verifying that the update operator is tested. + +```python +# before +total += amount + +# after +total = amount +``` + +
+ +
BreakToReturn — replace break with return + +Replace `break` with `return`, exiting the enclosing function instead of just the loop, verifying that the loop's exit path is tested. + +```python +# before +for item in items: + if item.done: + break + +# after +for item in items: + if item.done: + return +``` + +
+ +
Comparison — negate comparison expressions + +Negate a comparison expression by wrapping it with `not (...)`, verifying that the direction of every comparison is tested. + +```python +# before +if x > 0: + process(x) + +# after +if not (x > 0): + process(x) +``` + +
+ +
DefinitionDrop — remove function or class definitions + +Remove a function or class definition entirely (only when others remain in the same body), surfacing unreferenced definitions. + +```python +# before +def helper(): + return 42 + +def main(): + return helper() + +# after +def main(): + return helper() +``` + +
+ +
ForceConditional — force conditions to True or False + +Force the test of an if/while/assert/ternary to always be `True` or always `False`, verifying that both branches are meaningfully exercised. + +```python +# before +if is_valid(x): + save(x) + +# after +if True: + save(x) +``` + +
+ +
MutateAssignment — replace assignment values with None + +Replace the right-hand side of a plain assignment with `None`, verifying that the assigned value is not silently ignored. + +```python +# before +result = compute() + +# after +result = None +``` + +
+ +
MutateCallArgs — replace or drop function arguments + +Replace each positional call argument with `None`, and drop one argument at a time from multi-argument calls, verifying that every argument is actually used. + +```python +# before +result = process(data, config) + +# after +result = process(None, config) +``` + +
+ +
MutateContainment — swap in and not in operators + +Swap `in` ↔ `not in` in membership tests, verifying that the expected membership relationship is directly tested. + +```python +# before +if key in cache: + return cache[key] + +# after +if key not in cache: + return cache[key] +``` + +
+ +
MutateContextManager — strip context managers from with blocks + +Strip context managers from a `with` statement one at a time, keeping the body, verifying that each manager's effect is tested. + +```python +# before +with lock: + update_shared_state() + +# after +update_shared_state() +``` + +
+ +
MutateDefaultArgument — remove default argument values + +Remove leading default argument values one at a time, making parameters required, verifying that callers always supply them explicitly. + +```python +# before +def connect(host, port=8080, timeout=30): + ... + +# after +def connect(host, port, timeout=30): + ... +``` + +
+ +
MutateExceptionHandler — replace exception types with Exception + +Replace the specific exception type in an except clause with the generic `Exception`, verifying that the handler is tested for the right error kind. + +```python +# before +try: + connect() +except ConnectionError: + retry() + +# after +try: + connect() +except Exception: + retry() +``` + +
+ +
MutateFString — replace f-string interpolations with empty strings + +Replace each interpolated expression in an f-string with an empty string, verifying that callers check the formatted content rather than just the surrounding template. + +```python +# before +msg = f"expected {actual}, got {result}" + +# after +msg = f"expected , got {result}" +``` + +
+ +
MutateGlobal — remove global and nonlocal declarations + +Remove a `global` or `nonlocal` declaration entirely, causing assignments to bind a local variable instead, verifying that the scoping is exercised by tests. + +```python +# before +def increment(): + global counter + counter += 1 + +# after +def increment(): + counter += 1 +``` + +
+ +
MutateIdentity — swap is and is not operators + +Swap `is` ↔ `is not` in identity comparisons, verifying that the expected identity relationship is directly tested. + +```python +# before +if obj is None: + init() + +# after +if obj is not None: + init() +``` + +
+ +
MutateIterator — wrap iterables in reversed() + +Wrap a for-loop's iterable in `reversed()`, verifying that iteration order assumptions are tested. + +```python +# before +for item in queue: + process(item) + +# after +for item in reversed(queue): + process(item) +``` + +
+ +
MutateKeyword — rotate flow and boolean keywords + +Rotate flow keywords (break/continue/pass), swap boolean constants (True/False/None), and flip boolean operators (and/or). + +```python +# before +while True: + if done: + break + +# after +while True: + if done: + continue +``` + +
+ +
MutateLambda — replace lambda bodies with None + +Replace the body of a lambda with `None` (or `0` when the body is already `None`), verifying that the lambda's computation is actually used. + +```python +# before +transform = lambda x: x * 2 + +# after +transform = lambda x: None +``` + +
+ +
MutateMatchCase — remove match case branches + +Remove one case branch at a time from a match statement (Python 3.10+ only), verifying that each branch is exercised by the test suite. + +```python +# before +match command: + case "quit": + quit() + case "go": + go() + +# after +match command: + case "go": + go() +``` + +
+ +
MutateNumber — replace numeric literals with random values + +Replace an integer or float literal with a random value in the same bit-range, verifying that the exact numeric value is tested. + +```python +# before +TIMEOUT = 30 + +# after +TIMEOUT = 17 +``` + +
+ +
MutateOperator — replace arithmetic and comparison operators -`mutation` will only mutate code that has test coverage, hence it -works better with a high coverage. +Replace an arithmetic, bitwise, shift, or comparison operator with another in the same group, verifying the exact operator matters. -`mutation` will detect whether the tests can be run in parallel. It is -recommended to make the test suite work in parallel to speed up the -work of `mutation`. +```python +# before +result = a + b -Also, it is better to work with a random seed, otherwise add the -option `--randomly-seed=n` that works. +# after +result = a - b +``` + +
+ +
MutateReturn — replace return values with defaults + +Replace a return value with a type-appropriate default (`None`, `0`, `False`, or `""`), verifying that callers check what the function returns. + +```python +# before +def get_count(): + return len(items) + +# after +def get_count(): + return 0 +``` + +
+ +
MutateSlice — drop slice bounds and negate steps + +Drop the lower or upper bound of a slice (`a[i:j]` → `a[:j]` or `a[i:]`) and negate the step (`a[::2]` → `a[::-2]`), verifying that slice boundary conditions and direction are tested. + +```python +# before +chunk = data[start:end] + +# after +chunk = data[:end] +``` -## TODO +
+ +
MutateString — prepend prefixes to string literals + +Prepend a fixed prefix to a string or bytes literal, verifying that callers check the actual content. + +```python +# before +label = "hello" + +# after +label = "mutated string hello" +``` + +
+ +
MutateStringMethod — swap symmetric string methods + +Swap directionally symmetric string methods (lower↔upper, lstrip↔rstrip, find↔rfind, ljust↔rjust, removeprefix↔removesuffix, partition↔rpartition, split↔rsplit), verifying that the direction matters. + +```python +# before +name = text.lower() + +# after +name = text.upper() +``` + +
+ +
MutateYield — replace yield values with None + +Replace the value of a yield expression with `None`, verifying that the yielded value is actually used by callers. + +```python +# before +def generate(): + yield compute() + +# after +def generate(): + yield None +``` + +
+ +
NegateCondition — wrap conditions with not + +Wrap a bare (non-comparison) condition with `not`, inserting the logical inverse of the test, verifying that the truthiness of the value actually matters. + +```python +# before +if user.is_active: + allow() + +# after +if not user.is_active: + allow() +``` + +
+ +
RemoveDecorator — remove decorators from functions and classes + +Remove one decorator at a time from a decorated function or class, verifying that each decorator's effect is covered by tests. + +```python +# before +@login_required +def dashboard(request): + return render(request) + +# after +def dashboard(request): + return render(request) +``` + +
+ +
RemoveUnaryOp — strip unary operators + +Strip a unary operator (`not`, `-`, `~`) and leave only the operand, verifying that the operator's effect is covered by tests. + +```python +# before +if not flag: + skip() + +# after +if flag: + skip() +``` + +
+ +
StatementDrop — replace statements with pass + +Replace a covered statement with `pass`, verifying that no statement is inert dead code. + +```python +# before +x = compute() +validate(x) + +# after +x = compute() +pass +``` + +
+ +
SwapArguments — swap function call arguments + +Swap each pair of positional call arguments, verifying that argument order is tested. + +```python +# before +result = process(source, dest) + +# after +result = process(dest, source) +``` + +
+ +
ZeroIteration — replace iterables with empty lists + +Replace a for-loop's iterable with an empty list, forcing the body to never execute, verifying that callers handle empty-collection cases. + +```python +# before +for item in items: + process(item) + +# after +for item in []: + process(item) +``` -- [ ] Avoid mutations that are syntax errors to improve efficiency, - use python 3.9+ `ast.unparse`, possibly with `black`; -- [ ] `mutation play` can error even if the code and the test suite is - valid e.g. removing a docstring will trigger an error: reduce, - and hopefully eliminate that problem, requires python 3.9+ (see - above); -- [ ] Add PyPy support; +
-## Links +## Status -- [forge](https://git.sr.ht/~amirouche/mutation) +Early stage. Things may break. Bug reports and questions welcome at amirouche.boubekki@gmail.com. diff --git a/REDO.md b/REDO.md new file mode 100644 index 0000000..885b424 --- /dev/null +++ b/REDO.md @@ -0,0 +1,6 @@ +# REDO + +Projects to re-run after fixes: + +- **whenever** — coverage shows `—` in summary; `coverage report` exits non-zero due to `--fail-under=100` in pytest.ini; fix: add `--fail-under=0` to `get_coverage()` in `_update_summaries` (already fixed in manual regen, needs script update) +- **confuse** — 0 mutations; likely no covered source files found; investigate coverage_read output diff --git a/WHY.md b/WHY.md new file mode 100644 index 0000000..5562399 --- /dev/null +++ b/WHY.md @@ -0,0 +1,128 @@ +# Why Did This Mutation Survive? + +You just ran `mutation replay` and a mutation came back green — meaning your test suite +**did not catch** the code change. This document helps you decide what to do about it. + +--- + +## The Five Cases + +### 1. Real gap — write a test + +The mutation exposes a genuine hole in your test coverage. The code behaviour +it changes **is observable** and **should be tested**, but currently isn't. + +**Action:** Write a test that fails when this mutation is applied, then commit it. +The mutation will disappear from the queue once the test catches it. + +--- + +### 2. Fragile coverage — risk accepted + +A test *does* exercise this code path, but only incidentally — the assertion +doesn't pin down the exact behaviour that the mutation changes. Adding a +targeted assertion would make the test brittle or overly implementation-aware. + +**Action:** Mark it as fragile and move on. Keep it on your radar — if the +surrounding code changes, revisit whether coverage is still adequate. + +--- + +### 3. Equivalent mutation — semantically invisible + +The mutation produces code that is **behaviourally identical** to the original +under all reachable inputs. No test can ever catch it, because there is nothing +wrong to catch. + +Classic examples: +- Swapping `x * 1` → `x * 2` when `x` is always `0` at that point +- Reordering independent dictionary key assignments +- Changing a constant that is never read + +**Action:** Mark it as equivalent. The tool writes a `.diff` file to +`.mutations.ignored/` so this mutation is silently skipped on every future +`play` run without touching the database. Commit the ignore file so CI never +flags it again. + +The ignore file is named by a SHA-256 hash of the diff text, so it remains +valid across refactors that don't touch the mutated lines. + +--- + +### 4. Won't fix — consciously accepted gap + +The gap is real, but fixing it is not worth the effort right now (or ever). +Maybe the code path is deprecated, the risk is negligible, or the test would be +too expensive to maintain. + +**Action:** Mark it as "won't fix". It is permanently removed from the replay +queue and will not resurface in future runs. + +--- + +### 5. Todo — real gap, not fixing now + +Same as "Real gap", but you're acknowledging you won't fix it in this session. +It stays on the queue and will appear again in the next `replay`. + +**Action:** Mark it as todo and come back to it later. + +--- + +## Decision Guide + +``` +Does the mutation change observable behaviour? +│ +├── No → Equivalent (Case 3) +│ +└── Yes → Is it tested anywhere, even loosely? + │ + ├── No → Real gap (Case 1) or Todo (Case 5) + │ + └── Yes → Is the existing assertion precise enough? + │ + ├── Yes (test should catch it but doesn't) → Real gap (Case 1) + │ + └── No (only incidental coverage) + │ + ├── Risk matters → Real gap (Case 1) + └── Risk low → Fragile (Case 2) or Won't fix (Case 4) +``` + +--- + +## The `.mutations.ignored/` directory + +Each file is a human-readable diff prefixed with a comment explaining why it +was classified as equivalent: + +``` +# Case 3: equivalent mutation — x is always 0 at this point +--- a/mymodule/calc.py ++++ b/mymodule/calc.py +@@ -10,7 +10,7 @@ +- return x * 1 ++ return x * 2 +``` + +These files are safe to commit. They are content-addressed (SHA-256 of the diff), +so moving or renaming source files does not break them — only editing the +mutated lines themselves would make the ignore file stale. + +Run `mutation gc` to remove stale ignore files whose diffs no longer apply to +the current source. + +--- + +## Quick reference + +| Choice | Meaning | Resurfaces? | +|--------|---------|-------------| +| `1` Real gap | Write a test | Yes, until caught | +| `2` Fragile | Risk accepted | Yes, as reminder | +| `3` Equivalent | Writes ignore file | No (skipped at play time) | +| `4` Won't fix | Consciously accepted | No | +| `5` Todo | Fix later | Yes, next replay | +| `s` Skip | Undecided | Yes, later this session | +| `q` Quit | Save and exit | Yes, next replay | diff --git a/conftest.py b/conftest.py new file mode 100644 index 0000000..da4c7cf --- /dev/null +++ b/conftest.py @@ -0,0 +1,6 @@ +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from mutation import pytest_addoption, pytest_configure # noqa: E402, F401 diff --git a/foobar/__init__.py b/foobar/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/foobar/ex.py b/foobar/ex.py index 0b29afe..7737bd7 100644 --- a/foobar/ex.py +++ b/foobar/ex.py @@ -2,4 +2,6 @@ def decrement_by_two(a): """docstring for testing false-positive mutation (function)""" + abc = 42 return a - 2 + diff --git a/foobar/test.py b/foobar/test.py new file mode 100644 index 0000000..e79a5dd --- /dev/null +++ b/foobar/test.py @@ -0,0 +1,9 @@ +"""docstring for testing false-positive mutation (module)""" + +from foobar.ex import decrement_by_two + + +def test_decrement_by_two(): + """docstring for testing false-positive mutation (function)""" + decrement_by_two(5) + assert True diff --git a/foobar/tests.py b/foobar/tests.py new file mode 100644 index 0000000..a6a983e --- /dev/null +++ b/foobar/tests.py @@ -0,0 +1,8 @@ +from foobar import ex + + +def test_000(): + assert ex.decrement_by_two(42) == 40 + +def test_001(): + assert ex.decrement_by_two(5) == 3 diff --git a/makefile b/makefile index f52e729..2379fa3 100644 --- a/makefile +++ b/makefile @@ -10,7 +10,11 @@ init: ## Prepare the host sytem for development @echo "\033[95m\n\nYou may now run 'make check'.\n\033[0m" check-only: - python3 mutation.py play tests.py --include="foobar/ex.py,foobar/__init__.py" --exclude="tests.py" || exit 1 + python3 mutation.py play tests.py --include="foobar/ex.py" --include="foobar/__init__.py" --exclude="tests.py" || exit 1 + +check-foobar: ## Verify foobar/test.py is weak: mutation survivors expected + python3 mutation.py play foobar/test.py --include="foobar/ex.py" + python3 -c "import sqlite3,sys; n=sqlite3.connect('.mutation.db').execute('SELECT COUNT(*) FROM results').fetchone()[0]; print(f'Survivors: {n}'); sys.exit(0 if n>0 else 1)" check: ## Run tests make check-only diff --git a/mutation.py b/mutation.py index 1718af7..b7d5299 100755 --- a/mutation.py +++ b/mutation.py @@ -2,30 +2,49 @@ """Mutation. Usage: - mutation play [--verbose] [--exclude=] [--only-deadcode-detection] [--include=] [--sampling=] [--randomly-seed=] [--max-workers=] [ ...] [-- TEST-COMMAND ...] + mutation play [--verbose] [--exclude=]... [--only-deadcode-detection] [--include=]... [--sampling=] [--randomly-seed=] [--max-workers=] [ ...] [-- PYTEST-COMMAND ...] mutation replay [--verbose] [--max-workers=] mutation list mutation show MUTATION mutation apply MUTATION + mutation summary + mutation gc mutation (-h | --help) mutation --version Options: - --verbose Show more information. - -h --help Show this screen. - --version Show version. + --include= Glob pattern for files to mutate, matched against relative paths. + Repeat the flag for multiple patterns [default: *.py] + --exclude= Glob pattern for files to skip. Repeat the flag for multiple + patterns [default: *test*] + --sampling= Limit mutations tested: N tests the first N, N% tests a random + N% (e.g. "--sampling=100" or "--sampling=10%") (default: all) + --randomly-seed= Integer seed controlling test order (pytest-randomly) and random + number mutations; also makes --sampling=N% reproducible + (default: current Unix timestamp) + --only-deadcode-detection Only apply dead-code detection mutations (StatementDrop, + DefinitionDrop). + --max-workers= Number of parallel workers (default: cpu_count - 1) + --verbose Show more information. + -h --help Show this screen. + --version Show version. """ +import ast import asyncio import fnmatch import functools +import hashlib import itertools +import json import os import random import re import shlex +import sqlite3 +import subprocess import sys import time -from ast import Constant +import types from concurrent import futures from contextlib import contextmanager from copy import deepcopy @@ -33,19 +52,15 @@ from difflib import unified_diff from uuid import UUID -import lexode -import parso import pygments import pygments.formatters import pygments.lexers import zstandard as zstd from aiostream import pipe, stream -from astunparse import unparse from coverage import Coverage from docopt import docopt from humanize import precisedelta from loguru import logger as log -from lsm import LSM from pathlib import Path from termcolor import colored from tqdm import tqdm @@ -59,6 +74,12 @@ DAY = 24 * HOUR MONTH = 31 * DAY +CLASSIFICATION_REAL_GAP = 1 +CLASSIFICATION_FRAGILE = 2 +CLASSIFICATION_EQUIVALENT = 3 +CLASSIFICATION_WONT_FIX = 4 +CLASSIFICATION_TODO = 5 + def humanize(seconds): if seconds < 1: @@ -148,24 +169,30 @@ def predicate(path): return predicate -def node_iter(node, level=1): - yield node - for child in node.children: - if not getattr(child, "children", False): - yield child - continue +def ast_walk(tree): + """Depth-first traversal of an AST, yielding every node.""" + yield tree + for child in ast.iter_child_nodes(tree): + yield from ast_walk(child) - yield from node_iter(child, level + 1) +def copy_tree_at(tree, index): + """Deep-copy *tree* and return (copy, node_at_index_in_copy).""" + tree_copy = deepcopy(tree) + return tree_copy, list(ast_walk(tree_copy))[index] -def node_copy_tree(node, index): - root = node.get_root_node() - root = deepcopy(root) - iterator = itertools.dropwhile( - lambda x: x[0] != index, zip(itertools.count(0), node_iter(root)) - ) - index, node = next(iterator) - return root, node + +def get_parent_field_idx(tree, node): + """Return (parent, field_name, list_index_or_None) for *node* in *tree*.""" + for parent in ast_walk(tree): + for field, value in ast.iter_fields(parent): + if isinstance(value, list): + for i, child in enumerate(value): + if child is node: + return parent, field, i + elif value is node: + return parent, field, None + return None, None, None @contextmanager @@ -174,6 +201,123 @@ def timeit(): yield lambda: time.perf_counter() - start +class Database: + def __init__(self, path, timeout=300): + self._conn = sqlite3.connect(str(path), check_same_thread=False, timeout=timeout) + self._conn.execute("PRAGMA journal_mode=WAL") + self._conn.execute( + "CREATE TABLE IF NOT EXISTS config (key TEXT PRIMARY KEY, value TEXT)" + ) + self._conn.execute( + "CREATE TABLE IF NOT EXISTS mutations " + "(uid BLOB PRIMARY KEY, path TEXT, diff BLOB)" + ) + self._conn.execute( + "CREATE TABLE IF NOT EXISTS results " + "(uid BLOB PRIMARY KEY, status INTEGER)" + ) + self._conn.commit() + try: + self._conn.execute("ALTER TABLE results ADD COLUMN classification INTEGER") + self._conn.commit() + except sqlite3.OperationalError: + pass # column already exists + + def __enter__(self): + return self + + def __exit__(self, *args): + self._conn.close() + + # --- config --- + def get_config(self, key): + row = self._conn.execute( + "SELECT value FROM config WHERE key = ?", (key,) + ).fetchone() + if row is None: + raise KeyError(key) + return json.loads(row[0]) + + def set_config(self, key, value): + self._conn.execute( + "INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)", + (key, json.dumps(value)), + ) + self._conn.commit() + + # --- mutations --- + def store_mutations(self, rows): + """Insert multiple (uid, path, diff) rows in a single transaction.""" + self._conn.executemany( + "INSERT OR REPLACE INTO mutations (uid, path, diff) VALUES (?, ?, ?)", + rows, + ) + self._conn.commit() + + def get_mutation(self, uid): + row = self._conn.execute( + "SELECT path, diff FROM mutations WHERE uid = ?", (uid,) + ).fetchone() + if row is None: + raise KeyError(uid) + return row[0], row[1] # path: str, diff: bytes + + def list_mutations(self): + return self._conn.execute( + "SELECT uid FROM mutations ORDER BY uid" + ).fetchall() + + # --- results --- + def set_result(self, uid, status): + self._conn.execute( + "INSERT OR REPLACE INTO results (uid, status) VALUES (?, ?)", + (uid, status), + ) + self._conn.commit() + + def del_result(self, uid): + self._conn.execute("DELETE FROM results WHERE uid = ?", (uid,)) + self._conn.commit() + + def list_results(self, status=None): + if status is not None: + return self._conn.execute( + "SELECT uid, status FROM results WHERE status = ? ORDER BY uid", + (status,), + ).fetchall() + return self._conn.execute( + "SELECT uid, status FROM results ORDER BY uid" + ).fetchall() + + def count_results(self): + return self._conn.execute("SELECT COUNT(*) FROM results").fetchone()[0] + + def count_mutations(self): + return self._conn.execute("SELECT COUNT(*) FROM mutations").fetchone()[0] + + def set_classification(self, uid, cls): + self._conn.execute( + "UPDATE results SET classification = ? WHERE uid = ?", (cls, uid) + ) + self._conn.commit() + + def list_results_for_replay(self): + """Return uids in the replay queue: survived + not permanently dismissed.""" + return self._conn.execute( + "SELECT uid FROM results " + "WHERE status IN (0, 1) " + "AND (classification IS NULL OR classification NOT IN (?, ?))", + (CLASSIFICATION_EQUIVALENT, CLASSIFICATION_WONT_FIX), + ).fetchall() + + def get_classification_counts(self): + """Return dict mapping classification value (or None) -> count.""" + rows = self._conn.execute( + "SELECT classification, COUNT(*) FROM results GROUP BY classification" + ).fetchall() + return {cls: count for cls, count in rows} + + class Mutation(type): ALL = set() DEADCODE = set() @@ -189,38 +333,45 @@ def __init__(cls, *args, **kwargs): class StatementDrop(metaclass=Mutation): + """Replace a statement with pass, verifying that no covered statement is inert dead code.""" deadcode_detection = True - NEWLINE = "a = 42\n" def predicate(self, node): - return "stmt" in node.type and node.type != "expr_stmt" + return isinstance(node, ast.stmt) and not isinstance( + node, (ast.Expr, ast.Pass, ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef) + ) - def mutate(self, node, index): - root, new = node_copy_tree(node, index) - index = new.parent.children.index(new) - passi = parso.parse("pass").children[0] - passi.prefix = new.get_first_leaf().prefix - new.parent.children[index] = passi - newline = parso.parse(type(self).NEWLINE).children[0].children[1] - new.parent.children.insert(index + 1, newline) - yield root, new + def mutate(self, node, index, tree): + tree_copy, node_copy = copy_tree_at(tree, index) + parent, field, idx = get_parent_field_idx(tree_copy, node_copy) + if parent is None or idx is None: + return + replacement = ast.Pass(lineno=node_copy.lineno, col_offset=node_copy.col_offset) + getattr(parent, field)[idx] = replacement + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy class DefinitionDrop(metaclass=Mutation): + """Remove a function or class definition entirely (only when others remain in the same body), surfacing unreferenced definitions.""" deadcode_detection = True def predicate(self, node): - # There is also node.type = 'lambdadef' but lambadef are - # always part of a assignation statement. So, that case is - # handled in StatementDrop. - return node.type in ("classdef", "funcdef") + return isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)) - def mutate(self, node, index): - root, new = node_copy_tree(node, index) - new.parent.children.remove(new) - yield root, new + def mutate(self, node, index, tree): + tree_copy, node_copy = copy_tree_at(tree, index) + parent, field, idx = get_parent_field_idx(tree_copy, node_copy) + if parent is None or idx is None: + return + body = getattr(parent, field) + if len(body) <= 1: + return + body.pop(idx) + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy def chunks(iterable, n): @@ -231,22 +382,24 @@ def chunks(iterable, n): class MutateNumber(metaclass=Mutation): + """Replace an integer or float literal with a random value in the same bit-range, verifying that the exact numeric value is tested.""" COUNT = 5 def predicate(self, node): - return node.type == "number" + return ( + isinstance(node, ast.Constant) + and isinstance(node.value, (int, float)) + and not isinstance(node.value, bool) + ) - def mutate(self, node, index): - value = eval(node.value) + def mutate(self, node, index, tree): + value = node.value if isinstance(value, int): - def randomize(x): return random.randint(0, x) - else: - def randomize(x): return random.random() * x @@ -257,101 +410,686 @@ def randomize(x): count = 0 while count != self.COUNT: count += 1 - root, new = node_copy_tree(node, index) - new.value = str(randomize(2 ** size)) - if new.value == node.value: + new_value = randomize(2 ** size) + if new_value == value: continue - yield root, new + tree_copy, node_copy = copy_tree_at(tree, index) + node_copy.value = new_value + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy class MutateString(metaclass=Mutation): + """Prepend a fixed prefix to a string or bytes literal, verifying that callers check the actual content.""" + def predicate(self, node): - # str or bytes. - return node.type == "string" - - def mutate(self, node, index): - root, new = node_copy_tree(node, index) - value = eval(new.value) - if isinstance(value, bytes): - value = b"coffeebad" + value + return isinstance(node, ast.Constant) and isinstance(node.value, (str, bytes)) + + def mutate(self, node, index, tree): + tree_copy, node_copy = copy_tree_at(tree, index) + if isinstance(node_copy.value, bytes): + node_copy.value = b"coffeebad" + node_copy.value else: - value = "mutated string " + value - value = Constant(value=value, kind="") - value = unparse(value).strip() - new.value = value - yield root, new + node_copy.value = "mutated string " + node_copy.value + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy class MutateKeyword(metaclass=Mutation): + """Rotate flow keywords (break/continue/pass), swap boolean constants (True/False/None), and flip boolean operators (and/or).""" + + FLOW_STMTS = (ast.Continue, ast.Break, ast.Pass) + BOOL_OPS = (ast.And, ast.Or) + + def predicate(self, node): + if isinstance(node, self.FLOW_STMTS): + return True + if isinstance(node, ast.Constant) and ( + node.value is True or node.value is False or node.value is None + ): + return True + if isinstance(node, ast.BoolOp): + return True + return False + + def mutate(self, node, index, tree): + if isinstance(node, self.FLOW_STMTS): + for new_cls in self.FLOW_STMTS: + if isinstance(node, new_cls): + continue + tree_copy, node_copy = copy_tree_at(tree, index) + parent, field, idx = get_parent_field_idx(tree_copy, node_copy) + if parent is None or idx is None: + continue + getattr(parent, field)[idx] = new_cls( + lineno=node_copy.lineno, col_offset=node_copy.col_offset + ) + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + + elif isinstance(node, ast.Constant): + if node.value is True: + swaps = [False, None] + elif node.value is False: + swaps = [True, None] + else: + swaps = [True, False] + for new_value in swaps: + tree_copy, node_copy = copy_tree_at(tree, index) + node_copy.value = new_value + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + + elif isinstance(node, ast.BoolOp): + for new_op_cls in self.BOOL_OPS: + if isinstance(node.op, new_op_cls): + continue + tree_copy, node_copy = copy_tree_at(tree, index) + node_copy.op = new_op_cls() + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy - KEYWORDS = set(["continue", "break", "pass"]) - SINGLETON = set(["True", "False", "None"]) - # Support xor operator ^ - BOOLEAN = set(["and", "or"]) - TARGETS = KEYWORDS | SINGLETON | BOOLEAN +class Comparison(metaclass=Mutation): + """Negate a comparison expression by wrapping it with not (...), verifying that the direction of every comparison is tested.""" def predicate(self, node): - return node.type == "keyword" and node.value in type(self).TARGETS + return isinstance(node, ast.Compare) - def mutate(self, node, index): - value = node.value - for targets in [self.KEYWORDS, self.SINGLETON, self.BOOLEAN]: - if value in targets: - break + def mutate(self, node, index, tree): + tree_copy, node_copy = copy_tree_at(tree, index) + parent, field, idx = get_parent_field_idx(tree_copy, node_copy) + if parent is None: + return + not_node = ast.UnaryOp( + op=ast.Not(), + operand=node_copy, + lineno=node_copy.lineno, + col_offset=node_copy.col_offset, + ) + if idx is not None: + getattr(parent, field)[idx] = not_node else: - raise NotImplementedError + setattr(parent, field, not_node) + ast.fix_missing_locations(tree_copy) + yield tree_copy, not_node + + +class MutateOperator(metaclass=Mutation): + """Replace an arithmetic, bitwise, shift, or comparison operator with another in the same group, verifying the exact operator matters.""" + + BINARY_OPS = [ + ast.Add, ast.Sub, ast.Mod, ast.BitOr, ast.BitAnd, + ast.FloorDiv, ast.Div, ast.Mult, ast.BitXor, ast.Pow, ast.MatMult, + ] + SHIFT_OPS = [ast.LShift, ast.RShift] + CMP_OPS = [ast.Lt, ast.LtE, ast.Eq, ast.NotEq, ast.GtE, ast.Gt] + + BINOP_GROUPS = [BINARY_OPS, SHIFT_OPS] + + def predicate(self, node): + return isinstance(node, (ast.BinOp, ast.AugAssign, ast.Compare)) + + def mutate(self, node, index, tree): + if isinstance(node, (ast.BinOp, ast.AugAssign)): + for op_group in self.BINOP_GROUPS: + if type(node.op) not in op_group: + continue + for new_op_cls in op_group: + if new_op_cls is type(node.op): + continue + tree_copy, node_copy = copy_tree_at(tree, index) + node_copy.op = new_op_cls() + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + + elif isinstance(node, ast.Compare): + for i, op in enumerate(node.ops): + if type(op) not in self.CMP_OPS: + continue + for new_op_cls in self.CMP_OPS: + if new_op_cls is type(op): + continue + tree_copy, node_copy = copy_tree_at(tree, index) + node_copy.ops[i] = new_op_cls() + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + + +if hasattr(ast, "Match"): + + class MutateMatchCase(metaclass=Mutation): + """Remove one case branch at a time from a match statement (Python 3.10+), verifying that each branch is exercised by the test suite.""" + + def predicate(self, node): + return isinstance(node, ast.Match) and len(node.cases) > 1 + + def mutate(self, node, index, tree): + for i in range(len(node.cases)): + tree_copy, node_copy = copy_tree_at(tree, index) + node_copy.cases.pop(i) + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + + +_STRING_METHOD_SWAPS = { + "lower": ["upper"], "upper": ["lower"], + "lstrip": ["rstrip", "removeprefix"], "rstrip": ["lstrip", "removesuffix"], + "find": ["rfind"], "rfind": ["find"], + "ljust": ["rjust"], "rjust": ["ljust"], + "removeprefix": ["removesuffix"], "removesuffix": ["removeprefix"], + "partition": ["rpartition"], "rpartition": ["partition"], + "split": ["rsplit"], "rsplit": ["split"], +} + + +class MutateStringMethod(metaclass=Mutation): + """Swap directionally symmetric string methods (lower↔upper, lstrip↔rstrip, lstrip↔removeprefix, rstrip↔removesuffix, find↔rfind, ljust↔rjust, removeprefix↔removesuffix, partition↔rpartition, split↔rsplit), verifying that the direction matters.""" + + def predicate(self, node): + return ( + isinstance(node, ast.Call) + and isinstance(node.func, ast.Attribute) + and node.func.attr in _STRING_METHOD_SWAPS + ) + + def mutate(self, node, index, tree): + for target_attr in _STRING_METHOD_SWAPS[node.func.attr]: + tree_copy, node_copy = copy_tree_at(tree, index) + node_copy.func.attr = target_attr + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + - for target in targets: - if target == value: +class MutateCallArgs(metaclass=Mutation): + """Replace each positional call argument with None, and drop one argument at a time from multi-argument calls, verifying that every argument is actually used.""" + + def predicate(self, node): + return isinstance(node, ast.Call) and len(node.args) > 0 + + def mutate(self, node, index, tree): + for i, arg in enumerate(node.args): + if isinstance(arg, ast.Constant) and arg.value is None: continue - root, new = node_copy_tree(node, index) - new.value = target - yield root, new + tree_copy, node_copy = copy_tree_at(tree, index) + node_copy.args[i] = ast.Constant( + value=None, lineno=arg.lineno, col_offset=arg.col_offset + ) + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + if len(node.args) > 1: + for i in range(len(node.args)): + tree_copy, node_copy = copy_tree_at(tree, index) + node_copy.args.pop(i) + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + + +class ForceConditional(metaclass=Mutation): + """Force the test of an if/while/assert/ternary to always be True or always False, verifying that both branches are meaningfully exercised.""" -class Comparison(metaclass=Mutation): def predicate(self, node): - return node == "comparison" + return isinstance(node, (ast.If, ast.While, ast.Assert, ast.IfExp)) + + def mutate(self, node, index, tree): + for value in (True, False): + if isinstance(node.test, ast.Constant) and node.test.value is value: + continue + tree_copy, node_copy = copy_tree_at(tree, index) + node_copy.test = ast.Constant( + value=value, lineno=node_copy.test.lineno, col_offset=node_copy.test.col_offset + ) + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy - def mutate(self, node, index): - root, new = node_copy_tree(node, index) - not_test = parso.parse("not ({})".format(new.get_code())) - index = new.parent.children.index(new) - new.parent.children[index] = not_test - return root, new +class MutateExceptionHandler(metaclass=Mutation): + """Replace the specific exception type in an except clause with the generic Exception, verifying that the handler is tested for the right error kind.""" -class MutateOperator(metaclass=Mutation): + def predicate(self, node): + return isinstance(node, ast.ExceptHandler) and node.type is not None + + def mutate(self, node, index, tree): + tree_copy, node_copy = copy_tree_at(tree, index) + node_copy.type = ast.Name( + id="Exception", + ctx=ast.Load(), + lineno=node_copy.type.lineno, + col_offset=node_copy.type.col_offset, + ) + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy - BINARY = ["+", "-", "%", "|", "&", "//", "/", "*", "^", "**", "@"] - BITWISE = ["<<", ">>"] - COMPARISON = ["<", "<=", "==", "!=", ">=", ">"] - ASSIGNEMENT = ["="] + [x + "=" for x in BINARY + BITWISE] - # TODO support OPERATORS_CONTAINS = ["in", "not in"] +class ZeroIteration(metaclass=Mutation): + """Replace a for-loop's iterable with an empty list, forcing the body to never execute, verifying that callers handle empty-collection cases.""" - OPERATORS = [ - BINARY, - BITWISE, - BITWISE, - COMPARISON, - ASSIGNEMENT, - ] + def predicate(self, node): + return isinstance(node, (ast.For, ast.AsyncFor)) + + def mutate(self, node, index, tree): + tree_copy, node_copy = copy_tree_at(tree, index) + node_copy.iter = ast.List( + elts=[], + ctx=ast.Load(), + lineno=node_copy.iter.lineno, + col_offset=node_copy.iter.col_offset, + ) + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + + +class RemoveDecorator(metaclass=Mutation): + """Remove one decorator at a time from a decorated function or class, verifying that each decorator's effect is covered by tests.""" + + def predicate(self, node): + return ( + isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)) + and len(node.decorator_list) > 0 + ) + + def mutate(self, node, index, tree): + for i in range(len(node.decorator_list)): + tree_copy, node_copy = copy_tree_at(tree, index) + node_copy.decorator_list.pop(i) + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + + +class NegateCondition(metaclass=Mutation): + """Wrap a bare (non-comparison) condition with not, inserting the logical inverse of the test, verifying that the truthiness of the value actually matters.""" + + def predicate(self, node): + return isinstance(node, (ast.If, ast.While, ast.Assert, ast.IfExp)) and not isinstance( + node.test, ast.Compare + ) + + def mutate(self, node, index, tree): + tree_copy, node_copy = copy_tree_at(tree, index) + test = node_copy.test + node_copy.test = ast.UnaryOp( + op=ast.Not(), + operand=test, + lineno=test.lineno, + col_offset=test.col_offset, + ) + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + + +class MutateReturn(metaclass=Mutation): + """Replace a return value with a type-appropriate default (None, 0, False, or ""), verifying that callers check what the function returns.""" + + DEFAULTS = [None, 0, False, ""] def predicate(self, node): - return node.type == "operator" + return isinstance(node, ast.Return) and node.value is not None - def mutate(self, node, index): - for operators in type(self).OPERATORS: - if node.value not in operators: + def mutate(self, node, index, tree): + for default in self.DEFAULTS: + if isinstance(node.value, ast.Constant) and node.value.value is default: continue - for new_operator in operators: - if node.value == new_operator: + tree_copy, node_copy = copy_tree_at(tree, index) + node_copy.value = ast.Constant( + value=default, lineno=node_copy.lineno, col_offset=node_copy.col_offset + ) + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + + +class MutateLambda(metaclass=Mutation): + """Replace the body of a lambda with None (or 0 when the body is already None), verifying that the lambda's computation is actually used.""" + + def predicate(self, node): + return isinstance(node, ast.Lambda) + + def mutate(self, node, index, tree): + new_value = 0 if (isinstance(node.body, ast.Constant) and node.body.value is None) else None + tree_copy, node_copy = copy_tree_at(tree, index) + node_copy.body = ast.Constant( + value=new_value, lineno=node_copy.body.lineno, col_offset=node_copy.body.col_offset + ) + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + + +class MutateAssignment(metaclass=Mutation): + """Replace the right-hand side of a plain assignment with None, verifying that the assigned value is not silently ignored.""" + + def predicate(self, node): + return isinstance(node, ast.Assign) and not ( + isinstance(node.value, ast.Constant) and node.value.value is None + ) + + def mutate(self, node, index, tree): + tree_copy, node_copy = copy_tree_at(tree, index) + node_copy.value = ast.Constant( + value=None, lineno=node_copy.lineno, col_offset=node_copy.col_offset + ) + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + + +class AugAssignToAssign(metaclass=Mutation): + """Convert an augmented assignment (x += v) to a plain assignment (x = v), dropping the accumulation, verifying that the update operator is tested.""" + + def predicate(self, node): + return isinstance(node, ast.AugAssign) + + def mutate(self, node, index, tree): + tree_copy, node_copy = copy_tree_at(tree, index) + parent, field, idx = get_parent_field_idx(tree_copy, node_copy) + if parent is None or idx is None: + return + assign = ast.Assign( + targets=[node_copy.target], + value=node_copy.value, + lineno=node_copy.lineno, + col_offset=node_copy.col_offset, + ) + getattr(parent, field)[idx] = assign + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + + +class RemoveUnaryOp(metaclass=Mutation): + """Strip a unary operator (not, -, ~) and leave only the operand, verifying that the operator's effect is covered by tests.""" + + def predicate(self, node): + return isinstance(node, ast.UnaryOp) and isinstance( + node.op, (ast.Not, ast.USub, ast.Invert) + ) + + def mutate(self, node, index, tree): + tree_copy, node_copy = copy_tree_at(tree, index) + parent, field, idx = get_parent_field_idx(tree_copy, node_copy) + if parent is None: + return + operand = node_copy.operand + if idx is not None: + getattr(parent, field)[idx] = operand + else: + setattr(parent, field, operand) + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + + +class MutateIdentity(metaclass=Mutation): + """Swap is ↔ is not in identity comparisons, verifying that the expected identity relationship is directly tested.""" + + def predicate(self, node): + return isinstance(node, ast.Compare) and any( + isinstance(op, (ast.Is, ast.IsNot)) for op in node.ops + ) + + def mutate(self, node, index, tree): + for i, op in enumerate(node.ops): + if not isinstance(op, (ast.Is, ast.IsNot)): + continue + new_op = ast.IsNot() if isinstance(op, ast.Is) else ast.Is() + tree_copy, node_copy = copy_tree_at(tree, index) + node_copy.ops[i] = new_op + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + + +class MutateContainment(metaclass=Mutation): + """Swap in ↔ not in in membership tests, verifying that the expected membership relationship is directly tested.""" + + def predicate(self, node): + return isinstance(node, ast.Compare) and any( + isinstance(op, (ast.In, ast.NotIn)) for op in node.ops + ) + + def mutate(self, node, index, tree): + for i, op in enumerate(node.ops): + if not isinstance(op, (ast.In, ast.NotIn)): + continue + new_op = ast.NotIn() if isinstance(op, ast.In) else ast.In() + tree_copy, node_copy = copy_tree_at(tree, index) + node_copy.ops[i] = new_op + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + + +class BreakToReturn(metaclass=Mutation): + """Replace break with return, exiting the enclosing function instead of just the loop, verifying that the loop's exit path is tested.""" + + def predicate(self, node): + return isinstance(node, ast.Break) + + def mutate(self, node, index, tree): + tree_copy, node_copy = copy_tree_at(tree, index) + parent, field, idx = get_parent_field_idx(tree_copy, node_copy) + if parent is None or idx is None: + return + getattr(parent, field)[idx] = ast.Return( + value=None, lineno=node_copy.lineno, col_offset=node_copy.col_offset + ) + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + + +class SwapArguments(metaclass=Mutation): + """Swap each pair of positional call arguments, verifying that argument order is tested.""" + + def predicate(self, node): + return isinstance(node, ast.Call) and len(node.args) >= 2 + + def mutate(self, node, index, tree): + for i in range(len(node.args)): + for j in range(i + 1, len(node.args)): + tree_copy, node_copy = copy_tree_at(tree, index) + node_copy.args[i], node_copy.args[j] = node_copy.args[j], node_copy.args[i] + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + + +class MutateSlice(metaclass=Mutation): + """Drop the lower or upper bound of a slice (a[i:j] → a[:j] or a[i:]) and negate the step (a[::2] → a[::-2]), verifying that slice boundary conditions and direction are tested.""" + + def predicate(self, node): + return isinstance(node, ast.Slice) and ( + node.lower is not None or node.upper is not None or node.step is not None + ) + + def mutate(self, node, index, tree): + if node.lower is not None: + tree_copy, node_copy = copy_tree_at(tree, index) + node_copy.lower = None + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + if node.upper is not None: + tree_copy, node_copy = copy_tree_at(tree, index) + node_copy.upper = None + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + if node.step is not None: + tree_copy, node_copy = copy_tree_at(tree, index) + step = node_copy.step + if isinstance(step, ast.UnaryOp) and isinstance(step.op, ast.USub): + node_copy.step = step.operand + else: + node_copy.step = ast.UnaryOp( + op=ast.USub(), + operand=step, + lineno=step.lineno, + col_offset=step.col_offset, + ) + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + + +class MutateYield(metaclass=Mutation): + """Replace the value of a yield expression with None, verifying that the yielded value is actually used by callers.""" + + def predicate(self, node): + return ( + isinstance(node, ast.Yield) + and node.value is not None + and not (isinstance(node.value, ast.Constant) and node.value.value is None) + ) + + def mutate(self, node, index, tree): + tree_copy, node_copy = copy_tree_at(tree, index) + node_copy.value = ast.Constant( + value=None, lineno=node_copy.lineno, col_offset=node_copy.col_offset + ) + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + + +class MutateDefaultArgument(metaclass=Mutation): + """Remove leading default argument values one at a time, making parameters required, verifying that callers always supply them explicitly.""" + + def predicate(self, node): + return isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.Lambda)) and ( + len(node.args.defaults) > 0 + or any(d is not None for d in node.args.kw_defaults) + ) + + def mutate(self, node, index, tree): + for i in range(len(node.args.defaults)): + tree_copy, node_copy = copy_tree_at(tree, index) + node_copy.args.defaults = node_copy.args.defaults[i + 1:] + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + for i, default in enumerate(node.args.kw_defaults): + if default is None: + continue + tree_copy, node_copy = copy_tree_at(tree, index) + node_copy.args.kw_defaults[i] = None + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + + +class MutateIterator(metaclass=Mutation): + """Wrap a for-loop's iterable in reversed() or random.shuffle(), verifying that iteration order assumptions are tested.""" + + def predicate(self, node): + return isinstance(node, (ast.For, ast.AsyncFor)) and not ( + isinstance(node.iter, ast.Call) + and isinstance(node.iter.func, ast.Name) + and node.iter.func.id == "reversed" + ) + + def mutate(self, node, index, tree): + # Mutation 1: reversed(iterable) + tree_copy, node_copy = copy_tree_at(tree, index) + node_copy.iter = ast.Call( + func=ast.Name(id="reversed", ctx=ast.Load()), + args=[node_copy.iter], + keywords=[], + lineno=node_copy.iter.lineno, + col_offset=node_copy.iter.col_offset, + ) + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + + # Mutation 2: (__import__('random').shuffle(_s := list(iterable)) or _s) + # Shuffles the iterable in-place without adding an import statement. + tree_copy, node_copy = copy_tree_at(tree, index) + lineno = node_copy.iter.lineno + col = node_copy.iter.col_offset + seq_name = ast.Name(id="_mutation_seq_", ctx=ast.Store(), lineno=lineno, col_offset=col) + list_call = ast.Call( + func=ast.Name(id="list", ctx=ast.Load(), lineno=lineno, col_offset=col), + args=[node_copy.iter], + keywords=[], + lineno=lineno, + col_offset=col, + ) + walrus = ast.NamedExpr(target=seq_name, value=list_call, lineno=lineno, col_offset=col) + shuffle_call = ast.Call( + func=ast.Attribute( + value=ast.Call( + func=ast.Name(id="__import__", ctx=ast.Load(), lineno=lineno, col_offset=col), + args=[ast.Constant(value="random", lineno=lineno, col_offset=col)], + keywords=[], + lineno=lineno, + col_offset=col, + ), + attr="shuffle", + ctx=ast.Load(), + lineno=lineno, + col_offset=col, + ), + args=[walrus], + keywords=[], + lineno=lineno, + col_offset=col, + ) + seq_load = ast.Name(id="_mutation_seq_", ctx=ast.Load(), lineno=lineno, col_offset=col) + node_copy.iter = ast.BoolOp( + op=ast.Or(), + values=[shuffle_call, seq_load], + lineno=lineno, + col_offset=col, + ) + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + + +class MutateContextManager(metaclass=Mutation): + """Strip context managers from a with statement one at a time, keeping the body, verifying that each manager's effect is tested.""" + + def predicate(self, node): + return isinstance(node, (ast.With, ast.AsyncWith)) + + def mutate(self, node, index, tree): + for i in range(len(node.items)): + tree_copy, node_copy = copy_tree_at(tree, index) + if len(node_copy.items) == 1: + parent, field, idx = get_parent_field_idx(tree_copy, node_copy) + if parent is None or idx is None: continue - root, new = node_copy_tree(node, index) - new.value = new_operator - yield root, new + getattr(parent, field)[idx:idx + 1] = node_copy.body + else: + node_copy.items.pop(i) + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + + +class MutateFString(metaclass=Mutation): + """Replace each interpolated expression in an f-string with an empty string, verifying that callers check the formatted content rather than just the surrounding template.""" + + def predicate(self, node): + return isinstance(node, ast.JoinedStr) and any( + isinstance(v, ast.FormattedValue) for v in node.values + ) + + def mutate(self, node, index, tree): + for i, value in enumerate(node.values): + if not isinstance(value, ast.FormattedValue): + continue + tree_copy, node_copy = copy_tree_at(tree, index) + node_copy.values[i] = ast.Constant( + value="", lineno=node_copy.lineno, col_offset=node_copy.col_offset + ) + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy + + +class MutateGlobal(metaclass=Mutation): + """Remove a global or nonlocal declaration entirely, causing assignments to bind a local variable instead, verifying that the scoping is exercised by tests.""" + + deadcode_detection = True + + def predicate(self, node): + return isinstance(node, (ast.Global, ast.Nonlocal)) + + def mutate(self, node, index, tree): + tree_copy, node_copy = copy_tree_at(tree, index) + parent, field, idx = get_parent_field_idx(tree_copy, node_copy) + if parent is None or idx is None: + return + body = getattr(parent, field) + if len(body) <= 1: + return + body.pop(idx) + ast.fix_missing_locations(tree_copy) + yield tree_copy, node_copy def diff(source, target, filename=""): @@ -362,34 +1100,42 @@ def diff(source, target, filename=""): return out -def mutate(node, index, mutations): +def mutate(node, index, tree, mutations): for mutation in mutations: if not mutation.predicate(node): continue - yield from mutation.mutate(node, index) + yield from mutation.mutate(node, index, tree) -def interesting(new_node, coverage): - if getattr(new_node, "line", False): - return new_node.line in coverage - return new_node.get_first_leaf().line in coverage +def interesting(node, coverage): + return getattr(node, "lineno", None) in coverage def iter_deltas(source, path, coverage, mutations): - ast = parso.parse(source) + tree = ast.parse(source) + canonical = ast.unparse(tree) ignored = 0 - for (index, (index, node)) in enumerate(zip(itertools.count(0), node_iter(ast))): - for root, new_node in mutate(node, index, mutations): + invalid = 0 + for index, node in enumerate(ast_walk(tree)): + for tree_copy, new_node in mutate(node, index, tree, mutations): if not interesting(new_node, coverage): ignored += 1 continue - target = root.get_code() - delta = diff(source, target, path) + target = ast.unparse(tree_copy) + try: + ast.parse(target) + except SyntaxError: + invalid += 1 + continue + delta = diff(canonical, target, path) yield delta if ignored > 1: msg = "Ignored {} mutations from file at {}" msg += " because there is no associated coverage." log.trace(msg, ignored, path) + if invalid > 0: + msg = "Skipped {} invalid (syntax error) mutations from {}" + log.trace(msg, invalid, path) async def pool_for_each_par_map(loop, pool, f, p, iterator): @@ -438,19 +1184,16 @@ def mutation_create(item): def install_module_loader(uid): - db = LSM(".mutation.okvslite") - mutation_show(uid.hex) - path, diff = lexode.unpack(db[lexode.pack([1, uid.bytes])]) + with Database(".mutation.db") as db: + path, diff = db.get_mutation(uid.bytes) diff = zstd.decompress(diff).decode("utf8") with open(path) as f: source = f.read() - patched = patch(diff, source) - - import imp + patched = patch(diff, ast.unparse(ast.parse(source))) components = path[:-3].split("/") @@ -469,11 +1212,10 @@ def install_module_loader(uid): if module_path is None: raise Exception("sys.path oops!") - patched_module = imp.new_module(module_path) + patched_module = types.ModuleType(module_path) try: exec(patched, patched_module.__dict__) except Exception: - # TODO: syntaxerror, do not produce those mutations exec("", patched_module.__dict__) sys.modules[module_path] = patched_module @@ -501,23 +1243,45 @@ def for_each_par_map(loop, pool, inc, proc, items): def mutation_pass(args): # TODO: rename command, uid, timeout = args + # Check if this mutation was previously classified as equivalent + with database_open(".", timeout=timeout) as db: + _, diff_bytes = db.get_mutation(uid) + diff_text = zstd.decompress(diff_bytes).decode("utf8") + ignored_file = Path(".mutations.ignored") / "{}.diff".format(diff_hash(diff_text)) + if ignored_file.exists(): + log.debug("Skipping ignored mutation: {}", uid.hex()) + with database_open(".", timeout=timeout) as db: + db.del_result(uid) + return True command = command + ["--mutation={}".format(uid.hex())] log.debug("Running command: {}", ' '.join(command)) out = run(command, timeout=timeout, silent=True) + if out == 4: + # pytest exit code 4 = "command line usage error": --mutation flag was + # not recognised, which means mutation.py is not loaded as a pytest + # plugin. Treat this as a hard error so it doesn't silently look like + # every mutation was caught. + log.error( + "pytest exited with code 4 (unrecognised arguments) for command: `{}`\n" + "Hint: mutation.py is not loaded as a pytest plugin. " + "Add `pytest_plugins = [\"mutation\"]` to your conftest.py.", + " ".join(command), + ) + sys.exit(1) if out == 0: msg = "no error with mutation: {} ({})" log.trace(msg, " ".join(command), out) - with database_open(".") as db: - db[lexode.pack([2, uid])] = b"\x00" + with database_open(".", timeout=timeout) as db: + db.set_result(uid, 0) return False else: # TODO: pass root path... - with database_open(".") as db: - del db[lexode.pack([2, uid])] + with database_open(".", timeout=timeout) as db: + db.del_result(uid) return True -PYTEST = "pytest --exitfirst --no-header --tb=no --quiet --assert=plain" +PYTEST = "python3 -m pytest -p mutation --exitfirst --no-header --tb=no --quiet --assert=plain" PYTEST = shlex.split(PYTEST) @@ -529,49 +1293,58 @@ def coverage_read(root): out = dict() root = root.resolve() for filepath in filepaths: - if not filepath.startswith(str(root)): - continue - key = str(Path(filepath).relative_to(root)) - value = set(data.lines(filepath)) + if filepath.startswith(str(root)): + key = str(Path(filepath).relative_to(root)) + else: + # coverage.py sometimes records relative paths; resolve against root + resolved = (root / filepath).resolve() + if not resolved.is_relative_to(root): + continue + key = str(resolved.relative_to(root)) + value = set(data.lines(filepath) or []) out[key] = value return out -def database_open(root, recreate=False): +def database_open(root, recreate=False, timeout=300): root = root if isinstance(root, Path) else Path(root) - db = root / ".mutation.okvslite" + db = root / ".mutation.db" if recreate and db.exists(): log.trace("Deleting existing database...") - for file in root.glob(".mutation.okvslite*"): + for file in root.glob(".mutation.db*"): file.unlink() if not recreate and not db.exists(): log.error("No database, can not proceed!") sys.exit(1) - db = LSM(str(db)) + return Database(str(db), timeout=timeout) - return db - -def run(command, timeout=None, silent=True): +def run(command, timeout=None, silent=True, verbose=False): if timeout and timeout < 60: timeout = 60 - if timeout: - command.insert(0, "timeout {}".format(timeout)) - - command.insert(0, "PYTHONDONTWRITEBYTECODE=1") + env = {**os.environ, "PYTHONDONTWRITEBYTECODE": "1"} + devnull = subprocess.DEVNULL if (silent and not verbose) else None - if silent: - command.append("> /dev/null 2>&1") - - return os.system(" ".join(command)) + try: + result = subprocess.run( + command, + env=env, + timeout=timeout, + stdout=devnull, + stderr=devnull, + ) + return result.returncode + except subprocess.TimeoutExpired: + return 1 def sampling_setup(sampling, total): + if sampling is None: - return lambda x: x, total + sampling = "100%" if sampling.endswith("%"): # randomly choose percent mutations @@ -611,13 +1384,12 @@ def sampler(iterable): # TODO: the `command` is a hack, maybe there is a way to avoid the # following code: `if command is not None. def check_tests(root, seed, arguments, command=None): - max_workers = arguments["--max-workers"] or (os.cpu_count() - 1) or 1 - max_workers = int(max_workers) + max_workers = int(arguments["--max-workers"] or (os.cpu_count() - 1) or 1) log.info("Let's check that the tests are green...") - if arguments[""] and arguments["TEST-COMMAND"]: - log.error(" and TEST-COMMAND are exclusive!") + if arguments[""] and arguments["PYTEST-COMMAND"]: + log.error(" and PYTEST-COMMAND are exclusive!") sys.exit(1) if command is not None: @@ -631,8 +1403,8 @@ def check_tests(root, seed, arguments, command=None): ] ) else: - if arguments["TEST-COMMAND"]: - command = list(arguments["TEST-COMMAND"]) + if arguments["PYTEST-COMMAND"]: + command = list(arguments["PYTEST-COMMAND"]) else: command = list(PYTEST) command.extend(arguments[""]) @@ -650,13 +1422,18 @@ def check_tests(root, seed, arguments, command=None): "--cov=.", "--cov-branch", "--no-cov-on-fail", + # Override any project-level --cov-fail-under; mutation.py only + # cares whether tests pass, not whether coverage meets a threshold. + "--cov-fail-under=0", # Pass random seed "--randomly-seed={}".format(seed), ] ) + verbose = arguments.get("--verbose", False) + with timeit() as alpha: - out = run(command) + out = run(command, verbose=verbose) if out == 0: log.info("Tests are green 💚") @@ -667,8 +1444,8 @@ def check_tests(root, seed, arguments, command=None): log.warning("I tried the following command: `{}`", " ".join(command)) # Same command without parallelization - if arguments["TEST-COMMAND"]: - command = list(arguments["TEST-COMMAND"]) + if arguments["PYTEST-COMMAND"]: + command = list(arguments["PYTEST-COMMAND"]) else: command = list(PYTEST) command.extend(arguments[""]) @@ -678,12 +1455,13 @@ def check_tests(root, seed, arguments, command=None): "--cov=.", "--cov-branch", "--no-cov-on-fail", + "--cov-fail-under=0", # Pass random seed "--randomly-seed={}".format(seed), ] with timeit() as alpha: - out = run(command) + out = run(command, verbose=verbose) if out != 0: msg = "Tests are definitly red! Return code is {}!!" @@ -692,10 +1470,10 @@ def check_tests(root, seed, arguments, command=None): sys.exit(2) # Otherwise, it is possible to run the tests but without - # parallelization. - msg = "Setting max_workers=1 because tests do not pass in parallel" + # parallelization via xdist. Mutations can still be tested + # concurrently (each as an independent serial pytest run). + msg = "Tests do not pass with xdist; each mutation will run without --numprocesses" log.warning(msg) - max_workers = 1 alpha = alpha() msg = "Approximate time required to run the tests once: {}..." @@ -716,12 +1494,10 @@ async def play_create_mutations(loop, root, db, max_workers, arguments): # Go through all files, and produce mutations, take into account # include pattern, and exclude patterns. Also, exclude what has # no coverage. - include = arguments.get("--include") or "*.py" - include = include.split(",") + include = arguments.get("--include") or ["*.py"] include = glob2predicate(include) - exclude = arguments.get("--exclude") or "*test*" - exclude = exclude.split(",") + exclude = arguments.get("--exclude") or ["*test*"] exclude = glob2predicate(exclude) filepaths = root.rglob("*.py") @@ -764,11 +1540,9 @@ def on_mutations_created(items): progress.update() total += len(items) - for path, delta in items: - # TODO: replace ULID with a content addressable hash. - uid = ULID().to_uuid().bytes - # delta is a compressed unified diff - db[lexode.pack([1, uid])] = lexode.pack([path, delta]) + # TODO: replace ULID with a content addressable hash. + rows = [(ULID().to_uuid().bytes, str(path), delta) for path, delta in items] + db.store_mutations(rows) with timeit() as delta: with futures.ProcessPoolExecutor(max_workers=max_workers) as pool: @@ -784,60 +1558,28 @@ def on_mutations_created(items): async def play_mutations(loop, db, seed, alpha, total, max_workers, arguments): # prepare to run tests against mutations - command = list(arguments["TEST-COMMAND"] or PYTEST) + command = list(arguments["PYTEST-COMMAND"] or PYTEST) command.append("--randomly-seed={}".format(seed)) command.extend(arguments[""]) eta = humanize(alpha * total / max_workers) - log.info("At most, it will take {} to run the mutations", eta) + log.info("Worst-case estimate (if every mutation takes the full test suite): {}", eta) timeout = alpha * 2 - uids = db[lexode.pack([1]) : lexode.pack([2])] - uids = ((command, lexode.unpack(key)[1], timeout) for (key, _) in uids) + rows = db.list_mutations() + uids = ((command, uid, timeout) for (uid,) in rows) # sampling sampling = arguments["--sampling"] make_sample, total = sampling_setup(sampling, total) uids = make_sample(uids) - step = 10 - - gamma = time.perf_counter() - - remaining = total - log.info("Testing mutations in progress...") - with tqdm(total=100) as progress: + with tqdm(total=total, desc="Mutations") as progress: def on_progress(_): - nonlocal remaining - nonlocal step - nonlocal gamma - - remaining -= 1 - - if (remaining % step) == 0: - - percent = 100 - ((remaining / total) * 100) - now = time.perf_counter() - delta = now - gamma - eta = (delta / step) * remaining - - progress.update(int(percent)) - progress.set_description("ETA {}".format(humanize(eta))) - - msg = "Mutation tests {:.2f}% done..." - log.debug(msg, percent) - log.debug("ETA {}...", humanize(eta)) - - for speed in [10_000, 1_000, 100, 10, 1]: - if total // speed == 0: - continue - step = speed - break - - gamma = time.perf_counter() + progress.update(1) with timeit() as delta: with futures.ThreadPoolExecutor(max_workers=max_workers) as pool: @@ -845,7 +1587,7 @@ def on_progress(_): loop, pool, on_progress, mutation_pass, uids ) - errors = len(list(db[lexode.pack([2]) : lexode.pack([3])])) + errors = db.count_results() if errors > 0: msg = "It took {} to compute {} mutation failures!" @@ -869,8 +1611,8 @@ async def play(loop, arguments): with database_open(root, recreate=True) as db: # store arguments used to execute command - if arguments["TEST-COMMAND"]: - command = list(arguments["TEST-COMMAND"]) + if arguments["PYTEST-COMMAND"]: + command = list(arguments["PYTEST-COMMAND"]) else: command = list(PYTEST) command += arguments[""] @@ -878,9 +1620,10 @@ async def play(loop, arguments): command=command, seed=seed, ) - value = list(command.items()) - db[lexode.pack((0, "command"))] = lexode.pack(value) + db.set_config("command", command) + # GC stale ignore files before generating new mutations + mutation_ignored_gc(root) # let's create mutations! count = await play_create_mutations(loop, root, db, max_workers, arguments) # Let's run tests against mutations! @@ -888,17 +1631,33 @@ async def play(loop, arguments): def mutation_diff_size(db, uid): - _, diff = lexode.unpack(db[lexode.pack([1, uid])]) + _, diff = db.get_mutation(uid) out = len(zstd.decompress(diff)) return out +def diff_hash(diff_text): + return hashlib.sha256(diff_text.encode()).hexdigest() + + +def write_ignored_file(root, diff_text, path, reason): + ignored_dir = Path(root) / ".mutations.ignored" + ignored_dir.mkdir(exist_ok=True) + header = "# Case 3: equivalent mutation" + if reason: + header += " — " + reason + content = header + "\n" + diff_text + h = diff_hash(diff_text) + ignored_file = ignored_dir / "{}.diff".format(h) + ignored_file.write_text(content) + return h + + def replay_mutation(db, uid, alpha, seed, max_workers, command): - log.info("* You can use Ctrl+C to exit at anytime, you progress is saved.") + log.info("* You can use Ctrl+C to exit at anytime, your progress is saved.") command = list(command) command.append("--randomly-seed={}".format(seed)) - max_workers = 1 if max_workers > 1: command.append("--numprocesses={}".format(max_workers)) @@ -906,28 +1665,57 @@ def replay_mutation(db, uid, alpha, seed, max_workers, command): while True: ok = mutation_pass((command, uid, timeout)) - if not ok: - mutation_show(uid.hex()) - msg = "* Type 'skip' to go to next mutation or enter to retry." - log.info(msg) - skip = input().startswith("s") - if skip: - db[lexode.pack([2, uid])] = b"\x01" - return - # Otherwise loop to re-test... - else: - del db[lexode.pack([2, uid])] - return + if ok: + # Mutation now caught — green + return None + + # Mutation still survives — show diff and classification menu + mutation_show(uid.hex()) + log.info("") + log.info(" [r] Replay — re-run this mutation against current test suite") + log.info(" [1] Real gap — I will write a test (keeps in queue)") + log.info(" [2] Fragile — risk accepted (keeps in queue, marked)") + log.info(" [3] Equivalent — semantically invisible (writes to .mutations.ignored/)") + log.info(" [4] Won't fix — known gap, consciously accepted (never resurfaces)") + log.info(" [5] Todo — real gap, not fixing now (resurfaces next replay)") + log.info(" [s] Skip — undecided, back of queue") + log.info(" [q] Quit") + choice = input("> ").strip().lower() + + if choice == "r": + continue + elif choice == "1": + db.set_classification(uid, CLASSIFICATION_REAL_GAP) + return None + elif choice == "2": + db.set_classification(uid, CLASSIFICATION_FRAGILE) + return None + elif choice == "3": + reason = input("Optional one-line reason (or Enter to skip): ").strip() + path, diff_bytes = db.get_mutation(uid) + diff = zstd.decompress(diff_bytes).decode("utf8") + write_ignored_file(".", diff, path, reason) + db.set_classification(uid, CLASSIFICATION_EQUIVALENT) + return None + elif choice == "4": + db.set_classification(uid, CLASSIFICATION_WONT_FIX) + return None + elif choice == "5": + db.set_classification(uid, CLASSIFICATION_TODO) + return None + elif choice == "s": + return "skip" # caller appends uid to back of queue + elif choice == "q": + sys.exit(0) + # else: invalid input — loop back to menu def replay(arguments): root = Path(".") with database_open(root) as db: - command = db[lexode.pack((0, "command"))] + command = db.get_config("command") - command = lexode.unpack(command) - command = dict(command) seed = command.pop("seed") random.seed(seed) command = command.pop("command") @@ -936,9 +1724,7 @@ def replay(arguments): with database_open(root) as db: while True: - uids = ( - lexode.unpack(k)[1] for k, v in db[lexode.pack([2]) :] if v == b"\x00" - ) + uids = [uid for (uid,) in db.list_results_for_replay()] uids = sorted( uids, key=functools.partial(mutation_diff_size, db), @@ -949,18 +1735,89 @@ def replay(arguments): sys.exit(0) while uids: uid = uids.pop(0) - replay_mutation(db, uid, alpha, seed, max_workers, command) + result = replay_mutation(db, uid, alpha, seed, max_workers, command) + if result == "skip": + uids.append(uid) def mutation_list(): with database_open(".") as db: - uids = ((lexode.unpack(k)[1], v) for k, v in db[lexode.pack([2]) :]) + uids = db.list_results() uids = sorted(uids, key=lambda x: mutation_diff_size(db, x[0]), reverse=True) if not uids: log.info("No mutation failures 👍") sys.exit(0) - for (uid, type) in uids: - log.info("{}\t{}".format(uid.hex(), "skipped" if type == b"\x01" else "")) + for (uid, status) in uids: + log.info("{}\t{}".format(uid.hex(), "skipped" if status == 1 else "")) + + +def mutation_summary(): + root = Path(".") + with database_open(root) as db: + total_mutations = db.count_mutations() + total_results = db.count_results() + counts = db.get_classification_counts() + + killed = total_mutations - total_results + + unreviewed = counts.get(None, 0) + counts.get(1, 0) # None + old status=1 skip + real_gaps = counts.get(CLASSIFICATION_REAL_GAP, 0) + fragile = counts.get(CLASSIFICATION_FRAGILE, 0) + equivalent = counts.get(CLASSIFICATION_EQUIVALENT, 0) + wont_fix = counts.get(CLASSIFICATION_WONT_FIX, 0) + todo = counts.get(CLASSIFICATION_TODO, 0) + + survived = total_results + tested = total_mutations + + ignored_dir = root / ".mutations.ignored" + ignored_files = len(list(ignored_dir.glob("*.diff"))) if ignored_dir.exists() else 0 + + log.info("Mutations generated: {:>6,}", total_mutations) + log.info("Tested: {:>6,}", tested) + log.info("Killed: {:>6,}", killed) + log.info("Survived: {:>6,}", survived) + log.info(" — Real gaps: {:>6,}", real_gaps) + log.info(" — Fragile coverage: {:>6,}", fragile) + log.info(" — Equivalent: {:>6,} (in .mutations.ignored/)", equivalent) + log.info(" — Won't fix: {:>6,}", wont_fix) + log.info(" — Todo: {:>6,}", todo) + log.info(" — Unreviewed: {:>6,}", unreviewed) + log.info("Ignored: {:>6,}", ignored_files) + + +def mutation_ignored_gc(root): + root = Path(root) + ignored_dir = root / ".mutations.ignored" + if not ignored_dir.exists(): + return + removed = 0 + for ignore_file in ignored_dir.glob("*.diff"): + content = ignore_file.read_text() + # Strip header comment lines to isolate the diff + diff_lines = [l for l in content.splitlines(keepends=True) if not l.startswith("#")] + diff_text = "".join(diff_lines).lstrip("\n") + # Extract target path from "--- a/path/to/file.py" + path = None + for line in diff_lines: + if line.startswith("--- "): + path = line[4:].strip().removeprefix("a/") + break + if path is None or not (root / path).exists(): + ignore_file.unlink() + log.info("GC: removed stale ignore file {} (source not found)", ignore_file.name) + removed += 1 + continue + try: + source = (root / path).read_text() + normalized = ast.unparse(ast.parse(source)) + patch(diff_text, normalized) + except Exception: + ignore_file.unlink() + log.info("GC: removed stale ignore file {} (diff no longer applies)", ignore_file.name) + removed += 1 + if removed: + log.info("Removed {} stale .mutations.ignored/ file(s).", removed) def mutation_show(uid): @@ -968,7 +1825,7 @@ def mutation_show(uid): log.info("mutation show {}", uid.hex) log.info("") with database_open(".") as db: - path, diff = lexode.unpack(db[lexode.pack([1, uid.bytes])]) + path, diff = db.get_mutation(uid.bytes) diff = zstd.decompress(diff).decode("utf8") terminal256 = pygments.formatters.get_formatter_by_name("terminal256") @@ -997,14 +1854,13 @@ def mutation_show(uid): def mutation_apply(uid): - # TODO: add support for uuid inside lexode uid = UUID(hex=uid) with database_open(".") as db: - path, diff = lexode.unpack(db[lexode.pack([1, uid])]) + path, diff = db.get_mutation(uid.bytes) diff = zstd.decompress(diff).decode("utf8") with open(path, "r") as f: source = f.read() - patched = patch(diff, source) + patched = patch(diff, ast.unparse(ast.parse(source))) with open(path, "w") as f: f.write(patched) @@ -1042,8 +1898,17 @@ def main(): mutation_apply(arguments["MUTATION"]) sys.exit(0) + if arguments.get("summary", False): + mutation_summary() + sys.exit(0) + + if arguments.get("gc", False): + mutation_ignored_gc(".") + sys.exit(0) + # Otherwise run play. - loop = asyncio.get_event_loop() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) loop.run_until_complete(play(loop, arguments)) loop.close() diff --git a/rc.sh b/rc.sh index 2bbd0ae..5382230 100644 --- a/rc.sh +++ b/rc.sh @@ -1,8 +1,4 @@ -<<<<<<< HEAD MYVENV=1 - -======= ->>>>>>> c3952d2 (wip) HISTCONTROL=ignoreboth # append to the history file, don't overwrite it diff --git a/requirements.source.txt b/requirements.source.txt index 7a397ed..47753ae 100644 --- a/requirements.source.txt +++ b/requirements.source.txt @@ -1,18 +1,13 @@ loguru aiostream docopt -lsm-db -parso -lexode zstandard[cffi] python-ulid pytest-cov pytest-randomly humanize -astunparse tqdm pytest pygments termcolor pytest-xdist -cython diff --git a/requirements.txt b/requirements.txt index 29aac00..c91c7a8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,21 +1,15 @@ # -# This file is autogenerated by pip-compile with Python 3.11 +# This file is autogenerated by pip-compile with Python 3.12 # by the following command: # # pip-compile --output-file=requirements.txt requirements.source.txt # aiostream==0.5.2 # via -r requirements.source.txt -astunparse==1.6.3 - # via -r requirements.source.txt cffi==1.16.0 # via zstandard coverage[toml]==7.3.2 - # via - # coverage - # pytest-cov -cython==3.0.5 - # via -r requirements.source.txt + # via pytest-cov docopt==0.6.2 # via -r requirements.source.txt execnet==2.0.2 @@ -24,16 +18,10 @@ humanize==4.8.0 # via -r requirements.source.txt iniconfig==2.0.0 # via pytest -lexode==0.3.0 - # via -r requirements.source.txt loguru==0.7.2 # via -r requirements.source.txt -lsm-db==0.7.1 - # via -r requirements.source.txt packaging==23.2 # via pytest -parso==0.8.3 - # via -r requirements.source.txt pluggy==1.3.0 # via pytest pycparser==2.21 @@ -54,15 +42,11 @@ pytest-xdist==3.4.0 # via -r requirements.source.txt python-ulid==2.2.0 # via -r requirements.source.txt -six==1.16.0 - # via astunparse termcolor==2.3.0 # via -r requirements.source.txt tqdm==4.66.1 # via -r requirements.source.txt typing-extensions==4.8.0 # via aiostream -wheel==0.41.3 - # via astunparse zstandard[cffi]==0.22.0 # via -r requirements.source.txt diff --git a/run-sequential.sh b/run-sequential.sh new file mode 100755 index 0000000..5335aa2 --- /dev/null +++ b/run-sequential.sh @@ -0,0 +1,445 @@ +#!/bin/bash +# Sequential mutation testing runner for tip-of-the-top projects. +# Runs one project at a time, each with --max-workers=20. +# On baseline test failure (rc=2), calls claude to diagnose and fix, then retries once. +# Appends to summary.log and regenerates summary.md after every project. +# +# Usage: bash /home/ada/src/python/mutation/run-sequential.sh + +set -uo pipefail + +MUTATION_PY="/home/ada/src/python/mutation/mutation.py" +TMP_DIR="/home/ada/tmp/mutation/tip-of-the-top" +LOG_DIR="$TMP_DIR/logs" +SUMMARY_LOG="$TMP_DIR/summary.log" +SUMMARY_MD="$TMP_DIR/summary.md" +WORKERS=25 +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +URLS_FILE="$SCRIPT_DIR/tip-of-the-top.txt" + +mkdir -p "$LOG_DIR" + +ts() { date '+%Y-%m-%d %H:%M'; } +ts_long() { date '+%Y-%m-%d %H:%M:%S'; } + +# ─── Record one line to summary.log ─────────────────────────────────────────── +_record() { + local repo_name="$1" + local status="$2" + local project_dir="$TMP_DIR/$repo_name" + local db="$project_dir/.mutation.db" + + if [ "$status" = "SUCCESS" ] && [ -f "$db" ]; then + read -r total survived killed rate < <(python3 -c " +import sqlite3 +db = '$db' +con = sqlite3.connect(db) +cur = con.cursor() +try: + cur.execute('SELECT COUNT(*) FROM mutations') + total = cur.fetchone()[0] + cur.execute(\"SELECT COUNT(*) FROM results WHERE status=1\") + survived = cur.fetchone()[0] + killed = total - survived + rate = (survived / total * 100) if total > 0 else 0.0 + print(total, survived, killed, f'{rate:.1f}%') +except Exception as e: + print('0 0 0 0.0%') +con.close() +" 2>/dev/null || echo "0 0 0 0.0%") + printf '%s %-8s %-20s total=%-6s survived=%-6s killed=%-6s rate=%s\n' \ + "$(ts)" "$status" "$repo_name" "$total" "$survived" "$killed" "$rate" \ + >> "$SUMMARY_LOG" + else + printf '%s %-8s %-20s total=%-6s survived=%-6s killed=%-6s rate=%s\n' \ + "$(ts)" "$status" "$repo_name" "-" "-" "-" "-" \ + >> "$SUMMARY_LOG" + fi +} + +# ─── Regenerate summary.md from all .mutation.db files ──────────────────────── +_update_summaries() { + local progress_done total_urls + progress_done=$(grep -c '' "$SUMMARY_LOG" 2>/dev/null || echo 0) + total_urls=$(grep -cP 'https://\S+' "$URLS_FILE" 2>/dev/null || echo 67) + + local _py_script + _py_script=$(cat <<'PYEOF' +import sys, sqlite3, os, glob, subprocess +from datetime import datetime + +tmp_dir = sys.argv[1] +progress = int(sys.argv[2]) +total = int(sys.argv[3]) + +def get_coverage(project_dir): + venv_python = os.path.join(project_dir, "venv", "bin", "python3") + cov_file = os.path.join(project_dir, ".coverage") + if not os.path.exists(venv_python) or not os.path.exists(cov_file): + return None + try: + r = subprocess.run( + [venv_python, "-m", "coverage", "report", + "--format=total", "--ignore-errors", "--fail-under=0", + "--include=*.py", "--omit=venv/*,mutation.py,conftest.py,setup.py"], + cwd=project_dir, capture_output=True, text=True, timeout=15 + ) + val = r.stdout.strip() + return int(val) if val.isdigit() else None + except Exception: + return None + +def get_loc(project_dir): + total = 0 + for root, dirs, files in os.walk(project_dir): + dirs[:] = [d for d in dirs if d not in ("venv", ".git", "__pycache__", ".tox", "build", "dist")] + for f in files: + if not f.endswith(".py"): + continue + if f in ("mutation.py", "conftest.py", "setup.py"): + continue + path = os.path.join(root, f) + # skip test files + rel = os.path.relpath(path, project_dir) + if any(p in rel for p in ("test", "tests", "testing")): + continue + try: + with open(path, "rb") as fh: + total += sum(1 for _ in fh) + except Exception: + pass + return total + +rows = [] +for db_path in sorted(glob.glob(f"{tmp_dir}/*/.mutation.db")): + project_dir = os.path.dirname(db_path) + repo_name = os.path.basename(project_dir) + try: + con = sqlite3.connect(db_path) + cur = con.cursor() + cur.execute("SELECT COUNT(*) FROM mutations") + total_mut = cur.fetchone()[0] + cur.execute("SELECT COUNT(*) FROM results WHERE status=1") + survived = cur.fetchone()[0] + killed = total_mut - survived + rate = (survived / total_mut * 100) if total_mut > 0 else 0.0 + con.close() + cov = get_coverage(project_dir) + loc = get_loc(project_dir) + rows.append((repo_name, total_mut, killed, survived, rate, cov, loc, "done")) + except Exception: + rows.append((repo_name, 0, 0, 0, 0.0, None, 0, "error")) + +rows.sort(key=lambda r: r[4], reverse=True) + +now = datetime.now().strftime("%Y-%m-%d %H:%M") +lines = [] +lines.append("# Mutation Testing — tip-of-the-top") +lines.append(f"Updated: {now} | Progress: {progress}/{total}") +lines.append("") +lines.append("| # | Project | LOC | Coverage | Mutations | Killed | Survived | Survive% |") +lines.append("|---|---------|----:|--------:|--------:|-------:|---------:|---------:|") +for i, (name, tot, killed, survived, rate, cov, loc, status) in enumerate(rows, 1): + cov_str = f"{cov}%" if cov is not None else "—" + loc_str = f"{loc:,}" if loc else "—" + lines.append( + f"| {i} | {name} | {loc_str} | {cov_str} | {tot:,} | {killed:,} | {survived:,} | {rate:.1f}% |" + ) + +print("\n".join(lines)) +PYEOF +) + python3 -c "$_py_script" "$TMP_DIR" "$progress_done" "$total_urls" \ + > "$SUMMARY_MD.tmp" 2>/dev/null \ + && mv "$SUMMARY_MD.tmp" "$SUMMARY_MD" +} + +# ─── Call claude to debug baseline test failures ─────────────────────────────── +_claude_debug() { + local project_dir="$1" + local repo_name="$2" + echo "--- Calling claude to debug $repo_name ---" + # Unset CLAUDECODE so nested claude invocation is allowed; cap at 30 min + CLAUDECODE="" timeout 1800 claude --dangerously-skip-permissions -p \ + "You are fixing a Python project so its test suite passes baseline. +Project: $repo_name +Directory: $project_dir +Venv: $project_dir/venv + +Steps: +1. Run: cd '$project_dir' && venv/bin/python3 -m pytest --no-header --tb=short -q -x 2>&1 | head -40 +2. Diagnose what's failing (missing deps, import errors, config issues, submodules, etc.) +3. Fix it: install missing packages with venv/bin/pip, add collect_ignore to conftest.py, + run git submodule update --init, patch pyproject.toml/setup.cfg as needed. +4. Do NOT modify test logic or source code — only install deps or add pytest ignores. +5. Verify: venv/bin/python3 -m pytest --no-header --tb=no -q 2>&1 | tail -5 +6. Stop once tests pass (exit 0) or conclude it is unfixable." 2>&1 +} + +# Projects to skip (unfixable in this environment): +# anyio — OpenSSL 3.x TLS shutdown regression; test_send_eof_not_implemented fails +SKIP_REPOS=( anyio ) + +# ─── Core per-project logic ──────────────────────────────────────────────────── +_run_project() { + local url="$1" + local repo_name log_file project_dir + + repo_name="$(basename "$url" .git)" + project_dir="$TMP_DIR/$repo_name" + log_file="$LOG_DIR/${repo_name}.log" + + for _skip in "${SKIP_REPOS[@]}"; do + if [ "$repo_name" = "$_skip" ]; then + echo "[$(ts_long)] SKIPPED (skip list): $repo_name" + _record "$repo_name" "SKIPPED" + _update_summaries + return + fi + done + + { + echo "=== START: $repo_name [$(ts_long)] ===" + + # 1. Clone --depth=1 (skip if dir already exists) + if [ ! -d "$project_dir" ]; then + echo "--- Cloning $url ---" + git clone --depth=1 "$url" "$project_dir" \ + || { echo "FAILED: git clone"; _record "$repo_name" "FAILED"; _update_summaries; return; } + else + echo "--- Already cloned ---" + fi + + cd "$project_dir" || { echo "FAILED: cd"; _record "$repo_name" "FAILED"; _update_summaries; return; } + + # 2. Skip if .mutation.db already exists + if [ -f ".mutation.db" ]; then + echo "SKIPPED: .mutation.db exists" + _record "$repo_name" "SKIPPED" + _update_summaries + return + fi + + # 3. Create venv + if [ ! -d "venv" ]; then + echo "--- Creating venv ---" + python3 -m venv venv \ + || { echo "FAILED: venv creation"; _record "$repo_name" "FAILED"; _update_summaries; return; } + fi + + local PIP="$project_dir/venv/bin/pip" + local PYTHON="$project_dir/venv/bin/python3" + + # 4. Install mutation.py deps + echo "--- Installing mutation.py deps ---" + "$PIP" install -q --upgrade pip 2>&1 + "$PIP" install -q \ + aiostream docopt humanize loguru pygments \ + pytest-cov pytest-randomly pytest-xdist python-ulid \ + termcolor tqdm zstandard coverage 2>&1 + + # 5. Install project with extras fallback chain + echo "--- Installing project ---" + "$PIP" install -q -e ".[dev,test,tests,testing]" 2>/dev/null \ + || "$PIP" install -q -e ".[dev,test,tests]" 2>/dev/null \ + || "$PIP" install -q -e ".[dev,test]" 2>/dev/null \ + || "$PIP" install -q -e ".[dev]" 2>/dev/null \ + || "$PIP" install -q -e ".[test]" 2>/dev/null \ + || "$PIP" install -q -e ".[testing]" 2>/dev/null \ + || "$PIP" install -q -e "." 2>&1 \ + || echo "WARNING: project install partial" + + # 6a. Install any extra requirement files + for req in \ + requirements-dev.txt requirements-test.txt requirements_test.txt \ + test-requirements.txt requirements/test.txt requirements/dev.txt \ + requirements/testing.txt dev-requirements.txt; do + [ -f "$req" ] && "$PIP" install -q -r "$req" 2>/dev/null || true + done + + # 6b. Install PEP 735 dependency-groups (pip 24.3+ supports --group) + for grp in tests test testing dev; do + "$PIP" install -q --group "$grp" 2>/dev/null || true + done + + # 6c. Project-specific fixups + case "$repo_name" in + black) + # needs aiohttp (the [d] extra) so test_blackd.py can be collected + "$PIP" install -q "black[d]" aiohttp 2>&1 || true + ;; + cattrs) + # bench/ requires msgspec + PyYAML; ignore bench/ so baseline passes + "$PIP" install -q msgspec PyYAML 2>&1 || true + if ! grep -q "collect_ignore" conftest.py 2>/dev/null; then + printf '\ncollect_ignore_glob = ["bench/*"]\n' >> conftest.py + fi + ;; + dpath-python) + "$PIP" install -q nose2 hypothesis 2>&1 || true + ;; + gitlint) + "$PIP" install -q arrow click 2>&1 || true + ;; + "jaraco.text") + "$PIP" install -q inflect 2>&1 || true + ;; + jedi) + # parso + missing git submodule (typeshed) + "$PIP" install -q parso 2>&1 || true + git submodule update --init --recursive 2>&1 || true + ;; + "jmespath.py") + # extra/ requires hypothesis; ignore it so baseline collects cleanly + "$PIP" install -q hypothesis 2>&1 || true + if ! grep -q "collect_ignore" conftest.py 2>/dev/null; then + printf '\ncollect_ignore_glob = ["extra/*"]\n' >> conftest.py + fi + ;; + klein) + "$PIP" install -q treq 2>&1 || true + ;; + blist) + # ez_setup.py tries to download setuptools; pre-install it to avoid network fetch + "$PIP" install -q setuptools 2>&1 || true + "$PIP" install -q -e . 2>&1 || true + ;; + mpmath) + # editable install produces broken version metadata; reinstall as non-editable + "$PIP" install -q --force-reinstall . 2>&1 || true + ;; + pyparsing) + # railroad-diagrams + jinja2 needed for diagram tests + "$PIP" install -q railroad-diagrams jinja2 2>&1 || true + ;; + pyrsistent) + "$PIP" install -q hypothesis 2>&1 || true + ;; + pytz) + # zdump.out is a generated file that can't be reproduced; ignore the test + "$PIP" install -q pytz 2>&1 || true + if ! grep -q "collect_ignore" conftest.py 2>/dev/null; then + printf '\ncollect_ignore_glob = ["*/zdump*"]\n' >> conftest.py + fi + ;; + result) + "$PIP" install -q mypy pytest-mypy-plugins 2>&1 || true + ;; + returns) + "$PIP" install -q covdefaults pytest-mypy-plugins 2>&1 || true + ;; + thefuzz) + "$PIP" install -q pycodestyle hypothesis 2>&1 || true + ;; + glom) + "$PIP" install -q PyYAML 2>&1 || true + ;; + whenever) + "$PIP" install -q tzdata 2>&1 || true + # pytest.ini has filterwarnings=error which turns pytest-benchmark's + # xdist-incompatibility warning into a hard crash; suppress it + if ! grep -q "PytestBenchmarkWarning" pytest.ini 2>/dev/null; then + printf ' ignore::pytest_benchmark.logger.PytestBenchmarkWarning\n' >> pytest.ini + fi + ;; + tomlkit) + # tests/toml-test is a git submodule; init it now + git submodule update --init --recursive 2>&1 || true + ;; + "python-ftfy") + # doctest in source files has stale unicode-width expectation; skip doctests + "$PIP" install -q ftfy 2>&1 || true + if ! grep -qE "addopts.*doctest|doctest_optionflags" setup.cfg pytest.ini pyproject.toml 2>/dev/null; then + printf '\n[tool.pytest.ini_options]\naddopts = "--ignore-glob=**/*.py"\n' >> pyproject.toml 2>/dev/null || true + fi + ;; + esac + + # 7. Re-pin mutation.py deps (project may have downgraded something) + "$PIP" install -q \ + aiostream docopt humanize loguru pygments \ + pytest-cov pytest-randomly pytest-xdist python-ulid \ + termcolor tqdm zstandard coverage 2>&1 + + # 8. Copy mutation.py to project root + local _copied=0 + if [ ! -f "mutation.py" ]; then + cp "$MUTATION_PY" "mutation.py" + _copied=1 + fi + + # 9. Run mutation testing + echo "--- Running mutation.py play (workers=$WORKERS) ---" + PATH="$project_dir/venv/bin:$PATH" \ + "$PYTHON" "$MUTATION_PY" play --max-workers="$WORKERS" 2>&1 + local _rc=$? + + # 11. If rc==2 (baseline tests red) → call claude, then retry once + if [ "$_rc" = "2" ]; then + echo "--- Baseline tests failed (rc=2). Invoking claude debugger... ---" + _claude_debug "$project_dir" "$repo_name" + echo "--- Retrying mutation.py play after claude fix... ---" + PATH="$project_dir/venv/bin:$PATH" \ + "$PYTHON" "$MUTATION_PY" play --max-workers="$WORKERS" 2>&1 + _rc=$? + fi + + [ "$_copied" = "1" ] && rm -f "mutation.py" + + # 12. Record result and update summaries + if [ "$_rc" = "0" ]; then + echo "=== DONE: $repo_name [$(ts_long)] ===" + _record "$repo_name" "SUCCESS" + else + echo "FAILED: mutation.py play (rc=$_rc)" + _record "$repo_name" "FAILED" + fi + _update_summaries + + } > "$log_file" 2>&1 + + # Echo progress to stdout (outside the log redirect) + local result + result=$(tail -1 "$SUMMARY_LOG" 2>/dev/null || echo "?") + echo "[$(ts_long)] $result" +} + +# ─── Main: sequential execution ─────────────────────────────────────────────── +mapfile -t ALL_URLS < <(grep -oP 'https://\S+' "$URLS_FILE") +TOTAL="${#ALL_URLS[@]}" + +echo "" +echo "╔══════════════════════════════════════════════════════╗" +echo "║ Sequential Mutation Testing — tip-of-the-top ║" +printf "║ Projects: %-3d | Workers/project: %-3d | %s ║\n" "$TOTAL" "$WORKERS" "$(date '+%Y-%m-%d')" +echo "╚══════════════════════════════════════════════════════╝" +echo "" +echo "TMP dir: $TMP_DIR" +echo "Log dir: $LOG_DIR" +echo "Summary: $SUMMARY_LOG" +echo "" + +# Initialize summary log header if new +if [ ! -f "$SUMMARY_LOG" ]; then + printf '%-16s %-8s %-20s %-12s %-12s %-12s %s\n' \ + "timestamp" "status" "project" "total" "survived" "killed" "rate" \ + >> "$SUMMARY_LOG" + printf '%-16s %-8s %-20s %-12s %-12s %-12s %s\n' \ + "----------------" "--------" "--------------------" "------------" "------------" "------------" "----" \ + >> "$SUMMARY_LOG" +fi + +LAUNCHED=0 +for url in "${ALL_URLS[@]}"; do + repo_name="$(basename "$url" .git)" + LAUNCHED=$((LAUNCHED + 1)) + echo "[$(ts_long)] [$LAUNCHED/$TOTAL] Starting: $repo_name" + _run_project "$url" +done + +echo "" +echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" +echo " ALL DONE — $(ts_long)" +echo " Results: $SUMMARY_LOG" +echo " Report: $SUMMARY_MD" +echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" diff --git a/tests.py b/tests.py index 7e3d7a9..3ad7a6c 100644 --- a/tests.py +++ b/tests.py @@ -1,5 +1,52 @@ +import ast as stdlib_ast +import random import sys + from foobar.ex import decrement_by_two +import ast as _ast +from mutation import ( + AugAssignToAssign, + BreakToReturn, + Comparison, + ForceConditional, + MutateAssignment, + MutateCallArgs, + MutateContainment, + MutateContextManager, + MutateDefaultArgument, + MutateExceptionHandler, + MutateFString, + MutateGlobal, + MutateIdentity, + MutateIterator, + MutateKeyword, + MutateLambda, + MutateNumber, + MutateOperator, + MutateReturn, + MutateSlice, + MutateString, + MutateStringMethod, + MutateYield, + Mutation, + NegateCondition, + RemoveDecorator, + RemoveUnaryOp, + SwapArguments, + ZeroIteration, + iter_deltas, + Database, + diff_hash, + write_ignored_file, + mutation_ignored_gc, + CLASSIFICATION_REAL_GAP, + CLASSIFICATION_EQUIVALENT, + CLASSIFICATION_WONT_FIX, +) +from mutation import patch as mutation_patch + +if hasattr(_ast, "Match"): + from mutation import MutateMatchCase def test_one(): @@ -11,3 +58,621 @@ def test_one(): def test_two(): x = decrement_by_two(44) assert x < 44 + + +def test_mutate_string_method(): + source = "def f(s):\n return s.lower()\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateStringMethod()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("s.upper()" in m for m in mutated) + + +def test_mutate_string_method_lstrip_multi_target(): + # lstrip should produce both rstrip and removeprefix mutations + source = "def f(s):\n return s.lstrip('x')\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateStringMethod()])) + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("rstrip" in m for m in mutated) + assert any("removeprefix" in m for m in mutated) + + +def test_mutate_call_args(): + source = "def f(a, b):\n return g(a, b)\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateCallArgs()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("g(None, b)" in m for m in mutated) + assert any("g(a, None)" in m for m in mutated) + assert any("g(b)" in m for m in mutated) + assert any("g(a)" in m for m in mutated) + + +def test_force_conditional(): + source = "def f(x):\n if x > 0:\n return 1\n return 0\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [ForceConditional()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("if True" in m for m in mutated) + assert any("if False" in m for m in mutated) + + +def test_mutate_exception_handler(): + source = "def f():\n try:\n pass\n except ValueError:\n pass\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateExceptionHandler()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("except Exception" in m for m in mutated) + assert all("except ValueError" not in m for m in mutated) + + +def test_zero_iteration(): + source = "def f(items):\n for x in items:\n pass\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [ZeroIteration()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("for x in []" in m for m in mutated) + + +def test_remove_decorator(): + source = "def decorator(f): return f\n\n@decorator\ndef f(): pass\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [RemoveDecorator()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("@decorator" not in m for m in mutated) + + +def test_negate_condition(): + source = "def f(flag):\n if flag:\n return 1\n return 0\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [NegateCondition()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("if not flag" in m for m in mutated) + + +def test_mutate_return(): + source = "def f(x):\n return x + 1\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateReturn()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("return None" in m for m in mutated) + assert any("return 0" in m for m in mutated) + assert any("return False" in m for m in mutated) + assert any("return ''" in m for m in mutated) + + +def test_mutate_lambda(): + source = "f = lambda x: x * 2\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateLambda()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("lambda x: None" in m for m in mutated) + + +def test_mutate_assignment(): + source = "def f():\n result = compute()\n return result\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateAssignment()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("result = None" in m for m in mutated) + + +def test_aug_assign_to_assign(): + source = "def f():\n x = 0\n x += 1\n return x\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [AugAssignToAssign()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("+=" not in m and "x = 1" in m for m in mutated) + + +def test_remove_unary_op(): + source = "def f(x):\n return not x\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [RemoveUnaryOp()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("not" not in m.split("def")[1] for m in mutated) + + +def test_mutate_identity(): + source = "def f(x):\n return x is None\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateIdentity()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("is not" in m for m in mutated) + + +def test_mutate_containment(): + source = "def f(x, c):\n return x in c\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateContainment()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("not in" in m for m in mutated) + + +def test_mutate_containment_not_in(): + source = "def f(x, c):\n return x not in c\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateContainment()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("not in" not in m and "in" in m for m in mutated) + + +def test_break_to_return(): + source = "def f():\n for x in range(10):\n break\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [BreakToReturn()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("return" in m for m in mutated) + assert all("break" not in m for m in mutated) + + +def test_mutate_global(): + source = "x = 0\ndef f():\n global x\n x = 1\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateGlobal()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("global" not in m for m in mutated) + + +def test_mutate_nonlocal(): + source = "def outer():\n x = 0\n def inner():\n nonlocal x\n x = 1\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateGlobal()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("nonlocal" not in m for m in mutated) + + +def test_mutate_global_only_statement_skipped(): + # sole statement in body — removing it would produce empty body → skipped + source = "x = 0\ndef f():\n global x\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateGlobal()])) + assert not deltas + + +def test_mutate_fstring(): + source = 'def f(name):\n return f"hello {name}"\n' + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateFString()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("{name}" not in m for m in mutated) + + +def test_mutate_fstring_multiple(): + source = 'def f(a, b):\n return f"{a} and {b}"\n' + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateFString()])) + assert len(deltas) == 2 + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("{a}" not in m and "{b}" in m for m in mutated) + assert any("{b}" not in m and "{a}" in m for m in mutated) + + +def test_mutate_context_manager(): + source = "def f():\n with lock:\n do_stuff()\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateContextManager()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + # with stripped, body preserved + assert any("with" not in m.split("def")[1] for m in mutated) + assert any("do_stuff()" in m for m in mutated) + + +def test_mutate_context_manager_multi(): + source = "def f():\n with a, b:\n pass\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateContextManager()])) + assert len(deltas) == 2 + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("with a:" in m for m in mutated) + assert any("with b:" in m for m in mutated) + + +def test_mutate_default_argument(): + source = "def f(x, y=1, z=2):\n return x + y + z\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateDefaultArgument()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("def f(x, y, z=2):" in m for m in mutated) # drop y's default + assert any("def f(x, y, z):" in m for m in mutated) # drop both defaults + + +def test_mutate_default_argument_kwonly(): + source = "def f(x, *, y=1):\n return x + y\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateDefaultArgument()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("def f(x, *, y):" in m for m in mutated) + + +def test_mutate_iterator(): + source = "def f(items):\n for x in items:\n pass\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateIterator()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("reversed(items)" in m for m in mutated) + assert any("shuffle" in m for m in mutated) + assert any("_mutation_seq_" in m for m in mutated) + + +def test_mutate_iterator_no_double_wrap(): + source = "def f(items):\n for x in reversed(items):\n pass\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateIterator()])) + assert not deltas + + +def test_swap_arguments(): + source = "def f(a, b):\n return g(a, b)\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [SwapArguments()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("g(b, a)" in m for m in mutated) + + +def test_mutate_slice(): + source = "def f(a):\n return a[1:3]\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateSlice()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("a[:3]" in m for m in mutated) + assert any("a[1:]" in m for m in mutated) + + +def test_mutate_slice_step_negation(): + # positive step → negated + source = "def f(a):\n return a[::2]\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateSlice()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("a[::-2]" in m for m in mutated) + + +def test_mutate_slice_step_negation_reverse(): + # negative step (reversal idiom) → stripped to positive + source = "def f(a):\n return a[::-1]\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateSlice()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("a[::1]" in m for m in mutated) + + +def test_mutate_yield(): + source = "def f():\n yield 42\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateYield()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("yield None" in m for m in mutated) + + +# -- regression tests for syntax-error mutations ------------------------------ + + +def _full_coverage(source): + """Return a coverage set spanning every line in source.""" + return set(range(1, source.count("\n") + 2)) + + +def test_no_syntax_error_mutations_empty_class_body(): + """DefinitionDrop on the sole method of a class produces an empty class + body, which is a SyntaxError. iter_deltas must not yield such deltas.""" + source = "class Foo:\n def bar(self):\n pass\n" + coverage = _full_coverage(source) + + bad = [] + for delta in iter_deltas(source, "test.py", coverage, list(Mutation.ALL)): + mutated = mutation_patch(delta, source) + try: + stdlib_ast.parse(mutated) + except SyntaxError: + bad.append(delta) + + assert not bad, "iter_deltas yielded {:d} syntax-error mutation(s):\n{}".format( + len(bad), "\n---\n".join(bad) + ) + + +def test_no_syntax_error_mutations_docstring(): + """Mutations on code with module- and function-level docstrings (the + structure of foobar/ex.py) must not produce syntax-error mutations.""" + source = ( + '"""Module docstring."""\n' + "\n" + "def decrement_by_two(a):\n" + ' """Function docstring."""\n' + " return a - 2\n" + ) + coverage = _full_coverage(source) + + bad = [] + for delta in iter_deltas(source, "test.py", coverage, list(Mutation.ALL)): + mutated = mutation_patch(delta, source) + try: + stdlib_ast.parse(mutated) + except SyntaxError: + bad.append(delta) + + assert not bad, "iter_deltas yielded {:d} syntax-error mutation(s):\n{}".format( + len(bad), "\n---\n".join(bad) + ) + + +def test_mutate_number(): + source = "def f():\n return 100\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + random.seed(42) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateNumber()])) + assert deltas + for d in deltas: + m = mutation_patch(d, canonical) + assert "return 100" not in m + + +def test_mutate_string(): + source = "def f():\n return 'hello'\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateString()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("mutated string hello" in m for m in mutated) + + +def test_mutate_string_bytes(): + source = "def f():\n return b'data'\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateString()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("coffeebad" in m for m in mutated) + + +def test_mutate_keyword_bool_constants(): + source = "def f():\n return True\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateKeyword()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("return False" in m for m in mutated) + assert any("return None" in m for m in mutated) + + +def test_mutate_keyword_bool_op(): + source = "def f(a, b):\n return a and b\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateKeyword()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("a or b" in m for m in mutated) + + +def test_mutate_keyword_flow(): + source = "def f(items):\n for x in items:\n break\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateKeyword()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("continue" in m for m in mutated) + assert any("pass" in m for m in mutated) + + +def test_comparison(): + source = "def f(x):\n return x > 0\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [Comparison()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("not (x > 0)" in m or "not x > 0" in m for m in mutated) + + +def test_mutate_operator_binop(): + source = "def f(a, b):\n return a + b\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateOperator()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("a - b" in m for m in mutated) + assert any("a * b" in m for m in mutated) + + +def test_mutate_operator_compare(): + source = "def f(a, b):\n return a < b\n" + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateOperator()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("a > b" in m for m in mutated) + assert any("a <= b" in m for m in mutated) + assert any("a == b" in m for m in mutated) + + +# -- Database and helper tests ------------------------------------------------- + + +def _make_db(tmp_path): + """Create a Database at a temp path with one mutation and one result.""" + import zstandard as zstd + from ulid import ULID + db = Database(str(tmp_path / "test.mutation.db")) + uid = ULID().to_uuid().bytes + diff = b"--- a/f.py\n+++ b/f.py\n@@ -1 +1 @@\n-x = 1\n+x = 2\n" + db.store_mutations([(uid, "f.py", zstd.compress(diff))]) + db.set_result(uid, 0) # survived + return db, uid, diff + + +def test_diff_hash(): + h = diff_hash("hello") + assert len(h) == 64 # sha256 hex digest + assert h == diff_hash("hello") # deterministic + assert h != diff_hash("world") + + +def test_write_ignored_file(tmp_path): + diff_text = "--- a/f.py\n+++ b/f.py\n@@ -1 +1 @@\n-x = 1\n+x = 2\n" + h = write_ignored_file(str(tmp_path), diff_text, "f.py", "always zero") + ignored = tmp_path / ".mutations.ignored" / "{}.diff".format(h) + assert ignored.exists() + content = ignored.read_text() + assert "Case 3: equivalent mutation" in content + assert "always zero" in content + assert diff_text in content + + +def test_write_ignored_file_no_reason(tmp_path): + diff_text = "--- a/f.py\n+++ b/f.py\n@@ -1 +1 @@\n-x = 1\n+x = 2\n" + h = write_ignored_file(str(tmp_path), diff_text, "f.py", "") + ignored = tmp_path / ".mutations.ignored" / "{}.diff".format(h) + content = ignored.read_text() + assert " \u2014 " not in content # no separator when no reason + + +def test_database_count_mutations(tmp_path): + db, uid, _ = _make_db(tmp_path) + assert db.count_mutations() == 1 + + +def test_database_set_classification(tmp_path): + db, uid, _ = _make_db(tmp_path) + db.set_classification(uid, CLASSIFICATION_REAL_GAP) + counts = db.get_classification_counts() + assert counts[CLASSIFICATION_REAL_GAP] == 1 + + +def test_database_list_results_for_replay_unclassified(tmp_path): + db, uid, _ = _make_db(tmp_path) + rows = db.list_results_for_replay() + assert any(r[0] == uid for r in rows) + + +def test_database_list_results_for_replay_wont_fix_excluded(tmp_path): + db, uid, _ = _make_db(tmp_path) + db.set_classification(uid, CLASSIFICATION_WONT_FIX) + rows = db.list_results_for_replay() + assert not any(r[0] == uid for r in rows) + + +def test_database_list_results_for_replay_equivalent_excluded(tmp_path): + db, uid, _ = _make_db(tmp_path) + db.set_classification(uid, CLASSIFICATION_EQUIVALENT) + rows = db.list_results_for_replay() + assert not any(r[0] == uid for r in rows) + + +def test_mutation_ignored_gc_removes_stale(tmp_path): + diff_text = "--- a/nosuchfile.py\n+++ b/nosuchfile.py\n@@ -1 +1 @@\n-x=1\n+x=2\n" + write_ignored_file(str(tmp_path), diff_text, "nosuchfile.py", "") + assert len(list((tmp_path / ".mutations.ignored").glob("*.diff"))) == 1 + mutation_ignored_gc(str(tmp_path)) + assert len(list((tmp_path / ".mutations.ignored").glob("*.diff"))) == 0 + + +def test_mutation_ignored_gc_keeps_valid(tmp_path): + import difflib + py_file = tmp_path / "m.py" + py_file.write_text("x = 1\n") + source = stdlib_ast.unparse(stdlib_ast.parse("x = 1\n")) + diff_lines = list(difflib.unified_diff( + source.splitlines(keepends=True), + "x = 2\n".splitlines(keepends=True), + fromfile="a/m.py", tofile="b/m.py", + )) + diff_text = "".join(diff_lines) + write_ignored_file(str(tmp_path), diff_text, "m.py", "") + mutation_ignored_gc(str(tmp_path)) + assert len(list((tmp_path / ".mutations.ignored").glob("*.diff"))) == 1 + + +def test_mutation_ignored_gc_no_dir(tmp_path): + # Should not raise when .mutations.ignored/ doesn't exist + mutation_ignored_gc(str(tmp_path)) + + +if hasattr(_ast, "Match"): + + def test_mutate_match_case(): + source = ( + "def f(x):\n" + " match x:\n" + " case 1:\n" + " return 'one'\n" + " case 2:\n" + " return 'two'\n" + ) + canonical = stdlib_ast.unparse(stdlib_ast.parse(source)) + coverage = _full_coverage(source) + deltas = list(iter_deltas(source, "test.py", coverage, [MutateMatchCase()])) + assert deltas + mutated = [mutation_patch(d, canonical) for d in deltas] + assert any("case 1:" not in m for m in mutated) + assert any("case 2:" not in m for m in mutated) diff --git a/tip-of-the-top.txt b/tip-of-the-top.txt new file mode 100644 index 0000000..f275857 --- /dev/null +++ b/tip-of-the-top.txt @@ -0,0 +1,67 @@ +https://github.com/agronholm/anyio +https://github.com/ariebovenberg/whenever +https://github.com/astanin/python-tabulate +https://github.com/asweigart/pyautogui +https://github.com/avian2/unidecode +https://github.com/beetbox/confuse +https://github.com/carpedm20/emoji +https://github.com/chaimleib/intervaltree +https://github.com/chardet/chardet +https://github.com/DanielStutzbach/blist +https://github.com/danthedeckie/simpleeval +https://github.com/dateparser/dateparser +https://github.com/dateutil/dateutil +https://github.com/davidhalter/jedi +https://github.com/dbader/schedule +https://github.com/dgilland/pydash +https://github.com/dpath-maintainers/dpath-python +https://github.com/dry-python/returns +https://github.com/encode/uvicorn +https://github.com/erikrose/parsimonious +https://github.com/google/latexify_py +https://github.com/grantjenks/python-sortedcontainers +https://github.com/hgrecco/pint +https://github.com/hynek/structlog +https://github.com/jaraco/inflect +https://github.com/jaraco/jaraco.text +https://github.com/jazzband/prettytable +https://github.com/jek/blinker +https://github.com/jmespath/jmespath.py +https://github.com/jmoiron/humanfriendly +https://github.com/joke2k/faker +https://github.com/jorisroovers/gitlint +https://github.com/jpadilla/pyjwt +https://github.com/keleshev/schema +https://github.com/kiorky/croniter +https://github.com/Knio/dominate +https://github.com/lark-parser/lark +https://github.com/mahmoud/boltons +https://github.com/mahmoud/glom +https://github.com/marshmallow-code/marshmallow +https://github.com/miracle2k/webassets +https://github.com/mitsuhiko/itsdangerous +https://github.com/more-itertools/more-itertools +https://github.com/mpmath/mpmath +https://github.com/networkx/networkx +https://github.com/nickcoutsos/python-statemachine +https://github.com/okunishinishi/python-stringcase +https://github.com/psf/black +https://github.com/Pylons/webob +https://github.com/pyparsing/pyparsing +https://github.com/python-attrs/cattrs +https://github.com/python-humanize/humanize +https://github.com/pytoolz/toolz +https://github.com/pytransitions/transitions +https://github.com/r1chardj0n3s/parse +https://github.com/rspeer/python-ftfy +https://github.com/rustedpy/result +https://github.com/scrapinghub/extruct +https://github.com/sdispater/tomlkit +https://github.com/seatgeek/thefuzz +https://github.com/simplejson/simplejson +https://github.com/stub42/pytz +https://github.com/svinota/pyroute2 +https://github.com/Tinche/aiofiles +https://github.com/tobgu/pyrsistent +https://github.com/twisted/klein +https://github.com/un33k/python-slugify