diff --git a/pgdog/src/backend/pool/inner.rs b/pgdog/src/backend/pool/inner.rs index 361c9e36..80cb2bd8 100644 --- a/pgdog/src/backend/pool/inner.rs +++ b/pgdog/src/backend/pool/inner.rs @@ -252,7 +252,7 @@ impl Inner { /// Place connection back into the pool /// or give it to a waiting client. #[inline] - pub(super) fn put(&mut self, mut conn: Box, now: Instant) -> Result<(), Error> { + pub(super) fn put(&mut self, mut conn: Box) -> Result<(), Error> { // Try to give it to a client that's been waiting, if any. let id = *conn.id(); while let Some(waiter) = self.waiting.pop_front() { @@ -263,8 +263,8 @@ impl Inner { server: id, client: waiter.request.id, })?; - self.stats.counts.server_assignment_count += 1; - self.stats.counts.wait_time += now.duration_since(waiter.request.created_at); + self.stats + .record_checkout(waiter.request.created_at, waiter.request.read); return Ok(()); } } @@ -380,7 +380,7 @@ impl Inner { // Finally, if the server is ok, // place the connection back into the idle list. if server.can_check_in() { - self.put(server, now)?; + self.put(server)?; result.replenish = false; } else { self.out_of_sync += 1; @@ -854,7 +854,7 @@ mod test { }); let server = Box::new(Server::default()); - inner.put(server, Instant::now()).unwrap(); + inner.put(server).unwrap(); assert_eq!(inner.idle(), 0); // Connection given to waiter, not idle assert_eq!(inner.checked_out(), 1); // Connection now checked out to waiter @@ -869,7 +869,7 @@ mod test { let mut inner = Inner::default(); let server = Box::new(Server::default()); - inner.put(server, Instant::now()).unwrap(); + inner.put(server).unwrap(); assert_eq!(inner.idle(), 1); // Connection added to idle pool assert_eq!(inner.checked_out(), 0); @@ -1046,7 +1046,7 @@ mod test { assert_eq!(inner.waiting.len(), 3); let server = Box::new(Server::default()); - inner.put(server, Instant::now()).unwrap(); + inner.put(server).unwrap(); // All waiters should be removed from queue since we tried each one assert_eq!(inner.waiting.len(), 0); @@ -1083,7 +1083,7 @@ mod test { assert_eq!(inner.waiting.len(), 2); let server = Box::new(Server::default()); - inner.put(server, Instant::now()).unwrap(); + inner.put(server).unwrap(); // All waiters should be removed since they were all dropped assert_eq!(inner.waiting.len(), 0); diff --git a/pgdog/src/backend/pool/monitor.rs b/pgdog/src/backend/pool/monitor.rs index 7ad02cfe..ad0a7b10 100644 --- a/pgdog/src/backend/pool/monitor.rs +++ b/pgdog/src/backend/pool/monitor.rs @@ -226,11 +226,10 @@ impl Monitor { /// Replenish pool with one new connection. async fn replenish(&self, reason: ConnectReason) -> Result { if let Ok(conn) = Self::create_connection(&self.pool, reason).await { - let now = Instant::now(); let server = Box::new(conn); let mut guard = self.pool.lock(); if guard.online { - guard.put(server, now)?; + guard.put(server)? } Ok(true) } else { diff --git a/pgdog/src/backend/pool/pool_impl.rs b/pgdog/src/backend/pool/pool_impl.rs index 8c2a6312..60e0a2c8 100644 --- a/pgdog/src/backend/pool/pool_impl.rs +++ b/pgdog/src/backend/pool/pool_impl.rs @@ -123,10 +123,6 @@ impl Pool { // Fast path, idle connection probably available. let (server, granted_at, paused) = { - // Ask for time before we acquire the lock - // and only if we actually waited for a connection. - let granted_at = request.created_at; - let elapsed = granted_at.saturating_duration_since(request.created_at); let mut guard = self.lock(); if !guard.online { @@ -134,15 +130,14 @@ impl Pool { } let conn = guard.take(request)?; + // Capture the grant time after the lock and after take() so that + // lock contention and any in-lock work are included in wait_time. + let granted_at = Instant::now(); if conn.is_some() { - guard.stats.counts.wait_time += elapsed; - guard.stats.counts.server_assignment_count += 1; - if request.read { - guard.stats.counts.reads += 1; - } else { - guard.stats.counts.writes += 1; - } + guard + .stats + .record_checkout(request.created_at, request.read); } (conn, granted_at, guard.paused) @@ -296,7 +291,6 @@ impl Pool { pub(crate) fn move_conns_to(&self, destination: &Pool) -> Result<(), Error> { // Ensure no deadlock. assert!(self.inner.id != destination.id()); - let now = Instant::now(); { let mut from_guard = self.lock(); @@ -305,7 +299,7 @@ impl Pool { from_guard.online = false; let (idle, taken) = from_guard.move_conns_to(destination); for server in idle { - to_guard.put(server, now)?; + to_guard.put(server)?; } to_guard.set_taken(taken); } diff --git a/pgdog/src/backend/pool/state.rs b/pgdog/src/backend/pool/state.rs index 2c9c99f8..4631eb69 100644 --- a/pgdog/src/backend/pool/state.rs +++ b/pgdog/src/backend/pool/state.rs @@ -50,6 +50,7 @@ impl State { maxwait: guard .waiting .iter() + // The first waiter is the oldest, so their metric is basically the max wait time .next() .map(|req| now.duration_since(req.request.created_at)) .unwrap_or(Duration::ZERO), diff --git a/pgdog/src/backend/pool/stats.rs b/pgdog/src/backend/pool/stats.rs index 0ad625c7..7b2013bd 100644 --- a/pgdog/src/backend/pool/stats.rs +++ b/pgdog/src/backend/pool/stats.rs @@ -17,6 +17,7 @@ use pgdog_stats::memory::MemoryStats as StatsMemoryStats; use pgdog_stats::pool::Counts as StatsCounts; use pgdog_stats::pool::Stats as StatsStats; use pgdog_stats::MessageBufferStats; +use tokio::time::Instant; /// Pool statistics. /// @@ -106,6 +107,19 @@ impl Stats { pub fn calc_averages(&mut self, time: Duration) { self.inner.calc_averages(time); } + + /// Record a successful connection checkout. + /// Centralises the four counts that must always move together: + /// wait time, assignment counter, and the read/write routing counter. + pub fn record_checkout(&mut self, started_at: Instant, read: bool) { + self.counts.wait_time += started_at.elapsed(); + self.counts.server_assignment_count += 1; + if read { + self.counts.reads += 1; + } else { + self.counts.writes += 1; + } + } } /// Statistics calculated for the network buffer used