diff --git a/tests/unit/executor/test_api.py b/tests/unit/executor/test_api.py index 2b9a5d52..160c79f6 100644 --- a/tests/unit/executor/test_api.py +++ b/tests/unit/executor/test_api.py @@ -119,6 +119,23 @@ def test_empty(self): cache_lst = get_cache_data(cache_directory="rather_this_dir") self.assertEqual(len(cache_lst), 1) + def test_executor_dependencies(self): + with TestClusterExecutor(cache_directory="cache_dir") as exe: + cloudpickle_register(ind=1) + fs1 = exe.submit(add_with_sleep, 1, 1) + fs2 = exe.submit(add_with_sleep, fs1, 1) + fs3 = exe.submit(add_with_sleep, fs1, fs2) + self.assertFalse(fs1.done()) + self.assertFalse(fs2.done()) + self.assertFalse(fs3.done()) + self.assertEqual(fs1.result(), 2) + self.assertEqual(fs2.result(), 3) + self.assertEqual(fs3.result(), 5) + self.assertEqual(len(os.listdir("cache_dir")), 3) + self.assertTrue(fs1.done()) + self.assertTrue(fs2.done()) + self.assertTrue(fs3.done()) + def test_executor_dependency_plot(self): with TestClusterExecutor( plot_dependency_graph=True, diff --git a/tests/unit/executor/test_flux_cluster.py b/tests/unit/executor/test_flux_cluster.py index 57b58f2c..37661b13 100644 --- a/tests/unit/executor/test_flux_cluster.py +++ b/tests/unit/executor/test_flux_cluster.py @@ -47,6 +47,11 @@ def stop_function(): return True +def add_with_sleep(parameter_1, parameter_2): + sleep(1) + return parameter_1 + parameter_2 + + @unittest.skipIf( skip_flux_test or skip_mpi4py_test, "h5py or mpi4py or flux are not installed, so the h5py, flux and mpi4py tests are skipped.", @@ -81,6 +86,27 @@ def test_executor_blockallocation(self): self.assertEqual(len(os.listdir("executorlib_cache")), 2) self.assertTrue(fs1.done()) + def test_executor_dependencies(self): + with FluxClusterExecutor( + resource_dict={"cores": 1, "cwd": "executorlib_cache"}, + block_allocation=False, + cache_directory="executorlib_cache", + pmi_mode=pmi, + ) as exe: + fs1 = exe.submit(add_with_sleep, 1, 1) + fs2 = exe.submit(add_with_sleep, fs1, 1) + fs3 = exe.submit(add_with_sleep, fs1, fs2) + self.assertFalse(fs1.done()) + self.assertFalse(fs2.done()) + self.assertFalse(fs3.done()) + self.assertEqual(fs1.result(), 2) + self.assertEqual(fs2.result(), 3) + self.assertEqual(fs3.result(), 5) + self.assertEqual(len(os.listdir("executorlib_cache")), 6) + self.assertTrue(fs1.done()) + self.assertTrue(fs2.done()) + self.assertTrue(fs3.done()) + def test_executor_blockallocation_echo(self): with FluxClusterExecutor( resource_dict={"cores": 1, "cwd": "executorlib_cache"}, diff --git a/tests/unit/executor/test_slurm_cluster.py b/tests/unit/executor/test_slurm_cluster.py index 41057119..feccf2f0 100644 --- a/tests/unit/executor/test_slurm_cluster.py +++ b/tests/unit/executor/test_slurm_cluster.py @@ -2,6 +2,7 @@ import importlib import unittest import shutil +from time import sleep from executorlib import SlurmClusterExecutor from executorlib.standalone.serialize import cloudpickle_register @@ -34,6 +35,9 @@ #SBATCH --chdir={{working_directory}} #SBATCH --get-user-env=L #SBATCH --ntasks={{cores}} +{%- if dependency_list %} +#SBATCH --dependency=afterok:{{ dependency_list | join(',') }} +{%- endif %} {{command}} """ @@ -47,6 +51,11 @@ def mpi_funct(i): return i, size, rank +def add_with_sleep(parameter_1, parameter_2): + sleep(1) + return parameter_1 + parameter_2 + + @unittest.skipIf( skip_slurm_test or skip_mpi4py_test or skip_h5py_test, "h5py or mpi4py or SLRUM are not installed, so the h5py, slurm and mpi4py tests are skipped.", @@ -66,6 +75,27 @@ def test_executor(self): self.assertEqual(len(os.listdir("executorlib_cache")), 3) self.assertTrue(fs1.done()) + def test_executor_dependencies(self): + with SlurmClusterExecutor( + resource_dict={"cores": 1, "cwd": "executorlib_cache", "submission_template": submission_template}, + block_allocation=False, + cache_directory="executorlib_cache", + pmi_mode="pmi2", + ) as exe: + fs1 = exe.submit(add_with_sleep, 1, 1) + fs2 = exe.submit(add_with_sleep, fs1, 1) + fs3 = exe.submit(add_with_sleep, fs1, fs2) + self.assertFalse(fs1.done()) + self.assertFalse(fs2.done()) + self.assertFalse(fs3.done()) + self.assertEqual(fs1.result(), 2) + self.assertEqual(fs2.result(), 3) + self.assertEqual(fs3.result(), 5) + self.assertEqual(len(os.listdir("executorlib_cache")), 5) + self.assertTrue(fs1.done()) + self.assertTrue(fs2.done()) + self.assertTrue(fs3.done()) + def test_executor_no_cwd(self): with SlurmClusterExecutor( resource_dict={"cores": 2, "submission_template": submission_template},