From dcbd665572ce76df9ad673adf4a76ea32f03c558 Mon Sep 17 00:00:00 2001 From: Aditya Gollamudi Date: Wed, 18 Feb 2026 04:06:32 -0500 Subject: [PATCH] feat: Extend `\dt` psql command output with shard metadata (#709) Intercept \dt command, and add a `Shard` column to the output. Add a flag to `Route` that indicates if \dt is being executed so the Shard column is conditionally applied. Add `shard_map` HashMap to `Route` as well that stores tables with their corresponding shard. Introduce `forward_with_shard` function in backend/pool/connection/binding.rs that exposes the shard_map property to be streamed in the query engine. Add engine logic to populate the new column correctly and handle tables sharded across multiple databases Ex. output: List of tables Schema | Name | Type | Owner | Shard --------+-----------+-------+--------+--------- public | only_on_0 | table | ubuntu | 0 public | only_on_1 | table | ubuntu | 1 public | only_on_2 | table | ubuntu | 2 public | users | table | ubuntu | 0, 1, 2 Signed-off-by: Aditya Gollamudi --- pgdog/src/backend/pool/connection/binding.rs | 24 ++++++++- .../pool/connection/multi_shard/mod.rs | 13 +++++ pgdog/src/frontend/client/query_engine/mod.rs | 3 ++ .../src/frontend/client/query_engine/query.rs | 51 ++++++++++++++++++- pgdog/src/frontend/router/parser/query/mod.rs | 15 ++++++ pgdog/src/frontend/router/parser/route.rs | 23 ++++++++- pgdog/src/net/messages/mod.rs | 11 ++++ 7 files changed, 135 insertions(+), 5 deletions(-) diff --git a/pgdog/src/backend/pool/connection/binding.rs b/pgdog/src/backend/pool/connection/binding.rs index 677cd3cf3..cfaa067be 100644 --- a/pgdog/src/backend/pool/connection/binding.rs +++ b/pgdog/src/backend/pool/connection/binding.rs @@ -2,11 +2,12 @@ use crate::{ frontend::{client::query_engine::TwoPcPhase, ClientRequest}, - net::{parameter::Parameters, BackendKeyData, ProtocolMessage, Query}, + net::{parameter::Parameters, BackendKeyData, Message, ProtocolMessage, Query}, state::State, }; use futures::future::join_all; +use std::collections::HashMap; use super::*; @@ -53,6 +54,13 @@ impl Binding { self.disconnect(); } + pub fn forward_with_shard(&self) -> Option>> { + match self { + Binding::MultiShard(_shards, state) => state.table_shard_map(), + _ => None, + } + } + /// Are we connected to a backend? pub fn connected(&self) -> bool { match self { @@ -91,13 +99,25 @@ impl Binding { return Ok(message); } let mut read = false; - for server in shards.iter_mut() { + + for (shard, server) in shards.iter_mut().enumerate() { if !server.has_more_messages() { continue; } let message = server.read().await?; + if state.display_table() { + if let Some(table_name) = message.table_name_from_dt().unwrap() { + let mut map: HashMap> = + state.table_shard_map().unwrap_or_default(); + map.entry(table_name.clone()) + .or_insert_with(Vec::new) + .push(shard); + state.set_table_shard_map(Some(map)); + } + } + read = true; if let Some(message) = state.forward(message)? { return Ok(message); diff --git a/pgdog/src/backend/pool/connection/multi_shard/mod.rs b/pgdog/src/backend/pool/connection/multi_shard/mod.rs index 719e2078b..7bb459964 100644 --- a/pgdog/src/backend/pool/connection/multi_shard/mod.rs +++ b/pgdog/src/backend/pool/connection/multi_shard/mod.rs @@ -1,6 +1,7 @@ //! Multi-shard connection state. use context::Context; +use std::collections::HashMap; use crate::{ frontend::{router::Route, PreparedStatements}, @@ -345,4 +346,16 @@ impl MultiShard { } } } + + pub fn display_table(&self) -> bool { + self.route.display_table() + } + + pub fn set_table_shard_map(&mut self, map: Option>>) { + self.route.set_table_shard_map(map); + } + + pub fn table_shard_map(&self) -> Option>> { + self.route.table_shard_map() + } } diff --git a/pgdog/src/frontend/client/query_engine/mod.rs b/pgdog/src/frontend/client/query_engine/mod.rs index b383b69d5..67d79c73c 100644 --- a/pgdog/src/frontend/client/query_engine/mod.rs +++ b/pgdog/src/frontend/client/query_engine/mod.rs @@ -10,6 +10,7 @@ use crate::{ state::State, }; +use std::collections::HashSet; use tracing::debug; pub mod connect; @@ -78,6 +79,7 @@ pub struct QueryEngine { notify_buffer: NotifyBuffer, pending_explain: Option, hooks: QueryEngineHooks, + seen_tables: HashSet, } impl QueryEngine { @@ -105,6 +107,7 @@ impl QueryEngine { pending_explain: None, begin_stmt: None, router: Router::default(), + seen_tables: HashSet::new(), }) } diff --git a/pgdog/src/frontend/client/query_engine/query.rs b/pgdog/src/frontend/client/query_engine/query.rs index 5e87f7a2f..f7203810c 100644 --- a/pgdog/src/frontend/client/query_engine/query.rs +++ b/pgdog/src/frontend/client/query_engine/query.rs @@ -7,7 +7,7 @@ use crate::{ router::parser::{explain_trace::ExplainTrace, rewrite::statement::plan::RewriteResult}, }, net::{ - DataRow, FromBytes, Message, Protocol, ProtocolMessage, Query, ReadyForQuery, + DataRow, Field, FromBytes, Message, Protocol, ProtocolMessage, Query, ReadyForQuery, RowDescription, ToBytes, TransactionState, }, state::State, @@ -36,7 +36,7 @@ impl QueryEngine { // We need to run a query now. if context.in_transaction() { // Connect to one shard if not sharded or to all shards - // for a cross-shard tranasction. + // for a cross-shard transaction. if !self.connect_transaction(context).await? { return Ok(()); } @@ -123,8 +123,23 @@ impl QueryEngine { ) -> Result<(), Error> { self.streaming = message.streaming(); + let should_rewrite_for_display_table = + if let Some(route) = context.client_request.route.as_ref() { + route.display_table() + } else { + false + }; + let code = message.code(); let payload = if code == 'T' { + if should_rewrite_for_display_table { + let mut fields = RowDescription::from_bytes(message.payload()) + .unwrap() + .fields + .to_vec(); + fields.push(Field::text("Shard")); + message = RowDescription::new(&fields).message()?; + } Some(message.payload()) } else { None @@ -152,6 +167,38 @@ impl QueryEngine { self.pending_explain = None; } + if code == 'D' { + if should_rewrite_for_display_table { + let mut dr = DataRow::from_bytes(message.payload()).unwrap(); + let col = dr.column(1).unwrap(); + + let shard_map = self.backend.forward_with_shard(); + let table_lookup = std::str::from_utf8(&col).unwrap(); + + if let Some(map) = shard_map { + if self.seen_tables.contains(table_lookup) { + return Ok(()); + } + + self.seen_tables.insert(table_lookup.to_string()); + + let mut new_col = String::new(); + for (i, val) in map[table_lookup].iter().enumerate() { + if i > 0 { + new_col.push_str(", ") + } + new_col.push_str(&val.to_string()); + } + dr.add(new_col); + } else { + dr.add(None); + } + + message = dr.message()?; + Some(message.payload()); + } + } + // Messages that we need to send to the client immediately. // ReadyForQuery (B) | CopyInResponse (B) | ErrorResponse(B) | NoticeResponse(B) | NotificationResponse (B) let flush = matches!(code, 'Z' | 'G' | 'E' | 'N' | 'A') diff --git a/pgdog/src/frontend/router/parser/query/mod.rs b/pgdog/src/frontend/router/parser/query/mod.rs index 1207f22a1..6fe4f1824 100644 --- a/pgdog/src/frontend/router/parser/query/mod.rs +++ b/pgdog/src/frontend/router/parser/query/mod.rs @@ -108,6 +108,21 @@ impl QueryParser { Command::default() }; + // Check if we are executing \dt command + if let Command::Query(route) = &mut command { + let query = match context.query() { + Ok(res) => res, + Err(e) => return Err(e), + }; + if query.contains("pg_catalog.pg_class") + && query.contains("pg_catalog.pg_namespace") + && query.contains("relkind") + && query.contains("pg_toast") + { + route.set_display_table(true); + } + } + if let Command::Query(route) = &mut command { if route.is_cross_shard() && context.shards == 1 { context diff --git a/pgdog/src/frontend/router/parser/route.rs b/pgdog/src/frontend/router/parser/route.rs index 4ff723fa6..99fa7a9d7 100644 --- a/pgdog/src/frontend/router/parser/route.rs +++ b/pgdog/src/frontend/router/parser/route.rs @@ -1,4 +1,4 @@ -use std::{fmt::Display, ops::Deref}; +use std::{collections::HashMap, fmt::Display, ops::Deref}; use lazy_static::lazy_static; @@ -90,6 +90,8 @@ pub struct Route { rollback_savepoint: bool, search_path_driven: bool, schema_changed: bool, + display_table: bool, + table_shard_map: Option>>, } impl Display for Route { @@ -326,6 +328,25 @@ impl Route { ShardSource::Table(TableReason::Omni) | ShardSource::RoundRobin(RoundRobinReason::Omni) ) } + pub fn set_display_table(&mut self, v: bool) { + self.display_table = v; + } + + pub fn display_table(&self) -> bool { + self.display_table + } + + pub fn table_shard_map(&self) -> Option>> { + if self.table_shard_map == None { + Some(HashMap::new()) + } else { + self.table_shard_map.clone() + } + } + + pub fn set_table_shard_map(&mut self, map: Option>>) { + self.table_shard_map = map; + } } /// Shard source. diff --git a/pgdog/src/net/messages/mod.rs b/pgdog/src/net/messages/mod.rs index 9604a22c3..bceb82b72 100644 --- a/pgdog/src/net/messages/mod.rs +++ b/pgdog/src/net/messages/mod.rs @@ -256,6 +256,17 @@ impl Message { pub fn transaction_error(&self) -> bool { self.code() == 'Z' && self.payload[5] as char == 'E' } + + pub fn table_name_from_dt(&self) -> Result, Error> { + if self.code() != 'D' { + return Ok(None); + } + let byte_name = DataRow::from_bytes(self.payload()).unwrap().column(1); + + let table_name = std::str::from_utf8(&byte_name.unwrap())?.to_string(); + + return Ok(Some(table_name)); + } } /// Check that the message we received is what we expected.