3 changes: 2 additions & 1 deletion .gitignore
@@ -48,4 +48,5 @@ jniLibs/

# System
.DS_Store
Thumbs.db
Thumbs.db
CLAUDE.md
114 changes: 107 additions & 7 deletions src/cloudsync.c
@@ -1792,13 +1792,104 @@ int cloudsync_commit_alter (cloudsync_context *data, const char *table_name) {
return rc;
}

// MARK: - Filter Rewrite -

// Helper: check if an identifier token matches a column name.
static bool filter_is_column (const char *token, size_t token_len, char **columns, int ncols) {
for (int i = 0; i < ncols; ++i) {
if (strlen(columns[i]) == token_len && strncmp(token, columns[i], token_len) == 0)
return true;
}
return false;
}

// Helper: check if character is part of a SQL identifier.
static bool filter_is_ident_char (char c) {
return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') ||
(c >= '0' && c <= '9') || c == '_';
}

// Replace bare column names in a filter expression with prefix-qualified names.
// E.g., filter="user_id = 42", prefix="NEW", columns=["user_id","id"] → "NEW.\"user_id\" = 42"
// Matching is done on whole identifier tokens, so the caller does not need to sort columns.
// Skips content inside single-quoted string literals and double-quoted identifiers.
// Returns a newly allocated string (caller must free with cloudsync_memory_free), or NULL on error.
char *cloudsync_filter_add_row_prefix (const char *filter, const char *prefix, char **columns, int ncols) {
if (!filter || !prefix || !columns || ncols <= 0) return NULL;

size_t filter_len = strlen(filter);
size_t prefix_len = strlen(prefix);

// Each identifier match grows by at most (prefix_len + 3) bytes.
// Worst case: the entire filter is one repeated column reference separated by
// single characters, so up to (filter_len / 2) matches. Use a safe upper bound.
size_t max_growth = (filter_len / 2 + 1) * (prefix_len + 3);
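// Worked example of the bound above, with prefix "NEW" (3 bytes) and a
// hypothetical filter "a=1 AND b=2" (11 bytes): at most 11/2 + 1 = 6 matches,
// each growing by 3 + 3 = 6 bytes, so max_growth = 36 and cap = 11 + 36 + 64 = 111.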
size_t cap = filter_len + max_growth + 64;
char *result = (char *)cloudsync_memory_alloc(cap);
if (!result) return NULL;
size_t out = 0;

// Single pass: tokenize into identifiers, quoted strings, and everything else.
size_t i = 0;
while (i < filter_len) {
// Skip single-quoted string literals verbatim (handle '' escape)
if (filter[i] == '\'') {
result[out++] = filter[i++];
while (i < filter_len) {
if (filter[i] == '\'') {
result[out++] = filter[i++];
// '' is an escaped quote — keep going
if (i < filter_len && filter[i] == '\'') {
result[out++] = filter[i++];
continue;
}
break; // single ' ends the literal
}
result[out++] = filter[i++];
}
continue;
}

// Skip double-quoted identifiers verbatim (handle "" escape) so that names the
// caller already quoted are copied through unchanged rather than re-qualified
if (filter[i] == '"') {
result[out++] = filter[i++];
while (i < filter_len) {
if (filter[i] == '"') {
result[out++] = filter[i++];
// "" is an escaped quote inside the identifier, keep going
if (i < filter_len && filter[i] == '"') {
result[out++] = filter[i++];
continue;
}
break; // single " ends the identifier
}
result[out++] = filter[i++];
}
continue;
}

// Extract identifier token
if (filter_is_ident_char(filter[i])) {
size_t start = i;
while (i < filter_len && filter_is_ident_char(filter[i])) ++i;
size_t token_len = i - start;

if (filter_is_column(&filter[start], token_len, columns, ncols)) {
// Emit PREFIX."column_name"
memcpy(&result[out], prefix, prefix_len); out += prefix_len;
result[out++] = '.';
result[out++] = '"';
memcpy(&result[out], &filter[start], token_len); out += token_len;
result[out++] = '"';
} else {
// Not a column — copy as-is
memcpy(&result[out], &filter[start], token_len); out += token_len;
}
continue;
}

// Any other character — copy as-is
result[out++] = filter[i++];
}

result[out] = '\0';
return result;
}
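// Usage sketch with illustrative inputs (the column names and filter text below
// are hypothetical, not part of the library):
//
//   char *cols[] = { "user_id", "status" };
//   char *qualified = cloudsync_filter_add_row_prefix("user_id = 42 AND status = 'active'", "NEW", cols, 2);
//   // qualified is now: NEW."user_id" = 42 AND NEW."status" = 'active'
//   if (qualified) cloudsync_memory_free(qualified);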

int cloudsync_refill_metatable (cloudsync_context *data, const char *table_name) {
cloudsync_table_context *table = table_lookup(data, table_name);
if (!table) return DBRES_ERROR;

dbvm_t *vm = NULL;
int64_t db_version = cloudsync_dbversion_next(data, CLOUDSYNC_VALUE_NOTSET);

// Read row-level filter from settings (if any)
char filter_buf[2048];
int frc = dbutils_table_settings_get_value(data, table_name, "*", "filter", filter_buf, sizeof(filter_buf));
const char *filter = (frc == DBRES_OK && filter_buf[0]) ? filter_buf : NULL;

const char *schema = table->schema ? table->schema : "";
char *sql = sql_build_pk_collist_query(schema, table_name);
char *pkclause_identifiers = NULL;
@@ -1808,18 +1899,22 @@ int cloudsync_refill_metatable (cloudsync_context *data, const char *table_name)
char *pkvalues_identifiers = (pkclause_identifiers) ? pkclause_identifiers : "rowid";

// Use database-specific query builder to handle type differences in composite PKs
sql = sql_build_insert_missing_pks_query(schema, table_name, pkvalues_identifiers, table->base_ref, table->meta_ref);
sql = sql_build_insert_missing_pks_query(schema, table_name, pkvalues_identifiers, table->base_ref, table->meta_ref, filter);
if (!sql) {rc = DBRES_NOMEM; goto finalize;}
rc = database_exec(data, sql);
cloudsync_memory_free(sql);
if (rc != DBRES_OK) goto finalize;

// fill missing columns
// for each non-pk column:
// The new query does 1 encode per source row and one indexed NOT-EXISTS probe.
// The old plan does many decodes per candidate and can’t use an index to rule out matches quickly—so it burns CPU and I/O.

sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL, pkvalues_identifiers, table->base_ref, table->meta_ref);
// The old plan does many decodes per candidate and can't use an index to rule out matches quickly—so it burns CPU and I/O.

if (filter) {
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL_FILTERED, pkvalues_identifiers, table->base_ref, filter, table->meta_ref);
} else {
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL, pkvalues_identifiers, table->base_ref, table->meta_ref);
}
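// Illustrative shape only (an assumption; the literal SQL lives in the
// SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL_FILTERED macro): the filtered
// variant ANDs the user filter into the source-row scan, roughly
//   SELECT <pks> FROM <base_ref> WHERE (<filter>) AND NOT EXISTS (<probe into meta_ref>)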
rc = databasevm_prepare(data, sql, (void **)&vm, DBFLAG_PERSISTENT);
cloudsync_memory_free(sql);
if (rc != DBRES_OK) goto finalize;
@@ -2723,8 +2818,13 @@ int cloudsync_init_table (cloudsync_context *data, const char *table_name, const
// sync algo with table (unused in this version)
// cloudsync_sync_table_key(data, table_name, "*", CLOUDSYNC_KEY_ALGO, crdt_algo_name(algo_new));

// read row-level filter from settings (if any)
char init_filter_buf[2048];
int init_frc = dbutils_table_settings_get_value(data, table_name, "*", "filter", init_filter_buf, sizeof(init_filter_buf));
const char *init_filter = (init_frc == DBRES_OK && init_filter_buf[0]) ? init_filter_buf : NULL;

// check triggers
rc = database_create_triggers(data, table_name, algo_new);
rc = database_create_triggers(data, table_name, algo_new, init_filter);
if (rc != DBRES_OK) return cloudsync_set_error(data, "An error occurred while creating triggers", DBRES_MISUSE);

// check meta-table
5 changes: 4 additions & 1 deletion src/cloudsync.h
@@ -17,7 +17,7 @@
extern "C" {
#endif

#define CLOUDSYNC_VERSION "0.9.102"
#define CLOUDSYNC_VERSION "0.9.110"
#define CLOUDSYNC_MAX_TABLENAME_LEN 512

#define CLOUDSYNC_VALUE_NOTSET -1
@@ -121,6 +121,9 @@ int local_update_move_meta (cloudsync_table_context *table, const char *pk, size
int merge_insert_col (cloudsync_context *data, cloudsync_table_context *table, const char *pk, int pklen, const char *col_name, dbvalue_t *col_value, int64_t col_version, int64_t db_version, const char *site_id, int site_len, int64_t seq, int64_t *rowid);
int merge_insert (cloudsync_context *data, cloudsync_table_context *table, const char *insert_pk, int insert_pk_len, int64_t insert_cl, const char *insert_name, dbvalue_t *insert_value, int64_t insert_col_version, int64_t insert_db_version, const char *insert_site_id, int insert_site_id_len, int64_t insert_seq, int64_t *rowid);

// filter rewrite
char *cloudsync_filter_add_row_prefix(const char *filter, const char *prefix, char **columns, int ncols);

// decode bind context
char *cloudsync_pk_context_tbl (cloudsync_pk_decode_bind_context *ctx, int64_t *tbl_len);
void *cloudsync_pk_context_pk (cloudsync_pk_decode_bind_context *ctx, int64_t *pk_len);
4 changes: 2 additions & 2 deletions src/database.h
@@ -70,7 +70,7 @@ bool database_table_exists (cloudsync_context *data, const char *table_name, con
bool database_internal_table_exists (cloudsync_context *data, const char *name);
bool database_trigger_exists (cloudsync_context *data, const char *table_name);
int database_create_metatable (cloudsync_context *data, const char *table_name);
int database_create_triggers (cloudsync_context *data, const char *table_name, table_algo algo);
int database_create_triggers (cloudsync_context *data, const char *table_name, table_algo algo, const char *filter);
int database_delete_triggers (cloudsync_context *data, const char *table_name);
int database_pk_names (cloudsync_context *data, const char *table_name, char ***names, int *count);
int database_cleanup (cloudsync_context *data);
@@ -148,7 +148,7 @@ char *sql_build_delete_cols_not_in_schema_query(const char *schema, const char *
char *sql_build_pk_collist_query(const char *schema, const char *table_name);
char *sql_build_pk_decode_selectlist_query(const char *schema, const char *table_name);
char *sql_build_pk_qualified_collist_query(const char *schema, const char *table_name);
char *sql_build_insert_missing_pks_query(const char *schema, const char *table_name, const char *pkvalues_identifiers, const char *base_ref, const char *meta_ref);
char *sql_build_insert_missing_pks_query(const char *schema, const char *table_name, const char *pkvalues_identifiers, const char *base_ref, const char *meta_ref, const char *filter);

char *database_table_schema(const char *table_name);
char *database_build_meta_ref(const char *schema, const char *table_name);
5 changes: 4 additions & 1 deletion src/dbutils.c
@@ -363,7 +363,10 @@ int dbutils_settings_table_load_callback (void *xdata, int ncols, char **values,
if (strcmp(key, "algo")!=0) continue;

table_algo algo = cloudsync_algo_from_name(value);
if (database_create_triggers(data, table_name, algo) != DBRES_OK) return DBRES_MISUSE;
char fbuf[2048];
int frc = dbutils_table_settings_get_value(data, table_name, "*", "filter", fbuf, sizeof(fbuf));
const char *filt = (frc == DBRES_OK && fbuf[0]) ? fbuf : NULL;
if (database_create_triggers(data, table_name, algo, filt) != DBRES_OK) return DBRES_MISUSE;
if (table_add_to_context(data, algo, table_name) == false) return DBRES_MISUSE;

DEBUG_SETTINGS("load tbl_name: %s value: %s", key, value);
12 changes: 12 additions & 0 deletions src/postgresql/cloudsync--1.0.sql
@@ -102,6 +102,18 @@ RETURNS boolean
AS 'MODULE_PATHNAME', 'cloudsync_set_table'
LANGUAGE C VOLATILE;

-- Set row-level filter for conditional sync
CREATE OR REPLACE FUNCTION cloudsync_set_filter(table_name text, filter_expr text)
RETURNS boolean
AS 'MODULE_PATHNAME', 'cloudsync_set_filter'
LANGUAGE C VOLATILE;

-- Clear row-level filter
CREATE OR REPLACE FUNCTION cloudsync_clear_filter(table_name text)
RETURNS boolean
AS 'MODULE_PATHNAME', 'cloudsync_clear_filter'
LANGUAGE C VOLATILE;
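
-- Usage sketch (the table name and expression are illustrative):
--   SELECT cloudsync_set_filter('orders', 'user_id = 42');
--   SELECT cloudsync_clear_filter('orders');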

-- Set column-level configuration
CREATE OR REPLACE FUNCTION cloudsync_set_column(table_name text, column_name text, key text, value text)
RETURNS boolean
119 changes: 119 additions & 0 deletions src/postgresql/cloudsync_postgresql.c
@@ -610,6 +610,125 @@ Datum cloudsync_set_column (PG_FUNCTION_ARGS) {
PG_RETURN_BOOL(true);
}

// MARK: - Row Filter -

// cloudsync_set_filter - Set a row-level filter for conditional sync
PG_FUNCTION_INFO_V1(cloudsync_set_filter);
Datum cloudsync_set_filter (PG_FUNCTION_ARGS) {
if (PG_ARGISNULL(0) || PG_ARGISNULL(1)) {
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cloudsync_set_filter: table and filter expression required")));
}

const char *tbl = text_to_cstring(PG_GETARG_TEXT_PP(0));
const char *filter_expr = text_to_cstring(PG_GETARG_TEXT_PP(1));

cloudsync_context *data = get_cloudsync_context();
bool spi_connected = false;

int spi_rc = SPI_connect();
if (spi_rc != SPI_OK_CONNECT) {
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("SPI_connect failed: %d", spi_rc)));
}
spi_connected = true;

PG_TRY();
{
// Store filter in table settings
dbutils_table_settings_set_key_value(data, tbl, "*", "filter", filter_expr);

// Read current algo
table_algo algo = dbutils_table_settings_get_algo(data, tbl);
if (algo == table_algo_none) algo = table_algo_crdt_cls;

// Drop triggers
database_delete_triggers(data, tbl);

// Reconnect SPI so that the catalog changes from DROP are visible
SPI_finish();
spi_connected = false;
spi_rc = SPI_connect();
if (spi_rc != SPI_OK_CONNECT) {
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("SPI_connect failed: %d", spi_rc)));
}
spi_connected = true;

// Recreate triggers with filter
int rc = database_create_triggers(data, tbl, algo, filter_expr);
if (rc != DBRES_OK) {
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("cloudsync_set_filter: error recreating triggers")));
}
}
PG_CATCH();
{
if (spi_connected) SPI_finish();
PG_RE_THROW();
}
PG_END_TRY();

if (spi_connected) SPI_finish();
PG_RETURN_BOOL(true);
}

// cloudsync_clear_filter - Remove the row-level filter for a table
PG_FUNCTION_INFO_V1(cloudsync_clear_filter);
Datum cloudsync_clear_filter (PG_FUNCTION_ARGS) {
if (PG_ARGISNULL(0)) {
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cloudsync_clear_filter: table name required")));
}

const char *tbl = text_to_cstring(PG_GETARG_TEXT_PP(0));

cloudsync_context *data = get_cloudsync_context();
bool spi_connected = false;

int spi_rc = SPI_connect();
if (spi_rc != SPI_OK_CONNECT) {
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("SPI_connect failed: %d", spi_rc)));
}
spi_connected = true;

PG_TRY();
{
// Remove filter from settings
dbutils_table_settings_set_key_value(data, tbl, "*", "filter", NULL);

// Read current algo
table_algo algo = dbutils_table_settings_get_algo(data, tbl);
if (algo == table_algo_none) algo = table_algo_crdt_cls;

// Drop triggers
database_delete_triggers(data, tbl);

// Reconnect SPI so that the catalog changes from DROP are visible
SPI_finish();
spi_connected = false;
spi_rc = SPI_connect();
if (spi_rc != SPI_OK_CONNECT) {
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("SPI_connect failed: %d", spi_rc)));
}
spi_connected = true;

// Recreate triggers without filter
int rc = database_create_triggers(data, tbl, algo, NULL);
if (rc != DBRES_OK) {
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("cloudsync_clear_filter: error recreating triggers")));
}
}
PG_CATCH();
{
if (spi_connected) SPI_finish();
PG_RE_THROW();
}
PG_END_TRY();

if (spi_connected) SPI_finish();
PG_RETURN_BOOL(true);
}

// MARK: - Schema Alteration -

// cloudsync_begin_alter - Begin schema alteration