Tracking and enforcing worker memory limits #653

Merged · 3 commits · Jul 9, 2024
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
@@ -421,7 +421,7 @@ jobs:
SCCACHE_GHA_ENABLED: "true"
RUSTC_WRAPPER: "sccache"
run: cargo make cli-tests
timeout-minutes: 20
timeout-minutes: 30
sharding-tests:
runs-on: ubuntu-latest-large
steps:
28 changes: 22 additions & 6 deletions golem-worker-executor-base/src/durable_host/mod.rs
@@ -49,7 +49,7 @@ use golem_common::model::oplog::{OplogEntry, OplogIndex, UpdateDescription, Wrap
use golem_common::model::regions::{DeletedRegions, OplogRegion};
use golem_common::model::{
AccountId, CallingConvention, ComponentId, ComponentVersion, FailedUpdateRecord,
IdempotencyKey, OwnedWorkerId, ScanCursor, ScheduledAction, SuccessfulUpdateRecord,
IdempotencyKey, OwnedWorkerId, ScanCursor, ScheduledAction, SuccessfulUpdateRecord, Timestamp,
WorkerFilter, WorkerId, WorkerMetadata, WorkerStatus, WorkerStatusRecord,
};
use golem_wasm_rpc::wasmtime::ResourceStore;
@@ -248,16 +248,24 @@ impl<Ctx: WorkerCtx> DurableWorkerCtx<Ctx> {
let mut execution_status = self.execution_status.write().unwrap();
let current_execution_status = execution_status.clone();
match current_execution_status {
ExecutionStatus::Running { last_known_status } => {
*execution_status = ExecutionStatus::Suspended { last_known_status };
ExecutionStatus::Running {
last_known_status, ..
} => {
*execution_status = ExecutionStatus::Suspended {
last_known_status,
timestamp: Timestamp::now_utc(),
};
}
ExecutionStatus::Suspended { .. } => {}
ExecutionStatus::Interrupting {
await_interruption,
last_known_status,
..
} => {
*execution_status = ExecutionStatus::Suspended { last_known_status };
*execution_status = ExecutionStatus::Suspended {
last_known_status,
timestamp: Timestamp::now_utc(),
};
await_interruption.send(()).ok();
}
}
@@ -268,8 +276,13 @@ impl<Ctx: WorkerCtx> DurableWorkerCtx<Ctx> {
let current_execution_status = execution_status.clone();
match current_execution_status {
ExecutionStatus::Running { .. } => {}
ExecutionStatus::Suspended { last_known_status } => {
*execution_status = ExecutionStatus::Running { last_known_status };
ExecutionStatus::Suspended {
last_known_status, ..
} => {
*execution_status = ExecutionStatus::Running {
last_known_status,
timestamp: Timestamp::now_utc(),
};
}
ExecutionStatus::Interrupting { .. } => {}
}
@@ -967,6 +980,8 @@ impl<Ctx: WorkerCtx + DurableWorkerCtxView<Ctx>> ExternalOperations<Ctx> for Dur
let start = Instant::now();
let mut count = 0;

store.as_context_mut().data_mut().set_running();

// Handle the case when recovery immediately starts in a deleted region
// (for example due to a manual update)
store
@@ -1101,6 +1116,7 @@ impl<Ctx: WorkerCtx + DurableWorkerCtxView<Ctx>> ExternalOperations<Ctx> for Dur
debug!("Retrying prepare_instance after failed update attempt");
Ok(final_decision)
} else {
store.as_context_mut().data_mut().set_suspended();
debug!("Finished prepare_instance");
result.map_err(|err| GolemError::failed_to_resume_worker(worker_id.clone(), err))
}
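The new set_running() / set_suspended() calls bracket the oplog replay in prepare_instance, so a worker counts as executing while it is being recovered and drops back to Suspended (with a fresh timestamp) once replay finishes. That suspended-but-loaded state is what the memory manager later treats as "idle". A minimal sketch of that check, assuming the semantics of is_currently_idle_but_running (its real implementation is not part of this diff and may differ):

```rust
use crate::model::ExecutionStatus;

// Hypothetical helper illustrating the idea behind `is_currently_idle_but_running`:
// a worker that is still loaded but whose execution status is `Suspended`
// is a candidate for eviction. The actual method on `Worker` is not shown here.
fn is_idle_candidate(status: &ExecutionStatus) -> bool {
    matches!(status, ExecutionStatus::Suspended { .. })
}
```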
5 changes: 1 addition & 4 deletions golem-worker-executor-base/src/grpc.rs
@@ -397,7 +397,6 @@ impl<Ctx: WorkerCtx, Svcs: HasAll<Ctx> + UsesAllDeps<Ctx = Ctx> + Send + Sync +
// Worker does not exist, but we still check if it is in the list of active workers due to some inconsistency
if let Some((_, worker)) = self
.active_workers()
.enum_workers()
.iter()
.find(|(id, _)| *id == worker_id)
{
@@ -620,9 +619,7 @@ impl<Ctx: WorkerCtx, Svcs: HasAll<Ctx> + UsesAllDeps<Ctx = Ctx> + Send + Sync +

self.shard_service().revoke_shards(&shard_ids);

let workers = self.active_workers().enum_workers();

for (worker_id, worker_details) in workers {
for (worker_id, worker_details) in self.active_workers().iter() {
if self.shard_service().check_worker(&worker_id).is_err() {
if let Some(mut await_interrupted) = worker_details
.set_interrupting(InterruptKind::Restart)
29 changes: 24 additions & 5 deletions golem-worker-executor-base/src/model.rs
@@ -23,7 +23,7 @@ use wasmtime::Trap;

use golem_common::model::oplog::WorkerError;
use golem_common::model::regions::DeletedRegions;
use golem_common::model::{ShardAssignment, ShardId, WorkerId, WorkerStatusRecord};
use golem_common::model::{ShardAssignment, ShardId, Timestamp, WorkerId, WorkerStatusRecord};

use crate::error::GolemError;
use crate::workerctx::WorkerCtx;
@@ -125,14 +125,17 @@ impl From<golem_api_grpc::proto::golem::common::ResourceLimits> for CurrentResou
pub enum ExecutionStatus {
Running {
last_known_status: WorkerStatusRecord,
timestamp: Timestamp,
},
Suspended {
last_known_status: WorkerStatusRecord,
timestamp: Timestamp,
},
Interrupting {
interrupt_kind: InterruptKind,
await_interruption: Arc<tokio::sync::broadcast::Sender<()>>,
last_known_status: WorkerStatusRecord,
timestamp: Timestamp,
},
}

@@ -143,8 +146,12 @@ impl ExecutionStatus {

pub fn last_known_status(&self) -> &WorkerStatusRecord {
match self {
ExecutionStatus::Running { last_known_status } => last_known_status,
ExecutionStatus::Suspended { last_known_status } => last_known_status,
ExecutionStatus::Running {
last_known_status, ..
} => last_known_status,
ExecutionStatus::Suspended {
last_known_status, ..
} => last_known_status,
ExecutionStatus::Interrupting {
last_known_status, ..
} => last_known_status,
@@ -153,13 +160,25 @@

pub fn set_last_known_status(&mut self, status: WorkerStatusRecord) {
match self {
ExecutionStatus::Running { last_known_status } => *last_known_status = status,
ExecutionStatus::Suspended { last_known_status } => *last_known_status = status,
ExecutionStatus::Running {
last_known_status, ..
} => *last_known_status = status,
ExecutionStatus::Suspended {
last_known_status, ..
} => *last_known_status = status,
ExecutionStatus::Interrupting {
last_known_status, ..
} => *last_known_status = status,
}
}

pub fn timestamp(&self) -> Timestamp {
match self {
ExecutionStatus::Running { timestamp, .. } => *timestamp,
ExecutionStatus::Suspended { timestamp, .. } => *timestamp,
ExecutionStatus::Interrupting { timestamp, .. } => *timestamp,
}
}
}

/// Describes the various reasons a worker can run into a trap
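Every ExecutionStatus variant now records when it was entered, and the new timestamp() accessor exposes that transition time. A small sketch, using only the accessors shown above, of how two workers could be ordered by how recently their state changed (the helper itself is illustrative and not part of this PR):

```rust
use crate::model::ExecutionStatus;

// Illustrative only: `true` if `a` changed execution state before `b`.
// This mirrors the ordering used in active_workers.rs when choosing the
// longest-idle workers to stop first.
fn changed_earlier(a: &ExecutionStatus, b: &ExecutionStatus) -> bool {
    a.timestamp().to_millis() < b.timestamp().to_millis()
}
```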
176 changes: 143 additions & 33 deletions golem-worker-executor-base/src/services/active_workers.rs
@@ -12,68 +12,178 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::max;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
use tokio::time::timeout;
use tracing::{debug, Instrument};

use golem_common::cache::{BackgroundEvictionMode, Cache, FullCacheEvictionMode, SimpleCache};
use golem_common::model::WorkerId;
use golem_common::model::{OwnedWorkerId, WorkerId};

use crate::error::GolemError;
use crate::services::golem_config::MemoryConfig;
use crate::services::HasAll;
use crate::worker::Worker;
use crate::workerctx::WorkerCtx;

/// Holds the metadata and wasmtime structures of the active Golem workers
/// Holds the metadata and wasmtime structures of currently active Golem workers
pub struct ActiveWorkers<Ctx: WorkerCtx> {
cache: Cache<WorkerId, (), Arc<Worker<Ctx>>, GolemError>, // TODO: SimpleCache?
workers: Cache<WorkerId, (), Arc<Worker<Ctx>>, GolemError>,
worker_memory: Arc<Semaphore>,
worker_estimate_coefficient: f64,
}

impl<Ctx: WorkerCtx> ActiveWorkers<Ctx> {
pub fn bounded(max_active_workers: usize, drop_when_full: f64, ttl: Duration) -> Self {
let drop_count = max(1, (max_active_workers as f64 * drop_when_full) as usize);
ActiveWorkers {
cache: Cache::new(
Some(max_active_workers),
FullCacheEvictionMode::LeastRecentlyUsed(drop_count),
BackgroundEvictionMode::OlderThan {
ttl,
period: Duration::from_secs(60),
},
"active_workers",
),
}
}

pub fn unbounded() -> Self {
ActiveWorkers {
cache: Cache::new(
pub fn new(memory_config: &MemoryConfig) -> Self {
let worker_memory_size = memory_config.worker_memory();
Self {
workers: Cache::new(
None,
FullCacheEvictionMode::None,
BackgroundEvictionMode::None,
"active_workers",
),
worker_memory: Arc::new(Semaphore::new(worker_memory_size)),
worker_estimate_coefficient: memory_config.worker_estimate_coefficient,
}
}

pub async fn get_with<F>(
pub async fn get_or_add<T>(
&self,
worker_id: &WorkerId,
f: F,
deps: &T,
owned_worker_id: &OwnedWorkerId,
worker_args: Option<Vec<String>>,
worker_env: Option<Vec<(String, String)>>,
component_version: Option<u64>,
parent: Option<WorkerId>,
) -> Result<Arc<Worker<Ctx>>, GolemError>
where
F: FnOnce() -> Pin<Box<dyn Future<Output = Result<Arc<Worker<Ctx>>, GolemError>> + Send>>
+ Send,
T: HasAll<Ctx> + Clone + Send + Sync + 'static,
{
self.cache.get_or_insert_simple(worker_id, f).await
let worker_id = owned_worker_id.worker_id();

let owned_worker_id = owned_worker_id.clone();
let deps = deps.clone();
let worker_estimate_coefficient = self.worker_estimate_coefficient;
self.workers
.get_or_insert_simple(&worker_id, || {
Box::pin(async move {
Ok(Arc::new(
Worker::new(
&deps,
owned_worker_id,
worker_args,
worker_env,
component_version,
parent,
worker_estimate_coefficient,
)
.in_current_span()
.await?,
))
})
})
.await
}

pub fn remove(&self, worker_id: &WorkerId) {
self.cache.remove(worker_id)
self.workers.remove(worker_id);
}

pub fn iter(&self) -> impl Iterator<Item = (WorkerId, Arc<Worker<Ctx>>)> + '_ {
self.workers.iter()
}

pub async fn acquire(&self, memory: u64) -> OwnedSemaphorePermit {
let mem32: u32 = memory
.try_into()
.expect("requested memory size is too large");

loop {
match self.worker_memory.clone().try_acquire_many_owned(mem32) {
Ok(permit) => {
debug!(
"Acquired {} memory of {}",
mem32,
self.worker_memory.available_permits()
);
break permit;
}
Err(TryAcquireError::Closed) => panic!("worker memory semaphore has been closed"),
Err(TryAcquireError::NoPermits) => {
debug!("Not enough memory, trying to free some up");
if self.try_free_up_memory(memory).await {
debug!("Freed up some memory, retrying");
// We have enough memory unless another worker has taken it in the meantime,
// so retry the loop
continue;
} else {
debug!("Could not free up memory, waiting for permits to be available");
// Could not free up enough memory, so waiting for permits to be available.
// We cannot wait forever, because we need to force idle workers out of memory
// if there is a need - but don't want to early drop them if there's no need.
match timeout(
Duration::from_millis(500),
self.worker_memory.clone().acquire_many_owned(mem32),
)
.await
.expect("worker memory semaphore has been closed")
{
Ok(permit) => break permit,
Err(_) => {
debug!("Could not acquire memory in time, retrying");
continue;
}
}
}
}
}
}
}

pub fn enum_workers(&self) -> Vec<(WorkerId, Arc<Worker<Ctx>>)> {
self.cache.iter().collect()
async fn try_free_up_memory(&self, memory: u64) -> bool {
let current_avail = self.worker_memory.available_permits();
let needed = memory.saturating_sub(current_avail as u64);

if needed > 0 {
let mut possibilities = Vec::new();

debug!("Collecting possibilities");
// Collecting the workers which are currently idle but loaded into memory
for (worker_id, worker) in self.workers.iter() {
if worker.is_currently_idle_but_running() {
if let Ok(mem) = worker.memory_requirement().await {
let last_changed = worker.last_execution_state_change().await;
possibilities.push((worker_id, worker, mem, last_changed));
}
}
}

// Sorting them by last time they changed their status - newest first
possibilities
.sort_by_key(|(_worker_id, _worker, _mem, last_changed)| last_changed.to_millis());
possibilities.reverse();

let mut freed = 0;

// Dropping the oldest ones until we have enough memory available - rechecking the idle status before
while freed < needed && !possibilities.is_empty() {
let (worker_id, worker, mem, _) = possibilities.pop().unwrap();

debug!("Trying to stop {worker_id} to free up memory");
if worker.stop_if_idle().await {
debug!("Stopped {worker_id} to free up {mem} memory");
freed += mem;
}
}

debug!("Freed up {freed}");
freed >= needed
} else {
debug!("Memory was freed up in the meantime");
// Memory was freed up in the meantime, we can retry
true
}
}
}
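The memory budget is now modelled as a tokio::sync::Semaphore whose permit count is MemoryConfig::worker_memory(); acquire() returns an OwnedSemaphorePermit, so the reserved amount flows back to the shared budget automatically when the permit is dropped. A hedged sketch of the intended call pattern (the function below is a placeholder, not an API introduced by this PR):

```rust
use tokio::sync::OwnedSemaphorePermit;

use crate::services::active_workers::ActiveWorkers;
use crate::workerctx::WorkerCtx;

// Hypothetical call site: reserve a worker's estimated footprint before
// instantiating it and keep the permit alive for as long as it stays loaded.
// `acquire` retries internally, evicting idle workers when the budget is full,
// and dropping the returned permit releases the memory again.
async fn reserve_worker_memory<Ctx: WorkerCtx>(
    active_workers: &ActiveWorkers<Ctx>,
    estimated_bytes: u64,
) -> OwnedSemaphorePermit {
    active_workers.acquire(estimated_bytes).await
}
```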
4 changes: 2 additions & 2 deletions golem-worker-executor-base/src/services/golem_config.rs
@@ -300,8 +300,8 @@ impl MemoryConfig {
})
}

pub fn worker_memory(&self) -> u64 {
(self.system_memory() as f64 * self.worker_memory_ratio) as u64
pub fn worker_memory(&self) -> usize {
(self.total_system_memory() as f64 * self.worker_memory_ratio) as usize
}
}

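worker_memory() now returns usize so it can be fed straight into Semaphore::new as the permit count. A quick illustrative calculation, assuming the memory figures are in bytes and a 64-bit target (the concrete numbers are made up):

```rust
fn main() {
    // Illustrative only: with 16 GiB of system memory and worker_memory_ratio = 0.75,
    // the worker memory budget (and thus the semaphore's permit count) is 12 GiB.
    let total_system_memory: usize = 16 * 1024 * 1024 * 1024;
    let worker_memory_ratio: f64 = 0.75;
    let worker_memory = (total_system_memory as f64 * worker_memory_ratio) as usize;
    assert_eq!(worker_memory, 12 * 1024 * 1024 * 1024);
}
```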