From a91a02c82084247932f0d59f8af6c2c39e511233 Mon Sep 17 00:00:00 2001 From: Christian Chwala Date: Tue, 10 Mar 2026 21:29:05 +0100 Subject: [PATCH 1/3] fix: cycle source data at native pace; add on-demand NetCDF download - _get_netcdf_index_for_timestamp: use original_duration as denominator instead of loop_duration_seconds so the source file cycles at its native 10 s resolution rather than being stretched across the archive period (eliminated flat plateaus followed by sudden jumps) - add ensure_netcdf_file() helper: downloads via temp file with progress logging; archive_generator and mno_simulator call it at startup so the 3-month file is fetched automatically when not already present - generate_archive.py: add --netcdf-file-url / NETCDF_FILE_URL support; switch defaults to 1 day / 10 s; replace per-slice isel() loop with a single contiguous slice load (slice(0, max_idx+1)) for fast bulk read --- mno_data_source_simulator/data_generator.py | 52 +++++++++++++- mno_data_source_simulator/generate_archive.py | 72 +++++++++++-------- mno_data_source_simulator/main.py | 9 ++- 3 files changed, 102 insertions(+), 31 deletions(-) diff --git a/mno_data_source_simulator/data_generator.py b/mno_data_source_simulator/data_generator.py index a816bf9..94ba44c 100644 --- a/mno_data_source_simulator/data_generator.py +++ b/mno_data_source_simulator/data_generator.py @@ -5,6 +5,8 @@ by altering timestamps and looping through the existing data. """ +import urllib.request +import urllib.error import xarray as xr import pandas as pd import numpy as np @@ -15,6 +17,49 @@ logger = logging.getLogger(__name__) +def ensure_netcdf_file(path: Path, url: str | None) -> None: + """Download the NetCDF file from *url* if *path* does not exist yet. + + Downloads via a temp file so an interrupted transfer never leaves a + truncated file behind. Does nothing if the file already exists or if no + URL is provided. 
+ """ + if path.exists(): + logger.info(f"NetCDF file found: {path}") + return + if not url: + return # caller's existence check will log the error + logger.info(f"NetCDF file not found at {path}") + logger.info(f"Downloading from: {url}") + path.parent.mkdir(parents=True, exist_ok=True) + tmp_path = path.with_suffix(".nc.download") + try: + with urllib.request.urlopen(url) as response, open(tmp_path, "wb") as out: + total_raw = response.headers.get("Content-Length") + total = int(total_raw) if total_raw else None + downloaded = 0 + block_size = 8 * 1024 * 1024 # 8 MB chunks + while True: + block = response.read(block_size) + if not block: + break + out.write(block) + downloaded += len(block) + if total: + pct = downloaded / total * 100 + logger.info( + f" {pct:.0f}% ({downloaded / 1e6:.0f} / {total / 1e6:.0f} MB)" + ) + else: + logger.info(f" {downloaded / 1e6:.0f} MB downloaded") + tmp_path.rename(path) + logger.info(f"Download complete: {path} ({path.stat().st_size / 1e6:.1f} MB)") + except Exception as exc: + tmp_path.unlink(missing_ok=True) + logger.error(f"Download failed: {exc}") + raise + + class CMLDataGenerator: """Generate fake real-time CML data from historical NetCDF files.""" @@ -100,7 +145,12 @@ def _get_netcdf_index_for_timestamp(self, timestamp: pd.Timestamp) -> int: ).total_seconds() if original_duration > 0: - time_fraction = loop_position / self.loop_duration_seconds + # Cycle through the source data at its native pace rather than + # stretching/compressing it to fill loop_duration_seconds. This + # avoids long plateaus of identical values followed by sudden jumps + # when the archive period is much longer than the source file. 
+ position_in_original = loop_position % original_duration + time_fraction = position_in_original / original_duration original_index = int(time_fraction * (len(self.original_time_points) - 1)) else: original_index = 0 diff --git a/mno_data_source_simulator/generate_archive.py b/mno_data_source_simulator/generate_archive.py index 26aea0d..a16f7bf 100755 --- a/mno_data_source_simulator/generate_archive.py +++ b/mno_data_source_simulator/generate_archive.py @@ -25,7 +25,7 @@ import numpy as np import pandas as pd -from data_generator import CMLDataGenerator +from data_generator import CMLDataGenerator, ensure_netcdf_file # Configure logging logging.basicConfig( @@ -35,17 +35,22 @@ logger = logging.getLogger(__name__) # Defaults (overridable via CLI args or environment variables) -DEFAULT_NETCDF_FILE = "../parser/example_data/openMRG_cmls_20150827_12hours.nc" +# Production uses the 3-month / 10-second-resolution file (downloaded at startup). +# Tests point to the small 12-hour file directly via the --netcdf-file flag. 
+DEFAULT_NETCDF_FILE = "../parser/example_data/openMRG_cmls_20150827_3months.nc" +DEFAULT_NETCDF_FILE_URL = "https://bwsyncandshare.kit.edu/s/jSAFftGXcJjQbSJ/download" DEFAULT_OUTPUT_DIR = "../database/archive_data" -DEFAULT_ARCHIVE_DAYS = 7 -DEFAULT_INTERVAL_SECONDS = 300 # 5-minute default; use 10 for raw real-time resolution +DEFAULT_ARCHIVE_DAYS = 1 +DEFAULT_INTERVAL_SECONDS = 10 # Output files METADATA_OUTPUT = "metadata_archive.csv" DATA_OUTPUT = "data_archive.csv" -def generate_archive_data(archive_days, output_dir, netcdf_file, interval_seconds): +def generate_archive_data( + archive_days, output_dir, netcdf_file, interval_seconds, netcdf_file_url=None +): """Generate archive metadata and time-series data.""" netcdf_path = Path(netcdf_file) @@ -56,6 +61,8 @@ def generate_archive_data(archive_days, output_dir, netcdf_file, interval_second if not output_path.is_absolute(): output_path = Path(__file__).parent / output_dir + ensure_netcdf_file(netcdf_path, netcdf_file_url) + if not netcdf_path.exists(): logger.error(f"NetCDF file not found: {netcdf_path}") sys.exit(1) @@ -106,7 +113,7 @@ def generate_archive_data(archive_days, output_dir, netcdf_file, interval_second generator.loop_start_time = archive_start_date # --- Fast numpy-cached generation --- - # Map each archive timestamp to a NetCDF index (cycles through 720 steps) + # Map each archive timestamp to a NetCDF index. 
all_indices = np.array( [generator._get_netcdf_index_for_timestamp(ts) for ts in timestamps] ) @@ -116,26 +123,26 @@ def generate_archive_data(archive_days, output_dir, netcdf_file, interval_second f"(of {len(generator.original_time_points)} in file)" ) - # Pre-cache RSL/TSL arrays for each unique NetCDF index (one isel call each) - logger.info(" Pre-caching NetCDF slices...") - base_df = ( - generator.dataset.isel(time=int(unique_indices[0])) - .to_dataframe() - .reset_index()[["cml_id", "sublink_id", "tsl", "rsl"]] - ) - cml_ids = base_df["cml_id"].values - sublink_ids = base_df["sublink_id"].values + # Load RSL/TSL for all needed time steps in one contiguous slice. + # unique_indices are always low-numbered (they start at 0 for any archive + # shorter than the source file), so reading slice(0, max+1) is a single + # sequential disk read — much faster than indexed/fancy access. + logger.info(" Loading RSL/TSL arrays from NetCDF (one contiguous slice)...") + max_idx = int(unique_indices.max()) + ds_slice = generator.dataset[["rsl", "tsl"]].isel(time=slice(0, max_idx + 1)) + ds_stacked = ds_slice.stack(link=("cml_id", "sublink_id")) + rsl_arr = ds_stacked["rsl"].values # shape: (max_idx+1, n_links) + tsl_arr = ds_stacked["tsl"].values + # Recover per-link identifiers from the stacked MultiIndex + link_index = ds_stacked.indexes["link"] + cml_ids = np.array([v[0] for v in link_index]) + sublink_ids = np.array([v[1] for v in link_index]) n_links = len(cml_ids) - - rsl_cache = {} - tsl_cache = {} - for idx in unique_indices: - df_slice = ( - generator.dataset.isel(time=int(idx)).to_dataframe().reset_index() - ) - rsl_cache[idx] = df_slice["rsl"].values - tsl_cache[idx] = df_slice["tsl"].values - logger.info(f" Cached {len(unique_indices)} slices, generating output...") + # For a 0-based slice the original index IS the row in rsl_arr/tsl_arr + idx_to_row = {int(idx): int(idx) for idx in unique_indices} + logger.info( + f" Loaded {max_idx + 1} time slices × {n_links} 
links, generating output..." + ) # Write in batches using pre-cached numpy arrays batch_size = 5000 # timestamps per batch (not rows) @@ -152,8 +159,9 @@ def generate_archive_data(archive_days, output_dir, netcdf_file, interval_second time_col = np.repeat(batch_ts.values, n_links) cml_col = np.tile(cml_ids, batch_n) sub_col = np.tile(sublink_ids, batch_n) - tsl_col = np.concatenate([tsl_cache[idx] for idx in batch_indices]) - rsl_col = np.concatenate([rsl_cache[idx] for idx in batch_indices]) + rows = [idx_to_row[int(idx)] for idx in batch_indices] + tsl_col = tsl_arr[rows, :].ravel() + rsl_col = rsl_arr[rows, :].ravel() df = pd.DataFrame( { @@ -202,7 +210,9 @@ def generate_archive_data(archive_days, output_dir, netcdf_file, interval_second parser.add_argument( "--interval-seconds", type=int, - default=int(os.getenv("ARCHIVE_INTERVAL_SECONDS", str(DEFAULT_INTERVAL_SECONDS))), + default=int( + os.getenv("ARCHIVE_INTERVAL_SECONDS", str(DEFAULT_INTERVAL_SECONDS)) + ), help=f"Time resolution in seconds between archive data points (default: {DEFAULT_INTERVAL_SECONDS}, or ARCHIVE_INTERVAL_SECONDS env var)", ) parser.add_argument( @@ -215,6 +225,11 @@ def generate_archive_data(archive_days, output_dir, netcdf_file, interval_second default=os.getenv("NETCDF_FILE", DEFAULT_NETCDF_FILE), help="Path to the NetCDF source file (default: ../parser/example_data/..., or NETCDF_FILE env var)", ) + parser.add_argument( + "--netcdf-file-url", + default=os.getenv("NETCDF_FILE_URL", DEFAULT_NETCDF_FILE_URL), + help="URL to download the NetCDF file if it is not already present (or NETCDF_FILE_URL env var)", + ) args = parser.parse_args() generate_archive_data( @@ -222,4 +237,5 @@ def generate_archive_data(archive_days, output_dir, netcdf_file, interval_second output_dir=args.output_dir, netcdf_file=args.netcdf_file, interval_seconds=args.interval_seconds, + netcdf_file_url=args.netcdf_file_url, ) diff --git a/mno_data_source_simulator/main.py b/mno_data_source_simulator/main.py index 
c42aa32..53c994a 100644 --- a/mno_data_source_simulator/main.py +++ b/mno_data_source_simulator/main.py @@ -12,7 +12,7 @@ from datetime import datetime, timedelta import yaml -from data_generator import CMLDataGenerator +from data_generator import CMLDataGenerator, ensure_netcdf_file from sftp_uploader import SFTPUploader # Configure logging @@ -55,9 +55,14 @@ def main(): # Load configuration config = load_config() + # Resolve NetCDF file path (env var overrides config.yml for deployment) + netcdf_file = os.getenv("NETCDF_FILE") or config["data_source"]["netcdf_file"] + netcdf_file_url = os.getenv("NETCDF_FILE_URL") + ensure_netcdf_file(Path(netcdf_file), netcdf_file_url) + # Initialize data generator generator = CMLDataGenerator( - netcdf_file=config["data_source"]["netcdf_file"], + netcdf_file=netcdf_file, loop_duration_seconds=config["data_source"]["loop_duration_seconds"], output_dir=config["generator"]["output_dir"], ) From e1e4c07a83495b13964a087bde8012d21fb75054 Mon Sep 17 00:00:00 2001 From: Christian Chwala Date: Tue, 10 Mar 2026 21:29:13 +0100 Subject: [PATCH 2/3] config: use 3-month/10s NetCDF file with auto-download in deployment - archive_generator and mno_simulator: set NETCDF_FILE to 3-month file, NETCDF_FILE_URL to the KIT download link; volumes writable so the file can be persisted to ./parser/example_data/ on the host - Archive defaults: 1 day, 10 s interval (ARCHIVE_DAYS / ARCHIVE_INTERVAL_SECONDS) - config.yml: update netcdf_file path to 3-month file --- docker-compose.yml | 11 +++++++---- mno_data_source_simulator/config.yml | 7 +++++-- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 089fd6c..e844737 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,12 +1,13 @@ services: archive_generator: build: ./mno_data_source_simulator - command: python generate_archive.py --days ${ARCHIVE_DAYS:-30} --interval-seconds ${ARCHIVE_INTERVAL_SECONDS:-300} + command: python 
generate_archive.py --days ${ARCHIVE_DAYS:-1} --interval-seconds ${ARCHIVE_INTERVAL_SECONDS:-10} environment: - ARCHIVE_OUTPUT_DIR=/archive_output - - NETCDF_FILE=/app/example_data/openMRG_cmls_20150827_12hours.nc + - NETCDF_FILE=/app/example_data/openMRG_cmls_20150827_3months.nc + - NETCDF_FILE_URL=https://bwsyncandshare.kit.edu/s/jSAFftGXcJjQbSJ/download volumes: - - ./parser/example_data:/app/example_data:ro + - ./parser/example_data:/app/example_data # writable so the file can be downloaded - archive_data:/archive_output mno_simulator: @@ -14,11 +15,13 @@ services: depends_on: - sftp_receiver volumes: - - ./parser/example_data:/app/example_data:ro + - ./parser/example_data:/app/example_data # writable so the file can be downloaded - mno_data_to_upload:/app/data_to_upload - mno_data_uploaded:/app/data_uploaded - ./ssh_keys:/app/ssh_keys:ro environment: + - NETCDF_FILE=/app/example_data/openMRG_cmls_20150827_3months.nc + - NETCDF_FILE_URL=https://bwsyncandshare.kit.edu/s/jSAFftGXcJjQbSJ/download - SFTP_HOST=sftp_receiver - SFTP_PORT=22 - SFTP_USERNAME=cml_user diff --git a/mno_data_source_simulator/config.yml b/mno_data_source_simulator/config.yml index f40dbe5..a17150b 100644 --- a/mno_data_source_simulator/config.yml +++ b/mno_data_source_simulator/config.yml @@ -2,8 +2,11 @@ # Data source configuration data_source: - # Path to the NetCDF file with CML data - netcdf_file: "/app/example_data/openMRG_cmls_20150827_12hours.nc" + # Path to the NetCDF file with CML data. + # The 3-month / 10-second-resolution file is used for production; it is + # downloaded automatically at startup if NETCDF_FILE_URL is set. + # Override via the NETCDF_FILE environment variable. 
+ netcdf_file: "/app/example_data/openMRG_cmls_20150827_3months.nc" # Loop duration in seconds (1 hour = 3600 seconds) loop_duration_seconds: 3600 From e3959757d02397d0200b9bf14afede443dce4cb9 Mon Sep 17 00:00:00 2001 From: Christian Chwala Date: Tue, 10 Mar 2026 21:56:31 +0100 Subject: [PATCH 3/3] docs: update README for archive data generation and configuration details --- README.md | 73 +++++++------------ mno_data_source_simulator/README.md | 17 ++++- mno_data_source_simulator/generate_archive.py | 2 +- 3 files changed, 40 insertions(+), 52 deletions(-) diff --git a/README.md b/README.md index df54b79..06b0aa6 100644 --- a/README.md +++ b/README.md @@ -61,71 +61,50 @@ The webserver provides an intuitive interface with four main pages: ## Archive Data -The database can be initialized with archive CML data using two methods: +On `docker compose up` the `archive_generator` service automatically generates +a 1-day archive at 10-second resolution from the 3-month OpenMRG NetCDF file +and the `archive_loader` service bulk-loads it into the database. -### Method 1: CSV Files (Default, Fast) - -Pre-generated CSV files included in the repository: +**Defaults** (overridable via environment variables): - **728 CML sublinks** (364 unique CML IDs) covering Berlin area -- **~1.5M data rows** at 5-minute intervals over 7 days -- **Gzip-compressed** (~7.6 MB total, included in repo) -- **Loads in ~3 seconds** via PostgreSQL COPY - -Files are located in `/database/archive_data/` and loaded automatically on first database startup. - -### Method 2: Load from NetCDF (For Larger/Higher Resolution Archives) +- **~6.3M data rows** at 10-second intervals over 1 day +- Generates in ~15 s, loads in ~15 s -Load data directly from the full 3-month NetCDF archive with configurable time range: +**NetCDF source file** (`openMRG_cmls_20150827_3months.nc`, ~193 MB) is +gitignored. If not present in `parser/example_data/`, it is downloaded +automatically at startup via `NETCDF_FILE_URL`. 
-#### Default: 7 Days at 10-Second Resolution (~44M rows, ~5 minutes) +### Configuring the archive ```sh -# Rebuild parser if needed -docker compose build parser - -# Start database -docker compose up -d database - -# Load last 7 days from NetCDF -docker compose run --rm -e DB_HOST=database parser python /app/parser/parse_netcdf_archive.py +# Longer archive or different resolution via environment variables: +ARCHIVE_DAYS=7 ARCHIVE_INTERVAL_SECONDS=60 docker compose up -d ``` -#### Custom Time Range +| Variable | Default | Description | +|---|---|---| +| `ARCHIVE_DAYS` | `1` | Days of history to generate | +| `ARCHIVE_INTERVAL_SECONDS` | `10` | Time step in seconds | +| `NETCDF_FILE_URL` | KIT download link | URL to fetch the NetCDF file if absent | -Use `ARCHIVE_MAX_DAYS` to control how much data to load: +### Reloading archive data ```sh -# Load last 14 days (~88M rows, ~10 minutes) -docker compose run --rm -e DB_HOST=database -e ARCHIVE_MAX_DAYS=14 parser python /app/parser/parse_netcdf_archive.py - -# Load full 3 months (~579M rows, ~1 hour) -docker compose run --rm -e DB_HOST=database -e ARCHIVE_MAX_DAYS=0 parser python /app/parser/parse_netcdf_archive.py +docker compose down -v # Remove volumes +docker compose up -d # Regenerate and reload from scratch ``` -**Note**: Set `ARCHIVE_MAX_DAYS=0` to disable the time limit and load the entire dataset. Larger datasets require more database memory (recommend at least 4GB RAM for full 3-month archive). - -**Features**: -- Auto-downloads 3-month NetCDF file (~209 MB) on first run -- **10-second resolution** (vs 5-minute for CSV method) -- **Automatic timestamp shifting** - data ends at current time -- **Progress reporting** with batch-by-batch status (~155K rows/sec) -- PostgreSQL COPY for maximum performance -- Configurable time window to balance demo realism vs load time - -The NetCDF file is downloaded to `parser/example_data/openMRG_cmls_20150827_3months.nc` and gitignored. 
+### Loading a larger archive directly from NetCDF -### Managing Archive Data +For a full 3-month archive at native 10-second resolution (~579M rows): -To regenerate CSV archive data: ```sh -python mno_data_source_simulator/generate_archive.py +docker compose run --rm -e DB_HOST=database -e ARCHIVE_MAX_DAYS=0 parser \ + python /app/parser/parse_netcdf_archive.py ``` -To reload archive data (either method): -```sh -docker compose down -v # Remove volumes -docker compose up -d # Restart with fresh database -``` +`ARCHIVE_MAX_DAYS` limits the time window in days (default: 7; +`0` = no limit, as used above). Requires at least 4 GB RAM for the full dataset. ## Storage Backend diff --git a/mno_data_source_simulator/README.md b/mno_data_source_simulator/README.md index 851423c..facf1fb 100644 --- a/mno_data_source_simulator/README.md +++ b/mno_data_source_simulator/README.md @@ -39,21 +39,30 @@ python main.py ## Configuration -Edit `config.yml`: +Edit `config.yml` for local/standalone use, or set environment variables for Docker deployment: + ```yaml data_source: - loop_duration_seconds: 3600 # How fast to replay historical data + netcdf_file: "/app/example_data/openMRG_cmls_20150827_3months.nc" + loop_duration_seconds: 3600 # Lookback window for real-time replay generator: - generation_frequency_seconds: 60 # How often to generate files + generation_frequency_seconds: 10 # How often to generate files sftp: enabled: true - upload_frequency_seconds: 60 # How often to upload + upload_frequency_seconds: 10 # How often to upload private_key_path: "/path/to/ssh/key" # Recommended known_hosts_path: "/path/to/known_hosts" # For host verification ``` +**Key environment variables** (override config.yml in Docker): + +| Variable | Description | +|---|---| +| `NETCDF_FILE` | Path to the NetCDF source file | +| `NETCDF_FILE_URL` | URL to download the file if not already present | + ### Authentication **SSH Key Authentication (Recommended):** diff --git a/mno_data_source_simulator/generate_archive.py 
b/mno_data_source_simulator/generate_archive.py index a16f7bf..6192ea3 100755 --- a/mno_data_source_simulator/generate_archive.py +++ b/mno_data_source_simulator/generate_archive.py @@ -83,7 +83,7 @@ def generate_archive_data( # Initialize the data generator generator = CMLDataGenerator( netcdf_file=str(netcdf_path), - loop_duration_seconds=archive_days * 24 * 3600, # Loop over archive period + loop_duration_seconds=archive_days * 24 * 3600, # bounds the replay window ) # Generate and save metadata using existing function