diff --git a/src/drivers/cloudwatch.rs b/src/drivers/cloudwatch.rs index 5824223..c51e771 100644 --- a/src/drivers/cloudwatch.rs +++ b/src/drivers/cloudwatch.rs @@ -2,6 +2,7 @@ use crate::types::{LogDriver, Message}; use anyhow::Result; use async_trait::async_trait; use aws_sdk_cloudwatchlogs::{ + error::SdkError, operation::{ create_log_group::CreateLogGroupError, create_log_stream::CreateLogStreamError, put_log_events::PutLogEventsError, @@ -33,34 +34,36 @@ impl CloudWatchDriver { .set_log_group_name(Some(group_name.to_owned())) .send() .await + .map_err(SdkError::into_service_error) { Ok(_) => { - info!("created log group: {}", group_name); + info!(?group_name, "created log group"); + } + Err(CreateLogGroupError::ResourceAlreadyExistsException(_)) => { + info!(?group_name, "log group already exists"); + } + Err(e) => { + error!(?group_name, "failed to create log group: {e:?}"); + return Err(e.into()); } - Err(e) => match e.into_service_error() { - CreateLogGroupError::ResourceAlreadyExistsException(_) => {} - inner_err => { - error!(?group_name, "failed to create log group: {:?}", inner_err); - } - }, } self.groups.insert(group_name.to_owned()); - match self + if let Err(e) = self .client .put_retention_policy() .log_group_name(group_name) .retention_in_days(90) .send() .await + .map_err(SdkError::into_service_error) { - Ok(_) => {} - Err(e) => { - error!(?group_name, "failed to set retention policy: {:?}", e); - } + warn!(?group_name, "failed to set retention policy: {:?}", e); } + return Ok(()); } + async fn create_stream(&mut self, group_name: &str, stream_name: &str) -> Result<()> { match self .client @@ -69,44 +72,45 @@ impl CloudWatchDriver { .set_log_stream_name(Some(stream_name.to_owned())) .send() .await + .map_err(SdkError::into_service_error) { Ok(_) => { info!("created log stream: {}", stream_name); } - Err(e) => match e.into_service_error() { - CreateLogStreamError::ResourceAlreadyExistsException(_) => { - info!("caching log stream: {} (other node created)", stream_name); - } - inner_err => { - error!( - ?group_name, - ?stream_name, - "failed to create log stream: {:?}", - inner_err - ); - return Err(inner_err.into()); - } - }, + + Err(CreateLogStreamError::ResourceAlreadyExistsException(_)) => { + info!("caching log stream: {} (other node created)", stream_name); + } + + Err(e) => { + error!( + ?group_name, + ?stream_name, + "failed to create log stream: {e:?}", + ); + return Err(e.into()); + } } self.streams.insert(stream_name.to_owned()); return Ok(()); } + async fn check_or_create_group(&mut self, group_name: &str) -> Result<()> { if !self.groups.contains(group_name) { self.create_group(group_name).await?; } - return Ok(()); + Ok(()) } async fn check_or_create_stream(&mut self, group_name: &str, stream_name: &str) -> Result<()> { if !self.streams.contains(stream_name) { self.create_stream(group_name, stream_name).await?; } - return Ok(()); + Ok(()) } async fn check_or_create(&mut self, group_name: &str, stream_name: &str) -> Result<()> { self.check_or_create_group(group_name).await?; self.check_or_create_stream(group_name, stream_name).await?; - return Ok(()); + Ok(()) } }