Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions tests/unit/executor/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,23 @@ def test_empty(self):
cache_lst = get_cache_data(cache_directory="rather_this_dir")
self.assertEqual(len(cache_lst), 1)

def test_executor_dependencies(self):
with TestClusterExecutor(cache_directory="cache_dir") as exe:
cloudpickle_register(ind=1)
fs1 = exe.submit(add_with_sleep, 1, 1)
fs2 = exe.submit(add_with_sleep, fs1, 1)
fs3 = exe.submit(add_with_sleep, fs1, fs2)
self.assertFalse(fs1.done())
self.assertFalse(fs2.done())
self.assertFalse(fs3.done())
self.assertEqual(fs1.result(), 2)
self.assertEqual(fs2.result(), 3)
self.assertEqual(fs3.result(), 5)
self.assertEqual(len(os.listdir("cache_dir")), 3)
self.assertTrue(fs1.done())
self.assertTrue(fs2.done())
self.assertTrue(fs3.done())

def test_executor_dependency_plot(self):
with TestClusterExecutor(
plot_dependency_graph=True,
Expand Down
26 changes: 26 additions & 0 deletions tests/unit/executor/test_flux_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ def stop_function():
return True


def add_with_sleep(parameter_1, parameter_2):
sleep(1)
return parameter_1 + parameter_2


@unittest.skipIf(
skip_flux_test or skip_mpi4py_test,
"h5py or mpi4py or flux are not installed, so the h5py, flux and mpi4py tests are skipped.",
Expand Down Expand Up @@ -81,6 +86,27 @@ def test_executor_blockallocation(self):
self.assertEqual(len(os.listdir("executorlib_cache")), 2)
self.assertTrue(fs1.done())

def test_executor_dependencies(self):
with FluxClusterExecutor(
resource_dict={"cores": 1, "cwd": "executorlib_cache"},
block_allocation=False,
cache_directory="executorlib_cache",
pmi_mode=pmi,
) as exe:
fs1 = exe.submit(add_with_sleep, 1, 1)
fs2 = exe.submit(add_with_sleep, fs1, 1)
fs3 = exe.submit(add_with_sleep, fs1, fs2)
self.assertFalse(fs1.done())
self.assertFalse(fs2.done())
self.assertFalse(fs3.done())
self.assertEqual(fs1.result(), 2)
self.assertEqual(fs2.result(), 3)
self.assertEqual(fs3.result(), 5)
self.assertEqual(len(os.listdir("executorlib_cache")), 6)
self.assertTrue(fs1.done())
self.assertTrue(fs2.done())
self.assertTrue(fs3.done())

def test_executor_blockallocation_echo(self):
with FluxClusterExecutor(
resource_dict={"cores": 1, "cwd": "executorlib_cache"},
Expand Down
30 changes: 30 additions & 0 deletions tests/unit/executor/test_slurm_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import importlib
import unittest
import shutil
from time import sleep

from executorlib import SlurmClusterExecutor
from executorlib.standalone.serialize import cloudpickle_register
Expand Down Expand Up @@ -34,6 +35,9 @@
#SBATCH --chdir={{working_directory}}
#SBATCH --get-user-env=L
#SBATCH --ntasks={{cores}}
{%- if dependency_list %}
#SBATCH --dependency=afterok:{{ dependency_list | join(',') }}
{%- endif %}

{{command}}
"""
Expand All @@ -47,6 +51,11 @@ def mpi_funct(i):
return i, size, rank


def add_with_sleep(parameter_1, parameter_2):
sleep(1)
return parameter_1 + parameter_2


@unittest.skipIf(
skip_slurm_test or skip_mpi4py_test or skip_h5py_test,
"h5py or mpi4py or SLRUM are not installed, so the h5py, slurm and mpi4py tests are skipped.",
Expand All @@ -66,6 +75,27 @@ def test_executor(self):
self.assertEqual(len(os.listdir("executorlib_cache")), 3)
self.assertTrue(fs1.done())

def test_executor_dependencies(self):
with SlurmClusterExecutor(
resource_dict={"cores": 1, "cwd": "executorlib_cache", "submission_template": submission_template},
block_allocation=False,
cache_directory="executorlib_cache",
pmi_mode="pmi2",
) as exe:
fs1 = exe.submit(add_with_sleep, 1, 1)
fs2 = exe.submit(add_with_sleep, fs1, 1)
fs3 = exe.submit(add_with_sleep, fs1, fs2)
self.assertFalse(fs1.done())
self.assertFalse(fs2.done())
self.assertFalse(fs3.done())
self.assertEqual(fs1.result(), 2)
self.assertEqual(fs2.result(), 3)
self.assertEqual(fs3.result(), 5)
self.assertEqual(len(os.listdir("executorlib_cache")), 5)
self.assertTrue(fs1.done())
self.assertTrue(fs2.done())
self.assertTrue(fs3.done())

def test_executor_no_cwd(self):
with SlurmClusterExecutor(
resource_dict={"cores": 2, "submission_template": submission_template},
Expand Down
Loading