From 925e536b8363c436b4cdee01afeaa337fe9e205c Mon Sep 17 00:00:00 2001 From: Abigail Emery Date: Tue, 24 Feb 2026 17:20:02 +0000 Subject: [PATCH 01/13] wip --- src/blueapi/service/interface.py | 15 +++++++++++++ src/blueapi/service/main.py | 38 +++++++++++++++++++++++++++++++- 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/src/blueapi/service/interface.py b/src/blueapi/service/interface.py index 6acc29ab7..d7020ede8 100644 --- a/src/blueapi/service/interface.py +++ b/src/blueapi/service/interface.py @@ -1,5 +1,6 @@ from collections.abc import Mapping from functools import cache +from multiprocessing.connection import Connection from typing import Any from bluesky.callbacks.tiled_writer import TiledWriter @@ -270,3 +271,17 @@ def get_python_env( """Retrieve information about the Python environment""" scratch = config().scratch return get_python_environment(config=scratch, name=name, source=source) + + +def pipe_events(tx: Connection) -> int: + + def handler( + worker_event: WorkerEvent, + cor_id: str | None, + ) -> None: + tx.send(worker_event) + + task_worker = worker() + sub_id = task_worker.worker_events.subscribe(handler) + + return sub_id diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index c79dd3df3..133bd4961 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -2,6 +2,7 @@ import urllib.parse from collections.abc import Awaitable, Callable from contextlib import asynccontextmanager +from multiprocessing import Pipe from typing import Annotated, Any import jwt @@ -14,8 +15,10 @@ HTTPException, Request, Response, + WebSocket, status, ) +from fastapi.concurrency import run_in_threadpool from fastapi.datastructures import Address from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import RedirectResponse, StreamingResponse @@ -37,7 +40,7 @@ from blueapi.config import ApplicationConfig, OIDCConfig, Tag from blueapi.service import interface from blueapi.worker import TrackableTask, WorkerState -from blueapi.worker.event import TaskStatusEnum +from blueapi.worker.event import TaskStatusEnum, WorkerEvent from .model import ( DeviceModel, @@ -540,6 +543,39 @@ def logout(runner: Annotated[WorkerDispatcher, Depends(_runner)]) -> Response: ) +@secure_router.websocket("/run_plan") +async def run_plan( + ws: WebSocket, + runner: Annotated[WorkerDispatcher, Depends(_runner)], +): + user = "alice" + + # ack ws + await ws.accept() + # accept task request through socket + rq = await ws.receive_json() + # submit task to runner + task_request: TaskRequest = TaskRequest.model_validate(rq) + task_id: str = runner.run(interface.submit_task, task_request, {"user": user}) + # add listener to runner + tx, rx = Pipe() + h = runner.run(interface.pipe_events, tx=tx) + # start task + task = WorkerTask(task_id=task_id) + runner.run( + interface.begin_task, + task=task, + ) + # pipe events to ws + while True: + event: WorkerEvent = await run_in_threadpool(rx.recv) + await ws.send_json(event.model_dump(mode="json")) + if event.is_complete(): + break + # ??? + # profit + + @start_as_current_span(TRACER, "config") def start(config: ApplicationConfig): import uvicorn From 05a5de67ddb7bf407a802efd191965da02c96e02 Mon Sep 17 00:00:00 2001 From: Abigail Emery Date: Tue, 24 Feb 2026 17:37:08 +0000 Subject: [PATCH 02/13] client wip --- src/blueapi/cli/cli.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/src/blueapi/cli/cli.py b/src/blueapi/cli/cli.py index 7bafa831f..3cfa64299 100644 --- a/src/blueapi/cli/cli.py +++ b/src/blueapi/cli/cli.py @@ -362,6 +362,29 @@ def on_event(event: AnyEvent) -> None: raise ClickException(f"task could not run: {ve}") from ve +@controller.command(name="ws") +@click.argument("name", type=str) +@click.argument("parameters", type=ParametersType(), default={}, required=False) +def run_blocking( + name: str, + parameters: TaskParameters, +): + instrument_session = "cm33-3" + + from websockets.sync.client import connect + + task_req = TaskRequest( + name=name, + params=parameters, + instrument_session=instrument_session, + ) + + with connect("ws://localhost:8007/run_plan") as ws: + ws.send(task_req.model_dump_json()) + while message := ws.recv(): + print(message) + + @controller.command(name="state") @click.pass_obj @check_connection From b265114bc1ddaf8358777e211b04ceccac3b2d2d Mon Sep 17 00:00:00 2001 From: Abigail Emery Date: Tue, 24 Feb 2026 17:48:31 +0000 Subject: [PATCH 03/13] use normal iter --- src/blueapi/cli/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/blueapi/cli/cli.py b/src/blueapi/cli/cli.py index 3cfa64299..aa05798df 100644 --- a/src/blueapi/cli/cli.py +++ b/src/blueapi/cli/cli.py @@ -381,7 +381,7 @@ def run_blocking( with connect("ws://localhost:8007/run_plan") as ws: ws.send(task_req.model_dump_json()) - while message := ws.recv(): + for message in ws: print(message) From 5cf65c3a83ca7847f073fa2b7c404bed636c734e Mon Sep 17 00:00:00 2001 From: Abigail Emery Date: Tue, 24 Feb 2026 17:48:48 +0000 Subject: [PATCH 04/13] close ws --- src/blueapi/service/main.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 133bd4961..4ec66664f 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -572,8 +572,7 @@ async def run_plan( await ws.send_json(event.model_dump(mode="json")) if event.is_complete(): break - # ??? - # profit + await ws.close() @start_as_current_span(TRACER, "config") From 5349ded51ac2ad2d6570a8a603d002ee5c5d268a Mon Sep 17 00:00:00 2001 From: Abigail Emery Date: Tue, 24 Feb 2026 18:41:02 +0000 Subject: [PATCH 05/13] add some trys --- src/blueapi/service/main.py | 39 ++++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 4ec66664f..554f75aa3 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -41,6 +41,7 @@ from blueapi.service import interface from blueapi.worker import TrackableTask, WorkerState from blueapi.worker.event import TaskStatusEnum, WorkerEvent +from blueapi.worker.worker_errors import WorkerBusyError from .model import ( DeviceModel, @@ -555,24 +556,36 @@ async def run_plan( # accept task request through socket rq = await ws.receive_json() # submit task to runner - task_request: TaskRequest = TaskRequest.model_validate(rq) - task_id: str = runner.run(interface.submit_task, task_request, {"user": user}) + try: + task_request: TaskRequest = TaskRequest.model_validate(rq) + task_id: str = runner.run(interface.submit_task, task_request, {"user": user}) + except ValidationError: + await ws.close(code=1003, reason="invalid args") + return + # add listener to runner tx, rx = Pipe() h = runner.run(interface.pipe_events, tx=tx) # start task - task = WorkerTask(task_id=task_id) - runner.run( - interface.begin_task, - task=task, - ) + try: + task = WorkerTask(task_id=task_id) + runner.run( + interface.begin_task, + task=task, + ) + except WorkerBusyError: + await ws.close(code=1013, reason="Worker busy") + return # pipe events to ws - while True: - event: WorkerEvent = await run_in_threadpool(rx.recv) - await ws.send_json(event.model_dump(mode="json")) - if event.is_complete(): - break - await ws.close() + try: + while True: + event: WorkerEvent = await run_in_threadpool(rx.recv) + await ws.send_json(event.model_dump(mode="json")) + if event.is_complete(): + break + finally: + await ws.close() + runner.run(interface.unpipe_events, h=h) @start_as_current_span(TRACER, "config") From 855e0b7b6809f231454443c75b765d4457a9487d Mon Sep 17 00:00:00 2001 From: Abigail Emery Date: Tue, 24 Feb 2026 18:42:16 +0000 Subject: [PATCH 06/13] unpipe --- src/blueapi/service/interface.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/blueapi/service/interface.py b/src/blueapi/service/interface.py index d7020ede8..4ea671145 100644 --- a/src/blueapi/service/interface.py +++ b/src/blueapi/service/interface.py @@ -1,3 +1,4 @@ +import logging from collections.abc import Mapping from functools import cache from multiprocessing.connection import Connection @@ -22,6 +23,7 @@ WorkerTask, ) from blueapi.utils.serialization import access_blob +from blueapi.worker import task_worker from blueapi.worker.event import TaskStatusEnum, WorkerEvent, WorkerState from blueapi.worker.task import Task from blueapi.worker.task_worker import TaskWorker, TrackableTask @@ -29,7 +31,7 @@ """This module provides interface between web application and underlying Bluesky context and worker""" - +LOGGER = logging.getLogger(__name__) _CONFIG: ApplicationConfig = ApplicationConfig() @@ -279,9 +281,14 @@ def handler( worker_event: WorkerEvent, cor_id: str | None, ) -> None: + LOGGER.info("Sending event") tx.send(worker_event) task_worker = worker() sub_id = task_worker.worker_events.subscribe(handler) - return sub_id + + +def unpipe_events(h: int) -> None: + task_worker = worker() + task_worker.worker_events.unsubscribe(h) From f9086619b6c384101ae56e3b528de2ffc514a535 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 3 Mar 2026 11:52:21 +0000 Subject: [PATCH 07/13] Move websocket handling into BlueapiRestClient --- src/blueapi/cli/cli.py | 25 ++++++++++++++----------- src/blueapi/client/client.py | 4 ++++ src/blueapi/client/rest.py | 17 ++++++++++++++++- 3 files changed, 34 insertions(+), 12 deletions(-) diff --git a/src/blueapi/cli/cli.py b/src/blueapi/cli/cli.py index aa05798df..47cca91cd 100644 --- a/src/blueapi/cli/cli.py +++ b/src/blueapi/cli/cli.py @@ -363,26 +363,29 @@ def on_event(event: AnyEvent) -> None: @controller.command(name="ws") +@click.pass_obj @click.argument("name", type=str) @click.argument("parameters", type=ParametersType(), default={}, required=False) +@click.option( + "-i", + "--instrument-session", + type=str, + help=textwrap.dedent(""" + Instrument session associated with running the plan, + used to tell blueapi where to store any data and as a security check: + the session must be valid and active and you must be a member of it."""), + required=True, +) def run_blocking( - name: str, - parameters: TaskParameters, + obj: dict, name: str, parameters: TaskParameters, instrument_session: str ): - instrument_session = "cm33-3" - - from websockets.sync.client import connect - + client = cast(BlueapiClient, obj["client"]) task_req = TaskRequest( name=name, params=parameters, instrument_session=instrument_session, ) - - with connect("ws://localhost:8007/run_plan") as ws: - ws.send(task_req.model_dump_json()) - for message in ws: - print(message) + client.run_blocking(task_req) @controller.command(name="state") diff --git a/src/blueapi/client/client.py b/src/blueapi/client/client.py index c5b41ff45..0638f911f 100644 --- a/src/blueapi/client/client.py +++ b/src/blueapi/client/client.py @@ -439,6 +439,10 @@ def get_active_task(self) -> WorkerTask: return self.active_task + @start_as_current_span(TRACER, "request") + def run_blocking(self, request: TaskRequest): + self._rest.run_blocking(request) + @start_as_current_span(TRACER, "task", "timeout") def run_task( self, diff --git a/src/blueapi/client/rest.py b/src/blueapi/client/rest.py index 52150d36f..fb1a92313 100644 --- a/src/blueapi/client/rest.py +++ b/src/blueapi/client/rest.py @@ -8,7 +8,8 @@ get_tracer, start_as_current_span, ) -from pydantic import BaseModel, TypeAdapter, ValidationError +from pydantic import BaseModel, TypeAdapter, ValidationError, WebsocketUrl +from websockets.sync.client import connect from blueapi.config import RestConfig from blueapi.service.authentication import JWTAuth, SessionManager @@ -274,6 +275,20 @@ def _request_and_deserialize( deserialized = TypeAdapter(target_type).validate_python(response.json()) return deserialized + def run_blocking(self, req: TaskRequest): + url = self._ws_address().unicode_string().removesuffix("/") + "/run_plan" + print(url) + with connect(url) as ws: + ws.send(req.model_dump_json()) + for message in ws: + print(message) + + def _ws_address(self) -> WebsocketUrl: + # url = WebsocketUrl.build( + # scheme="ws", host=api.host, port=api.port, path=api.path + # ) + return WebsocketUrl("ws://localhost:8000/") + # https://github.com/DiamondLightSource/blueapi/issues/1256 - remove before 2.0 def __getattr__(name: str): From ad623a8ed3768bffd2c1fbf22ea883ffb2fd592a Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Tue, 3 Mar 2026 12:16:10 +0000 Subject: [PATCH 08/13] Send all events through websocket --- src/blueapi/service/interface.py | 13 ++++++++----- src/blueapi/service/main.py | 10 +++++++--- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/src/blueapi/service/interface.py b/src/blueapi/service/interface.py index 4ea671145..c05f8f871 100644 --- a/src/blueapi/service/interface.py +++ b/src/blueapi/service/interface.py @@ -11,6 +11,7 @@ from blueapi.cli.scratch import get_python_environment from blueapi.config import ApplicationConfig, OIDCConfig, StompConfig +from blueapi.core.bluesky_types import DataEvent from blueapi.core.context import BlueskyContext from blueapi.core.event import EventStream from blueapi.log import set_up_logging @@ -23,8 +24,7 @@ WorkerTask, ) from blueapi.utils.serialization import access_blob -from blueapi.worker import task_worker -from blueapi.worker.event import TaskStatusEnum, WorkerEvent, WorkerState +from blueapi.worker.event import ProgressEvent, TaskStatusEnum, WorkerEvent, WorkerState from blueapi.worker.task import Task from blueapi.worker.task_worker import TaskWorker, TrackableTask @@ -278,17 +278,20 @@ def get_python_env( def pipe_events(tx: Connection) -> int: def handler( - worker_event: WorkerEvent, - cor_id: str | None, + worker_event: WorkerEvent | DataEvent | ProgressEvent, + _cor_id: str | None, ) -> None: - LOGGER.info("Sending event") tx.send(worker_event) task_worker = worker() sub_id = task_worker.worker_events.subscribe(handler) + sub_id = task_worker.data_events.subscribe(handler) + sub_id = task_worker.progress_events.subscribe(handler) return sub_id def unpipe_events(h: int) -> None: task_worker = worker() task_worker.worker_events.unsubscribe(h) + task_worker.data_events.unsubscribe(h) + task_worker.progress_events.unsubscribe(h) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 554f75aa3..28ac4edbd 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -38,9 +38,10 @@ from super_state_machine.errors import TransitionError from blueapi.config import ApplicationConfig, OIDCConfig, Tag +from blueapi.core.bluesky_types import DataEvent from blueapi.service import interface from blueapi.worker import TrackableTask, WorkerState -from blueapi.worker.event import TaskStatusEnum, WorkerEvent +from blueapi.worker.event import ProgressEvent, TaskStatusEnum, WorkerEvent from blueapi.worker.worker_errors import WorkerBusyError from .model import ( @@ -66,6 +67,9 @@ LOGGER = logging.getLogger(__name__) +AnyEvent = WorkerEvent | DataEvent | ProgressEvent + + def _runner() -> WorkerDispatcher: """Intended to be used only with FastAPI Depends""" if RUNNER is None: @@ -579,9 +583,9 @@ async def run_plan( # pipe events to ws try: while True: - event: WorkerEvent = await run_in_threadpool(rx.recv) + event: AnyEvent = await run_in_threadpool(rx.recv) await ws.send_json(event.model_dump(mode="json")) - if event.is_complete(): + if isinstance(event, WorkerEvent) and event.is_complete(): break finally: await ws.close() From bf3d7eb0af0e608ba6191f5f788b2cf0aaa6a621 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Wed, 4 Mar 2026 14:50:32 +0000 Subject: [PATCH 09/13] Split pipe subscribe handles --- src/blueapi/service/interface.py | 22 +++++++++++++--------- src/blueapi/service/main.py | 2 +- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/blueapi/service/interface.py b/src/blueapi/service/interface.py index c05f8f871..5c402492e 100644 --- a/src/blueapi/service/interface.py +++ b/src/blueapi/service/interface.py @@ -275,7 +275,10 @@ def get_python_env( return get_python_environment(config=scratch, name=name, source=source) -def pipe_events(tx: Connection) -> int: +SubHandle = tuple[int, int, int] + + +def pipe_events(tx: Connection) -> SubHandle: def handler( worker_event: WorkerEvent | DataEvent | ProgressEvent, @@ -284,14 +287,15 @@ def handler( tx.send(worker_event) task_worker = worker() - sub_id = task_worker.worker_events.subscribe(handler) - sub_id = task_worker.data_events.subscribe(handler) - sub_id = task_worker.progress_events.subscribe(handler) - return sub_id + w_id = task_worker.worker_events.subscribe(handler) + d_id = task_worker.data_events.subscribe(handler) + p_id = task_worker.progress_events.subscribe(handler) + return (w_id, d_id, p_id) -def unpipe_events(h: int) -> None: +def unpipe_events(hnd: SubHandle) -> None: task_worker = worker() - task_worker.worker_events.unsubscribe(h) - task_worker.data_events.unsubscribe(h) - task_worker.progress_events.unsubscribe(h) + w, d, p = hnd + task_worker.worker_events.unsubscribe(w) + task_worker.data_events.unsubscribe(d) + task_worker.progress_events.unsubscribe(p) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 28ac4edbd..e0420eae8 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -589,7 +589,7 @@ async def run_plan( break finally: await ws.close() - runner.run(interface.unpipe_events, h=h) + runner.run(interface.unpipe_events, hnd=h) @start_as_current_span(TRACER, "config") From 1290a86628f7e8366fe52e0cf0f52c296e87e678 Mon Sep 17 00:00:00 2001 From: Peter Holloway Date: Wed, 4 Mar 2026 16:10:43 +0000 Subject: [PATCH 10/13] Re-use run subcommand for websockets --- src/blueapi/cli/cli.py | 10 +++++++++- src/blueapi/client/client.py | 19 +++++++++++++++++-- src/blueapi/client/rest.py | 5 +++-- 3 files changed, 29 insertions(+), 5 deletions(-) diff --git a/src/blueapi/cli/cli.py b/src/blueapi/cli/cli.py index 47cca91cd..9325ed642 100644 --- a/src/blueapi/cli/cli.py +++ b/src/blueapi/cli/cli.py @@ -287,6 +287,7 @@ def on_event( @controller.command(name="run") @click.argument("name", type=str) @click.argument("parameters", type=ParametersType(), default={}, required=False) +@click.option("--ws", type=bool, is_flag=True, default=False) @click.option( "--foreground/--background", "--fg/--bg", type=bool, is_flag=True, default=True ) @@ -314,6 +315,7 @@ def run_plan( name: str, timeout: float | None, foreground: bool, + ws: bool, instrument_session: str, parameters: TaskParameters, ) -> None: @@ -335,7 +337,13 @@ def on_event(event: AnyEvent) -> None: elif isinstance(event, DataEvent): callback(event.name, event.doc) - resp = client.run_task(task, on_event=on_event) + client.add_callback(on_event) + + if ws: + resp = client.run_blocking(task) + else: + resp = client.run_task(task) + match resp.result: case TaskResult(result=None, type="NoneType"): print("Plan succeeded") diff --git a/src/blueapi/client/client.py b/src/blueapi/client/client.py index 0638f911f..8c41ced7a 100644 --- a/src/blueapi/client/client.py +++ b/src/blueapi/client/client.py @@ -440,8 +440,23 @@ def get_active_task(self) -> WorkerTask: return self.active_task @start_as_current_span(TRACER, "request") - def run_blocking(self, request: TaskRequest): - self._rest.run_blocking(request) + def run_blocking( + self, request: TaskRequest, on_event: OnAnyEvent | None = None + ) -> TaskStatus: + for event in self._rest.run_blocking(request): + if on_event is not None: + on_event(event) + for cb in self._callbacks.values(): + try: + cb(event) + except Exception as e: + log.error(f"Callback ({cb}) failed for event: {event}", exc_info=e) + if isinstance(event, WorkerEvent) and event.is_complete(): + if event.task_status is None: + raise BlueskyRemoteControlError( + "Server completed without task status" + ) + return event.task_status @start_as_current_span(TRACER, "task", "timeout") def run_task( diff --git a/src/blueapi/client/rest.py b/src/blueapi/client/rest.py index fb1a92313..c2d161118 100644 --- a/src/blueapi/client/rest.py +++ b/src/blueapi/client/rest.py @@ -11,6 +11,7 @@ from pydantic import BaseModel, TypeAdapter, ValidationError, WebsocketUrl from websockets.sync.client import connect +from blueapi.client.event_bus import AnyEvent from blueapi.config import RestConfig from blueapi.service.authentication import JWTAuth, SessionManager from blueapi.service.model import ( @@ -277,11 +278,11 @@ def _request_and_deserialize( def run_blocking(self, req: TaskRequest): url = self._ws_address().unicode_string().removesuffix("/") + "/run_plan" - print(url) with connect(url) as ws: ws.send(req.model_dump_json()) for message in ws: - print(message) + event = TypeAdapter(AnyEvent).validate_json(message) + yield event def _ws_address(self) -> WebsocketUrl: # url = WebsocketUrl.build( From dae7d69f5c440cb1e28cd1e9515f535d05f9db7b Mon Sep 17 00:00:00 2001 From: Abigail Emery Date: Wed, 4 Mar 2026 16:40:04 +0000 Subject: [PATCH 11/13] Raise for connection closing pre plan completed --- src/blueapi/client/client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/blueapi/client/client.py b/src/blueapi/client/client.py index 8c41ced7a..fd2c93f53 100644 --- a/src/blueapi/client/client.py +++ b/src/blueapi/client/client.py @@ -457,6 +457,7 @@ def run_blocking( "Server completed without task status" ) return event.task_status + raise BlueskyRemoteControlError("Connection closed before plan completed.") @start_as_current_span(TRACER, "task", "timeout") def run_task( From 5e224648bfcaf4f1235d680a2598a6bd144e771c Mon Sep 17 00:00:00 2001 From: Abigail Emery Date: Wed, 4 Mar 2026 16:41:45 +0000 Subject: [PATCH 12/13] Remove run blocking from cli --- src/blueapi/cli/cli.py | 26 -------------------------- 1 file changed, 26 deletions(-) diff --git a/src/blueapi/cli/cli.py b/src/blueapi/cli/cli.py index 9325ed642..d0f2bcf1c 100644 --- a/src/blueapi/cli/cli.py +++ b/src/blueapi/cli/cli.py @@ -370,32 +370,6 @@ def on_event(event: AnyEvent) -> None: raise ClickException(f"task could not run: {ve}") from ve -@controller.command(name="ws") -@click.pass_obj -@click.argument("name", type=str) -@click.argument("parameters", type=ParametersType(), default={}, required=False) -@click.option( - "-i", - "--instrument-session", - type=str, - help=textwrap.dedent(""" - Instrument session associated with running the plan, - used to tell blueapi where to store any data and as a security check: - the session must be valid and active and you must be a member of it."""), - required=True, -) -def run_blocking( - obj: dict, name: str, parameters: TaskParameters, instrument_session: str -): - client = cast(BlueapiClient, obj["client"]) - task_req = TaskRequest( - name=name, - params=parameters, - instrument_session=instrument_session, - ) - client.run_blocking(task_req) - - @controller.command(name="state") @click.pass_obj @check_connection From a621d23229bc19ebff192485f6a98d6ee37f10b5 Mon Sep 17 00:00:00 2001 From: Abigail Emery Date: Wed, 4 Mar 2026 17:24:11 +0000 Subject: [PATCH 13/13] Catch plan key error in run_plan --- src/blueapi/service/main.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index e0420eae8..72715ea29 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -566,6 +566,9 @@ async def run_plan( except ValidationError: await ws.close(code=1003, reason="invalid args") return + except KeyError: + await ws.close(code=1003, reason="unknown plan") + return # add listener to runner tx, rx = Pipe()