532 changes: 532 additions & 0 deletions .claude/commands/test-sync-roundtrip-rls.md

Large diffs are not rendered by default.

33 changes: 32 additions & 1 deletion .github/workflows/main.yml
@@ -226,10 +226,41 @@ jobs:
path: dist/${{ matrix.name == 'apple-xcframework' && 'CloudSync.*' || 'cloudsync.*'}}
if-no-files-found: error

postgres-test:
runs-on: ubuntu-22.04
name: postgresql build + test
timeout-minutes: 10

steps:

- uses: actions/checkout@v4.2.2

- name: build and start postgresql container
run: make postgres-docker-rebuild

- name: wait for postgresql to be ready
run: |
for i in $(seq 1 30); do
if docker exec cloudsync-postgres pg_isready -U postgres > /dev/null 2>&1; then
echo "PostgreSQL is ready"
exit 0
fi
sleep 2
done
echo "PostgreSQL failed to start within 60s"
docker logs cloudsync-postgres
exit 1

- name: run postgresql tests
run: |
docker exec cloudsync-postgres mkdir -p /tmp/cloudsync/test
docker cp test/postgresql cloudsync-postgres:/tmp/cloudsync/test/postgresql
docker exec cloudsync-postgres psql -U postgres -d postgres -f /tmp/cloudsync/test/postgresql/full_test.sql

release:
runs-on: ubuntu-22.04
name: release
needs: build
needs: [build, postgres-test]
if: github.ref == 'refs/heads/main'

env:
30 changes: 15 additions & 15 deletions docker/Makefile.postgresql
@@ -137,32 +137,32 @@ PG_DOCKER_DB_PASSWORD ?= postgres

# Build Docker image with pre-installed extension
postgres-docker-build:
@echo "Building Docker image via docker-compose (rebuilt when sources change)..."
@echo "Building Docker image via docker compose (rebuilt when sources change)..."
# To force plaintext BuildKit logs, run: make postgres-docker-build DOCKER_BUILD_ARGS="--progress=plain"
cd docker/postgresql && docker-compose build $(DOCKER_BUILD_ARGS)
cd docker/postgresql && docker compose build $(DOCKER_BUILD_ARGS)
@echo ""
@echo "Docker image built successfully!"

# Build Docker image with AddressSanitizer enabled (override compose file)
postgres-docker-build-asan:
@echo "Building Docker image with ASAN via docker-compose..."
@echo "Building Docker image with ASAN via docker compose..."
# To force plaintext BuildKit logs, run: make postgres-docker-build-asan DOCKER_BUILD_ARGS=\"--progress=plain\"
cd docker/postgresql && docker-compose -f docker-compose.debug.yml -f docker-compose.asan.yml build $(DOCKER_BUILD_ARGS)
cd docker/postgresql && docker compose -f docker-compose.debug.yml -f docker-compose.asan.yml build $(DOCKER_BUILD_ARGS)
@echo ""
@echo "ASAN Docker image built successfully!"

# Build Docker image using docker-compose.debug.yml
postgres-docker-debug-build:
@echo "Building debug Docker image via docker-compose..."
@echo "Building debug Docker image via docker compose..."
# To force plaintext BuildKit logs, run: make postgres-docker-debug-build DOCKER_BUILD_ARGS=\"--progress=plain\"
cd docker/postgresql && docker-compose -f docker-compose.debug.yml build $(DOCKER_BUILD_ARGS)
cd docker/postgresql && docker compose -f docker-compose.debug.yml build $(DOCKER_BUILD_ARGS)
@echo ""
@echo "Debug Docker image built successfully!"

# Run PostgreSQL container with CloudSync
postgres-docker-run:
@echo "Starting PostgreSQL with CloudSync..."
cd docker/postgresql && docker-compose up -d --build
cd docker/postgresql && docker compose up -d --build
@echo ""
@echo "Container started successfully!"
@echo ""
@@ -179,7 +179,7 @@ postgres-docker-run:
# Run PostgreSQL container with CloudSync and AddressSanitizer enabled
postgres-docker-run-asan:
@echo "Starting PostgreSQL with CloudSync (ASAN enabled)..."
cd docker/postgresql && docker-compose -f docker-compose.debug.yml -f docker-compose.asan.yml up -d --build
cd docker/postgresql && docker compose -f docker-compose.debug.yml -f docker-compose.asan.yml up -d --build
@echo ""
@echo "Container started successfully!"
@echo ""
@@ -196,7 +196,7 @@ postgres-docker-run-asan:
# Run PostgreSQL container using docker-compose.debug.yml
postgres-docker-debug-run:
@echo "Starting PostgreSQL with CloudSync (debug compose)..."
cd docker/postgresql && docker-compose -f docker-compose.debug.yml up -d --build
cd docker/postgresql && docker compose -f docker-compose.debug.yml up -d --build
@echo ""
@echo "Container started successfully!"
@echo ""
@@ -213,21 +213,21 @@ postgres-docker-debug-run:
# Stop PostgreSQL container
postgres-docker-stop:
@echo "Stopping PostgreSQL container..."
cd docker/postgresql && docker-compose down
cd docker/postgresql && docker compose down
@echo "Container stopped"

# Rebuild and restart container
postgres-docker-rebuild: postgres-docker-build
@echo "Rebuilding and restarting container..."
cd docker/postgresql && docker-compose down
cd docker/postgresql && docker-compose up -d --build
cd docker/postgresql && docker compose down
cd docker/postgresql && docker compose up -d --build
@echo "Container restarted with new image"

# Rebuild and restart container using docker-compose.debug.yml
postgres-docker-debug-rebuild: postgres-docker-debug-build
@echo "Rebuilding and restarting debug container..."
cd docker/postgresql && docker-compose -f docker-compose.debug.yml down
cd docker/postgresql && docker-compose -f docker-compose.debug.yml up -d --build
cd docker/postgresql && docker compose -f docker-compose.debug.yml down
cd docker/postgresql && docker compose -f docker-compose.debug.yml up -d --build
@echo "Debug container restarted with new image"

# Interactive shell in container
@@ -353,5 +353,5 @@ postgres-help:
# Simple smoke test: rebuild image/container, create extension, and query version
unittest-pg: postgres-docker-rebuild
@echo "Running PostgreSQL extension smoke test..."
cd docker/postgresql && docker-compose exec -T postgres psql -U postgres -d cloudsync_test -f /tmp/cloudsync/docker/postgresql/smoke_test.sql
cd docker/postgresql && docker compose exec -T postgres psql -U postgres -d cloudsync_test -f /tmp/cloudsync/docker/postgresql/smoke_test.sql
@echo "Smoke test completed."
22 changes: 12 additions & 10 deletions src/cloudsync.c
@@ -1208,18 +1208,20 @@ int merge_insert_col (cloudsync_context *data, cloudsync_table_context *table, c
return rc;
}

// bind value
// bind value (always bind all expected parameters for correct prepared statement handling)
if (col_value) {
rc = databasevm_bind_value(vm, table->npks+1, col_value);
if (rc == DBRES_OK) rc = databasevm_bind_value(vm, table->npks+2, col_value);
if (rc != DBRES_OK) {
cloudsync_set_dberror(data);
dbvm_reset(vm);
return rc;
}

} else {
rc = databasevm_bind_null(vm, table->npks+1);
if (rc == DBRES_OK) rc = databasevm_bind_null(vm, table->npks+2);
}

if (rc != DBRES_OK) {
cloudsync_set_dberror(data);
dbvm_reset(vm);
return rc;
}

// perform real operation and disable triggers

// in case of GOS we reused the table->col_merge_stmt statement
@@ -2444,8 +2446,8 @@ int cloudsync_payload_get (cloudsync_context *data, char **blob, int *blob_size,

// retrieve BLOB
char sql[1024];
snprintf(sql, sizeof(sql), "WITH max_db_version AS (SELECT MAX(db_version) AS max_db_version FROM cloudsync_changes) "
"SELECT * FROM (SELECT cloudsync_payload_encode(tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq) AS payload, max_db_version AS max_db_version, MAX(IIF(db_version = max_db_version, seq, NULL)) FROM cloudsync_changes, max_db_version WHERE site_id=cloudsync_siteid() AND (db_version>%d OR (db_version=%d AND seq>%d))) WHERE payload IS NOT NULL", *db_version, *db_version, *seq);
snprintf(sql, sizeof(sql), "WITH max_db_version AS (SELECT MAX(db_version) AS max_db_version FROM cloudsync_changes WHERE site_id=cloudsync_siteid()) "
"SELECT * FROM (SELECT cloudsync_payload_encode(tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq) AS payload, max_db_version AS max_db_version, MAX(IIF(db_version = max_db_version, seq, 0)) FROM cloudsync_changes, max_db_version WHERE site_id=cloudsync_siteid() AND (db_version>%d OR (db_version=%d AND seq>%d))) WHERE payload IS NOT NULL", *db_version, *db_version, *seq);

int64_t len = 0;
int rc = database_select_blob_2int(data, sql, blob, &len, new_db_version, new_seq);
2 changes: 1 addition & 1 deletion src/cloudsync.h
@@ -17,7 +17,7 @@
extern "C" {
#endif

#define CLOUDSYNC_VERSION "0.9.101"
#define CLOUDSYNC_VERSION "0.9.102"
#define CLOUDSYNC_MAX_TABLENAME_LEN 512

#define CLOUDSYNC_VALUE_NOTSET -1
18 changes: 18 additions & 0 deletions src/postgresql/cloudsync--1.0.sql
@@ -276,3 +276,21 @@ CREATE OR REPLACE FUNCTION cloudsync_table_schema(table_name text)
RETURNS text
AS 'MODULE_PATHNAME', 'pg_cloudsync_table_schema'
LANGUAGE C VOLATILE;

-- ============================================================================
-- Type Casts
-- ============================================================================

-- Cast function: converts bigint to boolean (0 = false, non-zero = true)
-- Required because BOOLEAN values are encoded as INT8 in sync payloads,
-- but PostgreSQL has no built-in cast from bigint to boolean.
CREATE FUNCTION cloudsync_int8_to_bool(bigint) RETURNS boolean AS $$
SELECT $1 <> 0
$$ LANGUAGE SQL IMMUTABLE STRICT;

-- ASSIGNMENT cast: auto-applies in INSERT/UPDATE context only
-- This enables BOOLEAN column sync where values are encoded as INT8.
-- Using ASSIGNMENT (not IMPLICIT) to avoid unintended conversions in WHERE clauses.
CREATE CAST (bigint AS boolean)
WITH FUNCTION cloudsync_int8_to_bool(bigint)
AS ASSIGNMENT;
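
-- For context, a minimal sketch of how this ASSIGNMENT cast behaves once installed
-- (the cast_demo table below is hypothetical and not part of this change):
--
--   CREATE TEMP TABLE cast_demo (flag boolean);
--   INSERT INTO cast_demo VALUES (1::bigint);           -- assignment context: cast applies, stores true
--   UPDATE cast_demo SET flag = 0::bigint;              -- assignment context: cast applies, stores false
--   -- SELECT * FROM cast_demo WHERE flag = 1::bigint;  -- comparison context: still an error,
--                                                       -- because the cast is ASSIGNMENT, not IMPLICIT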
89 changes: 87 additions & 2 deletions src/postgresql/cloudsync_postgresql.c
@@ -1638,6 +1638,77 @@ static int cloudsync_decode_value_cb (void *xdata, int index, int type, int64_t
return DBRES_OK;
}

// Map a column Oid to the decoded type Oid that would be used for non-NULL values.
// This ensures NULL and non-NULL values use consistent types for SPI plan caching.
// The mapping must match pgvalue_dbtype() in pgvalue.c which determines encode/decode types.
// For example, INT4OID columns decode to INT8OID, UUIDOID columns decode to TEXTOID.
static Oid map_column_oid_to_decoded_oid(Oid col_oid) {
switch (col_oid) {
// Integer types → INT8OID (all integers decode to int64)
// Must match DBTYPE_INTEGER cases in pgvalue_dbtype()
case INT2OID:
case INT4OID:
case INT8OID:
case BOOLOID: // BOOLEAN encodes/decodes as INTEGER
case CHAROID: // "char" encodes/decodes as INTEGER
case OIDOID: // OID encodes/decodes as INTEGER
return INT8OID;
// Float types → FLOAT8OID (all floats decode to double)
// Must match DBTYPE_FLOAT cases in pgvalue_dbtype()
case FLOAT4OID:
case FLOAT8OID:
case NUMERICOID:
return FLOAT8OID;
// Binary types → BYTEAOID
// Must match DBTYPE_BLOB cases in pgvalue_dbtype()
case BYTEAOID:
return BYTEAOID;
// All other types (text, varchar, uuid, json, date, timestamp, etc.) → TEXTOID
// These all encode/decode as DBTYPE_TEXT
default:
return TEXTOID;
}
}

// Get the Oid of a column from the system catalog.
// Requires SPI to be connected. Returns InvalidOid if not found.
static Oid get_column_oid(const char *schema, const char *table_name, const char *column_name) {
if (!table_name || !column_name) return InvalidOid;

const char *query =
"SELECT a.atttypid "
"FROM pg_attribute a "
"JOIN pg_class c ON c.oid = a.attrelid "
"LEFT JOIN pg_namespace n ON n.oid = c.relnamespace "
"WHERE c.relname = $1 "
"AND a.attname = $2 "
"AND a.attnum > 0 "
"AND NOT a.attisdropped "
"AND (n.nspname = $3 OR $3 IS NULL)";

Oid argtypes[3] = {TEXTOID, TEXTOID, TEXTOID};
Datum values[3];
char nulls[3] = {' ', ' ', schema ? ' ' : 'n'};

values[0] = CStringGetTextDatum(table_name);
values[1] = CStringGetTextDatum(column_name);
values[2] = schema ? CStringGetTextDatum(schema) : (Datum)0;

int ret = SPI_execute_with_args(query, 3, argtypes, values, nulls, true, 1);

pfree(DatumGetPointer(values[0]));
pfree(DatumGetPointer(values[1]));
if (schema) pfree(DatumGetPointer(values[2]));

if (ret != SPI_OK_SELECT || SPI_processed == 0) return InvalidOid;

bool isnull;
Datum col_oid = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &isnull);
if (isnull) return InvalidOid;

return DatumGetObjectId(col_oid);
}

// Decode encoded bytea into a pgvalue_t with the decoded base type.
// Type casting to the target column type is handled by the SQL statement.
static pgvalue_t *cloudsync_decode_bytea_to_pgvalue (bytea *encoded, bool *out_isnull) {
@@ -2247,9 +2318,23 @@ Datum cloudsync_changes_insert_trigger (PG_FUNCTION_ARGS) {
if (SPI_connect() != SPI_OK_CONNECT) ereport(ERROR, (errmsg("cloudsync: SPI_connect failed in trigger")));
spi_connected = true;

// Decode value to base type; SQL statement handles type casting via $n::typename
// Decode value to base type; SQL statement handles type casting via $n::typename.
// For non-NULL values, we get the decoded base type (INT8OID for integers, TEXTOID for text/UUID, etc).
// For NULL values, we must use the SAME decoded type that non-NULL values would use.
// This ensures type consistency across all calls, as SPI caches parameter types on first prepare.
if (!is_tombstone) {
col_value = cloudsync_decode_bytea_to_pgvalue(insert_value_encoded, NULL);
bool value_is_null = false;
col_value = cloudsync_decode_bytea_to_pgvalue(insert_value_encoded, &value_is_null);

// When value is NULL, create a typed NULL pgvalue with the decoded type.
// We map the column's actual Oid to the corresponding decoded Oid (e.g., INT4OID → INT8OID).
if (!col_value && value_is_null) {
Oid col_oid = get_column_oid(table_schema(table), insert_tbl, insert_name);
if (OidIsValid(col_oid)) {
Oid decoded_oid = map_column_oid_to_decoded_oid(col_oid);
col_value = pgvalue_create((Datum)0, decoded_oid, -1, InvalidOid, true);
}
}
}

int rc = DBRES_OK;
5 changes: 3 additions & 2 deletions src/postgresql/database_postgresql.c
@@ -2140,7 +2140,7 @@ int databasevm_bind_null (dbvm_t *vm, int index) {

pg_stmt_t *stmt = (pg_stmt_t*)vm;
stmt->values[idx] = (Datum)0;
stmt->types[idx] = BYTEAOID;
stmt->types[idx] = TEXTOID; // TEXTOID has casts to most types
stmt->nulls[idx] = 'n';

if (stmt->nparams < idx + 1) stmt->nparams = idx + 1;
@@ -2185,7 +2185,8 @@ int databasevm_bind_value (dbvm_t *vm, int index, dbvalue_t *value) {
pgvalue_t *v = (pgvalue_t *)value;
if (!v || v->isnull) {
stmt->values[idx] = (Datum)0;
stmt->types[idx] = TEXTOID;
// Use the actual column type if available, otherwise default to TEXTOID
stmt->types[idx] = (v && OidIsValid(v->typeid)) ? v->typeid : TEXTOID;
stmt->nulls[idx] = 'n';
} else {
int16 typlen;
2 changes: 1 addition & 1 deletion test/postgresql/01_unittest.sql
@@ -21,7 +21,7 @@ SELECT cloudsync_version() AS version \gset

-- Test uuid generation
SELECT cloudsync_uuid() AS uuid1 \gset
SELECT pg_sleep(0.1);
SELECT pg_sleep(0.1) \gset
SELECT cloudsync_uuid() AS uuid2 \gset

-- Test 1: Format check (UUID v7 has standard format: xxxxxxxx-xxxx-7xxx-xxxx-xxxxxxxxxxxx)