diff --git a/docs/solvers.rst b/docs/solvers.rst index 5c4476b..636b2df 100644 --- a/docs/solvers.rst +++ b/docs/solvers.rst @@ -111,13 +111,18 @@ Notice this local optimum may be different for distinct perturbation schemes and max_processing_time {None} Maximum processing time in seconds. If not provided, the method stops - only when a local minimum is obtained + only when a local minimum is obtained or if max_iterations is set. - log_file + rng {None} + A python random number generator which can be seeded to ensure + consistent results given a call to this function. If not provided + the initial random number generator is used. + + log_file {None} If not `None`, creates a log file with details about the whole execution - verbose + verbose {False} If true, prints algorithm status every iteration @@ -138,6 +143,8 @@ An implementation of the `Simulated Annealing Tuple[List, float]: @@ -41,6 +44,10 @@ def solve_tsp_local_search( Maximum processing time in seconds. If not provided, the method stops only when a local minimum is obtained + rng + Random number generator to be passed to the pertubation scheme. If not + provided, the initial random generator is used. + log_file If not `None`, creates a log file with details about the whole execution @@ -79,7 +86,9 @@ def solve_tsp_local_search( while improvement and (not stop_early): improvement = False - for n_index, xn in enumerate(neighborhood_gen[perturbation_scheme](x)): + for n_index, xn in enumerate( + neighborhood_gen[perturbation_scheme](x, rng) + ): if default_timer() - tic > max_processing_time: _print_message(TIME_LIMIT_MSG, verbose, log_file_handler) stop_early = True diff --git a/python_tsp/heuristics/perturbation_schemes.py b/python_tsp/heuristics/perturbation_schemes.py index 73a4ae0..0b79b5a 100644 --- a/python_tsp/heuristics/perturbation_schemes.py +++ b/python_tsp/heuristics/perturbation_schemes.py @@ -13,64 +13,78 @@ .. [2] 2-opt: https://en.wikipedia.org/wiki/2-opt """ -from random import sample -from typing import Callable, Dict, Generator, List +from random import Random +from typing import Callable, Dict, Generator, List, Union +initial_random_generator = Random() -def ps1_gen(x: List[int]) -> Generator[List[int], List[int], None]: + +def ps1_gen( + x: List[int], rng: Union[Random, None] = None +) -> Generator[List[int], List[int], None]: """PS1 perturbation scheme: Swap two adjacent terms [1] This scheme has at most n - 1 swaps. """ + rng = rng or initial_random_generator n = len(x) i_range = range(1, n - 1) - for i in sample(i_range, len(i_range)): + for i in rng.sample(i_range, len(i_range)): xn = x.copy() xn[i], xn[i + 1] = xn[i + 1], xn[i] yield xn -def ps2_gen(x: List[int]) -> Generator[List[int], List[int], None]: +def ps2_gen( + x: List[int], rng: Union[Random, None] = None +) -> Generator[List[int], List[int], None]: """PS2 perturbation scheme: Swap any two elements [1] This scheme has n * (n - 1) / 2 swaps. """ + rng = rng or initial_random_generator n = len(x) i_range = range(1, n - 1) - for i in sample(i_range, len(i_range)): + for i in rng.sample(i_range, len(i_range)): j_range = range(i + 1, n) - for j in sample(j_range, len(j_range)): + for j in rng.sample(j_range, len(j_range)): xn = x.copy() xn[i], xn[j] = xn[j], xn[i] yield xn -def ps3_gen(x: List[int]) -> Generator[List[int], List[int], None]: +def ps3_gen( + x: List[int], rng: Union[Random, None] = None +) -> Generator[List[int], List[int], None]: """PS3 perturbation scheme: A single term is moved [1] This scheme has n * (n - 1) swaps. """ + rng = rng or initial_random_generator n = len(x) i_range = range(1, n) - for i in sample(i_range, len(i_range)): + for i in rng.sample(i_range, len(i_range)): j_range = [j for j in range(1, n) if j != i] - for j in sample(j_range, len(j_range)): + for j in rng.sample(j_range, len(j_range)): xn = x.copy() node = xn.pop(i) xn.insert(j, node) yield xn -def ps4_gen(x: List[int]) -> Generator[List[int], List[int], None]: +def ps4_gen( + x: List[int], rng: Union[Random, None] = None +) -> Generator[List[int], List[int], None]: """PS4 perturbation scheme: A subsequence is moved [1]""" + rng = rng or initial_random_generator n = len(x) i_range = range(1, n) - for i in sample(i_range, len(i_range)): + for i in rng.sample(i_range, len(i_range)): j_range = range(i + 1, n + 1) - for j in sample(j_range, len(j_range)): + for j in rng.sample(j_range, len(j_range)): k_range = [k for k in range(1, n + 1) if k not in range(i, j + 1)] - for k in sample(k_range, len(k_range)): + for k in rng.sample(k_range, len(k_range)): xn = x.copy() if k < i: xn = xn[:k] + xn[i:j] + xn[k:i] + xn[j:] @@ -79,29 +93,35 @@ def ps4_gen(x: List[int]) -> Generator[List[int], List[int], None]: yield xn -def ps5_gen(x: List[int]) -> Generator[List[int], List[int], None]: +def ps5_gen( + x: List[int], rng: Union[Random, None] = None +) -> Generator[List[int], List[int], None]: """PS5 perturbation scheme: A subsequence is reversed [1]""" + rng = rng or initial_random_generator n = len(x) i_range = range(1, n) - for i in sample(i_range, len(i_range)): + for i in rng.sample(i_range, len(i_range)): j_range = range(i + 2, n + 1) - for j in sample(j_range, len(j_range)): + for j in rng.sample(j_range, len(j_range)): xn = x.copy() xn = xn[:i] + list(reversed(xn[i:j])) + xn[j:] yield xn -def ps6_gen(x: List[int]) -> Generator[List[int], List[int], None]: +def ps6_gen( + x: List[int], rng: Union[Random, None] = None +) -> Generator[List[int], List[int], None]: """PS6 perturbation scheme: A subsequence is reversed and moved [1]""" + rng = rng or initial_random_generator n = len(x) i_range = range(1, n) - for i in sample(i_range, len(i_range)): + for i in rng.sample(i_range, len(i_range)): j_range = range(i + 1, n + 1) - for j in sample(j_range, len(j_range)): + for j in rng.sample(j_range, len(j_range)): k_range = [k for k in range(1, n + 1) if k not in range(i, j + 1)] - for k in sample(k_range, len(k_range)): + for k in rng.sample(k_range, len(k_range)): xn = x.copy() if k < i: xn = xn[:k] + list(reversed(xn[i:j])) + xn[k:i] + xn[j:] @@ -110,13 +130,17 @@ def ps6_gen(x: List[int]) -> Generator[List[int], List[int], None]: yield xn -def two_opt_gen(x: List[int]) -> Generator[List[int], List[int], None]: +def two_opt_gen( + x: List[int], rng: Union[Random, None] = None +) -> Generator[List[int], List[int], None]: """2-opt perturbation scheme [2]""" + rng = rng or initial_random_generator + n = len(x) i_range = range(2, n) - for i in sample(i_range, len(i_range)): + for i in rng.sample(i_range, len(i_range)): j_range = range(i + 1, n + 1) - for j in sample(j_range, len(j_range)): + for j in rng.sample(j_range, len(j_range)): xn = x.copy() xn = xn[: i - 1] + list(reversed(xn[i - 1 : j])) + xn[j:] yield xn @@ -124,7 +148,10 @@ def two_opt_gen(x: List[int]) -> Generator[List[int], List[int], None]: # Mapping with all possible neighborhood generators in this module neighborhood_gen: Dict[ - str, Callable[[List[int]], Generator[List[int], List[int], None]] + str, + Callable[ + [List[int], Union[Random, None]], Generator[List[int], List[int], None] + ], ] = { "ps1": ps1_gen, "ps2": ps2_gen, diff --git a/python_tsp/heuristics/record_to_record.py b/python_tsp/heuristics/record_to_record.py index 89a0a60..da22e1f 100644 --- a/python_tsp/heuristics/record_to_record.py +++ b/python_tsp/heuristics/record_to_record.py @@ -1,4 +1,4 @@ -from random import randint +from random import Random, randint from typing import List, Optional, TextIO import numpy as np @@ -21,6 +21,7 @@ def solve_tsp_record_to_record( distance_matrix: np.ndarray, x0: Optional[List[int]] = None, max_iterations: Optional[int] = None, + rng: Optional[Random] = None, log_file: Optional[str] = None, verbose: bool = False, ): @@ -41,6 +42,10 @@ def solve_tsp_record_to_record( The maximum number of iterations for the algorithm. If not specified, it defaults to the number of nodes in the distance matrix. + rng + Random number generator to be passed to the pertubation scheme. If not + provided, the initial random generator is used. + log_file If not `None`, creates a log file with details about the whole execution. @@ -62,6 +67,8 @@ def solve_tsp_record_to_record( max_iterations = max_iterations or n x, fx = setup_initial_solution(distance_matrix=distance_matrix, x0=x0) + new_int = rng.randint if rng else randint + log_file_handler = ( open(log_file, "w", encoding="utf-8") if log_file else None ) @@ -69,8 +76,8 @@ def solve_tsp_record_to_record( for iteration in range(1, max_iterations + 1): xn = x[:] for _ in range(2): - u = randint(1, n - 1) - v = randint(1, n - 1) + u = new_int(1, n - 1) + v = new_int(1, n - 1) xn[u], xn[v] = xn[v], xn[u] xn, fn = solve_tsp_lin_kernighan( diff --git a/python_tsp/heuristics/simulated_annealing.py b/python_tsp/heuristics/simulated_annealing.py index c19252e..b9dfb3c 100644 --- a/python_tsp/heuristics/simulated_annealing.py +++ b/python_tsp/heuristics/simulated_annealing.py @@ -1,5 +1,6 @@ from math import inf from timeit import default_timer +from random import Random from typing import List, Optional, Tuple, TextIO import numpy as np @@ -22,6 +23,7 @@ def solve_tsp_simulated_annealing( perturbation_scheme: str = "two_opt", alpha: float = 0.9, max_processing_time: Optional[float] = None, + rng: Optional[Random] = None, log_file: Optional[str] = None, verbose: bool = False, ) -> Tuple[List, float]: @@ -72,7 +74,9 @@ def solve_tsp_simulated_annealing( """ x, fx = setup_initial_solution(distance_matrix, x0) - temp = _initial_temperature(distance_matrix, x, fx, perturbation_scheme) + temp = _initial_temperature( + distance_matrix, x, fx, perturbation_scheme, rng=rng + ) max_processing_time = max_processing_time or inf log_file_handler = ( open(log_file, "w", encoding="utf-8") if log_file else None @@ -93,10 +97,10 @@ def solve_tsp_simulated_annealing( stop_early = True break - xn = _perturbation(x, perturbation_scheme) + xn = _perturbation(x, perturbation_scheme, rng=rng) fn = compute_permutation_distance(distance_matrix, xn) - if _acceptance_rule(fx, fn, temp): + if _acceptance_rule(fx, fn, temp, rng=rng): x, fx = xn, fn k_accepted += 1 k_noimprovements = 0 @@ -136,6 +140,7 @@ def _initial_temperature( x: List[int], fx: float, perturbation_scheme: str, + rng: Optional[Random], ) -> float: """Compute initial temperature Instead of relying on problem-dependent parameters, this function estimates @@ -159,7 +164,7 @@ def _initial_temperature( # Step 1 dfx_list = [] for _ in range(100): - xn = _perturbation(x, perturbation_scheme) + xn = _perturbation(x, perturbation_scheme, rng=rng) fn = compute_permutation_distance(distance_matrix, xn) dfx_list.append(fn - fx) @@ -170,19 +175,22 @@ def _initial_temperature( return -dfx_mean / np.log(tau0) -def _perturbation(x: List[int], perturbation_scheme: str): +def _perturbation( + x: List[int], perturbation_scheme: str, rng: Optional[Random] +): """Generate a random neighbor of a current solution ``x`` In this case, we can use the generators created in the `local_search` module, and pick the first solution. Since the neighborhood is randomized, it is the same as creating a random perturbation. """ - return next(neighborhood_gen[perturbation_scheme](x)) + return next(neighborhood_gen[perturbation_scheme](x, rng)) -def _acceptance_rule(fx: float, fn: float, temp: float) -> bool: +def _acceptance_rule( + fx: float, fn: float, temp: float, rng: Optional[Random] +) -> bool: """Metropolis acceptance rule""" + rand = rng.random() if rng else np.random.rand() dfx = fn - fx - return (dfx < 0) or ( - (dfx > 0) and (np.random.rand() <= np.exp(-(fn - fx) / temp)) - ) + return (dfx < 0) or ((dfx > 0) and (rand <= np.exp(-(fn - fx) / temp))) diff --git a/tests/heuristics/test_local_search.py b/tests/heuristics/test_local_search.py index a2b6c0c..a1a19e3 100644 --- a/tests/heuristics/test_local_search.py +++ b/tests/heuristics/test_local_search.py @@ -1,3 +1,4 @@ +import random import sys from io import StringIO @@ -68,6 +69,53 @@ def test_local_search_returns_equal_optimal_solution( assert fopt == fx +@pytest.mark.parametrize("scheme", PERTURBATION_SCHEMES) +@pytest.mark.parametrize( + "distance_matrix", [distance_matrix1, distance_matrix2, distance_matrix3] +) +def test_local_search_calls_rng(scheme, distance_matrix): + """ + Ensure that the rng is actually called when solving the TSP. + """ + + called = False + + class psuedo_rng(random.Random): + def getrandbits(self, k: int, /) -> int: + nonlocal called + called = True + return super().getrandbits(k) + + x = [0, 4, 2, 3, 1] + local_search.solve_tsp_local_search( + distance_matrix, x, perturbation_scheme=scheme, rng=psuedo_rng(0) + ) + + assert called + + +@pytest.mark.parametrize("scheme", PERTURBATION_SCHEMES) +@pytest.mark.parametrize( + "distance_matrix", [distance_matrix1, distance_matrix2, distance_matrix3] +) +def test_local_search_eql_with_same_seed(scheme, distance_matrix): + """ + Ensure that the same solution is returned when using the same seed. + """ + + x = [0, 4, 2, 3, 1] + xopt, fopt = local_search.solve_tsp_local_search( + distance_matrix, x, perturbation_scheme=scheme, rng=random.Random(42) + ) + + xopt2, fopt2 = local_search.solve_tsp_local_search( + distance_matrix, x, perturbation_scheme=scheme, rng=random.Random(42) + ) + + assert all(e1 == e2 for e1, e2 in zip(xopt, xopt2)) + assert fopt == fopt2 + + @pytest.mark.parametrize("scheme", PERTURBATION_SCHEMES) def test_local_search_with_time_constraints(scheme): """ diff --git a/tests/heuristics/test_record_to_record.py b/tests/heuristics/test_record_to_record.py index feee246..fb29047 100644 --- a/tests/heuristics/test_record_to_record.py +++ b/tests/heuristics/test_record_to_record.py @@ -1,3 +1,4 @@ +import random import pytest from python_tsp.heuristics import solve_tsp_record_to_record @@ -77,6 +78,53 @@ def test_record_to_record_returns_equal_optimal_solution( assert fopt == optimal_distance +@pytest.mark.parametrize( + "distance_matrix", [distance_matrix1, distance_matrix2, distance_matrix3] +) +def test_record_to_record_calls_rng(distance_matrix): + """ + Ensure that the rng is actually called when solving the TSP. + """ + + called = False + + class psuedo_rng(random.Random): + def getrandbits(self, k: int, /) -> int: + nonlocal called + called = True + return super().getrandbits(k) + + x0 = [0, 4, 2, 3, 1] + solve_tsp_record_to_record( + distance_matrix=distance_matrix, + x0=x0, + rng=psuedo_rng(0), + ) + + assert called + + +@pytest.mark.parametrize( + "distance_matrix", [distance_matrix1, distance_matrix2, distance_matrix3] +) +def test_record_to_record_eql_with_same_seed(distance_matrix): + """ + Ensure that the same solution is returned when using the same seed. + """ + + x0 = [0, 4, 2, 3, 1] + xopt, fopt = solve_tsp_record_to_record( + distance_matrix=distance_matrix, x0=x0, rng=random.Random(42) + ) + + xopt2, fopt2 = solve_tsp_record_to_record( + distance_matrix=distance_matrix, x0=x0, rng=random.Random(42) + ) + + assert all(e1 == e2 for e1, e2 in zip(xopt, xopt2)) + assert fopt == fopt2 + + def test_record_to_record_log_file_is_created_if_required(tmp_path): """ If a log_file is provided, it contains information about the execution. diff --git a/tests/heuristics/test_simulated_annealing.py b/tests/heuristics/test_simulated_annealing.py index c7a937b..3065512 100644 --- a/tests/heuristics/test_simulated_annealing.py +++ b/tests/heuristics/test_simulated_annealing.py @@ -1,3 +1,4 @@ +import random import sys from io import StringIO @@ -69,6 +70,52 @@ def test_simulated_annealing_with_time_constraints(permutation, scheme): assert simulated_annealing.TIME_LIMIT_MSG in captured_output.getvalue() +@pytest.mark.parametrize("scheme", PERTURBATION_SCHEMES) +@pytest.mark.parametrize( + "distance_matrix", [distance_matrix1, distance_matrix2, distance_matrix3] +) +def test_simulated_annealing_calls_rng(scheme, distance_matrix): + """ + Ensure that the rng is actually called when solving the TSP. + """ + + called = False + + class psuedo_rng(random.Random): + def getrandbits(self, k: int, /) -> int: + nonlocal called + called = True + return super().getrandbits(k) + + simulated_annealing.solve_tsp_simulated_annealing( + distance_matrix, perturbation_scheme=scheme, rng=psuedo_rng(0) + ) + + assert called + + +@pytest.mark.parametrize("scheme", PERTURBATION_SCHEMES) +@pytest.mark.parametrize( + "distance_matrix", [distance_matrix1, distance_matrix2, distance_matrix3] +) +def test_simulated_annealing_eql_with_same_seed(scheme, distance_matrix): + """ + Ensure that the same solution is returned when using the same seed. + """ + + x = [0, 4, 2, 3, 1] + xopt, fopt = simulated_annealing.solve_tsp_simulated_annealing( + distance_matrix, x, perturbation_scheme=scheme, rng=random.Random(42) + ) + + xopt2, fopt2 = simulated_annealing.solve_tsp_simulated_annealing( + distance_matrix, x, perturbation_scheme=scheme, rng=random.Random(42) + ) + + assert all(e1 == e2 for e1, e2 in zip(xopt, xopt2)) + assert fopt == fopt2 + + def test_log_file_is_created_if_required(permutation, tmp_path): """ If a log_file is provided, it contains information about the execution.