diff --git a/backend/crates/atlas-server/src/indexer/da_worker.rs b/backend/crates/atlas-server/src/indexer/da_worker.rs
index 17b1fcf..97c40cf 100644
--- a/backend/crates/atlas-server/src/indexer/da_worker.rs
+++ b/backend/crates/atlas-server/src/indexer/da_worker.rs
@@ -3,29 +3,16 @@
 //! This worker queries ev-node's Connect RPC service to determine at which Celestia
 //! height each block's header and data were submitted.
 //!
-//! ## Two-phase design
+//! ## Design
 //!
-//! The worker runs in a loop with a fixed RPC budget per cycle (BATCH_SIZE):
+//! Each cycle the worker fetches the BATCH_SIZE highest-numbered blocks that still
+//! need DA info — either because they have no entry in `block_da_status` yet, or
+//! because a previous check returned 0 heights (not yet included on Celestia).
+//! Both cases are handled by a single unified query ordered by block number DESC,
+//! so fresh blocks always get priority over the historical backfill.
 //!
-//! 1. **Backfill** — Discovers blocks in the `blocks` table that are missing from
-//!    `block_da_status`. Queries ev-node for each and INSERTs the result.
-//!    **Always inserts a row, even when DA heights are 0** (block not yet included
-//!    on Celestia). This marks the block as "checked" so the backfill phase won't
-//!    re-query it on the next cycle. Processes newest blocks first so the UI shows
-//!    current data immediately.
-//!
-//! 2. **Update pending** — Finds rows where `header_da_height = 0 OR data_da_height = 0`
-//!    and re-queries ev-node. Updates with new values when the block has been included.
-//!    Processes newest pending blocks first (most relevant to UI users).
-//!
-//! Both phases share the same per-cycle RPC budget. Backfill runs first and takes
-//! what it needs; pending gets the remainder. This ensures new blocks are checked
-//! promptly while pending blocks still make progress every cycle.
-//!
-//! A block flows: backfill (phase 1) → update-pending (phase 2) → done.
-//!
-//! After each batch, the worker sends updated block numbers through an in-process
-//! broadcast channel so the SSE handler can push live DA status changes to clients.
+//! After each batch, updated block numbers are sent through an in-process broadcast
+//! channel so the SSE handler can push live DA status changes to clients.
 
 use anyhow::Result;
 use futures::stream::{self, StreamExt};
@@ -38,35 +25,32 @@ use tokio::sync::broadcast;
 
 use super::evnode::EvnodeClient;
 
-/// Total RPC budget per cycle, split between backfill and pending.
+/// Maximum blocks processed per cycle.
 const BATCH_SIZE: i64 = 100;
 
-/// Sleep when idle (no work in either phase).
+/// Sleep when there is no work to do.
 const IDLE_SLEEP: Duration = Duration::from_millis(500);
 
-const SELECT_MISSING_BLOCKS_SQL: &str = "SELECT b.number FROM blocks b
-    LEFT JOIN block_da_status d ON d.block_number = b.number
-    WHERE d.block_number IS NULL
-    ORDER BY b.number DESC
-    LIMIT $1";
-
-const INSERT_DA_STATUS_SQL: &str =
-    "INSERT INTO block_da_status (block_number, header_da_height, data_da_height)
-     VALUES ($1, $2, $3)
-     ON CONFLICT (block_number) DO UPDATE SET
-       header_da_height = EXCLUDED.header_da_height,
-       data_da_height = EXCLUDED.data_da_height,
-       updated_at = NOW()";
-
-const SELECT_PENDING_BLOCKS_SQL: &str = "SELECT block_number FROM block_da_status
-    WHERE header_da_height = 0 OR data_da_height = 0
-    ORDER BY block_number DESC
-    LIMIT $1";
-
-const UPDATE_PENDING_DA_STATUS_SQL: &str = "UPDATE block_da_status
-    SET header_da_height = $2, data_da_height = $3, updated_at = NOW()
-    WHERE block_number = $1
-      AND (header_da_height, data_da_height) IS DISTINCT FROM ($2, $3)";
+/// Unified query: blocks missing from block_da_status OR still at 0 heights,
+/// newest first so fresh blocks are always prioritized over historical backfill.
+const SELECT_NEEDS_DA_SQL: &str = "
+    SELECT b.number FROM blocks b
+    LEFT JOIN block_da_status d ON d.block_number = b.number
+    WHERE d.block_number IS NULL
+       OR d.header_da_height = 0
+       OR d.data_da_height = 0
+    ORDER BY b.number DESC
+    LIMIT $1";
+
+const UPSERT_DA_STATUS_SQL: &str = "
+    INSERT INTO block_da_status (block_number, header_da_height, data_da_height)
+    VALUES ($1, $2, $3)
+    ON CONFLICT (block_number) DO UPDATE SET
+        header_da_height = EXCLUDED.header_da_height,
+        data_da_height = EXCLUDED.data_da_height,
+        updated_at = NOW()
+    WHERE (block_da_status.header_da_height, block_da_status.data_da_height)
+        IS DISTINCT FROM (EXCLUDED.header_da_height, EXCLUDED.data_da_height)";
 
 #[derive(Clone, Debug)]
 pub struct DaSseUpdate {
@@ -120,24 +104,9 @@ impl DaWorker {
         );
 
         loop {
-            // Phase 1: backfill gets first pick of the budget
-            let backfilled = self.backfill_new_blocks(BATCH_SIZE).await?;
-
-            // Phase 2: pending gets whatever budget remains
-            let remaining = BATCH_SIZE - backfilled as i64;
-            let updated = if remaining > 0 {
-                self.update_pending_blocks(remaining).await?
-            } else {
-                0
-            };
-
-            let did_work = backfilled > 0 || updated > 0;
-            if did_work {
-                tracing::info!(
-                    "DA worker cycle: backfilled {}, updated {} pending",
-                    backfilled,
-                    updated
-                );
+            let processed = self.process_blocks(BATCH_SIZE).await?;
+            if processed > 0 {
+                tracing::info!("DA worker cycle: processed {}", processed);
             } else {
                 tokio::time::sleep(IDLE_SLEEP).await;
             }
@@ -152,76 +121,15 @@ impl DaWorker {
         let _ = self.da_events_tx.send(updates.to_vec());
     }
 
-    /// Phase 1: Find blocks missing from block_da_status and query ev-node.
-    /// Returns the number of blocks processed.
-    async fn backfill_new_blocks(&self, limit: i64) -> Result<usize> {
-        let missing: Vec<(i64,)> = sqlx::query_as(SELECT_MISSING_BLOCKS_SQL)
-            .bind(limit)
-            .fetch_all(&self.pool)
-            .await?;
-
-        if missing.is_empty() {
-            return Ok(0);
-        }
-
-        let pool = &self.pool;
-        let client = &self.client;
-        let rate_limiter = &self.rate_limiter;
-
-        let results: Vec<Option<DaSseUpdate>> = stream::iter(missing)
-            .map(|(block_number,)| async move {
-                rate_limiter.until_ready().await;
-                match client.get_da_status(block_number as u64).await {
-                    Ok((header_da, data_da)) => {
-                        if let Err(e) = sqlx::query(INSERT_DA_STATUS_SQL)
-                            .bind(block_number)
-                            .bind(header_da as i64)
-                            .bind(data_da as i64)
-                            .execute(pool)
-                            .await
-                        {
-                            tracing::warn!(
-                                "Failed to insert DA status for block {}: {}",
-                                block_number,
-                                e
-                            );
-                            return None;
-                        }
-                        Some(DaSseUpdate {
-                            block_number,
-                            header_da_height: header_da as i64,
-                            data_da_height: data_da as i64,
-                        })
-                    }
-                    Err(e) => {
-                        tracing::warn!(
-                            "Failed to fetch DA status for block {}: {}",
-                            block_number,
-                            e
-                        );
-                        None
-                    }
-                }
-            })
-            .buffer_unordered(self.concurrency)
-            .collect()
-            .await;
-
-        let updates: Vec<DaSseUpdate> = results.into_iter().flatten().collect();
-        self.notify_da_updates(&updates);
-
-        Ok(updates.len())
-    }
-
-    /// Phase 2: Re-check blocks where DA heights are still 0.
-    /// Returns the number of blocks processed.
-    async fn update_pending_blocks(&self, limit: i64) -> Result<usize> {
-        let pending: Vec<(i64,)> = sqlx::query_as(SELECT_PENDING_BLOCKS_SQL)
+    /// Fetch DA status for the highest-numbered blocks that still need it.
+    /// Returns the number of blocks where DA heights actually changed.
+    async fn process_blocks(&self, limit: i64) -> Result<usize> {
+        let blocks: Vec<(i64,)> = sqlx::query_as(SELECT_NEEDS_DA_SQL)
             .bind(limit)
             .fetch_all(&self.pool)
             .await?;
 
-        if pending.is_empty() {
+        if blocks.is_empty() {
             return Ok(0);
         }
 
@@ -229,12 +137,12 @@ impl DaWorker {
         let client = &self.client;
         let rate_limiter = &self.rate_limiter;
 
-        let results: Vec<Option<DaSseUpdate>> = stream::iter(pending)
+        let results: Vec<Option<DaSseUpdate>> = stream::iter(blocks)
            .map(|(block_number,)| async move {
                rate_limiter.until_ready().await;
                match client.get_da_status(block_number as u64).await {
                    Ok((header_da, data_da)) => {
-                        match sqlx::query(UPDATE_PENDING_DA_STATUS_SQL)
+                        match sqlx::query(UPSERT_DA_STATUS_SQL)
                            .bind(block_number)
                            .bind(header_da as i64)
                            .bind(data_da as i64)
@@ -249,7 +157,7 @@ impl DaWorker {
                            Ok(_) => None,
                            Err(e) => {
                                tracing::warn!(
-                                    "Failed to update DA status for block {}: {}",
+                                    "Failed to upsert DA status for block {}: {}",
                                    block_number,
                                    e
                                );
@@ -349,15 +257,17 @@ mod tests {
     }
 
     #[test]
-    fn scheduler_queries_prioritize_newest_blocks() {
-        assert!(SELECT_MISSING_BLOCKS_SQL.contains("ORDER BY b.number DESC"));
-        assert!(SELECT_PENDING_BLOCKS_SQL.contains("ORDER BY block_number DESC"));
-        assert!(SELECT_MISSING_BLOCKS_SQL.contains("LIMIT $1"));
-        assert!(SELECT_PENDING_BLOCKS_SQL.contains("LIMIT $1"));
+    fn query_prioritizes_newest_blocks() {
+        assert!(SELECT_NEEDS_DA_SQL.contains("ORDER BY b.number DESC"));
+        assert!(SELECT_NEEDS_DA_SQL.contains("LIMIT $1"));
+        // Covers both missing rows and 0-height rows in one pass
+        assert!(SELECT_NEEDS_DA_SQL.contains("d.block_number IS NULL"));
+        assert!(SELECT_NEEDS_DA_SQL.contains("header_da_height = 0"));
+        assert!(SELECT_NEEDS_DA_SQL.contains("data_da_height = 0"));
     }
 
     #[test]
-    fn pending_update_sql_suppresses_noop_writes() {
-        assert!(UPDATE_PENDING_DA_STATUS_SQL.contains("IS DISTINCT FROM"));
+    fn upsert_suppresses_noop_writes() {
+        assert!(UPSERT_DA_STATUS_SQL.contains("IS DISTINCT FROM"));
     }
 }