diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 3be703e1..5cfdc7e4 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -7,67 +7,58 @@ on:
branches: [main, dev]
jobs:
- lint-and-test:
+ lint:
runs-on: ubuntu-latest
-
steps:
- - name: Checkout repository
- uses: actions/checkout@v4
+ - uses: actions/checkout@v4
- - name: Set up Python
- uses: actions/setup-python@v5
+ - uses: actions/setup-python@v5
with:
python-version: '3.11'
cache: 'pip'
+ - name: Install ruff
+ run: pip install ruff
+
+ - name: Check formatting
+ run: ruff format --check .
+
+ - name: Lint
+ run: ruff check . --output-format=github
+
+ test:
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ python-version: ['3.9', '3.10', '3.11', '3.12']
+
+ steps:
+ - uses: actions/checkout@v4
+
+ - uses: actions/setup-python@v5
+ with:
+ python-version: ${{ matrix.python-version }}
+ cache: 'pip'
+
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements-ci.txt
- # Uses minimal CI requirements (no tensorflow/heavy packages)
- - name: Run linter (ruff)
+ - name: Run tests
run: |
- ruff check . --select=E9,F63,F7,F82 --output-format=github \
- --exclude="Dockerfile.*" \
- --exclude="linktest/" \
- --exclude="measurements/" \
- --exclude="0mq/" \
- --exclude="ratc/"
- # E9: Runtime errors (syntax errors, etc.)
- # F63: Invalid print syntax
- # F7: Syntax errors in type comments
- # F82: Undefined names in __all__
- # Excludes: Dockerfiles (not Python), linktest (symlinks),
- # measurements/0mq/ratc (config-dependent experimental scripts)
-
- - name: Run tests (pytest)
- run: |
- set +e
pytest --tb=short -q \
+ --cov=concore_cli --cov=concore_base \
+ --cov-report=term-missing \
--ignore=measurements/ \
--ignore=0mq/ \
--ignore=ratc/ \
--ignore=linktest/
- status=$?
- set -e
- # Allow success if no tests are collected (pytest exit code 5)
- if [ "$status" -ne 0 ] && [ "$status" -ne 5 ]; then
- exit "$status"
- fi
- # Fails on real test failures, passes on no tests collected
docker-build:
runs-on: ubuntu-latest
- # Only run when Dockerfile.py or related files change
- if: |
- github.event_name == 'push' ||
- (github.event_name == 'pull_request' &&
- contains(github.event.pull_request.changed_files, 'Dockerfile'))
-
steps:
- - name: Checkout repository
- uses: actions/checkout@v4
+ - uses: actions/checkout@v4
- name: Check if Dockerfile.py changed
uses: dorny/paths-filter@v3
@@ -80,7 +71,4 @@ jobs:
- name: Validate Dockerfile build
if: steps.filter.outputs.dockerfile == 'true'
- run: |
- docker build -f Dockerfile.py -t concore-py-test .
- # Validates that Dockerfile.py can be built successfully
- # Does not push the image
+ run: docker build -f Dockerfile.py -t concore-py-test .
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
deleted file mode 100644
index 6b7cdf04..00000000
--- a/.github/workflows/tests.yml
+++ /dev/null
@@ -1,33 +0,0 @@
-name: Tests
-
-on:
- push:
- branches: [main, dev]
- pull_request:
- branches: [main, dev]
-
-jobs:
- test:
- runs-on: ubuntu-latest
- strategy:
- matrix:
- python-version: ['3.9', '3.10', '3.11', '3.12']
-
- steps:
- - name: Checkout repository
- uses: actions/checkout@v4
-
- - name: Set up Python ${{ matrix.python-version }}
- uses: actions/setup-python@v5
- with:
- python-version: ${{ matrix.python-version }}
-
- - name: Install dependencies
- run: |
- python -m pip install --upgrade pip
- pip install -r requirements.txt
- pip install -r requirements-dev.txt
- pip install pyzmq
-
- - name: Run tests
- run: pytest -v
\ No newline at end of file
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 00000000..461fd44e
--- /dev/null
+++ b/.pre-commit-config.yaml
@@ -0,0 +1,7 @@
+repos:
+ - repo: https://github.com/astral-sh/ruff-pre-commit
+ rev: v0.11.12
+ hooks:
+ - id: ruff
+ args: [--output-format=full]
+ - id: ruff-format
diff --git a/concore_cli/__init__.py b/concore_cli/__init__.py
index 8e8a3220..a5ccc5d1 100644
--- a/concore_cli/__init__.py
+++ b/concore_cli/__init__.py
@@ -2,4 +2,4 @@
from .cli import cli
-__all__ = ['cli']
+__all__ = ["cli"]
diff --git a/concore_cli/cli.py b/concore_cli/cli.py
index fadb8b6e..15801838 100644
--- a/concore_cli/cli.py
+++ b/concore_cli/cli.py
@@ -13,16 +13,18 @@
from . import __version__
console = Console()
-DEFAULT_EXEC_TYPE = 'windows' if os.name == 'nt' else 'posix'
+DEFAULT_EXEC_TYPE = "windows" if os.name == "nt" else "posix"
+
@click.group()
-@click.version_option(version=__version__, prog_name='concore')
+@click.version_option(version=__version__, prog_name="concore")
def cli():
pass
+
@cli.command()
-@click.argument('name', required=True)
-@click.option('--template', default='basic', help='Template type to use')
+@click.argument("name", required=True)
+@click.option("--template", default="basic", help="Template type to use")
def init(name, template):
"""Create a new concore project"""
try:
@@ -31,12 +33,21 @@ def init(name, template):
console.print(f"[red]Error:[/red] {str(e)}")
sys.exit(1)
+
@cli.command()
-@click.argument('workflow_file', type=click.Path(exists=True))
-@click.option('--source', '-s', default='src', help='Source directory')
-@click.option('--output', '-o', default='out', help='Output directory')
-@click.option('--type', '-t', default=DEFAULT_EXEC_TYPE, type=click.Choice(['windows', 'posix', 'docker']), help='Execution type')
-@click.option('--auto-build', is_flag=True, help='Automatically run build after generation')
+@click.argument("workflow_file", type=click.Path(exists=True))
+@click.option("--source", "-s", default="src", help="Source directory")
+@click.option("--output", "-o", default="out", help="Output directory")
+@click.option(
+ "--type",
+ "-t",
+ default=DEFAULT_EXEC_TYPE,
+ type=click.Choice(["windows", "posix", "docker"]),
+ help="Execution type",
+)
+@click.option(
+ "--auto-build", is_flag=True, help="Automatically run build after generation"
+)
def run(workflow_file, source, output, type, auto_build):
"""Run a concore workflow"""
try:
@@ -45,9 +56,10 @@ def run(workflow_file, source, output, type, auto_build):
console.print(f"[red]Error:[/red] {str(e)}")
sys.exit(1)
+
@cli.command()
-@click.argument('workflow_file', type=click.Path(exists=True))
-@click.option('--source', '-s', default='src', help='Source directory')
+@click.argument("workflow_file", type=click.Path(exists=True))
+@click.option("--source", "-s", default="src", help="Source directory")
def validate(workflow_file, source):
"""Validate a workflow file"""
try:
@@ -58,10 +70,11 @@ def validate(workflow_file, source):
console.print(f"[red]Error:[/red] {str(e)}")
sys.exit(1)
+
@cli.command()
-@click.argument('workflow_file', type=click.Path(exists=True))
-@click.option('--source', '-s', default='src', help='Source directory')
-@click.option('--json', 'output_json', is_flag=True, help='Output in JSON format')
+@click.argument("workflow_file", type=click.Path(exists=True))
+@click.option("--source", "-s", default="src", help="Source directory")
+@click.option("--json", "output_json", is_flag=True, help="Output in JSON format")
def inspect(workflow_file, source, output_json):
"""Inspect a workflow file and show its structure"""
try:
@@ -70,6 +83,7 @@ def inspect(workflow_file, source, output_json):
console.print(f"[red]Error:[/red] {str(e)}")
sys.exit(1)
+
@cli.command()
def status():
"""Show running concore processes"""
@@ -79,8 +93,9 @@ def status():
console.print(f"[red]Error:[/red] {str(e)}")
sys.exit(1)
+
@cli.command()
-@click.confirmation_option(prompt='Stop all running concore processes?')
+@click.confirmation_option(prompt="Stop all running concore processes?")
def stop():
"""Stop all running concore processes"""
try:
@@ -89,10 +104,11 @@ def stop():
console.print(f"[red]Error:[/red] {str(e)}")
sys.exit(1)
+
@cli.command()
-@click.argument('study_dir', type=click.Path(exists=True))
-@click.option('--interval', '-n', default=2.0, help='Refresh interval in seconds')
-@click.option('--once', is_flag=True, help='Print a single snapshot and exit')
+@click.argument("study_dir", type=click.Path(exists=True))
+@click.option("--interval", "-n", default=2.0, help="Refresh interval in seconds")
+@click.option("--once", is_flag=True, help="Print a single snapshot and exit")
def watch(study_dir, interval, once):
"""Watch a running simulation study for live monitoring"""
try:
@@ -101,5 +117,6 @@ def watch(study_dir, interval, once):
console.print(f"[red]Error:[/red] {str(e)}")
sys.exit(1)
-if __name__ == '__main__':
+
+if __name__ == "__main__":
cli()
diff --git a/concore_cli/commands/__init__.py b/concore_cli/commands/__init__.py
index 77820b85..e98d4cd5 100644
--- a/concore_cli/commands/__init__.py
+++ b/concore_cli/commands/__init__.py
@@ -5,4 +5,11 @@
from .stop import stop_all
from .watch import watch_study
-__all__ = ['init_project', 'run_workflow', 'validate_workflow', 'show_status', 'stop_all', 'watch_study']
+__all__ = [
+ "init_project",
+ "run_workflow",
+ "validate_workflow",
+ "show_status",
+ "stop_all",
+ "watch_study",
+]
diff --git a/concore_cli/commands/init.py b/concore_cli/commands/init.py
index 0b6badc3..eb73e916 100644
--- a/concore_cli/commands/init.py
+++ b/concore_cli/commands/init.py
@@ -1,9 +1,7 @@
-import os
-import shutil
from pathlib import Path
from rich.panel import Panel
-SAMPLE_GRAPHML = '''
+SAMPLE_GRAPHML = """
@@ -21,9 +19,9 @@
-'''
+"""
-SAMPLE_PYTHON = '''import concore
+SAMPLE_PYTHON = """import concore
concore.default_maxtime(100)
concore.delay = 0.02
@@ -36,9 +34,9 @@
val = concore.read(1,"data",init_simtime_val)
result = [v * 2 for v in val]
concore.write(1,"result",result,delta=0)
-'''
+"""
-README_TEMPLATE = '''# {project_name}
+README_TEMPLATE = """# {project_name}
A concore workflow project.
@@ -63,38 +61,41 @@
- Add Python/C++/MATLAB scripts to `src/`
- Use `concore validate workflow.graphml` to check your workflow
- Use `concore status` to monitor running processes
-'''
+"""
+
def init_project(name, template, console):
project_path = Path(name)
-
+
if project_path.exists():
raise FileExistsError(f"Directory '{name}' already exists")
-
+
console.print(f"[cyan]Creating project:[/cyan] {name}")
-
+
project_path.mkdir()
- (project_path / 'src').mkdir()
-
- workflow_file = project_path / 'workflow.graphml'
- with open(workflow_file, 'w') as f:
+ (project_path / "src").mkdir()
+
+ workflow_file = project_path / "workflow.graphml"
+ with open(workflow_file, "w") as f:
f.write(SAMPLE_GRAPHML)
-
- sample_script = project_path / 'src' / 'script.py'
- with open(sample_script, 'w') as f:
+
+ sample_script = project_path / "src" / "script.py"
+ with open(sample_script, "w") as f:
f.write(SAMPLE_PYTHON)
-
- readme_file = project_path / 'README.md'
- with open(readme_file, 'w') as f:
+
+ readme_file = project_path / "README.md"
+ with open(readme_file, "w") as f:
f.write(README_TEMPLATE.format(project_name=name))
-
+
console.print()
- console.print(Panel.fit(
- f"[green]✓[/green] Project created successfully!\n\n"
- f"Next steps:\n"
- f" cd {name}\n"
- f" concore validate workflow.graphml\n"
- f" concore run workflow.graphml",
- title="Success",
- border_style="green"
- ))
+ console.print(
+ Panel.fit(
+ f"[green]✓[/green] Project created successfully!\n\n"
+ f"Next steps:\n"
+ f" cd {name}\n"
+ f" concore validate workflow.graphml\n"
+ f" concore run workflow.graphml",
+ title="Success",
+ border_style="green",
+ )
+ )
diff --git a/concore_cli/commands/inspect.py b/concore_cli/commands/inspect.py
index c84fbed4..0dce24a8 100644
--- a/concore_cli/commands/inspect.py
+++ b/concore_cli/commands/inspect.py
@@ -2,262 +2,270 @@
from bs4 import BeautifulSoup
from rich.table import Table
from rich.tree import Tree
-from rich.panel import Panel
from collections import defaultdict
import re
+
def inspect_workflow(workflow_file, source_dir, output_json, console):
workflow_path = Path(workflow_file)
-
+
if output_json:
return _inspect_json(workflow_path, source_dir)
-
+
_inspect_rich(workflow_path, source_dir, console)
+
def _inspect_rich(workflow_path, source_dir, console):
console.print()
console.print(f"[bold cyan]Workflow:[/bold cyan] {workflow_path.name}")
console.print()
-
+
try:
- with open(workflow_path, 'r') as f:
+ with open(workflow_path, "r") as f:
content = f.read()
-
- soup = BeautifulSoup(content, 'xml')
-
- if not soup.find('graphml'):
+
+ soup = BeautifulSoup(content, "xml")
+
+ if not soup.find("graphml"):
console.print("[red]Not a valid GraphML file[/red]")
return
-
- nodes = soup.find_all('node')
- edges = soup.find_all('edge')
-
+
+ nodes = soup.find_all("node")
+ edges = soup.find_all("edge")
+
tree = Tree("📊 [bold]Workflow Overview[/bold]")
-
+
lang_counts = defaultdict(int)
node_files = []
missing_files = []
-
+
for node in nodes:
- label_tag = node.find('y:NodeLabel')
+ label_tag = node.find("y:NodeLabel")
if label_tag and label_tag.text:
label = label_tag.text.strip()
- if ':' in label:
- _, filename = label.split(':', 1)
+ if ":" in label:
+ _, filename = label.split(":", 1)
node_files.append(filename)
-
+
ext = Path(filename).suffix
- if ext == '.py':
- lang_counts['Python'] += 1
- elif ext == '.m':
- lang_counts['MATLAB'] += 1
- elif ext == '.java':
- lang_counts['Java'] += 1
- elif ext == '.cpp' or ext == '.hpp':
- lang_counts['C++'] += 1
- elif ext == '.v':
- lang_counts['Verilog'] += 1
+ if ext == ".py":
+ lang_counts["Python"] += 1
+ elif ext == ".m":
+ lang_counts["MATLAB"] += 1
+ elif ext == ".java":
+ lang_counts["Java"] += 1
+ elif ext == ".cpp" or ext == ".hpp":
+ lang_counts["C++"] += 1
+ elif ext == ".v":
+ lang_counts["Verilog"] += 1
else:
- lang_counts['Other'] += 1
-
+ lang_counts["Other"] += 1
+
src_dir = workflow_path.parent / source_dir
if not (src_dir / filename).exists():
missing_files.append(filename)
-
+
nodes_branch = tree.add(f"Nodes: [bold]{len(nodes)}[/bold]")
if lang_counts:
for lang, count in sorted(lang_counts.items(), key=lambda x: -x[1]):
nodes_branch.add(f"{lang}: {count}")
-
+
edges_branch = tree.add(f"Edges: [bold]{len(edges)}[/bold]")
-
+
edge_label_regex = re.compile(r"0x([a-fA-F0-9]+)_(\S+)")
zmq_count = 0
file_count = 0
-
+
for edge in edges:
- label_tag = edge.find('y:EdgeLabel')
+ label_tag = edge.find("y:EdgeLabel")
label_text = label_tag.text.strip() if label_tag and label_tag.text else ""
if label_text and edge_label_regex.match(label_text):
zmq_count += 1
else:
file_count += 1
-
+
if zmq_count > 0:
edges_branch.add(f"ZMQ: {zmq_count}")
if file_count > 0:
edges_branch.add(f"File-based: {file_count}")
-
- comm_type = "ZMQ (0mq)" if zmq_count > 0 else "File-based" if file_count > 0 else "None"
+
+ comm_type = (
+ "ZMQ (0mq)" if zmq_count > 0 else "File-based" if file_count > 0 else "None"
+ )
tree.add(f"Communication: [bold]{comm_type}[/bold]")
-
+
if missing_files:
- missing_branch = tree.add(f"[yellow]Missing files: {len(missing_files)}[/yellow]")
+ missing_branch = tree.add(
+ f"[yellow]Missing files: {len(missing_files)}[/yellow]"
+ )
for f in missing_files[:5]:
missing_branch.add(f"[yellow]{f}[/yellow]")
if len(missing_files) > 5:
missing_branch.add(f"[dim]...and {len(missing_files) - 5} more[/dim]")
-
+
console.print(tree)
console.print()
-
+
if nodes:
- table = Table(title="Node Details", show_header=True, header_style="bold magenta")
+ table = Table(
+ title="Node Details", show_header=True, header_style="bold magenta"
+ )
table.add_column("ID", style="cyan", width=12)
table.add_column("File", style="white")
table.add_column("Language", style="green")
table.add_column("Status", style="yellow")
-
+
for node in nodes[:10]:
- label_tag = node.find('y:NodeLabel')
+ label_tag = node.find("y:NodeLabel")
if label_tag and label_tag.text:
label = label_tag.text.strip()
- if ':' in label:
- node_id, filename = label.split(':', 1)
-
+ if ":" in label:
+ node_id, filename = label.split(":", 1)
+
ext = Path(filename).suffix
lang_map = {
- '.py': 'Python',
- '.m': 'MATLAB',
- '.java': 'Java',
- '.cpp': 'C++',
- '.hpp': 'C++',
- '.v': 'Verilog'
+ ".py": "Python",
+ ".m": "MATLAB",
+ ".java": "Java",
+ ".cpp": "C++",
+ ".hpp": "C++",
+ ".v": "Verilog",
}
- lang = lang_map.get(ext, 'Other')
-
+ lang = lang_map.get(ext, "Other")
+
src_dir = workflow_path.parent / source_dir
status = "✓" if (src_dir / filename).exists() else "✗"
-
+
table.add_row(node_id, filename, lang, status)
-
+
if len(nodes) > 10:
table.caption = f"Showing 10 of {len(nodes)} nodes"
-
+
console.print(table)
console.print()
-
+
if edges:
- edge_table = Table(title="Edge Connections", show_header=True, header_style="bold magenta")
+ edge_table = Table(
+ title="Edge Connections", show_header=True, header_style="bold magenta"
+ )
edge_table.add_column("From", style="cyan", width=12)
edge_table.add_column("To", style="cyan", width=12)
edge_table.add_column("Type", style="green")
-
+
for edge in edges[:10]:
- source = edge.get('source', 'unknown')
- target = edge.get('target', 'unknown')
-
- label_tag = edge.find('y:EdgeLabel')
+ source = edge.get("source", "unknown")
+ target = edge.get("target", "unknown")
+
+ label_tag = edge.find("y:EdgeLabel")
edge_type = "File"
if label_tag and label_tag.text:
if edge_label_regex.match(label_tag.text.strip()):
edge_type = "ZMQ"
-
+
edge_table.add_row(source, target, edge_type)
-
+
if len(edges) > 10:
edge_table.caption = f"Showing 10 of {len(edges)} edges"
-
+
console.print(edge_table)
console.print()
-
+
except FileNotFoundError:
console.print(f"[red]File not found:[/red] {workflow_path}")
except Exception as e:
console.print(f"[red]Inspection failed:[/red] {str(e)}")
+
def _inspect_json(workflow_path, source_dir):
import json
-
+
try:
- with open(workflow_path, 'r') as f:
+ with open(workflow_path, "r") as f:
content = f.read()
-
- soup = BeautifulSoup(content, 'xml')
-
- if not soup.find('graphml'):
- print(json.dumps({'error': 'Not a valid GraphML file'}, indent=2))
+
+ soup = BeautifulSoup(content, "xml")
+
+ if not soup.find("graphml"):
+ print(json.dumps({"error": "Not a valid GraphML file"}, indent=2))
return
-
- nodes = soup.find_all('node')
- edges = soup.find_all('edge')
-
+
+ nodes = soup.find_all("node")
+ edges = soup.find_all("edge")
+
lang_counts = defaultdict(int)
node_list = []
edge_list = []
missing_files = []
-
+
for node in nodes:
- label_tag = node.find('y:NodeLabel')
+ label_tag = node.find("y:NodeLabel")
if label_tag and label_tag.text:
label = label_tag.text.strip()
- if ':' in label:
- node_id, filename = label.split(':', 1)
-
+ if ":" in label:
+ node_id, filename = label.split(":", 1)
+
ext = Path(filename).suffix
lang_map = {
- '.py': 'python',
- '.m': 'matlab',
- '.java': 'java',
- '.cpp': 'cpp',
- '.hpp': 'cpp',
- '.v': 'verilog'
+ ".py": "python",
+ ".m": "matlab",
+ ".java": "java",
+ ".cpp": "cpp",
+ ".hpp": "cpp",
+ ".v": "verilog",
}
- lang = lang_map.get(ext, 'other')
+ lang = lang_map.get(ext, "other")
lang_counts[lang] += 1
-
+
src_dir = workflow_path.parent / source_dir
exists = (src_dir / filename).exists()
if not exists:
missing_files.append(filename)
-
- node_list.append({
- 'id': node_id,
- 'file': filename,
- 'language': lang,
- 'exists': exists
- })
-
+
+ node_list.append(
+ {
+ "id": node_id,
+ "file": filename,
+ "language": lang,
+ "exists": exists,
+ }
+ )
+
edge_label_regex = re.compile(r"0x([a-fA-F0-9]+)_(\S+)")
zmq_count = 0
file_count = 0
-
+
for edge in edges:
- source = edge.get('source')
- target = edge.get('target')
-
- label_tag = edge.find('y:EdgeLabel')
+ source = edge.get("source")
+ target = edge.get("target")
+
+ label_tag = edge.find("y:EdgeLabel")
label_text = label_tag.text.strip() if label_tag and label_tag.text else ""
- edge_type = 'file'
+ edge_type = "file"
if label_text and edge_label_regex.match(label_text):
- edge_type = 'zmq'
+ edge_type = "zmq"
zmq_count += 1
else:
file_count += 1
-
- edge_list.append({
- 'source': source,
- 'target': target,
- 'type': edge_type
- })
-
+
+ edge_list.append({"source": source, "target": target, "type": edge_type})
+
result = {
- 'workflow': str(workflow_path.name),
- 'nodes': {
- 'total': len(nodes),
- 'by_language': dict(lang_counts),
- 'list': node_list
+ "workflow": str(workflow_path.name),
+ "nodes": {
+ "total": len(nodes),
+ "by_language": dict(lang_counts),
+ "list": node_list,
},
- 'edges': {
- 'total': len(edges),
- 'zmq': zmq_count,
- 'file': file_count,
- 'list': edge_list
+ "edges": {
+ "total": len(edges),
+ "zmq": zmq_count,
+ "file": file_count,
+ "list": edge_list,
},
- 'missing_files': missing_files
+ "missing_files": missing_files,
}
-
+
print(json.dumps(result, indent=2))
-
+
except Exception as e:
- print(json.dumps({'error': str(e)}, indent=2))
+ print(json.dumps({"error": str(e)}, indent=2))
diff --git a/concore_cli/commands/run.py b/concore_cli/commands/run.py
index 91a876b7..a80dbe05 100644
--- a/concore_cli/commands/run.py
+++ b/concore_cli/commands/run.py
@@ -1,10 +1,10 @@
-import os
import sys
import subprocess
from pathlib import Path
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
+
def _find_mkconcore_path():
for parent in Path(__file__).resolve().parents:
candidate = parent / "mkconcore.py"
@@ -12,73 +12,89 @@ def _find_mkconcore_path():
return candidate
return None
+
def run_workflow(workflow_file, source, output, exec_type, auto_build, console):
workflow_path = Path(workflow_file).resolve()
source_path = Path(source).resolve()
output_path = Path(output).resolve()
-
+
if not source_path.exists():
raise FileNotFoundError(f"Source directory '{source}' not found")
-
+
if output_path.exists():
- console.print(f"[yellow]Warning:[/yellow] Output directory '{output}' already exists")
+ console.print(
+ f"[yellow]Warning:[/yellow] Output directory '{output}' already exists"
+ )
console.print("Remove it first or choose a different output directory")
return
-
+
console.print(f"[cyan]Workflow:[/cyan] {workflow_path.name}")
console.print(f"[cyan]Source:[/cyan] {source_path}")
console.print(f"[cyan]Output:[/cyan] {output_path}")
console.print(f"[cyan]Type:[/cyan] {exec_type}")
console.print()
-
+
mkconcore_path = _find_mkconcore_path()
if mkconcore_path is None:
- raise FileNotFoundError("mkconcore.py not found. Please install concore from source.")
+ raise FileNotFoundError(
+ "mkconcore.py not found. Please install concore from source."
+ )
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
- console=console
+ console=console,
) as progress:
task = progress.add_task("Generating workflow...", total=None)
-
+
try:
result = subprocess.run(
- [sys.executable, str(mkconcore_path), str(workflow_path), str(source_path), str(output_path), exec_type],
+ [
+ sys.executable,
+ str(mkconcore_path),
+ str(workflow_path),
+ str(source_path),
+ str(output_path),
+ exec_type,
+ ],
cwd=mkconcore_path.parent,
capture_output=True,
text=True,
- check=True
+ check=True,
)
-
+
progress.update(task, completed=True)
-
+
if result.stdout:
console.print(result.stdout)
-
- console.print(f"[green]✓[/green] Workflow generated in [cyan]{output_path}[/cyan]")
-
+
+ console.print(
+ f"[green]✓[/green] Workflow generated in [cyan]{output_path}[/cyan]"
+ )
+
except subprocess.CalledProcessError as e:
progress.stop()
- console.print(f"[red]Generation failed:[/red]")
+ console.print("[red]Generation failed:[/red]")
if e.stdout:
console.print(e.stdout)
if e.stderr:
console.print(e.stderr)
raise
-
+
if auto_build:
console.print()
- build_script = output_path / ('build.bat' if exec_type == 'windows' else 'build')
-
+ build_script = output_path / (
+ "build.bat" if exec_type == "windows" else "build"
+ )
+
if build_script.exists():
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
- console=console
+ console=console,
) as progress:
task = progress.add_task("Building workflow...", total=None)
-
+
try:
result = subprocess.run(
[str(build_script)],
@@ -86,23 +102,25 @@ def run_workflow(workflow_file, source, output, exec_type, auto_build, console):
capture_output=True,
text=True,
shell=True,
- check=True
+ check=True,
)
progress.update(task, completed=True)
- console.print(f"[green]✓[/green] Build completed")
+ console.print("[green]✓[/green] Build completed")
except subprocess.CalledProcessError as e:
progress.stop()
- console.print(f"[yellow]Build failed[/yellow]")
+ console.print("[yellow]Build failed[/yellow]")
if e.stderr:
console.print(e.stderr)
-
+
console.print()
- console.print(Panel.fit(
- f"[green]✓[/green] Workflow ready!\n\n"
- f"To run your workflow:\n"
- f" cd {output_path}\n"
- f" {'build.bat' if exec_type == 'windows' else './build'}\n"
- f" {'run.bat' if exec_type == 'windows' else './run'}",
- title="Next Steps",
- border_style="green"
- ))
+ console.print(
+ Panel.fit(
+ f"[green]✓[/green] Workflow ready!\n\n"
+ f"To run your workflow:\n"
+ f" cd {output_path}\n"
+ f" {'build.bat' if exec_type == 'windows' else './build'}\n"
+ f" {'run.bat' if exec_type == 'windows' else './run'}",
+ title="Next Steps",
+ border_style="green",
+ )
+ )
diff --git a/concore_cli/commands/status.py b/concore_cli/commands/status.py
index 6eaf6c8b..7ef1fca4 100644
--- a/concore_cli/commands/status.py
+++ b/concore_cli/commands/status.py
@@ -1,92 +1,106 @@
import psutil
import os
-from pathlib import Path
from rich.table import Table
from rich.panel import Panel
from datetime import datetime
+
def show_status(console):
console.print("[cyan]Scanning for concore processes...[/cyan]\n")
-
+
concore_processes = []
-
+
try:
current_pid = os.getpid()
-
- for proc in psutil.process_iter(['pid', 'name', 'cmdline', 'create_time', 'memory_info', 'cpu_percent']):
+
+ for proc in psutil.process_iter(
+ ["pid", "name", "cmdline", "create_time", "memory_info", "cpu_percent"]
+ ):
try:
- cmdline = proc.info.get('cmdline') or []
- name = proc.info.get('name', '').lower()
-
- if proc.info['pid'] == current_pid:
+ cmdline = proc.info.get("cmdline") or []
+ name = proc.info.get("name", "").lower()
+
+ if proc.info["pid"] == current_pid:
continue
-
- cmdline_str = ' '.join(cmdline) if cmdline else ''
-
+
+ cmdline_str = " ".join(cmdline) if cmdline else ""
+
is_concore = (
- 'concore' in cmdline_str.lower() or
- 'concore.py' in cmdline_str.lower() or
- any('concorekill.bat' in str(item) for item in cmdline) or
- (name in ['python.exe', 'python', 'python3'] and 'concore' in cmdline_str)
+ "concore" in cmdline_str.lower()
+ or "concore.py" in cmdline_str.lower()
+ or any("concorekill.bat" in str(item) for item in cmdline)
+ or (
+ name in ["python.exe", "python", "python3"]
+ and "concore" in cmdline_str
+ )
)
-
+
if is_concore:
try:
- create_time = datetime.fromtimestamp(proc.info['create_time'])
+ create_time = datetime.fromtimestamp(proc.info["create_time"])
uptime = datetime.now() - create_time
hours, remainder = divmod(int(uptime.total_seconds()), 3600)
minutes, seconds = divmod(remainder, 60)
uptime_str = f"{hours}h {minutes}m {seconds}s"
- except:
+ except (OSError, OverflowError, ValueError):
# Failed to calculate uptime
uptime_str = "unknown"
-
+
try:
- mem_mb = proc.info['memory_info'].rss / 1024 / 1024
+ mem_mb = proc.info["memory_info"].rss / 1024 / 1024
mem_str = f"{mem_mb:.1f} MB"
- except:
+ except (psutil.NoSuchProcess, psutil.AccessDenied, AttributeError):
# Failed to get memory info
mem_str = "unknown"
-
- command = ' '.join(cmdline[:3]) if len(cmdline) >= 3 else cmdline_str[:50]
-
- concore_processes.append({
- 'pid': proc.info['pid'],
- 'name': proc.info.get('name', 'unknown'),
- 'command': command,
- 'uptime': uptime_str,
- 'memory': mem_str
- })
+
+ command = (
+ " ".join(cmdline[:3]) if len(cmdline) >= 3 else cmdline_str[:50]
+ )
+
+ concore_processes.append(
+ {
+ "pid": proc.info["pid"],
+ "name": proc.info.get("name", "unknown"),
+ "command": command,
+ "uptime": uptime_str,
+ "memory": mem_str,
+ }
+ )
except (psutil.NoSuchProcess, psutil.AccessDenied):
# Process may have exited or be inaccessible; safe to ignore
continue
-
+
except Exception as e:
console.print(f"[red]Error scanning processes:[/red] {str(e)}")
return
-
+
if not concore_processes:
- console.print(Panel.fit(
- "[yellow]No concore processes currently running[/yellow]",
- border_style="yellow"
- ))
+ console.print(
+ Panel.fit(
+ "[yellow]No concore processes currently running[/yellow]",
+ border_style="yellow",
+ )
+ )
else:
- table = Table(title=f"Concore Processes ({len(concore_processes)} running)", show_header=True)
+ table = Table(
+ title=f"Concore Processes ({len(concore_processes)} running)",
+ show_header=True,
+ )
table.add_column("PID", style="cyan", justify="right")
table.add_column("Name", style="green")
table.add_column("Uptime", style="yellow")
table.add_column("Memory", style="magenta")
table.add_column("Command", style="white")
-
+
for proc in concore_processes:
table.add_row(
- str(proc['pid']),
- proc['name'],
- proc['uptime'],
- proc['memory'],
- proc['command']
+ str(proc["pid"]),
+ proc["name"],
+ proc["uptime"],
+ proc["memory"],
+ proc["command"],
)
-
+
console.print(table)
console.print()
- console.print(f"[dim]Use 'concore stop' to terminate all processes[/dim]")
+ console.print("[dim]Use 'concore stop' to terminate all processes[/dim]")
diff --git a/concore_cli/commands/stop.py b/concore_cli/commands/stop.py
index 5b0a9a92..27b5796e 100644
--- a/concore_cli/commands/stop.py
+++ b/concore_cli/commands/stop.py
@@ -2,94 +2,106 @@
import os
import subprocess
import sys
-from pathlib import Path
from rich.panel import Panel
+
def stop_all(console):
console.print("[cyan]Finding concore processes...[/cyan]\n")
-
+
processes_to_kill = []
current_pid = os.getpid()
-
+
try:
- for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
+ for proc in psutil.process_iter(["pid", "name", "cmdline"]):
try:
- if proc.info['pid'] == current_pid:
+ if proc.info["pid"] == current_pid:
continue
-
- cmdline = proc.info.get('cmdline') or []
- name = proc.info.get('name', '').lower()
- cmdline_str = ' '.join(cmdline) if cmdline else ''
-
+
+ cmdline = proc.info.get("cmdline") or []
+ name = proc.info.get("name", "").lower()
+ cmdline_str = " ".join(cmdline) if cmdline else ""
+
is_concore = (
- 'concore' in cmdline_str.lower() or
- 'concore.py' in cmdline_str.lower() or
- any('concorekill.bat' in str(item) for item in cmdline) or
- (name in ['python.exe', 'python', 'python3'] and 'concore' in cmdline_str)
+ "concore" in cmdline_str.lower()
+ or "concore.py" in cmdline_str.lower()
+ or any("concorekill.bat" in str(item) for item in cmdline)
+ or (
+ name in ["python.exe", "python", "python3"]
+ and "concore" in cmdline_str
+ )
)
-
+
if is_concore:
processes_to_kill.append(proc)
except (psutil.NoSuchProcess, psutil.AccessDenied):
# Process already exited or access denied; continue
continue
-
+
except Exception as e:
console.print(f"[red]Error:[/red] {str(e)}")
return
-
+
if not processes_to_kill:
- console.print(Panel.fit(
- "[yellow]No concore processes found[/yellow]",
- border_style="yellow"
- ))
+ console.print(
+ Panel.fit(
+ "[yellow]No concore processes found[/yellow]", border_style="yellow"
+ )
+ )
return
-
- console.print(f"[yellow]Stopping {len(processes_to_kill)} process(es)...[/yellow]\n")
-
+
+ console.print(
+ f"[yellow]Stopping {len(processes_to_kill)} process(es)...[/yellow]\n"
+ )
+
killed_count = 0
failed_count = 0
-
+
for proc in processes_to_kill:
try:
- pid = proc.info['pid']
- name = proc.info.get('name', 'unknown')
-
- if sys.platform == 'win32':
- result = subprocess.run(['taskkill', '/F', '/PID', str(pid)],
- capture_output=True,
- check=False)
+ pid = proc.info["pid"]
+ name = proc.info.get("name", "unknown")
+
+ if sys.platform == "win32":
+ result = subprocess.run(
+ ["taskkill", "/F", "/PID", str(pid)],
+ capture_output=True,
+ check=False,
+ )
if result.returncode != 0:
raise RuntimeError(f"taskkill failed with code {result.returncode}")
else:
proc.terminate()
proc.wait(timeout=3)
-
+
console.print(f" [green]✓[/green] Stopped {name} (PID: {pid})")
killed_count += 1
-
+
except psutil.TimeoutExpired:
try:
proc.kill()
console.print(f" [yellow]⚠[/yellow] Force killed {name} (PID: {pid})")
killed_count += 1
- except:
+ except (psutil.NoSuchProcess, psutil.AccessDenied):
console.print(f" [red]✗[/red] Failed to stop {name} (PID: {pid})")
failed_count += 1
except Exception as e:
console.print(f" [red]✗[/red] Failed to stop PID {pid}: {str(e)}")
failed_count += 1
-
+
console.print()
-
+
if failed_count == 0:
- console.print(Panel.fit(
- f"[green]✓[/green] Successfully stopped all {killed_count} process(es)",
- border_style="green"
- ))
+ console.print(
+ Panel.fit(
+ f"[green]✓[/green] Successfully stopped all {killed_count} process(es)",
+ border_style="green",
+ )
+ )
else:
- console.print(Panel.fit(
- f"[yellow]Stopped {killed_count} process(es)\n"
- f"Failed to stop {failed_count} process(es)[/yellow]",
- border_style="yellow"
- ))
+ console.print(
+ Panel.fit(
+ f"[yellow]Stopped {killed_count} process(es)\n"
+ f"Failed to stop {failed_count} process(es)[/yellow]",
+ border_style="yellow",
+ )
+ )
diff --git a/concore_cli/commands/validate.py b/concore_cli/commands/validate.py
index d9a39dd4..e987c8ad 100644
--- a/concore_cli/commands/validate.py
+++ b/concore_cli/commands/validate.py
@@ -1,114 +1,123 @@
from pathlib import Path
from bs4 import BeautifulSoup
from rich.panel import Panel
-from rich.table import Table
import re
import xml.etree.ElementTree as ET
+
def validate_workflow(workflow_file, source_dir, console):
workflow_path = Path(workflow_file)
- source_root = (workflow_path.parent / source_dir)
-
+ source_root = workflow_path.parent / source_dir
+
console.print(f"[cyan]Validating:[/cyan] {workflow_path.name}")
console.print()
-
+
errors = []
warnings = []
info = []
-
+
def finalize():
show_results(console, errors, warnings, info)
return len(errors) == 0
-
+
try:
- with open(workflow_path, 'r') as f:
+ with open(workflow_path, "r") as f:
content = f.read()
-
+
if not content.strip():
errors.append("File is empty")
return finalize()
-
+
# strict XML syntax check
try:
ET.fromstring(content)
except ET.ParseError as e:
errors.append(f"Invalid XML: {str(e)}")
return finalize()
-
+
try:
- soup = BeautifulSoup(content, 'xml')
+ soup = BeautifulSoup(content, "xml")
except Exception as e:
errors.append(f"Invalid XML: {str(e)}")
return finalize()
-
- root = soup.find('graphml')
+
+ root = soup.find("graphml")
if not root:
errors.append("Not a valid GraphML file - missing root element")
return finalize()
-
+
# check the graph attributes
- graph = soup.find('graph')
+ graph = soup.find("graph")
if not graph:
- errors.append('Missing <graph> element')
+ errors.append("Missing <graph> element")
else:
- edgedefault = graph.get('edgedefault')
- if not edgedefault:
- errors.append("Graph missing required 'edgedefault' attribute")
- elif edgedefault not in ['directed', 'undirected']:
- errors.append(f"Invalid edgedefault value '{edgedefault}' (must be 'directed' or 'undirected')")
-
- nodes = soup.find_all('node')
- edges = soup.find_all('edge')
-
+ edgedefault = graph.get("edgedefault")
+ if not edgedefault:
+ errors.append("Graph missing required 'edgedefault' attribute")
+ elif edgedefault not in ["directed", "undirected"]:
+ errors.append(
+ f"Invalid edgedefault value '{edgedefault}' (must be 'directed' or 'undirected')"
+ )
+
+ nodes = soup.find_all("node")
+ edges = soup.find_all("edge")
+
if len(nodes) == 0:
warnings.append("No nodes found in workflow")
else:
info.append(f"Found {len(nodes)} node(s)")
-
+
if len(edges) == 0:
warnings.append("No edges found in workflow")
else:
info.append(f"Found {len(edges)} edge(s)")
-
+
if not source_root.exists():
warnings.append(f"Source directory not found: {source_root}")
-
+
node_labels = []
for node in nodes:
- #check the node id
- node_id = node.get('id')
+ # check the node id
+ node_id = node.get("id")
if not node_id:
errors.append("Node missing required 'id' attribute")
- #skip further checks for this node to avoid noise
+ # skip further checks for this node to avoid noise
continue
try:
- #robust find: try with namespace prefix first, then without
- label_tag = node.find('y:NodeLabel')
+ # robust find: try with namespace prefix first, then without
+ label_tag = node.find("y:NodeLabel")
if not label_tag:
- label_tag = node.find('NodeLabel')
-
+ label_tag = node.find("NodeLabel")
+
if label_tag and label_tag.text:
label = label_tag.text.strip()
node_labels.append(label)
-
+
# reject shell metacharacters to prevent command injection (#251)
if re.search(r'[;&|`$\'"()\\]', label):
- errors.append(f"Node '{label}' contains unsafe shell characters")
+ errors.append(
+ f"Node '{label}' contains unsafe shell characters"
+ )
continue
-
- if ':' not in label:
+
+ if ":" not in label:
warnings.append(f"Node '{label}' missing format 'ID:filename'")
else:
- parts = label.split(':')
+ parts = label.split(":")
if len(parts) != 2:
warnings.append(f"Node '{label}' has invalid format")
else:
nodeId_part, filename = parts
if not filename:
errors.append(f"Node '{label}' has no filename")
- elif not any(filename.endswith(ext) for ext in ['.py', '.cpp', '.m', '.v', '.java']):
- warnings.append(f"Node '{label}' has unusual file extension")
+ elif not any(
+ filename.endswith(ext)
+ for ext in [".py", ".cpp", ".m", ".v", ".java"]
+ ):
+ warnings.append(
+ f"Node '{label}' has unusual file extension"
+ )
elif source_root.exists():
file_path = source_root / filename
if not file_path.exists():
@@ -125,48 +134,48 @@ def finalize():
errors.append(f"Duplicate node label: '{label}'")
seen.add(label)
- node_ids = {node.get('id') for node in nodes if node.get('id')}
+ node_ids = {node.get("id") for node in nodes if node.get("id")}
for edge in edges:
- source = edge.get('source')
- target = edge.get('target')
-
+ source = edge.get("source")
+ target = edge.get("target")
+
if not source or not target:
errors.append("Edge missing source or target")
continue
-
+
if source not in node_ids:
errors.append(f"Edge references non-existent source node: {source}")
if target not in node_ids:
errors.append(f"Edge references non-existent target node: {target}")
-
+
edge_label_regex = re.compile(r"0x([a-fA-F0-9]+)_(\S+)")
zmq_edges = 0
file_edges = 0
-
+
for edge in edges:
try:
- label_tag = edge.find('y:EdgeLabel')
+ label_tag = edge.find("y:EdgeLabel")
if not label_tag:
- label_tag = edge.find('EdgeLabel')
-
+ label_tag = edge.find("EdgeLabel")
+
if label_tag and label_tag.text:
if edge_label_regex.match(label_tag.text.strip()):
zmq_edges += 1
else:
file_edges += 1
- except:
+ except Exception:
pass
-
+
if zmq_edges > 0:
info.append(f"ZMQ-based edges: {zmq_edges}")
if file_edges > 0:
info.append(f"File-based edges: {file_edges}")
-
+
_check_cycles(soup, errors, warnings)
_check_zmq_ports(soup, errors, warnings)
-
+
return finalize()
-
+
except FileNotFoundError:
console.print(f"[red]Error:[/red] File not found: {workflow_path}")
return False
@@ -174,35 +183,36 @@ def finalize():
console.print(f"[red]Validation failed:[/red] {str(e)}")
return False
+
def _check_cycles(soup, errors, warnings):
- nodes = soup.find_all('node')
- edges = soup.find_all('edge')
-
- node_ids = [node.get('id') for node in nodes if node.get('id')]
+ nodes = soup.find_all("node")
+ edges = soup.find_all("edge")
+
+ node_ids = [node.get("id") for node in nodes if node.get("id")]
if not node_ids:
return
-
+
graph = {nid: [] for nid in node_ids}
for edge in edges:
- source = edge.get('source')
- target = edge.get('target')
+ source = edge.get("source")
+ target = edge.get("target")
if source and target and source in graph:
graph[source].append(target)
-
+
def has_cycle_from(start, visited, rec_stack):
visited.add(start)
rec_stack.add(start)
-
+
for neighbor in graph.get(start, []):
if neighbor not in visited:
if has_cycle_from(neighbor, visited, rec_stack):
return True
elif neighbor in rec_stack:
return True
-
+
rec_stack.remove(start)
return False
-
+
visited = set()
for node_id in node_ids:
if node_id not in visited:
@@ -210,41 +220,51 @@ def has_cycle_from(start, visited, rec_stack):
warnings.append("Workflow contains cycles (expected for control loops)")
return
+
def _check_zmq_ports(soup, errors, warnings):
- edges = soup.find_all('edge')
+ edges = soup.find_all("edge")
port_pattern = re.compile(r"0x([a-fA-F0-9]+)_(\S+)")
-
+
ports_used = {}
-
+
for edge in edges:
- label_tag = edge.find('y:EdgeLabel') or edge.find('EdgeLabel')
+ label_tag = edge.find("y:EdgeLabel") or edge.find("EdgeLabel")
if not label_tag or not label_tag.text:
continue
-
+
match = port_pattern.match(label_tag.text.strip())
if not match:
continue
-
+
port_hex = match.group(1)
port_name = match.group(2)
port_num = int(port_hex, 16)
-
+
if port_num < 1:
- errors.append(f"Invalid port number: {port_num} (0x{port_hex}) must be at least 1")
+ errors.append(
+ f"Invalid port number: {port_num} (0x{port_hex}) must be at least 1"
+ )
continue
elif port_num > 65535:
- errors.append(f"Invalid port number: {port_num} (0x{port_hex}) exceeds maximum (65535)")
+ errors.append(
+ f"Invalid port number: {port_num} (0x{port_hex}) exceeds maximum (65535)"
+ )
continue
-
+
if port_num in ports_used:
existing_name = ports_used[port_num]
if existing_name != port_name:
- errors.append(f"Port conflict: 0x{port_hex} used for both '{existing_name}' and '{port_name}'")
+ errors.append(
+ f"Port conflict: 0x{port_hex} used for both '{existing_name}' and '{port_name}'"
+ )
else:
ports_used[port_num] = port_name
-
+
if port_num < 1024:
- warnings.append(f"Port {port_num} (0x{port_hex}) is in reserved range (< 1024)")
+ warnings.append(
+ f"Port {port_num} (0x{port_hex}) is in reserved range (< 1024)"
+ )
+
def show_results(console, errors, warnings, info):
if errors:
@@ -253,27 +273,31 @@ def show_results(console, errors, warnings, info):
console.print(f" [red]✗[/red] {error}")
else:
console.print("[green]✓ Validation passed[/green]\n")
-
+
if warnings:
console.print()
for warning in warnings:
console.print(f" [yellow]⚠[/yellow] {warning}")
-
+
if info:
console.print()
for item in info:
console.print(f" [blue]ℹ[/blue] {item}")
-
+
console.print()
-
+
if not errors:
- console.print(Panel.fit(
- "[green]✓[/green] Workflow is valid and ready to run",
- border_style="green"
- ))
+ console.print(
+ Panel.fit(
+ "[green]✓[/green] Workflow is valid and ready to run",
+ border_style="green",
+ )
+ )
else:
- console.print(Panel.fit(
- f"[red]Found {len(errors)} error(s)[/red]\n"
- "Fix the errors above before running the workflow",
- border_style="red"
- ))
+ console.print(
+ Panel.fit(
+ f"[red]Found {len(errors)} error(s)[/red]\n"
+ "Fix the errors above before running the workflow",
+ border_style="red",
+ )
+ )
diff --git a/concore_cli/commands/watch.py b/concore_cli/commands/watch.py
index 44f39e74..e82efbab 100644
--- a/concore_cli/commands/watch.py
+++ b/concore_cli/commands/watch.py
@@ -20,10 +20,14 @@ def watch_study(study_dir, interval, once, console):
edges = _find_edges(study_path, nodes)
if not nodes and not edges:
- console.print(Panel(
- "[yellow]No nodes or edge directories found.[/yellow]\n"
- "[dim]Make sure you point to a built study directory (run makestudy/build first).[/dim]",
- title="concore watch", border_style="yellow"))
+ console.print(
+ Panel(
+ "[yellow]No nodes or edge directories found.[/yellow]\n"
+ "[dim]Make sure you point to a built study directory (run makestudy/build first).[/dim]",
+ title="concore watch",
+ border_style="yellow",
+ )
+ )
return
if once:
@@ -63,15 +67,18 @@ def _build_display(study_path, nodes, edges):
parts.append(_node_table(nodes))
if not edges and not nodes:
- parts.append(Panel("[yellow]No data yet[/yellow]",
- border_style="yellow"))
+ parts.append(Panel("[yellow]No data yet[/yellow]", border_style="yellow"))
return Group(*parts)
def _edge_table(edges):
- table = Table(title="Edges (port data)", show_header=True,
- title_style="bold cyan", expand=True)
+ table = Table(
+ title="Edges (port data)",
+ show_header=True,
+ title_style="bold cyan",
+ expand=True,
+ )
table.add_column("Edge", style="green", min_width=10)
table.add_column("Port File", style="cyan")
table.add_column("Simtime", style="yellow", justify="right")
@@ -96,18 +103,23 @@ def _edge_table(edges):
def _node_table(nodes):
- table = Table(title="Nodes", show_header=True,
- title_style="bold cyan", expand=True)
+ table = Table(title="Nodes", show_header=True, title_style="bold cyan", expand=True)
table.add_column("Node", style="green", min_width=10)
table.add_column("Ports (in)", style="cyan")
table.add_column("Ports (out)", style="cyan")
table.add_column("Source", style="dim")
for node_name, node_path in sorted(nodes):
- in_dirs = sorted(d.name for d in node_path.iterdir()
- if d.is_dir() and re.match(r'^in\d+$', d.name))
- out_dirs = sorted(d.name for d in node_path.iterdir()
- if d.is_dir() and re.match(r'^out\d+$', d.name))
+ in_dirs = sorted(
+ d.name
+ for d in node_path.iterdir()
+ if d.is_dir() and re.match(r"^in\d+$", d.name)
+ )
+ out_dirs = sorted(
+ d.name
+ for d in node_path.iterdir()
+ if d.is_dir() and re.match(r"^out\d+$", d.name)
+ )
src = _detect_source(node_path)
table.add_row(
node_name,
@@ -121,14 +133,15 @@ def _node_table(nodes):
def _find_nodes(study_path):
nodes = []
- port_re = re.compile(r'^(in|out)\d+$')
- skip = {'src', '__pycache__', '.git'}
+ port_re = re.compile(r"^(in|out)\d+$")
+ skip = {"src", "__pycache__", ".git"}
for entry in study_path.iterdir():
- if not entry.is_dir() or entry.name in skip or entry.name.startswith('.'):
+ if not entry.is_dir() or entry.name in skip or entry.name.startswith("."):
continue
try:
- has_ports = any(c.is_dir() and port_re.match(c.name)
- for c in entry.iterdir())
+ has_ports = any(
+ c.is_dir() and port_re.match(c.name) for c in entry.iterdir()
+ )
except PermissionError:
continue
if has_ports:
@@ -140,12 +153,12 @@ def _find_edges(study_path, nodes=None):
if nodes is None:
nodes = _find_nodes(study_path)
node_names = {name for name, _ in nodes}
- skip = {'src', '__pycache__', '.git'}
+ skip = {"src", "__pycache__", ".git"}
edges = []
for entry in study_path.iterdir():
if not entry.is_dir():
continue
- if entry.name in skip or entry.name in node_names or entry.name.startswith('.'):
+ if entry.name in skip or entry.name in node_names or entry.name.startswith("."):
continue
try:
has_file = any(f.is_file() for f in entry.iterdir())
@@ -166,7 +179,7 @@ def _read_edge_files(edge_path):
if not f.is_file():
continue
# skip concore internal files
- if f.name.startswith('concore.'):
+ if f.name.startswith("concore."):
continue
simtime_val, value_str = _parse_port_file(f)
try:
@@ -178,11 +191,11 @@ def _read_edge_files(edge_path):
def _detect_source(node_path):
- for ext in ('*.py', '*.m', '*.cpp', '*.v', '*.sh', '*.java'):
+ for ext in ("*.py", "*.m", "*.cpp", "*.v", "*.sh", "*.java"):
matches = list(node_path.glob(ext))
for m in matches:
# skip concore library copies
- if m.name.startswith('concore'):
+ if m.name.startswith("concore"):
continue
return m.name
return "—"
diff --git a/pytest.ini b/pytest.ini
index 13bc1da9..5881b607 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -3,4 +3,4 @@ testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
-addopts = -v --tb=short
\ No newline at end of file
+addopts = -v --tb=short --cov=concore_cli --cov=concore_base --cov-report=term-missing
diff --git a/requirements-ci.txt b/requirements-ci.txt
index 5668f6a5..1f26eb20 100644
--- a/requirements-ci.txt
+++ b/requirements-ci.txt
@@ -1,6 +1,7 @@
# Minimal dependencies for CI (linting and testing)
# Does not include heavyweight packages like tensorflow
pytest
+pytest-cov
ruff
pyzmq
numpy
diff --git a/ruff.toml b/ruff.toml
new file mode 100644
index 00000000..d487540d
--- /dev/null
+++ b/ruff.toml
@@ -0,0 +1,41 @@
+exclude = [
+ # legacy/experimental dirs - fix incrementally
+ "demo/",
+ "ratc/",
+ "ratc2/",
+ "example/",
+ "0mq/",
+ "measurements/",
+ "tools/",
+ "nintan/",
+ "testsou/",
+ "linktest/",
+ "fri/",
+ "gi/",
+ "humanc/",
+ # core modules - maintainer-managed, don't touch yet
+ "mkconcore.py",
+ "concore.py",
+ "concoredocker.py",
+ "concore_base.py",
+ "contribute.py",
+ "copy_with_port_portname.py",
+ # not valid Python (Dockerfiles)
+ "Dockerfile.py",
+ "Dockerfile.m",
+ "Dockerfile.sh",
+ "Dockerfile.v",
+ "Dockerfile.java",
+]
+
+[lint]
+select = ["E", "F"]
+ignore = [
+ "E501", # line too long - enforce incrementally
+ "E712", # comparison to True/False - common in tests
+ "E721", # type comparison - common in tests
+]
+
+[format]
+quote-style = "double"
+indent-style = "space"
diff --git a/tests/conftest.py b/tests/conftest.py
index 10d20ac0..c303e450 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -12,4 +12,4 @@ def temp_dir():
dirpath = tempfile.mkdtemp()
yield dirpath
if os.path.exists(dirpath):
- shutil.rmtree(dirpath)
\ No newline at end of file
+ shutil.rmtree(dirpath)
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 33316a71..4321e05a 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -6,190 +6,223 @@
from click.testing import CliRunner
from concore_cli.cli import cli
+
class TestConcoreCLI(unittest.TestCase):
-
def setUp(self):
self.runner = CliRunner()
self.temp_dir = tempfile.mkdtemp()
-
+
def tearDown(self):
if Path(self.temp_dir).exists():
shutil.rmtree(self.temp_dir)
-
+
def test_version(self):
- result = self.runner.invoke(cli, ['--version'])
+ result = self.runner.invoke(cli, ["--version"])
self.assertEqual(result.exit_code, 0)
- self.assertIn('1.0.0', result.output)
-
+ self.assertIn("1.0.0", result.output)
+
def test_help(self):
- result = self.runner.invoke(cli, ['--help'])
+ result = self.runner.invoke(cli, ["--help"])
self.assertEqual(result.exit_code, 0)
- self.assertIn('Usage:', result.output)
- self.assertIn('Commands:', result.output)
-
+ self.assertIn("Usage:", result.output)
+ self.assertIn("Commands:", result.output)
+
def test_init_command(self):
with self.runner.isolated_filesystem(temp_dir=self.temp_dir):
- result = self.runner.invoke(cli, ['init', 'test-project'])
+ result = self.runner.invoke(cli, ["init", "test-project"])
self.assertEqual(result.exit_code, 0)
-
- project_path = Path('test-project')
+
+ project_path = Path("test-project")
self.assertTrue(project_path.exists())
- self.assertTrue((project_path / 'workflow.graphml').exists())
- self.assertTrue((project_path / 'src').exists())
- self.assertTrue((project_path / 'README.md').exists())
- self.assertTrue((project_path / 'src' / 'script.py').exists())
-
+ self.assertTrue((project_path / "workflow.graphml").exists())
+ self.assertTrue((project_path / "src").exists())
+ self.assertTrue((project_path / "README.md").exists())
+ self.assertTrue((project_path / "src" / "script.py").exists())
+
def test_init_existing_directory(self):
with self.runner.isolated_filesystem(temp_dir=self.temp_dir):
- Path('existing').mkdir()
- result = self.runner.invoke(cli, ['init', 'existing'])
+ Path("existing").mkdir()
+ result = self.runner.invoke(cli, ["init", "existing"])
self.assertNotEqual(result.exit_code, 0)
- self.assertIn('already exists', result.output)
-
+ self.assertIn("already exists", result.output)
+
def test_validate_missing_file(self):
- result = self.runner.invoke(cli, ['validate', 'nonexistent.graphml'])
+ result = self.runner.invoke(cli, ["validate", "nonexistent.graphml"])
self.assertNotEqual(result.exit_code, 0)
-
+
def test_validate_valid_file(self):
with self.runner.isolated_filesystem(temp_dir=self.temp_dir):
- result = self.runner.invoke(cli, ['init', 'test-project'])
+ result = self.runner.invoke(cli, ["init", "test-project"])
self.assertEqual(result.exit_code, 0)
-
- result = self.runner.invoke(cli, ['validate', 'test-project/workflow.graphml'])
+
+ result = self.runner.invoke(
+ cli, ["validate", "test-project/workflow.graphml"]
+ )
self.assertEqual(result.exit_code, 0)
- self.assertIn('Validation passed', result.output)
+ self.assertIn("Validation passed", result.output)
def test_validate_missing_node_file(self):
with self.runner.isolated_filesystem(temp_dir=self.temp_dir):
- result = self.runner.invoke(cli, ['init', 'test-project'])
+ result = self.runner.invoke(cli, ["init", "test-project"])
self.assertEqual(result.exit_code, 0)
- missing_file = Path('test-project/src/script.py')
+ missing_file = Path("test-project/src/script.py")
if missing_file.exists():
missing_file.unlink()
- result = self.runner.invoke(cli, ['validate', 'test-project/workflow.graphml'])
+ result = self.runner.invoke(
+ cli, ["validate", "test-project/workflow.graphml"]
+ )
self.assertNotEqual(result.exit_code, 0)
- self.assertIn('Missing source file', result.output)
-
+ self.assertIn("Missing source file", result.output)
+
def test_status_command(self):
- result = self.runner.invoke(cli, ['status'])
+ result = self.runner.invoke(cli, ["status"])
self.assertEqual(result.exit_code, 0)
-
+
def test_run_command_missing_source(self):
with self.runner.isolated_filesystem(temp_dir=self.temp_dir):
- result = self.runner.invoke(cli, ['init', 'test-project'])
- result = self.runner.invoke(cli, ['run', 'test-project/workflow.graphml', '--source', 'nonexistent'])
+ result = self.runner.invoke(cli, ["init", "test-project"])
+ result = self.runner.invoke(
+ cli, ["run", "test-project/workflow.graphml", "--source", "nonexistent"]
+ )
self.assertNotEqual(result.exit_code, 0)
def test_run_command_from_project_dir(self):
with self.runner.isolated_filesystem(temp_dir=self.temp_dir):
- result = self.runner.invoke(cli, ['init', 'test-project'])
+ result = self.runner.invoke(cli, ["init", "test-project"])
self.assertEqual(result.exit_code, 0)
- result = self.runner.invoke(cli, [
- 'run',
- 'test-project/workflow.graphml',
- '--source', 'test-project/src',
- '--output', 'out',
- '--type', 'posix'
- ])
+ result = self.runner.invoke(
+ cli,
+ [
+ "run",
+ "test-project/workflow.graphml",
+ "--source",
+ "test-project/src",
+ "--output",
+ "out",
+ "--type",
+ "posix",
+ ],
+ )
self.assertEqual(result.exit_code, 0)
- self.assertTrue(Path('out/src/concore.py').exists())
+ self.assertTrue(Path("out/src/concore.py").exists())
def test_run_command_default_type(self):
with self.runner.isolated_filesystem(temp_dir=self.temp_dir):
- result = self.runner.invoke(cli, ['init', 'test-project'])
+ result = self.runner.invoke(cli, ["init", "test-project"])
self.assertEqual(result.exit_code, 0)
- result = self.runner.invoke(cli, [
- 'run',
- 'test-project/workflow.graphml',
- '--source', 'test-project/src',
- '--output', 'out'
- ])
+ result = self.runner.invoke(
+ cli,
+ [
+ "run",
+ "test-project/workflow.graphml",
+ "--source",
+ "test-project/src",
+ "--output",
+ "out",
+ ],
+ )
self.assertEqual(result.exit_code, 0)
- if os.name == 'nt':
- self.assertTrue(Path('out/build.bat').exists())
+ if os.name == "nt":
+ self.assertTrue(Path("out/build.bat").exists())
else:
- self.assertTrue(Path('out/build').exists())
+ self.assertTrue(Path("out/build").exists())
def test_run_command_nested_output_path(self):
with self.runner.isolated_filesystem(temp_dir=self.temp_dir):
- result = self.runner.invoke(cli, ['init', 'test-project'])
+ result = self.runner.invoke(cli, ["init", "test-project"])
self.assertEqual(result.exit_code, 0)
- result = self.runner.invoke(cli, [
- 'run',
- 'test-project/workflow.graphml',
- '--source', 'test-project/src',
- '--output', 'build/out',
- '--type', 'posix'
- ])
+ result = self.runner.invoke(
+ cli,
+ [
+ "run",
+ "test-project/workflow.graphml",
+ "--source",
+ "test-project/src",
+ "--output",
+ "build/out",
+ "--type",
+ "posix",
+ ],
+ )
self.assertEqual(result.exit_code, 0)
- self.assertTrue(Path('build/out/src/concore.py').exists())
+ self.assertTrue(Path("build/out/src/concore.py").exists())
def test_run_command_subdir_source(self):
with self.runner.isolated_filesystem(temp_dir=self.temp_dir):
- result = self.runner.invoke(cli, ['init', 'test-project'])
+ result = self.runner.invoke(cli, ["init", "test-project"])
self.assertEqual(result.exit_code, 0)
- subdir = Path('test-project/src/subdir')
+ subdir = Path("test-project/src/subdir")
subdir.mkdir(parents=True, exist_ok=True)
- shutil.move('test-project/src/script.py', subdir / 'script.py')
+ shutil.move("test-project/src/script.py", subdir / "script.py")
- workflow_path = Path('test-project/workflow.graphml')
+ workflow_path = Path("test-project/workflow.graphml")
content = workflow_path.read_text()
- content = content.replace('N1:script.py', 'N1:subdir/script.py')
+ content = content.replace("N1:script.py", "N1:subdir/script.py")
workflow_path.write_text(content)
- result = self.runner.invoke(cli, [
- 'run',
- 'test-project/workflow.graphml',
- '--source', 'test-project/src',
- '--output', 'out',
- '--type', 'posix'
- ])
+ result = self.runner.invoke(
+ cli,
+ [
+ "run",
+ "test-project/workflow.graphml",
+ "--source",
+ "test-project/src",
+ "--output",
+ "out",
+ "--type",
+ "posix",
+ ],
+ )
self.assertEqual(result.exit_code, 0)
- self.assertTrue(Path('out/src/subdir/script.py').exists())
+ self.assertTrue(Path("out/src/subdir/script.py").exists())
def test_run_command_docker_subdir_source_build_paths(self):
with self.runner.isolated_filesystem(temp_dir=self.temp_dir):
- result = self.runner.invoke(cli, ['init', 'test-project'])
+ result = self.runner.invoke(cli, ["init", "test-project"])
self.assertEqual(result.exit_code, 0)
- subdir = Path('test-project/src/subdir')
+ subdir = Path("test-project/src/subdir")
subdir.mkdir(parents=True, exist_ok=True)
- shutil.move('test-project/src/script.py', subdir / 'script.py')
+ shutil.move("test-project/src/script.py", subdir / "script.py")
- workflow_path = Path('test-project/workflow.graphml')
+ workflow_path = Path("test-project/workflow.graphml")
content = workflow_path.read_text()
- content = content.replace('N1:script.py', 'N1:subdir/script.py')
+ content = content.replace("N1:script.py", "N1:subdir/script.py")
workflow_path.write_text(content)
- result = self.runner.invoke(cli, [
- 'run',
- 'test-project/workflow.graphml',
- '--source', 'test-project/src',
- '--output', 'out',
- '--type', 'docker'
- ])
+ result = self.runner.invoke(
+ cli,
+ [
+ "run",
+ "test-project/workflow.graphml",
+ "--source",
+ "test-project/src",
+ "--output",
+ "out",
+ "--type",
+ "docker",
+ ],
+ )
self.assertEqual(result.exit_code, 0)
- build_script = Path('out/build').read_text()
- self.assertIn('mkdir docker-subdir__script', build_script)
- self.assertIn('cp ../src/Dockerfile.subdir/script Dockerfile', build_script)
- self.assertIn('cp ../src/subdir/script.py .', build_script)
- self.assertIn('cp ../src/subdir/script.iport concore.iport', build_script)
- self.assertIn('cd ..', build_script)
+ build_script = Path("out/build").read_text()
+ self.assertIn("mkdir docker-subdir__script", build_script)
+ self.assertIn("cp ../src/Dockerfile.subdir/script Dockerfile", build_script)
+ self.assertIn("cp ../src/subdir/script.py .", build_script)
+ self.assertIn("cp ../src/subdir/script.iport concore.iport", build_script)
+ self.assertIn("cd ..", build_script)
def test_run_command_shared_source_specialization_merges_edge_params(self):
with self.runner.isolated_filesystem(temp_dir=self.temp_dir):
- Path('src').mkdir()
- Path('src/common.py').write_text(
- "import concore\n\n"
- "def step():\n"
- " return None\n"
+ Path("src").mkdir()
+ Path("src/common.py").write_text(
+ "import concore\n\ndef step():\n return None\n"
)
workflow = """
@@ -205,78 +238,97 @@ def test_run_command_shared_source_specialization_merges_edge_params(self):
"""
- Path('workflow.graphml').write_text(workflow)
-
- result = self.runner.invoke(cli, [
- 'run',
- 'workflow.graphml',
- '--source', 'src',
- '--output', 'out',
- '--type', 'posix'
- ])
+ Path("workflow.graphml").write_text(workflow)
+
+ result = self.runner.invoke(
+ cli,
+ [
+ "run",
+ "workflow.graphml",
+ "--source",
+ "src",
+ "--output",
+ "out",
+ "--type",
+ "posix",
+ ],
+ )
self.assertEqual(result.exit_code, 0)
- specialized_script = Path('out/src/common.py')
+ specialized_script = Path("out/src/common.py")
self.assertTrue(specialized_script.exists())
content = specialized_script.read_text()
- self.assertIn('PORT_NAME_A_B', content)
- self.assertIn('PORT_A_B', content)
- self.assertIn('PORT_NAME_B_C', content)
- self.assertIn('PORT_B_C', content)
+ self.assertIn("PORT_NAME_A_B", content)
+ self.assertIn("PORT_A_B", content)
+ self.assertIn("PORT_NAME_B_C", content)
+ self.assertIn("PORT_B_C", content)
def test_run_command_existing_output(self):
with self.runner.isolated_filesystem(temp_dir=self.temp_dir):
- result = self.runner.invoke(cli, ['init', 'test-project'])
- Path('output').mkdir()
-
- result = self.runner.invoke(cli, [
- 'run',
- 'test-project/workflow.graphml',
- '--source', 'test-project/src',
- '--output', 'output'
- ])
- self.assertIn('already exists', result.output.lower())
-
+ result = self.runner.invoke(cli, ["init", "test-project"])
+ Path("output").mkdir()
+
+ result = self.runner.invoke(
+ cli,
+ [
+ "run",
+ "test-project/workflow.graphml",
+ "--source",
+ "test-project/src",
+ "--output",
+ "output",
+ ],
+ )
+ self.assertIn("already exists", result.output.lower())
+
def test_inspect_command_basic(self):
with self.runner.isolated_filesystem(temp_dir=self.temp_dir):
- result = self.runner.invoke(cli, ['init', 'test-project'])
+ result = self.runner.invoke(cli, ["init", "test-project"])
self.assertEqual(result.exit_code, 0)
-
- result = self.runner.invoke(cli, ['inspect', 'test-project/workflow.graphml'])
+
+ result = self.runner.invoke(
+ cli, ["inspect", "test-project/workflow.graphml"]
+ )
self.assertEqual(result.exit_code, 0)
- self.assertIn('Workflow Overview', result.output)
- self.assertIn('Nodes:', result.output)
- self.assertIn('Edges:', result.output)
-
+ self.assertIn("Workflow Overview", result.output)
+ self.assertIn("Nodes:", result.output)
+ self.assertIn("Edges:", result.output)
+
def test_inspect_missing_file(self):
- result = self.runner.invoke(cli, ['inspect', 'nonexistent.graphml'])
+ result = self.runner.invoke(cli, ["inspect", "nonexistent.graphml"])
self.assertNotEqual(result.exit_code, 0)
-
+
def test_inspect_json_output(self):
with self.runner.isolated_filesystem(temp_dir=self.temp_dir):
- result = self.runner.invoke(cli, ['init', 'test-project'])
+ result = self.runner.invoke(cli, ["init", "test-project"])
self.assertEqual(result.exit_code, 0)
-
- result = self.runner.invoke(cli, ['inspect', 'test-project/workflow.graphml', '--json'])
+
+ result = self.runner.invoke(
+ cli, ["inspect", "test-project/workflow.graphml", "--json"]
+ )
self.assertEqual(result.exit_code, 0)
-
+
import json
+
output_data = json.loads(result.output)
- self.assertIn('workflow', output_data)
- self.assertIn('nodes', output_data)
- self.assertIn('edges', output_data)
- self.assertEqual(output_data['workflow'], 'workflow.graphml')
-
+ self.assertIn("workflow", output_data)
+ self.assertIn("nodes", output_data)
+ self.assertIn("edges", output_data)
+ self.assertEqual(output_data["workflow"], "workflow.graphml")
+
def test_inspect_missing_source_file(self):
with self.runner.isolated_filesystem(temp_dir=self.temp_dir):
- result = self.runner.invoke(cli, ['init', 'test-project'])
+ result = self.runner.invoke(cli, ["init", "test-project"])
self.assertEqual(result.exit_code, 0)
-
- Path('test-project/src/script.py').unlink()
-
- result = self.runner.invoke(cli, ['inspect', 'test-project/workflow.graphml', '--source', 'src'])
+
+ Path("test-project/src/script.py").unlink()
+
+ result = self.runner.invoke(
+ cli, ["inspect", "test-project/workflow.graphml", "--source", "src"]
+ )
self.assertEqual(result.exit_code, 0)
- self.assertIn('Missing files', result.output)
+ self.assertIn("Missing files", result.output)
+
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main()
diff --git a/tests/test_concore.py b/tests/test_concore.py
index fd897291..a62efe86 100644
--- a/tests/test_concore.py
+++ b/tests/test_concore.py
@@ -2,40 +2,43 @@
import os
import numpy as np
-class TestSafeLiteralEval:
+class TestSafeLiteralEval:
def test_reads_dictionary_from_file(self, temp_dir):
test_file = os.path.join(temp_dir, "config.txt")
with open(test_file, "w") as f:
f.write("{'name': 'test', 'value': 123}")
-
+
from concore import safe_literal_eval
+
result = safe_literal_eval(test_file, {})
-
- assert result == {'name': 'test', 'value': 123}
+
+ assert result == {"name": "test", "value": 123}
def test_returns_default_when_file_missing(self):
from concore import safe_literal_eval
+
result = safe_literal_eval("nonexistent_file.txt", "fallback")
-
+
assert result == "fallback"
def test_returns_default_for_empty_file(self, temp_dir):
test_file = os.path.join(temp_dir, "empty.txt")
- with open(test_file, "w") as f:
+ with open(test_file, "w") as _:
pass
-
+
from concore import safe_literal_eval
+
result = safe_literal_eval(test_file, "default")
-
+
assert result == "default"
class TestTryparam:
-
@pytest.fixture(autouse=True)
def reset_params(self):
from concore import params
+
original_params = params.copy()
yield
params.clear()
@@ -43,46 +46,49 @@ def reset_params(self):
def test_returns_existing_parameter(self):
from concore import tryparam, params
- params['my_setting'] = 'custom_value'
-
- result = tryparam('my_setting', 'default_value')
-
- assert result == 'custom_value'
+
+ params["my_setting"] = "custom_value"
+
+ result = tryparam("my_setting", "default_value")
+
+ assert result == "custom_value"
def test_returns_default_for_missing_parameter(self):
from concore import tryparam
- result = tryparam('missing_param', 'fallback')
-
- assert result == 'fallback'
+ result = tryparam("missing_param", "fallback")
+
+ assert result == "fallback"
-class TestZeroMQPort:
+class TestZeroMQPort:
def test_class_is_defined(self):
from concore import ZeroMQPort
+
assert ZeroMQPort is not None
class TestDefaultConfiguration:
-
def test_default_input_path(self):
from concore import inpath
+
assert inpath == "./in"
def test_default_output_path(self):
from concore import outpath
+
assert outpath == "./out"
class TestPublicAPI:
-
def test_module_imports_successfully(self):
from concore import safe_literal_eval
+
assert safe_literal_eval is not None
def test_core_functions_exist(self):
from concore import safe_literal_eval, tryparam, default_maxtime
-
+
assert callable(safe_literal_eval)
assert callable(tryparam)
assert callable(default_maxtime)
@@ -91,6 +97,7 @@ def test_core_functions_exist(self):
class TestNumpyConversion:
def test_convert_scalar(self):
from concore import convert_numpy_to_python
+
val = np.float64(3.14)
res = convert_numpy_to_python(val)
assert type(res) == float
@@ -98,63 +105,70 @@ def test_convert_scalar(self):
def test_convert_list_and_dict(self):
from concore import convert_numpy_to_python
- data = {
- 'a': np.int32(10),
- 'b': [np.float64(1.1), np.float64(2.2)]
- }
+
+ data = {"a": np.int32(10), "b": [np.float64(1.1), np.float64(2.2)]}
res = convert_numpy_to_python(data)
- assert type(res['a']) == int
- assert type(res['b'][0]) == float
- assert res['b'][1] == 2.2
+ assert type(res["a"]) == int
+ assert type(res["b"][0]) == float
+ assert res["b"][1] == 2.2
+
class TestInitVal:
@pytest.fixture(autouse=True)
def reset_simtime(self):
import concore
+
old_simtime = concore.simtime
yield
concore.simtime = old_simtime
def test_initval_updates_simtime(self):
import concore
+
concore.simtime = 0
# initval takes string repr of a list [time, val1, val2...]
result = concore.initval("[100, 'data']")
-
+
assert concore.simtime == 100
- assert result == ['data']
+ assert result == ["data"]
def test_initval_handles_bad_input(self):
import concore
+
concore.simtime = 0
# Input that isn't a list
result = concore.initval("not_a_list")
assert concore.simtime == 0
assert result == []
+
class TestDefaultMaxTime:
def test_uses_file_value(self, temp_dir, monkeypatch):
import concore
+
# Mock the path to maxtime file
maxtime_file = os.path.join(temp_dir, "concore.maxtime")
with open(maxtime_file, "w") as f:
f.write("500")
-
- monkeypatch.setattr(concore, 'concore_maxtime_file', maxtime_file)
+
+ monkeypatch.setattr(concore, "concore_maxtime_file", maxtime_file)
concore.default_maxtime(100)
-
+
assert concore.maxtime == 500
def test_uses_default_when_missing(self, monkeypatch):
import concore
- monkeypatch.setattr(concore, 'concore_maxtime_file', "missing_file")
+
+ monkeypatch.setattr(concore, "concore_maxtime_file", "missing_file")
concore.default_maxtime(999)
assert concore.maxtime == 999
+
class TestUnchanged:
@pytest.fixture(autouse=True)
def reset_globals(self):
import concore
+
old_s = concore.s
old_olds = concore.olds
yield
@@ -163,52 +177,61 @@ def reset_globals(self):
def test_unchanged_returns_true_if_same(self):
import concore
+
concore.s = "same"
concore.olds = "same"
-
+
# Should return True and reset s to empty
assert concore.unchanged() is True
- assert concore.s == ''
+ assert concore.s == ""
def test_unchanged_returns_false_if_diff(self):
import concore
+
concore.s = "new"
concore.olds = "old"
-
+
assert concore.unchanged() is False
assert concore.olds == "new"
-class TestParseParams:
+
+class TestParseParams:
def test_simple_key_value_pairs(self):
from concore import parse_params
+
params = parse_params("a=1;b=2")
assert params == {"a": 1, "b": 2}
def test_preserves_whitespace_in_values(self):
from concore import parse_params
+
params = parse_params("label = hello world ; x = 5")
assert params["label"] == "hello world"
assert params["x"] == 5
def test_embedded_equals_in_value(self):
from concore import parse_params
+
params = parse_params("url=https://example.com?a=1&b=2")
assert params["url"] == "https://example.com?a=1&b=2"
def test_numeric_and_list_coercion(self):
from concore import parse_params
+
params = parse_params("delay=5;coeffs=[1,2,3]")
assert params["delay"] == 5
assert params["coeffs"] == [1, 2, 3]
def test_dict_literal_backward_compatibility(self):
from concore import parse_params
+
params = parse_params("{'a': 1, 'b': 2}")
assert params == {"a": 1, "b": 2}
def test_windows_quoted_input(self):
from concore import parse_params
- s = "\"a=1;b=2\""
+
+ s = '"a=1;b=2"'
s = s[1:-1] # simulate quote stripping before parse_params
params = parse_params(s)
assert params == {"a": 1, "b": 2}
@@ -218,6 +241,7 @@ class TestWriteZMQ:
@pytest.fixture(autouse=True)
def reset_zmq_ports(self):
import concore
+
original_ports = concore.zmq_ports.copy()
yield
concore.zmq_ports.clear()
@@ -289,6 +313,7 @@ class TestSimtimeNotMutatedByWrite:
@pytest.fixture(autouse=True)
def reset_simtime(self):
import concore
+
old_simtime = concore.simtime
yield
concore.simtime = old_simtime
@@ -296,6 +321,7 @@ def reset_simtime(self):
@pytest.fixture(autouse=True)
def reset_outpath(self):
import concore
+
old_outpath = concore.outpath
yield
concore.outpath = old_outpath
@@ -303,6 +329,7 @@ def reset_outpath(self):
@pytest.fixture(autouse=True)
def reset_zmq_ports(self):
import concore
+
original_ports = concore.zmq_ports.copy()
yield
concore.zmq_ports.clear()
@@ -363,10 +390,12 @@ def test_multi_port_file_writes_share_same_timestamp(self, temp_dir):
# Read back the written files and compare timestamps
from ast import literal_eval
+
payloads = []
for p in (1, 2):
- with open(os.path.join(temp_dir, "out" + str(p),
- ("u" if p == 1 else "v"))) as f:
+ with open(
+ os.path.join(temp_dir, "out" + str(p), ("u" if p == 1 else "v"))
+ ) as f:
payloads.append(literal_eval(f.read()))
ts1, ts2 = payloads[0][0], payloads[1][0]
@@ -383,6 +412,7 @@ def test_multi_port_zmq_writes_share_same_timestamp(self):
class DummyPort:
def __init__(self):
self.sent = None
+
def send_json_with_retry(self, msg):
self.sent = msg
diff --git a/tests/test_concoredocker.py b/tests/test_concoredocker.py
index c3743911..8a2f1344 100644
--- a/tests/test_concoredocker.py
+++ b/tests/test_concoredocker.py
@@ -3,16 +3,16 @@
class TestSafeLiteralEval:
-
def test_reads_dictionary_from_file(self, temp_dir):
test_file = os.path.join(temp_dir, "ports.txt")
with open(test_file, "w") as f:
f.write("{'a': 1, 'b': 2}")
from concoredocker import safe_literal_eval
+
result = safe_literal_eval(test_file, {})
- assert result == {'a': 1, 'b': 2}
+ assert result == {"a": 1, "b": 2}
def test_reads_list_from_file(self, temp_dir):
test_file = os.path.join(temp_dir, "data.txt")
@@ -20,15 +20,17 @@ def test_reads_list_from_file(self, temp_dir):
f.write("[1, 2, 3]")
from concoredocker import safe_literal_eval
+
result = safe_literal_eval(test_file, [])
assert result == [1, 2, 3]
def test_returns_default_when_file_missing(self):
from concoredocker import safe_literal_eval
- result = safe_literal_eval("/nonexistent.txt", {'default': True})
- assert result == {'default': True}
+ result = safe_literal_eval("/nonexistent.txt", {"default": True})
+
+ assert result == {"default": True}
def test_returns_default_for_bad_syntax(self, temp_dir):
test_file = os.path.join(temp_dir, "bad.txt")
@@ -36,23 +38,25 @@ def test_returns_default_for_bad_syntax(self, temp_dir):
f.write("not valid {{{")
from concoredocker import safe_literal_eval
+
result = safe_literal_eval(test_file, "fallback")
assert result == "fallback"
class TestUnchanged:
-
def test_returns_true_when_unchanged(self):
import concoredocker
+
concoredocker.s = "abc"
concoredocker.olds = "abc"
assert concoredocker.unchanged() == True
- assert concoredocker.s == ''
+ assert concoredocker.s == ""
def test_returns_false_when_changed(self):
import concoredocker
+
concoredocker.s = "new"
concoredocker.olds = "old"
@@ -61,9 +65,9 @@ def test_returns_false_when_changed(self):
class TestInitval:
-
def test_parses_simtime_and_values(self):
import concoredocker
+
concoredocker.simtime = 0
result = concoredocker.initval("[5.0, 1.0, 2.0]")
@@ -72,6 +76,7 @@ def test_parses_simtime_and_values(self):
def test_parses_single_value(self):
import concoredocker
+
concoredocker.simtime = 0
result = concoredocker.initval("[10.0, 99]")
@@ -80,9 +85,9 @@ def test_parses_single_value(self):
class TestWrite:
-
def test_writes_list_with_simtime(self, temp_dir):
import concoredocker
+
old_outpath = concoredocker.outpath
outdir = os.path.join(temp_dir, "1")
os.makedirs(outdir)
@@ -98,6 +103,7 @@ def test_writes_list_with_simtime(self, temp_dir):
def test_writes_with_delta(self, temp_dir):
import concoredocker
+
old_outpath = concoredocker.outpath
outdir = os.path.join(temp_dir, "1")
os.makedirs(outdir)
@@ -115,9 +121,9 @@ def test_writes_with_delta(self, temp_dir):
class TestRead:
-
def test_reads_and_parses_data(self, temp_dir):
import concoredocker
+
old_inpath = concoredocker.inpath
old_delay = concoredocker.delay
indir = os.path.join(temp_dir, "1")
@@ -125,10 +131,10 @@ def test_reads_and_parses_data(self, temp_dir):
concoredocker.inpath = temp_dir
concoredocker.delay = 0.001
- with open(os.path.join(indir, "data"), 'w') as f:
+ with open(os.path.join(indir, "data"), "w") as f:
f.write("[7.0, 100, 200]")
- concoredocker.s = ''
+ concoredocker.s = ""
concoredocker.simtime = 0
result = concoredocker.read(1, "data", "[0, 0, 0]")
@@ -139,6 +145,7 @@ def test_reads_and_parses_data(self, temp_dir):
def test_returns_default_when_file_missing(self, temp_dir):
import concoredocker
+
old_inpath = concoredocker.inpath
old_delay = concoredocker.delay
indir = os.path.join(temp_dir, "1")
@@ -146,7 +153,7 @@ def test_returns_default_when_file_missing(self, temp_dir):
concoredocker.inpath = temp_dir
concoredocker.delay = 0.001
- concoredocker.s = ''
+ concoredocker.s = ""
concoredocker.simtime = 0
result = concoredocker.read(1, "nofile", "[0, 5, 5]")
@@ -159,6 +166,7 @@ class TestZMQ:
@pytest.fixture(autouse=True)
def reset_zmq_ports(self):
import concoredocker
+
original_ports = concoredocker.zmq_ports.copy()
yield
concoredocker.zmq_ports.clear()
diff --git a/tests/test_graph.py b/tests/test_graph.py
index 6aef0d3c..efebe3e5 100644
--- a/tests/test_graph.py
+++ b/tests/test_graph.py
@@ -5,41 +5,41 @@
from click.testing import CliRunner
from concore_cli.cli import cli
+
class TestGraphValidation(unittest.TestCase):
-
def setUp(self):
self.runner = CliRunner()
self.temp_dir = tempfile.mkdtemp()
-
+
def tearDown(self):
if Path(self.temp_dir).exists():
shutil.rmtree(self.temp_dir)
-
+
def create_graph_file(self, filename, content):
filepath = Path(self.temp_dir) / filename
- with open(filepath, 'w') as f:
+ with open(filepath, "w") as f:
f.write(content)
return str(filepath)
def test_validate_corrupted_xml(self):
content = ''
- filepath = self.create_graph_file('corrupted.graphml', content)
-
- result = self.runner.invoke(cli, ['validate', filepath])
-
- self.assertIn('Validation failed', result.output)
- self.assertIn('Invalid XML', result.output)
+ filepath = self.create_graph_file("corrupted.graphml", content)
+
+ result = self.runner.invoke(cli, ["validate", filepath])
+
+ self.assertIn("Validation failed", result.output)
+ self.assertIn("Invalid XML", result.output)
def test_validate_empty_file(self):
- filepath = self.create_graph_file('empty.graphml', '')
-
- result = self.runner.invoke(cli, ['validate', filepath])
-
- self.assertIn('Validation failed', result.output)
- self.assertIn('File is empty', result.output)
-
+ filepath = self.create_graph_file("empty.graphml", "")
+
+ result = self.runner.invoke(cli, ["validate", filepath])
+
+ self.assertIn("Validation failed", result.output)
+ self.assertIn("File is empty", result.output)
+
def test_validate_missing_node_id(self):
- content = '''
+ content = """
@@ -47,14 +47,14 @@ def test_validate_missing_node_id(self):
- '''
- filepath = self.create_graph_file('missing_id.graphml', content)
- result = self.runner.invoke(cli, ['validate', filepath])
- self.assertIn('Validation failed', result.output)
+ """
+ filepath = self.create_graph_file("missing_id.graphml", content)
+ result = self.runner.invoke(cli, ["validate", filepath])
+ self.assertIn("Validation failed", result.output)
self.assertIn("Node missing required 'id' attribute", result.output)
def test_validate_missing_edgedefault(self):
- content = '''
+ content = """
@@ -62,23 +62,23 @@ def test_validate_missing_edgedefault(self):
- '''
- filepath = self.create_graph_file('missing_default.graphml', content)
- result = self.runner.invoke(cli, ['validate', filepath])
- self.assertIn('Validation failed', result.output)
+ """
+ filepath = self.create_graph_file("missing_default.graphml", content)
+ result = self.runner.invoke(cli, ["validate", filepath])
+ self.assertIn("Validation failed", result.output)
self.assertIn("Graph missing required 'edgedefault'", result.output)
def test_validate_missing_root_element(self):
content = ''
- filepath = self.create_graph_file('not_graphml.xml', content)
-
- result = self.runner.invoke(cli, ['validate', filepath])
-
- self.assertIn('Validation failed', result.output)
- self.assertIn('missing root element', result.output)
+ filepath = self.create_graph_file("not_graphml.xml", content)
+
+ result = self.runner.invoke(cli, ["validate", filepath])
+
+ self.assertIn("Validation failed", result.output)
+ self.assertIn("missing root element", result.output)
def test_validate_broken_edges(self):
- content = '''
+ content = """
@@ -87,16 +87,16 @@ def test_validate_broken_edges(self):
- '''
- filepath = self.create_graph_file('bad_edge.graphml', content)
-
- result = self.runner.invoke(cli, ['validate', filepath])
-
- self.assertIn('Validation failed', result.output)
- self.assertIn('Edge references non-existent target node', result.output)
+ """
+ filepath = self.create_graph_file("bad_edge.graphml", content)
+
+ result = self.runner.invoke(cli, ["validate", filepath])
+
+ self.assertIn("Validation failed", result.output)
+ self.assertIn("Edge references non-existent target node", result.output)
def test_validate_node_missing_filename(self):
- content = '''
+ content = """
@@ -104,16 +104,16 @@ def test_validate_node_missing_filename(self):
- '''
- filepath = self.create_graph_file('bad_node.graphml', content)
-
- result = self.runner.invoke(cli, ['validate', filepath])
-
- self.assertIn('Validation failed', result.output)
- self.assertIn('has no filename', result.output)
+ """
+ filepath = self.create_graph_file("bad_node.graphml", content)
+
+ result = self.runner.invoke(cli, ["validate", filepath])
+
+ self.assertIn("Validation failed", result.output)
+ self.assertIn("has no filename", result.output)
def test_validate_unsafe_node_label(self):
- content = '''
+ content = """
@@ -121,16 +121,16 @@ def test_validate_unsafe_node_label(self):
- '''
- filepath = self.create_graph_file('injection.graphml', content)
+ """
+ filepath = self.create_graph_file("injection.graphml", content)
- result = self.runner.invoke(cli, ['validate', filepath])
+ result = self.runner.invoke(cli, ["validate", filepath])
- self.assertIn('Validation failed', result.output)
- self.assertIn('unsafe shell characters', result.output)
+ self.assertIn("Validation failed", result.output)
+ self.assertIn("unsafe shell characters", result.output)
def test_validate_valid_graph(self):
- content = '''
+ content = """
@@ -138,16 +138,16 @@ def test_validate_valid_graph(self):
- '''
- filepath = self.create_graph_file('valid.graphml', content)
-
- result = self.runner.invoke(cli, ['validate', filepath])
-
- self.assertIn('Validation passed', result.output)
- self.assertIn('Workflow is valid', result.output)
-
+ """
+ filepath = self.create_graph_file("valid.graphml", content)
+
+ result = self.runner.invoke(cli, ["validate", filepath])
+
+ self.assertIn("Validation passed", result.output)
+ self.assertIn("Workflow is valid", result.output)
+
def test_validate_missing_source_file(self):
- content = '''
+ content = """
@@ -155,18 +155,20 @@ def test_validate_missing_source_file(self):
- '''
- filepath = self.create_graph_file('workflow.graphml', content)
- source_dir = Path(self.temp_dir) / 'src'
+ """
+ filepath = self.create_graph_file("workflow.graphml", content)
+ source_dir = Path(self.temp_dir) / "src"
source_dir.mkdir()
-
- result = self.runner.invoke(cli, ['validate', filepath, '--source', str(source_dir)])
-
- self.assertIn('Validation failed', result.output)
- self.assertIn('Missing source file', result.output)
-
+
+ result = self.runner.invoke(
+ cli, ["validate", filepath, "--source", str(source_dir)]
+ )
+
+ self.assertIn("Validation failed", result.output)
+ self.assertIn("Missing source file", result.output)
+
def test_validate_with_existing_source_file(self):
- content = '''
+ content = """
@@ -174,18 +176,20 @@ def test_validate_with_existing_source_file(self):
- '''
- filepath = self.create_graph_file('workflow.graphml', content)
- source_dir = Path(self.temp_dir) / 'src'
+ """
+ filepath = self.create_graph_file("workflow.graphml", content)
+ source_dir = Path(self.temp_dir) / "src"
source_dir.mkdir()
- (source_dir / 'exists.py').write_text('print("hello")')
-
- result = self.runner.invoke(cli, ['validate', filepath, '--source', str(source_dir)])
-
- self.assertIn('Validation passed', result.output)
-
+ (source_dir / "exists.py").write_text('print("hello")')
+
+ result = self.runner.invoke(
+ cli, ["validate", filepath, "--source", str(source_dir)]
+ )
+
+ self.assertIn("Validation passed", result.output)
+
def test_validate_zmq_port_conflict(self):
- content = '''
+ content = """
@@ -202,16 +206,16 @@ def test_validate_zmq_port_conflict(self):
- '''
- filepath = self.create_graph_file('conflict.graphml', content)
-
- result = self.runner.invoke(cli, ['validate', filepath])
-
- self.assertIn('Validation failed', result.output)
- self.assertIn('Port conflict', result.output)
-
+ """
+ filepath = self.create_graph_file("conflict.graphml", content)
+
+ result = self.runner.invoke(cli, ["validate", filepath])
+
+ self.assertIn("Validation failed", result.output)
+ self.assertIn("Port conflict", result.output)
+
def test_validate_reserved_port(self):
- content = '''
+ content = """
@@ -225,16 +229,16 @@ def test_validate_reserved_port(self):
- '''
- filepath = self.create_graph_file('reserved.graphml', content)
-
- result = self.runner.invoke(cli, ['validate', filepath])
-
- self.assertIn('Port 80', result.output)
- self.assertIn('reserved range', result.output)
-
+ """
+ filepath = self.create_graph_file("reserved.graphml", content)
+
+ result = self.runner.invoke(cli, ["validate", filepath])
+
+ self.assertIn("Port 80", result.output)
+ self.assertIn("reserved range", result.output)
+
def test_validate_cycle_detection(self):
- content = '''
+ content = """
@@ -251,16 +255,16 @@ def test_validate_cycle_detection(self):
- '''
- filepath = self.create_graph_file('cycle.graphml', content)
-
- result = self.runner.invoke(cli, ['validate', filepath])
-
- self.assertIn('cycles', result.output)
- self.assertIn('control loops', result.output)
-
+ """
+ filepath = self.create_graph_file("cycle.graphml", content)
+
+ result = self.runner.invoke(cli, ["validate", filepath])
+
+ self.assertIn("cycles", result.output)
+ self.assertIn("control loops", result.output)
+
def test_validate_port_zero(self):
- content = '''
+ content = """
@@ -274,16 +278,16 @@ def test_validate_port_zero(self):
- '''
- filepath = self.create_graph_file('port_zero.graphml', content)
-
- result = self.runner.invoke(cli, ['validate', filepath])
-
- self.assertIn('Validation failed', result.output)
- self.assertIn('must be at least 1', result.output)
-
+ """
+ filepath = self.create_graph_file("port_zero.graphml", content)
+
+ result = self.runner.invoke(cli, ["validate", filepath])
+
+ self.assertIn("Validation failed", result.output)
+ self.assertIn("must be at least 1", result.output)
+
def test_validate_port_exceeds_maximum(self):
- content = '''
+ content = """
@@ -297,13 +301,14 @@ def test_validate_port_exceeds_maximum(self):
- '''
- filepath = self.create_graph_file('port_max.graphml', content)
-
- result = self.runner.invoke(cli, ['validate', filepath])
-
- self.assertIn('Validation failed', result.output)
- self.assertIn('exceeds maximum (65535)', result.output)
-
-if __name__ == '__main__':
- unittest.main()
\ No newline at end of file
+ """
+ filepath = self.create_graph_file("port_max.graphml", content)
+
+ result = self.runner.invoke(cli, ["validate", filepath])
+
+ self.assertIn("Validation failed", result.output)
+ self.assertIn("exceeds maximum (65535)", result.output)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/test_openjupyter_security.py b/tests/test_openjupyter_security.py
index 230a4819..b046dc9b 100644
--- a/tests/test_openjupyter_security.py
+++ b/tests/test_openjupyter_security.py
@@ -1,4 +1,5 @@
"""Tests for the secured /openJupyter/ and /stopJupyter/ endpoints."""
+
import os
import sys
import pytest
@@ -8,7 +9,9 @@
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Skip entire module if flask is not installed (e.g. in CI with minimal deps)
-pytest.importorskip("flask", reason="flask not installed — skipping server endpoint tests")
+pytest.importorskip(
+ "flask", reason="flask not installed — skipping server endpoint tests"
+)
# Set a test API key before importing the app module
TEST_API_KEY = "test-secret-key-12345"
@@ -18,6 +21,7 @@
def reset_jupyter_process():
"""Reset the module-level jupyter_process before each test."""
import fri.server.main as mod
+
mod.jupyter_process = None
yield
mod.jupyter_process = None
@@ -29,6 +33,7 @@ def client():
with patch.dict(os.environ, {"CONCORE_API_KEY": TEST_API_KEY}):
# Re-read env var after patching
import fri.server.main as mod
+
mod.API_KEY = TEST_API_KEY
mod.app.config["TESTING"] = True
with mod.app.test_client() as c:
@@ -39,6 +44,7 @@ def client():
def client_no_key():
"""Create a Flask test client without API key configured."""
import fri.server.main as mod
+
mod.API_KEY = None
mod.app.config["TESTING"] = True
with mod.app.test_client() as c:
@@ -60,9 +66,7 @@ def test_wrong_api_key_returns_403(self, client):
def test_server_without_api_key_configured_returns_500(self, client_no_key):
"""If CONCORE_API_KEY is not set on server, return 500."""
- resp = client_no_key.post(
- "/openJupyter/", headers={"X-API-KEY": "anything"}
- )
+ resp = client_no_key.post("/openJupyter/", headers={"X-API-KEY": "anything"})
assert resp.status_code == 500
@@ -76,9 +80,7 @@ def test_authorized_request_starts_jupyter(self, mock_popen, client):
mock_proc.poll.return_value = None # process running
mock_popen.return_value = mock_proc
- resp = client.post(
- "/openJupyter/", headers={"X-API-KEY": TEST_API_KEY}
- )
+ resp = client.post("/openJupyter/", headers={"X-API-KEY": TEST_API_KEY})
assert resp.status_code == 200
data = resp.get_json()
assert data["message"] == "Jupyter Lab started"
@@ -96,15 +98,11 @@ def test_duplicate_launch_returns_409(self, mock_popen, client):
mock_popen.return_value = mock_proc
# First launch
- resp1 = client.post(
- "/openJupyter/", headers={"X-API-KEY": TEST_API_KEY}
- )
+ resp1 = client.post("/openJupyter/", headers={"X-API-KEY": TEST_API_KEY})
assert resp1.status_code == 200
# Second launch should be rejected
- resp2 = client.post(
- "/openJupyter/", headers={"X-API-KEY": TEST_API_KEY}
- )
+ resp2 = client.post("/openJupyter/", headers={"X-API-KEY": TEST_API_KEY})
assert resp2.status_code == 409
data = resp2.get_json()
assert data["message"] == "Jupyter already running"
@@ -112,9 +110,7 @@ def test_duplicate_launch_returns_409(self, mock_popen, client):
@patch("fri.server.main.subprocess.Popen", side_effect=OSError("fail"))
def test_popen_failure_returns_500(self, mock_popen, client):
"""If Popen raises, return 500."""
- resp = client.post(
- "/openJupyter/", headers={"X-API-KEY": TEST_API_KEY}
- )
+ resp = client.post("/openJupyter/", headers={"X-API-KEY": TEST_API_KEY})
assert resp.status_code == 500
data = resp.get_json()
assert "error" in data
@@ -130,9 +126,7 @@ def test_stop_without_auth_returns_403(self, client):
def test_stop_when_no_process_returns_404(self, client):
"""Stop with no running process returns 404."""
- resp = client.post(
- "/stopJupyter/", headers={"X-API-KEY": TEST_API_KEY}
- )
+ resp = client.post("/stopJupyter/", headers={"X-API-KEY": TEST_API_KEY})
assert resp.status_code == 404
@patch("fri.server.main.subprocess.Popen")
@@ -146,9 +140,7 @@ def test_stop_running_process_returns_200(self, mock_popen, client):
client.post("/openJupyter/", headers={"X-API-KEY": TEST_API_KEY})
# Stop
- resp = client.post(
- "/stopJupyter/", headers={"X-API-KEY": TEST_API_KEY}
- )
+ resp = client.post("/stopJupyter/", headers={"X-API-KEY": TEST_API_KEY})
assert resp.status_code == 200
data = resp.get_json()
assert data["message"] == "Jupyter stopped"
diff --git a/tests/test_protocol_conformance.py b/tests/test_protocol_conformance.py
index e831165a..9bb5ab3c 100644
--- a/tests/test_protocol_conformance.py
+++ b/tests/test_protocol_conformance.py
@@ -21,11 +21,17 @@ def _validate_fixture_document_shape(doc):
required_top = {"schema_version", "runtime", "mode", "cases"}
missing = required_top - set(doc.keys())
if missing:
- raise AssertionError(f"Fixture document missing required top-level keys: {sorted(missing)}")
+ raise AssertionError(
+ f"Fixture document missing required top-level keys: {sorted(missing)}"
+ )
if doc["runtime"] != "python":
- raise AssertionError(f"Phase-1 fixture runtime must be 'python', found: {doc['runtime']}")
+ raise AssertionError(
+ f"Phase-1 fixture runtime must be 'python', found: {doc['runtime']}"
+ )
if doc["mode"] != "report_only":
- raise AssertionError(f"Phase-1 fixture mode must be 'report_only', found: {doc['mode']}")
+ raise AssertionError(
+ f"Phase-1 fixture mode must be 'report_only', found: {doc['mode']}"
+ )
if not isinstance(doc["cases"], list) or not doc["cases"]:
raise AssertionError("Fixture document must contain a non-empty 'cases' list")
diff --git a/tests/test_tool_config.py b/tests/test_tool_config.py
index 58adc903..7baa25e3 100644
--- a/tests/test_tool_config.py
+++ b/tests/test_tool_config.py
@@ -1,6 +1,6 @@
-import pytest
import os
+
# can't import mkconcore directly (sys.argv at module level), so we duplicate the parser
def _load_tool_config(filepath):
tools = {}
@@ -17,7 +17,6 @@ def _load_tool_config(filepath):
class TestLoadToolConfig:
-
def test_basic_overrides(self, temp_dir):
cfg = os.path.join(temp_dir, "concore.tools")
with open(cfg, "w") as f:
@@ -69,7 +68,7 @@ def test_whitespace_around_key_value(self, temp_dir):
def test_empty_file(self, temp_dir):
cfg = os.path.join(temp_dir, "concore.tools")
- with open(cfg, "w") as f:
+ with open(cfg, "w") as _:
pass
tools = _load_tool_config(cfg)