Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 50 additions & 140 deletions backend/crates/atlas-server/src/indexer/da_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,16 @@
//! This worker queries ev-node's Connect RPC service to determine at which Celestia
//! height each block's header and data were submitted.
//!
//! ## Design
//!
//! The worker runs in a loop with a fixed RPC budget per cycle (BATCH_SIZE):
//! Each cycle the worker fetches the BATCH_SIZE highest-numbered blocks that still
//! need DA info — either because they have no entry in `block_da_status` yet, or
//! because a previous check returned 0 heights (not yet included on Celestia).
//! Both cases are handled by a single unified query ordered by block number DESC,
//! so fresh blocks always get priority over the historical backfill.
//!
//! 1. **Backfill** — Discovers blocks in the `blocks` table that are missing from
//! `block_da_status`. Queries ev-node for each and INSERTs the result.
//! **Always inserts a row, even when DA heights are 0** (block not yet included
//! on Celestia). This marks the block as "checked" so the backfill phase won't
//! re-query it on the next cycle. Processes newest blocks first so the UI shows
//! current data immediately.
//!
//! 2. **Update pending** — Finds rows where `header_da_height = 0 OR data_da_height = 0`
//! and re-queries ev-node. Updates with new values when the block has been included.
//! Processes newest pending blocks first (most relevant to UI users).
//!
//! Both phases share the same per-cycle RPC budget. Backfill runs first and takes
//! what it needs; pending gets the remainder. This ensures new blocks are checked
//! promptly while pending blocks still make progress every cycle.
//!
//! A block flows: backfill (phase 1) → update-pending (phase 2) → done.
//!
//! After each batch, updated block numbers are sent through an in-process broadcast
//! channel so the SSE handler can push live DA status changes to clients.

use anyhow::Result;
use futures::stream::{self, StreamExt};
Expand All @@ -38,35 +25,32 @@ use tokio::sync::broadcast;

use super::evnode::EvnodeClient;

/// Maximum number of blocks fetched and checked per worker cycle — the
/// worker's total per-cycle RPC budget against ev-node.
const BATCH_SIZE: i64 = 100;

/// How long the worker sleeps when a cycle finds no blocks needing DA status.
const IDLE_SLEEP: Duration = Duration::from_millis(500);

/// Backfill (phase 1) query: newest blocks that have no `block_da_status` row
/// yet (LEFT JOIN produces NULL), limited to the per-cycle budget.
const SELECT_MISSING_BLOCKS_SQL: &str = "SELECT b.number FROM blocks b
LEFT JOIN block_da_status d ON d.block_number = b.number
WHERE d.block_number IS NULL
ORDER BY b.number DESC
LIMIT $1";

/// Backfill (phase 1) write: record a block's DA heights. Unconditional
/// upsert — on conflict it always overwrites both heights and bumps
/// `updated_at`, even when the values are unchanged.
const INSERT_DA_STATUS_SQL: &str =
"INSERT INTO block_da_status (block_number, header_da_height, data_da_height)
VALUES ($1, $2, $3)
ON CONFLICT (block_number) DO UPDATE SET
header_da_height = EXCLUDED.header_da_height,
data_da_height = EXCLUDED.data_da_height,
updated_at = NOW()";

/// Pending (phase 2) query: newest rows still waiting for Celestia inclusion,
/// i.e. either DA height is still the 0 sentinel.
const SELECT_PENDING_BLOCKS_SQL: &str = "SELECT block_number FROM block_da_status
WHERE header_da_height = 0 OR data_da_height = 0
ORDER BY block_number DESC
LIMIT $1";

/// Pending (phase 2) write: update heights for one block, but only when they
/// actually changed — the IS DISTINCT FROM guard makes an unchanged re-check
/// affect 0 rows.
const UPDATE_PENDING_DA_STATUS_SQL: &str = "UPDATE block_da_status
SET header_da_height = $2, data_da_height = $3, updated_at = NOW()
WHERE block_number = $1
AND (header_da_height, data_da_height) IS DISTINCT FROM ($2, $3)";
/// Unified query: blocks missing from block_da_status OR still at 0 heights,
/// newest first so fresh blocks are always prioritized over historical backfill.
///
/// NOTE(review): because this always returns the highest-numbered matches, a
/// backlog of more than LIMIT newer blocks stuck at 0 heights would starve
/// older unchecked blocks indefinitely — confirm that is acceptable here.
const SELECT_NEEDS_DA_SQL: &str = "
SELECT b.number FROM blocks b
LEFT JOIN block_da_status d ON d.block_number = b.number
WHERE d.block_number IS NULL
OR d.header_da_height = 0
OR d.data_da_height = 0
ORDER BY b.number DESC
LIMIT $1";

/// Upsert a block's DA heights. New rows are always inserted; on conflict the
/// UPDATE arm only fires when a height actually changed (the WHERE guard on
/// DO UPDATE), so a no-op re-check reports 0 rows affected.
const UPSERT_DA_STATUS_SQL: &str = "
INSERT INTO block_da_status (block_number, header_da_height, data_da_height)
VALUES ($1, $2, $3)
ON CONFLICT (block_number) DO UPDATE SET
header_da_height = EXCLUDED.header_da_height,
data_da_height = EXCLUDED.data_da_height,
updated_at = NOW()
WHERE (block_da_status.header_da_height, block_da_status.data_da_height)
IS DISTINCT FROM (EXCLUDED.header_da_height, EXCLUDED.data_da_height)";

#[derive(Clone, Debug)]
pub struct DaSseUpdate {
Expand Down Expand Up @@ -120,24 +104,9 @@ impl DaWorker {
);

loop {
// Phase 1: backfill gets first pick of the budget
let backfilled = self.backfill_new_blocks(BATCH_SIZE).await?;

// Phase 2: pending gets whatever budget remains
let remaining = BATCH_SIZE - backfilled as i64;
let updated = if remaining > 0 {
self.update_pending_blocks(remaining).await?
} else {
0
};

let did_work = backfilled > 0 || updated > 0;
if did_work {
tracing::info!(
"DA worker cycle: backfilled {}, updated {} pending",
backfilled,
updated
);
let processed = self.process_blocks(BATCH_SIZE).await?;
if processed > 0 {
tracing::info!("DA worker cycle: processed {}", processed);
} else {
tokio::time::sleep(IDLE_SLEEP).await;
}
Expand All @@ -152,89 +121,28 @@ impl DaWorker {
let _ = self.da_events_tx.send(updates.to_vec());
}

/// Phase 1: Find blocks missing from block_da_status and query ev-node.
/// Returns the number of blocks processed.
async fn backfill_new_blocks(&self, limit: i64) -> Result<usize> {
let missing: Vec<(i64,)> = sqlx::query_as(SELECT_MISSING_BLOCKS_SQL)
.bind(limit)
.fetch_all(&self.pool)
.await?;

if missing.is_empty() {
return Ok(0);
}

let pool = &self.pool;
let client = &self.client;
let rate_limiter = &self.rate_limiter;

let results: Vec<Option<DaSseUpdate>> = stream::iter(missing)
.map(|(block_number,)| async move {
rate_limiter.until_ready().await;
match client.get_da_status(block_number as u64).await {
Ok((header_da, data_da)) => {
if let Err(e) = sqlx::query(INSERT_DA_STATUS_SQL)
.bind(block_number)
.bind(header_da as i64)
.bind(data_da as i64)
.execute(pool)
.await
{
tracing::warn!(
"Failed to insert DA status for block {}: {}",
block_number,
e
);
return None;
}
Some(DaSseUpdate {
block_number,
header_da_height: header_da as i64,
data_da_height: data_da as i64,
})
}
Err(e) => {
tracing::warn!(
"Failed to fetch DA status for block {}: {}",
block_number,
e
);
None
}
}
})
.buffer_unordered(self.concurrency)
.collect()
.await;

let updates: Vec<DaSseUpdate> = results.into_iter().flatten().collect();
self.notify_da_updates(&updates);

Ok(updates.len())
}

/// Phase 2: Re-check blocks where DA heights are still 0.
/// Returns the number of blocks processed.
async fn update_pending_blocks(&self, limit: i64) -> Result<usize> {
let pending: Vec<(i64,)> = sqlx::query_as(SELECT_PENDING_BLOCKS_SQL)
/// Fetch DA status for the highest-numbered blocks that still need it.
/// Returns the number of blocks where DA heights actually changed.
async fn process_blocks(&self, limit: i64) -> Result<usize> {
let blocks: Vec<(i64,)> = sqlx::query_as(SELECT_NEEDS_DA_SQL)
.bind(limit)
.fetch_all(&self.pool)
.await?;

if pending.is_empty() {
if blocks.is_empty() {
return Ok(0);
}

let pool = &self.pool;
let client = &self.client;
let rate_limiter = &self.rate_limiter;

let results: Vec<Option<DaSseUpdate>> = stream::iter(pending)
let results: Vec<Option<DaSseUpdate>> = stream::iter(blocks)
.map(|(block_number,)| async move {
rate_limiter.until_ready().await;
match client.get_da_status(block_number as u64).await {
Ok((header_da, data_da)) => {
match sqlx::query(UPDATE_PENDING_DA_STATUS_SQL)
match sqlx::query(UPSERT_DA_STATUS_SQL)
.bind(block_number)
.bind(header_da as i64)
.bind(data_da as i64)
Expand All @@ -249,7 +157,7 @@ impl DaWorker {
Ok(_) => None,
Err(e) => {
tracing::warn!(
"Failed to update DA status for block {}: {}",
"Failed to upsert DA status for block {}: {}",
block_number,
e
);
Expand Down Expand Up @@ -349,15 +257,17 @@ mod tests {
}

/// The per-phase scheduling queries must return the newest blocks first and be
/// bounded by the per-cycle budget.
#[test]
fn scheduler_queries_prioritize_newest_blocks() {
    assert!(SELECT_MISSING_BLOCKS_SQL.contains("ORDER BY b.number DESC"));
    assert!(SELECT_PENDING_BLOCKS_SQL.contains("ORDER BY block_number DESC"));
    assert!(SELECT_MISSING_BLOCKS_SQL.contains("LIMIT $1"));
    assert!(SELECT_PENDING_BLOCKS_SQL.contains("LIMIT $1"));
}

/// The unified query must prioritize newest blocks, be bounded, and cover both
/// "missing row" and "still at 0 heights" cases in a single pass.
#[test]
fn query_prioritizes_newest_blocks() {
    assert!(SELECT_NEEDS_DA_SQL.contains("ORDER BY b.number DESC"));
    assert!(SELECT_NEEDS_DA_SQL.contains("LIMIT $1"));
    // Covers both missing rows and 0-height rows in one pass
    assert!(SELECT_NEEDS_DA_SQL.contains("d.block_number IS NULL"));
    assert!(SELECT_NEEDS_DA_SQL.contains("header_da_height = 0"));
    assert!(SELECT_NEEDS_DA_SQL.contains("data_da_height = 0"));
}

/// The pending-update statement must skip writes when heights are unchanged.
#[test]
fn pending_update_sql_suppresses_noop_writes() {
    assert!(UPDATE_PENDING_DA_STATUS_SQL.contains("IS DISTINCT FROM"));
}

/// The upsert's DO UPDATE arm must be guarded so unchanged heights produce no
/// write (and therefore 0 rows affected).
#[test]
fn upsert_suppresses_noop_writes() {
    assert!(UPSERT_DA_STATUS_SQL.contains("IS DISTINCT FROM"));
}
}
Loading