Skip to content

Commit

Permalink
use tokio adapter and wrapper instead of writing one
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitryDodzin committed Jan 2, 2025
1 parent 2effec1 commit 19d2bda
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 47 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions mirrord/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ tempfile.workspace = true
rcgen.workspace = true
rustls-pemfile.workspace = true
tokio-rustls.workspace = true
tokio-stream = { workspace = true, features = ["net"] }
tokio-stream = { workspace = true, features = ["io-util", "net"] }
regex.workspace = true
mid = "3.0.0"
rand.workspace = true
pin-project-lite = "0.2"


[target.'cfg(target_os = "macos")'.dependencies]
mirrord-sip = { path = "../sip" }
Expand Down
55 changes: 11 additions & 44 deletions mirrord/cli/src/container/sidecar.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,11 @@
use std::{
net::SocketAddr,
ops::Not,
pin::Pin,
process::Stdio,
task::{Context, Poll},
time::Duration,
};
use std::{net::SocketAddr, ops::Not, process::Stdio, time::Duration};

use mirrord_config::{internal_proxy::MIRRORD_INTPROXY_CONTAINER_MODE_ENV, LayerConfig};
use pin_project_lite::pin_project;
use tokio::{
io::{AsyncBufReadExt, BufReader, Lines},
io::{AsyncBufReadExt, BufReader},
process::{ChildStderr, ChildStdout, Command},
};
use tokio_stream::Stream;
use tokio_stream::{wrappers::LinesStream, StreamExt};
use tracing::Level;

use crate::{
Expand Down Expand Up @@ -121,39 +113,14 @@ impl Sidecar {
.parse()
.map_err(ContainerError::UnableParseProxySocketAddr)?;

Ok((internal_proxy_addr, SidecarLogs { stdout, stderr }))
Ok((
internal_proxy_addr,
LinesStream::new(stdout).merge(LinesStream::new(stderr)),
))
}
}

// The use pin_project is to have a simple wrapper instead of unpining and repining for accessing
// `Pin<&mut Lines<BufReader<ChildStdout>>>` and `Pin<&mut Lines<BufReader<ChildStderr>>>` as in
// accordance to their underlying `Stream` impl.
pin_project! {
#[derive(Debug)]
pub(crate) struct SidecarLogs {
#[pin]
pub stdout: Lines<BufReader<ChildStdout>>,
#[pin]
pub stderr: Lines<BufReader<ChildStderr>>,
}
}

impl Stream for SidecarLogs {
type Item = Result<String, std::io::Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();

if let result @ Poll::Ready(Some(_)) = this
.stdout
.poll_next_line(cx)
.map(|result| result.transpose())
{
return result;
}

this.stderr
.poll_next_line(cx)
.map(|result| result.transpose())
}
}
type SidecarLogs = tokio_stream::adapters::Merge<
LinesStream<BufReader<ChildStdout>>,
LinesStream<BufReader<ChildStderr>>,
>;

0 comments on commit 19d2bda

Please sign in to comment.