Skip to content

Commit

Permalink
Let auto-provision call method instead of going through a grpc call
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Jan 7, 2025
1 parent e1971c7 commit b506fc6
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 178 deletions.
4 changes: 2 additions & 2 deletions crates/local-cluster-runner/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use regex::{Regex, RegexSet};
use restate_core::network::net_util::create_tonic_channel_from_advertised_address;
use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient;
use restate_core::protobuf::node_ctl_svc::ProvisionClusterRequest as ProtoProvisionClusterRequest;
use restate_types::logs::metadata::DefaultProvider;
use restate_types::logs::metadata::ProviderConfiguration;
use restate_types::partition_table::ReplicationStrategy;
use restate_types::retries::RetryPolicy;
use restate_types::{
Expand Down Expand Up @@ -759,7 +759,7 @@ impl StartedNode {
&self,
num_partitions: Option<NonZeroU16>,
placement_strategy: Option<ReplicationStrategy>,
log_provider: Option<DefaultProvider>,
log_provider: Option<ProviderConfiguration>,
) -> anyhow::Result<bool> {
let channel = create_tonic_channel_from_advertised_address(
self.node_address().clone(),
Expand Down
196 changes: 156 additions & 40 deletions crates/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ mod network_server;
mod roles;

use anyhow::Context;
use tonic::{Code, IntoRequest};
use bytestring::ByteString;
use prost_dto::IntoProto;
use std::num::NonZeroU16;
use tracing::{debug, error, info, trace, warn};

use crate::cluster_marker::ClusterValidationError;
Expand All @@ -23,31 +25,35 @@ use crate::network_server::NetworkServer;
use crate::roles::{AdminRole, BaseRole, IngressRole, WorkerRole};
use codederror::CodedError;
use restate_bifrost::BifrostService;
use restate_core::metadata_store::ReadWriteError;
use restate_core::network::net_util::create_tonic_channel_from_advertised_address;
use restate_core::metadata_store::{
retry_on_network_error, Precondition, ReadWriteError, WriteError,
};
use restate_core::network::{
GrpcConnector, MessageRouterBuilder, NetworkServerBuilder, Networking,
};
use restate_core::partitions::{spawn_partition_routing_refresher, PartitionRoutingRefresher};
use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient;
use restate_core::protobuf::node_ctl_svc::ProvisionClusterRequest;
use restate_core::{cancellation_watcher, Metadata, TaskKind};
use restate_core::{spawn_metadata_manager, MetadataBuilder, MetadataManager, TaskCenter};
#[cfg(feature = "replicated-loglet")]
use restate_log_server::LogServerService;
use restate_metadata_store::local::LocalMetadataStoreService;
use restate_metadata_store::MetadataStoreClient;
use restate_types::config::Configuration;
use restate_types::config::{CommonOptions, Configuration};
use restate_types::errors::GenericError;
use restate_types::health::Health;
use restate_types::live::Live;
use restate_types::logs::metadata::{Logs, LogsConfiguration, ProviderConfiguration};
#[cfg(feature = "replicated-loglet")]
use restate_types::logs::RecordCache;
use restate_types::nodes_config::Role;
use restate_types::metadata_store::keys::{BIFROST_CONFIG_KEY, PARTITION_TABLE_KEY};
use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguration, Role};
use restate_types::partition_table::{PartitionTable, PartitionTableBuilder, ReplicationStrategy};
use restate_types::protobuf::common::{
AdminStatus, IngressStatus, LogServerStatus, MetadataServerStatus, NodeRpcStatus, NodeStatus,
WorkerStatus,
};
use restate_types::storage::StorageEncode;
use restate_types::{GenerationalNodeId, Version, Versioned};

#[derive(Debug, thiserror::Error, CodedError)]
pub enum Error {
Expand Down Expand Up @@ -359,45 +365,28 @@ impl Node {

if config.common.allow_bootstrap {
TaskCenter::spawn(TaskKind::SystemBoot, "auto-provision-cluster", {
let channel = create_tonic_channel_from_advertised_address(
config.common.advertised_address.clone(),
&config.networking,
);
let client = NodeCtlSvcClient::new(channel);
let retry_policy = config.common.network_error_retry_policy.clone();
let cluster_configuration = ClusterConfiguration::from_configuration(&config);
let metadata_store_client = self.metadata_store_client.clone();
let common_opts = config.common.clone();
async move {
let response = retry_policy
.retry_if(
|| {
let mut client = client.clone();
async move {
client
.provision_cluster(
ProvisionClusterRequest {
dry_run: false,
..Default::default()
}
.into_request(),
)
.await
}
},
|status| status.code() == Code::Unavailable,
)
.await;
let response = provision_cluster_metadata(
&metadata_store_client,
&common_opts,
&cluster_configuration,
)
.await;

match response {
Ok(response) => {
let response = response.into_inner();
debug_assert!(!response.dry_run, "Provision w/o dry run");
}
Err(err) => {
if err.code() == Code::AlreadyExists {
debug!("The cluster is already provisioned.")
Ok(provisioned) => {
if provisioned {
info!("Auto provisioned cluster '{}'.", common_opts.cluster_name());
} else {
warn!("Failed to auto provision the cluster. In order to continue you have to provision the cluster manually: {err}");
debug!("The cluster is already provisioned.");
}
}
Err(err) => {
warn!("Failed to auto provision the cluster. In order to continue you have to provision the cluster manually: {err}");
}
}

Ok(())
Expand Down Expand Up @@ -551,6 +540,133 @@ impl Node {
}
}

#[derive(Clone, Debug, IntoProto)]
#[proto(target = "restate_types::protobuf::cluster::ClusterConfiguration")]
pub struct ClusterConfiguration {
#[into_proto(map = "num_partitions_to_u32")]
pub num_partitions: NonZeroU16,
#[proto(required)]
pub replication_strategy: ReplicationStrategy,
#[proto(required)]
pub default_provider: ProviderConfiguration,
}

fn num_partitions_to_u32(num_partitions: NonZeroU16) -> u32 {
u32::from(num_partitions.get())
}

impl ClusterConfiguration {
pub fn from_configuration(configuration: &Configuration) -> Self {
ClusterConfiguration {
num_partitions: configuration.common.bootstrap_num_partitions,
replication_strategy: ReplicationStrategy::default(),
default_provider: ProviderConfiguration::from_configuration(configuration),
}
}
}

/// Provision the cluster metadata. Returns `true` if the cluster was newly provisioned. Returns
/// `false` if the cluster is already provisioned.
///
/// This method returns an error if any of the initial metadata couldn't be written to the
/// metadata store. In this case, the method does not try to clean the already written metadata
/// up. Instead, the caller can retry to complete the provisioning.
async fn provision_cluster_metadata(
metadata_store_client: &MetadataStoreClient,
common_opts: &CommonOptions,
cluster_configuration: &ClusterConfiguration,
) -> anyhow::Result<bool> {
let (initial_nodes_configuration, initial_partition_table, initial_logs) =
generate_initial_metadata(common_opts, cluster_configuration);

let result = retry_on_network_error(common_opts.network_error_retry_policy.clone(), || {
metadata_store_client.provision(&initial_nodes_configuration)
})
.await?;

retry_on_network_error(common_opts.network_error_retry_policy.clone(), || {
write_initial_value_dont_fail_if_it_exists(
metadata_store_client,
PARTITION_TABLE_KEY.clone(),
&initial_partition_table,
)
})
.await
.context("failed provisioning the initial partition table")?;

retry_on_network_error(common_opts.network_error_retry_policy.clone(), || {
write_initial_value_dont_fail_if_it_exists(
metadata_store_client,
BIFROST_CONFIG_KEY.clone(),
&initial_logs,
)
})
.await
.context("failed provisioning the initial logs")?;

Ok(result)
}

fn create_initial_nodes_configuration(common_opts: &CommonOptions) -> NodesConfiguration {
let mut initial_nodes_configuration =
NodesConfiguration::new(Version::MIN, common_opts.cluster_name().to_owned());
let node_config = NodeConfig::new(
common_opts.node_name().to_owned(),
common_opts
.force_node_id
.map(|force_node_id| force_node_id.with_generation(1))
.unwrap_or(GenerationalNodeId::INITIAL_NODE_ID),
common_opts.advertised_address.clone(),
common_opts.roles,
LogServerConfig::default(),
);
initial_nodes_configuration.upsert_node(node_config);
initial_nodes_configuration
}

fn generate_initial_metadata(
common_opts: &CommonOptions,
cluster_configuration: &ClusterConfiguration,
) -> (NodesConfiguration, PartitionTable, Logs) {
let mut initial_partition_table_builder = PartitionTableBuilder::default();
initial_partition_table_builder
.with_equally_sized_partitions(cluster_configuration.num_partitions.get())
.expect("Empty partition table should not have conflicts");
initial_partition_table_builder
.set_replication_strategy(cluster_configuration.replication_strategy);
let initial_partition_table = initial_partition_table_builder.build();

let initial_logs = Logs::with_logs_configuration(LogsConfiguration::from(
cluster_configuration.default_provider.clone(),
));

let initial_nodes_configuration = create_initial_nodes_configuration(common_opts);

(
initial_nodes_configuration,
initial_partition_table,
initial_logs,
)
}

async fn write_initial_value_dont_fail_if_it_exists<T: Versioned + StorageEncode>(
metadata_store_client: &MetadataStoreClient,
key: ByteString,
initial_value: &T,
) -> Result<(), WriteError> {
match metadata_store_client
.put(key, initial_value, Precondition::DoesNotExist)
.await
{
Ok(_) => Ok(()),
Err(WriteError::FailedPrecondition(_)) => {
// we might have failed on a previous attempt after writing this value; so let's continue
Ok(())
}
Err(err) => Err(err),
}
}

#[cfg(not(feature = "replicated-loglet"))]
fn warn_if_log_store_left_artifacts(config: &Configuration) {
if config.log_server.data_dir().exists() {
Expand Down
Loading

0 comments on commit b506fc6

Please sign in to comment.