Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .schema/pgdog.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1173,6 +1173,11 @@
],
"format": "float"
},
"level": {
"description": "What kind of statements to replicate.",
"$ref": "#/$defs/MirroringLevel",
"default": "all"
},
"queue_length": {
"description": "The length of the queue to provision for mirrored transactions. See [mirroring](https://docs.pgdog.dev/features/mirroring/) for more details. This overrides the [`mirror_queue`](https://docs.pgdog.dev/configuration/pgdog.toml/general/#mirror_queue) setting.\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/mirroring/#queue_depth",
"type": [
Expand All @@ -1193,6 +1198,25 @@
"destination_db"
]
},
"MirroringLevel": {
"oneOf": [
{
"description": "Replicate all statements.",
"type": "string",
"const": "all"
},
{
"description": "Only DML (e.g., insert, update, delete, etc),",
"type": "string",
"const": "dml"
},
{
"description": "Only DDL (CREATE, DROP, etc.)",
"type": "string",
"const": "ddl"
}
]
},
"MultiTenant": {
"description": "multi-tenant routing configuration, mapping queries to shards via a tenant identifier column.",
"type": "object",
Expand Down
4 changes: 0 additions & 4 deletions integration/mirror/dev.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,3 @@ mkdir -p ${GEM_HOME}
bundle install
bundle exec rspec *_spec.rb
popd

pushd ${SCRIPT_DIR}/php
bash run.sh
popd
12 changes: 12 additions & 0 deletions integration/mirror/pgdog.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[general]
mirror_exposure = 1.0
openmetrics_port = 9090
query_parser = "on"

[rewrite]
enabled = false
Expand All @@ -16,9 +17,20 @@ name = "pgdog_mirror"
host = "127.0.0.1"
database_name = "pgdog1"

[[databases]]
name = "pgdog_mirror2"
host = "127.0.0.1"
database_name = "pgdog2"

[[mirroring]]
source_db = "pgdog"
destination_db = "pgdog_mirror"


[[mirroring]]
source_db = "pgdog"
destination_db = "pgdog_mirror2"
level = "ddl"

[admin]
password = "pgdog"
77 changes: 77 additions & 0 deletions integration/mirror/ruby/copy_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,80 @@
conn.exec 'DROP TABLE IF EXISTS mirror_copy_test'
end
end

describe 'ddl-only mirror' do
conn = PG.connect('postgres://pgdog:pgdog@127.0.0.1:6432/pgdog')
ddl_mirror = PG.connect('postgres://pgdog:pgdog@127.0.0.1:6432/pgdog_mirror2')

before do
conn.exec 'DROP TABLE IF EXISTS ddl_mirror_test'
ddl_mirror.exec 'DROP TABLE IF EXISTS ddl_mirror_test'
end

it 'replicates DDL to ddl-only mirror' do
conn.exec 'CREATE TABLE ddl_mirror_test (id BIGINT PRIMARY KEY, value VARCHAR)'
sleep(0.5)

# DDL should be mirrored
result = ddl_mirror.exec "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'ddl_mirror_test')"
expect(result[0]['exists']).to eq('t')
end

it 'does not replicate DML to ddl-only mirror' do
conn.exec 'CREATE TABLE ddl_mirror_test (id BIGINT PRIMARY KEY, value VARCHAR)'
sleep(0.5)

conn.exec "INSERT INTO ddl_mirror_test VALUES (1, 'should not mirror')"
sleep(0.5)

# Table should exist on ddl mirror (DDL was mirrored)
result = ddl_mirror.exec 'SELECT count(*) FROM ddl_mirror_test'
# But no rows (DML was not mirrored)
expect(result[0]['count'].to_i).to eq(0)
end

it 'does not replicate UPDATE to ddl-only mirror' do
conn.exec 'CREATE TABLE ddl_mirror_test (id BIGINT PRIMARY KEY, value VARCHAR)'
sleep(0.5)

# Insert directly into ddl mirror so we can check UPDATE doesn't propagate
ddl_mirror.exec "INSERT INTO ddl_mirror_test VALUES (1, 'original')"

conn.exec "INSERT INTO ddl_mirror_test VALUES (1, 'original')"
conn.exec "UPDATE ddl_mirror_test SET value = 'updated' WHERE id = 1"
sleep(0.5)

result = ddl_mirror.exec 'SELECT value FROM ddl_mirror_test WHERE id = 1'
expect(result[0]['value']).to eq('original')
end

it 'replicates ALTER TABLE to ddl-only mirror' do
conn.exec 'CREATE TABLE ddl_mirror_test (id BIGINT PRIMARY KEY, value VARCHAR)'
sleep(0.5)

conn.exec 'ALTER TABLE ddl_mirror_test ADD COLUMN extra TEXT'
sleep(0.5)

result = ddl_mirror.exec "SELECT column_name FROM information_schema.columns WHERE table_name = 'ddl_mirror_test' AND column_name = 'extra'"
expect(result.ntuples).to eq(1)
end

it 'replicates DROP TABLE to ddl-only mirror' do
conn.exec 'CREATE TABLE ddl_mirror_test (id BIGINT PRIMARY KEY, value VARCHAR)'
sleep(0.5)

result = ddl_mirror.exec "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'ddl_mirror_test')"
expect(result[0]['exists']).to eq('t')

conn.exec 'DROP TABLE ddl_mirror_test'
sleep(0.5)

result = ddl_mirror.exec "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'ddl_mirror_test')"
expect(result[0]['exists']).to eq('f')
end

after do
conn.exec 'DROP TABLE IF EXISTS ddl_mirror_test'
ddl_mirror.exec 'DROP TABLE IF EXISTS ddl_mirror_test'
end
end
6 changes: 6 additions & 0 deletions integration/mirror/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,10 @@ wait_for_pgdog

bash ${SCRIPT_DIR}/dev.sh


pushd ${SCRIPT_DIR}/php
bash run.sh
popd


stop_pgdog
5 changes: 5 additions & 0 deletions integration/mirror/users.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,8 @@ database = "pgdog"
name = "pgdog"
password = "pgdog"
database = "pgdog_mirror"

[[users]]
name = "pgdog"
password = "pgdog"
database = "pgdog_mirror2"
35 changes: 30 additions & 5 deletions pgdog-config/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use super::error::Error;
use super::general::General;
use super::networking::{MultiTenant, Tcp, TlsVerifyMode};
use super::pooling::PoolerMode;
use super::replication::{MirrorConfig, Mirroring, ReplicaLag, Replication};
use super::replication::{MirrorConfig, Mirroring, MirroringLevel, ReplicaLag, Replication};
use super::rewrite::Rewrite;
use super::sharding::{ManualQuery, OmnishardedTables, ShardedMapping, ShardedTable};
use super::users::{Admin, Plugin, ServerAuth, Users};
Expand Down Expand Up @@ -94,15 +94,13 @@ impl ConfigAndUsers {
warn!("admin password has been randomly generated");
}

let mut config_and_users = ConfigAndUsers {
let config_and_users = ConfigAndUsers {
config,
users,
config_path: config_path.to_owned(),
users_path: users_path.to_owned(),
};

config_and_users.check()?;

Ok(config_and_users)
}

Expand Down Expand Up @@ -424,6 +422,7 @@ impl Config {
role: Role,
role_warned: bool,
parser_warned: bool,
mirror_parser_warned: bool,
have_replicas: bool,
sharded: bool,
}
Expand Down Expand Up @@ -471,6 +470,7 @@ impl Config {
role: database.role,
role_warned: false,
parser_warned: false,
mirror_parser_warned: false,
have_replicas: database.role == Role::Replica,
sharded: database.shard > 0,
},
Expand Down Expand Up @@ -517,7 +517,30 @@ impl Config {
}
}

for (database, check) in checks {
for mirror in &self.mirroring {
if mirror.level == MirroringLevel::All {
continue;
}
if let Some(check) = checks.get_mut(&mirror.source_db) {
if check.mirror_parser_warned {
continue;
}
let parser_enabled = match self.general.query_parser {
QueryParserLevel::On => true,
QueryParserLevel::Off => false,
QueryParserLevel::Auto => check.have_replicas || check.sharded,
};
if !parser_enabled {
check.mirror_parser_warned = true;
warn!(
r#"mirroring from "{}" with level "{}" requires the query parser to classify statements, but it won't be enabled, set query_parser = "on""#,
mirror.source_db, mirror.level
);
}
}
}

for (database, check) in &checks {
if !check.have_replicas
&& self.general.read_write_split == ReadWriteSplit::ExcludePrimary
{
Expand Down Expand Up @@ -560,6 +583,7 @@ impl Config {
.map(|m| MirrorConfig {
queue_length: m.queue_length.unwrap_or(self.general.mirror_queue),
exposure: m.exposure.unwrap_or(self.general.mirror_exposure),
level: m.level,
})
}

Expand All @@ -571,6 +595,7 @@ impl Config {
let config = MirrorConfig {
queue_length: mirror.queue_length.unwrap_or(self.general.mirror_queue),
exposure: mirror.exposure.unwrap_or(self.general.mirror_exposure),
level: mirror.level,
};

result
Expand Down
46 changes: 45 additions & 1 deletion pgdog-config/src/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,45 @@ pub struct Mirroring {
///
/// https://docs.pgdog.dev/configuration/pgdog.toml/mirroring/#exposure
pub exposure: Option<f32>,

/// What kind of statements to replicate.
#[serde(default)]
pub level: MirroringLevel,
}

#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, JsonSchema, Copy)]
#[serde(deny_unknown_fields, rename_all = "lowercase")]
pub enum MirroringLevel {
/// Replicate all statements.
#[default]
All,
/// Only DML (e.g., insert, update, delete, etc),
Dml,
/// Only DDL (CREATE, DROP, etc.)
Ddl,
}

impl std::fmt::Display for MirroringLevel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::All => write!(f, "all"),
Self::Dml => write!(f, "dml"),
Self::Ddl => write!(f, "ddl"),
}
}
}

impl FromStr for MirroringLevel {
type Err = ();

fn from_str(value: &str) -> Result<Self, Self::Err> {
match value {
"all" => Ok(Self::All),
"dml" => Ok(Self::Dml),
"ddl" => Ok(Self::Ddl),
_ => Err(()),
}
}
}

impl FromStr for Mirroring {
Expand All @@ -166,6 +205,7 @@ impl FromStr for Mirroring {
let mut destination_db = None;
let mut queue_length = None;
let mut exposure = None;
let mut level = MirroringLevel::default();

for pair in s.split('&') {
let parts: Vec<&str> = pair.split('=').collect();
Expand All @@ -190,6 +230,7 @@ impl FromStr for Mirroring {
.map_err(|_| format!("Invalid exposure: {}", parts[1]))?,
);
}
"level" => level = MirroringLevel::from_str(parts[1]).unwrap_or_default(),
_ => return Err(format!("Unknown parameter: {}", parts[0])),
}
}
Expand All @@ -202,15 +243,18 @@ impl FromStr for Mirroring {
destination_db,
queue_length,
exposure,
level,
})
}
}

/// Runtime mirror configuration with defaults resolved from global settings.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct MirrorConfig {
/// Effective queue length for this mirror.
pub queue_length: usize,
/// Effective exposure fraction for this mirror.
pub exposure: f32,
/// What kind of statements to mirror.
pub level: MirroringLevel,
}
Loading
Loading