Skip to content

Commit

Permalink
Merge pull request #73 from fjall-rs/2.3.0
Browse files Browse the repository at this point in the history
2.3.0
  • Loading branch information
marvin-j97 authored Oct 31, 2024
2 parents 0d98a1d + 7fcf3f4 commit e414fcd
Show file tree
Hide file tree
Showing 25 changed files with 574 additions and 272 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "lsm-tree"
description = "A K.I.S.S. implementation of log-structured merge trees (LSM-trees/LSMTs)"
license = "MIT OR Apache-2.0"
version = "2.1.1"
version = "2.3.0"
edition = "2021"
rust-version = "1.74.0"
readme = "README.md"
Expand Down Expand Up @@ -30,7 +30,7 @@ double-ended-peekable = "0.1.0"
enum_dispatch = "0.3.13"
guardian = "1.1.0"
log = "0.4.22"
lz4_flex = { version = "0.11.3", optional = true }
lz4_flex = { version = "0.11.3", optional = true, default-features = false }
miniz_oxide = { version = "0.8.0", optional = true }
path-absolutize = "3.1.1"
quick_cache = { version = "0.6.5", default-features = false, features = [] }
Expand Down
5 changes: 3 additions & 2 deletions src/blob_tree/gc/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ impl<'a> GcReader<'a> {
impl<'a> value_log::IndexReader for GcReader<'a> {
fn get(&self, key: &[u8]) -> std::io::Result<Option<ValueHandle>> {
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use MaybeInlineValue::{Indirect, Inline};

let Some(item) = self
.get_internal(key)
Expand All @@ -45,8 +46,8 @@ impl<'a> value_log::IndexReader for GcReader<'a> {
};

match item {
MaybeInlineValue::Inline(_) => Ok(None),
MaybeInlineValue::Indirect { vhandle, .. } => Ok(Some(vhandle)),
Inline(_) => Ok(None),
Indirect { vhandle, .. } => Ok(Some(vhandle)),
}
}
}
7 changes: 2 additions & 5 deletions src/blob_tree/gc/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,12 @@ impl<'a> value_log::IndexWriter for GcWriter<'a> {
}

fn finish(&mut self) -> std::io::Result<()> {
use std::io::{Error as IoError, ErrorKind as IoErrorKind};

log::trace!("Finish blob GC index writer");

#[allow(clippy::significant_drop_in_scrutinee)]
for (key, vhandle, size) in self.buffer.drain(..) {
let buf = MaybeInlineValue::Indirect { vhandle, size }
.encode_into_vec()
.map_err(|e| IoError::new(IoErrorKind::Other, e.to_string()))?;
// TODO: encode into slice using Slice::with_size...
let buf = MaybeInlineValue::Indirect { vhandle, size }.encode_into_vec();

self.memtable.insert(InternalValue::from_components(
key,
Expand Down
109 changes: 89 additions & 20 deletions src/blob_tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,30 @@ use index::IndexTree;
use std::{
io::Cursor,
ops::{RangeBounds, RangeFull},
sync::{Arc, RwLockWriteGuard},
sync::{atomic::AtomicUsize, Arc, RwLockWriteGuard},
};
use value::MaybeInlineValue;
use value_log::ValueLog;

fn resolve_value_handle(vlog: &ValueLog<MyCompressor>, item: RangeItem) -> RangeItem {
use MaybeInlineValue::{Indirect, Inline};

match item {
Ok((key, value)) => {
let mut cursor = Cursor::new(value);
let item = MaybeInlineValue::decode_from(&mut cursor)?;

match item {
MaybeInlineValue::Inline(bytes) => Ok((key, bytes)),
MaybeInlineValue::Indirect { vhandle, .. } => match vlog.get(&vhandle) {
Ok(Some(bytes)) => Ok((key, bytes)),
Err(e) => Err(e.into()),
_ => panic!("value handle did not match any blob - this is a bug"),
},

match MaybeInlineValue::decode_from(&mut cursor)? {
Inline(bytes) => Ok((key, bytes)),
Indirect { vhandle, .. } => {
// Resolve indirection using value log
match vlog.get(&vhandle) {
Ok(Some(bytes)) => Ok((key, bytes)),
Err(e) => Err(e.into()),
_ => {
panic!("value handle ({:?} => {vhandle:?}) did not match any blob - this is a bug", String::from_utf8_lossy(&key))
}
}
}
}
}
Err(e) => Err(e),
Expand All @@ -62,6 +68,10 @@ pub struct BlobTree {
/// Log-structured value-log that stores large values
#[doc(hidden)]
pub blobs: ValueLog<MyCompressor>,

// TODO: maybe replace this with a nonce system
#[doc(hidden)]
pub pending_segments: Arc<AtomicUsize>,
}

impl BlobTree {
Expand All @@ -79,6 +89,7 @@ impl BlobTree {
Ok(Self {
index,
blobs: ValueLog::open(vlog_path, vlog_cfg)?,
pending_segments: Arc::new(AtomicUsize::new(0)),
})
}

Expand All @@ -93,9 +104,27 @@ impl BlobTree {
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use MaybeInlineValue::{Indirect, Inline};

while self
.pending_segments
.load(std::sync::atomic::Ordering::Acquire)
> 0
{
// IMPORTANT: Busy wait until all segments in-flight are committed
// to the tree
}

// IMPORTANT: Lock + snapshot memtable to avoid read skew and to prevent tampering with the memtable
let _memtable_lock = self.index.read_lock_active_memtable();

while self
.pending_segments
.load(std::sync::atomic::Ordering::Acquire)
> 0
{
// IMPORTANT: Busy wait again until all segments in-flight are committed
// to the tree
}

let iter = self
.index
.create_internal_range::<&[u8], RangeFull>(&.., Some(seqno), None);
Expand Down Expand Up @@ -342,20 +371,46 @@ impl AbstractTree for BlobTree {
blob_writer.write(&item.key.user_key, value)?;
} else {
let direct = MaybeInlineValue::Inline(value);
let serialized_direct = direct.encode_into_vec()?;
let serialized_direct = direct.encode_into_vec();
segment_writer.write(InternalValue::new(item.key, serialized_direct))?;
}
}

let _memtable_lock = self.lock_active_memtable();

log::trace!("Register blob writer into value log");
self.blobs.register_writer(blob_writer)?;

log::trace!("Creating segment");
self.index.consume_writer(segment_id, segment_writer)
log::trace!("Creating LSM-tree segment {segment_id}");
let segment = self.index.consume_writer(segment_id, segment_writer)?;

// TODO: this can probably be solved in a nicer way
if segment.is_some() {
// IMPORTANT: Increment the pending count
// so there cannot be a GC scan now, until the segment is registered
self.pending_segments
.fetch_add(1, std::sync::atomic::Ordering::Release);
}

Ok(segment)
}

fn register_segments(&self, segments: &[Arc<crate::Segment>]) -> crate::Result<()> {
self.index.register_segments(segments)
self.index.register_segments(segments)?;

let count = self
.pending_segments
.load(std::sync::atomic::Ordering::Acquire);

assert!(
count >= segments.len(),
"pending_segments is less than segments to register - this is a bug"
);

self.pending_segments
.fetch_sub(segments.len(), std::sync::atomic::Ordering::Release);

Ok(())
}

fn lock_active_memtable(&self) -> std::sync::RwLockWriteGuard<'_, Memtable> {
Expand Down Expand Up @@ -530,7 +585,7 @@ impl AbstractTree for BlobTree {
// into inline or indirect values
let item = MaybeInlineValue::Inline(value.as_ref().into());

let value = item.encode_into_vec().expect("should serialize");
let value = item.encode_into_vec();

let value = InternalValue::from_components(key.as_ref(), value, seqno, r#type);
lock.insert(value)
Expand All @@ -544,7 +599,7 @@ impl AbstractTree for BlobTree {
// into inline or indirect values
let item = MaybeInlineValue::Inline(value.as_ref().into());

let value = item.encode_into_vec().expect("should serialize");
let value = item.encode_into_vec();

self.index.insert(key, value, seqno)
}
Expand All @@ -556,31 +611,45 @@ impl AbstractTree for BlobTree {
) -> crate::Result<Option<crate::UserValue>> {
use value::MaybeInlineValue::{Indirect, Inline};

let Some(value) = self.index.get_internal_with_seqno(key.as_ref(), seqno)? else {
let key = key.as_ref();

let Some(value) = self.index.get_internal_with_seqno(key, seqno)? else {
return Ok(None);
};

match value {
Inline(bytes) => Ok(Some(bytes)),
Indirect { vhandle, .. } => {
// Resolve indirection using value log
Ok(self.blobs.get(&vhandle)?.map(Slice::from))
match self.blobs.get(&vhandle)? {
Some(bytes) => Ok(Some(bytes)),
None => {
panic!("value handle ({key:?} => {vhandle:?}) did not match any blob - this is a bug")
}
}
}
}
}

fn get<K: AsRef<[u8]>>(&self, key: K) -> crate::Result<Option<Slice>> {
use value::MaybeInlineValue::{Indirect, Inline};

let Some(value) = self.index.get_internal(key.as_ref())? else {
let key = key.as_ref();

let Some(value) = self.index.get_internal(key)? else {
return Ok(None);
};

match value {
Inline(bytes) => Ok(Some(bytes)),
Indirect { vhandle, .. } => {
// Resolve indirection using value log
Ok(self.blobs.get(&vhandle)?.map(Slice::from))
match self.blobs.get(&vhandle)? {
Some(bytes) => Ok(Some(bytes)),
None => {
panic!("value handle ({:?} => {vhandle:?}) did not match any blob - this is a bug", String::from_utf8_lossy(key))
}
}
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions src/coding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,13 @@ pub trait Encode {
fn encode_into<W: Write>(&self, writer: &mut W) -> Result<(), EncodeError>;

/// Serializes into vector.
fn encode_into_vec(&self) -> Result<Vec<u8>, EncodeError> {
fn encode_into_vec(&self) -> Vec<u8> {
let mut v = vec![];
self.encode_into(&mut v)?;
Ok(v)

#[allow(clippy::expected_used)]
self.encode_into(&mut v).expect("cannot fail");

v
}
}

Expand Down
11 changes: 5 additions & 6 deletions src/compaction/fifo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,7 @@ impl CompactionStrategy for Strategy {
super::maintenance::Strategy.choose(levels, config)
}
} else {
let mut ids = segment_ids_to_delete.into_iter().collect::<Vec<_>>();
ids.sort_unstable();

let ids = segment_ids_to_delete.into_iter().collect();
Choice::Drop(ids)
}
}
Expand All @@ -133,6 +131,7 @@ mod tests {
Segment,
},
time::unix_timestamp,
HashSet,
};
use std::sync::Arc;
use test_log::test;
Expand Down Expand Up @@ -181,7 +180,7 @@ mod tests {
block_cache,

#[cfg(feature = "bloom")]
bloom_filter: BloomFilter::with_fp_rate(1, 0.1),
bloom_filter: Some(BloomFilter::with_fp_rate(1, 0.1)),
})
}

Expand All @@ -197,7 +196,7 @@ mod tests {

assert_eq!(
compactor.choose(&levels, &Config::default()),
Choice::Drop(vec![1])
Choice::Drop(set![1])
);

Ok(())
Expand Down Expand Up @@ -265,7 +264,7 @@ mod tests {

assert_eq!(
compactor.choose(&levels, &Config::default()),
Choice::Drop(vec![1, 2])
Choice::Drop([1, 2].into_iter().collect::<HashSet<_>>())
);

Ok(())
Expand Down
Loading

0 comments on commit e414fcd

Please sign in to comment.