From 141bd1c2e1482f303089a4303cee656d9a3b0c6d Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 16 Oct 2024 18:52:59 +0200 Subject: [PATCH] Add support for co-locating sequencers and partition processor leaders Let Scheduler respect replicated loglet placements Let LogsController respect the preferred sequencer node This fixes #2081. --- Cargo.lock | 1 + crates/admin/Cargo.toml | 1 + .../src/cluster_controller/logs_controller.rs | 154 +++++++++-- .../admin/src/cluster_controller/scheduler.rs | 240 +++++++++++++++--- .../admin/src/cluster_controller/service.rs | 11 +- crates/types/src/cluster_controller.rs | 7 +- crates/types/src/nodes_config.rs | 7 + 7 files changed, 361 insertions(+), 60 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ca758054c..60e0d62b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5365,6 +5365,7 @@ dependencies = [ "http-body 1.0.1", "http-body-util", "hyper-util", + "itertools 0.13.0", "mime_guess", "okapi-operation", "prost", diff --git a/crates/admin/Cargo.toml b/crates/admin/Cargo.toml index 033f311ea..743a84421 100644 --- a/crates/admin/Cargo.toml +++ b/crates/admin/Cargo.toml @@ -44,6 +44,7 @@ http = { workspace = true } http-body = { workspace = true } http-body-util = { workspace = true } hyper-util = { workspace = true } +itertools = { workspace = true } mime_guess = { version = "2.0.5", optional = true } okapi-operation = { version = "0.3.0-rc2", features = ["axum-integration"] } prost = { workspace = true } diff --git a/crates/admin/src/cluster_controller/logs_controller.rs b/crates/admin/src/cluster_controller/logs_controller.rs index 2d8fd863e..da66799a1 100644 --- a/crates/admin/src/cluster_controller/logs_controller.rs +++ b/crates/admin/src/cluster_controller/logs_controller.rs @@ -8,13 +8,14 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use rand::prelude::IteratorRandom; +use rand::{thread_rng, RngCore}; use std::collections::HashMap; +use std::iter; use std::num::NonZeroU8; use std::ops::Deref; use std::sync::Arc; -use rand::prelude::IteratorRandom; -use rand::{thread_rng, RngCore}; use tokio::task::JoinSet; use tracing::debug; use xxhash_rust::xxh3::Xxh3Builder; @@ -26,6 +27,7 @@ use restate_core::metadata_store::{ use restate_core::{metadata, task_center, Metadata, MetadataWriter, ShutdownError}; use restate_types::config::Configuration; use restate_types::errors::GenericError; +use restate_types::identifiers::PartitionId; use restate_types::live::Pinned; use restate_types::logs::builder::LogsBuilder; use restate_types::logs::metadata::{ @@ -38,9 +40,10 @@ use restate_types::partition_table::PartitionTable; use restate_types::replicated_loglet::{ NodeSet, ReplicatedLogletId, ReplicatedLogletParams, ReplicationProperty, }; -use restate_types::{logs, Version, Versioned}; +use restate_types::{logs, GenerationalNodeId, NodeId, PlainNodeId, Version, Versioned}; use crate::cluster_controller::observed_cluster_state::ObservedClusterState; +use crate::cluster_controller::scheduler; type Result = std::result::Result; @@ -58,6 +61,18 @@ pub enum LogsControllerError { Shutdown(#[from] ShutdownError), } +/// Node set selector hints for the [`LogsController`]. +pub trait NodeSetSelectorHints { + /// A specific [`NodeId`] where the sequencer should run. 
+ fn preferred_sequencer(&self, log_id: &LogId) -> Option; +} + +impl NodeSetSelectorHints for &T { + fn preferred_sequencer(&self, log_id: &LogId) -> Option { + (*self).preferred_sequencer(log_id) + } +} + /// States of a log managed by the [`LogsController`]. /// /// If a log does not have an entry in the [`Logs`] configuration, then it starts in state @@ -183,15 +198,19 @@ impl LogState { &mut self, observed_cluster_state: &ObservedClusterState, log_builder: &mut LogsBuilder, + node_set_selector_hints: impl NodeSetSelectorHints, ) -> Result<()> { match self { LogState::Provisioning { provider_kind, log_id, } => { - if let Some(loglet_configuration) = - try_provisioning(*log_id, *provider_kind, observed_cluster_state) - { + if let Some(loglet_configuration) = try_provisioning( + *log_id, + *provider_kind, + observed_cluster_state, + node_set_selector_hints, + ) { let chain = Chain::new(*provider_kind, loglet_configuration.to_loglet_params()?); log_builder @@ -215,10 +234,11 @@ impl LogState { Ok(()) } - fn try_reconfiguring( + fn try_reconfiguring( &mut self, observed_cluster_state: &ObservedClusterState, mut append_segment: F, + preferred_sequencer_hint: G, ) -> Result<()> where F: FnMut( @@ -226,6 +246,7 @@ impl LogState { ProviderKind, LogletParams, ) -> Result, + G: Fn() -> Option, { match self { // We can only reconfigure if we are in state Sealed @@ -234,8 +255,8 @@ impl LogState { configuration, .. } => { - if let Some(loglet_configuration) = - configuration.try_reconfiguring(observed_cluster_state) + if let Some(loglet_configuration) = configuration + .try_reconfiguring(observed_cluster_state, preferred_sequencer_hint()) { let segment_index = append_segment( *seal_lsn, @@ -274,6 +295,7 @@ fn try_provisioning( log_id: LogId, provider_kind: ProviderKind, observed_cluster_state: &ObservedClusterState, + node_set_selector_hints: impl NodeSetSelectorHints, ) -> Option { match provider_kind { ProviderKind::Local => { @@ -290,6 +312,7 @@ fn try_provisioning( ReplicatedLogletId::new(log_id, SegmentIndex::OLDEST), observed_cluster_state, None, + node_set_selector_hints.preferred_sequencer(&log_id), ) .map(LogletConfiguration::Replicated), } @@ -302,6 +325,7 @@ fn build_new_replicated_loglet_configuration( loglet_id: ReplicatedLogletId, observed_cluster_state: &ObservedClusterState, previous_configuration: Option<&ReplicatedLogletParams>, + preferred_sequencer: Option, ) -> Option { let mut rng = thread_rng(); // todo make min nodeset size configurable, respect roles, respect StorageState, etc. 
@@ -318,6 +342,16 @@ fn build_new_replicated_loglet_configuration( }) .collect(); + let sequencer = preferred_sequencer + .and_then(|node_id| { + // map to a known alive node + observed_cluster_state.alive_nodes.get(&node_id.id()) + }) + .or_else(|| { + // we can place the sequencer on any alive node + observed_cluster_state.alive_nodes.values().choose(&mut rng) + }); + if log_servers.len() >= 3 { let replication = ReplicationProperty::new(NonZeroU8::new(2).expect("to be valid")); let mut nodeset = NodeSet::empty(); @@ -328,18 +362,15 @@ fn build_new_replicated_loglet_configuration( Some(ReplicatedLogletParams { loglet_id, - sequencer: **log_servers - .iter() - .choose(&mut rng) - .expect("one node must exist"), + sequencer: *sequencer.expect("one node must exist"), replication, nodeset, write_set: None, }) - } else if let Some(sequencer) = log_servers.iter().choose(&mut rng) { + } else if let Some(sequencer) = sequencer { previous_configuration.cloned().map(|mut configuration| { configuration.loglet_id = loglet_id; - configuration.sequencer = **sequencer; + configuration.sequencer = *sequencer; configuration }) } else { @@ -400,6 +431,7 @@ impl LogletConfiguration { fn try_reconfiguring( &self, observed_cluster_state: &ObservedClusterState, + preferred_sequencer: Option, ) -> Option { match self { #[cfg(feature = "replicated-loglet")] @@ -408,6 +440,7 @@ impl LogletConfiguration { configuration.loglet_id.next(), observed_cluster_state, Some(configuration), + preferred_sequencer, ) .map(LogletConfiguration::Replicated) } @@ -422,6 +455,24 @@ impl LogletConfiguration { } } } + + fn node_set_iter(&self) -> impl Iterator { + match self { + LogletConfiguration::Replicated(configuration) => { + itertools::Either::Left(configuration.nodeset.iter()) + } + LogletConfiguration::Local(_) => itertools::Either::Right(iter::empty()), + LogletConfiguration::Memory(_) => itertools::Either::Right(iter::empty()), + } + } + + fn sequencer_node(&self) -> Option { + match self { + LogletConfiguration::Replicated(configuration) => Some(configuration.sequencer), + LogletConfiguration::Local(_) => None, + LogletConfiguration::Memory(_) => None, + } + } } impl TryFrom<&LogletConfig> for LogletConfiguration { @@ -532,14 +583,23 @@ impl LogsControllerInner { &mut self, observed_cluster_state: &ObservedClusterState, effects: &mut Vec, + node_set_selector_hints: impl NodeSetSelectorHints, ) -> Result<()> { // we don't do concurrent updates to avoid complexity if self.logs_write_in_progress.is_none() { self.seal_logs(observed_cluster_state, effects); let mut builder = self.current_logs.deref().clone().into_builder(); - self.provision_logs(observed_cluster_state, &mut builder)?; - self.reconfigure_logs(observed_cluster_state, &mut builder)?; + self.provision_logs( + observed_cluster_state, + &mut builder, + &node_set_selector_hints, + )?; + self.reconfigure_logs( + observed_cluster_state, + &mut builder, + node_set_selector_hints, + )?; if let Some(logs) = builder.build_if_modified() { self.logs_write_in_progress = Some(logs.version()); @@ -579,9 +639,14 @@ impl LogsControllerInner { &mut self, observed_cluster_state: &ObservedClusterState, logs_builder: &mut LogsBuilder, + node_set_selector_hints: impl NodeSetSelectorHints, ) -> Result<()> { for log_state in self.logs_state.values_mut() { - log_state.try_provisioning(observed_cluster_state, logs_builder)?; + log_state.try_provisioning( + observed_cluster_state, + logs_builder, + &node_set_selector_hints, + )?; } Ok(()) @@ -591,6 +656,7 @@ impl LogsControllerInner { 
&mut self, observed_cluster_state: &ObservedClusterState, logs_builder: &mut LogsBuilder, + node_set_selector_hints: impl NodeSetSelectorHints, ) -> Result<()> { for (log_id, log_state) in &mut self.logs_state { log_state.try_reconfiguring( @@ -602,6 +668,7 @@ impl LogsControllerInner { chain_builder.append_segment(seal_lsn, provider_kind, loglet_params) }, + || node_set_selector_hints.preferred_sequencer(log_id), )?; } @@ -787,10 +854,12 @@ impl LogsController { pub fn on_observed_cluster_state_update( &mut self, observed_cluster_state: &ObservedClusterState, + node_set_selector_hints: impl NodeSetSelectorHints, ) -> Result<(), anyhow::Error> { self.inner.on_observed_cluster_state_update( observed_cluster_state, self.effects.as_mut().expect("to be present"), + node_set_selector_hints, )?; self.apply_effects(); @@ -945,3 +1014,52 @@ impl LogsController { } } } + +/// Placement hints for the [`scheduler::Scheduler`] based on the logs configuration +#[derive(derive_more::From)] +pub struct LogsBasedPartitionProcessorPlacementHints<'a> { + logs_controller: &'a LogsController, +} + +impl<'a> scheduler::PartitionProcessorPlacementHints + for LogsBasedPartitionProcessorPlacementHints<'a> +{ + fn preferred_nodes(&self, partition_id: &PartitionId) -> impl Iterator { + let log_id = LogId::from(*partition_id); + + self.logs_controller + .inner + .logs_state + .get(&log_id) + .and_then(|log_state| match log_state { + LogState::Available { configuration, .. } => configuration + .as_ref() + .map(|configuration| itertools::Either::Left(configuration.node_set_iter())), + LogState::Sealing { .. } + | LogState::Sealed { .. } + | LogState::Provisioning { .. } => None, + }) + .unwrap_or_else(|| itertools::Either::Right(iter::empty())) + } + + fn preferred_leader(&self, partition_id: &PartitionId) -> Option { + let log_id = LogId::from(*partition_id); + + self.logs_controller + .inner + .logs_state + .get(&log_id) + .and_then(|log_state| match log_state { + LogState::Available { configuration, .. } => { + configuration.as_ref().and_then(|configuration| { + configuration + .sequencer_node() + .map(GenerationalNodeId::as_plain) + }) + } + LogState::Sealing { .. } + | LogState::Sealed { .. } + | LogState::Provisioning { .. } => None, + }) + } +} diff --git a/crates/admin/src/cluster_controller/scheduler.rs b/crates/admin/src/cluster_controller/scheduler.rs index e4f48e562..6585bc1aa 100644 --- a/crates/admin/src/cluster_controller/scheduler.rs +++ b/crates/admin/src/cluster_controller/scheduler.rs @@ -8,11 +8,9 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
-use rand::seq::IteratorRandom; -use std::collections::{BTreeMap, BTreeSet}; -use tracing::{debug, trace}; - +use crate::cluster_controller::logs_controller; use crate::cluster_controller::observed_cluster_state::ObservedClusterState; +use rand::seq::IteratorRandom; use restate_core::metadata_store::{ retry_on_network_error, MetadataStoreClient, Precondition, ReadError, ReadWriteError, WriteError, @@ -25,13 +23,20 @@ use restate_types::cluster_controller::{ use restate_types::config::Configuration; use restate_types::identifiers::PartitionId; use restate_types::logs::metadata::Logs; +use restate_types::logs::LogId; use restate_types::metadata_store::keys::SCHEDULING_PLAN_KEY; use restate_types::net::cluster_controller::Action; use restate_types::net::partition_processor_manager::{ ControlProcessor, ControlProcessors, ProcessorCommand, }; +use restate_types::nodes_config::NodesConfiguration; use restate_types::partition_table::PartitionTable; -use restate_types::{GenerationalNodeId, PlainNodeId, Versioned}; +use restate_types::{GenerationalNodeId, NodeId, PlainNodeId, Versioned}; +use std::collections::{BTreeMap, BTreeSet}; +use tracing::{debug, trace}; +use xxhash_rust::xxh3::Xxh3Builder; + +type HashSet = std::collections::HashSet; #[derive(Debug, thiserror::Error)] #[error("failed reading scheduling plan from metadata store: {0}")] @@ -63,6 +68,24 @@ impl UpdateOutcome { } } +/// Placement hints for the [`Scheduler`]. The hints can specify which nodes should be chosen for +/// the partition processor placement and on which node the leader should run. +pub trait PartitionProcessorPlacementHints { + fn preferred_nodes(&self, partition_id: &PartitionId) -> impl Iterator; + + fn preferred_leader(&self, partition_id: &PartitionId) -> Option; +} + +impl PartitionProcessorPlacementHints for &T { + fn preferred_nodes(&self, partition_id: &PartitionId) -> impl Iterator { + (*self).preferred_nodes(partition_id) + } + + fn preferred_leader(&self, partition_id: &PartitionId) -> Option { + (*self).preferred_leader(partition_id) + } +} + pub struct Scheduler { scheduling_plan: SchedulingPlan, @@ -111,9 +134,19 @@ impl Scheduler { pub async fn on_observed_cluster_state( &mut self, observed_cluster_state: &ObservedClusterState, + nodes_config: &NodesConfiguration, + placement_hints: impl PartitionProcessorPlacementHints, ) -> Result<(), Error> { // todo: Only update scheduling plan on observed cluster changes? 
- self.update_scheduling_plan(observed_cluster_state).await?; + let alive_workers = observed_cluster_state + .alive_nodes + .keys() + .cloned() + .filter(|node_id| nodes_config.has_worker_role(node_id)) + .collect(); + + self.update_scheduling_plan(&alive_workers, nodes_config, placement_hints) + .await?; self.instruct_nodes(observed_cluster_state)?; Ok(()) @@ -172,12 +205,14 @@ impl Scheduler { async fn update_scheduling_plan( &mut self, - observed_cluster_state: &ObservedClusterState, + alive_workers: &HashSet, + nodes_config: &NodesConfiguration, + placement_hints: impl PartitionProcessorPlacementHints, ) -> Result<(), Error> { let mut builder = self.scheduling_plan.clone().into_builder(); - self.ensure_replication(&mut builder, observed_cluster_state); - self.ensure_leadership(&mut builder); + self.ensure_replication(&mut builder, alive_workers, nodes_config, &placement_hints); + self.ensure_leadership(&mut builder, placement_hints); if let Some(scheduling_plan) = builder.build_if_modified() { let scheduling_plan = self @@ -229,11 +264,12 @@ impl Scheduler { fn ensure_replication( &self, scheduling_plan_builder: &mut SchedulingPlanBuilder, - observed_cluster_state: &ObservedClusterState, + alive_workers: &HashSet, + nodes_config: &NodesConfiguration, + placement_hints: impl PartitionProcessorPlacementHints, ) { let partition_ids: Vec<_> = scheduling_plan_builder.partition_ids().cloned().collect(); - let alive_nodes: BTreeSet<_> = observed_cluster_state.alive_nodes.keys().cloned().collect(); let mut rng = rand::thread_rng(); for partition_id in &partition_ids { @@ -242,15 +278,15 @@ impl Scheduler { match target_state.replication_strategy { ReplicationStrategy::OnAllNodes => { - if target_state.node_set != alive_nodes { - target_state.node_set.clone_from(&alive_nodes); + if target_state.node_set != *alive_workers { + target_state.node_set.clone_from(alive_workers); modified = true; } } ReplicationStrategy::Factor(replication_factor) => { // only retain alive nodes => remove dead ones target_state.node_set.retain(|node| { - let result = alive_nodes.contains(node); + let result = alive_workers.contains(node); modified |= !result; result }); @@ -258,16 +294,36 @@ impl Scheduler { let replication_factor = usize::try_from(replication_factor.get()) .expect("u32 should fit into usize"); + if target_state.node_set.len() == replication_factor { + return modified; + } + + let preferred_worker_nodes = placement_hints + .preferred_nodes(partition_id) + .filter(|node_id| nodes_config.has_worker_role(node_id)); + let preferred_leader = placement_hints + .preferred_leader(partition_id) + .and_then(|node_id| { + if alive_workers.contains(&node_id) { + Some(node_id) + } else { + None + } + }); + // if we are under replicated and have other alive nodes available if target_state.node_set.len() < replication_factor - && target_state.node_set.len() < alive_nodes.len() + && target_state.node_set.len() < alive_workers.len() { - // randomly choose from the available set of nodes + if let Some(preferred_leader) = preferred_leader { + modified |= !target_state.node_set.contains(&preferred_leader); + target_state.node_set.insert(preferred_leader); + } + // todo: Implement cleverer strategies - let new_nodes = alive_nodes - .iter() - .filter(|node| !target_state.node_set.contains(*node)) - .cloned() + // randomly choose from the preferred workers nodes first + let new_nodes = preferred_worker_nodes + .filter(|node_id| !target_state.node_set.contains(node_id)) .choose_multiple( &mut rng, replication_factor - 
target_state.node_set.len(), @@ -275,14 +331,58 @@ impl Scheduler { modified |= !new_nodes.is_empty(); target_state.node_set.extend(new_nodes); + + if target_state.node_set.len() < replication_factor { + // randomly choose from the remaining worker nodes + let new_nodes = alive_workers + .iter() + .filter(|node| !target_state.node_set.contains(*node)) + .cloned() + .choose_multiple( + &mut rng, + replication_factor - target_state.node_set.len(), + ); + + modified |= !new_nodes.is_empty(); + target_state.node_set.extend(new_nodes); + } } else if target_state.node_set.len() > replication_factor { - for node_id in target_state.node_set.iter().cloned().choose_multiple( - &mut rng, - replication_factor - target_state.node_set.len(), - ) { + let preferred_worker_nodes: HashSet = + preferred_worker_nodes.cloned().collect(); + + // first remove the not preferred nodes + for node_id in target_state + .node_set + .iter() + .filter(|node_id| { + !preferred_worker_nodes.contains(node_id) + && Some(**node_id) != preferred_leader + }) + .cloned() + .choose_multiple( + &mut rng, + replication_factor - target_state.node_set.len(), + ) + { target_state.node_set.remove(&node_id); modified = true; } + + if target_state.node_set.len() > replication_factor { + for node_id in target_state + .node_set + .iter() + .filter(|node_id| Some(**node_id) != preferred_leader) + .cloned() + .choose_multiple( + &mut rng, + replication_factor - target_state.node_set.len(), + ) + { + target_state.node_set.remove(&node_id); + modified = true; + } + } } } } @@ -300,14 +400,26 @@ impl Scheduler { } } - fn ensure_leadership(&self, scheduling_plan_builder: &mut SchedulingPlanBuilder) { + fn ensure_leadership( + &self, + scheduling_plan_builder: &mut SchedulingPlanBuilder, + placement_hints: impl PartitionProcessorPlacementHints, + ) { let partition_ids: Vec<_> = scheduling_plan_builder.partition_ids().cloned().collect(); for partition_id in partition_ids { scheduling_plan_builder.modify_partition(&partition_id, |target_state| { + let preferred_leader = placement_hints.preferred_leader(&partition_id); if target_state.leader.is_none() { - target_state.leader = self.select_leader_from(&target_state.node_set); + target_state.leader = + self.select_leader_from(&target_state.node_set, preferred_leader); // check whether we modified the leader return target_state.leader.is_some(); + } else if preferred_leader.is_some_and(|preferred_leader| { + Some(preferred_leader) != target_state.leader + && target_state.node_set.contains(&preferred_leader) + }) { + target_state.leader = preferred_leader; + return true; } false @@ -315,10 +427,18 @@ impl Scheduler { } } - fn select_leader_from(&self, leader_candidates: &BTreeSet) -> Option { + fn select_leader_from( + &self, + leader_candidates: &HashSet, + preferred_leader: Option, + ) -> Option { // todo: Implement leader balancing between nodes - let mut rng = rand::thread_rng(); - leader_candidates.iter().choose(&mut rng).cloned() + preferred_leader + .filter(|leader| leader_candidates.contains(leader)) + .or_else(|| { + let mut rng = rand::thread_rng(); + leader_candidates.iter().choose(&mut rng).cloned() + }) } fn instruct_nodes(&self, observed_cluster_state: &ObservedClusterState) -> Result<(), Error> { @@ -400,6 +520,30 @@ impl Scheduler { } } +/// Placement hints for the [`logs_controller::LogsController`] based on the current +/// [`SchedulingPlan`]. 
+pub struct SchedulingPlanNodeSetSelectorHints<'a> { + scheduling_plan: &'a SchedulingPlan, +} + +impl<'a, T> From<&'a Scheduler> for SchedulingPlanNodeSetSelectorHints<'a> { + fn from(value: &'a Scheduler) -> Self { + Self { + scheduling_plan: &value.scheduling_plan, + } + } +} + +impl<'a> logs_controller::NodeSetSelectorHints for SchedulingPlanNodeSetSelectorHints<'a> { + fn preferred_sequencer(&self, log_id: &LogId) -> Option { + let partition_id = PartitionId::from(*log_id); + + self.scheduling_plan + .get(&partition_id) + .and_then(|target_state| target_state.leader.map(Into::into)) + } +} + #[cfg(test)] mod tests { use futures::StreamExt; @@ -408,7 +552,8 @@ mod tests { use http::Uri; use rand::prelude::ThreadRng; use rand::Rng; - use std::collections::{BTreeMap, BTreeSet}; + use std::collections::BTreeMap; + use std::iter; use std::num::NonZero; use std::time::Duration; use test_log::test; @@ -416,9 +561,11 @@ mod tests { use tokio_stream::wrappers::ReceiverStream; use crate::cluster_controller::observed_cluster_state::ObservedClusterState; - use crate::cluster_controller::scheduler::Scheduler; + use crate::cluster_controller::scheduler::{ + HashSet, PartitionProcessorPlacementHints, Scheduler, + }; use restate_core::network::{ForwardingHandler, Incoming, MessageCollectorMockConnector}; - use restate_core::{TaskCenterBuilder, TestCoreEnv, TestCoreEnvBuilder}; + use restate_core::{metadata, TaskCenterBuilder, TestCoreEnv, TestCoreEnvBuilder}; use restate_types::cluster::cluster_state::{ AliveNode, ClusterState, DeadNode, NodeState, PartitionProcessorStatus, RunMode, }; @@ -434,6 +581,21 @@ mod tests { use restate_types::time::MillisSinceEpoch; use restate_types::{GenerationalNodeId, PlainNodeId, Version}; + struct NoPlacementHints; + + impl PartitionProcessorPlacementHints for NoPlacementHints { + fn preferred_nodes( + &self, + _partition_id: &PartitionId, + ) -> impl Iterator { + iter::empty() + } + + fn preferred_leader(&self, _partition_id: &PartitionId) -> Option { + None + } + } + #[test(tokio::test)] async fn empty_leadership_changes_dont_modify_plan() -> googletest::Result<()> { let test_env = TestCoreEnv::create_with_single_node(0, 0).await; @@ -458,7 +620,11 @@ mod tests { let observed_cluster_state = ObservedClusterState::default(); scheduler - .on_observed_cluster_state(&observed_cluster_state) + .on_observed_cluster_state( + &observed_cluster_state, + &metadata().nodes_config_ref(), + NoPlacementHints, + ) .await?; let scheduling_plan = metadata_store_client @@ -574,7 +740,11 @@ mod tests { observed_cluster_state.update(&cluster_state); scheduler - .on_observed_cluster_state(&observed_cluster_state) + .on_observed_cluster_state( + &observed_cluster_state, + &metadata().nodes_config_ref(), + NoPlacementHints, + ) .await?; // collect all control messages from the network to build up the effective scheduling plan let control_messages = control_recv @@ -596,7 +766,7 @@ mod tests { matches_scheduling_plan(&target_scheduling_plan) ); - let alive_nodes: BTreeSet<_> = cluster_state + let alive_nodes: HashSet<_> = cluster_state .alive_nodes() .map(|node| node.generational_node_id.as_plain()) .collect(); diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index b48266b7d..1a9cd250c 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -47,9 +47,11 @@ use restate_types::protobuf::common::AdminStatus; use restate_types::{GenerationalNodeId, Version}; use 
super::cluster_state_refresher::{ClusterStateRefresher, ClusterStateWatcher}; -use crate::cluster_controller::logs_controller::LogsController; +use crate::cluster_controller::logs_controller::{ + LogsBasedPartitionProcessorPlacementHints, LogsController, +}; use crate::cluster_controller::observed_cluster_state::ObservedClusterState; -use crate::cluster_controller::scheduler::Scheduler; +use crate::cluster_controller::scheduler::{Scheduler, SchedulingPlanNodeSetSelectorHints}; #[derive(Debug, thiserror::Error, CodedError)] pub enum Error { @@ -286,6 +288,7 @@ impl Service { let mut partition_table_watcher = self.metadata.watch(MetadataKind::PartitionTable); let mut logs = self.metadata.updateable_logs_metadata(); let mut partition_table = self.metadata.updateable_partition_table(); + let mut nodes_config = self.metadata.updateable_nodes_config(); self.health_status.update(AdminStatus::Ready); @@ -304,8 +307,8 @@ impl Service { } Ok(cluster_state) = cluster_state_watcher.next_cluster_state() => { observed_cluster_state.update(&cluster_state); - logs_controller.on_observed_cluster_state_update(&observed_cluster_state)?; - scheduler.on_observed_cluster_state(&observed_cluster_state).await?; + logs_controller.on_observed_cluster_state_update(&observed_cluster_state, SchedulingPlanNodeSetSelectorHints::from(&scheduler))?; + scheduler.on_observed_cluster_state(&observed_cluster_state, nodes_config.live_load(), LogsBasedPartitionProcessorPlacementHints::from(&logs_controller)).await?; } result = logs_controller.run_async_operations() => { result?; diff --git a/crates/types/src/cluster_controller.rs b/crates/types/src/cluster_controller.rs index 9e6bdfe53..a57af3bb5 100644 --- a/crates/types/src/cluster_controller.rs +++ b/crates/types/src/cluster_controller.rs @@ -13,9 +13,10 @@ use crate::identifiers::{PartitionId, PartitionKey}; use crate::partition_table::PartitionTable; use crate::{flexbuffers_storage_encode_decode, PlainNodeId, Version, Versioned}; use serde_with::serde_as; -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::{BTreeMap, HashSet}; use std::num::NonZero; use std::ops::RangeInclusive; +use xxhash_rust::xxh3::Xxh3Builder; /// Replication strategy for partition processors. #[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -106,7 +107,7 @@ pub struct TargetPartitionState { /// Node which is the designated leader pub leader: Option, /// Set of nodes that should run a partition processor for this partition - pub node_set: BTreeSet, + pub node_set: HashSet, pub replication_strategy: ReplicationStrategy, } @@ -119,7 +120,7 @@ impl TargetPartitionState { partition_key_range, replication_strategy, leader: None, - node_set: BTreeSet::default(), + node_set: HashSet::default(), } } diff --git a/crates/types/src/nodes_config.rs b/crates/types/src/nodes_config.rs index 3a079ea99..b7ef3bb64 100644 --- a/crates/types/src/nodes_config.rs +++ b/crates/types/src/nodes_config.rs @@ -217,6 +217,13 @@ impl NodesConfiguration { }) } + pub fn has_worker_role(&self, node_id: &PlainNodeId) -> bool { + self.nodes.get(node_id).is_some_and(|maybe| match maybe { + MaybeNode::Node(node) => node.has_role(Role::Worker), + _ => false, + }) + } + pub fn iter(&self) -> impl Iterator { self.nodes.iter().filter_map(|(k, v)| { if let MaybeNode::Node(node) = v {
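
The example below is a minimal, self-contained sketch (separate from the diff above) of the co-location loop this patch wires up in service.rs: the scheduler's chosen partition leaders are offered to the LogsController as preferred sequencer nodes via SchedulingPlanNodeSetSelectorHints, and the replicated loglets' sequencers and node sets are offered back to the Scheduler as preferred leaders and processor placements via LogsBasedPartitionProcessorPlacementHints. Plain integer aliases stand in for restate_types::PlainNodeId, LogId and PartitionId; the Plan, LogConfig and pick names are illustrative, not Restate types; the deterministic "smallest id" fallback replaces the random choice the patch actually uses; and log and partition ids are assumed to map 1:1, as the LogId::from(PartitionId) conversions in the patch do.

use std::collections::{HashMap, HashSet};

// Plain integers stand in for restate_types::PlainNodeId, LogId and PartitionId.
type NodeId = u32;
type LogId = u64;
type PartitionId = u64;

// Shape of the hint consumed by the LogsController in this patch
// (`NodeSetSelectorHints`): where should the sequencer of a log preferably run?
trait SequencerHints {
    fn preferred_sequencer(&self, log_id: LogId) -> Option<NodeId>;
}

// Shape of the hint consumed by the Scheduler (`PartitionProcessorPlacementHints`):
// which node should preferably lead a partition?
trait LeaderHints {
    fn preferred_leader(&self, partition_id: PartitionId) -> Option<NodeId>;
}

// The scheduling plan offers its chosen leaders as sequencer hints ...
struct Plan {
    leaders: HashMap<PartitionId, NodeId>,
}

impl SequencerHints for Plan {
    fn preferred_sequencer(&self, log_id: LogId) -> Option<NodeId> {
        // log ids and partition ids share one numeric space in this sketch
        self.leaders.get(&log_id).copied()
    }
}

// ... and the loglet configuration offers its sequencers as leader hints,
// which closes the co-location loop.
struct LogConfig {
    sequencers: HashMap<LogId, NodeId>,
}

impl LeaderHints for LogConfig {
    fn preferred_leader(&self, partition_id: PartitionId) -> Option<NodeId> {
        self.sequencers.get(&partition_id).copied()
    }
}

// Fallback rule used on both sides: take the hinted node if it is alive,
// otherwise fall back to some alive node (the patch picks one at random;
// the smallest id is used here to keep the sketch deterministic).
fn pick(preferred: Option<NodeId>, alive: &HashSet<NodeId>) -> Option<NodeId> {
    preferred
        .filter(|node| alive.contains(node))
        .or_else(|| alive.iter().min().copied())
}

fn main() {
    let alive: HashSet<NodeId> = [1, 2, 3].into_iter().collect();

    // The scheduler placed the leader of partition 7 on node 2, so the logs
    // controller places the sequencer of log 7 there as well.
    let plan = Plan { leaders: HashMap::from([(7, 2)]) };
    assert_eq!(pick(plan.preferred_sequencer(7), &alive), Some(2));

    // Conversely, log 7 already runs its sequencer on node 3, so the scheduler
    // prefers node 3 as the partition leader.
    let logs = LogConfig { sequencers: HashMap::from([(7, 3)]) };
    assert_eq!(pick(logs.preferred_leader(7), &alive), Some(3));

    // A hint pointing at a dead node is ignored and an alive node is chosen.
    assert_eq!(pick(Some(9), &alive), Some(1));
}

The point of the bidirectional hints is that each controller treats the other's state purely as a best-effort preference with an alive-node fallback, so sequencers and partition processor leaders converge onto the same nodes without either controller depending on the other having run first.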