A Python event bus library based on the CloudEvents specification, supporting in-process and distributed event handling.
- CloudEvents Specification: Full implementation of the CloudEvents v1.0 specification
- Dual-Mode Event Handling:
  - PROCESS scope: Fast in-process event handling
  - APP scope: Event distribution across all instances
- FastStream Compatible: Provides a memory broker compatible with the FastStream API
- RPC Support: Built-in request-response pattern
- FastAPI Integration: Seamless integration with FastAPI applications
- Decorator Pattern: Clean event handler registration
- Statistics and Introspection: Built-in event statistics and handler query functionality
# Install with uv (recommended)
uv pip install -e .
# Install with optional dependencies for examples
uv pip install -e ".[examples]"
# Install with Redis support for distributed events
uv pip install -e ".[redis]"
# Install with development dependencies (pytest, pytest-asyncio)
uv pip install -e ".[dev]"
# Or install core dependencies only
pip install pydantic faststream

import asyncio
from opensecflow.eventbus.memory_broker import AsyncQueueBroker
from opensecflow.eventbus.eventbus import EventBus
from opensecflow.eventbus.event import ScopedEvent, EventScope
# Create brokers (both using AsyncQueueBroker for simplicity)
process_broker = AsyncQueueBroker()
app_broker = AsyncQueueBroker()
# Create EventBus
bus = EventBus(process_broker, app_broker)
# Define event class
class OrderCreatedEvent(ScopedEvent):
type: str = "order.created"
order_id: str
amount: float
scope: EventScope = EventScope.PROCESS
# Define handler function
async def handle_order(event_data: dict):
print(f"Processing order: {event_data['order_id']}")
# Subscribe to event
bus.subscribe("order.created", handle_order)
# Start EventBus
await bus.start()
# Publish event
event = OrderCreatedEvent(
source="order-service",
order_id="ORD-001",
amount=99.9
)
await bus.publish(event)
# Stop EventBus
await bus.stop()

from contextlib import asynccontextmanager
from fastapi import FastAPI
from opensecflow.eventbus.memory_broker import AsyncQueueBroker
from opensecflow.eventbus.eventbus import init_eventbus, event_handler
from opensecflow.eventbus.event import ScopedEvent, EventScope
# Define event
class OrderCreatedEvent(ScopedEvent):
type: str = "order.created"
    order_id: str
    amount: float
scope: EventScope = EventScope.PROCESS
# Register handler using decorator with automatic type conversion
@event_handler(OrderCreatedEvent)
async def handle_order(event: OrderCreatedEvent):
# event is automatically converted to OrderCreatedEvent instance
# Full type safety and IDE autocomplete available
print(f"Processing order: {event.order_id}")
# FastAPI lifecycle management
@asynccontextmanager
async def lifespan(app: FastAPI):
process_broker = AsyncQueueBroker()
app_broker = AsyncQueueBroker()
event_bus = init_eventbus(process_broker, app_broker)
app.state.event_bus = event_bus
await event_bus.start()
yield
await event_bus.stop()
app = FastAPI(lifespan=lifespan)
@app.post("/orders")
async def create_order(order_id: str, amount: float):
event = OrderCreatedEvent(
source="api",
order_id=order_id,
amount=amount
)
await app.state.event_bus.publish(event)
return {"status": "created"}For distributed event handling across multiple instances, use FastStream brokers:
from opensecflow.eventbus.memory_broker import AsyncQueueBroker
from faststream.redis import RedisBroker
from opensecflow.eventbus.eventbus import EventBus
# Process broker: in-memory for fast local events
process_broker = AsyncQueueBroker()
# App broker: Redis for distributed events across instances
app_broker = RedisBroker("redis://localhost:6379")
bus = EventBus(process_broker, app_broker)

Note: Redis support requires the [redis] extra: pip install -e ".[redis]". For single-instance applications, you can use AsyncQueueBroker for both brokers.
Event class based on the CloudEvents v1.0 specification.

Required Attributes:
- id: Unique event identifier (auto-generated UUID)
- source: Event source identifier (URI-reference format)
- specversion: CloudEvents specification version (default "1.0")
- type: Event type identifier

Optional Attributes:
- datacontenttype: Data content type (default "application/json")
- dataschema: Data schema URI
- subject: Event subject
- time: Event timestamp (auto-generated)
- data: Event payload data
- extensions: Extension attributes dictionary
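CloudEvent supports dict and JSON round-tripping via the helpers listed in the API reference below. A minimal sketch, assuming CloudEvent is importable from opensecflow.eventbus.event alongside ScopedEvent (the import path is an assumption):

from opensecflow.eventbus.event import CloudEvent  # assumed import path

# id and time are auto-generated; source and type must be supplied
event = CloudEvent(
    source="inventory-service",
    type="inventory.low",
    data={"sku": "SKU-42", "remaining": 3},
)
payload = event.to_json()                         # JSON string for transport
restored = CloudEvent.from_dict(event.to_dict())  # reconstruct from a dict
assert restored.type == "inventory.low"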
Extends CloudEvent with event scope functionality.
Additional Attributes:
scope: Event scope (EventScope.PROCESS or EventScope.APP)
EventScope Explanation:
- PROCESS: In-process events, handled only in the current instance via a memory queue; fast response
- APP: Application-level events, distributed to all instances via a broker (e.g., Redis)
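ScopedEvent defaults to EventScope.APP (see the API reference below), so PROCESS must be set explicitly when an event should stay local. A minimal sketch:

from opensecflow.eventbus.event import ScopedEvent, EventScope

# scope defaults to EventScope.APP; override it for process-local events
local_event = ScopedEvent(
    type="config.reloaded",
    source="config-service",
    scope=EventScope.PROCESS,
    data={"section": "logging"},
)
assert local_event.scope is EventScope.PROCESS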
FastStream API-compatible memory broker, suitable for:
- Development and testing environments
- Single-process applications
- Scenarios without Redis requirement
Main Methods:
- subscriber(channel): Create a subscriber decorator
- publisher(channel): Create a publisher decorator
- publish(message, channel): Publish a message
- request(message, channel, timeout): Make an RPC request
- start()/stop(): Start/stop the broker
- get_stats(): Get statistics
- get_subscribers(): Get subscriber information
Event bus core class that manages event subscription and publishing.

Main Methods:
- subscribe(event_type, handler): Register an event handler
- publish(event): Publish an event
- start()/stop(): Start/stop the event bus
- get_handlers(): Get all registered handlers
Decorators:
@event_handler(EventClass): Auto-register event handler
from opensecflow.eventbus.memory_broker import AsyncQueueBroker
broker = AsyncQueueBroker()
@broker.subscriber("events.user.created")
async def handle_user(data: dict):
print(f"New user: {data['username']}")
await broker.start()
await broker.publish({"username": "alice"}, channel="events.user.created")
await broker.stop()

@broker.subscriber("events.order.created")
async def send_email(data: dict):
print(f"Sending email: Order {data['order_id']}")
@broker.subscriber("events.order.created")
async def update_inventory(data: dict):
print(f"Updating inventory: Order {data['order_id']}")
await broker.publish({"order_id": "123"}, channel="events.order.created")@broker.publisher("events.result")
async def process_data(data: dict) -> dict:
# Return value automatically published to events.result
return {"status": "processed", "data": data}
@broker.subscriber("events.result")
async def handle_result(data: dict):
print(f"Result: {data}")
await process_data({"value": 42})@broker.subscriber("rpc.calculate")
async def calculate(data: dict) -> dict:
result = data["a"] + data["b"]
return {"result": result}
# Send request and wait for response
response = await broker.request(
{"a": 10, "b": 20},
channel="rpc.calculate",
timeout=1.0
)
print(response) # {"result": 30}async with AsyncQueueBroker() as broker:
@broker.subscriber("events.test")
async def handler(data: dict):
print(data)
await broker.publish({"msg": "hello"}, channel="events.test")
    await asyncio.sleep(0.1)

# In-process event (fast)
process_event = ScopedEvent(
type="cache.cleared",
source="cache-service",
scope=EventScope.PROCESS,
data={"cache_key": "user_123"}
)
await bus.publish(process_event)
# Application-level event (distributed)
app_event = ScopedEvent(
type="user.registered",
source="auth-service",
scope=EventScope.APP,
data={"user_id": "456"}
)
await bus.publish(app_event)

stats = broker.get_stats()
print(f"Published: {stats['published']}")
print(f"Consumed: {stats['consumed']}")
print(f"Errors: {stats['errors']}")
print(f"Channels: {stats['channels']}")
print(f"Subscribers: {stats['subscribers']}")handlers = bus.get_handlers()
for event_type, handler_list in handlers.items():
print(f"{event_type}:")
for h in handler_list:
print(f" - {h['function_name']} ({h['module']})")The examples directory contains complete example code:
- 01_basic_pubsub.py - Basic publish/subscribe
- 02_multiple_subscribers.py - Multiple subscribers
- 03_publisher_decorator.py - Publisher decorator
- 04_context_manager.py - Context manager
- 05_statistics.py - Statistics functionality
- 06_rpc_pattern.py - RPC pattern
- 07_fastapi_integration.py - FastAPI integration
- 08_eventbus_basic.py - EventBus basic usage
- 09_eventbus_decorator.py - Decorator pattern
- 10_eventbus_scopes.py - Event scopes
- 11_eventbus_context_manager.py - Context manager
- 12_eventbus_multiple_handlers.py - Multiple handlers
- 13_eventbus_introspection.py - Handler introspection
- 14_eventbus_custom_logger.py - Custom logger
- 15_fastapi_eventbus_integration.py - FastAPI complete integration
- 16_event_handler_auto_conversion.py - Automatic dict-to-object conversion
- 17_event_handler_type_safety.py - Type safety demonstration
Running examples:
# AsyncQueueBroker examples
python examples/01_basic_pubsub.py
# EventBus examples
python examples/08_eventbus_basic.py
# FastAPI integration (requires Redis)
python examples/15_fastapi_eventbus_integration.py

class CloudEvent(BaseModel):
id: str # Auto-generated
source: str # Required
specversion: str = "1.0" # Default value
type: str # Required
datacontenttype: Optional[str] # Default "application/json"
dataschema: Optional[str]
subject: Optional[str]
time: Optional[datetime] # Auto-generated
data: Optional[Dict[str, Any]]
extensions: Dict[str, Any] # Extension attributes
def to_dict() -> Dict[str, Any]
def to_json() -> str
@classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "CloudEvent"

class ScopedEvent(CloudEvent):
scope: EventScope = EventScope.APP # Event scope
@property
def event_id(self) -> str # Compatibility property
@property
def event_type(self) -> str # Compatibility property
@property
    def timestamp(self) -> datetime  # Compatibility property

class AsyncQueueBroker:
def __init__(self, url: str = "", *, max_queue_size: int = 1000)
async def start() -> None
async def stop() -> None
async def connect() -> None
async def ping(timeout: Optional[float] = None) -> bool
def subscriber(self, channel: str, **kwargs) -> InMemorySubscriber
def publisher(self, channel: str, **kwargs) -> InMemoryPublisher
async def publish(
self,
message: Any = None,
channel: Optional[str] = None,
*,
headers: Optional[Dict[str, Any]] = None,
correlation_id: Optional[str] = None,
reply_to: str = "",
**kwargs
) -> int
async def request(
self,
message: Any = None,
channel: Optional[str] = None,
*,
timeout: float = 0.5,
**kwargs
) -> Any
def get_stats() -> Dict[str, Any]
    def get_subscribers() -> Dict[str, List[Dict[str, str]]]

class EventBus:
def __init__(
self,
process_level_broker: Any,
app_level_broker: Any,
logger: Optional[logging.Logger] = None
)
async def start() -> None
async def stop() -> None
def subscribe(self, event_type: str, handler: Callable) -> None
async def publish(self, event: ScopedEvent) -> None
def get_handlers() -> Dict[str, List[Dict[str, str]]]
# Decorator (automatically converts dict to typed event objects)
def event_handler(event_class: Type[ScopedEvent]) -> Callable
# Initialization function
def init_eventbus(
process_level_broker: Any,
app_level_broker: Any,
logger: Optional[logging.Logger] = None
) -> EventBus

- PROCESS Scope: Suitable for scenarios without cross-instance communication (see the sketch after this list)
  - Cache updates
  - Local state changes
  - Fast-response internal events
- APP Scope: Suitable for scenarios requiring distributed processing
  - User registration/login
  - Order creation
  - Notification sending
  - Business flows requiring multi-instance coordination
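A minimal sketch of baking the scope choice into each event class, following the class-definition pattern from the quick start, so publishers don't have to repeat it:

from opensecflow.eventbus.event import ScopedEvent, EventScope

# Process-local: no other instance cares about this cache change
class CacheClearedEvent(ScopedEvent):
    type: str = "cache.cleared"
    scope: EventScope = EventScope.PROCESS

# Application-level: every instance should see new registrations
class UserRegisteredEvent(ScopedEvent):
    type: str = "user.registered"
    scope: EventScope = EventScope.APP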
Use reverse domain name style:
# Good naming
"com.example.user.created"
"com.example.order.payment.completed"
"com.example.notification.email.sent"
# Avoid
"user_created"
"ORDER_CREATED"
"notification"Keep event data concise, include only necessary information:
# Good practice
class UserCreatedEvent(ScopedEvent):
type: str = "user.created"
user_id: str
email: str
created_at: datetime
# Avoid including large amounts of data in events
# If detailed information is needed, query it in the handler

Exceptions in event handlers are caught and logged, so a failing handler won't affect other handlers:

@event_handler(OrderCreatedEvent)
async def handle_order(event: OrderCreatedEvent):
    try:
        # Business logic
        await process_order(event)
    except Exception as e:
        logger.error(f"Failed to process order: {e}")
        # Optionally publish an error event or perform compensation

The project includes a comprehensive test suite using pytest with async support:
# Install dev dependencies
pip install -e ".[dev]"
# Run all tests
pytest tests/
# Run with verbose output
pytest tests/ -v
# Run specific test file
pytest tests/test_event_handler_conversion.py -v

Example unit test using AsyncQueueBroker:
import asyncio

import pytest

from opensecflow.eventbus.memory_broker import AsyncQueueBroker
@pytest.mark.asyncio
async def test_event_handling():
broker = AsyncQueueBroker()
received = []
@broker.subscriber("test.event")
async def handler(data: dict):
received.append(data)
await broker.start()
await broker.publish({"value": 42}, channel="test.event")
await asyncio.sleep(0.1)
await broker.stop()
assert len(received) == 1
assert received[0]["value"] == 42EventBus can integrate with various FastStream brokers:
from faststream.redis import RedisBroker
from opensecflow.eventbus.memory_broker import AsyncQueueBroker
from opensecflow.eventbus.eventbus import EventBus
process_broker = AsyncQueueBroker()
app_broker = RedisBroker("redis://localhost:6379")
event_bus = EventBus(process_broker, app_broker)

from faststream.kafka import KafkaBroker
from opensecflow.eventbus.memory_broker import AsyncQueueBroker
from opensecflow.eventbus.eventbus import EventBus
process_broker = AsyncQueueBroker()
app_broker = KafkaBroker("localhost:9092")
event_bus = EventBus(process_broker, app_broker)

from faststream.rabbit import RabbitBroker
from opensecflow.eventbus.memory_broker import AsyncQueueBroker
from opensecflow.eventbus.eventbus import EventBus
process_broker = AsyncQueueBroker()
app_broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
event_bus = EventBus(process_broker, app_broker)

- AsyncQueueBroker: Memory queue, highest performance, suitable for a single process
- In-process events (PROCESS): Direct invocation, lowest latency
- Application-level events (APP): Distributed via the broker; incurs network latency
- Queue size: Defaults to 1000, adjustable via max_queue_size
- Concurrent processing: Each channel has an independent consumer task
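For example, a larger queue absorbs publish bursts at the cost of memory; a sketch using the constructor signature from the API reference above:

from opensecflow.eventbus.memory_broker import AsyncQueueBroker

# Trade memory for burst tolerance (max_queue_size defaults to 1000)
broker = AsyncQueueBroker(max_queue_size=5000)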
If messages are not received:
- Check that the broker is started: await broker.start()
- Check that channel names match
- Check that the handler is correctly registered
- Review the log output

If RPC requests time out:
- Increase the timeout parameter (see the sketch after this list)
- Check that the handler returns a value
- Confirm the handler doesn't raise exceptions

If memory usage grows:
- Reduce max_queue_size
- Check for message backlog
- Optimize handler performance
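For RPC timeouts, note that request() defaults to a 0.5-second timeout (see the AsyncQueueBroker reference above); slow handlers may need more:

# Give a slow RPC handler more time than the 0.5 s default
response = await broker.request(
    {"a": 10, "b": 20},
    channel="rpc.calculate",
    timeout=5.0,
)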
Apache License 2.0
Issues and Pull Requests are welcome!