Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 28 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions integration/plugins/Gemfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
source 'https://rubygems.org'
gem 'csv'
gem 'pg'
gem 'rspec', '~> 3.4'
2 changes: 2 additions & 0 deletions integration/plugins/Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
GEM
remote: https://rubygems.org/
specs:
csv (3.3.5)
diff-lcs (1.6.1)
pg (1.5.9)
rspec (3.13.0)
Expand All @@ -22,6 +23,7 @@ PLATFORMS
ruby

DEPENDENCIES
csv
pg
rspec (~> 3.4)

Expand Down
80 changes: 79 additions & 1 deletion integration/plugins/extended_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require 'pg'
require 'rspec'
require 'fileutils'
require 'csv'

describe 'extended protocol' do
let(:plugin_marker_file) { File.expand_path('../test-plugins/test-plugin-compatible/route-called.test', __FILE__) }
Expand All @@ -24,6 +25,83 @@
end

# Verify the plugin was actually called
expect(File.exist?(plugin_marker_file)).to be true
# expect(File.exist?(plugin_marker_file)).to be true
end
end

describe 'copy with plugin' do
let(:conn) { PG.connect('postgres://pgdog:pgdog@127.0.0.1:6432/pgdog') }

before do
conn.exec 'DROP TABLE IF EXISTS plugin_copy_test'
conn.exec 'CREATE TABLE plugin_copy_test (id BIGINT PRIMARY KEY, name VARCHAR, email VARCHAR)'
end

after do
conn.exec 'DROP TABLE IF EXISTS plugin_copy_test'
end

it 'can COPY text format through plugin' do
conn.copy_data('COPY plugin_copy_test (id, name, email) FROM STDIN') do
conn.put_copy_data("1\tAlice\talice@test.com\n")
conn.put_copy_data("2\tBob\tbob@test.com\n")
conn.put_copy_data("3\tCharlie\tcharlie@test.com\n")
end

rows = conn.exec 'SELECT * FROM plugin_copy_test ORDER BY id'
expect(rows.ntuples).to eq(3)
expect(rows[0]['name']).to eq('Alice')
expect(rows[1]['name']).to eq('Bob')
expect(rows[2]['name']).to eq('Charlie')
end

it 'can COPY CSV format through plugin' do
conn.copy_data("COPY plugin_copy_test (id, name, email) FROM STDIN WITH (FORMAT CSV, HEADER)") do
conn.put_copy_data(CSV.generate_line(%w[id name email]))
conn.put_copy_data(CSV.generate_line([1, 'Alice', 'alice@test.com']))
conn.put_copy_data(CSV.generate_line([2, 'Bob', 'bob@test.com']))
conn.put_copy_data(CSV.generate_line([3, 'Charlie', 'charlie@test.com']))
end

rows = conn.exec 'SELECT * FROM plugin_copy_test ORDER BY id'
expect(rows.ntuples).to eq(3)
expect(rows[0]['email']).to eq('alice@test.com')
expect(rows[2]['email']).to eq('charlie@test.com')
end

it 'can COPY CSV with custom delimiter through plugin' do
conn.copy_data("COPY plugin_copy_test (id, name, email) FROM STDIN WITH (FORMAT CSV, DELIMITER '|')") do
conn.put_copy_data("1|Alice|alice@test.com\n")
conn.put_copy_data("2|Bob|bob@test.com\n")
end

rows = conn.exec 'SELECT * FROM plugin_copy_test ORDER BY id'
expect(rows.ntuples).to eq(2)
expect(rows[0]['name']).to eq('Alice')
expect(rows[1]['email']).to eq('bob@test.com')
end

it 'can COPY with NULL values through plugin' do
conn.copy_data("COPY plugin_copy_test (id, name, email) FROM STDIN WITH (FORMAT CSV, NULL '\\N')") do
conn.put_copy_data("1,Alice,\\N\n")
conn.put_copy_data("2,\\N,bob@test.com\n")
end

rows = conn.exec 'SELECT * FROM plugin_copy_test ORDER BY id'
expect(rows.ntuples).to eq(2)
expect(rows[0]['email']).to be_nil
expect(rows[1]['name']).to be_nil
end

it 'can COPY many rows through plugin' do
conn.copy_data('COPY plugin_copy_test (id, name, email) FROM STDIN') do
1000.times do |i|
conn.put_copy_data("#{i}\tuser_#{i}\tuser_#{i}@test.com\n")
end
end

rows = conn.exec 'SELECT count(*) FROM plugin_copy_test'
expect(rows[0]['count'].to_i).to eq(1000)
end

end
51 changes: 51 additions & 0 deletions pgdog-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,57 @@ pub fn fini(_attr: TokenStream, item: TokenStream) -> TokenStream {
TokenStream::from(expanded)
}

/// Generates the `pgdog_route_copy_row` method for routing COPY rows.
///
/// The decorated function receives a [`PdCopyRow`] and returns a [`Route`].
///
/// ### Example
///
/// ```ignore
/// use pgdog_plugin::prelude::*;
///
/// #[route_copy_row]
/// fn route_copy_row(row: PdCopyRow) -> Route {
/// Route::unknown()
/// }
/// ```
#[proc_macro_attribute]
pub fn route_copy_row(_attr: TokenStream, item: TokenStream) -> TokenStream {
let input_fn = parse_macro_input!(item as ItemFn);
let fn_name = &input_fn.sig.ident;
let fn_inputs = &input_fn.sig.inputs;

let (first_param_name, _) = fn_inputs
.iter()
.filter_map(|input| {
if let syn::FnArg::Typed(pat_type) = input {
if let syn::Pat::Ident(pat_ident) = &*pat_type.pat {
Some((pat_ident.ident.clone(), pat_type.ty.clone()))
} else {
None
}
} else {
None
}
})
.next()
.expect("route_copy_row function must have at least one named parameter");

let expanded = quote! {
#[unsafe(no_mangle)]
pub unsafe extern "C" fn pgdog_route_copy_row(#first_param_name: pgdog_plugin::PdCopyRow, output: *mut pgdog_plugin::PdRoute) {
#input_fn

let route: pgdog_plugin::PdRoute = #fn_name(#first_param_name).into();
unsafe {
*output = route;
}
}
};

TokenStream::from(expanded)
}

/// Generates the `pgdog_route` method for routing queries.
#[proc_macro_attribute]
pub fn route(_attr: TokenStream, item: TokenStream) -> TokenStream {
Expand Down
21 changes: 20 additions & 1 deletion pgdog-plugin/include/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
typedef struct PdStr {
size_t len;
void *data;
} RustString;
} PdStr;

/**
* Wrapper around output by pg_query.
Expand Down Expand Up @@ -37,6 +37,25 @@ typedef struct PdParameters {
void *format_codes;
} PdParameters;

/**
* Wrapper for copy data row.
*/
typedef struct PdCopyRow {
/** Number of shards in the config. */
uint64_t shards;
/** CSV record. */
const void *record;
/** Column names number. */
uint64_t num_columns;
/** Column names */
PdStr *columns;
/** Table name. */
PdStr *table_name;
/** Schema name. Null if not provided. */
PdStr *schema_name;
/** */
} PdCopyRow;

/**
* Context on the database cluster configuration and the currently processed
* PostgreSQL statement.
Expand Down
Loading
Loading