Dieses Repository stellt eine Docker-basierte Umgebung bereit, in der unterschiedliche Python-Skripte Echtzeitdatenströme innerhalb eines Stadtquartiers simulieren. Die erzeugten Daten werden zunächst in Kafka verarbeitet und anschließend über HiveMQ als MQTT-Stream zur Verfügung gestellt. Zookeeper dient dabei der Koordination eines kleinen Kafka-Clusters, sodass mehrere Simulationen parallel laufen können.
Directory: ./docker_realtimesimulation
- energyconsumption_simulation – generiert stündliche Stromverbrauchsdaten der Wohngebäude
- mobility_simulation – simuliert ein- und ausfahrende Fahrzeuge an drei Messstellen
- delivery_simulation – bewegt einen Lieferbot vom Supermarkt hin zu zufällig ausgewählten Gebäuden und zurück
- *_bridge – leiten Daten aus Kafka an den MQTT‑Broker weiter
- energyconsumption_consumer – stellt einen einfachen HTTP-Endpunkt zum Mitlesen der Daten bereit
- mqtt_to_db – schreibt MQTT-Nachrichten in eine TimescaleDB (optional)
- delivery_webmap – Einfache Webkarte zur Visualisierung der Bot-Bewegungen (optional)
Die zentrale Orchestrierung erfolgt über docker-compose.yml.
Directory: ./kommonitor
Beinhaltet ein minimales Docker Compose Setup und Config-Files zum Starten von KomMonitor und Keycloak. Die KomMonitor Datenbank mit Gebäudendaten als Georessourcen und Raumeinheiten initialisiert
Directory: ./services Beinhaltet ein Docker Compose Setup für Middleware der Realtime Simulation:
- Kafka: Streaming von IoT-Daten der Realtime Simulation
- HiveMQ: Pub/Sub der IoT Daten via MQTT
- Node-RED: Plattform für ETL Flows zur Integration der IoT Datenströme in KomMonitor
- TimescaleDB: Persistierung der IoT Daten als Zeitreihendaten
Alle Anwendungen der Realtime Simulation können innerhalb des ./docker_realtimesimulation Verzeichnis gebaut werden mit:
docker compose --profile "*" buildUm den KomMonitor Web Client mit Echtzeit-Features zu bauen:
docker build -t kommonitor/web-client-ng-realtime https://github.com/KomMonitor/web-client.git#feature/real-time-dataBevor die Realtime Siulation gestartet werden kann, müssen zunächst alle Middleware-Komponenten laufen. Hierzu im Verzeichnis ./services die Komponenten per Docker Compose starten:
docker compose up Folgende Dienste stehen anschließend bereit:
- Kafka unter
localhost:9092(externer Zugriff über Port 29092) - HiveMQ unter http://localhost:8080
- Node-RED unter http://localhost:1880
- TimescaleDB unter
localhost:5434 - pgAdmin für die Timescale-Datenbank unter http://localhost:8082
KomMonitor und Keycloak werden im Verzeichnis ./kommonitor gestartet:
docker compose up Folgende Dienste sind damit erreichbar:
- KomMonitor Web Client: http://localhost:8084
- KomMonitor Data Management API: http://localhost:8085
- KomMonitor Importer API: http://localhost:8087
- Keycloak http://localhost:8080
Das KomMonitor Setup erwartet, dass Keycloak außerdem unter http://keycloak:8080 erreichbar ist. Hierzu muss ggf. die Host-Datei des Betriebssystems angepasst werden, indem folgende Zeile ergänzt wird:
127.0.0.1 keycloak
Einzelne Komponenten der Realtime Simulation können per Docker Compose im Verzeichnis ./docker_realtimesimulation gestartet werden:
docker compose --profile <profile_name> up Folgende Profile werden unterstützt, um nur bestimmte Komponenten für verschiedene Use Cases zu starten:
- energyconsumption: Generierung von Stromverbrauchsdaten der Wohngebäude
- delivery: Lieferbot eines Supermarkts
- mobility: Fahrzeugsimulation
- storage: Bridge von MQTT nach TimescaleDB
Anschließend stehen folgende Dienste bereit:
- Stromdaten-Stream via HTTP unter http://localhost:5000/stream
- Delivery-Webkarte unter http://localhost:8081
Über eine Bridge werden die in Kafka verarbeiteten Datenströme an HiveMQ weitergeleitet. Relevante Parameter:
- Broker:
hivemq - Port:
1883 - Topics:
node-red/stromdaten,node-red/deliverydata,node-red/mobilitydata - Format: JSON
Die optional aktivierbare Komponente mqtt_to_db kann sämtliche MQTT-Nachrichten zusätzlich in einer TimescaleDB
persistieren. Die Zugangsdaten lassen sich über Umgebungsvariablen konfigurieren (siehe docker-compose.yml).
Die Node-RED Flows sind standardmäßig für das bereitgestellte Docker Setup konfiguriert. Generell lassen sich die Flows jedoch auch an andere Umgebunden anpassen.
Zu diesem Zwecke müssen folgende Environment Variablen innerhalb der Node-RED Instanz unter Einstellungen -> Environment gesetzt werden:
| Env Variable | Description | Example |
|---|---|---|
KEYCLOAK_URL |
Root URL zu Keycloak. | http://keycloak:8080 |
REALM_NAME |
Keycloak Realm Name. | kommonitor |
KEYCLOAK_ACCESS_TOKEN_URL |
URL zum Keycloak Token Endpunkt | http://keycloak:8080/realms/kommonitor/protocol/openid-connect/token |
KOMMONITOR_DATA_MANAGEMENT_URL |
Root URL zur KomMonitor Data Management API | http://kommonitor-data-management:8085 |
KOMMONITOR_TIMESERIES_API_URL |
Root URL zur KomMonitor Timeseries API | http://kommonitor-timeseries-management:8086 |
Der Node-RED Flow 01 - Spatio-temporal Aggregation benötigt außerdem Informationen darüber, für welche Indikatoren und Raumebenen die aggregierte Zeitreihendaten hinzugefügt werden sollen. Die Informationen werden in der Config-Datei unter ./services/nodered/data/config.json bereitgestellt. Diese Datei beinhaltet folgende Parameter:
indicatorMapping:
Liste mit Mappings zwischen aggregierten Parametern im Node-RED Flow (parameter) und
der Indikator ID in KomMonitor (indicator). Metdaten zu allen hier aufgeführten Indikatoren müssen vorab in
KomMonitor angelegt worden sein.
spatialUnits:
Liste mit Raumebenen, für die eine Aggregation erfolgen soll. id und name entsprechen den Metadateninformationen
zu den Raumebenen in KomMonitor.
TBD
Der Node-RED Flow 01 - Spatio-temporal Aggregation empfängt Stramoverbrauchsdaten von einem MQTT-Stream und aggregiert diese zu Tageswerten. Der Flow ist in mehrere Sub-Flows unterteilt, wobei einige der Sub-Flows Kontextinformationen für den Hauptflow (04 - Aggregate Energy Consumption) setzen. Jeder Flow startet mit einem Inject-Knoten, der auch manuell getriggert werde kann. Zur Nutzung des Flows sollte wie folgt vorgegangen werden:
Um die KomMonitor Config Datei ./services/nodered/data/config.json auszulesen und Mapping Informationen zu setzen, kann der Inject Mapping File Knoten des 01 - Read KomMonitor Config Flows ausgeführt werden.
Um Daten aus der KomMonitor Data Management API auszulesen und die aggregierten Indikator-Zeitreihen zu speichern, ist ein Keycloak Token notwendig. Dieser kann über den Subflow 02 - Keycloak Authentication abgerufen und als Kontextinformation gesetzt werden. Der Knoten KomMonitor Auth enthält wichtige Verbindungsparameter für Keycloak und muss ggf. angepasst werden.
Der Subflow 03 - Request and Store Spatial Units ruft die in der KomMonitor Config Datei ./services/nodered/data/config.json definiereten Raumebenen ab. Diese werden im Hauptflow für die Durchführung der räumlichen Aggregation verwendet.
Die gesamte Anwendungslogik steckt im Hauptflow 04 - Aggregate Energy Consumption. Dieser geht wie folgt vor:
-
Ein MQTT-Inject Knoten empfängt Nachrichten zu Stromverbrauchstdaten, die über den HiveMQ Broker unter dem Topic node-red/stromdaten veröffentlicht werden.
-
Die Knoten Reset Timer, Fire Timer Finished und Wait for Timer Finished sorgen dafür, dass alle MQTT-Nachrichten für eine bestimmte Zeit gesammelt werden, bevor die raum-zeitliche Aggregation erfolgt. Über den Reset Timer Knoten, kann dieses Zeitintervall gesetz werdenb. Wenn etwa eine Aggregation zu Tageswerten gewünscht ist, müsste die Wiederholungsrate auf einen festen täglichen Zeitpunkt z.B.
0:01(kleiner zeitlicher Lag, um sicherzustellen, dass verspätete Nachrichten des vergangenen Tages mit berücksichtigt werden). Für den simulierten Use Case sollte ein geringeres Zeitintervall gesetzt werden, das auch zum Simulationsintervall passt. -
Die raum-zeitliche Aggregation kann auch ohne vorherigen MQTT-Input angestoßen werden. Hierzu lassen sich über den Inject Test Data Knoten einige Testdaten manuell injecten.
-
Im Knoten Daily Aggregation erfolgt eine zeitliche Aggregation der Stromverbrauchsdaten unter Berücksichtigung aller bis zum Trigger-Zeitpunkt eingelaufenen MQTT-Nachrichten pro Wohngebäude.
-
Der Knoten Loop for Spatial Units sorgt dafür, dass für jede in der KomMonitor Config Datei hinterlegte Raumebne die räumliche Aggregation durchgeführt wird.
-
Im Knoten Spatial Aggregation erfolgt die räumliche Aggregation der zuvor zeitliche aggregierten Stromverbrauchsdaten von Gebäuden. Hierzu wird für jedes Raumeinheiten-Feature einer Raumebene ermittelt, welche Gebäude innerhalb des Feature-Polygons liegen, und über alle zeitliche aggregierten Stromverbrauchsdaten des Gebäudes eine räumliche Aggregation durchgeführt. Es werden vier verschiedene aggregierte Metriken ermittelt: Durchschnittlicher stündlicher Energieverbrauch, Durchschnittlicher täglicher Energieverbrauch Gebäude, Gesamter täglicher Energieverbrauch Gebäude und Kumulierter durchschnittlicher stündlicher Energieverbrauch Gebäude.
-
Der Knoten Create Multiple Indicator Update Requests überführt die aggregierten Zeitreihendaten in das von der KomMonitor Data Management API erwartete Datenschema und stellt für jede der aggregierten Metriken einen entsprechenden API Request zum Update der Zeitreihen des entsprechenden Indikators zusammen.
-
Im Knoten Update Indicator wird der API Request zum Update der Indikatorzeitreihen durchgeführt.
Der Node-RED Flow 02 - Timeseries Harvesting empfängt Messwerte von Klimamessstationen und speichert die Rohwerte über eine Timeseries API in einer Timeseries Datenbank:
-
Ein MQTT-Inject Knoten verbindet sich mit einem externen MQTT-Stream und empfängt regelmäßig Nachrichten mit Klimamesswerten. Die Verbindungsparameter zum MQTT-Broker müssen für die eigene Umgebung angepasst werden.
-
Alternativ lassen sich einige exemplarische Messwerte über den Inject Test Data Knoten injecten.
-
Der Create Add Timeseries Request liest die relevanten Parameter aus den empfangenen MQTT-Nachrichten aus und bereitet damit einen Request gegen die Timeseries API vor, um Messdaten der Stationen zu einzelnen Zeitpunkten zu ergänzen. Ein solcher Request wird gegen den Endpunkt
/timeseries-management/timeseries/{stationId}abgesetzt und besitzt folgendes Payload-Schema:
{
"parameter_name": "Temperatur",
"timeseries": [
{
"value": 12.1,
"timestamp": "2026-03-24T10:39:22.336Z"
}
]
}Wichtig ist, dass sich Werte sowohl für {stationId} aus dem API Endpunkt sowie für den Parameter parameter_name
im Request Payload in den MQTT-Nachrichten vorhanden sind und ausgelesen werden können. Über diese beiden Werte
erfolgt in der Timeseries API nämlich die Zuordnung der Messwerte zu einem bestimmten Messparameter einer bestimmten Station.
- Der Knoten Update Timeseries wird schließlich der API Request zum Fortführen der Messzeitreihe in der Timeseries API abgesetzt.