From 082e9db35f44de24a9bcd771c74594e13a028110 Mon Sep 17 00:00:00 2001 From: 2witstudios <2witstudios@gmail.com> Date: Sat, 7 Mar 2026 22:05:00 -0600 Subject: [PATCH 1/3] =?UTF-8?q?feat:=20Add=20`pu=20pulse`=20command=20?= =?UTF-8?q?=E2=80=94=20workspace=20heartbeat=20at=20a=20glance?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Shows all active worktrees with their agents, runtimes, git stats (files changed, insertions, deletions), and prompt snippets in one structured view. Supports --json for programmatic consumption. Co-Authored-By: Claude Opus 4.6 --- crates/pu-cli/src/commands/mod.rs | 1 + crates/pu-cli/src/commands/pulse.rs | 16 +++ crates/pu-cli/src/main.rs | 7 ++ crates/pu-cli/src/output.rs | 184 ++++++++++++++++++++++++++++ crates/pu-core/src/protocol.rs | 35 ++++++ crates/pu-engine/src/engine.rs | 124 ++++++++++++++++++- 6 files changed, 366 insertions(+), 1 deletion(-) create mode 100644 crates/pu-cli/src/commands/pulse.rs diff --git a/crates/pu-cli/src/commands/mod.rs b/crates/pu-cli/src/commands/mod.rs index cb25e00..4d51fd9 100644 --- a/crates/pu-cli/src/commands/mod.rs +++ b/crates/pu-cli/src/commands/mod.rs @@ -8,6 +8,7 @@ pub mod init; pub mod kill; pub mod logs; pub mod prompt; +pub mod pulse; pub mod schedule; pub mod send; pub mod spawn; diff --git a/crates/pu-cli/src/commands/pulse.rs b/crates/pu-cli/src/commands/pulse.rs new file mode 100644 index 0000000..40b197c --- /dev/null +++ b/crates/pu-cli/src/commands/pulse.rs @@ -0,0 +1,16 @@ +use crate::client; +use crate::daemon_ctrl; +use crate::error::CliError; +use crate::output; +use pu_core::protocol::Request; +use std::path::Path; + +pub async fn run(socket: &Path, json: bool) -> Result<(), CliError> { + daemon_ctrl::ensure_daemon(socket).await?; + + let project_root = crate::commands::cwd_string()?; + let resp = client::send_request(socket, &Request::Pulse { project_root }).await?; + let resp = output::check_response(resp, json)?; + output::print_response(&resp, json); + Ok(()) +} diff --git a/crates/pu-cli/src/main.rs b/crates/pu-cli/src/main.rs index ee8111d..7656e34 100644 --- a/crates/pu-cli/src/main.rs +++ b/crates/pu-cli/src/main.rs @@ -147,6 +147,12 @@ enum Commands { #[command(subcommand)] action: ScheduleAction, }, + /// Workspace pulse — agents, runtimes, and git stats at a glance + Pulse { + /// Output as JSON + #[arg(long)] + json: bool, + }, /// Show git diffs across agent worktrees Diff { /// Diff a specific worktree @@ -616,6 +622,7 @@ async fn main() { json, } => commands::send::run(&socket, &agent_id, text, no_enter, keys, json).await, Commands::Grid { action } => commands::grid::run(&socket, action).await, + Commands::Pulse { json } => commands::pulse::run(&socket, json).await, Commands::Diff { worktree, stat, diff --git a/crates/pu-cli/src/output.rs b/crates/pu-cli/src/output.rs index da59fa8..7b70bb0 100644 --- a/crates/pu-cli/src/output.rs +++ b/crates/pu-cli/src/output.rs @@ -34,6 +34,46 @@ fn status_colored(status: AgentStatus, exit_code: Option) -> String { } } +fn format_duration(seconds: i64) -> String { + if seconds < 60 { + format!("{seconds}s") + } else if seconds < 3600 { + format!("{}m {}s", seconds / 60, seconds % 60) + } else { + let h = seconds / 3600; + let m = (seconds % 3600) / 60; + format!("{h}h {m}m") + } +} + +fn print_agent_pulse(a: &pu_core::protocol::AgentPulseEntry) { + let status_str = status_colored(a.status, a.exit_code); + let runtime = format_duration(a.runtime_seconds); + let idle = a + .idle_seconds + .map(|s| { + if s > 0 { + format!(" idle {s}s") + } else { + String::new() + } + }) + .unwrap_or_default(); + + println!( + " {} {} {} ({}{}){}", + a.id.dimmed(), + a.name, + status_str, + runtime.dimmed(), + idle.dimmed(), + a.prompt_snippet + .as_ref() + .map(|s| format!("\n {}", s.dimmed())) + .unwrap_or_default() + ); +} + pub fn print_response(response: &Response, json_mode: bool) { if json_mode { println!( @@ -429,6 +469,66 @@ pub fn print_response(response: &Response, json_mode: bool) { } } } + Response::PulseReport { + worktrees, + root_agents, + } => { + if worktrees.is_empty() && root_agents.is_empty() { + println!("{}", "No active workspace".dimmed()); + return; + } + + // Root-level agents + if !root_agents.is_empty() { + println!("{}", "Root Agents".bold().underline()); + for a in root_agents { + print_agent_pulse(a); + } + if !worktrees.is_empty() { + println!(); + } + } + + for (i, wt) in worktrees.iter().enumerate() { + if i > 0 { + println!(); + } + // Worktree header with elapsed time + let elapsed = format_duration(wt.elapsed_seconds); + println!( + "{} {} {} ({})", + "Worktree".bold(), + wt.worktree_name.bold(), + wt.branch.green(), + elapsed.dimmed() + ); + + // Git stats + if let Some(ref err) = wt.diff_error { + println!(" git: {} {}", "error".red(), err); + } else if wt.files_changed > 0 { + println!( + " git: {} file(s), {} {}, {} {}", + wt.files_changed.to_string().bold(), + format!("+{}", wt.insertions).green(), + "ins".dimmed(), + format!("-{}", wt.deletions).red(), + "del".dimmed() + ); + } else { + println!(" git: {}", "no changes yet".dimmed()); + } + + // Agents in this worktree + if wt.agents.is_empty() { + println!(" {}", "no agents".dimmed()); + } else { + for a in &wt.agents { + print_agent_pulse(a); + } + } + } + } Response::ScheduleList { schedules } => { if schedules.is_empty() { println!("No schedules"); @@ -887,6 +987,90 @@ mod tests { print_response(&resp, false); } + // --- pulse output --- + + #[test] + fn given_pulse_report_should_not_panic() { + let resp = Response::PulseReport { + worktrees: vec![pu_core::protocol::WorktreePulseEntry { + worktree_id: "wt-1".into(), + worktree_name: "feature-5".into(), + branch: "pu/feature-5".into(), + elapsed_seconds: 3661, + agents: vec![pu_core::protocol::AgentPulseEntry { + id: "ag-1".into(), + name: "claude".into(), + agent_type: "claude".into(), + status: AgentStatus::Streaming, + exit_code: None, + runtime_seconds: 120, + idle_seconds: Some(5), + prompt_snippet: Some("Add pulse command to CLI".into()), + }], + files_changed: 3, + insertions: 42, + deletions: 7, + diff_error: None, + }], + root_agents: vec![pu_core::protocol::AgentPulseEntry { + id: "ag-2".into(), + name: "point-guard".into(), + agent_type: "claude".into(), + status: AgentStatus::Waiting, + exit_code: None, + runtime_seconds: 7200, + idle_seconds: Some(30), + prompt_snippet: None, + }], + }; + print_response(&resp, false); + } + + #[test] + fn given_empty_pulse_report_should_not_panic() { + let resp = Response::PulseReport { + worktrees: vec![], + root_agents: vec![], + }; + print_response(&resp, false); + } + + #[test] + fn given_pulse_report_json_should_produce_valid_json() { + let resp = Response::PulseReport { + worktrees: vec![pu_core::protocol::WorktreePulseEntry { + worktree_id: "wt-1".into(), + worktree_name: "test".into(), + branch: "pu/test".into(), + elapsed_seconds: 60, + agents: vec![], + files_changed: 0, + insertions: 0, + deletions: 0, + diff_error: None, + }], + root_agents: vec![], + }; + let json = serde_json::to_string_pretty(&resp).unwrap(); + let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed["type"], "pulse_report"); + } + + #[test] + fn given_format_duration_under_60s() { + assert_eq!(format_duration(45), "45s"); + } + + #[test] + fn given_format_duration_minutes() { + assert_eq!(format_duration(125), "2m 5s"); + } + + #[test] + fn given_format_duration_hours() { + assert_eq!(format_duration(3661), "1h 1m"); + } + // --- schedule output --- #[test] diff --git a/crates/pu-core/src/protocol.rs b/crates/pu-core/src/protocol.rs index d35cf56..6bf02c5 100644 --- a/crates/pu-core/src/protocol.rs +++ b/crates/pu-core/src/protocol.rs @@ -252,6 +252,9 @@ pub enum Request { #[serde(default)] stat: bool, }, + Pulse { + project_root: String, + }, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -530,6 +533,10 @@ pub enum Response { DiffResult { diffs: Vec, }, + PulseReport { + worktrees: Vec, + root_agents: Vec, + }, Ok, ShuttingDown, Error { @@ -573,6 +580,34 @@ pub struct WorktreeDiffEntry { pub error: Option, } +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct AgentPulseEntry { + pub id: String, + pub name: String, + pub agent_type: String, + pub status: AgentStatus, + pub exit_code: Option, + pub runtime_seconds: i64, + pub idle_seconds: Option, + pub prompt_snippet: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub struct WorktreePulseEntry { + pub worktree_id: String, + pub worktree_name: String, + pub branch: String, + pub elapsed_seconds: i64, + pub agents: Vec, + pub files_changed: usize, + pub insertions: usize, + pub deletions: usize, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub diff_error: Option, +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/pu-engine/src/engine.rs b/crates/pu-engine/src/engine.rs index 33fdcf3..7f63faa 100644 --- a/crates/pu-engine/src/engine.rs +++ b/crates/pu-engine/src/engine.rs @@ -168,7 +168,8 @@ impl Engine { | Request::SaveSchedule { project_root, .. } | Request::EnableSchedule { project_root, .. } | Request::DisableSchedule { project_root, .. } - | Request::Diff { project_root, .. } => { + | Request::Diff { project_root, .. } + | Request::Pulse { project_root, .. } => { self.register_project(project_root); } _ => {} @@ -427,6 +428,7 @@ impl Engine { self.handle_diff(&project_root, worktree_id.as_deref(), stat) .await } + Request::Pulse { project_root } => self.handle_pulse(&project_root).await, } } @@ -3155,6 +3157,126 @@ impl Engine { } } + async fn handle_pulse(&self, project_root: &str) -> Response { + let m = match self.read_manifest_async(project_root).await { + Ok(m) => m, + Err(e) => return Self::error_response(&e), + }; + + let sessions = self.sessions.lock().await; + let now = chrono::Utc::now(); + + // Build root-level agents + let root_agents: Vec = m + .agents + .values() + .map(|a| { + let (status, exit_code, idle_seconds) = + self.live_agent_status_sync(&a.id, a, &sessions); + let runtime = (now - a.started_at).num_seconds(); + let snippet = a.prompt.as_ref().map(|p| { + let trimmed = p.trim(); + if trimmed.len() > 80 { + format!("{}...", &trimmed[..77]) + } else { + trimmed.to_string() + } + }); + pu_core::protocol::AgentPulseEntry { + id: a.id.clone(), + name: a.name.clone(), + agent_type: a.agent_type.clone(), + status, + exit_code, + runtime_seconds: runtime, + idle_seconds, + prompt_snippet: snippet, + } + }) + .collect(); + + // Build worktree entries with agents + git stats + let active_worktrees: Vec<_> = m + .worktrees + .values() + .filter(|wt| wt.status == WorktreeStatus::Active) + .cloned() + .collect(); + + // Drop sessions lock before shelling out to git + drop(sessions); + + let mut worktrees = Vec::new(); + for wt in &active_worktrees { + let elapsed = (now - wt.created_at).num_seconds(); + + // Gather agent pulse entries (re-acquire lock briefly) + let sessions = self.sessions.lock().await; + let agents: Vec = wt + .agents + .values() + .map(|a| { + let (status, exit_code, idle_seconds) = + self.live_agent_status_sync(&a.id, a, &sessions); + let runtime = (now - a.started_at).num_seconds(); + let snippet = a.prompt.as_ref().map(|p| { + let trimmed = p.trim(); + if trimmed.len() > 80 { + format!("{}...", &trimmed[..77]) + } else { + trimmed.to_string() + } + }); + pu_core::protocol::AgentPulseEntry { + id: a.id.clone(), + name: a.name.clone(), + agent_type: a.agent_type.clone(), + status, + exit_code, + runtime_seconds: runtime, + idle_seconds, + prompt_snippet: snippet, + } + }) + .collect(); + drop(sessions); + + // Get git diff stats + let wt_path = std::path::PathBuf::from(&wt.path); + let (files_changed, insertions, deletions, diff_error) = if wt_path.exists() { + let base = wt.base_branch.as_deref(); + match git::diff_worktree(&wt_path, base, true).await { + Ok(output) => ( + output.files_changed, + output.insertions, + output.deletions, + None, + ), + Err(e) => (0, 0, 0, Some(format!("{e}"))), + } + } else { + (0, 0, 0, None) + }; + + worktrees.push(pu_core::protocol::WorktreePulseEntry { + worktree_id: wt.id.clone(), + worktree_name: wt.name.clone(), + branch: wt.branch.clone(), + elapsed_seconds: elapsed, + agents, + files_changed, + insertions, + deletions, + diff_error, + }); + } + + Response::PulseReport { + worktrees, + root_agents, + } + } + async fn handle_diff( &self, project_root: &str, From d325efbcea6f06a1e00aae6b2c8fea3c13886da1 Mon Sep 17 00:00:00 2001 From: 2witstudios <2witstudios@gmail.com> Date: Sun, 8 Mar 2026 14:00:25 -0500 Subject: [PATCH 2/3] fix: Unicode-safe truncation and cleanup in pulse handler - Use chars().take(77) instead of byte slicing to prevent panic on multi-byte prompts - Extract agent_pulse_entry() helper to eliminate duplicated mapping logic - Collect all agent data in a single lock acquisition before the git loop - Use format_duration() for idle seconds for consistent time display Co-Authored-By: Claude Opus 4.6 --- crates/pu-cli/src/output.rs | 2 +- crates/pu-engine/src/engine.rs | 99 +++++++++++++++------------------- 2 files changed, 44 insertions(+), 57 deletions(-) diff --git a/crates/pu-cli/src/output.rs b/crates/pu-cli/src/output.rs index 7b70bb0..bd38d97 100644 --- a/crates/pu-cli/src/output.rs +++ b/crates/pu-cli/src/output.rs @@ -53,7 +53,7 @@ fn print_agent_pulse(a: &pu_core::protocol::AgentPulseEntry) { .idle_seconds .map(|s| { if s > 0 { - format!(" idle {s}s") + format!(" idle {}", format_duration(s as i64)) } else { String::new() } diff --git a/crates/pu-engine/src/engine.rs b/crates/pu-engine/src/engine.rs index 7f63faa..ebc1a39 100644 --- a/crates/pu-engine/src/engine.rs +++ b/crates/pu-engine/src/engine.rs @@ -3157,6 +3157,36 @@ impl Engine { } } + fn agent_pulse_entry( + &self, + agent: &AgentEntry, + sessions: &HashMap, + now: chrono::DateTime, + ) -> pu_core::protocol::AgentPulseEntry { + let (status, exit_code, idle_seconds) = + self.live_agent_status_sync(&agent.id, agent, sessions); + let runtime = (now - agent.started_at).num_seconds(); + let snippet = agent.prompt.as_ref().map(|p| { + let trimmed = p.trim(); + let truncated: String = trimmed.chars().take(77).collect(); + if truncated.len() < trimmed.len() { + format!("{truncated}...") + } else { + truncated + } + }); + pu_core::protocol::AgentPulseEntry { + id: agent.id.clone(), + name: agent.name.clone(), + agent_type: agent.agent_type.clone(), + status, + exit_code, + runtime_seconds: runtime, + idle_seconds, + prompt_snippet: snippet, + } + } + async fn handle_pulse(&self, project_root: &str) -> Response { let m = match self.read_manifest_async(project_root).await { Ok(m) => m, @@ -3170,32 +3200,10 @@ impl Engine { let root_agents: Vec = m .agents .values() - .map(|a| { - let (status, exit_code, idle_seconds) = - self.live_agent_status_sync(&a.id, a, &sessions); - let runtime = (now - a.started_at).num_seconds(); - let snippet = a.prompt.as_ref().map(|p| { - let trimmed = p.trim(); - if trimmed.len() > 80 { - format!("{}...", &trimmed[..77]) - } else { - trimmed.to_string() - } - }); - pu_core::protocol::AgentPulseEntry { - id: a.id.clone(), - name: a.name.clone(), - agent_type: a.agent_type.clone(), - status, - exit_code, - runtime_seconds: runtime, - idle_seconds, - prompt_snippet: snippet, - } - }) + .map(|a| self.agent_pulse_entry(a, &sessions, now)) .collect(); - // Build worktree entries with agents + git stats + // Build worktree entries — collect all agent data in one lock acquisition let active_worktrees: Vec<_> = m .worktrees .values() @@ -3203,44 +3211,23 @@ impl Engine { .cloned() .collect(); + let wt_agents: Vec> = active_worktrees + .iter() + .map(|wt| { + wt.agents + .values() + .map(|a| self.agent_pulse_entry(a, &sessions, now)) + .collect() + }) + .collect(); + // Drop sessions lock before shelling out to git drop(sessions); let mut worktrees = Vec::new(); - for wt in &active_worktrees { + for (wt, agents) in active_worktrees.iter().zip(wt_agents) { let elapsed = (now - wt.created_at).num_seconds(); - // Gather agent pulse entries (re-acquire lock briefly) - let sessions = self.sessions.lock().await; - let agents: Vec = wt - .agents - .values() - .map(|a| { - let (status, exit_code, idle_seconds) = - self.live_agent_status_sync(&a.id, a, &sessions); - let runtime = (now - a.started_at).num_seconds(); - let snippet = a.prompt.as_ref().map(|p| { - let trimmed = p.trim(); - if trimmed.len() > 80 { - format!("{}...", &trimmed[..77]) - } else { - trimmed.to_string() - } - }); - pu_core::protocol::AgentPulseEntry { - id: a.id.clone(), - name: a.name.clone(), - agent_type: a.agent_type.clone(), - status, - exit_code, - runtime_seconds: runtime, - idle_seconds, - prompt_snippet: snippet, - } - }) - .collect(); - drop(sessions); - // Get git diff stats let wt_path = std::path::PathBuf::from(&wt.path); let (files_changed, insertions, deletions, diff_error) = if wt_path.exists() { From 15fb4e76ff486d34e9995ae3552acf319be1bea6 Mon Sep 17 00:00:00 2001 From: 2witstudios <2witstudios@gmail.com> Date: Sun, 8 Mar 2026 14:19:28 -0500 Subject: [PATCH 3/3] fix: Bump PROTOCOL_VERSION to 3 for Pulse variants The Pulse request and PulseReport response add new wire-format variants, requiring a version bump to prevent deserialization failures between mismatched CLI/daemon versions. Co-Authored-By: Claude Opus 4.6 --- crates/pu-core/src/protocol.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/pu-core/src/protocol.rs b/crates/pu-core/src/protocol.rs index 6bf02c5..5748e8a 100644 --- a/crates/pu-core/src/protocol.rs +++ b/crates/pu-core/src/protocol.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; use crate::types::{AgentStatus, WorktreeEntry}; -pub const PROTOCOL_VERSION: u32 = 2; +pub const PROTOCOL_VERSION: u32 = 3; /// Serde helper: encode Vec as hex in JSON for binary PTY data. mod hex_bytes { @@ -984,7 +984,7 @@ mod tests { #[test] fn given_protocol_version_should_be_current() { - assert_eq!(PROTOCOL_VERSION, 2); + assert_eq!(PROTOCOL_VERSION, 3); } // --- GridCommand round-trips ---