Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 12 additions & 21 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pgdog-plugin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ crate-type = ["rlib", "cdylib"]

[dependencies]
libloading = "0.8"
pg_query = { git = "https://github.com/pgdogdev/pg_query.rs.git", rev = "f8c216209f90525f065b47ffde9eb5da803d2dc6" }
pg_query = { git = "https://github.com/pgdogdev/pg_query.rs.git", rev = "d30dcb47fdf3fa77d102b813a34392146642903a" }
pgdog-macros = { path = "../pgdog-macros", version = "0.1.1" }

tracing = "0.1"
Expand Down
2 changes: 1 addition & 1 deletion pgdog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ base64 = "0.22"
md5 = "0.7"
futures = "0.3"
csv-core = "0.1"
pg_query = { git = "https://github.com/pgdogdev/pg_query.rs.git", rev = "f8c216209f90525f065b47ffde9eb5da803d2dc6" }
pg_query = { git = "https://github.com/pgdogdev/pg_query.rs.git", rev = "d30dcb47fdf3fa77d102b813a34392146642903a" }
regex = "1"
semver = "1"
uuid = { version = "1", features = ["v4", "serde"] }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use std::time::Duration;

use pgdog_postgres_types::Format;
use tokio::time::sleep;

use crate::{
backend::{server::test::test_server, Server},
expect_message,
net::{DataRow, RowDescription},
};

use super::prelude::*;

#[tokio::test]
async fn test_idle_in_transaction_partial_recovery() {
crate::logger();

// Direct connection for testing state.
let mut test_server = test_server().await;

let mut client = TestClient::new_replicas(Parameters::default())
.await
.leak_pool();

client.send_simple(Query::new("BEGIN")).await;
client.read_until('Z').await.unwrap();

client
.send_simple(Query::new("SELECT pg_backend_pid()::text"))
.await;
expect_message!(client.read().await, RowDescription);
let rd = expect_message!(client.read().await, DataRow);
let pid: String = rd.get(0, Format::Text).unwrap();

client.read_until('Z').await.unwrap();

// This won't fire because we'll be stuck inside the extended exchange.
client
.send_simple(Query::new("SET idle_in_transaction_session_timeout TO 100"))
.await;
client.read_until('Z').await.unwrap();

client.send(Parse::named("__test_1", "SELECT $1")).await;
client.send(Flush).await;
client.try_process().await.unwrap();
client.read_until('1').await.unwrap();

// Stuck inside extended exchange, idle in transaction timeout will not fire.
sleep(Duration::from_millis(100)).await;

client.send(Parse::named("__test_2", "SELECT $1")).await;
client.send(Flush).await;

client.try_process().await.unwrap();
client.read_until('1').await.unwrap();

// Server in active state.
assert_server_state(&mut test_server, &pid, "active").await;

client.send(Terminate).await;
drop(client);

sleep(Duration::from_millis(50)).await;

// Cleanup works.
assert_server_state(&mut test_server, &pid, "idle").await;
}

async fn assert_server_state(conn: &mut Server, pid: &str, expected: &str) {
let response: Vec<String> = conn
.fetch_all(format!(
"SELECT state::text FROM pg_stat_activity WHERE pid = {}",
pid
))
.await
.unwrap();
assert_eq!(response[0], expected);
}
1 change: 1 addition & 0 deletions pgdog/src/frontend/client/query_engine/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod extended;
mod fatal_error;
mod graceful_disconnect;
mod graceful_shutdown;
mod idle_in_transaction_recovery;
mod lock_session;
mod omni;
pub mod prelude;
Expand Down
13 changes: 11 additions & 2 deletions pgdog/src/frontend/client/test/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ pub struct TestClient {
pub(crate) client: Client,
pub(crate) engine: QueryEngine,
pub(crate) conn: TcpStream,
pub(crate) leak_pool: bool,
}

impl TestClient {
Expand All @@ -119,13 +120,14 @@ impl TestClient {
///
/// Config needs to be loaded.
///
async fn new(params: Parameters) -> Self {
pub(crate) async fn new(params: Parameters) -> Self {
let (conn, client) = new_client_pair(params).await;

Self {
conn,
engine: QueryEngine::from_client(&client).expect("create query engine from client"),
client,
leak_pool: false,
}
}

Expand All @@ -135,6 +137,11 @@ impl TestClient {
Self::new(params).await
}

pub(crate) fn leak_pool(mut self) -> Self {
self.leak_pool = true;
self
}

/// New client with replicas but not sharded.
pub(crate) async fn new_replicas(params: Parameters) -> Self {
load_test_replicas();
Expand Down Expand Up @@ -243,7 +250,9 @@ impl TestClient {

impl Drop for TestClient {
fn drop(&mut self) {
shutdown();
if !self.leak_pool {
shutdown();
}
}
}

Expand Down
Loading