Merged
22 changes: 10 additions & 12 deletions src/cloudsync.c
@@ -49,12 +49,12 @@
#define CLOUDSYNC_INIT_NTABLES 64
#define CLOUDSYNC_MIN_DB_VERSION 0

#define CLOUDSYNC_PAYLOAD_SKIP_SCHEMA_HASH_CHECK 1
#define CLOUDSYNC_PAYLOAD_MINBUF_SIZE (512*1024)
#define CLOUDSYNC_PAYLOAD_SIGNATURE 0x434C5359 /* 'C','L','S','Y' */
#define CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL 1
#define CLOUDSYNC_PAYLOAD_VERSION_1 CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL
#define CLOUDSYNC_PAYLOAD_VERSION_2 2
#define CLOUDSYNC_PAYLOAD_VERSION_LATEST CLOUDSYNC_PAYLOAD_VERSION_2
#define CLOUDSYNC_PAYLOAD_MIN_VERSION_WITH_CHECKSUM CLOUDSYNC_PAYLOAD_VERSION_2

#ifndef MAX
@@ -63,10 +63,6 @@

#define DEBUG_DBERROR(_rc, _fn, _data) do {if (_rc != DBRES_OK) printf("Error in %s: %s\n", _fn, database_errmsg(_data));} while (0)

#if CLOUDSYNC_PAYLOAD_SKIP_SCHEMA_HASH_CHECK
bool schema_hash_disabled = true;
#endif

typedef enum {
CLOUDSYNC_PK_INDEX_TBL = 0,
CLOUDSYNC_PK_INDEX_PK = 1,
@@ -2263,15 +2259,17 @@ int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int b
header.nrows = ntohl(header.nrows);
header.schema_hash = ntohll(header.schema_hash);

#if !CLOUDSYNC_PAYLOAD_SKIP_SCHEMA_HASH_CHECK
if (!data || header.schema_hash != data->schema_hash) {
if (!database_check_schema_hash(data, header.schema_hash)) {
char buffer[1024];
snprintf(buffer, sizeof(buffer), "Cannot apply the received payload because the schema hash is unknown %llu.", header.schema_hash);
return cloudsync_set_error(data, buffer, DBRES_MISUSE);
// compare schema_hash only if the check is not disabled and the received payload was created with the current header version,
// to avoid schema-hash mismatches when a payload is processed by a peer running a different extension version during a software update
if (dbutils_settings_get_int64_value(data, CLOUDSYNC_KEY_SKIP_SCHEMA_HASH_CHECK) == 0 && header.version == CLOUDSYNC_PAYLOAD_VERSION_LATEST) {
if (header.schema_hash != data->schema_hash) {
if (!database_check_schema_hash(data, header.schema_hash)) {
char buffer[1024];
snprintf(buffer, sizeof(buffer), "Cannot apply the received payload because the schema hash is unknown %llu.", header.schema_hash);
return cloudsync_set_error(data, buffer, DBRES_MISUSE);
}
}
}
#endif

// sanity check header
if ((header.signature != CLOUDSYNC_PAYLOAD_SIGNATURE) || (header.ncols == 0)) {
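The new gate in `cloudsync_payload_apply` combines two independent conditions: the `skip_schema_hash_check` setting and the payload header version. A minimal sketch of that decision in isolation (the function name and parameters are illustrative, not part of the extension's API):

```c
#include <stdbool.h>
#include <stdint.h>

#define PAYLOAD_VERSION_LATEST 2   /* mirrors CLOUDSYNC_PAYLOAD_VERSION_LATEST */

/* Validate the schema hash only when the check has not been disabled via
   settings AND the payload was produced by a peer on the latest header
   version; older peers compute the hash differently, so comparing during a
   rolling upgrade would produce false mismatches. */
static bool should_check_schema_hash (int64_t skip_setting, uint32_t header_version) {
    return (skip_setting == 0) && (header_version == PAYLOAD_VERSION_LATEST);
}
```

Gating on the header version is what lets peers still using the old DDL-based hash interoperate with peers on the new normalized hash while an upgrade is in flight.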
1 change: 1 addition & 0 deletions src/dbutils.h
@@ -25,6 +25,7 @@
#define CLOUDSYNC_KEY_SCHEMA "schema"
#define CLOUDSYNC_KEY_DEBUG "debug"
#define CLOUDSYNC_KEY_ALGO "algo"
#define CLOUDSYNC_KEY_SKIP_SCHEMA_HASH_CHECK "skip_schema_hash_check"

// settings
int dbutils_settings_init (cloudsync_context *data);
75 changes: 56 additions & 19 deletions src/postgresql/database_postgresql.c
@@ -1612,21 +1612,9 @@ int64_t database_schema_version (cloudsync_context *data) {
}

uint64_t database_schema_hash (cloudsync_context *data) {
char *schema = NULL;
database_select_text(data,
"SELECT string_agg(LOWER(table_name || column_name || data_type), '' ORDER BY table_name, column_name) "
"FROM information_schema.columns WHERE table_schema = COALESCE(cloudsync_schema(), current_schema())",
&schema);

if (!schema) {
elog(INFO, "database_schema_hash: schema is NULL");
return 0;
}

size_t schema_len = strlen(schema);
uint64_t hash = fnv1a_hash(schema, schema_len);
cloudsync_memory_free(schema);
return hash;
int64_t value = 0;
int rc = database_select_int(data, "SELECT hash FROM cloudsync_schema_versions ORDER BY seq DESC LIMIT 1;", &value);
return (rc == DBRES_OK) ? (uint64_t)value : 0;
}
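With this change both backends read the current hash from the `cloudsync_schema_versions` bookkeeping table instead of recomputing it on every call. The diff never shows that table's DDL; a shape consistent with the queries used here (an assumption — the extension's actual migration may differ):

```c
/* Assumed shape of cloudsync_schema_versions, inferred from
   "SELECT hash ... ORDER BY seq DESC LIMIT 1" above and the
   "INSERT ... ON CONFLICT(hash)" in database_update_schema_hash below. */
static const char *SCHEMA_VERSIONS_DDL_SKETCH =
    "CREATE TABLE IF NOT EXISTS cloudsync_schema_versions ("
    "  hash BIGINT PRIMARY KEY,"   /* 64-bit FNV-1a value, stored signed */
    "  seq  BIGINT NOT NULL"       /* bumped to MAX(seq)+1 on every schema change */
    ");";
```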

bool database_check_schema_hash (cloudsync_context *data, uint64_t hash) {
@@ -1639,16 +1627,65 @@ bool database_check_schema_hash (cloudsync_context *data, uint64_t hash) {
}

int database_update_schema_hash (cloudsync_context *data, uint64_t *hash) {
// Build normalized schema string using only: table name, column name (lowercase), type (SQLite affinity), pk flag
// Format: tablename:colname:affinity:pk,... (ordered by table name, then column ordinal position)
// This makes the hash resilient to formatting, quoting, and case differences, and portable across databases
//
// PostgreSQL type to SQLite affinity mapping:
// - integer, smallint, bigint, boolean → 'integer'
// - bytea → 'blob'
// - real, double precision → 'real'
// - numeric, decimal → 'numeric'
// - Everything else → 'text' (default)
// This includes: text, varchar, char, uuid, timestamp, timestamptz, date, time,
// interval, json, jsonb, inet, cidr, macaddr, geometric types, xml, enums,
// and any custom/extension types. Using 'text' as default ensures compatibility
// since most types serialize to text representation and SQLite stores unknown
// types as TEXT affinity.

char *schema = NULL;
int rc = database_select_text(data,
"SELECT string_agg(LOWER(table_name || column_name || data_type), '' ORDER BY table_name, column_name) "
"FROM information_schema.columns WHERE table_schema = COALESCE(cloudsync_schema(), current_schema())",
"SELECT string_agg("
" LOWER(c.table_name) || ':' || LOWER(c.column_name) || ':' || "
" CASE "
// Integer types (including boolean as 0/1)
" WHEN c.data_type IN ('integer', 'smallint', 'bigint', 'boolean') THEN 'integer' "
// Blob type
" WHEN c.data_type = 'bytea' THEN 'blob' "
// Real/float types
" WHEN c.data_type IN ('real', 'double precision') THEN 'real' "
// Numeric types (explicit precision/scale)
" WHEN c.data_type IN ('numeric', 'decimal') THEN 'numeric' "
// Default to text for everything else:
// - String types: text, character varying, character, name, uuid
// - Date/time: timestamp, date, time, interval, etc.
// - JSON: json, jsonb
// - Network: inet, cidr, macaddr
// - Geometric: point, line, box, etc.
// - Custom/extension types
" ELSE 'text' "
" END || ':' || "
" CASE WHEN kcu.column_name IS NOT NULL THEN '1' ELSE '0' END, "
" ',' ORDER BY c.table_name, c.ordinal_position"
") "
"FROM information_schema.columns c "
"JOIN cloudsync_table_settings cts ON LOWER(c.table_name) = LOWER(cts.tbl_name) "
"LEFT JOIN information_schema.table_constraints tc "
" ON tc.table_name = c.table_name "
" AND tc.table_schema = c.table_schema "
" AND tc.constraint_type = 'PRIMARY KEY' "
"LEFT JOIN information_schema.key_column_usage kcu "
" ON kcu.table_name = c.table_name "
" AND kcu.column_name = c.column_name "
" AND kcu.table_schema = c.table_schema "
" AND kcu.constraint_name = tc.constraint_name "
"WHERE c.table_schema = COALESCE(cloudsync_schema(), current_schema())",
&schema);

if (rc != DBRES_OK || !schema) return cloudsync_set_error(data, "database_update_schema_hash error 1", DBRES_ERROR);

size_t schema_len = strlen(schema);
DEBUG_ALWAYS("database_update_schema_hash len %zu", schema_len);
DEBUG_MERGE("database_update_schema_hash len %zu schema %s", schema_len, schema);
uint64_t h = fnv1a_hash(schema, schema_len);
cloudsync_memory_free(schema);
if (hash && *hash == h) return cloudsync_set_error(data, "database_update_schema_hash constraint", DBRES_CONSTRAINT);
@@ -1664,7 +1701,7 @@ int database_update_schema_hash (cloudsync_context *data, uint64_t *hash) {
if (rc == DBRES_OK) {
if (hash) *hash = h;
return rc;
}
}

return cloudsync_set_error(data, "database_update_schema_hash error 2", DBRES_ERROR);
}
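Both implementations feed the normalized string to `fnv1a_hash`, whose body is not part of this diff. Assuming the name means what it says, a standard 64-bit FNV-1a looks like this (a sketch, not necessarily the project's exact code):

```c
#include <stdint.h>
#include <stddef.h>

/* 64-bit FNV-1a: XOR each byte into the accumulator, then multiply by the
   FNV prime. Byte-at-a-time and endian-independent, so a SQLite peer and a
   PostgreSQL peer hashing the same normalized schema string agree exactly. */
static uint64_t fnv1a_hash (const char *data, size_t len) {
    uint64_t hash = 0xcbf29ce484222325ULL;   /* FNV-1a 64-bit offset basis */
    for (size_t i = 0; i < len; i++) {
        hash ^= (uint8_t)data[i];
        hash *= 0x100000001b3ULL;            /* FNV-1a 64-bit prime */
    }
    return hash;
}
```

Determinism of the input string is therefore the whole game, which is why both queries pin the ordering (table name, then column position) and the `:` and `,` separators.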
125 changes: 112 additions & 13 deletions src/sqlite/database_sqlite.c
@@ -803,25 +803,124 @@ bool database_check_schema_hash (cloudsync_context *data, uint64_t hash) {
}

int database_update_schema_hash (cloudsync_context *data, uint64_t *hash) {
char *schemasql = "SELECT group_concat(LOWER(sql)) FROM sqlite_master "
"WHERE type = 'table' AND name IN (SELECT tbl_name FROM cloudsync_table_settings ORDER BY tbl_name) "
"ORDER BY name;";

// Build normalized schema string using only: table name, column name (lowercase), type (SQLite affinity), pk flag
// Format: tablename:colname:affinity:pk,... (ordered by table name, then column id)
// This makes the hash resilient to formatting, quoting, and case differences, and portable across databases
//
// Type mapping (simplified from SQLite affinity rules for cross-database compatibility):
// - Types containing 'INT' → 'integer'
// - Types containing 'CHAR', 'CLOB', 'TEXT' → 'text'
// - Types containing 'BLOB' or empty → 'blob'
// - Types containing 'REAL', 'FLOA', 'DOUB' → 'real'
// - Types exactly 'NUMERIC' or 'DECIMAL' → 'numeric'
// - Everything else → 'text' (default)
//
// NOTE: This deviates from SQLite's actual affinity rules where unknown types get NUMERIC affinity.
// We use 'text' as default to improve cross-database compatibility with PostgreSQL, where types
// like TIMESTAMPTZ, UUID, JSON, etc. are commonly used and map to 'text' in the PostgreSQL
// implementation. This ensures schemas with PostgreSQL-specific type names in SQLite DDL
// will hash consistently across both databases.
sqlite3 *db = (sqlite3 *)cloudsync_db(data);

char **tables = NULL;
int ntables, tcols;
int rc = sqlite3_get_table(db, "SELECT DISTINCT tbl_name FROM cloudsync_table_settings ORDER BY tbl_name;",
&tables, &ntables, &tcols, NULL);
if (rc != SQLITE_OK || ntables == 0) {
if (tables) sqlite3_free_table(tables);
return SQLITE_ERROR;
}

char *schema = NULL;
int rc = database_select_text(data, schemasql, &schema);
if (rc != DBRES_OK) return rc;
if (!schema) return DBRES_ERROR;

uint64_t h = fnv1a_hash(schema, strlen(schema));
size_t schema_len = 0;
size_t schema_cap = 0;

for (int t = 1; t <= ntables; t++) {
const char *tbl_name = tables[t];

// Query pragma_table_info for this table with normalized type
char *col_sql = cloudsync_memory_mprintf(
"SELECT LOWER(name), "
"CASE "
" WHEN UPPER(type) LIKE '%%INT%%' THEN 'integer' "
" WHEN UPPER(type) LIKE '%%CHAR%%' OR UPPER(type) LIKE '%%CLOB%%' OR UPPER(type) LIKE '%%TEXT%%' THEN 'text' "
" WHEN UPPER(type) LIKE '%%BLOB%%' OR type = '' THEN 'blob' "
" WHEN UPPER(type) LIKE '%%REAL%%' OR UPPER(type) LIKE '%%FLOA%%' OR UPPER(type) LIKE '%%DOUB%%' THEN 'real' "
" WHEN UPPER(type) IN ('NUMERIC', 'DECIMAL') THEN 'numeric' "
" ELSE 'text' "
"END, "
"CASE WHEN pk > 0 THEN '1' ELSE '0' END "
"FROM pragma_table_info('%q') ORDER BY cid;", tbl_name);

if (!col_sql) {
if (schema) cloudsync_memory_free(schema);
sqlite3_free_table(tables);
return SQLITE_NOMEM;
}

char **cols = NULL;
int nrows, ncols;
rc = sqlite3_get_table(db, col_sql, &cols, &nrows, &ncols, NULL);
cloudsync_memory_free(col_sql);

if (rc != SQLITE_OK || ncols != 3) {
if (cols) sqlite3_free_table(cols);
if (schema) cloudsync_memory_free(schema);
sqlite3_free_table(tables);
return SQLITE_ERROR;
}

// Append each column: tablename:colname:affinity:pk
for (int r = 1; r <= nrows; r++) {
const char *col_name = cols[r * 3];
const char *col_type = cols[r * 3 + 1];
const char *col_pk = cols[r * 3 + 2];

// Calculate required size: tbl_name:col_name:col_type:col_pk,
size_t entry_len = strlen(tbl_name) + 1 + strlen(col_name) + 1 + strlen(col_type) + 1 + strlen(col_pk) + 1;

if (schema_len + entry_len + 1 > schema_cap) {
schema_cap = (schema_cap == 0) ? 1024 : schema_cap * 2;
if (schema_cap < schema_len + entry_len + 1) schema_cap = schema_len + entry_len + 1;
char *new_schema = cloudsync_memory_realloc(schema, schema_cap);
if (!new_schema) {
if (schema) cloudsync_memory_free(schema);
sqlite3_free_table(cols);
sqlite3_free_table(tables);
return SQLITE_NOMEM;
}
schema = new_schema;
}

int written = snprintf(schema + schema_len, schema_cap - schema_len, "%s:%s:%s:%s,",
tbl_name, col_name, col_type, col_pk);
schema_len += written;
}

sqlite3_free_table(cols);
}

sqlite3_free_table(tables);

if (!schema || schema_len == 0) return SQLITE_ERROR;

// Remove trailing comma
if (schema_len > 0 && schema[schema_len - 1] == ',') {
schema[schema_len - 1] = '\0';
schema_len--;
}

DEBUG_MERGE("database_update_schema_hash len %zu schema %s", schema_len, schema);
sqlite3_uint64 h = fnv1a_hash(schema, schema_len);
cloudsync_memory_free(schema);
if (hash && *hash == h) return SQLITE_CONSTRAINT;

char sql[1024];
snprintf(sql, sizeof(sql), "INSERT INTO cloudsync_schema_versions (hash, seq) "
"VALUES (%" PRIu64 ", COALESCE((SELECT MAX(seq) FROM cloudsync_schema_versions), 0) + 1) "
"VALUES (%lld, COALESCE((SELECT MAX(seq) FROM cloudsync_schema_versions), 0) + 1) "
"ON CONFLICT(hash) DO UPDATE SET "
"seq = (SELECT COALESCE(MAX(seq), 0) + 1 FROM cloudsync_schema_versions);", h);
rc = database_exec(data, sql);
" seq = (SELECT COALESCE(MAX(seq), 0) + 1 FROM cloudsync_schema_versions);", (sqlite3_int64)h);
rc = sqlite3_exec(db, sql, NULL, NULL, NULL);
if (rc == SQLITE_OK && hash) *hash = h;
return rc;
}
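To see why defaulting unknown types to 'text' matters, trace a hypothetical table whose SQLite DDL uses PostgreSQL-flavored type names:

```c
/* CREATE TABLE events (id BIGINT PRIMARY KEY, body JSONB, ts TIMESTAMPTZ);
 *
 * SQLite side:  BIGINT contains 'INT'             -> integer
 *               JSONB, TIMESTAMPTZ match no rule  -> text
 *               (SQLite's real affinity rules would give them NUMERIC)
 * PostgreSQL:   bigint                            -> integer
 *               jsonb, timestamp with time zone   -> text (CASE fallthrough)
 *
 * Both backends therefore build the same normalized string:
 *   events:id:integer:1,events:body:text:0,events:ts:text:0
 * and fnv1a_hash() over it yields the same 64-bit value on each side. */
```

When the computed hash equals the caller's current hash, the function returns SQLITE_CONSTRAINT without touching the table; otherwise the ON CONFLICT upsert bumps `seq` to MAX(seq)+1, so the new (or re-registered) hash becomes the latest entry that `database_schema_hash` reads back.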
9 changes: 3 additions & 6 deletions test/unit.c
@@ -30,7 +30,6 @@
extern char *OUT_OF_MEMORY_BUFFER;
extern bool force_vtab_filter_abort;
extern bool force_uncompressed_blob;
extern bool schema_hash_disabled;

void dbvm_reset (dbvm_t *stmt);
int dbvm_count (dbvm_t *stmt, const char *value, size_t len, int type);
@@ -4511,11 +4510,9 @@ bool do_test_merge_alter_schema_1 (int nclients, bool print_result, bool cleanup
do_insert(db[0], TEST_PRIKEYS, NINSERT, print_result);

// merge changes from db0 to db1; it should fail because db0 has a newer schema hash
if (!schema_hash_disabled) {
// perform the test ONLY if schema hash is enabled
if (do_merge_using_payload(db[0], db[1], only_locals, false) == true) {
return false;
}
// the merge must be rejected because the schema hash check is enabled by default
if (do_merge_using_payload(db[0], db[1], only_locals, false) == true) {
return false;
}

// augment TEST_NOCOLS also on db1