diff --git a/crates/pu-cli/src/commands/bench.rs b/crates/pu-cli/src/commands/bench.rs index 24055f8..db4a35b 100644 --- a/crates/pu-cli/src/commands/bench.rs +++ b/crates/pu-cli/src/commands/bench.rs @@ -57,7 +57,7 @@ pub async fn run_bench( } } - output::print_response(&resp, json); + output::print_response(&resp, json)?; Ok(()) } @@ -75,6 +75,6 @@ pub async fn run_play(socket: &Path, agent_id: &str, json: bool) -> Result<(), C ) .await?; let resp = output::check_response(resp, json)?; - output::print_response(&resp, json); + output::print_response(&resp, json)?; Ok(()) } diff --git a/crates/pu-cli/src/commands/mod.rs b/crates/pu-cli/src/commands/mod.rs index 054f55c..067c244 100644 --- a/crates/pu-cli/src/commands/mod.rs +++ b/crates/pu-cli/src/commands/mod.rs @@ -9,6 +9,7 @@ pub mod init; pub mod kill; pub mod logs; pub mod prompt; +pub mod pulse; pub mod schedule; pub mod send; pub mod spawn; diff --git a/crates/pu-cli/src/commands/pulse.rs b/crates/pu-cli/src/commands/pulse.rs new file mode 100644 index 0000000..846b81c --- /dev/null +++ b/crates/pu-cli/src/commands/pulse.rs @@ -0,0 +1,16 @@ +use crate::client; +use crate::daemon_ctrl; +use crate::error::CliError; +use crate::output; +use pu_core::protocol::Request; +use std::path::Path; + +pub async fn run(socket: &Path, json: bool) -> Result<(), CliError> { + daemon_ctrl::ensure_daemon(socket).await?; + + let project_root = crate::commands::cwd_string()?; + let resp = client::send_request(socket, &Request::Pulse { project_root }).await?; + let resp = output::check_response(resp, json)?; + output::print_response(&resp, json)?; + Ok(()) +} diff --git a/crates/pu-cli/src/main.rs b/crates/pu-cli/src/main.rs index a01a6fc..1a0143c 100644 --- a/crates/pu-cli/src/main.rs +++ b/crates/pu-cli/src/main.rs @@ -170,6 +170,12 @@ enum Commands { #[command(subcommand)] action: ScheduleAction, }, + /// Workspace pulse — agents, runtimes, and git stats at a glance + Pulse { + /// Output as JSON + #[arg(long)] + json: bool, + }, /// Show git diffs across agent worktrees Diff { /// Diff a specific worktree @@ -653,6 +659,7 @@ async fn main() { json, } => commands::send::run(&socket, &agent_id, text, no_enter, keys, json).await, Commands::Grid { action } => commands::grid::run(&socket, action).await, + Commands::Pulse { json } => commands::pulse::run(&socket, json).await, Commands::Diff { worktree, stat, diff --git a/crates/pu-cli/src/output.rs b/crates/pu-cli/src/output.rs index 5421c3e..ebac5ed 100644 --- a/crates/pu-cli/src/output.rs +++ b/crates/pu-cli/src/output.rs @@ -47,6 +47,46 @@ fn status_colored_with_suspended( } } +fn format_duration(seconds: i64) -> String { + if seconds < 60 { + format!("{seconds}s") + } else if seconds < 3600 { + format!("{}m {}s", seconds / 60, seconds % 60) + } else { + let h = seconds / 3600; + let m = (seconds % 3600) / 60; + format!("{h}h {m}m") + } +} + +fn print_agent_pulse(a: &pu_core::protocol::AgentPulseEntry) { + let status_str = status_colored(a.status, a.exit_code); + let runtime = format_duration(a.runtime_seconds); + let idle = a + .idle_seconds + .map(|s| { + if s > 0 { + format!(" idle {}", format_duration(s as i64)) + } else { + String::new() + } + }) + .unwrap_or_default(); + + println!( + " {} {} {} ({}{}){}", + a.id.dimmed(), + a.name, + status_str, + runtime.dimmed(), + idle.dimmed(), + a.prompt_snippet + .as_ref() + .map(|s| format!("\n {}", s.dimmed())) + .unwrap_or_default() + ); +} + pub fn print_response(response: &Response, json_mode: bool) -> Result<(), CliError> { if json_mode { println!("{}", serde_json::to_string_pretty(response)?); @@ -443,6 +483,66 @@ pub fn print_response(response: &Response, json_mode: bool) -> Result<(), CliErr } } } + Response::PulseReport { + worktrees, + root_agents, + } => { + if worktrees.is_empty() && root_agents.is_empty() { + println!("{}", "No active workspace".dimmed()); + return Ok(()); + } + + // Root-level agents + if !root_agents.is_empty() { + println!("{}", "Root Agents".bold().underline()); + for a in root_agents { + print_agent_pulse(a); + } + if !worktrees.is_empty() { + println!(); + } + } + + for (i, wt) in worktrees.iter().enumerate() { + if i > 0 { + println!(); + } + // Worktree header with elapsed time + let elapsed = format_duration(wt.elapsed_seconds); + println!( + "{} {} {} ({})", + "Worktree".bold(), + wt.worktree_name.bold(), + wt.branch.green(), + elapsed.dimmed() + ); + + // Git stats + if let Some(ref err) = wt.diff_error { + println!(" git: {} {}", "error".red(), err); + } else if wt.files_changed > 0 { + println!( + " git: {} file(s), {} {}, {} {}", + wt.files_changed.to_string().bold(), + format!("+{}", wt.insertions).green(), + "ins".dimmed(), + format!("-{}", wt.deletions).red(), + "del".dimmed() + ); + } else { + println!(" git: {}", "no changes yet".dimmed()); + } + + // Agents in this worktree + if wt.agents.is_empty() { + println!(" {}", "no agents".dimmed()); + } else { + for a in &wt.agents { + print_agent_pulse(a); + } + } + } + } Response::ScheduleList { schedules } => { if schedules.is_empty() { println!("No schedules"); @@ -719,7 +819,7 @@ mod tests { #[test] fn given_empty_suspend_result_should_not_panic() { let resp = Response::SuspendResult { suspended: vec![] }; - print_response(&resp, false); + print_response(&resp, false).unwrap(); } #[test] @@ -935,6 +1035,90 @@ mod tests { print_response(&resp, false).unwrap(); } + // --- pulse output --- + + #[test] + fn given_pulse_report_should_not_panic() { + let resp = Response::PulseReport { + worktrees: vec![pu_core::protocol::WorktreePulseEntry { + worktree_id: "wt-1".into(), + worktree_name: "feature-5".into(), + branch: "pu/feature-5".into(), + elapsed_seconds: 3661, + agents: vec![pu_core::protocol::AgentPulseEntry { + id: "ag-1".into(), + name: "claude".into(), + agent_type: "claude".into(), + status: AgentStatus::Streaming, + exit_code: None, + runtime_seconds: 120, + idle_seconds: Some(5), + prompt_snippet: Some("Add pulse command to CLI".into()), + }], + files_changed: 3, + insertions: 42, + deletions: 7, + diff_error: None, + }], + root_agents: vec![pu_core::protocol::AgentPulseEntry { + id: "ag-2".into(), + name: "point-guard".into(), + agent_type: "claude".into(), + status: AgentStatus::Waiting, + exit_code: None, + runtime_seconds: 7200, + idle_seconds: Some(30), + prompt_snippet: None, + }], + }; + print_response(&resp, false).unwrap(); + } + + #[test] + fn given_empty_pulse_report_should_not_panic() { + let resp = Response::PulseReport { + worktrees: vec![], + root_agents: vec![], + }; + print_response(&resp, false).unwrap(); + } + + #[test] + fn given_pulse_report_json_should_produce_valid_json() { + let resp = Response::PulseReport { + worktrees: vec![pu_core::protocol::WorktreePulseEntry { + worktree_id: "wt-1".into(), + worktree_name: "test".into(), + branch: "pu/test".into(), + elapsed_seconds: 60, + agents: vec![], + files_changed: 0, + insertions: 0, + deletions: 0, + diff_error: None, + }], + root_agents: vec![], + }; + let json = serde_json::to_string_pretty(&resp).unwrap(); + let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed["type"], "pulse_report"); + } + + #[test] + fn given_format_duration_under_60s() { + assert_eq!(format_duration(45), "45s"); + } + + #[test] + fn given_format_duration_minutes() { + assert_eq!(format_duration(125), "2m 5s"); + } + + #[test] + fn given_format_duration_hours() { + assert_eq!(format_duration(3661), "1h 1m"); + } + // --- schedule output --- #[test] diff --git a/crates/pu-core/src/protocol.rs b/crates/pu-core/src/protocol.rs index 6f4245f..e19e9ee 100644 --- a/crates/pu-core/src/protocol.rs +++ b/crates/pu-core/src/protocol.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; use crate::types::{AgentStatus, WorktreeEntry}; -pub const PROTOCOL_VERSION: u32 = 2; +pub const PROTOCOL_VERSION: u32 = 3; /// Serde helper: encode `Vec` as hex in JSON for binary PTY data. mod hex_bytes { @@ -252,6 +252,9 @@ pub enum Request { #[serde(default)] stat: bool, }, + Pulse { + project_root: String, + }, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -530,6 +533,10 @@ pub enum Response { DiffResult { diffs: Vec, }, + PulseReport { + worktrees: Vec, + root_agents: Vec, + }, Ok, ShuttingDown, Error { @@ -573,6 +580,34 @@ pub struct WorktreeDiffEntry { pub error: Option, } +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct AgentPulseEntry { + pub id: String, + pub name: String, + pub agent_type: String, + pub status: AgentStatus, + pub exit_code: Option, + pub runtime_seconds: i64, + pub idle_seconds: Option, + pub prompt_snippet: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct WorktreePulseEntry { + pub worktree_id: String, + pub worktree_name: String, + pub branch: String, + pub elapsed_seconds: i64, + pub agents: Vec, + pub files_changed: usize, + pub insertions: usize, + pub deletions: usize, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub diff_error: Option, +} + #[cfg(test)] mod tests { use super::*; @@ -949,7 +984,7 @@ mod tests { #[test] fn given_protocol_version_should_be_current() { - assert_eq!(PROTOCOL_VERSION, 2); + assert_eq!(PROTOCOL_VERSION, 3); } // --- GridCommand round-trips --- diff --git a/crates/pu-engine/src/engine.rs b/crates/pu-engine/src/engine.rs index 02824f8..894a29e 100644 --- a/crates/pu-engine/src/engine.rs +++ b/crates/pu-engine/src/engine.rs @@ -168,7 +168,8 @@ impl Engine { | Request::SaveSchedule { project_root, .. } | Request::EnableSchedule { project_root, .. } | Request::DisableSchedule { project_root, .. } - | Request::Diff { project_root, .. } => { + | Request::Diff { project_root, .. } + | Request::Pulse { project_root, .. } => { self.register_project(project_root); } _ => {} @@ -427,6 +428,7 @@ impl Engine { self.handle_diff(&project_root, worktree_id.as_deref(), stat) .await } + Request::Pulse { project_root } => self.handle_pulse(&project_root).await, } } @@ -3151,6 +3153,113 @@ impl Engine { } } + fn agent_pulse_entry( + &self, + agent: &AgentEntry, + sessions: &HashMap, + now: chrono::DateTime, + ) -> pu_core::protocol::AgentPulseEntry { + let (status, exit_code, idle_seconds) = + self.live_agent_status_sync(&agent.id, agent, sessions); + let runtime = (now - agent.started_at).num_seconds(); + let snippet = agent.prompt.as_ref().map(|p| { + let trimmed = p.trim(); + let truncated: String = trimmed.chars().take(77).collect(); + if truncated.len() < trimmed.len() { + format!("{truncated}...") + } else { + truncated + } + }); + pu_core::protocol::AgentPulseEntry { + id: agent.id.clone(), + name: agent.name.clone(), + agent_type: agent.agent_type.clone(), + status, + exit_code, + runtime_seconds: runtime, + idle_seconds, + prompt_snippet: snippet, + } + } + + async fn handle_pulse(&self, project_root: &str) -> Response { + let m = match self.read_manifest_async(project_root).await { + Ok(m) => m, + Err(e) => return Self::error_response(&e), + }; + + let sessions = self.sessions.lock().await; + let now = chrono::Utc::now(); + + // Build root-level agents + let root_agents: Vec = m + .agents + .values() + .map(|a| self.agent_pulse_entry(a, &sessions, now)) + .collect(); + + // Build worktree entries — collect all agent data in one lock acquisition + let active_worktrees: Vec<_> = m + .worktrees + .values() + .filter(|wt| wt.status == WorktreeStatus::Active) + .cloned() + .collect(); + + let wt_agents: Vec> = active_worktrees + .iter() + .map(|wt| { + wt.agents + .values() + .map(|a| self.agent_pulse_entry(a, &sessions, now)) + .collect() + }) + .collect(); + + // Drop sessions lock before shelling out to git + drop(sessions); + + let mut worktrees = Vec::new(); + for (wt, agents) in active_worktrees.iter().zip(wt_agents) { + let elapsed = (now - wt.created_at).num_seconds(); + + // Get git diff stats + let wt_path = std::path::PathBuf::from(&wt.path); + let (files_changed, insertions, deletions, diff_error) = if wt_path.exists() { + let base = wt.base_branch.as_deref(); + match git::diff_worktree(&wt_path, base, true).await { + Ok(output) => ( + output.files_changed, + output.insertions, + output.deletions, + None, + ), + Err(e) => (0, 0, 0, Some(format!("{e}"))), + } + } else { + (0, 0, 0, None) + }; + + worktrees.push(pu_core::protocol::WorktreePulseEntry { + worktree_id: wt.id.clone(), + worktree_name: wt.name.clone(), + branch: wt.branch.clone(), + elapsed_seconds: elapsed, + agents, + files_changed, + insertions, + deletions, + diff_error, + }); + } + + Response::PulseReport { + worktrees, + root_agents, + } + } + async fn handle_diff( &self, project_root: &str,