Skip to content

Commit

Permalink
Add support for co-locating sequencers and partition processor leaders
Browse files Browse the repository at this point in the history
Let Scheduler respect replicated loglet placements

Let LogsController respect the preferred sequencer node

This fixes #2081.
  • Loading branch information
tillrohrmann committed Oct 17, 2024
1 parent 47e8f23 commit 141bd1c
Show file tree
Hide file tree
Showing 7 changed files with 361 additions and 60 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/admin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ http = { workspace = true }
http-body = { workspace = true }
http-body-util = { workspace = true }
hyper-util = { workspace = true }
itertools = { workspace = true }
mime_guess = { version = "2.0.5", optional = true }
okapi-operation = { version = "0.3.0-rc2", features = ["axum-integration"] }
prost = { workspace = true }
Expand Down
154 changes: 136 additions & 18 deletions crates/admin/src/cluster_controller/logs_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use rand::prelude::IteratorRandom;
use rand::{thread_rng, RngCore};
use std::collections::HashMap;
use std::iter;
use std::num::NonZeroU8;
use std::ops::Deref;
use std::sync::Arc;

use rand::prelude::IteratorRandom;
use rand::{thread_rng, RngCore};
use tokio::task::JoinSet;
use tracing::debug;
use xxhash_rust::xxh3::Xxh3Builder;
Expand All @@ -26,6 +27,7 @@ use restate_core::metadata_store::{
use restate_core::{metadata, task_center, Metadata, MetadataWriter, ShutdownError};
use restate_types::config::Configuration;
use restate_types::errors::GenericError;
use restate_types::identifiers::PartitionId;
use restate_types::live::Pinned;
use restate_types::logs::builder::LogsBuilder;
use restate_types::logs::metadata::{
Expand All @@ -38,9 +40,10 @@ use restate_types::partition_table::PartitionTable;
use restate_types::replicated_loglet::{
NodeSet, ReplicatedLogletId, ReplicatedLogletParams, ReplicationProperty,
};
use restate_types::{logs, Version, Versioned};
use restate_types::{logs, GenerationalNodeId, NodeId, PlainNodeId, Version, Versioned};

use crate::cluster_controller::observed_cluster_state::ObservedClusterState;
use crate::cluster_controller::scheduler;

type Result<T, E = LogsControllerError> = std::result::Result<T, E>;

Expand All @@ -58,6 +61,18 @@ pub enum LogsControllerError {
Shutdown(#[from] ShutdownError),
}

/// Node set selector hints for the [`LogsController`].
pub trait NodeSetSelectorHints {
/// A specific [`NodeId`] where the sequencer should run.
fn preferred_sequencer(&self, log_id: &LogId) -> Option<NodeId>;
}

impl<T: NodeSetSelectorHints> NodeSetSelectorHints for &T {
fn preferred_sequencer(&self, log_id: &LogId) -> Option<NodeId> {
(*self).preferred_sequencer(log_id)
}
}

/// States of a log managed by the [`LogsController`].
///
/// If a log does not have an entry in the [`Logs`] configuration, then it starts in state
Expand Down Expand Up @@ -183,15 +198,19 @@ impl LogState {
&mut self,
observed_cluster_state: &ObservedClusterState,
log_builder: &mut LogsBuilder,
node_set_selector_hints: impl NodeSetSelectorHints,
) -> Result<()> {
match self {
LogState::Provisioning {
provider_kind,
log_id,
} => {
if let Some(loglet_configuration) =
try_provisioning(*log_id, *provider_kind, observed_cluster_state)
{
if let Some(loglet_configuration) = try_provisioning(
*log_id,
*provider_kind,
observed_cluster_state,
node_set_selector_hints,
) {
let chain =
Chain::new(*provider_kind, loglet_configuration.to_loglet_params()?);
log_builder
Expand All @@ -215,17 +234,19 @@ impl LogState {
Ok(())
}

fn try_reconfiguring<F>(
fn try_reconfiguring<F, G>(
&mut self,
observed_cluster_state: &ObservedClusterState,
mut append_segment: F,
preferred_sequencer_hint: G,
) -> Result<()>
where
F: FnMut(
Lsn,
ProviderKind,
LogletParams,
) -> Result<SegmentIndex, logs::builder::BuilderError>,
G: Fn() -> Option<NodeId>,
{
match self {
// We can only reconfigure if we are in state Sealed
Expand All @@ -234,8 +255,8 @@ impl LogState {
configuration,
..
} => {
if let Some(loglet_configuration) =
configuration.try_reconfiguring(observed_cluster_state)
if let Some(loglet_configuration) = configuration
.try_reconfiguring(observed_cluster_state, preferred_sequencer_hint())
{
let segment_index = append_segment(
*seal_lsn,
Expand Down Expand Up @@ -274,6 +295,7 @@ fn try_provisioning(
log_id: LogId,
provider_kind: ProviderKind,
observed_cluster_state: &ObservedClusterState,
node_set_selector_hints: impl NodeSetSelectorHints,
) -> Option<LogletConfiguration> {
match provider_kind {
ProviderKind::Local => {
Expand All @@ -290,6 +312,7 @@ fn try_provisioning(
ReplicatedLogletId::new(log_id, SegmentIndex::OLDEST),
observed_cluster_state,
None,
node_set_selector_hints.preferred_sequencer(&log_id),
)
.map(LogletConfiguration::Replicated),
}
Expand All @@ -302,6 +325,7 @@ fn build_new_replicated_loglet_configuration(
loglet_id: ReplicatedLogletId,
observed_cluster_state: &ObservedClusterState,
previous_configuration: Option<&ReplicatedLogletParams>,
preferred_sequencer: Option<NodeId>,
) -> Option<ReplicatedLogletParams> {
let mut rng = thread_rng();
// todo make min nodeset size configurable, respect roles, respect StorageState, etc.
Expand All @@ -318,6 +342,16 @@ fn build_new_replicated_loglet_configuration(
})
.collect();

let sequencer = preferred_sequencer
.and_then(|node_id| {
// map to a known alive node
observed_cluster_state.alive_nodes.get(&node_id.id())
})
.or_else(|| {
// we can place the sequencer on any alive node
observed_cluster_state.alive_nodes.values().choose(&mut rng)
});

if log_servers.len() >= 3 {
let replication = ReplicationProperty::new(NonZeroU8::new(2).expect("to be valid"));
let mut nodeset = NodeSet::empty();
Expand All @@ -328,18 +362,15 @@ fn build_new_replicated_loglet_configuration(

Some(ReplicatedLogletParams {
loglet_id,
sequencer: **log_servers
.iter()
.choose(&mut rng)
.expect("one node must exist"),
sequencer: *sequencer.expect("one node must exist"),
replication,
nodeset,
write_set: None,
})
} else if let Some(sequencer) = log_servers.iter().choose(&mut rng) {
} else if let Some(sequencer) = sequencer {
previous_configuration.cloned().map(|mut configuration| {
configuration.loglet_id = loglet_id;
configuration.sequencer = **sequencer;
configuration.sequencer = *sequencer;
configuration
})
} else {
Expand Down Expand Up @@ -400,6 +431,7 @@ impl LogletConfiguration {
fn try_reconfiguring(
&self,
observed_cluster_state: &ObservedClusterState,
preferred_sequencer: Option<NodeId>,
) -> Option<LogletConfiguration> {
match self {
#[cfg(feature = "replicated-loglet")]
Expand All @@ -408,6 +440,7 @@ impl LogletConfiguration {
configuration.loglet_id.next(),
observed_cluster_state,
Some(configuration),
preferred_sequencer,
)
.map(LogletConfiguration::Replicated)
}
Expand All @@ -422,6 +455,24 @@ impl LogletConfiguration {
}
}
}

fn node_set_iter(&self) -> impl Iterator<Item = &PlainNodeId> {
match self {
LogletConfiguration::Replicated(configuration) => {
itertools::Either::Left(configuration.nodeset.iter())
}
LogletConfiguration::Local(_) => itertools::Either::Right(iter::empty()),
LogletConfiguration::Memory(_) => itertools::Either::Right(iter::empty()),
}
}

fn sequencer_node(&self) -> Option<GenerationalNodeId> {
match self {
LogletConfiguration::Replicated(configuration) => Some(configuration.sequencer),
LogletConfiguration::Local(_) => None,
LogletConfiguration::Memory(_) => None,
}
}
}

impl TryFrom<&LogletConfig> for LogletConfiguration {
Expand Down Expand Up @@ -532,14 +583,23 @@ impl LogsControllerInner {
&mut self,
observed_cluster_state: &ObservedClusterState,
effects: &mut Vec<Effect>,
node_set_selector_hints: impl NodeSetSelectorHints,
) -> Result<()> {
// we don't do concurrent updates to avoid complexity
if self.logs_write_in_progress.is_none() {
self.seal_logs(observed_cluster_state, effects);

let mut builder = self.current_logs.deref().clone().into_builder();
self.provision_logs(observed_cluster_state, &mut builder)?;
self.reconfigure_logs(observed_cluster_state, &mut builder)?;
self.provision_logs(
observed_cluster_state,
&mut builder,
&node_set_selector_hints,
)?;
self.reconfigure_logs(
observed_cluster_state,
&mut builder,
node_set_selector_hints,
)?;

if let Some(logs) = builder.build_if_modified() {
self.logs_write_in_progress = Some(logs.version());
Expand Down Expand Up @@ -579,9 +639,14 @@ impl LogsControllerInner {
&mut self,
observed_cluster_state: &ObservedClusterState,
logs_builder: &mut LogsBuilder,
node_set_selector_hints: impl NodeSetSelectorHints,
) -> Result<()> {
for log_state in self.logs_state.values_mut() {
log_state.try_provisioning(observed_cluster_state, logs_builder)?;
log_state.try_provisioning(
observed_cluster_state,
logs_builder,
&node_set_selector_hints,
)?;
}

Ok(())
Expand All @@ -591,6 +656,7 @@ impl LogsControllerInner {
&mut self,
observed_cluster_state: &ObservedClusterState,
logs_builder: &mut LogsBuilder,
node_set_selector_hints: impl NodeSetSelectorHints,
) -> Result<()> {
for (log_id, log_state) in &mut self.logs_state {
log_state.try_reconfiguring(
Expand All @@ -602,6 +668,7 @@ impl LogsControllerInner {

chain_builder.append_segment(seal_lsn, provider_kind, loglet_params)
},
|| node_set_selector_hints.preferred_sequencer(log_id),
)?;
}

Expand Down Expand Up @@ -787,10 +854,12 @@ impl LogsController {
pub fn on_observed_cluster_state_update(
&mut self,
observed_cluster_state: &ObservedClusterState,
node_set_selector_hints: impl NodeSetSelectorHints,
) -> Result<(), anyhow::Error> {
self.inner.on_observed_cluster_state_update(
observed_cluster_state,
self.effects.as_mut().expect("to be present"),
node_set_selector_hints,
)?;
self.apply_effects();

Expand Down Expand Up @@ -945,3 +1014,52 @@ impl LogsController {
}
}
}

/// Placement hints for the [`scheduler::Scheduler`] based on the logs configuration
#[derive(derive_more::From)]
pub struct LogsBasedPartitionProcessorPlacementHints<'a> {
logs_controller: &'a LogsController,
}

impl<'a> scheduler::PartitionProcessorPlacementHints
for LogsBasedPartitionProcessorPlacementHints<'a>
{
fn preferred_nodes(&self, partition_id: &PartitionId) -> impl Iterator<Item = &PlainNodeId> {
let log_id = LogId::from(*partition_id);

self.logs_controller
.inner
.logs_state
.get(&log_id)
.and_then(|log_state| match log_state {
LogState::Available { configuration, .. } => configuration
.as_ref()
.map(|configuration| itertools::Either::Left(configuration.node_set_iter())),
LogState::Sealing { .. }
| LogState::Sealed { .. }
| LogState::Provisioning { .. } => None,
})
.unwrap_or_else(|| itertools::Either::Right(iter::empty()))
}

fn preferred_leader(&self, partition_id: &PartitionId) -> Option<PlainNodeId> {
let log_id = LogId::from(*partition_id);

self.logs_controller
.inner
.logs_state
.get(&log_id)
.and_then(|log_state| match log_state {
LogState::Available { configuration, .. } => {
configuration.as_ref().and_then(|configuration| {
configuration
.sequencer_node()
.map(GenerationalNodeId::as_plain)
})
}
LogState::Sealing { .. }
| LogState::Sealed { .. }
| LogState::Provisioning { .. } => None,
})
}
}
Loading

0 comments on commit 141bd1c

Please sign in to comment.