Skip to content

Commit

Permalink
Merge branch 'main' into refactor/flight
Browse files Browse the repository at this point in the history
  • Loading branch information
zhang2014 committed May 23, 2024
2 parents 36eb655 + ddf4099 commit a17d6a7
Show file tree
Hide file tree
Showing 32 changed files with 729 additions and 406 deletions.
318 changes: 162 additions & 156 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/common/building/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ test = true
anyhow = { workspace = true }
cargo-license = "0.6.1"
cargo_metadata = "0.18"
gix = "0.62.0"
gix = "0.63.0"
log = { workspace = true }
vergen = { version = "8.3.1", default-features = false, features = ["build", "cargo", "git", "gix", "rustc"] }
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::borrow::Borrow;
use std::io;
use std::ops::Deref;
use std::ops::RangeBounds;
use std::sync::Arc;

Expand All @@ -27,10 +28,50 @@ use crate::sm_v002::leveled_store::map_api::MarkedOf;
use crate::sm_v002::marked::Marked;
use crate::state_machine::ExpireKey;

impl Level {
/// A single **immutable** level of state machine data.
///
/// Immutable level implement only [`MapApiRO`], but not [`MapApi`].
///
/// [`MapApi`]: crate::sm_v002::leveled_store::map_api::MapApi
#[derive(Debug, Clone)]
pub struct Immutable {
level: Arc<Level>,
}

impl Immutable {
pub fn new(level: Arc<Level>) -> Self {
Self { level }
}

pub fn new_from_level(level: Level) -> Self {
Self {
level: Arc::new(level),
}
}

pub fn inner(&self) -> &Arc<Level> {
&self.level
}
}

impl AsRef<Level> for Immutable {
fn as_ref(&self) -> &Level {
self.level.as_ref()
}
}

impl Deref for Immutable {
type Target = Level;

fn deref(&self) -> &Self::Target {
self.level.as_ref()
}
}

impl Immutable {
/// Build a static stream that yields key values for primary index
#[futures_async_stream::try_stream(boxed, ok = MapKV<String>, error = io::Error)]
async fn str_range<Q, R>(self: Arc<Level>, range: R)
async fn str_range<Q, R>(self: Immutable, range: R)
where
String: Borrow<Q>,
Q: Ord + Send + Sync + ?Sized,
Expand All @@ -45,7 +86,7 @@ impl Level {

/// Build a static stream that yields expire key and key for the secondary expiration index
#[futures_async_stream::try_stream(boxed, ok = MapKV<ExpireKey>, error = io::Error)]
async fn expire_range<Q, R>(self: Arc<Level>, range: R)
async fn expire_range<Q, R>(self: Immutable, range: R)
where
ExpireKey: Borrow<Q>,
Q: Ord + Send + Sync + ?Sized,
Expand All @@ -60,7 +101,7 @@ impl Level {
}

#[async_trait::async_trait]
impl MapApiRO<String> for Arc<Level> {
impl MapApiRO<String> for Immutable {
async fn get<Q>(&self, key: &Q) -> Result<Marked<<String as MapKey>::V>, io::Error>
where
String: Borrow<Q>,
Expand All @@ -78,7 +119,7 @@ impl MapApiRO<String> for Arc<Level> {
}

#[async_trait::async_trait]
impl MapApiRO<ExpireKey> for Arc<Level> {
impl MapApiRO<ExpireKey> for Immutable {
async fn get<Q>(&self, key: &Q) -> Result<MarkedOf<ExpireKey>, io::Error>
where
ExpireKey: Borrow<Q>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
use std::borrow::Borrow;
use std::io;
use std::ops::RangeBounds;
use std::sync::Arc;

use crate::sm_v002::leveled_store::immutable::Immutable;
use crate::sm_v002::leveled_store::level::Level;
use crate::sm_v002::leveled_store::map_api::compacted_get;
use crate::sm_v002::leveled_store::map_api::compacted_range;
Expand All @@ -28,20 +28,20 @@ use crate::sm_v002::marked::Marked;

/// A readonly leveled map that owns the data.
#[derive(Debug, Default, Clone)]
pub struct StaticLevels {
pub struct ImmutableLevels {
/// From oldest to newest, i.e., levels[0] is the oldest
levels: Vec<Arc<Level>>,
levels: Vec<Immutable>,
}

impl StaticLevels {
pub(in crate::sm_v002) fn new(levels: impl IntoIterator<Item = Arc<Level>>) -> Self {
impl ImmutableLevels {
pub(in crate::sm_v002) fn new(levels: impl IntoIterator<Item = Immutable>) -> Self {
Self {
levels: levels.into_iter().collect(),
}
}

/// Return an iterator of all Arc of levels from newest to oldest.
pub(in crate::sm_v002) fn iter_arc_levels(&self) -> impl Iterator<Item = &Arc<Level>> {
pub(in crate::sm_v002) fn iter_immutable_levels(&self) -> impl Iterator<Item = &Immutable> {
self.levels.iter().rev()
}

Expand All @@ -50,11 +50,11 @@ impl StaticLevels {
self.levels.iter().map(|x| x.as_ref()).rev()
}

pub(in crate::sm_v002) fn newest(&self) -> Option<&Arc<Level>> {
pub(in crate::sm_v002) fn newest(&self) -> Option<&Immutable> {
self.levels.last()
}

pub(in crate::sm_v002) fn push(&mut self, level: Arc<Level>) {
pub(in crate::sm_v002) fn push(&mut self, level: Immutable) {
self.levels.push(level);
}

Expand All @@ -69,24 +69,24 @@ impl StaticLevels {
}

#[async_trait::async_trait]
impl<K> MapApiRO<K> for StaticLevels
impl<K> MapApiRO<K> for ImmutableLevels
where
K: MapKey,
Level: MapApiRO<K>,
Arc<Level>: MapApiRO<K>,
Immutable: MapApiRO<K>,
{
async fn get<Q>(&self, key: &Q) -> Result<Marked<K::V>, io::Error>
where
K: Borrow<Q>,
Q: Ord + Send + Sync + ?Sized,
{
let levels = self.iter_arc_levels();
let levels = self.iter_immutable_levels();
compacted_get(key, levels).await
}

async fn range<R>(&self, range: R) -> Result<KVResultStream<K>, io::Error>
where R: RangeBounds<K> + Clone + Send + Sync + 'static {
let levels = self.iter_arc_levels();
let levels = self.iter_immutable_levels();
compacted_range::<_, _, _, Level>(range, None, levels).await
}
}
44 changes: 24 additions & 20 deletions src/meta/raft-store/src/sm_v002/leveled_store/leveled_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ use std::borrow::Borrow;
use std::fmt;
use std::io;
use std::ops::RangeBounds;
use std::sync::Arc;

use databend_common_meta_types::KVMeta;

use crate::sm_v002::leveled_store::immutable::Immutable;
use crate::sm_v002::leveled_store::immutable_levels::ImmutableLevels;
use crate::sm_v002::leveled_store::level::Level;
use crate::sm_v002::leveled_store::map_api::compacted_get;
use crate::sm_v002::leveled_store::map_api::compacted_range;
Expand All @@ -31,7 +32,6 @@ use crate::sm_v002::leveled_store::map_api::MarkedOf;
use crate::sm_v002::leveled_store::map_api::Transition;
use crate::sm_v002::leveled_store::ref_::Ref;
use crate::sm_v002::leveled_store::ref_mut::RefMut;
use crate::sm_v002::leveled_store::static_levels::StaticLevels;
use crate::sm_v002::marked::Marked;

/// State machine data organized in multiple levels.
Expand All @@ -46,39 +46,43 @@ pub struct LeveledMap {
writable: Level,

/// The immutable levels.
frozen: StaticLevels,
immutable_levels: ImmutableLevels,
}

impl LeveledMap {
pub(crate) fn new(writable: Level) -> Self {
Self {
writable,
frozen: Default::default(),
immutable_levels: Default::default(),
}
}

/// Return an iterator of all levels in reverse order.
pub(in crate::sm_v002) fn iter_levels(&self) -> impl Iterator<Item = &Level> {
[&self.writable]
.into_iter()
.chain(self.frozen.iter_levels())
.chain(self.immutable_levels.iter_levels())
}

/// Return the top level and an iterator of all frozen levels, in newest to oldest order.
/// Return the top level and an iterator of all immutable levels, in newest to oldest order.
pub(in crate::sm_v002) fn iter_shared_levels(
&self,
) -> (Option<&Level>, impl Iterator<Item = &Arc<Level>>) {
(Some(&self.writable), self.frozen.iter_arc_levels())
) -> (Option<&Level>, impl Iterator<Item = &Immutable>) {
(
Some(&self.writable),
self.immutable_levels.iter_immutable_levels(),
)
}

/// Freeze the current writable level and create a new empty writable level.
pub fn freeze_writable(&mut self) -> &StaticLevels {
pub fn freeze_writable(&mut self) -> &ImmutableLevels {
let new_writable = self.writable.new_level();

let frozen = std::mem::replace(&mut self.writable, new_writable);
self.frozen.push(Arc::new(frozen));
let immutable = std::mem::replace(&mut self.writable, new_writable);
self.immutable_levels
.push(Immutable::new_from_level(immutable));

&self.frozen
&self.immutable_levels
}

/// Return an immutable reference to the top level i.e., the writable level.
Expand All @@ -92,22 +96,22 @@ impl LeveledMap {
}

/// Return a reference to the immutable levels.
pub fn frozen_ref(&self) -> &StaticLevels {
&self.frozen
pub fn immutable_levels_ref(&self) -> &ImmutableLevels {
&self.immutable_levels
}

/// Replace all immutable levels with the given one.
pub(crate) fn replace_frozen(&mut self, b: StaticLevels) {
self.frozen = b;
pub(crate) fn replace_immutable_levels(&mut self, b: ImmutableLevels) {
self.immutable_levels = b;
}

pub(crate) fn to_ref_mut(&mut self) -> RefMut {
RefMut::new(&mut self.writable, &self.frozen)
RefMut::new(&mut self.writable, &self.immutable_levels)
}

#[allow(dead_code)]
pub(crate) fn to_ref(&self) -> Ref {
Ref::new(Some(&self.writable), &self.frozen)
Ref::new(Some(&self.writable), &self.immutable_levels)
}
}

Expand All @@ -116,7 +120,7 @@ impl<K> MapApiRO<K> for LeveledMap
where
K: MapKey + fmt::Debug,
Level: MapApiRO<K>,
Arc<Level>: MapApiRO<K>,
Immutable: MapApiRO<K>,
{
async fn get<Q>(&self, key: &Q) -> Result<Marked<K::V>, io::Error>
where
Expand All @@ -139,7 +143,7 @@ impl<K> MapApi<K> for LeveledMap
where
K: MapKey,
Level: MapApi<K>,
Arc<Level>: MapApiRO<K>,
Immutable: MapApiRO<K>,
{
async fn set(
&mut self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ async fn test_freeze() -> anyhow::Result<()> {
]);

// Listing from the base level sees the old value.
let frozen = l.frozen_ref();
let immutables = l.immutable_levels_ref();

let got = frozen
let got = immutables
.str_map()
.range(s("")..)
.await?
Expand Down Expand Up @@ -193,9 +193,9 @@ async fn test_two_levels() -> anyhow::Result<()> {

// Check base level

let frozen = l.frozen_ref();
let immutables = l.immutable_levels_ref();

let strm = frozen.str_map().range(s("")..).await?;
let strm = immutables.str_map().range(s("")..).await?;
let got = strm.try_collect::<Vec<_>>().await?;
assert_eq!(got, vec![
//
Expand Down
10 changes: 5 additions & 5 deletions src/meta/raft-store/src/sm_v002/leveled_store/map_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,9 @@ where
/// There could be tombstone entries: [`Marked::TombStone`].
///
/// The `TOP` is the type of the top level.
/// The `L` is the type of frozen levels.
/// The `L` is the type of immutable levels.
///
/// Because the top level is very likely to be a different type from the frozen levels, i.e., it is writable.
/// Because the top level is very likely to be a different type from the immutable levels, i.e., it is writable.
pub(in crate::sm_v002) async fn compacted_range<'d, K, R, L, TOP>(
range: R,
top: Option<&'d TOP>,
Expand Down Expand Up @@ -257,10 +257,10 @@ where

#[cfg(test)]
mod tests {
use std::sync::Arc;

use futures_util::TryStreamExt;

use crate::sm_v002::leveled_store::immutable::Immutable;
use crate::sm_v002::leveled_store::level::Level;
use crate::sm_v002::leveled_store::map_api::compacted_get;
use crate::sm_v002::leveled_store::map_api::compacted_range;
Expand Down Expand Up @@ -301,12 +301,12 @@ mod tests {
let mut l0 = Level::default();
l0.set(s("a"), Some((b("a"), None))).await?;
l0.set(s("b"), Some((b("b"), None))).await?;
let l0 = Arc::new(l0);
let l0 = Immutable::new_from_level(l0);

let mut l1 = l0.new_level();
l1.set(s("a"), None).await?;
l1.set(s("c"), None).await?;
let l1 = Arc::new(l1);
let l1 = Immutable::new_from_level(l1);

let mut l2 = l1.new_level();
l2.set(s("b"), Some((b("b2"), None))).await?;
Expand Down
4 changes: 2 additions & 2 deletions src/meta/raft-store/src/sm_v002/leveled_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod arc_level_impl;
pub mod immutable;
pub mod immutable_levels;
pub mod level;
pub mod leveled_map;
pub mod map_api;
pub mod ref_;
pub mod ref_mut;
pub mod static_levels;
pub mod sys_data;
pub mod sys_data_api;
pub mod util;
Expand Down
Loading

0 comments on commit a17d6a7

Please sign in to comment.