diff --git a/Cargo.toml b/Cargo.toml index 6e3723d..85a2675 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ thiserror = "1" tracing = "0.1.37" [dev-dependencies] +mock_instant = "0.3" criterion = "0.4" futures = "0.3" proptest = "1" diff --git a/src/lib.rs b/src/lib.rs index 0a6d1ce..169630d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -use std::borrow::Cow; +use std::sync::Arc; mod block_read_write; @@ -11,21 +11,70 @@ mod record; mod recordlog; mod rolling; +pub use mem::PagesBuf; pub use multi_record_log::{MultiRecordLog, SyncPolicy}; -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug)] pub struct Record<'a> { pub position: u64, - pub payload: Cow<'a, [u8]>, + pub payload: PagesBuf<'a>, } impl<'a> Record<'a> { - pub fn new(position: u64, payload: &'a [u8]) -> Self { - Record { - position, - payload: Cow::Borrowed(payload), + #[cfg(test)] + pub fn payload_equal(&self, payload: &[u8]) -> bool { + self.payload.to_cow() == payload + } +} + +#[derive(Clone, Default, Debug, Ord, PartialOrd, Eq, PartialEq)] +pub struct FileNumber { + file_number: Arc, +} + +impl FileNumber { + fn new(file_number: u64) -> Self { + FileNumber { + file_number: Arc::new(file_number), + } + } + + /// Returns whether there is no clone of this FileNumber in existence. + /// + /// /!\ care should be taken to not have some other code store a &FileNumber which could alias + /// with self as it might then be semantically incorrect to delete content based only on this + /// returning `true`. 
+ pub fn can_be_deleted(&self) -> bool { + Arc::strong_count(&self.file_number) == 1 + } + + #[cfg(test)] + pub fn unroll(&self, tracker: &crate::rolling::FileTracker) -> Vec { + let mut file = self.clone(); + let mut file_numbers = Vec::new(); + loop { + file_numbers.push(file.file_number()); + if let Some(next_file) = tracker.next(&file) { + file = next_file; + } else { + return file_numbers; + } } } + + pub fn filename(&self) -> String { + format!("wal-{:020}", self.file_number) + } + + #[cfg(test)] + pub fn file_number(&self) -> u64 { + *self.file_number + } + + #[cfg(test)] + pub fn for_test(file_number: u64) -> Self { + FileNumber::new(file_number) + } } /// Resources used by mrecordlog diff --git a/src/mem/arena.rs b/src/mem/arena.rs new file mode 100644 index 0000000..086cbb3 --- /dev/null +++ b/src/mem/arena.rs @@ -0,0 +1,207 @@ +/// 256 KiB. +#[cfg(not(test))] +pub const PAGE_SIZE: usize = 1 << 18; + +#[cfg(test)] +pub const PAGE_SIZE: usize = 7; + +// TODO make it an array once we get a way to allocate array on the heap. +pub type Page = Box<[u8]>; + +#[derive(Clone, Copy, Eq, PartialEq, Debug)] +pub struct PageId(usize); + +/// An arena of fixed sized pages. +#[derive(Default)] +pub struct Arena { + /// We use an array to store the list of pages. + /// It can be seen as an efficient map from page id to pages. + /// + /// This map's length can only grow. Its size is therefore the maximum number of pages + /// that was ever allocated. One page being 256 KiB long, this is not a problem. + /// + /// If a page is not allocated, the corresponding entry is `None`. + pages: Vec>, + /// `free_slots` keeps track of the slots whose page is not allocated. + free_slots: Vec, + /// `free_page_ids` keeps track of the allocated pages that are + /// available. + free_page_ids: Vec, + /// Arena stats used to track how many pages should be freed. + stats: ArenaStats, +} + +// The idea here is that we keep track of the maximum number of pages used through time. 
+// To identify if it is worth deallocating pages, we look at the maximum number of pages +// in use in the last few minutes. +// +// We then allow ourselves to free memory down to this value. +// Tracking exactly the maximum number of pages in use in the last 5 minutes is unnecessarily +// complicated. +// +// For instance, we could run an extra task or thread. +// +// Instead, we just run a routine whenever someone interacts with the GC. +// This routine only checks time 1 out of 256 calls. +// +// Pitfall: If pages are requested way less often than 256 times per minute, +// this arena may take way too much time to release its memory. +struct ArenaStats { + max_num_used_pages_former: usize, + max_num_used_pages_current: usize, +} + +impl Default for ArenaStats { + fn default() -> ArenaStats { + ArenaStats { + max_num_used_pages_former: 0, + max_num_used_pages_current: 0, + } + } +} + +impl ArenaStats { + /// This method is called when we are changing time window. + fn roll(&mut self) { + self.max_num_used_pages_former = self.max_num_used_pages_current; + self.max_num_used_pages_current = 0; + } + + /// Records the number of used pages, and returns an estimation of the maximum number of pages + /// in the last 5 minutes. + pub fn record_num_used_page(&mut self, num_used_pages: usize) -> usize { + self.max_num_used_pages_current = self.max_num_used_pages_current.max(num_used_pages); + self.max_num_used_pages_former + .max(self.max_num_used_pages_current) + } +} + +impl Arena { + /// Returns an allocated page id. 
+ pub fn acquire_page(&mut self) -> PageId { + if let Some(page_id) = self.free_page_ids.pop() { + assert!(self.pages[page_id.0].is_some()); + return page_id; + } + let page: Page = vec![0u8; PAGE_SIZE].into_boxed_slice(); + if let Some(free_slot) = self.free_slots.pop() { + let slot = &mut self.pages[free_slot.0]; + assert!(slot.is_none()); + *slot = Some(page); + free_slot + } else { + let new_page_id = self.pages.len(); + self.pages.push(Some(page)); + PageId(new_page_id) + } + } + + #[inline] + pub fn page(&self, page_id: PageId) -> &[u8] { + self.pages[page_id.0].as_ref().unwrap() + } + + #[inline] + pub fn page_mut(&mut self, page_id: PageId) -> &mut [u8] { + self.pages[page_id.0].as_mut().unwrap() + } + + pub fn release_page(&mut self, page_id: PageId) { + self.free_page_ids.push(page_id); + assert!(self.pages[page_id.0].is_some()); + self.gc(); + } + + /// Clients are expected to roll the stats regularly. + pub fn roll_and_gc(&mut self) { + self.stats.roll(); + self.gc(); + } + + /// `gc` releases memory by deallocating some of the free pages. + fn gc(&mut self) { + let num_used_pages = self.num_used_pages(); + let max_used_num_pages_in_last_5_min = self.stats.record_num_used_page(num_used_pages); + // We pick a target slightly higher than the maximum number of pages to avoid needless + // allocations when we are experiencing a general increase + // in memory usage. + let target_num_pages = max_used_num_pages_in_last_5_min + 10; + let num_pages_to_free = self.num_allocated_pages().saturating_sub(target_num_pages); + let num_free_pages_to_keep = self.free_page_ids.len() - num_pages_to_free; + for free_page_id in self.free_page_ids.drain(num_free_pages_to_keep..) { + self.pages[free_page_id.0] = None; + self.free_slots.push(free_page_id); + } + } + + /// Returns the number of pages that are allocated + /// (regardless of whether they are in use or not). 
+ pub fn num_allocated_pages(&self) -> usize { + self.pages.len() - self.free_slots.len() + } + + /// Returns the number of pages that are allocated AND actually used. + pub fn num_used_pages(&self) -> usize { + self.pages.len() - self.free_slots.len() - self.free_page_ids.len() + } + + pub fn unused_capacity(&self) -> usize { + self.free_page_ids.len() * PAGE_SIZE + } +} + +#[cfg(test)] +mod tests { + use mock_instant::MockClock; + + use super::*; + + #[test] + fn test_arena_simple() { + let mut arena = Arena::default(); + assert_eq!(arena.num_allocated_pages(), 0); + assert_eq!(arena.acquire_page(), PageId(0)); + assert_eq!(arena.acquire_page(), PageId(1)); + arena.release_page(PageId(0)); + assert_eq!(arena.acquire_page(), PageId(0)); + } + + #[test] + fn test_arena_gc() { + let mut arena = Arena::default(); + assert_eq!(arena.num_allocated_pages(), 0); + assert_eq!(arena.acquire_page(), PageId(0)); + assert_eq!(arena.acquire_page(), PageId(1)); + arena.release_page(PageId(1)); + assert_eq!(arena.num_allocated_pages(), 2); + arena.gc(); + assert_eq!(arena.num_allocated_pages(), 2); + assert_eq!(arena.acquire_page(), PageId(1)); + assert_eq!(arena.num_allocated_pages(), 2); + } + + #[test] + fn test_arena_stats() { + let mut arena_stats = ArenaStats::default(); + for _ in 0..256 { + assert_eq!(arena_stats.record_num_used_page(10), 10); + } + MockClock::advance(WINDOW.mul_f32(1.1f32)); + for _ in 0..256 { + assert_eq!(arena_stats.record_num_used_page(1), 10); + } + MockClock::advance(WINDOW.mul_f32(1.1f32)); + for _ in 0..256 { + arena_stats.record_num_used_page(1); + } + assert_eq!(arena_stats.record_num_used_page(1), 1); + assert_eq!(arena_stats.record_num_used_page(2), 2); + for _ in 0..256 { + assert_eq!(arena_stats.record_num_used_page(1), 2); + } + MockClock::advance(WINDOW); + for _ in 0..256 { + assert_eq!(arena_stats.record_num_used_page(1), 2); + } + } +} diff --git a/src/mem/mod.rs b/src/mem/mod.rs index 11db7db..9587e60 100644 --- a/src/mem/mod.rs +++ 
b/src/mem/mod.rs @@ -1,8 +1,13 @@ +mod arena; mod queue; mod queues; +mod rolling_buffer; +use self::arena::Arena; pub(crate) use self::queue::MemQueue; pub(crate) use self::queues::MemQueues; +pub use self::rolling_buffer::PagesBuf; +use self::rolling_buffer::RollingBuffer; #[cfg(test)] mod tests; diff --git a/src/mem/queue.rs b/src/mem/queue.rs index 74b66a2..e292a25 100644 --- a/src/mem/queue.rs +++ b/src/mem/queue.rs @@ -1,94 +1,11 @@ -use std::borrow::Cow; use std::collections::VecDeque; use std::ops::{Bound, RangeBounds}; use crate::error::AppendError; -use crate::rolling::FileNumber; -use crate::Record; +use crate::mem::{Arena, RollingBuffer}; +use crate::{FileNumber, Record}; -#[derive(Default)] -struct RollingBuffer { - buffer: VecDeque, -} - -impl RollingBuffer { - fn new() -> Self { - RollingBuffer { - buffer: VecDeque::new(), - } - } - - fn len(&self) -> usize { - self.buffer.len() - } - - fn capacity(&self) -> usize { - self.buffer.capacity() - } - - fn clear(&mut self) { - self.buffer.clear(); - self.buffer.shrink_to_fit(); - } - - fn drain_start(&mut self, pos: usize) { - let target_capacity = self.len() * 9 / 8; - self.buffer.drain(..pos); - // In order to avoid leaking memory we shrink the buffer. - // The last maximum length (= the length before drain) - // is a good estimate of what we will need in the future. - // - // We add 1/8 to that in order to make sure that we don't end up - // shrinking / allocating for small variations. 
- - if self.buffer.capacity() > target_capacity { - self.buffer.shrink_to(target_capacity); - } - } - - fn extend(&mut self, slice: &[u8]) { - self.buffer.extend(slice.iter().copied()); - } - - fn get_range(&self, bounds: impl RangeBounds) -> Cow<[u8]> { - let start = match bounds.start_bound() { - Bound::Included(pos) => *pos, - Bound::Excluded(pos) => pos + 1, - Bound::Unbounded => 0, - }; - - let end = match bounds.end_bound() { - Bound::Included(pos) => pos + 1, - Bound::Excluded(pos) => *pos, - Bound::Unbounded => self.len(), - }; - - let (left_part_of_queue, right_part_of_queue) = self.buffer.as_slices(); - - if end < left_part_of_queue.len() { - Cow::Borrowed(&left_part_of_queue[start..end]) - } else if start >= left_part_of_queue.len() { - let start = start - left_part_of_queue.len(); - let end = end - left_part_of_queue.len(); - - Cow::Borrowed(&right_part_of_queue[start..end]) - } else { - // VecDeque is a rolling buffer. As a result, we do not have - // access to a continuous buffer. - // - // Here the requested slice cross the boundary and we need to allocate and copy the data - // in a new buffer. - let mut res = Vec::with_capacity(end - start); - res.extend_from_slice(&left_part_of_queue[start..]); - let end = end - left_part_of_queue.len(); - res.extend_from_slice(&right_part_of_queue[..end]); - - Cow::Owned(res) - } - } -} - -#[derive(Clone)] +#[derive(Clone, Debug)] struct RecordMeta { start_offset: usize, // in a vec of RecordMeta, this field should be set only on the last record @@ -101,8 +18,9 @@ struct RecordMeta { pub(crate) struct MemQueue { // Concatenated records concatenated_records: RollingBuffer, + // If `record_metas` is not empty, `start_position` should be the position of the first record. 
start_position: u64, - record_metas: Vec, + record_metas: VecDeque, } impl MemQueue { @@ -110,7 +28,7 @@ impl MemQueue { MemQueue { concatenated_records: RollingBuffer::new(), start_position: next_position, - record_metas: Vec::new(), + record_metas: Default::default(), } } @@ -124,17 +42,21 @@ impl MemQueue { } /// Returns the last record stored in the queue. - pub fn last_record(&self) -> Option { - self.record_metas.last().map(|record| Record { + pub fn last_record<'a>(&'a self, arena: &'a Arena) -> Option> { + let record = self.record_metas.back()?; + let record_payload = self + .concatenated_records + .get_range(record.start_offset.., arena); + Some(Record { position: record.position, - payload: self.concatenated_records.get_range(record.start_offset..), + payload: record_payload, }) } /// Returns what the next position should be. pub fn next_position(&self) -> u64 { self.record_metas - .last() + .back() .map(|record| record.position + 1) .unwrap_or(self.start_position) } @@ -148,6 +70,7 @@ impl MemQueue { file_number: &FileNumber, target_position: u64, payload: &[u8], + arena: &mut Arena, ) -> Result<(), AppendError> { let next_position = self.next_position(); if target_position < next_position { @@ -158,7 +81,7 @@ impl MemQueue { self.start_position = target_position; } - let file_number = if let Some(record_meta) = self.record_metas.last_mut() { + let file_number = if let Some(record_meta) = self.record_metas.back_mut() { if record_meta.file_number.as_ref() == Some(file_number) { record_meta.file_number.take().unwrap() } else { @@ -169,12 +92,12 @@ impl MemQueue { }; let record_meta = RecordMeta { - start_offset: self.concatenated_records.len(), + start_offset: self.concatenated_records.end_offset(), file_number: Some(file_number), position: target_position, }; - self.record_metas.push(record_meta); - self.concatenated_records.extend(payload); + self.record_metas.push_back(record_meta); + self.concatenated_records.extend_from_slice(payload, arena); Ok(()) } 
@@ -187,7 +110,7 @@ impl MemQueue { .binary_search_by_key(&position, |record| record.position) } - pub fn range(&self, range: R) -> impl Iterator + '_ + pub fn range<'a, R>(&'a self, range: R, arena: &'a Arena) -> impl Iterator + 'a where R: RangeBounds + 'static { let start_idx: usize = match range.start_bound() { Bound::Included(&start_from) => { @@ -209,14 +132,16 @@ impl MemQueue { .map(move |idx| { let record = &self.record_metas[idx]; let position = record.position; - let start_offset = record.start_offset; - let payload = if let Some(next_record_meta) = self.record_metas.get(idx + 1) { - let end_offset = next_record_meta.start_offset; - self.concatenated_records - .get_range(start_offset..end_offset) + let start_bound = Bound::Included(record.start_offset); + let end_bound = if let Some(next_record_meta) = self.record_metas.get(idx + 1) { + Bound::Excluded(next_record_meta.start_offset) } else { - self.concatenated_records.get_range(start_offset..) + Bound::Unbounded }; + let payload = self + .concatenated_records + .get_range((start_bound, end_bound), arena); + // let payload = concatenate_buffers(payload_buf); Record { position, payload } }) } @@ -225,13 +150,13 @@ impl MemQueue { /// /// If truncating to a future position, make the queue go forward to that position. /// Return the number of record removed. 
- pub fn truncate(&mut self, truncate_up_to_pos: u64) -> usize { + pub fn truncate_up_to_included(&mut self, truncate_up_to_pos: u64, arena: &mut Arena) -> usize { if self.start_position > truncate_up_to_pos { return 0; } if truncate_up_to_pos + 1 >= self.next_position() { self.start_position = truncate_up_to_pos + 1; - self.concatenated_records.clear(); + self.concatenated_records.clear(arena); let record_count = self.record_metas.len(); self.record_metas.clear(); return record_count; @@ -242,10 +167,8 @@ impl MemQueue { let start_offset_to_keep: usize = self.record_metas[first_record_to_keep].start_offset; self.record_metas.drain(..first_record_to_keep); - for record_meta in &mut self.record_metas { - record_meta.start_offset -= start_offset_to_keep; - } - self.concatenated_records.drain_start(start_offset_to_keep); + self.concatenated_records + .truncate_up_to_excluded(start_offset_to_keep, arena); self.start_position = truncate_up_to_pos + 1; first_record_to_keep } diff --git a/src/mem/queues.rs b/src/mem/queues.rs index 3adbbac..e116745 100644 --- a/src/mem/queues.rs +++ b/src/mem/queues.rs @@ -4,14 +4,15 @@ use std::ops::RangeBounds; use tracing::{info, warn}; use crate::error::{AlreadyExists, AppendError, MissingQueue}; -use crate::mem::MemQueue; -use crate::rolling::FileNumber; -use crate::Record; +use crate::mem::{Arena, MemQueue}; +use crate::{FileNumber, Record}; #[derive(Default)] pub(crate) struct MemQueues { queues: HashMap, + pub(crate) arena: Arena, } + impl MemQueues { /// The file number argument is here unused. Its point is just to make sure we /// flushed the file before updating the in memory queue. 
@@ -52,7 +53,7 @@ impl MemQueues { R: RangeBounds + 'static, { if let Some(queue) = self.queues.get(queue) { - Ok(queue.range(range)) + Ok(queue.range(range, &self.arena)) } else { Err(MissingQueue(queue.to_string())) } @@ -66,12 +67,17 @@ impl MemQueues { .ok_or_else(|| MissingQueue(queue.to_string())) } - pub(crate) fn get_queue_mut(&mut self, queue: &str) -> Result<&mut MemQueue, MissingQueue> { + pub(crate) fn get_queue_mut( + &mut self, + queue: &str, + ) -> Result<(&mut MemQueue, &mut Arena), MissingQueue> { // We do not rely on `entry` in order to avoid // the allocation. - self.queues + let queue = self + .queues .get_mut(queue) - .ok_or_else(|| MissingQueue(queue.to_string())) + .ok_or_else(|| MissingQueue(queue.to_string()))?; + Ok((queue, &mut self.arena)) } pub fn append_record( @@ -81,8 +87,11 @@ impl MemQueues { target_position: u64, payload: &[u8], ) -> Result<(), AppendError> { - self.get_queue_mut(queue)? - .append_record(file_number, target_position, payload) + let queue = self + .queues + .get_mut(queue) + .ok_or_else(|| MissingQueue(queue.to_string()))?; + queue.append_record(file_number, target_position, payload, &mut self.arena) } pub fn contains_queue(&self, queue: &str) -> bool { @@ -134,7 +143,7 @@ impl MemQueues { /// Returns the last record stored in the queue. pub fn last_record(&self, queue: &str) -> Result, MissingQueue> { - Ok(self.get_queue(queue)?.last_record()) + Ok(self.get_queue(queue)?.last_record(&self.arena)) } pub fn next_position(&self, queue: &str) -> Result { @@ -146,12 +155,13 @@ impl MemQueues { /// /// If there are no records `<= position`, the method will /// not do anything. 
- pub fn truncate(&mut self, queue: &str, position: u64) -> Option { - if let Ok(queue) = self.get_queue_mut(queue) { - Some(queue.truncate(position)) - } else { - None - } + pub fn truncate(&mut self, queue_id: &str, position: u64) -> Option { + let queue = self.queues.get_mut(queue_id)?; + Some(queue.truncate_up_to_included(position, &mut self.arena)) + } + + pub fn roll_and_gc(&mut self) { + self.arena.roll_and_gc(); } /// Return a tuple of (size, capacity) of memory used by the memqueues @@ -166,7 +176,8 @@ impl MemQueues { .queues .iter() .map(|(name, queue)| name.capacity() + queue.capacity()) - .sum(); + .sum::() + + self.arena.unused_capacity(); (size, capacity) } diff --git a/src/mem/rolling_buffer.rs b/src/mem/rolling_buffer.rs new file mode 100644 index 0000000..007b062 --- /dev/null +++ b/src/mem/rolling_buffer.rs @@ -0,0 +1,286 @@ +use std::ops::{Bound, Range, RangeBounds}; + +use bytes::Buf; + +use crate::mem::arena::{Arena, PageId, PAGE_SIZE}; + +/// `RollingBuffer` stores a short slice of a seemingly infinite buffer with offsets spanning +/// [0..u64::MAX). +/// +/// It is possible to append bytes to the buffer with `.extend_from_slice(..)`, +/// or drop the bytes up to a given offset with `truncate_up_to_excluded(..)`. +#[derive(Default)] +pub struct RollingBuffer { + page_ids: Vec, + range: Range, +} + +fn num_pages_required(range: Range) -> usize { + let Range { start, end } = range; + if start >= end { + // This is an important non-trivial edge case. + // If the range is empty, we retain no pages. 
+ return 0; + } + let first_page = start / PAGE_SIZE; + let last_page = end.saturating_sub(1) / PAGE_SIZE; + last_page - first_page + 1 +} + +impl RollingBuffer { + pub fn new() -> Self { + RollingBuffer { + page_ids: Vec::new(), + range: 0..0, + } + } + + pub fn len(&self) -> usize { + self.range.len() + } + + pub fn end_offset(&self) -> usize { + self.range.end + } + + pub fn capacity(&self) -> usize { + self.page_ids.len() * PAGE_SIZE + } + + pub fn clear(&mut self, arena: &mut Arena) { + self.truncate_up_to_excluded(self.range.end, arena); + } + + fn check_invariants(&self) { + debug_assert_eq!(num_pages_required(self.range.clone()), self.page_ids.len()); + } + + /// Truncates the buffer, dropping all of the bytes strictly below `new_start`. + pub fn truncate_up_to_excluded(&mut self, new_start: usize, arena: &mut Arena) { + assert!(new_start <= self.range.end); + let num_pages = num_pages_required(new_start..self.range.end); + assert!(num_pages <= self.page_ids.len()); + let num_pages_to_drop = self.page_ids.len() - num_pages; + self.range.start = new_start; + if num_pages_to_drop > 0 { + for page_id in self.page_ids.drain(..(self.page_ids.len() - num_pages)) { + arena.release_page(page_id); + } + } + self.check_invariants(); + } + + /// Returns a chunk of available memory, either remaining from the last page, + /// or acquires a new page from the arena. + fn get_page_with_room<'a>(&mut self, arena: &'a mut Arena) -> &'a mut [u8] { + let start_offset = self.range.end % PAGE_SIZE; + if start_offset == 0 || self.page_ids.is_empty() { + // The page is entirely used, or there are no pages at all. + // Let's allocate a new page. + let new_page_id = arena.acquire_page(); + self.page_ids.push(new_page_id); + } + let page_id = self.page_ids.last().copied().unwrap(); + let page = arena.page_mut(page_id); + &mut page[start_offset..] 
+ } + + pub fn extend_from_slice(&mut self, mut slice: &[u8], arena: &mut Arena) { + while !slice.is_empty() { + let page = self.get_page_with_room(arena); + let len = page.len().min(slice.len()); + let (head, queue) = slice.split_at(len); + page[..len].copy_from_slice(head); + slice = queue; + self.range.end += len; + } + self.check_invariants(); + } + + fn get_range_aux<'slf, 'a: 'slf>( + &'slf self, + range: Range, + arena: &'a Arena, + ) -> PagesBuf<'slf> { + let Range { start, end } = range; + assert!(start >= self.range.start); + assert!(end <= self.range.end); + if end <= start { + return PagesBuf { + arena, + start_offset: 0, + page_ids: &[], + remaining_len: 0, + }; + } + let start_page_id = start / PAGE_SIZE; + let start_inner_page_id = self.range.start / PAGE_SIZE; + let skip_pages = start_page_id - start_inner_page_id; + let start_offset = start % PAGE_SIZE; + PagesBuf { + arena, + start_offset, + page_ids: &self.page_ids[skip_pages..], + remaining_len: end - start, + } + } + + pub fn get_range<'slf, 'a: 'slf, R>( + &'slf self, + range_bounds: R, + arena: &'a Arena, + ) -> PagesBuf<'slf> + where + R: RangeBounds + 'static, + { + let start = match range_bounds.start_bound() { + Bound::Included(pos) => *pos, + Bound::Excluded(pos) => pos + 1, + Bound::Unbounded => 0, + }; + let end = match range_bounds.end_bound() { + Bound::Included(pos) => pos + 1, + Bound::Excluded(pos) => *pos, + Bound::Unbounded => self.range.end, + }; + self.get_range_aux(start..end, arena) + } +} + +#[derive(Clone, Copy)] +pub struct PagesBuf<'a> { + arena: &'a Arena, + page_ids: &'a [PageId], + start_offset: usize, + remaining_len: usize, +} + +impl<'a> std::fmt::Debug for PagesBuf<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + self.to_cow().fmt(f) + } +} + +impl<'a> PagesBuf<'a> { + pub fn to_cow(mut self) -> std::borrow::Cow<'a, [u8]> { + if self.page_ids.len() <= 1 { + let chunk = self.chunk_with_lifetime(); + return 
+ std::borrow::Cow::Borrowed(chunk); + } + let mut buf = Vec::with_capacity(self.remaining_len); + while self.has_remaining() { + let chunk = self.chunk_with_lifetime(); + buf.extend_from_slice(chunk); + self.advance(chunk.len()); + } + std::borrow::Cow::Owned(buf) + } + + // Contrary to Buf::chunk, this method returns a slice with a `'a` lifetime (so it can outlive + // 'self). + fn chunk_with_lifetime(&self) -> &'a [u8] { + let Some(first_page_id) = self.page_ids.first().copied() else { + return &[]; + }; + let current_page = &self.arena.page(first_page_id)[self.start_offset..]; + if current_page.len() > self.remaining_len { + &current_page[..self.remaining_len] + } else { + current_page + } + } +} + +impl<'a> Buf for PagesBuf<'a> { + fn remaining(&self) -> usize { + self.remaining_len + } + + #[inline] + fn chunk(&self) -> &[u8] { + self.chunk_with_lifetime() + } + + fn advance(&mut self, mut cnt: usize) { + cnt = cnt.min(self.remaining_len); + while cnt > 0 { + if self.page_ids.is_empty() { + return; + } + let page = self.chunk(); + if page.len() > cnt { + self.start_offset += cnt; + self.remaining_len -= cnt; + } else { + cnt -= page.len(); + self.remaining_len -= page.len(); + self.start_offset = 0; + self.page_ids = &self.page_ids[1..]; + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn to_vec(mut buf: B) -> Vec { + let mut output = Vec::with_capacity(buf.remaining()); + while buf.has_remaining() { + let chunk = buf.chunk(); + output.extend_from_slice(chunk); + buf.advance(chunk.len()); + } + output + } + + #[test] + fn test_rolling_buffer() { + let mut arena = Arena::default(); + let text = b"hello happy tax payer"; + for new_start in 0..text.len() { + let mut rolling_buffer = RollingBuffer::new(); + rolling_buffer.extend_from_slice(&b"hello"[..], &mut arena); + rolling_buffer.extend_from_slice(&b" happy"[..], &mut arena); + rolling_buffer.extend_from_slice(&b" tax payer"[..], &mut arena); + rolling_buffer.truncate_up_to_excluded(new_start, &mut 
arena); + for start in new_start..text.len() { + for end in start..text.len() { + let bytes: Vec = to_vec(rolling_buffer.get_range(start..end, &arena)); + assert_eq!(&text[start..end], &bytes[..]); + } + } + } + } + + #[test] + fn test_rolling_buffer_clear() { + let mut arena = Arena::default(); + let mut rolling_buffer = RollingBuffer::new(); + rolling_buffer.clear(&mut arena); + assert_eq!(rolling_buffer.len(), 0); + rolling_buffer.extend_from_slice(&b"abcdefghik"[..], &mut arena); + assert_eq!(rolling_buffer.len(), 10); + assert_eq!(arena.num_used_pages(), 2); + rolling_buffer.clear(&mut arena); + assert_eq!(rolling_buffer.len(), 0); + assert_eq!(arena.num_used_pages(), 0); + } + + #[test] + fn test_num_pages_required() { + assert_eq!(num_pages_required(0..0), 0); + assert_eq!(num_pages_required(2..2), 0); + assert_eq!(num_pages_required(2..1), 0); + assert_eq!(num_pages_required(0..1), 1); + assert_eq!(num_pages_required(0..PAGE_SIZE), 1); + assert_eq!(num_pages_required(0..PAGE_SIZE + 1), 2); + assert_eq!(num_pages_required(0..2 * PAGE_SIZE), 2); + assert_eq!(num_pages_required(0..2 * PAGE_SIZE + 1), 3); + assert_eq!(num_pages_required(PAGE_SIZE - 1..2 * PAGE_SIZE), 2); + assert_eq!(num_pages_required(PAGE_SIZE - 1..2 * PAGE_SIZE + 1), 3); + assert_eq!(num_pages_required(PAGE_SIZE..2 * PAGE_SIZE), 1); + assert_eq!(num_pages_required(PAGE_SIZE..2 * PAGE_SIZE + 1), 2); + } +} diff --git a/src/mem/tests.rs b/src/mem/tests.rs index aca9c28..1f0704b 100644 --- a/src/mem/tests.rs +++ b/src/mem/tests.rs @@ -1,7 +1,6 @@ use super::*; use crate::error::{AlreadyExists, AppendError}; -use crate::rolling::FileNumber; -use crate::Record; +use crate::{FileNumber, Record}; #[test] fn test_mem_queues_already_exists() { @@ -43,22 +42,23 @@ fn test_mem_queues() { assert!(mem_queues .append_record("droopy", &FileNumber::for_test(1), 3, b"payer") .is_ok()); - assert_eq!( - mem_queues.range("droopy", 0..).unwrap().next(), - Some(Record::new(0, b"hello")) - ); + let record = 
mem_queues.range("droopy", 0..).unwrap().next().unwrap(); + assert_eq!(record.position, 0); + assert!(record.payload_equal(b"hello")); let droopy: Vec = mem_queues.range("droopy", 1..).unwrap().collect(); - assert_eq!( - &droopy, - &[ - Record::new(1, b"happy"), - Record::new(2, b"tax"), - Record::new(3, b"payer"), - ], - ); + assert_eq!(droopy.len(), 3); + assert_eq!(droopy[0].position, 1); + assert_eq!(droopy[1].position, 2); + assert_eq!(droopy[2].position, 3); + assert!(droopy[0].payload_equal(b"happy")); + assert!(droopy[1].payload_equal(b"tax")); + assert!(droopy[2].payload_equal(b"payer")); } let fable: Vec = mem_queues.range("fable", 1..).unwrap().collect(); - assert_eq!(&fable, &[Record::new(1, b"corbeau")]); + assert_eq!(fable.len(), 1); + let fable_record = fable.into_iter().next().unwrap(); + assert_eq!(fable_record.position, 1); + assert!(fable_record.payload_equal(b"corbeau")); } #[test] @@ -82,15 +82,17 @@ fn test_mem_queues_truncate() { .append_record("droopy", &1.into(), 4, b"!") .is_ok()); mem_queues - .append_record("droopy", &1.into(), 5, b"payer") + .append_record("droopy", &1.into(), 5, b"payer2") .unwrap(); } mem_queues.truncate("droopy", 3); let droopy: Vec = mem_queues.range("droopy", 0..).unwrap().collect(); - assert_eq!( - &droopy[..], - &[Record::new(4, b"!"), Record::new(5, b"payer"),] - ); + assert_eq!(droopy.len(), 2); + assert_eq!(droopy[0].position, 4); + assert!(droopy[0].payload_equal(b"!")); + + assert_eq!(droopy[1].position, 5); + assert!(droopy[1].payload_equal(b"payer2")); } #[test] @@ -110,26 +112,29 @@ fn test_mem_queues_skip_advance() { .append_record("droopy", &1.into(), 1, b"happy") .is_err()); let droopy: Vec = mem_queues.range("droopy", 0..).unwrap().collect(); - assert_eq!( - &droopy[..], - &[ - Record::new(0, b"hello"), - Record::new(2, b"happy"), - Record::new(3, b"happy"), - ] - ); + assert_eq!(droopy.len(), 3); + assert_eq!(droopy[0].position, 0); + assert!(droopy[0].payload_equal(b"hello")); + 
assert_eq!(droopy[1].position, 2); + assert!(droopy[1].payload_equal(b"happy")); + assert_eq!(droopy[2].position, 3); + assert!(droopy[2].payload_equal(b"happy")); let droopy: Vec = mem_queues.range("droopy", 1..).unwrap().collect(); - assert_eq!( - &droopy[..], - &[Record::new(2, b"happy"), Record::new(3, b"happy"),] - ); + assert_eq!(droopy.len(), 2); + assert_eq!(droopy[0].position, 2); + assert!(droopy[0].payload_equal(b"happy")); + assert_eq!(droopy[1].position, 3); + assert!(droopy[1].payload_equal(b"happy")); let droopy: Vec = mem_queues.range("droopy", 2..).unwrap().collect(); - assert_eq!( - &droopy[..], - &[Record::new(2, b"happy"), Record::new(3, b"happy"),] - ); + assert_eq!(droopy.len(), 2); + assert_eq!(droopy[0].position, 2); + assert!(droopy[0].payload_equal(b"happy")); + assert_eq!(droopy[1].position, 3); + assert!(droopy[1].payload_equal(b"happy")); let droopy: Vec = mem_queues.range("droopy", 3..).unwrap().collect(); - assert_eq!(&droopy[..], &[Record::new(3, b"happy")]); + assert_eq!(droopy.len(), 1); + assert_eq!(droopy[0].position, 3); + assert!(droopy[0].payload_equal(b"happy")); } #[test] @@ -162,7 +167,9 @@ fn test_mem_queues_append_idempotence() { AppendError::Past )); let droopy: Vec = mem_queues.range("droopy", 0..).unwrap().collect(); - assert_eq!(&droopy, &[Record::new(0, b"hello")]); + assert_eq!(droopy.len(), 1); + assert_eq!(droopy[0].position, 0); + assert!(droopy[0].payload_equal(b"hello")); } #[test] @@ -173,7 +180,9 @@ fn test_mem_queues_non_zero_first_el() { .append_record("droopy", &1.into(), 5, b"hello") .is_ok()); let droopy: Vec = mem_queues.range("droopy", 0..).unwrap().collect(); - assert_eq!(droopy, &[Record::new(5, b"hello")]); + assert_eq!(droopy.len(), 1); + assert_eq!(droopy[0].position, 5); + assert!(droopy[0].payload_equal(b"hello")); } #[test] diff --git a/src/multi_record_log.rs b/src/multi_record_log.rs index 44963bc..b7bc935 100644 --- a/src/multi_record_log.rs +++ b/src/multi_record_log.rs @@ -242,12 +242,12 
@@ impl MultiRecordLog { self.record_log_writer.write_record(record)?; self.sync_on_policy()?; - let mem_queue = self.in_mem_queues.get_queue_mut(queue)?; + let (mem_queue, arena) = self.in_mem_queues.get_queue_mut(queue)?; let mut max_position = position; for record in records { // we just serialized it, we know it's valid let (position, payload) = record.unwrap(); - mem_queue.append_record(&file_number, position, payload)?; + mem_queue.append_record(&file_number, position, payload, arena)?; max_position = position; } @@ -293,6 +293,10 @@ impl MultiRecordLog { Ok(removed_count) } + pub fn memory_gc(&mut self) { + self.in_mem_queues.roll_and_gc(); + } + fn run_gc_if_necessary(&mut self) -> io::Result<()> { debug!("run_gc_if_necessary"); if self @@ -314,7 +318,10 @@ impl MultiRecordLog { if event_enabled!(Level::DEBUG) { for queue in self.list_queues() { let queue: &MemQueue = self.in_mem_queues.get_queue(queue).unwrap(); - let first_pos = queue.range(..).next().map(|record| record.position); + let first_pos = queue + .range(.., &self.in_mem_queues.arena) + .next() + .map(|record| record.position); let last_pos = queue.last_position(); debug!(first_pos=?first_pos, last_pos=?last_pos, "queue positions after gc"); } diff --git a/src/proptests.rs b/src/proptests.rs index a523a7a..d488600 100644 --- a/src/proptests.rs +++ b/src/proptests.rs @@ -1,4 +1,3 @@ -use std::borrow::Cow; use std::collections::HashMap; use std::ops::Range; @@ -357,16 +356,13 @@ fn test_multi_record() { { let mut multi_record_log = MultiRecordLog::open(tempdir.path()).unwrap(); multi_record_log.truncate("queue", 0).unwrap(); - assert_eq!( - multi_record_log - .range("queue", ..) - .unwrap() - .collect::>(), - [Record { - position: 1, - payload: Cow::Borrowed(&b"22"[..]) - }], - ); + let records: Vec = multi_record_log + .range("queue", ..) 
+ .unwrap() + .collect::>(); + assert_eq!(records.len(), 1); + assert_eq!(records[0].position, 1); + assert_eq!(records[0].payload.to_cow(), b"22".as_slice()); } { let mut multi_record_log = MultiRecordLog::open(tempdir.path()).unwrap(); @@ -379,13 +375,15 @@ fn test_multi_record() { } { let multi_record_log = MultiRecordLog::open(tempdir.path()).unwrap(); - assert_eq!( - multi_record_log - .range("queue", ..) - .unwrap() - .collect::>(), - [Record::new(1, b"22"), Record::new(2, b"hello"),] - ); + let records = multi_record_log + .range("queue", ..) + .unwrap() + .collect::>(); + assert_eq!(records.len(), 2); + assert_eq!(records[0].position, 1); + assert_eq!(records[0].payload.to_cow(), b"22".as_slice()); + assert_eq!(records[1].position, 2); + assert_eq!(records[1].payload.to_cow(), b"hello".as_slice()); } } diff --git a/src/rolling/file_number.rs b/src/rolling/file_tracker.rs similarity index 72% rename from src/rolling/file_number.rs rename to src/rolling/file_tracker.rs index 6f697ad..1623e55 100644 --- a/src/rolling/file_number.rs +++ b/src/rolling/file_tracker.rs @@ -1,5 +1,6 @@ use std::collections::BTreeSet; -use std::sync::Arc; + +use crate::FileNumber; /// RefCount a set of ordered files. Always track at least one file. pub struct FileTracker { @@ -79,56 +80,6 @@ impl FileTracker { } } -#[derive(Clone, Default, Debug, Ord, PartialOrd, Eq, PartialEq)] -pub struct FileNumber { - file_number: Arc, -} - -impl FileNumber { - fn new(file_number: u64) -> Self { - FileNumber { - file_number: Arc::new(file_number), - } - } - - /// Returns whether there is no clone of this FileNumber in existance. - /// - /// /!\ care should be taken to not have some other code store a &FileNumber which could alias - /// with self as it might then be sementically incorrect to delete content based only on this - /// returning `true`. 
- pub fn can_be_deleted(&self) -> bool { - Arc::strong_count(&self.file_number) == 1 - } - - #[cfg(test)] - pub fn unroll(&self, tracker: &FileTracker) -> Vec { - let mut file = self.clone(); - let mut file_numbers = Vec::new(); - loop { - file_numbers.push(file.file_number()); - if let Some(next_file) = tracker.next(&file) { - file = next_file; - } else { - return file_numbers; - } - } - } - - pub fn filename(&self) -> String { - format!("wal-{:020}", self.file_number) - } - - #[cfg(test)] - pub fn file_number(&self) -> u64 { - *self.file_number - } - - #[cfg(test)] - pub fn for_test(file_number: u64) -> Self { - FileNumber::new(file_number) - } -} - impl std::borrow::Borrow for FileNumber { fn borrow(&self) -> &u64 { &self.file_number diff --git a/src/rolling/mod.rs b/src/rolling/mod.rs index 5cca8d0..2b27ff2 100644 --- a/src/rolling/mod.rs +++ b/src/rolling/mod.rs @@ -1,8 +1,9 @@ mod directory; -mod file_number; +mod file_tracker; pub use self::directory::{Directory, RollingReader, RollingWriter}; -pub use self::file_number::{FileNumber, FileTracker}; +pub use self::file_tracker::FileTracker; +pub use crate::FileNumber; const FRAME_NUM_BYTES: usize = 1 << 15; diff --git a/src/tests.rs b/src/tests.rs index 7617f45..fc51b72 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -9,7 +9,7 @@ fn read_all_records<'a>(multi_record_log: &'a MultiRecordLog, queue: &str) -> Ve let mut next_pos = u64::default(); for Record { position, payload } in multi_record_log.range(queue, next_pos..).unwrap() { assert_eq!(position, next_pos); - records.push(payload); + records.push(payload.to_cow()); next_pos += 1; } records @@ -235,7 +235,7 @@ fn test_multi_insert_truncate() { &multi_record_log .range("queue", ..) 
.unwrap() - .map(|record| record.payload) + .map(|record| record.payload.to_cow()) .collect::>(), &[b"2".as_slice(), b"3".as_slice(), b"4".as_slice()] ) @@ -243,12 +243,11 @@ fn test_multi_insert_truncate() { { let mut multi_record_log = MultiRecordLog::open(tempdir.path()).unwrap(); multi_record_log.truncate("queue", 1).unwrap(); - assert_eq!( &multi_record_log .range("queue", ..) .unwrap() - .map(|record| record.payload) + .map(|record| record.payload.to_cow()) .collect::>(), &[b"3".as_slice(), b"4".as_slice()] ) @@ -259,7 +258,7 @@ fn test_multi_insert_truncate() { &multi_record_log .range("queue", ..) .unwrap() - .map(|record| record.payload) + .map(|record| record.payload.to_cow()) .collect::>(), &[b"3".as_slice(), b"4".as_slice()] ) @@ -269,52 +268,56 @@ fn test_multi_insert_truncate() { #[test] fn test_truncate_range_correct_pos() { let tempdir = tempfile::tempdir().unwrap(); + let mut multi_record_log = MultiRecordLog::open(tempdir.path()).unwrap(); + multi_record_log.create_queue("queue").unwrap(); + assert_eq!( + multi_record_log + .append_record("queue", None, &b"1"[..]) + .unwrap(), + Some(0) + ); + assert_eq!( + multi_record_log + .append_record("queue", None, &b"2"[..]) + .unwrap(), + Some(1) + ); + multi_record_log.truncate("queue", 1).unwrap(); + assert_eq!( + multi_record_log + .append_record("queue", None, &b"3"[..]) + .unwrap(), + Some(2) + ); { - let mut multi_record_log = MultiRecordLog::open(tempdir.path()).unwrap(); - multi_record_log.create_queue("queue").unwrap(); - assert_eq!( - multi_record_log - .append_record("queue", None, &b"1"[..]) - .unwrap(), - Some(0) - ); - assert_eq!( - multi_record_log - .append_record("queue", None, &b"2"[..]) - .unwrap(), - Some(1) - ); - multi_record_log.truncate("queue", 1).unwrap(); - assert_eq!( - multi_record_log - .append_record("queue", None, &b"3"[..]) - .unwrap(), - Some(2) - ); - assert_eq!( - multi_record_log - .range("queue", ..) 
- .unwrap() - .collect::>(), - &[Record::new(2u64, b"3")] - ); + let records = multi_record_log + .range("queue", ..) + .unwrap() + .collect::>(); + assert_eq!(records.len(), 1); + assert_eq!(records[0].position, 2); + assert_eq!(records[0].payload.to_cow(), b"3".as_slice()); + } - assert_eq!( - multi_record_log - .range("queue", 2..) - .unwrap() - .collect::>(), - &[Record::new(2, b"3")] - ); + { + let records = multi_record_log + .range("queue", 2..) + .unwrap() + .collect::>(); + assert_eq!(records.len(), 1); + assert_eq!(records[0].position, 2); + assert_eq!(records[0].payload.to_cow(), b"3".as_slice()); + } + { use std::ops::Bound; - assert_eq!( - multi_record_log - .range("queue", (Bound::Excluded(1), Bound::Unbounded)) - .unwrap() - .collect::>(), - &[Record::new(2, b"3")] - ); + let records = multi_record_log + .range("queue", (Bound::Excluded(1), Bound::Unbounded)) + .unwrap() + .collect::>(); + assert_eq!(records.len(), 1); + assert_eq!(records[0].position, 2); + assert_eq!(records[0].payload.to_cow(), b"3".as_slice()); } } @@ -382,7 +385,7 @@ fn test_open_corrupted() { let mut count = 0; for Record { position, payload } in multi_record_log.range("queue", ..).unwrap() { - assert_eq!(payload, format!("{position:08}").as_bytes()); + assert_eq!(payload.to_cow(), format!("{position:08}").as_bytes()); count += 1; } assert!(count > 4096); @@ -448,7 +451,7 @@ fn test_last_record() { let Record { position, payload } = multi_record_log.last_record("queue1").unwrap().unwrap(); assert_eq!(position, 0); - assert_eq!(payload, &b"hello"[..]); + assert_eq!(payload.to_cow(), &b"hello"[..]); multi_record_log.truncate("queue1", 0).unwrap();