Qt-native pub/sub runtime for connecting GUIs to ezmsg graphs.
ezmsg-qt lets Qt widgets publish to and subscribe from ezmsg topics using a
familiar signal/slot style while keeping ezmsg runtime ownership explicit.
EzSessionowns the ezmsg runtime thread,GraphContext, and sidecar pipelinesEzSubscriberandEzPublisherattach to a sessionProcessorChaincompiles processing stages into a sidecar runtime owned by the session
from enum import Enum, auto
from qtpy import QtWidgets
from ezmsg.qt import EzPublisher, EzSession, EzSubscriber
class VelocityTopic(Enum):
INPUT_SETTINGS = auto()
OUTPUT_DATA = auto()
class VelocityWidget(QtWidgets.QWidget):
def __init__(self, session: EzSession):
super().__init__()
self.slider = QtWidgets.QSlider()
self.label = QtWidgets.QLabel("Waiting for data...")
self.data_sub = EzSubscriber(
VelocityTopic.OUTPUT_DATA,
parent=self,
session=session,
)
self.settings_pub = EzPublisher(
VelocityTopic.INPUT_SETTINGS,
parent=self,
session=session,
)
self.data_sub.connect(self.on_data)
self.slider.valueChanged.connect(self.on_slider)
def on_data(self, msg):
self.label.setText(f"Received: {msg}")
def on_slider(self, value):
self.settings_pub.emit({"gain": value})
def main() -> None:
app = QtWidgets.QApplication([])
session = EzSession()
window = VelocityWidget(session)
window.show()
with session:
app.exec()EzSubscriber can retarget to a new topic at runtime without reconnecting Qt slots.
class TopicSwitcher(QtWidgets.QWidget):
def __init__(self, session: EzSession):
super().__init__()
self.sub = EzSubscriber(MyTopic.A, parent=self, session=session)
self.sub.connect(self.on_data)
def switch_to_b(self) -> None:
self.sub.set_topic(MyTopic.B)
def pause_updates(self) -> None:
self.sub.clear_topic()set_topic(...)blocks until the running session applies the switchclear_topic()disconnects the subscriber from all topicstopicreflects the currently active topic- topic switching requires the subscriber to be attached to a running session
- signals available for UI state:
switch_started,topic_changed,topic_cleared,switch_failed
Use ProcessorChain when widget-facing data needs ezmsg processing off the UI thread.
from ezmsg.qt import EzSession, ProcessorChain
session = EzSession()
chain = (
ProcessorChain(MyTopic.RAW, parent=widget)
.parallel(LowPassFilter)
.local(ThresholdDetector)
.connect(widget.on_processed)
.attach(session)
).parallel(...)runs a stage group in its own sidecar process.local(...)runs a stage group in the shared sidecar process- neither mode runs on the Qt UI thread
Point a session at an existing GraphRunner or GraphServer by providing
graph_address.
runner.start()
session = EzSession(graph_address=runner.graph_address)
try:
with session:
app.exec()
finally:
if runner.running:
runner.stop()session = EzSession(graph_address=None)
with session:
app.exec()graph_address: Optional GraphServer addressattach(obj): Attach anEzSubscriber,EzPublisher, orProcessorChaindetach(obj): Detach anEzSubscriberorEzPublisherrunning: Whether the session runtime is active
EzSubscriber(
topic: Enum | str | None = None,
parent: QObject = None,
*,
session: EzSession | None = None,
leaky: bool = False,
max_queue: int | None = None,
throttle_hz: float | None = None,
)session: The session that owns this subscriberconnect(slot): Connect a handler to receive messagesset_topic(topic): Switch to a new topic on a running sessionclear_topic(): Unsubscribe from the current topic on a running sessionreceived: Qt signal emitted when a message arrivesswitch_started,topic_changed,topic_cleared,switch_failed: lifecycle signals for runtime switching
EzPublisher(
topic: Enum | str | None = None,
parent: QObject = None,
*,
session: EzSession | None = None,
queue_policy: QueuePolicy = "unbounded",
max_pending: int | None = None,
)session: The session that owns this publisheremit(message): Send a message to the topicset_topic(topic): Switch to a new topic on a running sessionclear_topic(): Unpublish from the current topic on a running sessionswitch_started,topic_changed,topic_cleared,switch_failed: lifecycle signals for runtime switching
Install from PyPI:
pip install ezmsg-qtYou also need a Qt binding — pick one:
pip install ezmsg-qt[pyqt6] # or
pip install ezmsg-qt[pyside6]Or install the latest development version:
pip install git+https://github.com/ezmsg-org/ezmsg-qt@mainWe use uv for development.
- Install
uvif not already installed. - Fork this repository and clone your fork locally.
- Open a terminal and
cdto the cloned folder. - Run
uv sync --extra pyside6to create a.venvand install dependencies. - (Optional) Install pre-commit hooks:
uv run pre-commit install - After making changes, run the test suite:
uv run pytest tests
MIT License - see LICENSE for details.