diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index c014397f132e..c50cd3621bc8 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -346,6 +346,9 @@ pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
     def_levels_sink: Vec<i16>,
     rep_levels_sink: Vec<i16>,
     data_pages: VecDeque<CompressedPage>,
+    // Reusable buffers for page assembly (avoids per-page allocation)
+    page_buf: Vec<u8>,
+    compressed_buf: Vec<u8>,
     // column index and offset index
     column_index_builder: ColumnIndexBuilder,
     offset_index_builder: Option<OffsetIndexBuilder>,
@@ -412,6 +415,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             def_levels_sink: vec![],
             rep_levels_sink: vec![],
             data_pages: VecDeque::new(),
+            page_buf: Vec::new(),
+            compressed_buf: Vec::new(),
             page_metrics,
             column_metrics,
             column_index_builder,
@@ -557,7 +562,10 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
     /// of the current memory usage and not the final anticipated encoded size.
     #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
-        self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
+        self.column_metrics.total_bytes_written as usize
+            + self.encoder.estimated_memory_size()
+            + self.page_buf.capacity()
+            + self.compressed_buf.capacity()
     }
 
     /// Returns total number of bytes written by this column writer so far.
@@ -1048,40 +1056,50 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
 
         let compressed_page = match self.props.writer_version() {
             WriterVersion::PARQUET_1_0 => {
-                let mut buffer = vec![];
-
-                if max_rep_level > 0 {
-                    buffer.extend_from_slice(
-                        &self.encode_levels_v1(
-                            Encoding::RLE,
-                            &self.rep_levels_sink[..],
-                            max_rep_level,
-                        )[..],
-                    );
-                }
+                // Encode levels into locals first to avoid borrow conflict
+                // (encode_levels_v1 borrows &self, page_buf needs &mut self)
+                let rep_levels_encoded = if max_rep_level > 0 {
+                    Some(self.encode_levels_v1(
+                        Encoding::RLE,
+                        &self.rep_levels_sink[..],
+                        max_rep_level,
+                    ))
+                } else {
+                    None
+                };
+                let def_levels_encoded = if max_def_level > 0 {
+                    Some(self.encode_levels_v1(
+                        Encoding::RLE,
+                        &self.def_levels_sink[..],
+                        max_def_level,
+                    ))
+                } else {
+                    None
+                };
 
-                if max_def_level > 0 {
-                    buffer.extend_from_slice(
-                        &self.encode_levels_v1(
-                            Encoding::RLE,
-                            &self.def_levels_sink[..],
-                            max_def_level,
-                        )[..],
-                    );
+                // Assemble page data into reusable buffer
+                self.page_buf.clear();
+                if let Some(ref levels) = rep_levels_encoded {
+                    self.page_buf.extend_from_slice(levels);
                 }
-
-                buffer.extend_from_slice(&values_data.buf);
-                let uncompressed_size = buffer.len();
-
-                if let Some(ref mut cmpr) = self.compressor {
-                    let mut compressed_buf = Vec::with_capacity(uncompressed_size);
-                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
-                    compressed_buf.shrink_to_fit();
-                    buffer = compressed_buf;
+                if let Some(ref levels) = def_levels_encoded {
+                    self.page_buf.extend_from_slice(levels);
                 }
+                self.page_buf.extend_from_slice(&values_data.buf);
+                let uncompressed_size = self.page_buf.len();
+
+                let page_bytes = if let Some(ref mut cmpr) = self.compressor {
+                    self.compressed_buf.clear();
+                    cmpr.compress(&self.page_buf[..], &mut self.compressed_buf)?;
+                    Bytes::copy_from_slice(&self.compressed_buf)
+                } else {
+                    // Zero-cost ownership transfer instead of memcpy.
+                    // page_buf will regrow on next page (one alloc, same as pre-reuse code).
+                    Bytes::from(std::mem::take(&mut self.page_buf))
+                };
 
                 let data_page = Page::DataPage {
-                    buf: buffer.into(),
+                    buf: page_bytes,
                     num_values: self.page_metrics.num_buffered_values,
                     encoding: values_data.encoding,
                     def_level_encoding: Encoding::RLE,
@@ -1094,18 +1112,28 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             WriterVersion::PARQUET_2_0 => {
                 let mut rep_levels_byte_len = 0;
                 let mut def_levels_byte_len = 0;
-                let mut buffer = vec![];
 
-                if max_rep_level > 0 {
-                    let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
+                // Encode levels into locals first (same borrow-checker rationale as V1)
+                let rep_levels_encoded = if max_rep_level > 0 {
+                    Some(self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level))
+                } else {
+                    None
+                };
+                let def_levels_encoded = if max_def_level > 0 {
+                    Some(self.encode_levels_v2(&self.def_levels_sink[..], max_def_level))
+                } else {
+                    None
+                };
+
+                // Assemble page data into reusable buffer
+                self.page_buf.clear();
+                if let Some(ref levels) = rep_levels_encoded {
                     rep_levels_byte_len = levels.len();
-                    buffer.extend_from_slice(&levels[..]);
+                    self.page_buf.extend_from_slice(levels);
                 }
-
-                if max_def_level > 0 {
-                    let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
+                if let Some(ref levels) = def_levels_encoded {
                     def_levels_byte_len = levels.len();
-                    buffer.extend_from_slice(&levels[..]);
+                    self.page_buf.extend_from_slice(levels);
                 }
 
                 let uncompressed_size =
@@ -1114,24 +1142,32 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                 // Data Page v2 compresses values only.
                 let is_compressed = match self.compressor {
                     Some(ref mut cmpr) => {
-                        let buffer_len = buffer.len();
-                        cmpr.compress(&values_data.buf, &mut buffer)?;
-                        if uncompressed_size <= buffer.len() - buffer_len {
-                            buffer.truncate(buffer_len);
-                            buffer.extend_from_slice(&values_data.buf);
+                        let buffer_len = self.page_buf.len();
+                        cmpr.compress(&values_data.buf, &mut self.page_buf)?;
+                        if uncompressed_size <= self.page_buf.len() - buffer_len {
+                            self.page_buf.truncate(buffer_len);
+                            self.page_buf.extend_from_slice(&values_data.buf);
                             false
                         } else {
                             true
                         }
                     }
                     None => {
-                        buffer.extend_from_slice(&values_data.buf);
+                        self.page_buf.extend_from_slice(&values_data.buf);
                         false
                     }
                 };
 
+                let page_bytes = if is_compressed {
+                    // Compressed: copy smaller data, reuse buffer for next page
+                    Bytes::copy_from_slice(&self.page_buf)
+                } else {
+                    // Uncompressed: zero-cost ownership transfer
+                    Bytes::from(std::mem::take(&mut self.page_buf))
+                };
+
                 let data_page = Page::DataPageV2 {
-                    buf: buffer.into(),
+                    buf: page_bytes,
                     num_values: self.page_metrics.num_buffered_values,
                     encoding: values_data.encoding,
                     num_nulls: self.page_metrics.num_page_nulls as u32,
@@ -4419,4 +4455,239 @@ mod tests {
             result.metadata.compressed_size()
         );
     }
+
+    /// Validates that reused page buffers are properly cleared between pages.
+    /// Writes enough data to produce multiple data pages with compression enabled,
+    /// then reads back and verifies all data is correct (no stale data leakage).
+    #[test]
+    fn test_multi_page_roundtrip_with_compression() {
+        let props = WriterProperties::builder()
+            .set_data_page_row_count_limit(1000)
+            .set_compression(Compression::SNAPPY)
+            .set_dictionary_enabled(false)
+            .build();
+
+        // Write 5000 values -- should produce 5 data pages
+        let values: Vec<i32> = (0..5000).collect();
+        column_roundtrip::<Int32Type>(props, &values, None, None);
+    }
+
+    /// Same as above but for the V2 data page path with compression.
+    #[test]
+    fn test_multi_page_roundtrip_with_compression_v2() {
+        let props = WriterProperties::builder()
+            .set_writer_version(WriterVersion::PARQUET_2_0)
+            .set_data_page_row_count_limit(1000)
+            .set_compression(Compression::SNAPPY)
+            .set_dictionary_enabled(false)
+            .build();
+
+        let values: Vec<i32> = (0..5000).collect();
+        column_roundtrip::<Int32Type>(props, &values, None, None);
+    }
+
+    /// Multi-page roundtrip with nullable values and compression to test
+    /// that definition levels are correctly handled across reused buffers.
+    #[test]
+    fn test_multi_page_roundtrip_nullable_with_compression() {
+        let props = WriterProperties::builder()
+            .set_data_page_row_count_limit(1000)
+            .set_compression(Compression::SNAPPY)
+            .set_dictionary_enabled(false)
+            .build();
+
+        // 5000 values with every 3rd value being null
+        let num_levels = 5000;
+        let def_levels: Vec<i16> = (0..num_levels)
+            .map(|i| if i % 3 == 0 { 0 } else { 1 })
+            .collect();
+        let num_values = def_levels.iter().filter(|&&d| d == 1).count();
+        let values: Vec<i32> = (0..num_values as i32).collect();
+
+        column_roundtrip::<Int32Type>(props, &values, Some(&def_levels), None);
+    }
+
+    /// Reproduces the memory bloat scenario from issue #8526.
+    ///
+    /// When writing highly compressible data (~1MB uncompressed pages that compress
+    /// to ~20KB), the old code allocated a fresh `compressed_buf` per page and called
+    /// `shrink_to_fit()` on it. With dictionary fallback, many pages could accumulate
+    /// in the `data_pages` queue, each retaining its full 1MB allocation. This caused
+    /// 3.6GB RSS for a 550MB file.
+    ///
+    /// With buffer reuse (ARS-1), `page_buf` and `compressed_buf` are cleared and
+    /// reused each page. The `Bytes::copy_from_slice` produces a right-sized copy for
+    /// the `CompressedPage`, so memory is O(1) buffers regardless of page count.
+    ///
+    /// This test writes 100+ pages of highly compressible ByteArray data and asserts
+    /// that the reusable buffer capacities stay bounded at roughly the uncompressed
+    /// page size -- they do NOT grow with the number of pages written.
+    #[test]
+    fn test_memory_bounded_with_highly_compressible_data() {
+        // Each value is a 1024-byte string of repeated 'A'. Highly compressible.
+        let value_size = 1024;
+        let repeated_string: String = "A".repeat(value_size);
+        let rows_per_page = 1000;
+        let num_pages = 100;
+        let total_rows = rows_per_page * num_pages;
+
+        let values: Vec<ByteArray> = (0..total_rows)
+            .map(|_| ByteArray::from(repeated_string.as_str()))
+            .collect();
+
+        let props = Arc::new(
+            WriterProperties::builder()
+                .set_data_page_row_count_limit(rows_per_page)
+                .set_compression(Compression::SNAPPY)
+                .set_dictionary_enabled(false)
+                .build(),
+        );
+
+        let page_writer = get_test_page_writer();
+        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
+
+        writer.write_batch(&values, None, None).unwrap();
+
+        // After writing 100 pages, check that reusable buffers are bounded.
+        // The page_buf capacity should be roughly the size of one uncompressed page
+        // (~1MB = 1000 rows * 1024 bytes + overhead), NOT 100x that.
+        let page_buf_cap = writer.page_buf.capacity();
+        let compressed_buf_cap = writer.compressed_buf.capacity();
+
+        // One uncompressed page is ~1MB. Allow generous 3x headroom for encoding overhead.
+        let one_page_uncompressed = rows_per_page * value_size;
+        let max_expected_buf = one_page_uncompressed * 3;
+
+        assert!(
+            page_buf_cap <= max_expected_buf,
+            "page_buf capacity ({page_buf_cap}) should be bounded at ~1 page size \
+            ({one_page_uncompressed}), not grow with page count. \
+            Max expected: {max_expected_buf}"
+        );
+        assert!(
+            compressed_buf_cap <= max_expected_buf,
+            "compressed_buf capacity ({compressed_buf_cap}) should be bounded at ~1 page size \
+            ({one_page_uncompressed}), not grow with page count. \
+            Max expected: {max_expected_buf}"
+        );
+
+        // Verify the data still round-trips correctly
+        let _result = writer.close().unwrap();
+    }
+
+    /// Same scenario as above but for Parquet V2 data pages.
+    #[test]
+    fn test_memory_bounded_with_highly_compressible_data_v2() {
+        let value_size = 1024;
+        let repeated_string: String = "A".repeat(value_size);
+        let rows_per_page = 1000;
+        let num_pages = 100;
+        let total_rows = rows_per_page * num_pages;
+
+        let values: Vec<ByteArray> = (0..total_rows)
+            .map(|_| ByteArray::from(repeated_string.as_str()))
+            .collect();
+
+        let props = Arc::new(
+            WriterProperties::builder()
+                .set_writer_version(WriterVersion::PARQUET_2_0)
+                .set_data_page_row_count_limit(rows_per_page)
+                .set_compression(Compression::SNAPPY)
+                .set_dictionary_enabled(false)
+                .build(),
+        );
+
+        let page_writer = get_test_page_writer();
+        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
+
+        writer.write_batch(&values, None, None).unwrap();
+
+        let page_buf_cap = writer.page_buf.capacity();
+        let one_page_uncompressed = rows_per_page * value_size;
+        let max_expected_buf = one_page_uncompressed * 3;
+
+        assert!(
+            page_buf_cap <= max_expected_buf,
+            "page_buf capacity ({page_buf_cap}) should be bounded at ~1 page size, \
+            not grow with page count. Max expected: {max_expected_buf}"
+        );
+
+        let _result = writer.close().unwrap();
+    }
+
+    /// Verifies that dictionary fallback does not cause memory bloat.
+    ///
+    /// When dictionary encoding is enabled but the dictionary grows too large (high
+    /// cardinality data), the writer falls back to plain encoding. During fallback,
+    /// all buffered dictionary-encoded pages are re-encoded and flushed. With buffer
+    /// reuse, the per-page encoding buffers should not retain oversized allocations.
+    ///
+    /// This test writes enough unique values to trigger dictionary fallback, then
+    /// continues writing to produce additional plain-encoded pages. The reusable
+    /// buffers should remain bounded throughout.
+    #[test]
+    fn test_dictionary_fallback_memory_bounded() {
+        let rows_per_page = 500;
+        // Default dictionary page size limit is 1MB. Each unique ~200-byte string
+        // entry will fill the dictionary quickly.
+        let value_size = 200;
+
+        // Phase 1: Write unique values to force dictionary overflow and fallback.
+        // With 500 rows/page and 200 bytes/value, each page is ~100KB of dict data.
+        // After ~50 pages (25K unique values, ~5MB), dictionary should overflow.
+        let phase1_rows = 25_000;
+        let phase1_values: Vec<ByteArray> = (0..phase1_rows)
+            .map(|i| {
+                let s = format!("{:0>width$}", i, width = value_size);
+                ByteArray::from(s.as_str())
+            })
+            .collect();
+
+        // Phase 2: After fallback, write more data (plain encoding, compressible).
+        let phase2_rows = 50_000;
+        let repeated_string: String = "B".repeat(value_size);
+        let phase2_values: Vec<ByteArray> = (0..phase2_rows)
+            .map(|_| ByteArray::from(repeated_string.as_str()))
+            .collect();
+
+        let props = Arc::new(
+            WriterProperties::builder()
+                .set_data_page_row_count_limit(rows_per_page)
+                .set_compression(Compression::SNAPPY)
+                .set_dictionary_enabled(true)
+                // Keep dictionary page size small to trigger fallback sooner
+                .set_dictionary_page_size_limit(512 * 1024) // 512KB
+                .build(),
+        );
+
+        let page_writer = get_test_page_writer();
+        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
+
+        // Write phase 1 (triggers dict fallback internally)
+        writer.write_batch(&phase1_values, None, None).unwrap();
+        // Write phase 2 (plain encoded, compressible)
+        writer.write_batch(&phase2_values, None, None).unwrap();
+
+        let page_buf_cap = writer.page_buf.capacity();
+        let compressed_buf_cap = writer.compressed_buf.capacity();
+
+        // Buffers should be bounded at roughly 1 page worth of data.
+        // One page is ~500 * 200 = 100KB. Allow 5x headroom for encoding overhead
+        // and potential dictionary page data.
+        let one_page_approx = rows_per_page * value_size;
+        let max_expected = one_page_approx * 5;
+
+        assert!(
+            page_buf_cap <= max_expected,
+            "page_buf capacity ({page_buf_cap}) grew beyond expected bound \
+            after dictionary fallback. Max expected: {max_expected}"
+        );
+        assert!(
+            compressed_buf_cap <= max_expected,
+            "compressed_buf capacity ({compressed_buf_cap}) grew beyond expected bound \
+            after dictionary fallback. Max expected: {max_expected}"
+        );
+
+        let _result = writer.close().unwrap();
+    }
 }